====== Wątki - obliczenia równoległe ====== Zrealizujemy przykład, w którym obliczana będzie średnia wartości zgromadzonych w tablicy. ==== Mean ==== Zadeklaruj klasę ''Mean'', której statycznym atrybutem będzie tablica ''double[]'' wypełniona losowymi elementami. W testowanych przykładach ta tablica będzie długa, np. będzie miała 100 mln. elementów. public class Mean { static double[] array; static void initArray(int size){ array = new double[size]; for(int i=0;i ==== Wątek MeanCalc ==== Wątek ''MeanCalc'' otrzyma w konstruktorze dwa indeksy: ''start'' i ''end'' i obliczy średnią dla elementów tablicy ''array'' pomiędzy ''start''(włącznie) i ''end'' (wyłącznie). Zakładając, że tablicę podzielimy na bloki tej samej długości i każdy z nich będzie obsłużony przez jeden wątek, będzie mozna następnie obliczyć średnią z wartości zwróconych przez poszczególne wątki. Zadeklaruj ''MeanCalc'' jako klasę zagnieżdżoną wewnątrz ''Mean'' static class MeanCalc extends Thread{ private final int start; private final int end; double mean = 0; MeanCalc(int start, int end){ this.start = start; this.end=end; } public void run(){ // ??? liczymy średnią System.out.printf(Locale.US,"%d-%d mean=%f\n",start,end,mean); } } ==== Uruchamianie wątków i liczenie średnich -- parallelMean() v.1 ==== Napisz funkcję ''parallelMean()'' /** * Oblicza średnią wartości elementów tablicy array uruchamiając równolegle działające wątki. * Wypisuje czasy operacji * @param cnt - liczba wątków */ static void parallelMean(int cnt){ // utwórz tablicę wątków MeanCalc threads[]=new MeanCalc[cnt]; // utwórz wątki, podziel tablice na równe bloki i przekaż indeksy do wątków // załóż, że array.length dzieli się przez cnt) double t1 = System.nanoTime()/1e6; //uruchom wątki double t2 = System.nanoTime()/1e6; // czekaj na ich zakończenie używając metody ''join'' for(MeanCalc mc:threads) { mc.join(); } // oblicz średnią ze średnich double t3 = System.nanoTime()/1e6; System.out.printf(Locale.US,"size = %d cnt=%d > t2-t1=%f t3-t1=%f mean=%f\n", array.length, cnt, t2-t1, t3-t1, mean); } Wywołaj w ''main()'' funkcje initArray() i ''parallelMean()'' z różnymi parametrami i porównaj czasy. ==== parallelMean() v.2 ==== Synchronizacja za pomocą ''join()'' nie jest szczególnie wygodna. Bardziej naturalne jest użycie *semafora (jezeli zalezy nam wyłącznie na synchronizacji) *kolejki wiadomości, jeżeli zainteresowani jesteśmy rezultatami obliczonymi przez wątki Wypróbujemy rozwiązanie z [[https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html|kolejką]]. Kolejka ma dwie interesujące nas metody: * ''put()'' - dodaje element do kolejki, blokuje wątek jeśli jest pełna * ''take()'' - zwraca element z kolejki, blokuje wątek, jeśli jest pusta === Deklaracja === Zadeklaruj w klasie ''Mean'' static BlockingQueue results = new ArrayBlockingQueue(100); === Wywołanie put() === Umieść w klasie ''MeanCalc'' -- po wyznaczeniu częściowej średniej prześlij ją do kolejki. === Wywołanie take() === Umieść w parallelMean() po uruchomieniu wątków. *W pętli odczytaj tyle wartości z kolejki, ile uruchomiłeś wątków. *Wyznacz średnią z tych wartości ==== parallelMean() v.3 ==== Zamiast tworzyć nowy wątek dla każdego zadania skorzystaj z puli wątków zarządzanych przez ExecutorService. Przykład 100 zadań wykonywanych przez 16 wątków należących do puli. class RunnableOrThread extends Thread{ int i; RunnableOrThread(int i){ this.i=i; } public void run(){ System.out.println("My number is "+i); } } ExecutorService executor = Executors.newFixedThreadPool(16); for(int i=0;i<100;i++){ executor.execute(new RunnableOrThread(i)); } executor.shutdown(); Niepoprawne dla lambdy (błąd kompilacji): for(int i=0;i<100;i++){ // executor.execute(new RunnableOrThread(i)); executor.execute(()->{ System.out.println("My number is "+i); }); } Przyczyna: java: local variables referenced from a lambda expression must be final or effectively final ==== Dla ilu wątków czas przetwarzania będzie najmniejszy? ==== public static void main(String[] args) { initArray(128000000); for(int cnt:new int[]{1,2,4,8,16,32,64,128}){ parallelMean(cnt); } } Sprawdź, czy dla każdej wartości ''cnt'' uzyskiwana jest ta sama średnia... ===== AsyncMean ===== Innym rozwiązaniem jest wykorzystanie obliczeń asynchronicznych. *Jeżeli wywoływana jest funkcja synchroniczna, np. ''int foo()'' wołający czeka, aż nastąpi powrót z funkcji. *Dla wywołań asynchronicznych (dostępnych, jeżeli wspiera je język lub biblioteka) funkcja ''foo'' zwraca ''Future'' -- czyli obiekt, który w przyszłości po zakończeniu obliczeń otrzyma tę wartość. *Wątek wołający nie jest jednak zablokowany. *Wątek wołający można w pewnym momencie zablokować w oczekiwaniu na wartość *Zamiast oczekiwania, można dostarczyć funkcję, która zostanie wywołana po pojawieniu się oczekiwanej wartości Napisz public class AsyncMean { static double[] array; static void initArray(int size){ array = new double[size]; for(int i=0;i Napisz klasę ''MeanCalcSupplier'', która będzie obliczała średnią dla części tablicy pomiędzy start i end i zwracała jej wartość po wywołaniu ''get()'' static class MeanCalcSupplier implements Supplier { //... MeanCalcSupplier(int start, int end){ } @Override public Double get() { double mean=0; // oblicz średnią System.out.printf(Locale.US,"%d-%d mean=%f\n",start,end,mean); return mean; } } ==== asyncMean() v.1==== Uzupełnij i zmierz czasy wykonania public static void asyncMeanv1() { int size = 100_000_000; initArray(size); ExecutorService executor = Executors.newFixedThreadPool(16); int n=...; // Utwórz listę future List> partialResults = new ArrayList<>(); for(int i=0;i partialMean = CompletableFuture.supplyAsync( new MeanCalcSupplier(???,???),executor); partialResults.add(partialMean); } // zagreguj wyniki double mean=0; for(var pr:partialResults){ // wywołaj pr.join() aby odczytać wartość future; // join() zawiesza wątek wołający } System.out.printf(Locale.US,"mean=%f\n",mean); executor.shutdown(); } ==== asyncMean() v.2==== W tej wersji nię będziemy zapamiętywali future, ale skonfigurujemy je tak, aby wywołać funkcję po wyznaczeniu wartości. *Utworzymy kolejkę na wyniki *W funkcji lambda przekazanej w ''.thenApply()'' dodajemy wartości do kolejki za pomocą offer static void asyncMeanv2() { int size = 100_000_000; initArray(size); ExecutorService executor = Executors.newFixedThreadPool(16); int n=10; BlockingQueue queue = new ArrayBlockingQueue<>(n); for (int i = 0; i < n; i++) { CompletableFuture.supplyAsync( new MeanCalcSupplier(???,???), executor) .thenApply(d -> queue.offer(d)); } double mean=0; // w pętli obejmującej n iteracji wywołaj queue.take() i oblicz średnią System.out.printf(Locale.US,"mean=%f\n", mean); executor.shutdown(); } W tej wersji watek funkcji ''main'' też jest blokowany, ale na operacji ''take'' odczytu z kolejki, a nie ''join'' odczytu future. **Oblicz czasy wyznaczania wartości średniej**