Amazon Kinesis ułatwia zbieranie, przetwarzanie i analizowanie danych strumieniowych w czasie rzeczywistym dzięki czemu można uzyskać aktualny wgląd w dane i szybko reagować na nowe informacje. Amazon Kinesis oferuje kluczowe możliwości efektywnego kosztowo przetwarzania danych strumieniowych w dowolnej skali, wraz z elastycznością związaną z wyborem narzędzia, które najlepiej pasuje do wymagań aplikacji.
Dzięki Amazon Kinesis można pobierać dane w czasie rzeczywistym takie jak audio, video, logi aplikacji, dane telemetryczne IoT do uczenia maszynowego, analizy i innych zastosowań. Usługa pozwala na przetwarzanie i analizowanie danych w miarę ich napływu i natychmiastową reakcję zamiast czekania, aż wszystkie dane zostaną zebrane przed rozpoczęciem przetwarzania.
Kolejne kroki, które wykonamy w ramach tego wpisu:
utworzenie roli z wymaganymi uprawnieniami;
utworzenie strumienia danych Kinesis;
utworzenie funkcji Lambda
dodanie danych do strumienia Kinesis;
Przykład
Zanim przejdziemy do implementacji spójrzmy jeszcze na poniższy diagram:
Tym razem nasz przykład jest nieco prostszy. Dodanie danych do strumienia Kinesis spowoduje uruchomienie w tle funkcji Lambda, która dokona prostego przetwarzania danych a następnie korzystając z usługi SES wyślemy wiadomość email z danymi wyjściowymi.
Tworzenie roli
Jak się doskonale domyślacie potrzebujemy roli z uprawnieniami do Kinesis, usługi SES oraz Lambdy:
Tworzenie strumienia Kinesis
Wykorzystując wyszukiwarkę usług przechodzimy do Amazon Kinesis:
Z poziomu ekranu wybieramy utworzenie strumienia danych, tj. Data Streams. W nowo otwartym oknie konfiguracyjnym wprowadzamy nazwę strumienia oraz wskazujemy liczbę shardów (jeżeli zapomnieliście czym jest pojedynczy shard odsyłam do odpowiedniego wpisu z poprzedniej serii: AWS - Kinesis):
Proces tworzenia strumienia powinien zająć kilka sekund a Wy, po chwili, zobaczycie poniższy ekran:
Tworzenie Lambdy i wyzwalacza
Myślę, że po tylu wpisach każdy poradzi sobie z dodaniem funkcji, wybraniem odpowiedniego środowiska uruchomieniowego, przypisaniem odpowiedniej roli oraz utworzeniem wyzwalacza na strumień Kinesis bez żadnych problemów. Dodam jedynie, że w ramach konfiguracji ustawiłem batch size na 50 - jest to maksymalna liczba rekordów pobrana ze strumienia w danym momencie. Tak powinien prezentować się ekran główny Waszej funkcji z ustawionym wyzwalaczem:
Implementacja
Podobnie jak w poprzednich przypadkach kod piszemy wykorzystując środowisko Visual Studio 2019 a następnie dokonujemy publikacji wykorzystując zainstalowaną wtyczkę. Samo wysyłanie wiadomości email jest banalnie proste o czym przekonaliście się w poprzednich wpisach. Jedyna zmiana w implementacji wynika z wykorzystania paczki, która potrafi przyjąć strumień danych (rekordy), które następnie "przetwarzamy":
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.SimpleEmail;
using Amazon.SimpleEmail.Model;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda4
{
public class Function
{
public async Task FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
{
context.Logger.LogLine($"Beginning to process {kinesisEvent.Records.Count} records...");
foreach (var record in kinesisEvent.Records)
{
context.Logger.LogLine($"Event ID: {record.EventId}");
context.Logger.LogLine($"Event Name: {record.EventName}");
string recordData = GetRecordContents(record.Kinesis);
context.Logger.LogLine($"Record Data:");
context.Logger.LogLine(recordData);
// Ustawcie adres email zweryfikowany w obrębie konkretnego regionu
string senderAddress = "zweryfikowany_adres_email@gmail.com";
// Pamiętajcie o ustawieniu regionu w którym dokonaliście weryfikacji adresu email
// W przeciwnym wypadku zobaczycie poniższy błąd:
// Email address is not verified.
// The following identities failed the check in region EU-WEST-2: zweryfikowany_adres_email@gmail.com
using (var client = new AmazonSimpleEmailServiceClient(RegionEndpoint.USEast1))
{
var sendRequest = new SendEmailRequest
{
Source = senderAddress,
Destination = new Destination
{
ToAddresses =
new List<string> { "zweryfikowany_adres_email@gmail.com" }
},
Message = new Message
{
Subject = new Content("Wpisz temat wysyłanej wiadomości"),
Body = new Body
{
Html = new Content
{
Charset = "UTF-8",
Data = recordData
},
Text = new Content
{
Charset = "UTF-8",
Data = "body"
}
}
}
};
var sendEmail = await client.SendEmailAsync(sendRequest);
}
}
context.Logger.LogLine("Stream processing complete.");
}
private string GetRecordContents(KinesisEvent.Record streamRecord)
{
using (var reader = new StreamReader(streamRecord.Data, Encoding.ASCII))
{
return reader.ReadToEnd();
}
}
}
}
Testowanie
Zastanawiacie się pewnie jak dodać dane do strumienia? W przypadku SNS mogliśmy w prosty sposób dodać nową wiadomość wykorzystując konsole usług AWS. Tym razem możemy posłużyć się gotowymi przykładami lub wykorzystać AWS CLI:
Jeżeli jeszcze nie zainstalowaliście AWS CLI zerknijcie w ten wpis: AWS Lambda - AWS CLI w którym omówiłem proces instalacji oraz niezbędnej konfiguracji.
My przechodzimy do konkretów. Posługując się poniższym poleceniem konsoli jesteśmy w stanie dodać dane do naszego strumienia:
aws kinesis put-record --stream-name kinesislambda --data "Witaj Swiecie - przetwarzamy rekord z Kinesis" --partition-key "789675"
W momencie wykonania polecenia doszło do uruchomienia funkcji Lambda - sprawdźcie czy dostaliście wiadomość na skrzynkę mailową:
Klucz ten jest używany do grupowania danych w obrębie strumienia. AWS Kinesis segreguje rekordy należące do strumienia na wiele bloków. Partition key jest powiązany z każdym rekordem danych w celu określenia do którego bloku należy dany rekord danych. Spróbujcie dodać (w ramach własnych testów) jeszcze parę rekordów z innym kluczem i spójrzcie na shardId.