Paweł Łukasiewicz
2022-02-15
Paweł Łukasiewicz
2022-02-15
Wprowadzenie
Główna idea kryjąca się pod pojęciem event sourcingu polega na zapewnieniu, że każda zmiana stanu aplikacji jest przechwytywana w obiekcie zdarzenia a te obiekty przechowywane są w kolejności w jakiej wystąpiły w aplikacji. Mówiąc prościej: event sourcing to ‘dziennik zdarzeń’ do których doszło w naszej aplikacji. Rejestrowanie tych zmian może być przydatne, jeżeli chcemy odzyskać informacje dotyczące zmian jakie zaszły na naszych obiektach.
W tym wpisie przygotujemy prostą aplikację, która będzie symulowała zachowania dziejące się pod spodem dashboard’a Azure DevOps. Przygotujemy proste API, które będzie pozwalało nam zarządzać Tech Item’em. Lista operacji, które będziemy obsługiwać i rejestrować:
- [POST] api/techItem/create
- [PUT] api/techItem/{id}/assign
- [PUT] api/techItem/{id}/move
- [PUT] api/techItem/{id}/complete
Każda z powyższych metod kontrolera będzie dodawała informacje o wystąpieniu danego zdarzenia:
- TechItemCreated - zdarzenie, które zostanie utworzone, gdy dojdzie do utworzenia nowego zadania;
- TechItemAssigned - zdarzenie, które zostanie utworzone, gdy zadanie zostanie przypisane do danej osoby;
-
TechItemMoved - zdarzenie, które zostanie utworzone, gdy zadanie zostanie przeniesione do innej kolumny, np. Ready to Development -> Active;
- TechItemCompleted - zdarzenie, które zostanie utworzone, gdy dojdzie do zakończenia pracy nad danym zadaniem.
EventStore i konfiguracja projektu
W pierwszym kroku musimy skonfigurować EventStore - jest to rodzaj bazy danych zoptymalizowanej do przechowywania zdarzeń. Zdarzenia przechowywane są w postaci niezmiennych strumieni zdarzeń.
W celu lokalnego postawienia EventStora użyjemy Dockera:
docker run -d --name eventstore -p 2113:2113 -p 1113:1113 eventstore/eventstore
Po uruchomieniu Event Stora możemy przejść do panelu zarządzania dostępnego pod adresem: http://localhost:2113/ - domyślna nazwa użytkownika to admin a hasło changeit:
Kolejny krok to utworzenie projektu ASP.NET Core Web API oraz zainstalowanie poniższej paczki:
Następnie musimy zdefiniować połączenie do Event Stora w pliku appsettings.json:
{
"EventStore": {
"ConnectionString": "ConnectTo=tcp://admin:changeit@localhost:1113; DefaultUserCredentials=admin:changeit;",
"ConnectionName": "Task"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"
}
Kolejny krok to konfiguracja projektu i podłączenie Event Stora przy wykorzystaniu Dependency Injection:
// Wymagana paczka: using EventStore.ClientAPI;
public void ConfigureServices(IServiceCollection services)
{
var eventStoreConnection = EventStoreConnection.Create(
connectionString: Configuration.GetValue<string>("EventStore:ConnectionString"),
builder: ConnectionSettings.Create().KeepReconnecting(),
connectionName: Configuration.GetValue<string>("EventStore:ConnectionName"));
eventStoreConnection.ConnectAsync().GetAwaiter().GetResult();
services.AddSingleton(eventStoreConnection);
services.AddControllers();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "EventStoreExample", Version = "v1" });
});
}
Agregaty
Agregat jest wzorcem w Domain Driven Design. Agregat DDD jest zbiorem obiektów domeny, które mogą być traktowane jako pojedyncza jednostka. Przykładem może być zamówienie i jego pozycje liniowe – będą to oddzielne obiekty ale całe zamówienie (wraz z jego pozycjami liniowymi) należy traktować jako pojedynczy agregat.
Jeden z obiektów agregata będzie traktowany jako korzeń (root). Wszelkie odwołania spoza agregata powinny iść tylko do korzenia agregata. Korzeń może w ten sposób zapewnić integralność agregatu jako całości.
Agregaty są podstawowym elementem przenoszenia danych – użytkownik żąda wczytania lub zapisania całych agregatów. Transakcje nie powinny przekraczać granic agregatów.
Agregaty DDD często są mylone z klasami kolekcji (listy, mapy, itp.). Agregaty są pojęciami domenowymi, podczas gdy kolekcje są ogólne. Agregat często będzie zawierał wiele kolekcji wraz z prostymi polami. Termin "agregat" jest powszechny i często używany w różnych kontekstach, np. UML w którym to przypadku nie odnosi się do tego samego pojęcia co agregat w Domain Driven Design.
Zanim przejdziemy dalej spójrzmy jeszcze na prosty przykład definicji agregata:
Wracamy do projektu:
Dodajemy dwa nowe foldery o nazwach Core oraz Infrastructure. W pierwszym z nich znajdą się encje, zdarzenia oraz wyjątki z nimi związane. W drugim z nich dodamy klasę repozytorium, która będzie łączyła się z Event Storem.
W folderze Core możemy utworzyć nowy folder pod nazwą Framework do którego dodamy klasę Aggregate.cs:
using System;
using System.Collections.Generic;
using System.Linq;
namespace EventStoreExample.Core.Framework
{
/// <summary>
/// Standardowa klasa abstrakcyjna na bazie której będą tworzone Agregaty.
/// W naszym przypadku TechItem będzie Agregatem
/// </summary>
public abstract class Aggregate
{
// Defincja listy w której będziemy przechowywać agregaty
readonly IList<object> _changes = new List<object>();
// Agregat będzie skadał się z unikalnego identyfikatora
public Guid Id { get; protected set; } = Guid.Empty;
// Agregat będzie wersjonowany
public long Version { get; private set; } = -1;
protected abstract void When(object @event);
// Metoda, która doda event do naszej kolekcji
// @ - znak pozwala używać zarezerwowanych nazw parametrów
public void Apply(object @event)
{
When(@event);
_changes.Add(@event);
}
// Metoda, która będzie dodawała zdarzenia do agregatu
// Ostateczna wersja agregatu będzie tworzona przez uruchomienie tej metody dla
// każdego zdarzenia odczytanego z Event Stora
public void Load(long version, IEnumerable<object> history)
{
Version = version;
foreach (var item in history)
{
When(item);
}
}
// Metoda, która zwraca zdarzenia, które wystąpiły na danym agregacie
// Metoda ta będzie uruchamiana w trakcie wysyłania zdarzeń do Event Stora
// Eventy będą odbierane i dodawane do naszej kolekcji
public object[] GetChanges() => _changes.ToArray();
}
}
Repozytorium
Pora na dodanie repozytorium do folderu infrastruktury. Klasa ta pozwoli nam na wysyłanie zdarzeń do Event Stora oraz ich odbieranie kiedy będzie to koniecznie. Metoda składa się z dwóch prostych metod: Save oraz Load używając przygotowanego wcześniej połączenia z Event Storem:
using EventStore.ClientAPI;
using EventStoreExample.Core.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace EventStoreExample.Infrastructure
{
public class AggregateRepository
{
private readonly IEventStoreConnection _eventStore;
public AggregateRepository(IEventStoreConnection eventStore)
{
_eventStore = eventStore;
}
// Zapisywanie eventów
public async Task SaveAsync<T>(T aggregate) where T : Aggregate, new ()
{
// Pobieramy zdarzenie danego agregatu i dokonujemy mapowania do klasy EventData
// Wszelkie zdarzenia w EventStore przechowywane są w typie EventData
// Parametr 1: identyfikator zdarzenia, Guid
// Parametr 2: nazwa zdarzenia, np. TechItemCreated, TechItemMoved
// Parametr 3: Określenie czy zdarzenie jest obiektem typu json
// Parametr 4: Dane zdarzenia poddane serializacji ponieważ oczekiwany typ to tablica bajtów
// Parametr 5: Metadane. Parametr może być null'em
var events = aggregate.GetChanges()
.Select(@event => new EventData(
Guid.NewGuid(),
@event.GetType().Name,
true,
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(@event)),
Encoding.UTF8.GetBytes(@event.GetType().FullName)))
.ToArray();
if(!events.Any())
{
return;
}
// Nazwa strumienia: Zdarzenia danego agregata w EventStore nazywane są strumieniami
// Przykładowa nazwa strumienia: TechItem-86f720f3-88e0-4145-91b5-19c7a765fa4a
var streamName = GetStreamName(aggregate, aggregate.Id);
// Dodanie eventa do EventStora
var result = _eventStore.AppendToStreamAsync(streamName, ExpectedVersion.Any, events);
}
// Odczytywanie/pobieranie eventów
public async Task<T> LoadAsync<T>(Guid aggregateId) where T : Aggregate, new()
{
if (aggregateId == Guid.Empty)
throw new ArgumentException("Guid nie może być pusty", nameof(aggregateId));
var aggregate = new T();
// Określenie nazwy strumienia
var streamName = GetStreamName(aggregate, aggregateId);
var nextPageStart = 0L;
do
{
// Pobieranie kolejnych zdarzeń z Event Stora w kolejności zgodnej z iteracją
var page = await _eventStore.ReadStreamEventsForwardAsync(
streamName, nextPageStart, 4096, false);
if (page.Events.Length > 0)
{
// Wywołanie metody Load aggregatu, która pozwoli nam otrzymać ostateczną postać agregatu, tj.
// zebranie wszystkich eventów, które wystąpiły dla danego agregatu
aggregate.Load(
page.Events.Last().Event.EventNumber,
page.Events.Select(@event => JsonSerializer.Deserialize(Encoding.UTF8.GetString(@event.OriginalEvent.Data), Type.GetType(Encoding.UTF8.GetString(@event.OriginalEvent.Metadata)))
).ToArray());
}
nextPageStart = !page.IsEndOfStream ? page.NextEventNumber : -1;
} while (nextPageStart != -1);
return aggregate;
}
private string GetStreamName<T>(T type, Guid aggregateId) => $"{type.GetType().Name}-{aggregateId}";
}
}
Zanim przejdziemy dalej dodamy naszą klasę do kontenera Dependency Injection:
public void ConfigureServices(IServiceCollection services)
{
var eventStoreConnection = EventStoreConnection.Create(
connectionString: Configuration.GetValue<string>("EventStore:ConnectionString"),
builder: ConnectionSettings.Create().KeepReconnecting(),
connectionName: Configuration.GetValue<string>("EventStore:ConnectionName"));
eventStoreConnection.ConnectAsync().GetAwaiter().GetResult();
services.AddSingleton(eventStoreConnection);
services.AddTransient<AggregateRepository>();
services.AddControllers();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "EventStoreExample", Version = "v1" });
});
}
Dlaczego EventStore został dodany ze scopem Singleton a repozytorium Transient? W pierwszym przypadku potrzebujemy aby Event Store był zawsze tym samym obiektem dla każdego żądania. W przypadku repozytorium agregata chcemy mieć nową instancję dla każdego kontrolera i każdej usługi.
Definiowanie przypadków użycia
Pora na kolejne składowe naszej architektury. Musimy dodać zdarzenia, wyjątki oraz przypadki użycia dla naszych TechItem’ów. Zaczniemy od dodania klasy TechItem wewnątrz folderu Core. W tym miejscu zdefiniujemy nasz agregat:
using EventStoreExample.Core.Framework;
using System;
namespace EventStoreExample.Core
{
public class TechItem : Aggregate
{
public string Name { get; private set; }
public string Description { get; private set; }
public string ActiveColumn { get; private set; }
public bool IsCompleted { get; private set; }
protected override void When(object @event)
{
throw new NotImplementedException();
}
}
}
Dodamy jeszcze prostą obsługę wyjątków. Utworzmy trzy osobne klasy w folderze Core:
using System;
namespace EventStoreExample.Core
{
public class TechItemCompletedException : Exception
{
public TechItemCompletedException() : base("Tech Item został już ukończony!")
{
}
}
}
using System;
namespace EventStoreExample.Core
{
public class TechItemNotFoundException : Exception
{
public TechItemNotFoundException():base("Tech Item nie został znaleziony!")
{
}
}
}
using System;
namespace EventStoreExample.Core
{
public class TechItemCreatedException : Exception
{
public TechItemCreatedException():base("Tech Item został już utworzony!")
{
}
}
}
Pora na przygotowanie konkretnych przypadków użycia dla naszych Tech Itemów.
TechItemCreated:
Dla każdego nowo utworzonego zadania chcemy przechowywać informacje dotyczące identyfikatora, tytułu oraz informacji kto utworzył dane zadanie. W tym celu utworzymy folder Events wewnątrz folderu Core gdzie dodamy klasę TechItemCreated:
public class TechItemCreated
{
public Guid Id { get; set; }
public string TItle { get; set; }
public string CreatedBy { get; set; }
}
Możemy teraz przejść do klasy TechItem w celu implementacji zdarzenia TechItemCreated na danym agregacie:
using EventStoreExample.Core.Events;
using EventStoreExample.Core.Framework;
using System;
namespace EventStoreExample.Core
{
public class TechItem : Aggregate
{
public string Name { get; private set; }
public string Description { get; private set; }
public string ActiveColumn { get; private set; }
public bool IsCompleted { get; private set; }
protected override void When(object @event)
{
switch (@event)
{
case TechItemCreated t:
OnCreated(t);
break;
default:
break;
}
}
public void Create(Guid id, string title, string createdBy)
{
if (Version >=0)
{
throw new TechItemCreatedException();
}
Apply(new TechItemCreated
{
Id = id,
TItle = title,
CreatedBy = createdBy
});
}
#region Event Handlers
private void OnCreated(TechItemCreated @event)
{
Id = @event.Id;
Name = @event.TItle;
ActiveColumn = "Ready for Development";
}
#endregion
}
}
TechItemAssigned:
Kolejnym naturalnym krokiem przy utworzeniu zadania jest przypisanie go do konkretnej osoby. Tym razem chcemy przechowywać identyfikator oraz informacje kto do kogo przypisał zadanie. Tworzymy nową klasę TechItemAssigned o poniższej definicji:
public class TechItemAssigned
{
public Guid Id { get; set; }
public string AssignedBy { get; set; }
public string AssignedTo { get; set; }
}
Kolejny krok to obsługa tego zdarzenia:
using EventStoreExample.Core.Events;
using EventStoreExample.Core.Framework;
using System;
namespace EventStoreExample.Core
{
public class TechItem : Aggregate
{
public string Name { get; private set; }
public string Description { get; private set; }
public string ActiveColumn { get; private set; }
public string AssignedTo { get; private set; }
public bool IsCompleted { get; private set; }
protected override void When(object @event)
{
switch (@event)
{
case TechItemCreated t:
OnCreated(t);
break;
case TechItemAssigned t:
OnAssigned(t);
break;
default:
break;
}
}
public void Create(Guid id, string title, string createdBy)
{
if (Version >= 0)
{
throw new TechItemCreatedException();
}
Apply(new TechItemCreated
{
Id = id,
TItle = title,
CreatedBy = createdBy
});
}
public void Assign(string assignedTo, string assignedBy)
{
if (Version == -1)
{
throw new TechItemNotFoundException();
}
if(IsCompleted)
{
throw new TechItemCompletedException();
}
Apply(new TechItemAssigned
{
Id = Id,
AssignedBy = assignedBy,
AssignedTo = assignedTo,
});
}
#region Event Handlers
private void OnCreated(TechItemCreated @event)
{
Id = @event.Id;
Name = @event.TItle;
ActiveColumn = "Ready for Development";
}
private void OnAssigned(TechItemAssigned @event)
{
AssignedTo = @event.AssignedTo;
}
#endregion
}
}
Kolejny krok to ukończenie pracy nad danym zadaniem i przesunięcie pomiędzy kolumnami na naszym boardzie. Utworzymy nową klasę, która będzie przechowywała informacje dotyczące informacji kto przesunął dane zadanie i do jakiej sekcji:
public class TechItemMoved
{
public Guid Id { get; set; }
public string MovedBy { get; set; }
public string ActiveColumn { get; set; }
}
Następny krok to obsługa zdarzenia TechItemMoved:
using EventStoreExample.Core.Events;
using EventStoreExample.Core.Framework;
using System;
namespace EventStoreExample.Core
{
public class TechItem : Aggregate
{
public string Name { get; private set; }
public string Description { get; private set; }
public string ActiveColumn { get; private set; }
public string AssignedTo { get; private set; }
public bool IsCompleted { get; private set; }
protected override void When(object @event)
{
switch (@event)
{
case TechItemCreated t:
OnCreated(t);
break;
case TechItemAssigned t:
OnAssigned(t);
break;
case TechItemMoved t:
OnMoved(t);
break;
default:
break;
}
}
public void Create(Guid id, string title, string createdBy)
{
if (Version >= 0)
{
throw new TechItemCreatedException();
}
Apply(new TechItemCreated
{
Id = id,
TItle = title,
CreatedBy = createdBy
});
}
public void Assign(string assignedTo, string assignedBy)
{
if (Version == -1)
{
throw new TechItemNotFoundException();
}
if (IsCompleted)
{
throw new TechItemCompletedException();
}
Apply(new TechItemAssigned
{
Id = Id,
AssignedBy = assignedBy,
AssignedTo = assignedTo,
});
}
public void Moved(string activeColumn, string movedBy)
{
if (Version == -1)
{
throw new TechItemNotFoundException();
}
if (IsCompleted)
{
throw new TechItemCompletedException();
}
Apply(new TechItemMoved
{
Id = Id,
MovedBy = movedBy,
ActiveColumn = activeColumn,
});
}
#region Event Handlers
private void OnCreated(TechItemCreated @event)
{
Id = @event.Id;
Name = @event.TItle;
ActiveColumn = "Ready for Development";
}
private void OnAssigned(TechItemAssigned @event)
{
AssignedTo = @event.AssignedTo;
}
private void OnMoved(TechItemMoved @event)
{
ActiveColumn = @event.ActiveColumn;
}
#endregion
}
}
TechItemCompleted
Ostatni krok to dodanie zdarzenia dla zakończenia pracy nad danym zadaniem. Dodajemy kolejną klasę o poniższej definicji:
public class TechItemCompleted
{
public int Id { get; set; }
public string CompletedBy { get; set; }
}
Dodajemy jeszcze obsługę zdarzenia:
using EventStoreExample.Core.Events;
using EventStoreExample.Core.Framework;
using System;
namespace EventStoreExample.Core
{
public class TechItem : Aggregate
{
public string Name { get; private set; }
public string Description { get; private set; }
public string ActiveColumn { get; private set; }
public string AssignedTo { get; private set; }
public bool IsCompleted { get; private set; }
protected override void When(object @event)
{
switch (@event)
{
case TechItemCreated t:
OnCreated(t);
break;
case TechItemAssigned t:
OnAssigned(t);
break;
case TechItemMoved t:
OnMoved(t);
break;
case TechItemCompleted t:
OnCompleted(t);
break;
default:
break;
}
}
public void Create(Guid id, string title, string createdBy)
{
if (Version >= 0)
{
throw new TechItemCreatedException();
}
Apply(new TechItemCreated
{
Id = id,
TItle = title,
CreatedBy = createdBy
});
}
public void Assign(string assignedTo, string assignedBy)
{
if (Version == -1)
{
throw new TechItemNotFoundException();
}
if (IsCompleted)
{
throw new TechItemCompletedException();
}
Apply(new TechItemAssigned
{
Id = Id,
AssignedBy = assignedBy,
AssignedTo = assignedTo,
});
}
public void Moved(string activeColumn, string movedBy)
{
if (Version == -1)
{
throw new TechItemNotFoundException();
}
if (IsCompleted)
{
throw new TechItemCompletedException();
}
Apply(new TechItemMoved
{
Id = Id,
MovedBy = movedBy,
ActiveColumn = activeColumn,
});
}
public void Complete(string completedBy)
{
if (Version == -1)
{
throw new TechItemNotFoundException();
}
if (IsCompleted)
{
throw new TechItemCompletedException();
}
Apply(new TechItemCompleted
{
Id = Id,
CompletedBy = completedBy
});
}
#region Event Handlers
private void OnCreated(TechItemCreated @event)
{
Id = @event.Id;
Name = @event.TItle;
ActiveColumn = "Ready for Development";
}
private void OnAssigned(TechItemAssigned @event)
{
AssignedTo = @event.AssignedTo;
}
private void OnMoved(TechItemMoved @event)
{
ActiveColumn = @event.ActiveColumn;
}
private void OnCompleted(TechItemCompleted @event)
{
IsCompleted = true;
}
#endregion
}
}
API
Powoli zbliżamy się do końca. Musimy jeszcze przygotować API, które pozwoli nam na wykorzystanie przygotowanego repozytorium. Spójrzcie na poniższy, przykładowy kod dla kilku metod:
using EventStoreExample.Infrastructure;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Threading.Tasks;
namespace EventStoreExample.Controllers
{
[Route("api/techitem/{id}")]
[ApiController]
public class TechItemController : ControllerBase
{
private readonly AggregateRepository _aggregateRepository;
public TechItemController(AggregateRepository aggregateRepository)
{
_aggregateRepository = aggregateRepository;
}
[HttpPost, Route("create")]
public async Task<IActionResult> Create(Guid id, [FromForm] string title)
{
var aggregate = await _aggregateRepository.LoadAsync<Core.TechItem>(id);
aggregate.Create(id, title, "Pawel");
await _aggregateRepository.SaveAsync(aggregate);
return Ok();
}
[HttpPatch, Route("assign")]
public async Task<IActionResult> Assign(Guid id, [FromForm] string assignedTo)
{
var aggregate = await _aggregateRepository.LoadAsync<Core.TechItem>(id);
aggregate.Assign(assignedTo, "Krzysztof");
await _aggregateRepository.SaveAsync(aggregate);
return Ok();
}
[HttpPatch, Route("move")]
public async Task<IActionResult> Move(Guid id, [FromForm] string activeColumn)
{
var aggregate = await _aggregateRepository.LoadAsync<Core.TechItem>(id);
aggregate.Move(activeColumn, "Krzysztof");
await _aggregateRepository.SaveAsync(aggregate);
return Ok();
}
[HttpPatch, Route("complete")]
public async Task<IActionResult> Complete(Guid id)
{
var aggregate = await _aggregateRepository.LoadAsync<Core.TechItem>(id);
aggregate.Complete("Krzysztof");
await _aggregateRepository.SaveAsync(aggregate);
return Ok();
}
}
}
Patrząc na powyższy kod możecie zobaczyć, że w pierwszej kolejności pobierane są zdarzenia z Event Stora przy pomocy metody Load, którą napisaliśmy w repozytorium agregata. Na bazie zwróconego rezultatu tworzony jest agregat. Następnie, nowe zdarzenia przechowywane są w agregacie dzięki przygotowanym przez nas metodą (przypadki użycia) na agregacie. Przy pomocy metody Save przechowywane zdarzenia są wysyłane do Event Stora.
Jeżeli coś jest jeszcze nie jasne polecam przejść powoli przez odpalony kod krok-po-kroku, żeby zobaczyć jak tworzony jest agregat, jak dodajemy do niego nowe zdarzania a następnie zapisujemy je w Event Storze. Dodatkowo, możecie wykorzystać panel zarządzania w celu podejrzenia danego agregata i zdarzeń do których na nim doszło wykorzystując swoją lokalną instancję dostępną pod adresem: http://localhost:2113
Największe problemy będzie sprawiało lokalne postawienie Event Stora na Windowsie. Jeżeli napotkacie jakieś problemy skierujcie się tutaj: https://github.com/EventStore/EventStore/issues/2547 - znajdziecie prawdopodobnie odpowiedzi na wszystkie wasze pytania. Z rozwiązaniem przyjdą Wam flagi --insecure oraz --enable-external-tcp.
Pliki