Laboratorium 5

1. Konfiguracja Dockera

Naszym celem jest zbudowanie konfiguracji jak na rysunku: cztery kontenery połączone wspólną siecią

  • master - obraz spark-python
  • worker-1 - obraz spark-python
  • worker-2 - obraz spark-python
  • driver (tu akurat affectionate_liskov) - obraz spark-jupyter

Załóżmy, że pliki są rozmieszczone w katalogach, jak na poniższym diagramie

|-spark-python--+
|               | Dockerfile
|
|-spark-jupyter +
|               | Dockerfile
|
| -cluster------+
|               | docker-compose.yml
                |
                | data ---------+
                                | owid-energy-data.csv.gz

Pobierz pliki konfiguracyjne i zbiór danych: spark-jdk17.zip. Następnie rozpakuj w dowolnym katalogu roboczym.

Utwórz sieć spark-network

docker network create -d bridge --subnet=172.22.0.0/16 --gateway=172.22.0.1 spark-network

1.1. Zbuduj obraz spark-python

Przejdź do katalogu spark-python. Powinien w nim znajdować się plik Dockerfile z następującą zawartością:

# image: spark-python

FROM spark:3.5.0-scala2.12-java17-ubuntu

USER root

RUN set -ex; \
    apt-get update; \
    apt-get install -y sudo python3 python3-pip; \
    pip3 cache purge; \
    pip3 install pyspark; \
    rm -rf /var/lib/apt/lists/*
# RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
ENV JAVA_HOME /opt/java/openjdk
RUN export JAVA_HOME

USER spark

Wydaj komendę:

docker build -f Dockerfile --tag spark-python .

1.2 Zbuduj obraz spark-jupyter

Przejdź do katalogu spark-jupyter. Powinien w nim znajdować się plik Dockerfile z następującą zawartością:

# image: spark-jupyter

FROM spark:3.5.0-scala2.12-java17-ubuntu

USER root

RUN set -ex; \
    apt-get update; \
    apt-get install -y sudo python3 python3-pip; \
    pip3 cache purge; \
    pip3 install pyspark numpy scikit-learn pandas matplotlib plotly jupyterlab; \
    apt-get install -y pandoc; \
    apt-get install -y texlive-xetex texlive-fonts-recommended texlive-plain-generic; \
    apt-get install -y texlive-latex-extra texlive-fonts-extra texlive-lang-all; \
    rm -rf /var/lib/apt/lists/*


RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
ENV JAVA_HOME /opt/java/openjdk
RUN export JAVA_HOME

USER spark

Wydaj komendę:

docker build -f Dockerfile --tag spark-jupyter .

Obraz zawiera zbiór bibliotek ogólnego zastosowania języka Python do eksploracji danych, wizualizacji i uczenie maszynowego:

  • bibliotekę pyspark
  • typowe biblioteki uczenia maszynowego (ale bez tensorflow)
  • jupyter-lab (wsparcie dla notatników)
  • poandoc i instalację latexa (po załadowaniu notatnika możliwy jest wydruk w formacjie PDF z zawijaniem długich linii

Jak uruchomić - opis w sekcji 3.1

1.3 Uruchomienie klastra

W katalogu cluster powinien znaleźć się plik docker-compose.yml z następującą zawartością

services:
 spark-master:
  image: spark-python
  command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
  networks:
    spark-network:
      ipv4_address: 172.22.0.2
#  volumes:
#  - ".:/opt/spark/work-dir"
  ports:
  - "9090:8080"
  - "7077:7077"
  #- "4040:4040"
  #environment:
   #SPARK_DAEMON_MEMORY: 4g


 spark-worker-1:
  image: spark-python                                     

  command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
  - spark-master
  networks:
    spark-network:
  ports:
  - "9091:8081"
#      ipv4_address: 172.22.0.3
  volumes:
  - ".:/opt/spark/work-dir"
  environment:
   SPARK_MODE: worker
   SPARK_WORKER_CORES: 4
   #SPARK_WORKER_MEMORY: 4g
   SPARK_MASTER_URL: spark://spark-master:7077
 
 spark-worker-2:
  image: spark-python
  command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
  - spark-master
  networks:
    spark-network:
#      ipv4_address: 172.22.0.4
  ports:
  - "9092:8081"

  volumes:
  - ".:/opt/spark/work-dir"
  environment:
   SPARK_MODE: worker
   SPARK_WORKER_CORES: 4
   #SPARK_WORKER_MEMORY: 4g
   SPARK_MASTER_URL: spark://spark-master:7077

networks:
  spark-network:
   external: true
   ipam:
     config:
     - subnet: 172.22.0.0/16
  • Klaster zawiera trzy węzły: master i dwa węzły obliczeniowe worker połączone wspólną siecią spark-network
  • Dla węzłów worker zamontowany jest lokalny katalog
  • Udostępnione są porty, za pomocą których można monitorować stan węzłów (WebUI)

Przejdź do katalogu cluster i wydaj komendę:

docker-compose up

W tym momencie węzły obliczeniowe powinny uruchomić się. Po kilkudziesięciu sekundach dostępne będą ich interfejsy webowe:

1.4 Uruchomienie węzła spark-jupyter

Kontener będzie uruchamiany za pomocą polecenia:

docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash
  • Kontener jest dołączony do sieci spark-network. Dzięki temu będzie mógł komunikować się z węzłami obliczeniowymi
  • Mapowanie portów 4040:4040 pozwoli na dostęp do WebUI
  • Porty 8888:8888 zostaną wykorzystane przez jupyter-lab
  • Montowany jest bieżący katalog
  • W momencie uruchomienia wołany jest shell

2 Java

2.1 PiComputeApp

1. Dodaj kod klasy do projektu i wprowadź odpowiednie wywołanie w funkcji Main.main()

public class PiComputeApp implements Serializable {
  private static long counter = 0;
 
  public static void main(String[] args) {
    int slices=100;
    int n = 1000_000 * slices;
    System.out.printf("Table of %d elements will be divided into %d partitions.\n" +
            "Each partition will be processed as separate task\n",n,slices);
 
    long t0 = System.currentTimeMillis();
    SparkSession spark = SparkSession
            .builder()
            .appName("Spark Pi")
        .master("local[*]")
            .getOrCreate();
    SparkContext sparkContext = spark.sparkContext();
    JavaSparkContext ctx = new JavaSparkContext(sparkContext);
 
    long t1 = System.currentTimeMillis();
    System.out.println("Session initialized in " + (t1 - t0) + " ms");
 
    List<Integer> l = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }
 
    JavaRDD<Integer> dataSet = ctx.parallelize(l, slices);
    long t2 = System.currentTimeMillis();
    System.out.println("Initial dataframe built in " + (t2 - t1) + " ms");
 
    final int count = dataSet.map(integer -> {
      double x = Math.random() * 2 - 1;
      double y = Math.random() * 2 - 1;
      return (x * x + y * y < 1) ? 1 : 0;
    }).reduce((a, b) -> a + b);
 
    long t3 = System.currentTimeMillis();
    System.out.println("Map-reduce time " + (t3 - t2) + " ms");
    System.out.println("Pi is roughly " + 4.0 * count / n);
    spark.stop();
 
  }
 
}

Aplikacja tworzy listę 100_000_000 liczb całkowitych i rozdziela je na 100 partycji. Partycje te są przetwarzane równolegle przez lokalnie działające wątki. Podczas operacji map losowane są dwie liczby x oraz y i zwracany wynik

  • 1 - jeżeli (x,y) mieści się w ćwiartce okręgu
  • 0 w przeciwnym przypadku.

Po zsumowaniu i podzieleniu przez liczbę punktów otrzymamy przybliżenie liczby $pi$.

2. Dodaj artefakt. Na przykład będzie miał nazwę pi_compute.jar

W InteliJ:

  • Wybierz:Project Structure→Artifacts→JAR from module with dependencies
  • Nie kopiuj bibliotek do pliku wyjściowego *.jar. W zakładce Output Layout zaznacz wszystkie biblioteki Mavena i usuń je zostawiając sam artefakt. W ten sposób unikniesz przesyłania do serwera Fat JAR/Uber JAR
  • Manipulując opcjami można ustawić katalog wyjściowy i nazwę pliku *.jar.
  • Zbuduj artefakt

3. W tym przypadku możesz przejść do katalogu wyjściowego, gdzie pojawił się artefakt i uruchomić kontener spark-jupyter

docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash

Po uruchomieniu kontenera pojawi się znak zachęty bash. Wydaj komendę ls. Artefakt powinien być widoczny

3. Aplikacja zostanie uruchomiona poprzez przesłanie artefaktu do środowiska spark, za pomocą polecenia spark-submit (wewnątrz bash na dockerze):

/opt/spark/bin/spark-submit --driver-memory 4g --executor-memory 4g --class org.example.Main --master spark://172.22.0.2:7077 pi_compute.jar

4. W tym momencie rozpocznie się wykonywanie aplikacji. Wyznaczona wartość będzie wypisana na konsoli. Możliwe jest także monitorowanie przebiegu aplikacji pod adresem http://localhost:4040

  • Job (zadanie) - wysokopoziomowe obliczenie uruchomione w wyniku takiej akcji, jak map() lub reduce(). Podczas wykonania akcji Spark uruchamia job.
  • Stage (etap) - jest kolekcją podzadań, które mogą być wykonywane równolegle, wszystkie przeprowadzają to samo obliczenie, ale na innej partycji danych. Czyli etap stage reprezentuje krok w planie wykonania zadania. Etapy są tworzone na podstawie transformacji danych, a zwłaszcza zależnościa związanymi z przesyłaniem danych pomiędzy węzłami obliczeniowymi.
  • Executor (wykonawca) - w tym przypadku jednym wykonawcą jest driver.

5. Umieść w spawozdaniu fragmenty zrzutów ekranu pokazujące działanie wykonawcy Sparka (konsoli i DriverUI)

2.2 PiComputeAppDistributed

1. Wprowadź minimalną zmianę w kodzie aplikacji. Różnicą jest wskazanie adresu węzła Master.

    SparkSession spark = SparkSession
            .builder()
            .appName("Spark Pi Distributed")
            .master("spark://172.22.0.2:7077")
            .getOrCreate();

2. Zadbaj, aby kod był wywoływany z Main.main(), zbuduj artefakt

3. Prześlij do wykonania z a pomocą spark-submit (w bash w konsoli kontenera spark-jupyter)

4. Zamieść w spawozdaniu zrzuty WebUI węzłów Master i Worker(ów) wskazujących, że aplikacja była przez nie wykonywana. Możesz też dodać wykres Gantta z wykonywanych zadań (z DriverUI - port 4040, opcja timeline).

2.3 LoadDistributedDataset

Załadujemy zbiór danych do Sparka. Musi być on dostępny dla węzłów

  • Driver
  • Worker

W przemysłowym zastosowaniu byłby widoczny np. na rozproszonym systemie plików. W naszym przypadku skorzystamy z montowanych wolumenów Dockera (czyli będzie widoczny lokalnie).

Kontener wykonujący program węzła driver (spark-jupyter) będzie uruchamiany z katalogu cluster (zawierającym podkatalog data).

1. Dodaj kod klasy dla nowej aplikacji oraz wywołaj ją w Main.main()

public class LoadDistributedDataset {
 
        public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LoadDatasetDistributed")
                .master("spark://172.22.0.2:7077")
                .getOrCreate();
 
        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .option("inferschema","true")
                .load("data/owid-energy-data.csv.gz");
 
        df.show(5); // To raczej zakomentuj...
        df=df.select("country","year","population","electricity_demand").where("country like \'Po%\' AND year >= 2000");
        df.show(5);
        df.printSchema();
 
 
    }
}

2. Skonfiguruj i zbuduj nowy artefakt (np. distributed_dataset.jar)

3. Przekopiuj ręcznie artefakt do katalogu cluster

4. Przejdź do katalogu cluster i uruchom tam kontener

docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash 

Następnie sprawdź, czy artefakt i podkatalog data są widoczne

5. Wykonaj spark-submit, aby rozpocząć wykonywanie aplikacji

/opt/spark/bin/spark-submit --driver-memory 4g --executor-memory 4g --class org.example.Main --master spark://172.22.0.2:7077 distributed-dataset.jar

6. Oczekiwany efekt:

  • widoczna uruchomiona lub wykonana aplikacja w WebUI węzłów Master i Worker
  • Wydrukowana ramka na konsoli (węzła driver w Dockerze)
+-------+----+----------+------------------+
|country|year|population|electricity_demand|
+-------+----+----------+------------------+
| Poland|2000|  38504432|            136.81|
| Poland|2001|  38662860|            136.99|
| Poland|2002|  38647476|            135.42|
| Poland|2003|  38621536|            139.85|
| Poland|2004|  38596040|            142.97|
+-------+----+----------+------------------+
only showing top 5 rows

root
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- population: long (nullable = true)
 |-- electricity_demand: double (nullable = true)

3 PySpark + Jupyter

Spróbujemy powtórzyć te kroki w środowisku Jupyter korzystając z biblioteki PySpark.

PySpark zapewnia interfejs w języku Python do platformy Spark. Aby go użyć - biblioteka PySpark musi być zainstalowana na wszystkich węzałach (co było uwzględnione przy budowie obrazów). Wewnętrznie korzysta z biblioteki Py4J pozwalającej na dostęp do obiektów Java (w maszynie wirtualnej JVM) z poziomu kodu w języku Python.

3.1 Uruchomienie jupyter-lab

1. Uruchom obraz spark-jupyter w katalogu cluster

docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash

2. Wydaj polecenie

sudo --preserve-env jupyter-lab  --ip=0.0.0.0 --allow-root --IdentityProvider.token='pyspark'

3. W przeglądarce otwórz link http://localhost:8888/?token=pyspark (albo bez parametru token i wpisz pyspark w formularzu)

Obraz spark-jupyter zawiera podstawowe biblioteki do realizacji zadań uczenia maszynowego. Można go wykorzystać na innych zajęciach.

3.2 Wyznaczanie liczby PI

1. Utwórz notatnik PiCompute i wprowadź w nim kod aplikacji (komórki) do wyznaczania liczby $\pi$

import pyspark
from random import random
from operator import add
 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PiCompute").master("spark://172.22.0.2:7077").getOrCreate()

Tu otwórz WebUI modułu driver na localhost

partitions = 100
n = 1_000_000 * partitions
 
def f(_: int) -> float:
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0
 
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
spark.stop()

2. Możesz uruchamiać interaktywnie kolejne komórki i monitorować działanie aplikacji w WebUI. Do momentu jawnego zakończenia sesji, będzie ona aktywna.

3.3 LoadEnergyDataset

1. Utwórz notatnik i wprowadź w nim kod:

import pyspark
from random import random
from operator import add
 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LoadEnergyDataset").master("spark://172.22.0.2:7077").getOrCreate()

2. załaduj dane

df = spark.read.format("csv").option("header", "true").option("inferschema","true").load("data/owid-energy-data.csv.gz")

Możesz spróbować je wyświetlić

df.show(5,70)

… ale raczej skasuj wyjście komórki

3. Wybierzemy dane do wyświetlenia

df2 = df.select("country","year","population","electricity_demand").where("country like \'Po%\' AND year >= 2000")

I wyświetlimy je:

df2.show(5)
df2.printSchema()

Zauważ, że ze względu na lazy evaluation dopiero ta komórka powoduje aktywność Sparka

3. Wybierzmy podzbiór danych z Polski z interesującymi nas niepustymi wartościami

df_pl = df.select('year','population','electricity_demand').where(("country == 'Poland' AND electricity_demand IS NOT NULL"))
df_pl.show()

Wykresy

Przekonwertujemy dane do postaci list

import matplotlib.pyplot as plt
df_pl = df_pl.orderBy('year')
y = df_pl.select('year').rdd.flatMap(lambda x: x).collect()
pop = df_pl.select('population').rdd.flatMap(lambda x: x).collect()
dem = df_pl.select('electricity_demand').rdd.flatMap(lambda x: x).collect()

oraz wyświetlimy je…

plt.plot(y,pop)
plt.xlabel('year')
plt.ylabel('population')
plt.title('Population vs. year')
plt.show()
plt.plot(y,dem,label='demand')
plt.xlabel('year')
plt.ylabel('demand')
plt.title('Demand vs. year')
plt.show()
plt.scatter(pop,dem)
plt.xlabel('population')
plt.ylabel('demand')
plt.title('Demand vs. population')
plt.show()

Regresja liniowa

Dla których zależności można zastosować regresję?

1. Dodaj kod:

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
 
va=VectorAssembler().setInputCols(["year"]).setOutputCol("features")
df_plf = va.transform(df_pl)
df_plf.show(5)
 
lr = LinearRegression()\
  .setMaxIter(10)\
  .setRegParam(3.0)\
  .setElasticNetParam(0.5)\
  .setFeaturesCol("features")\
  .setLabelCol("electricity_demand")
model = lr.fit(df_plf)

2. Wyświetl metryki oraz równanie regresji

print(f'RMSE: {model.summary.rootMeanSquaredError}')
print(f'r2: {model.summary.r2}')
print(f'iterations: {model.summary.totalIterations}')
print(f'demand = {model.coefficients}*year {"+" if model.intercept > 0 else ""} {model.intercept}')

3. Wyświetl wykresy

import numpy as np
# from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.linalg import Vectors
 
xmin = np.min(y)
xmax = np.max(y)
xx = np.linspace(xmin-1,xmax+1,100)
 
yy = [model.predict(Vectors.dense([x])) for x in xx]
 
plt.scatter(y,dem,label='demand')
plt.plot(xx,yy,label='fitted function',c='orange')
plt.legend()
plt.title('Electric energy demand in years')
plt.show()

4. Zakończ sesję

Przed zakończeniem sesji obejrzyj DriverUI (localhost:4040) a zwłaszcza diagramy pokazujące konstrukcję i statystyki kwerend w zakładce SQL / Dataframe – otwórz linki do kwerend.

spark.stop()

3.4 TODO - regresja dla Chin

Przeprowadź taką samą analizę dla Chin, ale spróbuj znaleźć zależność pomiędzy liczbą ludności, a zapotrzebowaniem na energię (która w przypadku Chin jest raczej widoczna).

Zapisz notatnik jako PDF i prześlij razem ze sprawozdaniem

ed/lab_05.txt · Last modified: 2024/03/13 03:24 by pszwed
CC Attribution-Share Alike 4.0 International
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0