====== Laboratorium 5 ====== ===== 1. Konfiguracja Dockera ===== Naszym celem jest zbudowanie konfiguracji jak na rysunku: cztery kontenery połączone wspólną siecią * master - obraz spark-python * worker-1 - obraz spark-python * worker-2 - obraz spark-python * driver (tu akurat affectionate_liskov) - obraz spark-jupyter {{ :ed:docker-spark-network.png?direct&400 |}} Załóżmy, że pliki są rozmieszczone w katalogach, jak na poniższym diagramie |-spark-python--+ | | Dockerfile | |-spark-jupyter + | | Dockerfile | | -cluster------+ | | docker-compose.yml | | data ---------+ | owid-energy-data.csv.gz **Pobierz pliki konfiguracyjne i zbiór danych: {{ :ed:spark-jdk17.zip |}}. Następnie rozpakuj w dowolnym katalogu roboczym.** ==== Utwórz sieć spark-network ==== docker network create -d bridge --subnet=172.22.0.0/16 --gateway=172.22.0.1 spark-network ==== 1.1. Zbuduj obraz spark-python ==== Przejdź do katalogu ''spark-python''. Powinien w nim znajdować się plik Dockerfile z następującą zawartością: # image: spark-python FROM spark:3.5.0-scala2.12-java17-ubuntu USER root RUN set -ex; \ apt-get update; \ apt-get install -y sudo python3 python3-pip; \ pip3 cache purge; \ pip3 install pyspark; \ rm -rf /var/lib/apt/lists/* # RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers ENV JAVA_HOME /opt/java/openjdk RUN export JAVA_HOME USER spark Wydaj komendę: docker build -f Dockerfile --tag spark-python . ==== 1.2 Zbuduj obraz spark-jupyter ==== Przejdź do katalogu ''spark-jupyter''. Powinien w nim znajdować się plik Dockerfile z następującą zawartością: # image: spark-jupyter FROM spark:3.5.0-scala2.12-java17-ubuntu USER root RUN set -ex; \ apt-get update; \ apt-get install -y sudo python3 python3-pip; \ pip3 cache purge; \ pip3 install pyspark numpy scikit-learn pandas matplotlib plotly jupyterlab; \ apt-get install -y pandoc; \ apt-get install -y texlive-xetex texlive-fonts-recommended texlive-plain-generic; \ apt-get install -y texlive-latex-extra texlive-fonts-extra texlive-lang-all; \ rm -rf /var/lib/apt/lists/* RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers ENV JAVA_HOME /opt/java/openjdk RUN export JAVA_HOME USER spark Wydaj komendę: docker build -f Dockerfile --tag spark-jupyter . Obraz zawiera zbiór bibliotek ogólnego zastosowania języka Python do eksploracji danych, wizualizacji i uczenie maszynowego: * bibliotekę pyspark * typowe biblioteki uczenia maszynowego (ale bez tensorflow) * jupyter-lab (wsparcie dla notatników) * poandoc i instalację latexa (po załadowaniu notatnika możliwy jest wydruk w formacjie PDF z zawijaniem długich linii Jak uruchomić - [[https://home.agh.edu.pl/~pszwed/wiki/doku.php?id=ed:lab_05#uruchomienie_jupyter-lab|opis w sekcji 3.1]] ==== 1.3 Uruchomienie klastra ==== W katalogu cluster powinien znaleźć się plik ''docker-compose.yml'' z następującą zawartością services: spark-master: image: spark-python command: /opt/spark/bin/spark-class org.apache.spark.deploy.master.Master networks: spark-network: ipv4_address: 172.22.0.2 # volumes: # - ".:/opt/spark/work-dir" ports: - "9090:8080" - "7077:7077" #- "4040:4040" #environment: #SPARK_DAEMON_MEMORY: 4g spark-worker-1: image: spark-python command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077 depends_on: - spark-master networks: spark-network: ports: - "9091:8081" # ipv4_address: 172.22.0.3 volumes: - ".:/opt/spark/work-dir" environment: SPARK_MODE: worker SPARK_WORKER_CORES: 4 #SPARK_WORKER_MEMORY: 4g SPARK_MASTER_URL: spark://spark-master:7077 spark-worker-2: image: spark-python command: /opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077 depends_on: - spark-master networks: spark-network: # ipv4_address: 172.22.0.4 ports: - "9092:8081" volumes: - ".:/opt/spark/work-dir" environment: SPARK_MODE: worker SPARK_WORKER_CORES: 4 #SPARK_WORKER_MEMORY: 4g SPARK_MASTER_URL: spark://spark-master:7077 networks: spark-network: external: true ipam: config: - subnet: 172.22.0.0/16 * Klaster zawiera trzy węzły: ''master'' i dwa węzły obliczeniowe ''worker'' połączone wspólną siecią spark-network * Dla węzłów ''worker'' zamontowany jest lokalny katalog * Udostępnione są porty, za pomocą których można monitorować stan węzłów (WebUI) Przejdź do katalogu ''cluster'' i wydaj komendę: docker-compose up W tym momencie węzły obliczeniowe powinny uruchomić się. Po kilkudziesięciu sekundach dostępne będą ich interfejsy webowe: * MasterUI [[http://localhost:9090/]] * WorkerUI (worker-1) [[http://localhost:9091/]] * WorkerUI (worker-1) [[http://localhost:9092/]] ==== 1.4 Uruchomienie węzła spark-jupyter ==== Kontener będzie uruchamiany za pomocą polecenia: docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 --network spark-network spark-jupyter /bin/bash * Kontener jest dołączony do sieci ''spark-network''. Dzięki temu będzie mógł komunikować się z węzłami obliczeniowymi * Mapowanie portów 4040:4040 pozwoli na dostęp do WebUI * Porty 8888:8888 zostaną wykorzystane przez jupyter-lab * Montowany jest bieżący katalog * W momencie uruchomienia wołany jest shell ===== 2 Java ===== ==== 2.1 PiComputeApp ==== **1.** Dodaj kod klasy do projektu i wprowadź odpowiednie wywołanie w funkcji ''Main.main()'' public class PiComputeApp implements Serializable { private static long counter = 0; public static void main(String[] args) { int slices=100; int n = 1000_000 * slices; System.out.printf("Table of %d elements will be divided into %d partitions.\n" + "Each partition will be processed as separate task\n",n,slices); long t0 = System.currentTimeMillis(); SparkSession spark = SparkSession .builder() .appName("Spark Pi") .master("local[*]") .getOrCreate(); SparkContext sparkContext = spark.sparkContext(); JavaSparkContext ctx = new JavaSparkContext(sparkContext); long t1 = System.currentTimeMillis(); System.out.println("Session initialized in " + (t1 - t0) + " ms"); List l = new ArrayList<>(n); for (int i = 0; i < n; i++) { l.add(i); } JavaRDD dataSet = ctx.parallelize(l, slices); long t2 = System.currentTimeMillis(); System.out.println("Initial dataframe built in " + (t2 - t1) + " ms"); final int count = dataSet.map(integer -> { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; }).reduce((a, b) -> a + b); long t3 = System.currentTimeMillis(); System.out.println("Map-reduce time " + (t3 - t2) + " ms"); System.out.println("Pi is roughly " + 4.0 * count / n); spark.stop(); } } Aplikacja tworzy listę 100_000_000 liczb całkowitych i rozdziela je na 100 partycji. Partycje te są przetwarzane równolegle przez lokalnie działające wątki. Podczas operacji map losowane są dwie liczby x oraz y i zwracany wynik * 1 - jeżeli (x,y) mieści się w ćwiartce okręgu * 0 w przeciwnym przypadku. Po zsumowaniu i podzieleniu przez liczbę punktów otrzymamy przybliżenie liczby $pi$. **2.** Dodaj artefakt. Na przykład będzie miał nazwę pi_compute.jar W InteliJ: * Wybierz://Project Structure->Artifacts->JAR from module with dependencies// * Nie kopiuj bibliotek do pliku wyjściowego ''*.jar''. W zakładce //Output Layout// zaznacz wszystkie biblioteki Mavena i usuń je zostawiając sam artefakt. W ten sposób unikniesz przesyłania do serwera Fat JAR/Uber JAR * Manipulując opcjami można ustawić katalog wyjściowy i nazwę pliku *.jar. * Zbuduj artefakt **3.** W tym przypadku możesz przejść do katalogu wyjściowego, gdzie pojawił się artefakt i uruchomić kontener ''spark-jupyter'' docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 --network spark-network spark-jupyter /bin/bash Po uruchomieniu kontenera pojawi się znak zachęty ''bash''. Wydaj komendę ''ls''. Artefakt powinien być widoczny **3.** Aplikacja zostanie uruchomiona poprzez przesłanie artefaktu do środowiska spark, za pomocą polecenia ''spark-submit'' (wewnątrz bash na dockerze): /opt/spark/bin/spark-submit --driver-memory 4g --executor-memory 4g --class org.example.Main --master spark://172.22.0.2:7077 pi_compute.jar **4.** W tym momencie rozpocznie się wykonywanie aplikacji. Wyznaczona wartość będzie wypisana na konsoli. Możliwe jest także monitorowanie przebiegu aplikacji pod adresem [[http://localhost:4040]] * Job (zadanie) - wysokopoziomowe obliczenie uruchomione w wyniku takiej akcji, jak ''map()'' lub ''reduce()''. Podczas wykonania akcji Spark uruchamia job. * Stage (etap) - jest kolekcją podzadań, które mogą być wykonywane równolegle, wszystkie przeprowadzają to samo obliczenie, ale na innej partycji danych. Czyli etap stage reprezentuje krok w planie wykonania zadania. Etapy są tworzone na podstawie transformacji danych, a zwłaszcza zależnościa związanymi z przesyłaniem danych pomiędzy węzłami obliczeniowymi. * Executor (wykonawca) - w tym przypadku jednym wykonawcą jest driver. **5.** Umieść w spawozdaniu fragmenty zrzutów ekranu pokazujące działanie wykonawcy Sparka (konsoli i DriverUI) ==== 2.2 PiComputeAppDistributed ==== **1.** Wprowadź minimalną zmianę w kodzie aplikacji. Różnicą jest wskazanie adresu węzła Master. SparkSession spark = SparkSession .builder() .appName("Spark Pi Distributed") .master("spark://172.22.0.2:7077") .getOrCreate(); **2.** Zadbaj, aby kod był wywoływany z Main.main(), zbuduj artefakt **3.** Prześlij do wykonania z a pomocą ''spark-submit'' (w bash w konsoli kontenera spark-jupyter) **4.** Zamieść w spawozdaniu zrzuty WebUI węzłów Master i Worker(ów) wskazujących, że aplikacja była przez nie wykonywana. Możesz też dodać wykres Gantta z wykonywanych zadań (z DriverUI - port 4040, opcja timeline). ==== 2.3 LoadDistributedDataset ==== Załadujemy zbiór danych do Sparka. Musi być on dostępny dla węzłów * Driver * Worker W przemysłowym zastosowaniu byłby widoczny np. na rozproszonym systemie plików. W naszym przypadku skorzystamy z montowanych wolumenów Dockera (czyli będzie widoczny lokalnie). **Kontener wykonujący program węzła driver (''spark-jupyter'') będzie uruchamiany z katalogu ''cluster'' (zawierającym podkatalog ''data'').** **1.** Dodaj kod klasy dla nowej aplikacji oraz wywołaj ją w Main.main() public class LoadDistributedDataset { public static void main(String[] args) { SparkSession spark = SparkSession.builder() .appName("LoadDatasetDistributed") .master("spark://172.22.0.2:7077") .getOrCreate(); Dataset df = spark.read().format("csv") .option("header", "true") .option("inferschema","true") .load("data/owid-energy-data.csv.gz"); df.show(5); // To raczej zakomentuj... df=df.select("country","year","population","electricity_demand").where("country like \'Po%\' AND year >= 2000"); df.show(5); df.printSchema(); } } **2.** Skonfiguruj i zbuduj nowy artefakt (np. ''distributed_dataset.jar'') **3.** Przekopiuj **ręcznie** artefakt do katalogu ''cluster'' **4.** Przejdź do katalogu ''cluster'' i uruchom tam kontener docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 --network spark-network spark-jupyter /bin/bash Następnie sprawdź, czy artefakt i podkatalog ''data'' są widoczne **5.** Wykonaj ''spark-submit'', aby rozpocząć wykonywanie aplikacji /opt/spark/bin/spark-submit --driver-memory 4g --executor-memory 4g --class org.example.Main --master spark://172.22.0.2:7077 distributed-dataset.jar **6.** Oczekiwany efekt: * widoczna uruchomiona lub wykonana aplikacja w WebUI węzłów Master i Worker * Wydrukowana ramka na konsoli (węzła driver w Dockerze) +-------+----+----------+------------------+ |country|year|population|electricity_demand| +-------+----+----------+------------------+ | Poland|2000| 38504432| 136.81| | Poland|2001| 38662860| 136.99| | Poland|2002| 38647476| 135.42| | Poland|2003| 38621536| 139.85| | Poland|2004| 38596040| 142.97| +-------+----+----------+------------------+ only showing top 5 rows root |-- country: string (nullable = true) |-- year: integer (nullable = true) |-- population: long (nullable = true) |-- electricity_demand: double (nullable = true) ===== 3 PySpark + Jupyter ===== Spróbujemy powtórzyć te kroki w środowisku Jupyter korzystając z biblioteki PySpark. PySpark zapewnia interfejs w języku Python do platformy Spark. Aby go użyć - biblioteka PySpark musi być zainstalowana na wszystkich węzałach (co było uwzględnione przy budowie obrazów). Wewnętrznie korzysta z biblioteki [[https://www.py4j.org/|Py4J]] pozwalającej na dostęp do obiektów Java (w maszynie wirtualnej JVM) z poziomu kodu w języku Python. ==== 3.1 Uruchomienie jupyter-lab ==== **1.** Uruchom obraz ''spark-jupyter'' w katalogu cluster docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 --network spark-network spark-jupyter /bin/bash **2.** Wydaj polecenie sudo --preserve-env jupyter-lab --ip=0.0.0.0 --allow-root --IdentityProvider.token='pyspark' **3.** W przeglądarce otwórz link ''http://localhost:8888/?token=pyspark'' (albo bez parametru token i wpisz pyspark w formularzu) Obraz ''spark-jupyter'' zawiera podstawowe biblioteki do realizacji zadań uczenia maszynowego. Można go wykorzystać na innych zajęciach. ==== 3.2 Wyznaczanie liczby PI ==== **1.** Utwórz notatnik ''PiCompute'' i wprowadź w nim kod aplikacji (komórki) do wyznaczania liczby $\pi$ import pyspark from random import random from operator import add from pyspark.sql import SparkSession spark = SparkSession.builder.appName("PiCompute").master("spark://172.22.0.2:7077").getOrCreate() Tu otwórz [[http://localhost:4040| WebUI modułu driver na localhost]] partitions = 100 n = 1_000_000 * partitions def f(_: int) -> float: x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop() **2.** Możesz uruchamiać interaktywnie kolejne komórki i monitorować działanie aplikacji w WebUI. Do momentu jawnego zakończenia sesji, będzie ona aktywna. ==== 3.3 LoadEnergyDataset ==== **1.** Utwórz notatnik i wprowadź w nim kod: import pyspark from random import random from operator import add from pyspark.sql import SparkSession spark = SparkSession.builder.appName("LoadEnergyDataset").master("spark://172.22.0.2:7077").getOrCreate() **2.** załaduj dane df = spark.read.format("csv").option("header", "true").option("inferschema","true").load("data/owid-energy-data.csv.gz") Możesz spróbować je wyświetlić df.show(5,70) ... ale raczej skasuj wyjście komórki **3.** Wybierzemy dane do wyświetlenia df2 = df.select("country","year","population","electricity_demand").where("country like \'Po%\' AND year >= 2000") I wyświetlimy je: df2.show(5) df2.printSchema() Zauważ, że ze względu na //lazy evaluation// dopiero ta komórka powoduje aktywność Sparka **3.** Wybierzmy podzbiór danych z Polski z interesującymi nas niepustymi wartościami df_pl = df.select('year','population','electricity_demand').where(("country == 'Poland' AND electricity_demand IS NOT NULL")) df_pl.show() === Wykresy=== Przekonwertujemy dane do postaci list import matplotlib.pyplot as plt df_pl = df_pl.orderBy('year') y = df_pl.select('year').rdd.flatMap(lambda x: x).collect() pop = df_pl.select('population').rdd.flatMap(lambda x: x).collect() dem = df_pl.select('electricity_demand').rdd.flatMap(lambda x: x).collect() oraz wyświetlimy je... plt.plot(y,pop) plt.xlabel('year') plt.ylabel('population') plt.title('Population vs. year') plt.show() plt.plot(y,dem,label='demand') plt.xlabel('year') plt.ylabel('demand') plt.title('Demand vs. year') plt.show() plt.scatter(pop,dem) plt.xlabel('population') plt.ylabel('demand') plt.title('Demand vs. population') plt.show() === Regresja liniowa=== Dla których zależności można zastosować regresję? **1.** Dodaj kod: from pyspark.ml.regression import LinearRegression from pyspark.ml.feature import VectorAssembler va=VectorAssembler().setInputCols(["year"]).setOutputCol("features") df_plf = va.transform(df_pl) df_plf.show(5) lr = LinearRegression()\ .setMaxIter(10)\ .setRegParam(3.0)\ .setElasticNetParam(0.5)\ .setFeaturesCol("features")\ .setLabelCol("electricity_demand") model = lr.fit(df_plf) **2.** Wyświetl metryki oraz równanie regresji print(f'RMSE: {model.summary.rootMeanSquaredError}') print(f'r2: {model.summary.r2}') print(f'iterations: {model.summary.totalIterations}') print(f'demand = {model.coefficients}*year {"+" if model.intercept > 0 else ""} {model.intercept}') **3.** Wyświetl wykresy import numpy as np # from pyspark.sql.types import StructType, StructField, DoubleType from pyspark.ml.linalg import Vectors xmin = np.min(y) xmax = np.max(y) xx = np.linspace(xmin-1,xmax+1,100) yy = [model.predict(Vectors.dense([x])) for x in xx] plt.scatter(y,dem,label='demand') plt.plot(xx,yy,label='fitted function',c='orange') plt.legend() plt.title('Electric energy demand in years') plt.show() **4.** Zakończ sesję Przed zakończeniem sesji obejrzyj DriverUI (localhost:4040) a zwłaszcza diagramy pokazujące konstrukcję i statystyki kwerend w zakładce //SQL / Dataframe// -- otwórz linki do kwerend. spark.stop() ==== 3.4 TODO - regresja dla Chin ==== Przeprowadź taką samą analizę dla Chin, ale spróbuj znaleźć zależność pomiędzy liczbą ludności, a zapotrzebowaniem na energię (która w przypadku Chin jest raczej widoczna). **Zapisz notatnik jako PDF i prześlij razem ze sprawozdaniem**