Paweł Łukasiewicz
2015-07-20
Paweł Łukasiewicz
2015-07-20
Udostępnij Udostępnij Kontakt

Obecnie komputery osobiste i stacje robocze wyposażone są w procesory wielordzeniowe. Większość aplikacji .NET nie wykorzystuje pełnego potencjału komputera. Nawet, jeżeli programiści starają się wykorzystać ten potencjał, zwykle sprowadza się to używania wątków (threads) oraz instrukcji lock (wykluczanie korzystania z danego bloku kodu przez różne obiekty). Tak napisane oprogramowanie prowadzi do sytuacji w których kod staje się nieczytelny oraz jest narażony na wiele zagrożeń. Z zagrożeniami tymi zwykle nie spotykamy się podczas pisania standardowych aplikacji.

Biblioteka Task Parallel pozwala na pisanie czytelnego kodu, który jest mniej podatny na błędu oraz dostosowuje się do liczby dostępnych rdzeni procesora. Można więc mieć pewność, że program dopasuje się automatycznie do środowiska w którym został uruchomiony.


Biblioteka Task Parallel

Biblioteka Task Parallel (TPL) jest zbiorem publicznych typów i API zdefiniowanych w przestrzeniach nazw: System.Threading oraz System.Threading.Tasks. TPL automatycznie dopasuje się do dostępnej liczby rdzeni. Za pomocą biblioteki można zmaksymalizować wydajność kodu skupiając się jedynie na pisaniu kodu, który ma do zrealizowania określone zadanie, a nie na rozdziale pracy pomiędzy poszczególne rdzenie.

Biblioteka ta wprowadza pojęcie zadania (Task). Równoległość zadań (task parallelism) jest procesem równoległego uruchamiania tych zadań. Zadanie jest niezależną jednostką, która jest uruchamiana w ramach konkretnego programu. Korzyści z takiego podejścia są następujące:

  • bardziej wydajne użycie zasobów systemowych;
  • większa kontrola nad kodem niż ma to miejsce w przypadku użycia wątków.

Biblioteka TPL wykorzystuje wątki w tle, aby wykonywać zadania równolegle. Decyzja o liczbie używanych wątków jest dynamicznie obliczana w środowisku uruchomieniowym.


Czemu zadania a nie wątki?

Tworzenie wątku wiąże się z ogromnym kosztem. Tworzenie dużej ilości wątków w aplikacji ma wpływ na przeciążenie przełączania pomiędzy kontekstami (Context Switching). W środowisku jednordzeniowym może to również prowadzić do złej wydajności, ponieważ mamy jeden rdzeń, który będzie obsługiwał wiele wątków.

Task, tj. nasze zadanie, dynamicznie oblicza czy potrzebuje utworzyć wiele różnych wątków aby to zadanie zrealizować. W tle używany jest ThreadPool w celu zarządzania pracą bez konieczności tworzenia lub przełączania się pomiędzy kolejnymi wątkami, jeżeli nie jest to wymagane.

// Tradycyjne podejscie przy uzyciu watków: 552ms
// Podejscie przy uzyciu TPL: 79ms
Powyższe porównanie czasów pochodzi z programu w którym zastosowane zostało podejście tradycyjne oraz biblioteka TPL. W dalej części tego artykułu dostępny jest kod tego programu wraz ze szczegółowym opisem.

Poniżej przykład pokazujący tworzenie zadań równoległych (parallel tasks) używając wątków (Threads) oraz zadań (Task):
// tworzenie nowego wątku
var thread = new Thread(start =>
    {
        // zadanie do wykonania
    });
thread.Start();
// tworzenie nowego zadania
Task.Factory.StartNew(()=>
{
    // zadanie do wykonania
});

Wyjaśnijmy dokładniej czym różni się tworzenie zadań od tworzenia wątków. Jedną z pierwszy zalet zadań jest to, że łatwiej zagwarantować, że wykorzystają one w pełni potencjał środowiska na którym zostały uruchomione. Dla przykładu, jeżeli mamy zamiar wystartować wiele wątków, które w dużym stopniu obciążą procesor, ich wykonanie na maszynie z jednym rdzeniem spowoduje, że będą one wykonywane zauważanie dłużej. Oczywistym jest, że chcą wykonywać więcej wątków na maszynie z jednym rdzeniem, możemy napotkać problemy. Za każdym razem kiedy procesor musi przełączać się z wątku na wątek, jest to niewielkim obciążaniem, ale w momencie, gdy mamy jednocześnie uruchomione wiele wątków proces ten będzie występował dość często co znacząco wpłynie na czas wykonywania całej operacji niż gdyby zadania te były wykonywane synchronicznie.

Jeżeli nie mielibyśmy przełączania pomiędzy blokami do wykonania, nie mielibyśmy przełączania kontekstu pomiędzy wątkami. W wyniku takiego zachowania całkowity czas potrzebny na wykonanie wszystkich bloków kodu jest większy, przy czym do wykonania była dokładnie ta sama praca. Jeżeli bloki te byłyby podzielone na dwa rdzenie, mogłyby one zostać wykonanie równoległe zapewniając najwyższą możliwą efektywność.


Czemu zadania a nie pule wątków?

Teraz, gdy mamy już niewielkie pojęcie o zadaniach i ich możliwościach, zagłębimy się głębiej w temat oraz powiemy czym różnią się od puli wątków (ThreadPools).

Poniżej przykład wykonania nowej puli wątków:

ThreadPool.QueueUserWorkItem(obj => doSomework());
Zobaczmy, co należałoby zrobić, jeżeli chcielibyśmy Wait() (zaczekać), aż wątek się wykona:
var resetEvent = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(obj =>
{
    doSomework();
    resetEvent.Set();
});
Zapis dla jednego wątku może nie wygląda na szczególnie skomplikowany.
A co jeżeli chcielibyśmy zaczekać na wykonanie 15 wątków?
Jak przechwycić wartości zwracane przez wiele wątków?
Jak przywrócić kontrolę z powrotem do GUI wątku?
Jest odpowiedź na te wszystkie pytania: delegaty oraz zdarzenia. Może to jednak prowadzić do sytuacji w której nasza aplikacja będzie podatna na błędy, a jeden błąd może nieść za sobą wystąpienia kolejnych błędów.
Wyjściem z tej sytuacji są zadania (Tasks).


Tworzenie nowych zadań

var task = new Task(doSomework());
task.Start();
Oczekiwanie na wykonanie zadań:
Task task1 = Task.Factory.StartNew(doSomework());
Task task2 = Task.Factory.StartNew(doSomework());
Task task3 = Task.Factory.StartNew(doSomework());
Task.WaitAll(task1, task2, task3);
Wykonanie kolejnego zadania asynchronicznie, kiedy obecne zadanie zostanie ukończone:
Task.Factory.StartNew(doSomework()).ContinueWith(anotherAsyncWork());
W prawdziwym świecie często zachodzi potrzeba wykonania asynchronicznie wielu operacji. Poniższy fragment pokazuje jak możemy tego dokonać:
Task.Factory.StartNew(GetImagesFromTumblr())
    .ContinueWith((Func<Task, List<string>>)GetImagesFromMyBirthday())
    .ContinueWith(ShareThisOnFacebook())
    .ContinueWith(SendNotificationToMyFriends())
    .Continue(WaitForConfirmation());

Poniższy kod (wraz ze szczegółowym opisem), pokazuje porównanie wykonanie tej samej operacji przy użyciu wątków oraz zastosowaniu biblioteki Task Parallel Library:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ParallelTasksExample
{
    class Program
    {
        static void Main(string[] args)
        {
            const int maxThreadAllowed = 64;
            var watch = new Stopwatch();
            // Klasa ManualResetEventSlim to lżejsza wersja klasy ManualResetEvent
            // Lżejszej wersji używamy dla krótkich operacji, gdzie blokada nie będzie trwała długo
            // ManualResetEvent używamy, gdy operacja będzie trwała długo a metoda WaitOne będzie
            // blokowała wątek przez długi czas
            // Klasa jest implemetacją po stronie jądra, a takie operacje, tj. .NET-kernel są zwykle bardzo dużym ociążaniem dla procesora i pamięci.
            // Jednym z przykładów użycia klasy jest oczekiwanie na zamknięcie przez pewien określony czas oraz przerwanie, jeżeli nie istnieje.
            // Przypadek taki może się zdarzyć, kiedy wątek utknie np. w tzw. deadlock'u - martwy punkt.
            var mres = new ManualResetEventSlim[maxThreadAllowed];
            for (int i = 0; i < mres.Length; i++)
            {
                // poniższy zapis pozwala na ustawienie stanu początkowego obiektu, w tym przypadku ustawiamy go na false dla każdego elementu
                // innymi słowy, początkowa wartość sygnału jest ustawiona na false
                mres[i] = new ManualResetEventSlim(false);
            }
            watch.Start();
            // uruchamiamy nowy klasyczny wątek (thread) oraz powiadamiamy ManualResetEvent kiedy wykonanie jest skończone
            // możemy więc sprawdzić czas wykonania
            for (int i = 0; i < mres.Length; i++)
            {
                var id = i;
                var thread = new Thread(state =>
                  {
                      for (int j = 0; j < 10; j++)
                      {
                          Console.WriteLine(string.Format("Wątek: {0}, numer operacji: {1}", state.ToString(), j.ToString()));
                      }
                      // wysłamy zdarzenie za pomocą metody Set(), zmiana wartości sygnału, który pozwala by jeden lub więcej wątków oczekiwał ma wykonanie zdarzenia
                      mres[id].Set();
                  });
                // Uruchamiany wykonanie danego wątku
                thread.Start(string.Format("Wątek: {0}", i));
            }
            // WaitHandle - mechanim pozwalający na kontrolę nad wykonaniem wątków
            // Oczekujemy, aż każdy element na liście otrzyma sygnał
            WaitHandle.WaitAll((from x in mres select x.WaitHandle).ToArray());
            // Zapisujemy czas wykonania klasycznego wątku
            var threadTime = watch.ElapsedMilliseconds;
            watch.Reset();
            foreach (ManualResetEventSlim t in mres)
            {
                // zmieniamy status zdarzenia na brak sygnału, powoduje to zablokowanie wątków
                t.Reset();
            }
            watch.Start();
            // Wykonujemy to samo zadanie, tym razem używając biblioteki Task Parallel Library
            for (int i = 0; i < mres.Length; i++)
            {
                var id = i;
                Task.Factory.StartNew(state =>
                {
                    for (int j = 0; j < 10; j++)
                    {
                        Console.WriteLine(string.Format("Wątek: {0}, numer operacji: {1}", state.ToString(), j.ToString()));
                    }
                    mres[id].Set();
                }, string.Format("Zadanie: {0}", i));
            }
            WaitHandle.WaitAll((from x in mres select x.WaitHandle).ToArray());
            // zapisujemy czas wykonania zadania
            var taskTime = watch.ElapsedMilliseconds;
            // wypisujemy czasy wykonania operacji
            Console.WriteLine("Tradycyjne podejście przy użyciu wątków: {0}ms", threadTime);
            Console.WriteLine("Podejście przy użyciu TPL: {0}ms", taskTime);
            foreach (var item in mres)
            {
                // zmieniamy status zdarzenia na brak sygnału, powoduje to zablokowanie wątków
                item.Reset();
            }
            Console.ReadKey();

            // Wynik działania programu
            // Tradycyjne podejscie przy uzyciu watków: 552ms
            // Podejscie przy uzyciu TPL: 79ms

        }
    }
}

Parallel Extensions

Rozszerzenia równoległe zostały zaprezentowane wraz z biblioteką Task Parallel Library, aby osiągnąć równoległość przetwarzania danych. Równoległość w tym przypadku odnosi się do scenariusza w którym te same operacje wykonywane są jednocześnie na tym samym źródle danych, tj. kolekcjach czy tablicach. Platforma .NET dostarcza konstrukcję, które pozwalają to osiągnąć: Parallel.For oraz Parallel.Foreach.

Poniżej przykład użycia obu konstrukcji:

// tradycyjne zastosowanie pętli foreach
foreach(var item in sourceCollection)
{
	doSomework(item);
}
// odpowiednik równoległy
Parallel.ForEach(sourceCollection, item = > doSomework(item));

W celu pokazania sposóbu użycia, korzyści z wykorzystania wyżej wspomnianej biblioteki, poniżej obszerny przykład ze szczegółowym wyjaśnieniem zastosowanych zagadnień:

using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace ParallelExtension
{
    class Program
    {
        static void Main(string[] args)
        {
            // Ustawiamy rozmiary naszych tablic.
            // Zwiększ liczbę elementów aby lepiej zobaczyć porównanie uzyskanych rezultatów
            int colCounter = 800;
            int rowCounter = 800;
            // tworzymy dwuwymiarowe tablice
            double[,] m1 = InitializeMatrix(rowCounter, colCounter);
            double[,] m2 = InitializeMatrix(rowCounter, colCounter);
            // tworzymy tablicę wynikową. Będzie ona rezutatem mnożenia obu tablic
            var result = new double[rowCounter, colCounter];
            // W pierwszej kolejności wykonamy tradycyjne podejście
            Console.WriteLine("Tradycyjne wykonanie... \nOperacja ta chwilę potrwa...");
            var stopwatch = new Stopwatch();
            stopwatch.Start();
            // mnożenie naszych tablic
            MultiplyTables(m1, m2, result);
            stopwatch.Stop();
            Console.WriteLine("Czas operacji: {0}ms", stopwatch.ElapsedMilliseconds);
            // resetujemy stoper oraz tworzymy nową tablicę wyników
            stopwatch.Reset();
            Console.WriteLine("Podejście równoległe...");
            result = new double[rowCounter, colCounter];
            stopwatch.Start();
            MultiplyTablesParallel(m1, m2, result);
            Console.WriteLine("Czas operacji: {0}ms", stopwatch.ElapsedMilliseconds);
            stopwatch.Stop();
            Console.ReadKey();
            // Wynik działania programu dla 800 kolumn i wierszy
            // Tradycyjne wykonanie...
            // Operacja ta chwile potrwa...
            // Czas operacji: 11136ms
            // Podejscie równolegle...
            // Czas operacji: 5046ms
        }
        private static void MultiplyTablesParallel(double[,] m1, double[,] m2, double[,] result)
        {
            // zwracamy liczbe kolumn we wskazanym wymiarze
            int m1Columns = m1.GetLength(1);
            int m2Columns = m2.GetLength(1);
            int resultColumns = result.GetLength(0);
            // Wykonany dokładnie tą samą operację ale przy użyciu biblioteki TPL
            Parallel.For(0, m1Columns, i =>
            {
                for (int j = 0; j < m2Columns; j++)
                {
                    // zmienna tymczasowa pozwoli na optymalizację równległych obliczeń
                    double temp = 0;
                    for (int k = 0; k < m1Columns; k++)
                    {
                        temp += m1[i, k] * m2[k, j];
                    }
                    result[i, j] = temp;
                }
            });
        }
        private static void MultiplyTables(double[,] m1, double[,] m2, double[,] result)
        {
            // zwracamy liczbe kolumn we wskazanym wymiarze
            int m1Columns = m1.GetLength(1);
            int m2Columns = m2.GetLength(1);
            int resultColumns = result.GetLength(0);
            for (int i = 0; i < m1Columns; i++)
            {
                for (int j = 0; j < m2Columns; j++)
                {
                    // zmienna tymczasowa pozwoli na optymalizację obliczeń
                    double temp = 0;
                    for (int k = 0; k < m1Columns; k++)
                    {
                        temp += m1[i, k] * m2[k, j];
                    }
                    result[i, j] = temp;
                }
            }
        }
        private static double[,] InitializeMatrix(int rowCounter, int colCounter)
        {
            var table = new double[rowCounter, colCounter];
            // klasa dzięki, której możemy generować liczby losowe
            var r = new Random();
            for (int i = 0; i < rowCounter; i++)
            {
                for (int j = 0; j < colCounter; j++)
                {
                    // liczba losowa, gdzie maksymalna wartość jest mniejsza niż 100
                    table[i, j] = r.Next(100);
                }
            }
            return table;
        }
    }
}


Podsumowanie

Biblioteka ta wraz z dostępnym rozszerzeniem pozwala na pełne wykorzystanie potencjału sprzętowego na którym uruchamiany jest program. Ten sam kod automatycznie dostosowuje się do platformy na której jest uruchamiany dzięki czemu możemy osiągnąć zauważalne korzyści na lepszych maszynach. Biblioteka ta poprawia również czytelność kodu oraz zmniejsza prawdopodobieństwo wystąpienia błędów.