Paweł Łukasiewicz
2015-07-20
Paweł Łukasiewicz
2015-07-20
Obecnie komputery osobiste i stacje robocze wyposażone są w procesory wielordzeniowe. Większość aplikacji .NET nie
wykorzystuje pełnego potencjału komputera. Nawet, jeżeli programiści starają się wykorzystać ten potencjał, zwykle sprowadza się to używania wątków
(threads) oraz instrukcji lock (wykluczanie korzystania z danego bloku kodu przez
różne obiekty). Tak napisane oprogramowanie prowadzi do sytuacji w których kod staje się nieczytelny oraz jest narażony na wiele zagrożeń. Z zagrożeniami
tymi zwykle nie spotykamy się podczas pisania standardowych aplikacji.
Biblioteka Task Parallel pozwala na pisanie czytelnego kodu, który jest mniej podatny na błędu oraz dostosowuje się
do liczby dostępnych rdzeni procesora. Można więc mieć pewność, że program dopasuje się automatycznie do środowiska w którym został uruchomiony.
Biblioteka Task Parallel
Biblioteka Task Parallel (TPL) jest zbiorem publicznych typów i API zdefiniowanych w przestrzeniach nazw:
System.Threading oraz System.Threading.Tasks. TPL
automatycznie dopasuje się do dostępnej liczby rdzeni. Za pomocą biblioteki można zmaksymalizować wydajność kodu skupiając się jedynie na pisaniu
kodu, który ma do zrealizowania określone zadanie, a nie na rozdziale pracy pomiędzy poszczególne rdzenie.
Biblioteka ta wprowadza pojęcie zadania (Task). Równoległość zadań (task parallelism) jest procesem równoległego
uruchamiania tych zadań. Zadanie jest niezależną jednostką, która jest uruchamiana w ramach konkretnego programu. Korzyści z takiego podejścia są
następujące:
- bardziej wydajne użycie zasobów systemowych;
- większa kontrola nad kodem niż ma to miejsce w przypadku użycia wątków.
Biblioteka TPL wykorzystuje wątki w tle, aby wykonywać zadania równolegle. Decyzja o liczbie używanych wątków jest
dynamicznie obliczana w środowisku uruchomieniowym.
Czemu zadania a nie wątki?
Tworzenie wątku wiąże się z ogromnym kosztem. Tworzenie dużej ilości wątków w aplikacji ma wpływ na przeciążenie przełączania pomiędzy kontekstami
(Context Switching). W środowisku jednordzeniowym może to również prowadzić do złej wydajności, ponieważ mamy jeden
rdzeń, który będzie obsługiwał wiele wątków.
Task, tj. nasze zadanie, dynamicznie oblicza czy potrzebuje utworzyć wiele różnych wątków aby to zadanie zrealizować.
W tle używany jest ThreadPool w celu zarządzania pracą bez konieczności tworzenia lub przełączania się pomiędzy
kolejnymi wątkami, jeżeli nie jest to wymagane.
// Tradycyjne podejscie przy uzyciu watków: 552ms
// Podejscie przy uzyciu TPL: 79ms
Powyższe porównanie czasów pochodzi z programu w którym zastosowane zostało podejście tradycyjne oraz biblioteka TPL.
W dalej części tego artykułu dostępny jest kod tego programu wraz ze szczegółowym opisem.
Poniżej przykład pokazujący tworzenie zadań równoległych (parallel tasks) używając wątków
(Threads) oraz zadań (Task):
// tworzenie nowego wątku
var thread = new Thread(start =>
{
// zadanie do wykonania
});
thread.Start();
// tworzenie nowego zadania
Task.Factory.StartNew(()=>
{
// zadanie do wykonania
});
Wyjaśnijmy dokładniej czym różni się tworzenie zadań od tworzenia wątków. Jedną z pierwszy zalet zadań jest to, że łatwiej zagwarantować, że
wykorzystają one w pełni potencjał środowiska na którym zostały uruchomione. Dla przykładu, jeżeli mamy zamiar wystartować wiele wątków, które w
dużym stopniu obciążą procesor, ich wykonanie na maszynie z jednym rdzeniem spowoduje, że będą one wykonywane zauważanie dłużej. Oczywistym jest,
że chcą wykonywać więcej wątków na maszynie z jednym rdzeniem, możemy napotkać problemy. Za każdym razem kiedy procesor musi przełączać się z wątku
na wątek, jest to niewielkim obciążaniem, ale w momencie, gdy mamy jednocześnie uruchomione wiele wątków proces ten będzie występował dość często
co znacząco wpłynie na czas wykonywania całej operacji niż gdyby zadania te były wykonywane synchronicznie.
Jeżeli nie mielibyśmy przełączania pomiędzy blokami do wykonania, nie mielibyśmy przełączania kontekstu pomiędzy wątkami. W wyniku takiego
zachowania całkowity czas potrzebny na wykonanie wszystkich bloków kodu jest większy, przy czym do wykonania była dokładnie ta sama praca.
Jeżeli bloki te byłyby podzielone na dwa rdzenie, mogłyby one zostać wykonanie równoległe zapewniając najwyższą możliwą efektywność.
Czemu zadania a nie pule wątków?
Teraz, gdy mamy już niewielkie pojęcie o zadaniach i ich możliwościach, zagłębimy się głębiej w temat oraz powiemy czym różnią się od puli wątków
(ThreadPools).
Poniżej przykład wykonania nowej puli wątków:
ThreadPool.QueueUserWorkItem(obj => doSomework());
Zobaczmy, co należałoby zrobić, jeżeli chcielibyśmy Wait() (zaczekać), aż wątek się wykona:
var resetEvent = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(obj =>
{
doSomework();
resetEvent.Set();
});
Zapis dla jednego wątku może nie wygląda na szczególnie skomplikowany.
A co jeżeli chcielibyśmy zaczekać na wykonanie 15 wątków?
Jak przechwycić wartości zwracane przez wiele wątków?
Jak przywrócić kontrolę z powrotem do GUI wątku?
Jest odpowiedź na te wszystkie pytania: delegaty oraz zdarzenia. Może to jednak prowadzić do sytuacji w której nasza aplikacja będzie podatna na błędy,
a jeden błąd może nieść za sobą wystąpienia kolejnych błędów.
Wyjściem z tej sytuacji są zadania (Tasks).
Tworzenie nowych zadań
var task = new Task(doSomework());
task.Start();
Oczekiwanie na wykonanie zadań:
Task task1 = Task.Factory.StartNew(doSomework());
Task task2 = Task.Factory.StartNew(doSomework());
Task task3 = Task.Factory.StartNew(doSomework());
Task.WaitAll(task1, task2, task3);
Wykonanie kolejnego zadania asynchronicznie, kiedy obecne zadanie zostanie ukończone:
Task.Factory.StartNew(doSomework()).ContinueWith(anotherAsyncWork());
W prawdziwym świecie często zachodzi potrzeba wykonania asynchronicznie wielu operacji. Poniższy fragment pokazuje jak możemy tego dokonać:
Task.Factory.StartNew(GetImagesFromTumblr())
.ContinueWith((Func<Task, List<string>>)GetImagesFromMyBirthday())
.ContinueWith(ShareThisOnFacebook())
.ContinueWith(SendNotificationToMyFriends())
.Continue(WaitForConfirmation());
Poniższy kod (wraz ze szczegółowym opisem), pokazuje porównanie wykonanie tej samej operacji przy użyciu wątków oraz zastosowaniu biblioteki
Task Parallel Library:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ParallelTasksExample
{
class Program
{
static void Main(string[] args)
{
const int maxThreadAllowed = 64;
var watch = new Stopwatch();
// Klasa ManualResetEventSlim to lżejsza wersja klasy ManualResetEvent
// Lżejszej wersji używamy dla krótkich operacji, gdzie blokada nie będzie trwała długo
// ManualResetEvent używamy, gdy operacja będzie trwała długo a metoda WaitOne będzie
// blokowała wątek przez długi czas
// Klasa jest implemetacją po stronie jądra, a takie operacje, tj. .NET-kernel są zwykle bardzo dużym ociążaniem dla procesora i pamięci.
// Jednym z przykładów użycia klasy jest oczekiwanie na zamknięcie przez pewien określony czas oraz przerwanie, jeżeli nie istnieje.
// Przypadek taki może się zdarzyć, kiedy wątek utknie np. w tzw. deadlock'u - martwy punkt.
var mres = new ManualResetEventSlim[maxThreadAllowed];
for (int i = 0; i < mres.Length; i++)
{
// poniższy zapis pozwala na ustawienie stanu początkowego obiektu, w tym przypadku ustawiamy go na false dla każdego elementu
// innymi słowy, początkowa wartość sygnału jest ustawiona na false
mres[i] = new ManualResetEventSlim(false);
}
watch.Start();
// uruchamiamy nowy klasyczny wątek (thread) oraz powiadamiamy ManualResetEvent kiedy wykonanie jest skończone
// możemy więc sprawdzić czas wykonania
for (int i = 0; i < mres.Length; i++)
{
var id = i;
var thread = new Thread(state =>
{
for (int j = 0; j < 10; j++)
{
Console.WriteLine(string.Format("Wątek: {0}, numer operacji: {1}", state.ToString(), j.ToString()));
}
// wysłamy zdarzenie za pomocą metody Set(), zmiana wartości sygnału, który pozwala by jeden lub więcej wątków oczekiwał ma wykonanie zdarzenia
mres[id].Set();
});
// Uruchamiany wykonanie danego wątku
thread.Start(string.Format("Wątek: {0}", i));
}
// WaitHandle - mechanim pozwalający na kontrolę nad wykonaniem wątków
// Oczekujemy, aż każdy element na liście otrzyma sygnał
WaitHandle.WaitAll((from x in mres select x.WaitHandle).ToArray());
// Zapisujemy czas wykonania klasycznego wątku
var threadTime = watch.ElapsedMilliseconds;
watch.Reset();
foreach (ManualResetEventSlim t in mres)
{
// zmieniamy status zdarzenia na brak sygnału, powoduje to zablokowanie wątków
t.Reset();
}
watch.Start();
// Wykonujemy to samo zadanie, tym razem używając biblioteki Task Parallel Library
for (int i = 0; i < mres.Length; i++)
{
var id = i;
Task.Factory.StartNew(state =>
{
for (int j = 0; j < 10; j++)
{
Console.WriteLine(string.Format("Wątek: {0}, numer operacji: {1}", state.ToString(), j.ToString()));
}
mres[id].Set();
}, string.Format("Zadanie: {0}", i));
}
WaitHandle.WaitAll((from x in mres select x.WaitHandle).ToArray());
// zapisujemy czas wykonania zadania
var taskTime = watch.ElapsedMilliseconds;
// wypisujemy czasy wykonania operacji
Console.WriteLine("Tradycyjne podejście przy użyciu wątków: {0}ms", threadTime);
Console.WriteLine("Podejście przy użyciu TPL: {0}ms", taskTime);
foreach (var item in mres)
{
// zmieniamy status zdarzenia na brak sygnału, powoduje to zablokowanie wątków
item.Reset();
}
Console.ReadKey();
// Wynik działania programu
// Tradycyjne podejscie przy uzyciu watków: 552ms
// Podejscie przy uzyciu TPL: 79ms
}
}
}
Parallel Extensions
Rozszerzenia równoległe zostały zaprezentowane wraz z biblioteką Task Parallel Library, aby osiągnąć równoległość
przetwarzania danych. Równoległość w tym przypadku odnosi się do scenariusza w którym te same operacje wykonywane są jednocześnie na tym samym
źródle danych, tj. kolekcjach czy tablicach. Platforma .NET dostarcza konstrukcję, które pozwalają to osiągnąć:
Parallel.For oraz Parallel.Foreach.
Poniżej przykład użycia obu konstrukcji:
// tradycyjne zastosowanie pętli foreach
foreach(var item in sourceCollection)
{
doSomework(item);
}
// odpowiednik równoległy
Parallel.ForEach(sourceCollection, item = > doSomework(item));
W celu pokazania sposóbu użycia, korzyści z wykorzystania wyżej wspomnianej biblioteki, poniżej obszerny przykład ze szczegółowym wyjaśnieniem
zastosowanych zagadnień:
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace ParallelExtension
{
class Program
{
static void Main(string[] args)
{
// Ustawiamy rozmiary naszych tablic.
// Zwiększ liczbę elementów aby lepiej zobaczyć porównanie uzyskanych rezultatów
int colCounter = 800;
int rowCounter = 800;
// tworzymy dwuwymiarowe tablice
double[,] m1 = InitializeMatrix(rowCounter, colCounter);
double[,] m2 = InitializeMatrix(rowCounter, colCounter);
// tworzymy tablicę wynikową. Będzie ona rezutatem mnożenia obu tablic
var result = new double[rowCounter, colCounter];
// W pierwszej kolejności wykonamy tradycyjne podejście
Console.WriteLine("Tradycyjne wykonanie... \nOperacja ta chwilę potrwa...");
var stopwatch = new Stopwatch();
stopwatch.Start();
// mnożenie naszych tablic
MultiplyTables(m1, m2, result);
stopwatch.Stop();
Console.WriteLine("Czas operacji: {0}ms", stopwatch.ElapsedMilliseconds);
// resetujemy stoper oraz tworzymy nową tablicę wyników
stopwatch.Reset();
Console.WriteLine("Podejście równoległe...");
result = new double[rowCounter, colCounter];
stopwatch.Start();
MultiplyTablesParallel(m1, m2, result);
Console.WriteLine("Czas operacji: {0}ms", stopwatch.ElapsedMilliseconds);
stopwatch.Stop();
Console.ReadKey();
// Wynik działania programu dla 800 kolumn i wierszy
// Tradycyjne wykonanie...
// Operacja ta chwile potrwa...
// Czas operacji: 11136ms
// Podejscie równolegle...
// Czas operacji: 5046ms
}
private static void MultiplyTablesParallel(double[,] m1, double[,] m2, double[,] result)
{
// zwracamy liczbe kolumn we wskazanym wymiarze
int m1Columns = m1.GetLength(1);
int m2Columns = m2.GetLength(1);
int resultColumns = result.GetLength(0);
// Wykonany dokładnie tą samą operację ale przy użyciu biblioteki TPL
Parallel.For(0, m1Columns, i =>
{
for (int j = 0; j < m2Columns; j++)
{
// zmienna tymczasowa pozwoli na optymalizację równległych obliczeń
double temp = 0;
for (int k = 0; k < m1Columns; k++)
{
temp += m1[i, k] * m2[k, j];
}
result[i, j] = temp;
}
});
}
private static void MultiplyTables(double[,] m1, double[,] m2, double[,] result)
{
// zwracamy liczbe kolumn we wskazanym wymiarze
int m1Columns = m1.GetLength(1);
int m2Columns = m2.GetLength(1);
int resultColumns = result.GetLength(0);
for (int i = 0; i < m1Columns; i++)
{
for (int j = 0; j < m2Columns; j++)
{
// zmienna tymczasowa pozwoli na optymalizację obliczeń
double temp = 0;
for (int k = 0; k < m1Columns; k++)
{
temp += m1[i, k] * m2[k, j];
}
result[i, j] = temp;
}
}
}
private static double[,] InitializeMatrix(int rowCounter, int colCounter)
{
var table = new double[rowCounter, colCounter];
// klasa dzięki, której możemy generować liczby losowe
var r = new Random();
for (int i = 0; i < rowCounter; i++)
{
for (int j = 0; j < colCounter; j++)
{
// liczba losowa, gdzie maksymalna wartość jest mniejsza niż 100
table[i, j] = r.Next(100);
}
}
return table;
}
}
}
Podsumowanie
Biblioteka ta wraz z dostępnym rozszerzeniem pozwala na pełne wykorzystanie potencjału sprzętowego na którym uruchamiany jest program. Ten sam
kod automatycznie dostosowuje się do platformy na której jest uruchamiany dzięki czemu możemy osiągnąć zauważalne korzyści na lepszych maszynach.
Biblioteka ta poprawia również czytelność kodu oraz zmniejsza prawdopodobieństwo wystąpienia błędów.