Zrealizujemy przykład, w którym obliczana będzie średnia wartości zgromadzonych w tablicy.
Zadeklaruj klasę Mean
, której statycznym atrybutem będzie tablica double[]
wypełniona losowymi elementami. W testowanych przykładach ta tablica będzie długa, np. będzie miała 100 mln. elementów.
public class Mean { static double[] array; static void initArray(int size){ array = new double[size]; for(int i=0;i<size;i++){ array[i]= Math.random()*size/(i+1); } } public static void main(String[] args) { initArray(100000000); } }
Wątek MeanCalc
otrzyma w konstruktorze dwa indeksy: start
i end
i obliczy średnią dla elementów tablicy array
pomiędzy start
(włącznie) i end
(wyłącznie). Zakładając, że tablicę podzielimy na bloki tej samej długości i każdy z nich będzie obsłużony przez jeden wątek, będzie mozna następnie obliczyć średnią z wartości zwróconych przez poszczególne wątki. Zadeklaruj MeanCalc
jako klasę zagnieżdżoną wewnątrz Mean
static class MeanCalc extends Thread{ private final int start; private final int end; double mean = 0; MeanCalc(int start, int end){ this.start = start; this.end=end; } public void run(){ // ??? liczymy średnią System.out.printf(Locale.US,"%d-%d mean=%f\n",start,end,mean); } }
Napisz funkcję parallelMean()
/** * Oblicza średnią wartości elementów tablicy array uruchamiając równolegle działające wątki. * Wypisuje czasy operacji * @param cnt - liczba wątków */ static void parallelMean(int cnt){ // utwórz tablicę wątków MeanCalc threads[]=new MeanCalc[cnt]; // utwórz wątki, podziel tablice na równe bloki i przekaż indeksy do wątków // załóż, że array.length dzieli się przez cnt) double t1 = System.nanoTime()/1e6; //uruchom wątki double t2 = System.nanoTime()/1e6; // czekaj na ich zakończenie używając metody ''join'' for(MeanCalc mc:threads) { mc.join(); } // oblicz średnią ze średnich double t3 = System.nanoTime()/1e6; System.out.printf(Locale.US,"size = %d cnt=%d > t2-t1=%f t3-t1=%f mean=%f\n", array.length, cnt, t2-t1, t3-t1, mean); }
Wywołaj w main()
funkcje initArray() i parallelMean()
z różnymi parametrami i porównaj czasy.
Synchronizacja za pomocą join()
nie jest szczególnie wygodna. Bardziej naturalne jest użycie
Wypróbujemy rozwiązanie z kolejką. Kolejka ma dwie interesujące nas metody:
put()
- dodaje element do kolejki, blokuje wątek jeśli jest pełnatake()
- zwraca element z kolejki, blokuje wątek, jeśli jest pusta
Zadeklaruj w klasie Mean
static BlockingQueue<Double> results = new ArrayBlockingQueue<Double>(100);
Umieść w klasie MeanCalc
– po wyznaczeniu częściowej średniej prześlij ją do kolejki.
Umieść w parallelMean() po uruchomieniu wątków.
Zamiast tworzyć nowy wątek dla każdego zadania skorzystaj z puli wątków zarządzanych przez ExecutorService.
Przykład 100 zadań wykonywanych przez 16 wątków należących do puli.
class RunnableOrThread extends Thread{ int i; RunnableOrThread(int i){ this.i=i; } public void run(){ System.out.println("My number is "+i); } } ExecutorService executor = Executors.newFixedThreadPool(16); for(int i=0;i<100;i++){ executor.execute(new RunnableOrThread(i)); } executor.shutdown();
Niepoprawne dla lambdy (błąd kompilacji):
for(int i=0;i<100;i++){ // executor.execute(new RunnableOrThread(i)); executor.execute(()->{ System.out.println("My number is "+i); }); }
Przyczyna:
java: local variables referenced from a lambda expression must be final or effectively final
public static void main(String[] args) { initArray(128000000); for(int cnt:new int[]{1,2,4,8,16,32,64,128}){ parallelMean(cnt); } }
Sprawdź, czy dla każdej wartości cnt
uzyskiwana jest ta sama średnia…
Innym rozwiązaniem jest wykorzystanie obliczeń asynchronicznych.
int foo()
wołający czeka, aż nastąpi powrót z funkcji.foo
zwraca Future<int>
– czyli obiekt, który w przyszłości po zakończeniu obliczeń otrzyma tę wartość. Napisz
public class AsyncMean { static double[] array; static void initArray(int size){ array = new double[size]; for(int i=0;i<size;i++){ array[i]= Math.random()*size/(i+1); } }
Napisz klasę MeanCalcSupplier
, która będzie obliczała średnią dla części tablicy pomiędzy start i end i zwracała jej wartość po wywołaniu get()
static class MeanCalcSupplier implements Supplier<Double> { //... MeanCalcSupplier(int start, int end){ } @Override public Double get() { double mean=0; // oblicz średnią System.out.printf(Locale.US,"%d-%d mean=%f\n",start,end,mean); return mean; } }
Uzupełnij i zmierz czasy wykonania
public static void asyncMeanv1() { int size = 100_000_000; initArray(size); ExecutorService executor = Executors.newFixedThreadPool(16); int n=...; // Utwórz listę future List<CompletableFuture<Double>> partialResults = new ArrayList<>(); for(int i=0;i<n;i++){ CompletableFuture<Double> partialMean = CompletableFuture.supplyAsync( new MeanCalcSupplier(???,???),executor); partialResults.add(partialMean); } // zagreguj wyniki double mean=0; for(var pr:partialResults){ // wywołaj pr.join() aby odczytać wartość future; // join() zawiesza wątek wołający } System.out.printf(Locale.US,"mean=%f\n",mean); executor.shutdown(); }
W tej wersji nię będziemy zapamiętywali future, ale skonfigurujemy je tak, aby wywołać funkcję po wyznaczeniu wartości.
.thenApply()
dodajemy wartości do kolejki za pomocą offerstatic void asyncMeanv2() { int size = 100_000_000; initArray(size); ExecutorService executor = Executors.newFixedThreadPool(16); int n=10; BlockingQueue<Double> queue = new ArrayBlockingQueue<>(n); for (int i = 0; i < n; i++) { CompletableFuture.supplyAsync( new MeanCalcSupplier(???,???), executor) .thenApply(d -> queue.offer(d)); } double mean=0; // w pętli obejmującej n iteracji wywołaj queue.take() i oblicz średnią System.out.printf(Locale.US,"mean=%f\n", mean); executor.shutdown(); }
W tej wersji watek funkcji main
też jest blokowany, ale na operacji take
odczytu z kolejki, a nie join
odczytu future.
Oblicz czasy wyznaczania wartości średniej