Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revision Previous revision
Next revision
Previous revision
ed:lab_05 [2024/03/12 05:30]
pszwed [PiComputeApp]
ed:lab_05 [2024/03/13 03:24] (current)
pszwed [1.2 Zbuduj obraz spark-jupyter]
Line 1: Line 1:
-====== Laboratorium 5 ====== +====== Laboratorium 5  ====== 
-FIXME Strona w trakcie edycji + 
-===== Konfiguracja Dockera =====+===== 1. Konfiguracja Dockera =====
  
 Naszym celem jest zbudowanie konfiguracji jak na rysunku: cztery kontenery połączone wspólną siecią Naszym celem jest zbudowanie konfiguracji jak na rysunku: cztery kontenery połączone wspólną siecią
Line 29: Line 29:
 </code> </code>
  
 +
 +**Pobierz pliki konfiguracyjne i zbiór danych: {{ :ed:spark-jdk17.zip |}}. Następnie rozpakuj w dowolnym katalogu roboczym.**
 ==== Utwórz sieć spark-network ==== ==== Utwórz sieć spark-network ====
  
Line 35: Line 37:
 </code> </code>
  
-==== Zbuduj obraz spark-python ====+==== 1.1. Zbuduj obraz spark-python ====
  
 Przejdź do katalogu ''spark-python''. Powinien w nim znajdować się plik Dockerfile z następującą zawartością: Przejdź do katalogu ''spark-python''. Powinien w nim znajdować się plik Dockerfile z następującą zawartością:
Line 67: Line 69:
  
  
-==== Zbuduj obraz spark-jupyter ====+==== 1.2 Zbuduj obraz spark-jupyter ====
  
 Przejdź do katalogu ''spark-jupyter''. Powinien w nim znajdować się plik Dockerfile z następującą zawartością: Przejdź do katalogu ''spark-jupyter''. Powinien w nim znajdować się plik Dockerfile z następującą zawartością:
Line 83: Line 85:
     pip3 cache purge; \     pip3 cache purge; \
     pip3 install pyspark numpy scikit-learn pandas matplotlib plotly jupyterlab; \     pip3 install pyspark numpy scikit-learn pandas matplotlib plotly jupyterlab; \
 +    apt-get install -y pandoc; \
 +    apt-get install -y texlive-xetex texlive-fonts-recommended texlive-plain-generic; \
 +    apt-get install -y texlive-latex-extra texlive-fonts-extra texlive-lang-all; \
     rm -rf /var/lib/apt/lists/*     rm -rf /var/lib/apt/lists/*
 +
 +
 RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers RUN echo "spark ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
 ENV JAVA_HOME /opt/java/openjdk ENV JAVA_HOME /opt/java/openjdk
Line 97: Line 104:
 </code> </code>
  
-==== Uruchomienie klastra ====+Obraz zawiera zbiór bibliotek ogólnego zastosowania języka Python do eksploracji danych, wizualizacji i uczenie maszynowego: 
 +  * bibliotekę pyspark 
 +  * typowe biblioteki uczenia maszynowego (ale bez tensorflow) 
 +  * jupyter-lab (wsparcie dla notatników) 
 +  * poandoc i instalację latexa (po załadowaniu notatnika możliwy jest wydruk w formacjie PDF z zawijaniem długich linii  
 + 
 +Jak uruchomić -  [[https://home.agh.edu.pl/~pszwed/wiki/doku.php?id=ed:lab_05#uruchomienie_jupyter-lab|opis w sekcji 3.1]] 
 + 
 +==== 1.3 Uruchomienie klastra ====
  
 W katalogu cluster powinien znaleźć się plik ''docker-compose.yml'' z następującą zawartością W katalogu cluster powinien znaleźć się plik ''docker-compose.yml'' z następującą zawartością
Line 180: Line 195:
   * WorkerUI (worker-1) [[http://localhost:9092/]]   * WorkerUI (worker-1) [[http://localhost:9092/]]
  
-==== Uruchomienie węzła spark-jupyter ====+==== 1.4 Uruchomienie węzła spark-jupyter ====
  
 Kontener będzie uruchamiany za pomocą polecenia: Kontener będzie uruchamiany za pomocą polecenia:
Line 194: Line 209:
   * W momencie uruchomienia wołany jest shell   * W momencie uruchomienia wołany jest shell
  
-===== Java =====+===== Java =====
  
-==== PiComputeApp ====+==== 2.1 PiComputeApp ====
  
 **1.** Dodaj kod klasy do projektu i wprowadź odpowiednie wywołanie w funkcji ''Main.main()'' **1.** Dodaj kod klasy do projektu i wprowadź odpowiednie wywołanie w funkcji ''Main.main()''
Line 282: Line 297:
   * Executor (wykonawca) - w tym przypadku jednym wykonawcą jest driver.   * Executor (wykonawca) - w tym przypadku jednym wykonawcą jest driver.
  
 +**5.** Umieść w spawozdaniu fragmenty zrzutów ekranu pokazujące działanie wykonawcy Sparka (konsoli i DriverUI)
      
 +==== 2.2 PiComputeAppDistributed ====
  
 +**1.** Wprowadź minimalną zmianę w kodzie aplikacji. Różnicą jest wskazanie adresu węzła Master. 
 +
 +<code java>
 +    SparkSession spark = SparkSession
 +            .builder()
 +            .appName("Spark Pi Distributed")
 +            .master("spark://172.22.0.2:7077")
 +            .getOrCreate();
 +
 +</code>
 +
 +**2.** Zadbaj, aby kod był wywoływany z Main.main(), zbuduj artefakt
 +
 +**3.** Prześlij do wykonania z a pomocą ''spark-submit'' (w bash w konsoli kontenera spark-jupyter)
 +
 +**4.** Zamieść w spawozdaniu zrzuty WebUI węzłów Master i Worker(ów) wskazujących, że aplikacja była przez nie wykonywana. Możesz też dodać wykres Gantta z wykonywanych zadań (z DriverUI - port 4040, opcja timeline). 
 +
 +==== 2.3 LoadDistributedDataset ====
 +
 +Załadujemy zbiór danych do Sparka. Musi być on dostępny dla węzłów
 +  * Driver
 +  * Worker
 +
 +W przemysłowym zastosowaniu byłby widoczny np. na rozproszonym systemie plików. W naszym przypadku skorzystamy z montowanych wolumenów Dockera (czyli będzie widoczny lokalnie).
 +
 +**Kontener wykonujący program węzła driver (''spark-jupyter'') będzie uruchamiany z katalogu ''cluster'' (zawierającym podkatalog ''data'').**
 +
 +**1.** Dodaj kod klasy dla nowej aplikacji oraz wywołaj ją w Main.main()
 +
 +<code java>
 +public class LoadDistributedDataset {
 +
 +        public static void main(String[] args) {
 +        SparkSession spark = SparkSession.builder()
 +                .appName("LoadDatasetDistributed")
 +                .master("spark://172.22.0.2:7077")
 +                .getOrCreate();
 +
 +        Dataset<Row> df = spark.read().format("csv")
 +                .option("header", "true")
 +                .option("inferschema","true")
 +                .load("data/owid-energy-data.csv.gz");
 +
 +        df.show(5); // To raczej zakomentuj...
 +        df=df.select("country","year","population","electricity_demand").where("country like \'Po%\' AND year >= 2000");
 +        df.show(5);
 +        df.printSchema();
 +
 +
 +    }
 +}
 +
 +</code>
 +
 +**2.** Skonfiguruj i zbuduj nowy artefakt (np. ''distributed_dataset.jar'')
 +
 +**3.** Przekopiuj **ręcznie** artefakt do katalogu ''cluster''
 +
 +**4.** Przejdź do katalogu ''cluster'' i uruchom tam kontener
 +
 +<code>
 +docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash 
 +</code>
 +
 +Następnie sprawdź, czy artefakt i podkatalog ''data'' są widoczne
 +
 +**5.** Wykonaj ''spark-submit'', aby rozpocząć wykonywanie aplikacji
 +
 +<code>
 +/opt/spark/bin/spark-submit --driver-memory 4g --executor-memory 4g --class org.example.Main --master spark://172.22.0.2:7077 distributed-dataset.jar
 +
 +</code>
 +
 +
 +**6.** Oczekiwany efekt: 
 +  * widoczna uruchomiona lub wykonana aplikacja w WebUI węzłów Master i Worker
 +  * Wydrukowana ramka na konsoli (węzła driver w Dockerze)
 +
 +<code>
 ++-------+----+----------+------------------+
 +|country|year|population|electricity_demand|
 ++-------+----+----------+------------------+
 +| Poland|2000|  38504432|            136.81|
 +| Poland|2001|  38662860|            136.99|
 +| Poland|2002|  38647476|            135.42|
 +| Poland|2003|  38621536|            139.85|
 +| Poland|2004|  38596040|            142.97|
 ++-------+----+----------+------------------+
 +only showing top 5 rows
 +
 +root
 + |-- country: string (nullable = true)
 + |-- year: integer (nullable = true)
 + |-- population: long (nullable = true)
 + |-- electricity_demand: double (nullable = true)
 +
 +</code>
 +
 +
 +===== 3 PySpark + Jupyter =====
 +
 +Spróbujemy powtórzyć te kroki w środowisku Jupyter korzystając z biblioteki PySpark.
 +
 +PySpark zapewnia interfejs w języku Python do platformy Spark. Aby go użyć - biblioteka PySpark musi być zainstalowana na wszystkich węzałach (co było uwzględnione przy budowie obrazów). Wewnętrznie korzysta z biblioteki [[https://www.py4j.org/|Py4J]] pozwalającej na dostęp do obiektów Java (w maszynie wirtualnej JVM) z poziomu kodu w języku Python.
 +
 +
 +==== 3.1 Uruchomienie jupyter-lab ====
 +
 +**1.** Uruchom obraz ''spark-jupyter'' w katalogu cluster
 +
 +<code>
 +docker run -it --rm -v ".:/opt/spark/work-dir"  -p 4040:4040  -p 8888:8888 --network spark-network spark-jupyter /bin/bash
 +</code>
 +
 +
 +**2.** Wydaj polecenie
 +
 +<code>
 +sudo --preserve-env jupyter-lab  --ip=0.0.0.0 --allow-root --IdentityProvider.token='pyspark'
 +</code>
 +
 +
 +**3.** W przeglądarce otwórz  link ''http://localhost:8888/?token=pyspark'' (albo bez parametru token i wpisz pyspark w formularzu)
 +
 +Obraz ''spark-jupyter'' zawiera podstawowe biblioteki do realizacji zadań uczenia maszynowego. Można go wykorzystać na innych zajęciach.
 +
 +
 +==== 3.2 Wyznaczanie liczby PI ====
 +
 +**1.** Utwórz notatnik ''PiCompute'' i wprowadź w nim kod aplikacji (komórki) do wyznaczania liczby $\pi$
 +
 +<code python>
 +import pyspark
 +from random import random
 +from operator import add
 +
 +from pyspark.sql import SparkSession
 +</code>  
 +
 +<code python>
 +spark = SparkSession.builder.appName("PiCompute").master("spark://172.22.0.2:7077").getOrCreate()
 +</code>
 +
 +Tu otwórz [[http://localhost:4040| WebUI modułu driver na localhost]]
 +
 +<code python>
 +partitions = 100
 +n = 1_000_000 * partitions
 +
 +def f(_: int) -> float:
 +    x = random() * 2 - 1
 +    y = random() * 2 - 1
 +    return 1 if x ** 2 + y ** 2 <= 1 else 0
 +
 +count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
 +print("Pi is roughly %f" % (4.0 * count / n))
 +</code>
 +
 +<code python>
 +spark.stop()
 +</code>
 +
 +**2.** Możesz uruchamiać interaktywnie kolejne komórki i monitorować działanie aplikacji w WebUI. Do momentu jawnego zakończenia sesji, będzie ona aktywna.
 +
 +==== 3.3 LoadEnergyDataset ====
 +
 +**1.** Utwórz notatnik i wprowadź w nim kod:
 +
 +<code python>
 +import pyspark
 +from random import random
 +from operator import add
 +
 +from pyspark.sql import SparkSession
 +</code>
 +
 +<code python>
 +spark = SparkSession.builder.appName("LoadEnergyDataset").master("spark://172.22.0.2:7077").getOrCreate()
 +</code>
 +
 +**2.** załaduj dane
 +
 +<code python>
 +df = spark.read.format("csv").option("header", "true").option("inferschema","true").load("data/owid-energy-data.csv.gz")
 +</code>
 +
 +Możesz spróbować je wyświetlić
 +<code python>
 +df.show(5,70)
 +</code>
 +... ale raczej skasuj wyjście komórki
 +
 +**3.** Wybierzemy dane do wyświetlenia 
 +<code python>
 +df2 = df.select("country","year","population","electricity_demand").where("country like \'Po%\' AND year >= 2000")
 +</code>
 +
 +I wyświetlimy je:
 +
 +<code python>
 +df2.show(5)
 +df2.printSchema()
 +</code>
 +
 +Zauważ, że ze względu na //lazy evaluation// dopiero ta komórka powoduje aktywność Sparka
 +
 +
 +**3.** Wybierzmy podzbiór danych z Polski z interesującymi nas niepustymi wartościami 
 +
 +<code python>
 +df_pl = df.select('year','population','electricity_demand').where(("country == 'Poland' AND electricity_demand IS NOT NULL"))
 +df_pl.show()
 +</code>
 +
 +=== Wykresy===
 +Przekonwertujemy dane do postaci list
 +
 +<code python>
 +import matplotlib.pyplot as plt
 +df_pl = df_pl.orderBy('year')
 +y = df_pl.select('year').rdd.flatMap(lambda x: x).collect()
 +pop = df_pl.select('population').rdd.flatMap(lambda x: x).collect()
 +dem = df_pl.select('electricity_demand').rdd.flatMap(lambda x: x).collect()
 +</code>
 +
 +oraz wyświetlimy je...
 +<code python>
 +plt.plot(y,pop)
 +plt.xlabel('year')
 +plt.ylabel('population')
 +plt.title('Population vs. year')
 +plt.show()
 +</code>
 +
 +<code python>
 +plt.plot(y,dem,label='demand')
 +plt.xlabel('year')
 +plt.ylabel('demand')
 +plt.title('Demand vs. year')
 +plt.show()
 +</code>
 +
 +<code python>
 +plt.scatter(pop,dem)
 +plt.xlabel('population')
 +plt.ylabel('demand')
 +plt.title('Demand vs. population')
 +plt.show()
 +</code>
 +
 +=== Regresja liniowa===
 +Dla których zależności można zastosować regresję?
 +
 +**1.** Dodaj kod:
 +
 +<code python>
 +from pyspark.ml.regression import LinearRegression
 +from pyspark.ml.feature import VectorAssembler
 +
 +va=VectorAssembler().setInputCols(["year"]).setOutputCol("features")
 +df_plf = va.transform(df_pl)
 +df_plf.show(5)
 +
 +lr = LinearRegression()\
 +  .setMaxIter(10)\
 +  .setRegParam(3.0)\
 +  .setElasticNetParam(0.5)\
 +  .setFeaturesCol("features")\
 +  .setLabelCol("electricity_demand")
 +model = lr.fit(df_plf)
 +</code>
 +
 +**2.** Wyświetl metryki oraz równanie regresji
 +
 +<code python>
 +print(f'RMSE: {model.summary.rootMeanSquaredError}')
 +print(f'r2: {model.summary.r2}')
 +print(f'iterations: {model.summary.totalIterations}')
 +print(f'demand = {model.coefficients}*year {"+" if model.intercept > 0 else ""} {model.intercept}')
 +</code>
 +
 +**3.** Wyświetl wykresy 
 +
 +<code python>
 +import numpy as np
 +# from pyspark.sql.types import StructType, StructField, DoubleType
 +from pyspark.ml.linalg import Vectors
 +
 +xmin = np.min(y)
 +xmax = np.max(y)
 +xx = np.linspace(xmin-1,xmax+1,100)
 +
 +yy = [model.predict(Vectors.dense([x])) for x in xx]
 +
 +plt.scatter(y,dem,label='demand')
 +plt.plot(xx,yy,label='fitted function',c='orange')
 +plt.legend()
 +plt.title('Electric energy demand in years')
 +plt.show()
 +</code>
 +
 +**4.** Zakończ sesję
 +
 +Przed zakończeniem sesji obejrzyj DriverUI (localhost:4040) a zwłaszcza diagramy pokazujące konstrukcję i statystyki kwerend w zakładce //SQL / Dataframe// -- otwórz linki do kwerend.
    
 +<code python>
 +spark.stop()
 +</code>
 +
 +==== 3.4 TODO - regresja dla Chin ====
 +
 +Przeprowadź taką samą analizę dla Chin, ale spróbuj znaleźć zależność pomiędzy liczbą ludności, a zapotrzebowaniem na energię (która w przypadku Chin jest raczej widoczna).
 +
 +**Zapisz notatnik jako PDF i prześlij razem ze sprawozdaniem**
 +
ed/lab_05.1710217832.txt.gz · Last modified: 2024/03/12 05:30 by pszwed
CC Attribution-Share Alike 4.0 International
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0