====== Laboratorium 7 - przewidywanie liczby wypożyczonych rowerów ======
Kontynuujemy regresję. Będziemy przetwarzali zbiór danych dotyczących wypożyczeń rowerów w okresie dwóch lat.
Wykorzystamy środowisko PySpark (obraz dockera spark-jupyter z zajęć 5) ale bez klastra. Spark będzie uruchamiany w konfiguracji standalone.
Użyj środowiska jupyter-lab
**1.** Uruchom obraz spark-jupyter
docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 -p 443:443 --network spark-network spark-jupyter /bin/bash
**2.** W konsoli kontenera wydaj komendę startującą serwer notatników jupyter-lab
sudo --preserve-env jupyter-lab --ip=0.0.0.0 --allow-root --IdentityProvider.token='pyspark'
**3.** Edytuj kod w notatniku...
===== 1. Pobierz i wczytaj dane =====
!wget https://dysk.agh.edu.pl/s/G6ZNziBRbEEcMeN/download/Bike-Sharing-Dataset.zip -O Bike-Sharing-Dataset.zip
!unzip Bike-Sharing-Dataset.zip
!cat Readme.txt
Po rozpakowaniu przeczytaj opis zbioru danych.
**1.** Utwórz lokalną sesję Sparka i wczytaj dane.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("BikerRental").master("local[*]").getOrCreate()
df = spark.read.csv('day.csv', header=True, inferSchema=True)
df.show(5)
df.printSchema()
Sprawdź, czy typ danych kolumny z datą został poprawnie zidentyfikowany
**2.** Przekonwertuj zbiór do ''pandas.DateFrame'' i wyświetl histogramy.
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (15,10)
pdf =df.toPandas()
**3.** Wyświetl wykres zależności ''cnt'', ''registered'' i ''casual'' do daty ''date''
dates = df.select('dteday').rdd.flatMap(lambda x:x).collect()
...
===== 2. Dane BASIC - regresja (bez przetwarzania wstępnego) =====
**1.** Napisz następujące funkcje
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
def evaluate_metrics(df,metrics=['r2','rmse','mse','mae'],label_col='cnt',prediction_col='prediction'):
"""
Funkcja oblicza metryki regresji na podstawie zbioru danych zawierającego prawdziwe wartosci etykiet
i wartości przewidywane
:param df: Dataset wejściowy zbiór danych
:param metrics: lista metryk do obliczenia
:param label_col: nazwa kolumny z wartościami wyjściowymi
:param prediction_col: nazwa kolumny zawierającej przewidywane wartości
:return: słownik zawierający pary (nazwa_metryki, wartość)
"""
Będziemy dokonywali predykcji kolumny ''cnt''. Ustaw ją jako parametr ''lr''.
def train_and_test(df,lr=LinearRegression(maxIter=100,regParam=3.0,elasticNetParam=0.5)):
"""
Funkcja (1) dzieli zbiór danych na ``df_train`` oraz ``df_test``
(2) tworzy ciąg przetwarzania zawierający ``VectorAssembler``
i przekazaną jako parametr konfiguracje algorytmu regresji
(3) buduje model i dokonuje predykcji dla ``df_train`` oraz ``df_test``
(4) wyświetla metryki
:param df: wejściowy zbiór danych
:param lr: konfiguracja algorytmu regresji
:return: model zwrócony przez ``pipeline.fit()``
"""
**2.** Przekonwertuj datę do postaci numerycznej za pomocą funkcji ''unix_timestamp()''. Zakładamy, że kolumna ma mieć nazwę ''unixdate'', z identyfikator przetworzonego zbioru danych ''df2''. Usuń kolumny z którymi zmienna wyjściowa jest powiązana liniową zależnością.
**3.** Wywołaj funkcję ''train_and_test()''. Przykładowy wynik:
Train: r2=0.8075845429647183 rmse=842.8329062164518 mse=710367.3078012702 mae=634.2406059893968
Test: r2=0.7728820606921822 rmse=934.713606071121 mse=873689.5253744789 mae=660.1099130393472
==== Grid search ====
**1.** Wykonaj 5-krotną walidację krzyżową przeszukując siatkę parametrów algorytmu regresji. Na przykład:
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.1, 0.5, 1, 2, 3, 5, 10]) \
.addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]) \
.build()
**2.** Wydrukuj parametry znalezionego najlepszego modelu
bestModel = cvModel.bestModel
print("Best model parameters:")
print("regParam:", bestModel.stages[-1].getRegParam())
print("elasticNetParam:", bestModel.stages[-1].getElasticNetParam())
**3.** Wyświetl wykresy rzeczywistych i przewidywanych wartości. Obejrzyj wyniki dla różnych przedziałów.
import numpy as np
def plot(df,model,start=0,end=-1):
if end == -1:
end = df.count()
x = np.arange(start,end)
y = df.select('cnt').rdd.flatMap(lambda x:x).collect()
df_pred = model.transform(df)
y_pred = df_pred.select('prediction').rdd.flatMap(lambda x:x).collect()
plt.plot(x,y[start:end],label='true')
plt.plot(x,y_pred[start:end],label='pred')
plt.legend()
plt.title(model.stages[-1].__class__.__name__)
plt.show()
plot(df2,bestModel,start=,end=)
==== Przetestuj inne algorytmy ====
**1.** Porównaj najlepszą znalezioną konfigurację z innymi algorytmami regresji
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
for r in [LinearRegression(maxIter=,regParam=,elasticNetParam=),DecisionTreeRegressor(),RandomForestRegressor(),GBTRegressor()]:
print(f'=== {r.__class__.__name__} ===')
**2** Narysuj wykres dla najlepszego algorytmu (według miary r2 na zbiorze testowym)
===== 3. Dane PREPROCESS =====
==== 3.1 Czy atrybuty są skorelowane z kolumną cnt ====
**1.** Dla wszystkich kolumn ''df2'' za wyjątkiem ''cnt'':
* Oblicz współczynnik korelacji Pearsona ze zmienną objaśnianą: ''r = df2.stat.corr(col,'cnt')''
* Wyświetl wykres typu scatter plot pokazujący zależność pomiędzy daną kolumną a ''cnt''
**2.** Obejrzyj wykresy. Czy współczynnik korelacji zmieniłby się, gdyby zmienić kolejność miesięcy? W takim przypadku zastosujemy konwersję one-hot.
:!: Uwaga. Nie ma sensu stosować konwersji one-hot jeżeli atrybut ma dwie wartości.
{{ :ed:bike-sharing-season.png?direct&400 |}}
{{ :ed:bike-sharing-mnth.png?direct&400 |}}
**3.** Jak konwersja jest sugerowana w tym przypadku?
{{ :ed:bikesharing-unixdate.png?direct&400 |}}
==== 3.2 Konwersja one-hot i cechy wielomainowe ====
Konwersja one-hot zamienia atrybut dyskretny z $k$ wartościami $\{a_1,\dots,a_k\}$, na $k$ kolumn, w których umieszczane są zera i jedynki.
Wartość $x'$ po konwersji jest ustalana jako
* $x'[i,j]=1$, jeżeli przed konwersją $x[i]=a_j$,
* $x'[i,j]=0$, jeżeli przed konwersją $x[i]\neq a_j$
**1.** Przetestuj, jak działa OneHotEncoder. Dodaje on //jedną// kolumnę, która jest rzadkim wektorem. Co więcej, standardowo dla $k$ atrybutów tworzy wektory $k-1$ elementowe. [[https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/feature/OneHotEncoder.html]]
Stosowany później ''VectorAssembler'' łączy wektory i pojedyncze wartości w jeden wektor.
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="weekday", outputCol="categoryVec")
model = encoder.fit(df2)
df_encoded = model.transform(df2)
df_encoded.show()
**2.** Napisz funkcję:
from pyspark.ml.feature import OneHotEncoder
def encode_one_hot(df,col):
"""
Funkcja przekształca wybraną kolumny do postaci one-hot - tablicy wartości 0/1
:param df: Dataset wejściowy zbiór danych
:param col: kolumna do konwersji one hot
:return: zbiór danych z kolumną col przekształconą do postaci one-hot
"""
**3.** Dokonaj za jej pomocą konwersji odpowiednich atrybutów
df3 = encode_one_hot(df3,'mnth')
df3 = ...
**4** Dodaj także (jawnie, jako wyrażenia) cechy wielomianowe dla wybranych atrybutów. Jakiego stopnia? Parabola ma 1 garb, krzywa 3 stopnia 2 garby, itd...
**5.** Przetestuj i wyświetl wykres
model = train_and_test(df3,lr=)
plot(df3,model)
**6.** Powtórz testy dla innych algorytmów regresji
===== 4. TIMESERIES - regresja szeregów czasowych =====
Interesuje nas zależność ''cnt'' od ''instance''. W pierwszej części dobierzemy postać wielomianu aproksymującego trend, a w drugiej dodamy cechy okresowe..
==== 4.1 Dopasowanie krzywej trendu ====
**1.** Napisz funkcję
from pyspark.ml.feature import PolynomialExpansion
# Param for the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
def train_polynomial_features(df,lr=LinearRegression(),degree=2):
"""
Funkcja przetwarza cały zbiór danych
(1) tworzy ciag przetwarzania zawierający ``VectorAssembler``
z jedynym wejściowym atrybutem ``instance``, dalej ''PolynomialExpansion''
i przekazaną jako parametr konfiguracje algorytmu regresji
(2) buduje model i dokonuje predykcji dla ``df``
(3) wyświetla metryki za pomocą funkcji ``evaluate_metrics()``
:param df: wejściowy zbiór danych
:param lr: konfiguracja algorytmu regresji wielomianu (argument ``PolynomialExpansion``)
:param degree: stopień wielomianu (argument PolynomialExpansion)
:return: model zwrócony przez ``pipeline.fit()``
"""
**2.** Dobierz parametry algorytmu
model = train_polynomial_features(df, lr = LinearRegression(maxIter=??,regParam=???,elasticNetParam=???),degree=??)
print(model.stages[-1].coefficients)
print('Iterations: ',model.stages[-1].summary.totalIterations)
plot(df,model)
Ustaw
* Bardzo małą wartość regParam
* Poeksperymentuj ze skrajnymi wartościami elasticNetParam (albo 0 albo 1) oraz skrajnymi wartościami maxIter (np. 10 i 100000)
Oczekiwany wynik:
{{ :ed:bikesharing-trend-polynomial.png?direct&400 |}}
==== 4.2 Analiza cech okresowych ====
Cechy okresowe można zidentyfikować:
* wyznaczając autokorelację (korelację przebiegu z przebiegiem przesuniętym o 1,2,3,4,... przedziały czasu
* analizując amplitudy częstotliwości po transformacji Fouriera
=== 4.2.1 Autokorelacja ===
Jeśli chcesz, możesz zainstalować brakujące oprogramowanie...
!pip install seaborn
!pip install statsmodels
**1.** Macierz autokorelacji. Jaśniejsze paski wskazują na skorelowane wartości przesuniętych w czasie. Jakie to wartości?
import pandas as pd
pdf = df.toPandas()
shifted = [pd.DataFrame(data = pdf.cnt).shift(i) for i in range(40)]
for i in range(len(shifted)):
shifted[i].columns=['cnt '+str(i)]
# print(shifted)
df_shifted = pd.concat(shifted,axis=1)
# df_shifted.head(20)
corr_mat = df_shifted.corr()
import seaborn as sn
plt.rcParams['figure.figsize'] = (12, 12)
sn.heatmap(corr_mat,xticklabels=df_shifted.columns,yticklabels=df_shifted.columns)
{{ :ed:bikesharing-autokorelacja-sn.png?direct&400 |}}
**2.** Wykres autokorelacji dla kolejnych okresów
from statsmodels.graphics.tsaplots import plot_acf
plot_acf(pdf.cnt,lags=40)
{{ :ed:bikesharing-autokorelacja-statsmodel.png?direct&400 |}}
**3.** Dekompozycja na trend i cechy okresowe (sezonowe). W tym przypadku dla okresu 30 dni. Kod na podstawie: [[https://medium.com/analytics-vidhya/interpreting-acf-or-auto-correlation-plot-d12e9051cd14]]
from statsmodels.tsa.seasonal import seasonal_decompose
res = seasonal_decompose(pdf.cnt, model = "additive",period=30)
fig, (ax1,ax2,ax3) = plt.subplots(3,1, figsize=(15,8))
res.trend.plot(ax=ax1,ylabel = "trend")
res.seasonal.plot(ax=ax2,ylabel = "seasoanlity")
res.resid.plot(ax=ax3,ylabel = "residual")
plt.show()
{{ :ed:bikesharing-decomposition.png?direct&400 |}}
=== 4.2.2 Analiza częstotliwości ===
Zastosujemy transformację Fouriera i zobaczymy, dla jakich częstotliwości mamy duże amplitudy?
import numpy as np
import matplotlib.pyplot as plt
import scipy.fftpack
y = pdf.cnt.to_numpy()
x = np.arange(-y.shape[0]//2,y.shape[0]//2)
yf = scipy.fftpack.fft(y)
N=y.shape[0]
yf = scipy.fftpack.fft(y)
yf[0]=0
xf = np.arange(1,40)
plt.scatter(xf, 2.0/N * np.abs(yf[1:40]))
plt.vlines(xf, np.zeros(39),2.0/N * np.abs(yf[1:40]))
plt.xticks(xf)
plt.show()
=== 4.2.3 Cechy okresowe ===
Dodamy cechy okresowe (pary funkcji ortogonalnych) postaci $cos(\frac{2\pi x}{T_i})$ oraz $sin(\frac{2\pi x}{T_i})$. Zmienna $x$ odpowiada atrybutowi ''instance'' natomiast $T_i \in \mathbf{T}$ jest jednym z wybranych okresów.
**1.** Dodaj cechy okresowe i zachowaj nazwy kolumn
periods=[...]
df3 = df
cols=[]
for k in periods:
df3 = df3.withColumn(f'sin_{k}',expr(???))
df3 = df3.withColumn(f'cos_{k}',expr(???))
cols.append(f'sin_{k}')
cols.append(f'cos_{k}')
**2.** Zaimplementuj poniższą funkcję
def train_polynomial_periodic_features(df,lr=LinearRegression(),degree=2,cols=[]):
"""
Funkcja przetwarza cały zbiór danych
(1) tworzy ciag przetwarzania zawierający
(a) ``VectorAssembler`` z jedynym wejściowym atrybutem ``instance`` i wyjściową kolumną ``features``
(b) ''PolynomialExpansion'' odpowiadający za cechy wielomianowe - tworzy kolumnę ``features_poly``
(c) ``VectorAssembler`` z wejściowymi atrybutami przekazanymi przez listę cols uzupełniona o
``features_poly`` z wyjściowa kolumną ``features_merged``
(d) przekazaną jako parametr konfiguracje algorytmu regresji
(2) buduje model i dokonuje predykcji dla ``df``
(3) wyświetla metryki za pomocą funkcji ``evaluate_metrics()``
:param df: wejściowy zbiór danych
:param lr: konfiguracja algorytmu regresji wielomianu (argument ``PolynomialExpansion``)
:param degree: stopień wielomianu (argument PolynomialExpansion)
:param cols: kolumny z nazwami cech okresowych)
:return: model
"""
**3.** Przetestuj i wyświetl wyniki
model = train_polynomial_periodic_features(df3,
lr=LinearRegression(maxIter=???,regParam=???,elasticNetParam=???),
degree=???,cols=cols)
print(model.stages[-1].coefficients)
print('Iterations: ',model.stages[-1].summary.totalIterations)
plot(df3,model)
Oczekiwana jest wartość r2 około 0.70.
{{ :ed:bikesharing-timeseries.png?direct&400 |}}