Tracking with PySpark using the local file system#
Last modified: May 14, 2022
Base code#
[1]:
def load_data():

    import pandas as pd

    url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    df = pd.read_csv(url, sep=";")

    y = df["quality"]
    x = df.copy()
    x.pop("quality")

    return x, y


def make_train_test_split(x, y):

    from sklearn.model_selection import train_test_split

    (x_train, x_test, y_train, y_test) = train_test_split(
        x,
        y,
        test_size=0.25,
        random_state=123456,
    )
    return x_train, x_test, y_train, y_test


def eval_metrics(df):

    from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs

    # RegressionMetrics expects an RDD of (prediction, observation) pairs
    metrics = rmtrcs(df.rdd.map(lambda x: (x.prediction, x.quality)))

    mse = metrics.meanSquaredError
    mae = metrics.meanAbsoluteError
    r2 = metrics.r2

    return mse, mae, r2


def report(estimator, mse, mae, r2):

    print(estimator, ":", sep="")
    print(f" MSE: {mse}")
    print(f" MAE: {mae}")
    print(f" R2: {r2}")
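As a side note, the same three metrics can also be computed with the DataFrame-based RegressionEvaluator from pyspark.ml, avoiding the round trip through the RDD-based pyspark.mllib API. A minimal sketch, assuming a DataFrame with quality and prediction columns (the name eval_metrics_ml is illustrative, not part of the example above):

def eval_metrics_ml(df):

    from pyspark.ml.evaluation import RegressionEvaluator

    # A single evaluator instance; the metric is selected per call
    evaluator = RegressionEvaluator(labelCol="quality", predictionCol="prediction")

    mse = evaluator.evaluate(df, {evaluator.metricName: "mse"})
    mae = evaluator.evaluate(df, {evaluator.metricName: "mae"})
    r2 = evaluator.evaluate(df, {evaluator.metricName: "r2"})

    return mse, mae, r2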
MLflow Tracking#
[2]:
def make_experiment(experiment_name, regParam, elasticNetParam, verbose=1):

    import os

    import pandas as pd
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql import SparkSession

    import mlflow

    x, y = load_data()
    x_train, x_test, y_train, y_test = make_train_test_split(x, y)

    pdf_train = pd.concat(
        [x_train, pd.to_numeric(y_train, downcast="float")], axis="columns"
    )
    pdf_test = pd.concat(
        [x_test, pd.to_numeric(y_test, downcast="float")], axis="columns"
    )

    #
    # Spark
    #
    spark = SparkSession.builder.getOrCreate()

    df_train = spark.createDataFrame(pdf_train)
    df_test = spark.createDataFrame(pdf_test)

    vectorAssembler = VectorAssembler(
        inputCols=list(pdf_train.columns[:-1]),
        outputCol="features",
    )
    df_train = vectorAssembler.transform(df_train)
    df_test = vectorAssembler.transform(df_test)

    lr = LinearRegression(
        featuresCol="features",
        labelCol="quality",
        predictionCol="prediction",
        maxIter=1000,
        regParam=regParam,
        elasticNetParam=elasticNetParam,
        fitIntercept=True,
        standardization=True,
    )

    if not os.path.exists("mlruns"):
        os.makedirs("mlruns")

    #
    # Sets the tracking directory. In this example it is an absolute path to a
    # directory on the local file system.
    #
    mlflow.set_tracking_uri("file:///workspace/mlflow/corridas")
    print("Tracking directory:", mlflow.get_tracking_uri())

    mlflow.pyspark.ml.autolog()

    #
    # Stores the runs under the indicated experiment
    #
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run() as run:

        print("Active run_id: {}".format(run.info.run_id))

        model = lr.fit(df_train)
        df_test = model.transform(df_test)

        mse, mae, r2 = eval_metrics(df_test)
        if verbose > 0:
            report(model, mse, mae, r2)

        mlflow.log_metric("mse", mse)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("r2", r2)

    spark.stop()
[3]:
#
# First trial run
#
make_experiment(
    experiment_name="red-wine",
    regParam=0.00001,
    elasticNetParam=0.00001,
    verbose=1,
)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Tracking directory: file:///workspace/mlflow/corridas
Active run_id: 2e5f343a65104d28866207a10a55334a
LinearRegressionModel: uid=LinearRegression_7c8a85716278, numFeatures=11:
MSE: 0.45526360186470766
MAE: 0.5292753571424285
R2: -1.114457075123866
[4]:
#
# Second trial run
#
make_experiment(
    experiment_name="red-wine",
    regParam=0.0005,
    elasticNetParam=0.0001,
    verbose=1,
)
Tracking directory: file:///workspace/mlflow/corridas
Active run_id: 5b49f614696847119bcfac1e8390121a
LinearRegressionModel: uid=LinearRegression_c74fc1588d38, numFeatures=11:
MSE: 0.4552568409331781
MAE: 0.5292824166305712
R2: -1.116331285203061
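Both runs are now recorded under the red-wine experiment. They can also be compared programmatically instead of through the UI; a minimal sketch using mlflow.search_runs against the same tracking directory:

import mlflow

mlflow.set_tracking_uri("file:///workspace/mlflow/corridas")

# search_runs returns a pandas DataFrame with one row per run
experiment = mlflow.get_experiment_by_name("red-wine")
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
print(runs[["run_id", "metrics.mse", "metrics.mae", "metrics.r2"]])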
MLflow UI#
To open the web interface, run:
mlflow ui
Note: when running inside Docker, use:
mlflow ui --host 0.0.0.0
(Screenshot: MLflow UI)

(Screenshot: run details)
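Because this example writes the runs to file:///workspace/mlflow/corridas instead of the default ./mlruns directory, the UI must also be pointed at that location, e.g.:

mlflow ui --backend-store-uri file:///workspace/mlflow/corridas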
Check#
[5]:
# def check_estimator():
#
#     def local_eval_metrics(y_true, y_pred):
#
#         from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
#
#         mse = mean_squared_error(y_true, y_pred)
#         mae = mean_absolute_error(y_true, y_pred)
#         r2 = r2_score(y_true, y_pred)
#
#         return mse, mae, r2
#
#     import pandas as pd
#     from pyspark.ml.feature import VectorAssembler
#     from pyspark.sql import SparkSession
#
#     import mlflow
#
#     x, y = load_data()
#     x_train, x_test, y_train, y_test = make_train_test_split(x, y)
#
#     # -----------------------------------------------------------------------------------
#     # The data must be prepared again. This is a good example of the difficulties
#     # caused by not having a feature store.
#     #
#     pdf_test = pd.concat(
#         [x_test, pd.to_numeric(y_test, downcast="float")], axis="columns"
#     )
#
#     spark = SparkSession.builder.getOrCreate()
#     df_test = spark.createDataFrame(pdf_test)
#
#     vectorAssembler = VectorAssembler(
#         inputCols=list(pdf_test.columns[:-1]),
#         outputCol="features",
#     )
#     df_test = vectorAssembler.transform(df_test)
#     x_test = df_test.toPandas()
#     spark.stop()
#     # -----------------------------------------------------------------------------------
#
#     # NOTE: this parameter is copied directly from the MLflow interface
#     estimator_path = "runs:/02298313752742d6ae411626d5aff18e/model"
#     estimator = mlflow.pyfunc.load_model(estimator_path)
#
#     y_pred = estimator.predict(pd.DataFrame(x_test))
#
#     mse, mae, r2 = local_eval_metrics(y_test, y_pred)
#     report(estimator, mse, mae, r2)
#
#
# #
# # Must match the best model found in the previous cells
# #
# check_estimator()
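Instead of copying the run_id by hand from the UI, the best run can also be selected programmatically by sorting on a logged metric; a minimal sketch, assuming the mse metric logged by the cells above:

import mlflow

mlflow.set_tracking_uri("file:///workspace/mlflow/corridas")

# Pick the run with the lowest test MSE in the red-wine experiment
experiment = mlflow.get_experiment_by_name("red-wine")
best = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.mse ASC"],
).iloc[0]

# Build a model URI from the selected run and load it as a generic pyfunc model
model_uri = f"runs:/{best.run_id}/model"
estimator = mlflow.pyfunc.load_model(model_uri)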
[6]:
# -----------------------------------------------------------------------------
# The runs are not deleted, so that the results can be compared with other
# libraries
# -----------------------------------------------------------------------------
# %%bash
# rm -rf outputs mlruns models