====== Laboratorium 6: Regresja - przewidywanie cen sprzedaży domów ======
Celem zadania jest przewidywanie cen sprzedaży domów.
* Zbiór danych dotyczących sprzedaży domów dostępny na [[https://www.kaggle.com/harlfoxem/housesalesprediction|Kaggle]].
* Jego lokalna kopia może zostać pobrana z {{ :med:kc_house_data.csv.zip |kc_house_data.csv.zip}} lub z [[https://dysk.agh.edu.pl/s/nWNWeirnCJDQycp/download/kc_house_data.csv.gz| kc_house_data.csv.gz]] (bezpośrednio obsługiwana przez Spark)
Wykorzystamy środowisko PySpark (obraz dockera spark-jupyter z zajęć 5) ale bez klastra. Spark będzie uruchamiany w konfiguracji standalone.
===== 1. Utworzenie sesji Spark i załadowanie danych =====
**1.** Uruchom obraz spark-jupyter
docker run -it --rm -v ".:/opt/spark/work-dir" -p 4040:4040 -p 8888:8888 -p 443:443 --network spark-network spark-jupyter /bin/bash
Dodano mapowanie portu 443:443, aby bezpośrednio uzyskać dostęp do zbioru danych
**2.** W konsoli kontenera wydaj komendę startującą serwer notatników jupyter-lab
sudo --preserve-env jupyter-lab --ip=0.0.0.0 --allow-root --IdentityProvider.token='pyspark'
**3.** Edytuj kod w notatniku...
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("KcHouseData").master("local[*]").getOrCreate()
Zamiast "local[*]" możesz uruchomić wcześniej klaster, i przeprowadzić oblicznia na klastrze. Konieczne jednak będzie modyfikacja sposobu udostępniania pliku węzłom obliczeniowym (jak na poprzednim laboratorium).
**4.** Załaduj i wyświetl zbiór danych
from pyspark import SparkFiles
url = "https://dysk.agh.edu.pl/s/nWNWeirnCJDQycp/download/kc_house_data.csv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("kc_house_data.csv.gz"), header=True, inferSchema=True)
df.show(5)
//Dodanie pliku do kontekstu Sparka w środowisku rozproszonym pozwala na automatyczne udostępnienie go węzłom obliczeniowym. W przypadku klastra na Dokerze problemem jest zapewnienie dostępu do tego samego portu przez serwisy.//
**5.** W środowisku PySpark możliwa jets konwersja zbioru danych do Pandas Dataframe
pdf = df.toPandas()
print(pdf.info())
pdf.head()
===== 2. Przetwarzanie wstępne =====
**1.** Kolumna ''date'' jest tekstem. Zamień ją na typ ''timestamp'' wywołując funkcję ''to_timestamp("date", "yyyyMMdd'T'HHmmss")''. Wyświetl zawartość zbioru danych
**2.** Zamień kolumnę ''date'' już typu ''timestamp'' na liczbę za pomocą funkcji ''unix_timestamp()'' nowa kolumna może nazywać się''unixdate''. Usuń kolumnę ''date'' - będzimey dalej przetwarzali atrybuty numeryczne.
**3.** Usuń kolumnę ''id''
===== 3. Regresja =====
Zakładamy, że po konwersjach zbiór danych identyfikowany jest przez zmienną ''df2''.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
**1.** Utwórz obiekt VectorAssembler ustawiając jako kolumny wejściowe kolumny ''df2'' bez ''price''. Atrybut ''df2.columns'' umożliwia dostęp do kolumn zbioru danych. Kolumną wyjściową ma być zwyczajoweo ''features''.
**2.** Zdefiniuj parametry algorytmu i wywołaj algorytm uczenia
lr = LinearRegression()\
.setMaxIter(100)\
.setRegParam(3.0)\
.setElasticNetParam(0.5)\
.setFeaturesCol("features")\
.setLabelCol("price")
model = lr.fit(....)
**3.** Wyświetl informacje o procesie uczenia
print(f'RMSE: {model.summary.rootMeanSquaredError}')
print(f'r2: {model.summary.r2}')
print(f'iterations: {model.summary.totalIterations}')
==== 3.1 Train test split ====
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
**1.** Podziel zbiór na dane uczące i testowe
df_train,df_test = df2.randomSplit([0.7,0.3],seed=1)
**2.** Skonfiguruje ''VectorAssembler'' i ''LinearRegression'' i umieść obiekty w ciągu przetwarzania pipeline
pipeline = Pipeline(stages=[va,lr])
**3.** Przeprowadź budowę modelu i predykcję dla zbioru uczącego i testowego
model = pipeline.fit(df_train)
df_train_pred = model.transform(df_train)
df_test_pred = model.transform(df_test)
**4.** Wyświetl metryki rmse i r2 dla zbiorów treningowego i testowego
evaluator = RegressionEvaluator() \
.setLabelCol("price") \
.setPredictionCol("prediction") \
.setMetricName("rmse");
rmse = evaluator.evaluate(df_train_pred);
**5.** Czy wyniki zależą od wielkości zbioru uczącego? Podziel zbiór w proporcji ''[1-test_size,test_size]''? Poeksperymentuj z różnymi wartościami.
Wypróbuj formułę: ''test_size = 1-k*len(df2.columns)/count'', gdzie ''count = df2.count()''
Sprawdź dla k=1,2,3...10,20,30,...100,200,300
Ile danych (w stosunku do liczby atrybutów) potrzeba do nauczenie modelu, aby uzyskać akceptowalne rezultaty?
** Przedstaw wyniki w formie tabelki **
==== 3.2 Funkcje ====
**1.** Zaimplementuj następujące funkcje
def evaluate_metrics(df,metrics=['r2','rmse','mse','mae'],label_col='price',prediction_col='prediction'):
"""
Funkcja oblicza metryki regresji na podstawie zbioru danych zawierającego prawdziwe wartosci etykiet
i wartości przewidywane
:param df: Dataset wejściowy zbiór danych
:param metrics: lista metryk do obliczenia
:param label_col: nazwa kolumny z wartosciami ściowymi
:param prediction_col: nazwa kolumny zawierającej przewidywane wartosci
:return: słownik zawierajacy pary (nazwa_metryki,wartosc)
"""
def train_and_test(df,lr=LinearRegression()
.setMaxIter(100)
.setRegParam(3.0)
.setElasticNetParam(0.5)
.setFeaturesCol("features")
.setLabelCol("price")):
"""
Funkcja (1) dzieli zbiór danych na ``df_train`` oraz ``df_test``
(2) tworzy ciag przetwarzania zawierający ``VectorAssembler``
i przekazaną jako parametr konfiguracje algorytmu regresji
(3) buduje model i dokonuje predykcji dla ``df_train`` oraz ``df_test``
(4) wyświetla metryki
:param df: wejściowy zbiór danych
:param lr: konfiguracja algorytmu regresji
:return: model zwrócony przez ``pipeline.fit()``
"""
**2.** Przeprowadź regresję
**3.** Wydrukuj równanie regresji na podstawie współczynników w zwróconym modelu
Wartość i-tego współczynnika (odpowiadającego i-tej kolumnie można odczytać za pomocą następującego kodu:
model.stages[1].coefficients[i]
Oczekiwany wynik typu:
price=
+ bedrooms * -49991.75843365818
+ bathrooms * 55476.70050934714
+ sqft_living * 104.96315420356594
+ sqft_lot * 0.07232134546026013
+ floors * 2232.279968637929
+ waterfront * 537739.5601844544
+ view * 57847.597444994964
+ condition * 24601.336611537612
+ grade * 97515.11069734246
+ sqft_above * 96.13859631725865
+ sqft_basement * 73.33408127400288
+ yr_built * -2755.7047349795776
+ yr_renovated * 19.590837790620963
+ zipcode * -667.2478851292814
+ lat * 592243.1919793531
+ long * -234232.13414446148
+ sqft_living15 * 19.225301468618166
+ sqft_lot15 * -0.4392023871115651
+ unixdate * 0.0013054659138663496
+ 11531943.585431337
**4.** Przeanalizuj wpływ kilku atrybutów na cenę. Na przykład:
* O ile rośnie cena wraz ze wzrostem powierzchni o 1 m2
* O ile są droższe domy przy linii brzegowej
* Czy ceny zmieniały się z czasem. O ile miesięcznie?
Aby wpływ atrybutów ocenić należy uwzględnić zakresy wartości... Przykłady zapytań
df2.agg({'bathrooms':'max'}).collect()[0][0]
df2.agg({'bathrooms':'median'}).collect()[0][0]
df2.agg({'sqft_lot':'mean'}).collect()[0][0]
df2.agg({'bedrooms':'min'}).collect()[0][0]
==== 3.3 Walidacja krzyżowa ====
**1.** Poniższy kod przeprowadza walidację krzyżową wykonując ją na zbiorze treningowym. Podczas walidacji obliczana jest jedna metryka przekazana jako parametr
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
cols = df2.columns
cols.remove('price')
va=VectorAssembler().setInputCols(cols).setOutputCol("features")
lr=LinearRegression()\
.setMaxIter(100)\
.setRegParam(1) \
.setElasticNetParam(0.5) \
.setFeaturesCol("features")\
.setLabelCol("price")
pipeline = Pipeline(stages=[va,lr])
evaluator = RegressionEvaluator(metricName='r2',labelCol='price')
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps = ParamGridBuilder().build(),
evaluator=evaluator,
numFolds=5)
# Train the model using cross-validation
df_train,df_test = df2.randomSplit([0.9, 0.1],seed=1)
cvModel = crossval.fit(df_train)
print(f'AVG r2 {cvModel.avgMetrics}')
**2.** Użyj model ''cvModel'' do predykcji i obliczania wyników dla zbioru uczącego i testowego.
Czy poprawne jest, że wartość r2 dla całego zbioru jest inna niż wartość średnia uzyskana przy walidacji krzyżowej?
==== 3.4 Grid search ====
Walidacja krzyżowa służy do doboru hiperparametrów algorytmu.
**1.** Definiując konfigurację ''LinearRegression'' pomiń ''regParam'' i ''elasticNetParam''
**2.** Zdefiniuj siatkę parametrów
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.1, 1, 3,5]) \
.addGrid(lr.elasticNetParam, [0, 0.5, 1]) \
.build()
i skonfiguruj walidację krzyżową
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5,parallelism=4)
**3.** Wykonaj walidację krzyżową na 90% danych i wyznacz najlepszy model (w świetle metryki r2)
Tym razem instrukcja ''print(f'AVG r2 {cvModel.avgMetrics}')'' wypisze listę 3*3=9 wartości.
bestModel = cvModel.bestModel
Wypisz jego parametry: ''regParam'' oraz ''elasticNetParam''
**4.** Oblicz metryki dla zbioru testowego (pozostawione 10% danych)
===== 4. Analiza cech =====
==== 4.1 Czy atrybuty są skorelowane ====
**1.** Wyznacz macierz korelacji
n = len(df2.columns)
rs = np.eye(n)
#oblicz elementy macierzy za pomocą instrukcji
r = df2.stat.corr(df2.columns[i],df2.columns[j])
**2.** Wydrukuj macierz
import itertools
import matplotlib.pyplot as plt
def plot_matrix(cm, labels,
normalize=False,
title='',
cmap=plt.cm.Blues):
fig = plt.figure(figsize=(10,10))
ax=fig.add_subplot(111)
ax.matshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
# plt.colorbar()
tick_marks = np.arange(len(labels))
plt.xticks(tick_marks, labels, rotation=90)
ax.set_xticks(tick_marks)
ax.set_yticks(tick_marks)
ax.set_xticklabels(labels)
# ax.set_yticklabels(['']+labels)
ax.set_yticklabels(labels)
fmt = '.2f' #if normalize else 'd'
thresh = 0.5
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.show()
plot_matrix(rs,labels=df2.columns)
**3.** Usuń kolumny słabo skorelowane z ceną (ewentualnie mocno skorelowane z innymi zmiennymi objaśniającymi), a następnie przeprowadź regresję i wyświetl wyniki
df3 = df2.drop(?,?,?,?,...)
train_and_test(df3)
==== 4.2 Transformacja logarytmiczna ceny ====
=== 4.2.1 Wstępne i końcowe przetwarzanie danych ===
**1.** Przeanalizuj wykresy. Który z wykresów zmiennej objaśnianej jest bliższy założeniom regresji liniowej?
plt.hist(df2.toPandas()['price'],bins=50)
plt.show()
plt.hist(np.log(df2.toPandas()['price']),bins=50)
plt.show()
**2.** Dodaj kolumnę ''logprice'' zawierającą wynik wyrażenia ''log(price)'' Wynik zapisz w ''df3''
**3.** Wyznacz i wyświetl macierz korelacji
Czy atrybuty są mocniej skorelowane z ''logprice''?
**4** Napisz i przetestuj kod (lub funkcję ''train_and_test_log''), która
* Przeprowadzi regresję na atrybucie ''logprice''
* Dokona predykjci dla zbioru treningowego i tetsowego
* Doda do nich kolumnę ''exp_pred'' jako wynik ''exp(prediction)'' oraz odtworzy oryginalną cenę
* Obliczy metryki i wypisze ich wartości
=== 4.2.2 Zastosowanie uogólnionego modelu liniowego (Generalized Linear Model) ===
Rozwiazanie jest analogiczne, jak przy regresji logistycznej, gdzie funkcją wiążącą jest
$$logit(p)=ln(\frac{p}{1-p})$$
ale w tym przypadku funkcją wiążącą jest po prostu $ln$, czyli dokonujemy regresji dla
$$ln(y)=w^Tx$$
Przetestuj poniższą konfigurację algorytmu. Jakie były wyniki?
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(family="gaussian",
link="log",
maxIter=100,
regParam=0.1,
featuresCol="features",
labelCol="price")
df3 = df2.drop(....)
==== 4.3 Variance Inflation Factor ====
Variance Inflation Factor (VIF) to miara pozwalająca określić stopień zależności liniowej pomiędzy wieloma zmiennymi objasniającymi.
Patrz: [[https://www.investopedia.com/terms/v/variance-inflation-factor.asp]]
Wartość VIF dla poszczególnych zmiennych $X = {x_1,...,x_n}$ określa się empirycznie przez
* Przeprowadzenie regresji $X_i = f(X \setminus {x_i})$ - zależność atrybutu $x_i$ od pozostałych atrybutów
* Wyznaczenie wartości współczynnika determinacji $r^2_i$
* Obliczenie $VIF(x_i)=\frac{1}{1-r^2_i}$
Wartości VIF powyżej 5 wskazują na wielokrotną liniową zależność i taka zmienna jest zazwyczaj usuwana.
**1.** Napisz funkcję
def compute_vif(df,label_col):
"""
Funkcja przeprowadza regresję na całym zbiorze danych, traktując label_col, jako zmienną objasnianą.
Czyli konfigurując ''Vector Assembler'' pomija tę kolumnę. buduje model regresji i odczytuje
wartośc współczynnika determinacji ''r2'' z model.summary. Następnie zwraca 1/(1-r2)
Uwaga, jeżeli r2==1, nelezy odjąć od r2 niewielką wartośc, np. 1e-10
:param df: wejściowy zbiór danych
:param label_col: nazwa atrybut, dla którego należy obliczyć VIF
:return: wartość parazmetru VIF
"""
**2.** Oblicz wartości VIF, a następnie przedstaw je w postaci tabelki (np. Pandas Dataframe posortowanej według wartości VIF malejąco)
df3 = df2.drop('price')
vifs = []
for f in df3.columns:
vifs.append((f,compute_vif(df3,f)))
print(vifs)
**3.** Przeprowadź regresję po odrzuceniu współliniowych atrybutów i podaj wyniki
**4.** Przeprowadź regresję pozostawiając jeden ze współliniowych atrybutów (sqft_living)
==== 4.4 Normalizacja cech ====
**1.** Napisz funkcję dodającą etap skalowania cech do ciagu przetwarzania ''pipeline''
from pyspark.ml.feature import StandardScaler
def train_and_test_scaled(df,lr=LinearRegression()
.setMaxIter(100)
.setRegParam(3.0)
.setElasticNetParam(0.5)
.setFeaturesCol("features_scaled")
.setLabelCol("price"),apply_normalization = False):
"""
Funkcja konfiguruje ciąg przetwarzania obejmujący trzy etapy: VectoAssembler, StandardScaler, LinearRegression
param: df
param: lr
return: model
"""
**2.** Przetestuj jej działanie
**3.** Wydrukuj równanie regresji dla przeskalowanych zmiennych
**Czy skalowanie ma wpływ na działanie algorytmu?** W przypadku OLS nie. W przypadku regularyzacji tak. Zastanów się dlaczego?
===== 5. Cechy wielomianowe =====
==== 5.1 Czy lokalizacja domów ma wpływ na cenę? ====
Jeżeli chcesz, możesz zainstalować dodatkowe biblioteki i wyświetlić interaktywną mapę...
!pip install geopandas folium mapclassify
pdf= df2.toPandas()
import geopandas as gpd
gdf = gpd.GeoDataFrame(pdf, geometry=gpd.GeoSeries.from_xy(pdf['long'], pdf['lat']), crs=4326)
gdf.explore('price')
{{ :ed:kc_house_data_all.png?direct&400 |}}
Różnice nie są szczególnie widoczne, ponieważ skala obejmuje domy od najtańszych (75000) do najdroższych (7700000), przy medianie 45000
df2.agg({'price':'min'}).collect()[0][0]
df2.agg({'price':'max'}).collect()[0][0]
Ograniczenie danych do kwantyla 0.95 pozwala lepiej zobrazować zależności od położenia
gdf[gdf.price<=gdf.price.quantile(.95)].explore('price')
{{ :ed:kc_house_data_95.png?direct&400 |}}
Dodamy cechy wielomianowe dla długości i szerokości geograficznej
**1.** Dodaj kolumny:
*'lat_2' zawierającą wartości wyrażenia ''lat*lat''
*'long_2' zawierającą wartości wyrażenia ''long*long''
*'lat_long' zawierającą wartości wyrażenia ''lat*long''
i przeprowadź regresję. Jakie były wyniki?
**2.** Wydrukuj równanie regresji.
Jaką krzywą jest opisana zależność między ''lat'' i ''long''
==== 5.2 Cechy wielomianowe dla wszystkich atrybutów ====
**1.** Zaimplementuj funkcję dokonującą ekspansji wielomianowej dla wszystkich atrybutów
from pyspark.ml.feature import MinMaxScaler
def train_and_test_scaled_polynomial_features(df,lr=LinearRegression()
.setMaxIter(100)
.setRegParam(3.0)
.setElasticNetParam(0.5)
.setFeaturesCol("features_poly")
.setLabelCol("price")):
"""
(1) Buduje ciąg przetwarzania obejmujący VectorAssembler, MinMaxScaler, PolynomialExpansioni lr
(2) Dzieli zbiór na treningowy i testowy
(3) Przeprowadza regresje i drukuje metryki dla zbioru treningowego i testowego
param: df - zbiór danych
param: lr - algorytm regresji
returns: model pipeline
"""
**2.** Przetestuj funkcję. Czy wyniki poprawiły się?
==== 5.3 Inne algorytmy regresji ====
Przetestuj funkcję ''train_and_test_scaled_polynomial_features'' dla innych algorytmów regresji. Który osiągnął najlepsze rezultaty (przyjmując jako kryterium r2 na zbiorze testowym).
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
for r in [DecisionTreeRegressor(),RandomForestRegressor(),GBTRegressor()]:
print(f'=== {r.__class__.__name__} ===')