====== Laboratorium 7 - przewidywanie liczby wypożyczonych rowerów ====== Kontynuujemy regresję. Będziemy przetwarzali zbiór danych dotyczących wypożyczeń rowerów w okresie dwóch lat. Wykorzystamy środowisko PySpark (obraz dockera spark-jupyter z zajęć 5) ale bez klastra. Spark będzie uruchamiany w konfiguracji standalone. Użyj środowiska jupyter-lab **1.** Uruchom obraz spark-jupyter docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 -p 443:443 --network spark-network spark-jupyter /bin/bash **2.** W konsoli kontenera wydaj komendę startującą serwer notatników jupyter-lab sudo --preserve-env jupyter-lab --ip=0.0.0.0 --allow-root --IdentityProvider.token='pyspark' **3.** Edytuj kod w notatniku... ===== 1. Pobierz i wczytaj dane ===== !wget https://dysk.agh.edu.pl/s/G6ZNziBRbEEcMeN/download/Bike-Sharing-Dataset.zip -O Bike-Sharing-Dataset.zip !unzip Bike-Sharing-Dataset.zip !cat Readme.txt Po rozpakowaniu przeczytaj opis zbioru danych. **1.** Utwórz lokalną sesję Sparka i wczytaj dane. import pyspark from pyspark.sql import SparkSession from pyspark.sql.functions import * spark = SparkSession.builder.appName("BikerRental").master("local[*]").getOrCreate() df = spark.read.csv('day.csv', header=True, inferSchema=True) df.show(5) df.printSchema() Sprawdź, czy typ danych kolumny z datą został poprawnie zidentyfikowany **2.** Przekonwertuj zbiór do ''pandas.DateFrame'' i wyświetl histogramy. import matplotlib.pyplot as plt plt.rcParams["figure.figsize"] = (15,10) pdf =df.toPandas() **3.** Wyświetl wykres zależności ''cnt'', ''registered'' i ''casual'' do daty ''date'' dates = df.select('dteday').rdd.flatMap(lambda x:x).collect() ... ===== 2. Dane BASIC - regresja (bez przetwarzania wstępnego) ===== **1.** Napisz następujące funkcje from pyspark.ml.regression import LinearRegression from pyspark.ml.feature import VectorAssembler from pyspark.ml import Pipeline from pyspark.ml.evaluation import RegressionEvaluator def evaluate_metrics(df,metrics=['r2','rmse','mse','mae'],label_col='cnt',prediction_col='prediction'): """ Funkcja oblicza metryki regresji na podstawie zbioru danych zawierającego prawdziwe wartosci etykiet i wartości przewidywane :param df: Dataset wejściowy zbiór danych :param metrics: lista metryk do obliczenia :param label_col: nazwa kolumny z wartościami wyjściowymi :param prediction_col: nazwa kolumny zawierającej przewidywane wartości :return: słownik zawierający pary (nazwa_metryki, wartość) """ Będziemy dokonywali predykcji kolumny ''cnt''. Ustaw ją jako parametr ''lr''. def train_and_test(df,lr=LinearRegression(maxIter=100,regParam=3.0,elasticNetParam=0.5)): """ Funkcja (1) dzieli zbiór danych na ``df_train`` oraz ``df_test`` (2) tworzy ciąg przetwarzania zawierający ``VectorAssembler`` i przekazaną jako parametr konfiguracje algorytmu regresji (3) buduje model i dokonuje predykcji dla ``df_train`` oraz ``df_test`` (4) wyświetla metryki :param df: wejściowy zbiór danych :param lr: konfiguracja algorytmu regresji :return: model zwrócony przez ``pipeline.fit()`` """ **2.** Przekonwertuj datę do postaci numerycznej za pomocą funkcji ''unix_timestamp()''. Zakładamy, że kolumna ma mieć nazwę ''unixdate'', z identyfikator przetworzonego zbioru danych ''df2''. Usuń kolumny z którymi zmienna wyjściowa jest powiązana liniową zależnością. **3.** Wywołaj funkcję ''train_and_test()''. Przykładowy wynik: Train: r2=0.8075845429647183 rmse=842.8329062164518 mse=710367.3078012702 mae=634.2406059893968 Test: r2=0.7728820606921822 rmse=934.713606071121 mse=873689.5253744789 mae=660.1099130393472 ==== Grid search ==== **1.** Wykonaj 5-krotną walidację krzyżową przeszukując siatkę parametrów algorytmu regresji. Na przykład: paramGrid = ParamGridBuilder() \ .addGrid(lr.regParam, [0.1, 0.5, 1, 2, 3, 5, 10]) \ .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]) \ .build() **2.** Wydrukuj parametry znalezionego najlepszego modelu bestModel = cvModel.bestModel print("Best model parameters:") print("regParam:", bestModel.stages[-1].getRegParam()) print("elasticNetParam:", bestModel.stages[-1].getElasticNetParam()) **3.** Wyświetl wykresy rzeczywistych i przewidywanych wartości. Obejrzyj wyniki dla różnych przedziałów. import numpy as np def plot(df,model,start=0,end=-1): if end == -1: end = df.count() x = np.arange(start,end) y = df.select('cnt').rdd.flatMap(lambda x:x).collect() df_pred = model.transform(df) y_pred = df_pred.select('prediction').rdd.flatMap(lambda x:x).collect() plt.plot(x,y[start:end],label='true') plt.plot(x,y_pred[start:end],label='pred') plt.legend() plt.title(model.stages[-1].__class__.__name__) plt.show() plot(df2,bestModel,start=,end=) ==== Przetestuj inne algorytmy ==== **1.** Porównaj najlepszą znalezioną konfigurację z innymi algorytmami regresji from pyspark.ml.regression import DecisionTreeRegressor from pyspark.ml.regression import RandomForestRegressor from pyspark.ml.regression import GBTRegressor for r in [LinearRegression(maxIter=,regParam=,elasticNetParam=),DecisionTreeRegressor(),RandomForestRegressor(),GBTRegressor()]: print(f'=== {r.__class__.__name__} ===') **2** Narysuj wykres dla najlepszego algorytmu (według miary r2 na zbiorze testowym) ===== 3. Dane PREPROCESS ===== ==== 3.1 Czy atrybuty są skorelowane z kolumną cnt ==== **1.** Dla wszystkich kolumn ''df2'' za wyjątkiem ''cnt'': * Oblicz współczynnik korelacji Pearsona ze zmienną objaśnianą: ''r = df2.stat.corr(col,'cnt')'' * Wyświetl wykres typu scatter plot pokazujący zależność pomiędzy daną kolumną a ''cnt'' **2.** Obejrzyj wykresy. Czy współczynnik korelacji zmieniłby się, gdyby zmienić kolejność miesięcy? W takim przypadku zastosujemy konwersję one-hot. :!: Uwaga. Nie ma sensu stosować konwersji one-hot jeżeli atrybut ma dwie wartości. {{ :ed:bike-sharing-season.png?direct&400 |}} {{ :ed:bike-sharing-mnth.png?direct&400 |}} **3.** Jak konwersja jest sugerowana w tym przypadku? {{ :ed:bikesharing-unixdate.png?direct&400 |}} ==== 3.2 Konwersja one-hot i cechy wielomainowe ==== Konwersja one-hot zamienia atrybut dyskretny z $k$ wartościami $\{a_1,\dots,a_k\}$, na $k$ kolumn, w których umieszczane są zera i jedynki. Wartość $x'$ po konwersji jest ustalana jako * $x'[i,j]=1$, jeżeli przed konwersją $x[i]=a_j$, * $x'[i,j]=0$, jeżeli przed konwersją $x[i]\neq a_j$ **1.** Przetestuj, jak działa OneHotEncoder. Dodaje on //jedną// kolumnę, która jest rzadkim wektorem. Co więcej, standardowo dla $k$ atrybutów tworzy wektory $k-1$ elementowe. [[https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/feature/OneHotEncoder.html]] Stosowany później ''VectorAssembler'' łączy wektory i pojedyncze wartości w jeden wektor. from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder(inputCol="weekday", outputCol="categoryVec") model = encoder.fit(df2) df_encoded = model.transform(df2) df_encoded.show() **2.** Napisz funkcję: from pyspark.ml.feature import OneHotEncoder def encode_one_hot(df,col): """ Funkcja przekształca wybraną kolumny do postaci one-hot - tablicy wartości 0/1 :param df: Dataset wejściowy zbiór danych :param col: kolumna do konwersji one hot :return: zbiór danych z kolumną col przekształconą do postaci one-hot """ **3.** Dokonaj za jej pomocą konwersji odpowiednich atrybutów df3 = encode_one_hot(df3,'mnth') df3 = ... **4** Dodaj także (jawnie, jako wyrażenia) cechy wielomianowe dla wybranych atrybutów. Jakiego stopnia? Parabola ma 1 garb, krzywa 3 stopnia 2 garby, itd... **5.** Przetestuj i wyświetl wykres model = train_and_test(df3,lr=) plot(df3,model) **6.** Powtórz testy dla innych algorytmów regresji ===== 4. TIMESERIES - regresja szeregów czasowych ===== Interesuje nas zależność ''cnt'' od ''instance''. W pierwszej części dobierzemy postać wielomianu aproksymującego trend, a w drugiej dodamy cechy okresowe.. ==== 4.1 Dopasowanie krzywej trendu ==== **1.** Napisz funkcję from pyspark.ml.feature import PolynomialExpansion # Param for the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. def train_polynomial_features(df,lr=LinearRegression(),degree=2): """ Funkcja przetwarza cały zbiór danych (1) tworzy ciag przetwarzania zawierający ``VectorAssembler`` z jedynym wejściowym atrybutem ``instance``, dalej ''PolynomialExpansion'' i przekazaną jako parametr konfiguracje algorytmu regresji (2) buduje model i dokonuje predykcji dla ``df`` (3) wyświetla metryki za pomocą funkcji ``evaluate_metrics()`` :param df: wejściowy zbiór danych :param lr: konfiguracja algorytmu regresji wielomianu (argument ``PolynomialExpansion``) :param degree: stopień wielomianu (argument PolynomialExpansion) :return: model zwrócony przez ``pipeline.fit()`` """ **2.** Dobierz parametry algorytmu model = train_polynomial_features(df, lr = LinearRegression(maxIter=??,regParam=???,elasticNetParam=???),degree=??) print(model.stages[-1].coefficients) print('Iterations: ',model.stages[-1].summary.totalIterations) plot(df,model) Ustaw * Bardzo małą wartość regParam * Poeksperymentuj ze skrajnymi wartościami elasticNetParam (albo 0 albo 1) oraz skrajnymi wartościami maxIter (np. 10 i 100000) Oczekiwany wynik: {{ :ed:bikesharing-trend-polynomial.png?direct&400 |}} ==== 4.2 Analiza cech okresowych ==== Cechy okresowe można zidentyfikować: * wyznaczając autokorelację (korelację przebiegu z przebiegiem przesuniętym o 1,2,3,4,... przedziały czasu * analizując amplitudy częstotliwości po transformacji Fouriera === 4.2.1 Autokorelacja === Jeśli chcesz, możesz zainstalować brakujące oprogramowanie... !pip install seaborn !pip install statsmodels **1.** Macierz autokorelacji. Jaśniejsze paski wskazują na skorelowane wartości przesuniętych w czasie. Jakie to wartości? import pandas as pd pdf = df.toPandas() shifted = [pd.DataFrame(data = pdf.cnt).shift(i) for i in range(40)] for i in range(len(shifted)): shifted[i].columns=['cnt '+str(i)] # print(shifted) df_shifted = pd.concat(shifted,axis=1) # df_shifted.head(20) corr_mat = df_shifted.corr() import seaborn as sn plt.rcParams['figure.figsize'] = (12, 12) sn.heatmap(corr_mat,xticklabels=df_shifted.columns,yticklabels=df_shifted.columns) {{ :ed:bikesharing-autokorelacja-sn.png?direct&400 |}} **2.** Wykres autokorelacji dla kolejnych okresów from statsmodels.graphics.tsaplots import plot_acf plot_acf(pdf.cnt,lags=40) {{ :ed:bikesharing-autokorelacja-statsmodel.png?direct&400 |}} **3.** Dekompozycja na trend i cechy okresowe (sezonowe). W tym przypadku dla okresu 30 dni. Kod na podstawie: [[https://medium.com/analytics-vidhya/interpreting-acf-or-auto-correlation-plot-d12e9051cd14]] from statsmodels.tsa.seasonal import seasonal_decompose res = seasonal_decompose(pdf.cnt, model = "additive",period=30) fig, (ax1,ax2,ax3) = plt.subplots(3,1, figsize=(15,8)) res.trend.plot(ax=ax1,ylabel = "trend") res.seasonal.plot(ax=ax2,ylabel = "seasoanlity") res.resid.plot(ax=ax3,ylabel = "residual") plt.show() {{ :ed:bikesharing-decomposition.png?direct&400 |}} === 4.2.2 Analiza częstotliwości === Zastosujemy transformację Fouriera i zobaczymy, dla jakich częstotliwości mamy duże amplitudy? import numpy as np import matplotlib.pyplot as plt import scipy.fftpack y = pdf.cnt.to_numpy() x = np.arange(-y.shape[0]//2,y.shape[0]//2) yf = scipy.fftpack.fft(y) N=y.shape[0] yf = scipy.fftpack.fft(y) yf[0]=0 xf = np.arange(1,40) plt.scatter(xf, 2.0/N * np.abs(yf[1:40])) plt.vlines(xf, np.zeros(39),2.0/N * np.abs(yf[1:40])) plt.xticks(xf) plt.show() === 4.2.3 Cechy okresowe === Dodamy cechy okresowe (pary funkcji ortogonalnych) postaci $cos(\frac{2\pi x}{T_i})$ oraz $sin(\frac{2\pi x}{T_i})$. Zmienna $x$ odpowiada atrybutowi ''instance'' natomiast $T_i \in \mathbf{T}$ jest jednym z wybranych okresów. **1.** Dodaj cechy okresowe i zachowaj nazwy kolumn periods=[...] df3 = df cols=[] for k in periods: df3 = df3.withColumn(f'sin_{k}',expr(???)) df3 = df3.withColumn(f'cos_{k}',expr(???)) cols.append(f'sin_{k}') cols.append(f'cos_{k}') **2.** Zaimplementuj poniższą funkcję def train_polynomial_periodic_features(df,lr=LinearRegression(),degree=2,cols=[]): """ Funkcja przetwarza cały zbiór danych (1) tworzy ciag przetwarzania zawierający (a) ``VectorAssembler`` z jedynym wejściowym atrybutem ``instance`` i wyjściową kolumną ``features`` (b) ''PolynomialExpansion'' odpowiadający za cechy wielomianowe - tworzy kolumnę ``features_poly`` (c) ``VectorAssembler`` z wejściowymi atrybutami przekazanymi przez listę cols uzupełniona o ``features_poly`` z wyjściowa kolumną ``features_merged`` (d) przekazaną jako parametr konfiguracje algorytmu regresji (2) buduje model i dokonuje predykcji dla ``df`` (3) wyświetla metryki za pomocą funkcji ``evaluate_metrics()`` :param df: wejściowy zbiór danych :param lr: konfiguracja algorytmu regresji wielomianu (argument ``PolynomialExpansion``) :param degree: stopień wielomianu (argument PolynomialExpansion) :param cols: kolumny z nazwami cech okresowych) :return: model """ **3.** Przetestuj i wyświetl wyniki model = train_polynomial_periodic_features(df3, lr=LinearRegression(maxIter=???,regParam=???,elasticNetParam=???), degree=???,cols=cols) print(model.stages[-1].coefficients) print('Iterations: ',model.stages[-1].summary.totalIterations) plot(df3,model) Oczekiwana jest wartość r2 około 0.70. {{ :ed:bikesharing-timeseries.png?direct&400 |}}