Wątki - obliczenia równoległe

Zrealizujemy przykład, w którym obliczana będzie średnia wartości zgromadzonych w tablicy.

Mean

Zadeklaruj klasę Mean, której statycznym atrybutem będzie tablica double[] wypełniona losowymi elementami. W testowanych przykładach ta tablica będzie długa, np. będzie miała 100 mln. elementów.

public class Mean {
    static double[] array;
    static void initArray(int size){
        array = new double[size];
        for(int i=0;i<size;i++){
            array[i]= Math.random()*size/(i+1);
        }
    }
 
    public static void main(String[] args) {
        initArray(100000000);
    }
 
}

Wątek MeanCalc

Wątek MeanCalc otrzyma w konstruktorze dwa indeksy: start i end i obliczy średnią dla elementów tablicy array pomiędzy start(włącznie) i end (wyłącznie). Zakładając, że tablicę podzielimy na bloki tej samej długości i każdy z nich będzie obsłużony przez jeden wątek, będzie mozna następnie obliczyć średnią z wartości zwróconych przez poszczególne wątki. Zadeklaruj MeanCalc jako klasę zagnieżdżoną wewnątrz Mean

    static class MeanCalc extends Thread{
        private final int start;
        private final int end;
        double mean = 0;
 
        MeanCalc(int start, int end){
            this.start = start;
            this.end=end;
        }
        public void run(){
            // ??? liczymy średnią
            System.out.printf(Locale.US,"%d-%d mean=%f\n",start,end,mean);
        }
    }

Uruchamianie wątków i liczenie średnich -- parallelMean() v.1

Napisz funkcję parallelMean()

    /**
     * Oblicza średnią wartości elementów tablicy array uruchamiając równolegle działające wątki. 
     * Wypisuje czasy operacji
     * @param cnt - liczba wątków
     */
    static void parallelMean(int cnt){
        // utwórz tablicę wątków
        MeanCalc threads[]=new MeanCalc[cnt];
        // utwórz wątki, podziel tablice na równe bloki i przekaż indeksy do wątków
        // załóż, że array.length dzieli się przez cnt)
        double t1 = System.nanoTime()/1e6;
        //uruchom wątki
        double t2 = System.nanoTime()/1e6;
        // czekaj na ich zakończenie używając metody ''join''
        for(MeanCalc mc:threads) {
                mc.join();
        }
        // oblicz średnią ze średnich
        double t3 = System.nanoTime()/1e6;
        System.out.printf(Locale.US,"size = %d cnt=%d >  t2-t1=%f t3-t1=%f mean=%f\n",
            array.length, 
            cnt,
            t2-t1,
            t3-t1,
            mean);
    }

Wywołaj w main() funkcje initArray() i parallelMean() z różnymi parametrami i porównaj czasy.

parallelMean() v.2

Synchronizacja za pomocą join() nie jest szczególnie wygodna. Bardziej naturalne jest użycie

  • semafora (jezeli zalezy nam wyłącznie na synchronizacji)
  • kolejki wiadomości, jeżeli zainteresowani jesteśmy rezultatami obliczonymi przez wątki

Wypróbujemy rozwiązanie z kolejką. Kolejka ma dwie interesujące nas metody:

  • put() - dodaje element do kolejki, blokuje wątek jeśli jest pełna
  • take() - zwraca element z kolejki, blokuje wątek, jeśli jest pusta

Deklaracja

Zadeklaruj w klasie Mean

    static BlockingQueue<Double> results = new ArrayBlockingQueue<Double>(100);

Wywołanie put()

Umieść w klasie MeanCalc – po wyznaczeniu częściowej średniej prześlij ją do kolejki.

Wywołanie take()

Umieść w parallelMean() po uruchomieniu wątków.

  • W pętli odczytaj tyle wartości z kolejki, ile uruchomiłeś wątków.
  • Wyznacz średnią z tych wartości

parallelMean() v.3

Zamiast tworzyć nowy wątek dla każdego zadania skorzystaj z puli wątków zarządzanych przez ExecutorService.

Przykład 100 zadań wykonywanych przez 16 wątków należących do puli.

        class RunnableOrThread extends Thread{
            int i;
            RunnableOrThread(int i){
                this.i=i;
            }
            public void run(){
                System.out.println("My number is "+i);
            }
        }
        ExecutorService executor = Executors.newFixedThreadPool(16);
        for(int i=0;i<100;i++){
            executor.execute(new RunnableOrThread(i));
        }
        executor.shutdown();

Niepoprawne dla lambdy (błąd kompilacji):

       for(int i=0;i<100;i++){
//            executor.execute(new RunnableOrThread(i));
            executor.execute(()->{
                System.out.println("My number is "+i);
            });
        }

Przyczyna:

    java: local variables referenced from a lambda expression must be final or effectively final

Dla ilu wątków czas przetwarzania będzie najmniejszy?

    public static void main(String[] args) {
        initArray(128000000);
        for(int cnt:new int[]{1,2,4,8,16,32,64,128}){
            parallelMean(cnt);
        }
    }

Sprawdź, czy dla każdej wartości cnt uzyskiwana jest ta sama średnia…

AsyncMean

Innym rozwiązaniem jest wykorzystanie obliczeń asynchronicznych.

  • Jeżeli wywoływana jest funkcja synchroniczna, np. int foo() wołający czeka, aż nastąpi powrót z funkcji.
  • Dla wywołań asynchronicznych (dostępnych, jeżeli wspiera je język lub biblioteka) funkcja foo zwraca Future<int> – czyli obiekt, który w przyszłości po zakończeniu obliczeń otrzyma tę wartość.
  • Wątek wołający nie jest jednak zablokowany.
    • Wątek wołający można w pewnym momencie zablokować w oczekiwaniu na wartość
    • Zamiast oczekiwania, można dostarczyć funkcję, która zostanie wywołana po pojawieniu się oczekiwanej wartości

Napisz

public class AsyncMean {
    static double[] array;
    static void initArray(int size){
        array = new double[size];
        for(int i=0;i<size;i++){
            array[i]= Math.random()*size/(i+1);
        }
    }

Napisz klasę MeanCalcSupplier, która będzie obliczała średnią dla części tablicy pomiędzy start i end i zwracała jej wartość po wywołaniu get()

 static class MeanCalcSupplier implements Supplier<Double> {
        //...
 
        MeanCalcSupplier(int start, int end){
 
        }
 
        @Override
        public Double get() {
            double mean=0;
            // oblicz średnią
            System.out.printf(Locale.US,"%d-%d mean=%f\n",start,end,mean);
            return mean;
        }
    }

asyncMean() v.1

Uzupełnij i zmierz czasy wykonania

    public static void asyncMeanv1() {
        int size = 100_000_000;
        initArray(size);
        ExecutorService executor = Executors.newFixedThreadPool(16);
        int n=...;
        // Utwórz listę future
        List<CompletableFuture<Double>> partialResults = new ArrayList<>();
        for(int i=0;i<n;i++){
            CompletableFuture<Double> partialMean = CompletableFuture.supplyAsync(
                    new MeanCalcSupplier(???,???),executor);
            partialResults.add(partialMean);
        }
        // zagreguj wyniki 
        double mean=0;
        for(var pr:partialResults){
            // wywołaj pr.join() aby odczytać wartość future;
            // join() zawiesza wątek wołający 
        }
        System.out.printf(Locale.US,"mean=%f\n",mean);
 
        executor.shutdown();
    }

asyncMean() v.2

W tej wersji nię będziemy zapamiętywali future, ale skonfigurujemy je tak, aby wywołać funkcję po wyznaczeniu wartości.

  • Utworzymy kolejkę na wyniki
  • W funkcji lambda przekazanej w .thenApply() dodajemy wartości do kolejki za pomocą offer
    static void asyncMeanv2() {
        int size = 100_000_000;
        initArray(size);
        ExecutorService executor = Executors.newFixedThreadPool(16);
        int n=10;
 
        BlockingQueue<Double> queue = new ArrayBlockingQueue<>(n);
 
        for (int i = 0; i < n; i++) {
            CompletableFuture.supplyAsync(
                    new MeanCalcSupplier(???,???), executor)
            .thenApply(d -> queue.offer(d));
        }
 
        double mean=0;
        // w pętli obejmującej n iteracji wywołaj queue.take() i oblicz średnią 
 
        System.out.printf(Locale.US,"mean=%f\n", mean);
 
        executor.shutdown();
    }

W tej wersji watek funkcji main też jest blokowany, ale na operacji take odczytu z kolejki, a nie join odczytu future.

Oblicz czasy wyznaczania wartości średniej

pz1/watki-obliczenia-rownolegle.txt · Last modified: 2023/12/18 18:42 by pszwed
CC Attribution-Share Alike 4.0 International
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0