Wprowadzenie
Artykuł jest kontynuacją wpisu, który możesz znaleźć klikając tutaj. W tej częsci skupiamy się na niezywkle przydatnym pluginie jakim jest rabbitmq_management. Przeprowadzimy również testy pozwalające sprawdzić co dzieje się w momencie, gdy nasz odbiorca uległ "awarii". W ostatnim kroku dodamy więcej niż jednego odbiorcę i odpowiednio skonfigurujemy ruch.
Plugin do zarządzania jest zawarty w naszym pakiecie instalacyjnym RabbitMQ. Zanim jednak zaczniemy go używać musimy dokonać jego włączenia. Możemy tego dokonać za pomocą polecenia:
rabbitmq-plugins enable rabbitmq_management
Po aktywacji plugin’a restart całego serwera nie jest konieczny.
Powyższe, oraz kolejne polecenia, wykonujemy z poziomu linii poleceń uruchamianej dla RabbitMQ:
Najprostszy sposób na dostęp do interfejsu użytkownika to wpisanie adresu: http://localhost:15672/. Jest to domyślny adres dla naszej instalacji. Jeżeli proces instalacji przebiegl pomyślnie powinniście zobaczyć poniższy ekran:
Tutaj zapewne pojawia się pytanie: jak zdefiniować naszego użytkownika? Musimy skorzystać z poniższego polecenia:
rabbitmqctl add_user nazwa_uzytkownika haslo
Powyższe polecenie jest jasne. W kolejnym kroku nadamy użytkownikowi odpowiednie uprawnienia:
rabbitmqctl set_user_tags pawel administrator
oraz przejdziemy do procesu logowania. Jeżeli wszystko przebiegło pomyślnie uzyskacie dostęp do panelu zarządzania (dashboard
):
"Awaria" odbiorcy
W poprzedniej części artykułu pisałem o funkcjonalnościach, które dostarcza nam RabbitMQ. Jedną z nich jest gwarancja dostarczenia wiadomości "mimo wszystko". Zrobimy prosty test. Zmodyfikujemy nieznaczenie naszą aplikację wysyłającą wiadomości (pozwoli nam na wielokrotność nadania wiadomości) ale w tym samym czasie nasz odbiorca będzie wyłączony. Sprawdzimy czy wiadomości zostaną dostarczone do kolejki a po "restracie" naszego odbiory dojdzie do otrzymania wszystkich wiadomości.
Poniżej prosta modyfikacja kodu pozwalająca na wielokrotne wysyłanie wiadomości:
using RabbitMQ.Client;
using System;
using System.Text;
namespace ConsoleSender
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Witajcie w aplikacji, która wysyła wiadomości!");
Console.WriteLine("\n--------------------\n");
Console.WriteLine("Wciśnij ESC, aby zakończyć działanie aplikacji");
Console.WriteLine("\n--------------------\n");
var factory = new ConnectionFactory() { HostName = "localhost" };
// otwarcie połączenia
using (var connection = factory.CreateConnection())
{
// utworzenie kanału komunikacji
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "msgKey",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
do
{
Console.WriteLine("Wprowadz wiadomość do wysłania: ");
string msg = Console.ReadLine();
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "",
routingKey: "msgKey",
basicProperties: null,
body: msgBody);
Console.WriteLine($" [x] wysłano {msgBody}");
} while (Console.ReadKey(true).Key != ConsoleKey.Escape);
}
}
Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
Console.ReadLine();
}
}
}
Uruchamiamy teraz naszą aplikację konsolową wysyłając kilka wiadomości:
W między czasie uruchamiamy nasz dashboard celem sprawdzenia ruchu.
Możemy zaobserwować, iż 4 wiadomości zostały nadane i dodane do kolejki:
Przychodzi teraz czas na sprawdzenie czy odbiorca otrzyma każdą z tych wiadomości. Włączamy naszą drugą aplikację konsolową celem sprawdzenia:
Wiadomości zostały dotarczone. Dodatkowo możemy zauważyć, że wiadomości zostały zdjęte z kolejki co świadczy o poprawnym działaniu systemu:
Wielu odbiorców
Pod tym pojęciem kryje się zastosowanie w realnym świecie. Tworzymy kilku odbiorców, przychodzące wiadomości dodajemy do odpowiednich kolejek i odpowiednio je przetwarzamy. Najprostyszym sposobem jest utworzenie drugiej aplikacji konsolowej, która będzie odbierała wiadomości. Możemy używać tego samego kodu. Po uruchomieniu obu aplikacji odbiorców i sprawdzeniu naszego dashboard’u możemy zaobserwować, że mamy aktywnych dwóch konsumentów naszych wiadomości:
Load balancer automatycznie zarządza przekazywaniem wiadomości do różnych odbiorców:
To jednak nie jest to co chcieliśmy osiągnać. Wyobraźcie sobie typ operacji w której chcemy zrobić różne rzeczy z tą samą wiadomością. Otrzymana wiadomość musi zostać niezależnie przetworzona przez dwóch różnych odbiorców. W naszym przypadku powinniśmy powiązać osobną kolejkę dla każdego z obiorców. Dzięki temu mogą oni odbierać wiadomości całkowicie niezależnie.
To jest idealny moment do powrotu do wspomnianego w poprzednim artykule exchange. Tzw. wymiana nie jest skomplikowana. Odbiera ona wiadomości od dostawców oraz przekazuje je do kolejek. Exchange musi dokładnie wiedzieć co zrobić z wiadomością, którą otrzymuje, tzn. czy powinna być dodana do konkretnej kolejki a może dołączona do wielu kolejek? Z drugiej strony może zostać również odrzucona. Te zasady określane są przez rodzaj wymiany.
Dostępnych jest kilka typów wymiany:
- direct
- topic
- headers
- fanout
My skupimy się na ostatnim typie, tj. funout. Polskie tłumaczenie będzie wymowne (rozchodzić się) – to dokładnie to czego potrzebujemy. Przekazanie wiadomości do wszystkich znanych kolejek.
Nasz kod wysyłający wiadomości nie będzie znacznie się różnił od poprzedniego artykułu. Musimy mieć jednak na uwadze punkty wspomniane powyżej:
using RabbitMQ.Client;
using System;
using System.Text;
namespace ConsoleSenderExchange
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Witajcie w aplikacji, która wysyła wiadomości!");
Console.WriteLine("\n--------------------\n");
Console.WriteLine("Wciśnij ESC, aby zakończyć działanie aplikacji");
Console.WriteLine("\n--------------------\n");
var factory = new ConnectionFactory() { HostName = "localhost" };
// otwarcie połączenia
using (var connection = factory.CreateConnection())
{
// utworzenie kanału komunikacji
using (var channel = connection.CreateModel())
{
// tworzymy 'exchange', ktorego nazwę defniujemy jako cars
channel.ExchangeDeclare(exchange: "cars", type: "fanout");
do
{
Console.WriteLine("Wprowadz wiadomość do wysłania: ");
string msg = Console.ReadLine();
var msgBody = Encoding.UTF8.GetBytes(msg);
// publikacja do 'exchange' - nie do kolejki jak w poprzednim przykładzie
channel.BasicPublish(exchange: "cars",
routingKey: "",
basicProperties: null,
body: msgBody);
Console.WriteLine($" [x] wysłano {msgBody}");
} while (Console.ReadKey(true).Key != ConsoleKey.Escape);
}
}
Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
Console.ReadLine();
}
}
}
W powyższym kodzie możecie zobaczyć, że po nawiązaniu połączenia doszło do deklaracji exchange. Ten krok jest niezbędny ponieważ publikacja do nieistniejącego exchange jest niedozwolona.
Musimy jeszcze nieco zmodyfikować stronę odbiorcy:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace ConsoleReceiverExchange
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Witajcie w aplikacji, która odbiera wiadomości!");
var factory = new ConnectionFactory() { HostName = "localhost" };
// otwarcie połączenia
using (var connection = factory.CreateConnection())
// utworzenie kanału komunikacji
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "cars", type: "fanout");
// tworzenie nietrwałej, wyłącznej, automatycznej kolejki z wygenerowaną nazwą
var queueName = channel.QueueDeclare().QueueName;
// exchange i queue zostały utwrzone. Musimy teraz powiadomić 'exchange', aby wysyłała wiadomości do naszej kolejki
// jest to proste wiązanie pomiędzy 'exchange' a 'queue'
channel.QueueBind(queue: queueName,
exchange: "cars",
routingKey: "");
Console.WriteLine(" [x] Oczekiwanie na wiadomości!");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] otrzymano {message}");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine("Wciśnij [Enter], aby wyłączyć aplikację");
Console.ReadLine();
}
}
}
}
Pamiętajcie, żeby zdublować projekt odbiorcy ale zmodyfikować nieco wiadomość wyświtlaną na konsoli tak, aby mieć pewność, że jedna wiadomość została "przetworzona" na dwa różne sposoby – na tym na zależało, aby odwzorować zachowanie mikroserwisu w realnym świecie.
Po uruchomieniu naszych odbiorców warto również sprawdzić definicję kolejek.
Zależy nam, żeby po zamknięciu naszych aplikacji zostały automatycznie usunięte z listy.
Na poniższym przykładzie możecie zobaczyć, że wiadomości zostały "przetworzone" przez dwóch niezależnych odbiorców w odmienny sposób:
Co równie istotne, po wyłączeniu naszych odbiorców, definicje kolejek (nietrwałe) nie są już dłużej używane: