This is an old revision of the document!


Laboratorium 5

FIXME Strona w trakcie edycji

Konfiguracja Dockera

Naszym celem jest zbudowanie konfiguracji jak na rysunku: cztery kontenery połączone wspólną siecią

  • master - obraz spark-python
  • worker-1 - obraz spark-python
  • worker-2 - obraz spark-python
  • driver (tu akurat affectionate_liskov) - obraz spark-jupyter

Załóżmy, że pliki są rozmieszczone w katalogach, jak na poniższym diagramie

|-spark-python--+
|               | Dockerfile
|
|-spark-jupyter +
|               | Dockerfile
|
| -cluster------+
|               | docker-compose.yml
                |
                | data ---------+
                                | owid-energy-data.csv.gz

Utwórz sieć spark-network

docker network create -d bridge --subnet=172.22.0.0/16 --gateway=172.22.0.1 spark-network

Zbuduj obraz spark-python

Przejdź do katalogu spark-python. Powinien w nim znajdować się plik Dockerfile z następującą zawartością:

# image: spark-python

FROM spark:3.5.0-scala2.12-java17-ubuntu

USER root

RUN set -ex; \
    apt-get update; \
    apt-get install -y sudo python3 python3-pip; \
    pip3 cache purge; \
    pip3 install pyspark; \
    rm -rf /var/lib/apt/lists/*
# RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
ENV JAVA_HOME /opt/java/openjdk
RUN export JAVA_HOME

USER spark

Wydaj komendę:

docker build -f Dockerfile --tag spark-python .

Zbuduj obraz spark-jupyter

Przejdź do katalogu spark-jupyter. Powinien w nim znajdować się plik Dockerfile z następującą zawartością:

# image: spark-jupyter

FROM spark:3.5.0-scala2.12-java17-ubuntu

USER root

RUN set -ex; \
    apt-get update; \
    apt-get install -y sudo python3 python3-pip; \
    pip3 cache purge; \
    pip3 install pyspark numpy scikit-learn pandas matplotlib plotly jupyterlab; \
    rm -rf /var/lib/apt/lists/*
RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
ENV JAVA_HOME /opt/java/openjdk
RUN export JAVA_HOME

USER spark

Wydaj komendę:

docker build -f Dockerfile --tag spark-jupyter .

Uruchomienie klastra

W katalogu cluster powinien znaleźć się plik docker-compose.yml z następującą zawartością

services:
 spark-master:
  image: spark-python
  command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
  networks:
    spark-network:
      ipv4_address: 172.22.0.2
#  volumes:
#  - ".:/opt/spark/work-dir"
  ports:
  - "9090:8080"
  - "7077:7077"
  #- "4040:4040"
  #environment:
   #SPARK_DAEMON_MEMORY: 4g


 spark-worker-1:
  image: spark-python                                     

  command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
  - spark-master
  networks:
    spark-network:
  ports:
  - "9091:8081"
#      ipv4_address: 172.22.0.3
  volumes:
  - ".:/opt/spark/work-dir"
  environment:
   SPARK_MODE: worker
   SPARK_WORKER_CORES: 4
   #SPARK_WORKER_MEMORY: 4g
   SPARK_MASTER_URL: spark://spark-master:7077
 
 spark-worker-2:
  image: spark-python
  command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
  - spark-master
  networks:
    spark-network:
#      ipv4_address: 172.22.0.4
  ports:
  - "9092:8081"

  volumes:
  - ".:/opt/spark/work-dir"
  environment:
   SPARK_MODE: worker
   SPARK_WORKER_CORES: 4
   #SPARK_WORKER_MEMORY: 4g
   SPARK_MASTER_URL: spark://spark-master:7077

networks:
  spark-network:
   external: true
   ipam:
     config:
     - subnet: 172.22.0.0/16
  • Klaster zawiera trzy węzły: master i dwa węzły obliczeniowe worker połączone wspólną siecią spark-network
  • Dla węzłów worker zamontowany jest lokalny katalog
  • Udostępnione są porty, za pomocą których można monitorować stan węzłów (WebUI)

Przejdź do katalogu cluster i wydaj komendę:

docker-compose up

W tym momencie węzły obliczeniowe powinny uruchomić się. Po kilkudziesięciu sekundach dostępne będą ich interfejsy webowe:

Uruchomienie węzła spark-jupyter

Kontener będzie uruchamiany za pomocą polecenia:

docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash
  • Kontener jest dołączony do sieci spark-network. Dzięki temu będzie mógł komunikować się z węzłami obliczeniowymi
  • Mapowanie portów 4040:4040 pozwoli na dostęp do WebUI
  • Porty 8888:8888 zostaną wykorzystane przez jupyter-lab
  • Montowany jest bieżący katalog
  • W momencie uruchomienia wołany jest shell

Java

PiComputeApp

1. Dodaj kod klasy do projektu i wprowadź odpowiednie wywołanie w funkcji Main.main()

public class PiComputeApp implements Serializable {
  private static long counter = 0;
 
  public static void main(String[] args) {
    int slices=100;
    int n = 1000_000 * slices;
    System.out.printf("Table of %d elements will be divided into %d partitions.\n" +
            "Each partition will be processed as separate task\n",n,slices);
 
    long t0 = System.currentTimeMillis();
    SparkSession spark = SparkSession
            .builder()
            .appName("Spark Pi")
        .master("local[*]")
            .getOrCreate();
    SparkContext sparkContext = spark.sparkContext();
    JavaSparkContext ctx = new JavaSparkContext(sparkContext);
 
    long t1 = System.currentTimeMillis();
    System.out.println("Session initialized in " + (t1 - t0) + " ms");
 
    List<Integer> l = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }
 
    JavaRDD<Integer> dataSet = ctx.parallelize(l, slices);
    long t2 = System.currentTimeMillis();
    System.out.println("Initial dataframe built in " + (t2 - t1) + " ms");
 
    final int count = dataSet.map(integer -> {
      double x = Math.random() * 2 - 1;
      double y = Math.random() * 2 - 1;
      return (x * x + y * y < 1) ? 1 : 0;
    }).reduce((a, b) -> a + b);
 
    long t3 = System.currentTimeMillis();
    System.out.println("Map-reduce time " + (t3 - t2) + " ms");
    System.out.println("Pi is roughly " + 4.0 * count / n);
    spark.stop();
 
  }
 
}

Aplikacja tworzy listę 100_000_000 liczb całkowitych i rozdziela je na 100 partycji. Partycje te są przetwarzane równolegle przez lokalnie działające wątki. Podczas operacji map losowane są dwie liczby x oraz y i zwracany wynik

  • 1 - jeżeli (x,y) mieści się w ćwiartce okręgu
  • 0 w przeciwnym przypadku.

Po zsumowaniu i podzieleniu przez liczbę punktów otrzymamy przybliżenie liczby $pi$.

2. Dodaj artefakt. Na przykład będzie miał nazwę pi_compute.jar

W InteliJ:

  • Wybierz:Project Structure→Artifacts→JAR from module with dependencies
  • Nie kopiuj bibliotek do pliku wyjściowego *.jar. W zakładce Output Layout zaznacz wszystkie biblioteki Mavena i usuń je zostawiając sam artefakt. W ten sposób unikniesz przesyłania do serwera Fat JAR/Uber JAR
  • Manipulując opcjami można ustawić katalog wyjściowy i nazwę pliku *.jar.
  • Zbuduj artefakt

3. W tym przypadku możesz przejść do katalogu wyjściowego, gdzie pojawił się artefakt i uruchomić kontener spark-jupyter

docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash

Po uruchomieniu kontenera pojawi się znak zachęty bash. Wydaj komendę ls. Artefakt powinien być widoczny

3. Aplikacja zostanie uruchomiona poprzez przesłanie artefaktu do środowiska spark, za pomocą polecenia spark-submit (wewnątrz bash na dockerze):

/opt/spark/bin/spark-submit --driver-memory 4g --executor-memory 4g --class org.example.Main --master spark://172.22.0.2:7077 pi_compute.jar

4. W tym momencie rozpocznie się wykonywanie aplikacji. Wyznaczona wartość będzie wypisana na konsoli. Możliwe jest także monitorowanie przebiegu aplikacji pod adresem http://localhost:4040

  • Job (zadanie) - wysokopoziomowe obliczenie uruchomione w wyniku takiej akcji, jak map() lub reduce(). Podczas wykonania akcji Spark uruchamia job.
  • Stage (etap) - jest kolekcją podzadań, które mogą być wykonywane równolegle, wszystkie przeprowadzają to samo obliczenie, ale na innej partycji danych. Czyli etap stage reprezentuje krok w planie wykonania zadania. Etapy są tworzone na podstawie transformacji danych, a zwłaszcza zależnościa związanymi z przesyłaniem danych pomiędzy węzłami obliczeniowymi.
  • Executor (wykonawca) - w tym przypadku jednym wykonawcą jest driver.
ed/lab_05.1710217832.txt.gz · Last modified: 2024/03/12 05:30 by pszwed
CC Attribution-Share Alike 4.0 International
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0