Tracking con PySpark usando el sistema local de archivos#

  • Última modificación: Mayo 14, 2022

Código base#

[1]:
def load_data():
    """Fetch the UCI red-wine quality dataset.

    Returns:
        tuple: ``(x, y)`` where ``x`` is a DataFrame with every column
        except ``quality`` and ``y`` is the ``quality`` target Series.
    """
    import pandas as pd

    url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    data = pd.read_csv(url, sep=";")

    target = data["quality"]
    features = data.drop(columns=["quality"])

    return features, target


def make_train_test_split(x, y):
    """Partition the data into 75% train / 25% test subsets.

    A fixed ``random_state`` keeps the split reproducible across runs.

    Returns:
        tuple: ``(x_train, x_test, y_train, y_test)``.
    """
    from sklearn.model_selection import train_test_split

    splits = train_test_split(x, y, test_size=0.25, random_state=123456)
    x_train, x_test, y_train, y_test = splits
    return x_train, x_test, y_train, y_test


def eval_metrics(df):
    """Compute regression metrics for a scored Spark DataFrame.

    Args:
        df: Spark DataFrame containing a ``quality`` column (ground truth)
            and a ``prediction`` column (model output).

    Returns:
        tuple: ``(mse, mae, r2)``.
    """
    from pyspark.mllib.evaluation import RegressionMetrics as rmtrcs

    # RegressionMetrics expects an RDD of (prediction, observation) pairs.
    # The original code passed (observation, prediction); MSE and MAE are
    # symmetric and were unaffected, but r2 was computed against the wrong
    # baseline (hence the negative R2 values in earlier runs).
    metrics = rmtrcs(df.rdd.map(lambda row: (row.prediction, row.quality)))

    mse = metrics.meanSquaredError
    mae = metrics.meanAbsoluteError
    r2 = metrics.r2

    return mse, mae, r2


def report(estimator, mse, mae, r2):
    """Print a short evaluation summary for *estimator* to stdout."""
    summary = [
        f"{estimator}:",
        f"  MSE: {mse}",
        f"  MAE: {mae}",
        f"  R2: {r2}",
    ]
    for line in summary:
        print(line)

MLflow Tracking#

[2]:
def make_experiment(experiment_name, regParam, elasticNetParam, verbose=1):
    """Train a Spark LinearRegression on the red-wine data and track it with MLflow.

    Loads the dataset, builds train/test Spark DataFrames, fits a linear
    regression with the given regularization settings, logs parameters
    automatically via pyspark.ml autologging, and logs mse/mae/r2 manually.

    Args:
        experiment_name: MLflow experiment under which the run is stored.
        regParam: regularization strength passed to LinearRegression.
        elasticNetParam: elastic-net mixing parameter (0 = L2, 1 = L1).
        verbose: when > 0, print a metrics report for the fitted model.
    """
    import os

    import pandas as pd
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql import SparkSession

    import mlflow

    x, y = load_data()
    x_train, x_test, y_train, y_test = make_train_test_split(x, y)

    # Re-attach the target as a float column so Spark infers a numeric schema.
    pdf_train = pd.concat(
        [x_train, pd.to_numeric(y_train, downcast="float")], axis="columns"
    )
    pdf_test = pd.concat(
        [x_test, pd.to_numeric(y_test, downcast="float")], axis="columns"
    )

    #
    # Spark
    #
    spark = SparkSession.builder.getOrCreate()

    df_train = spark.createDataFrame(pdf_train)
    df_test = spark.createDataFrame(pdf_test)

    # Assemble every feature column (all but the trailing target column)
    # into a single "features" vector column, as required by Spark ML.
    vectorAssembler = VectorAssembler(
        inputCols=list(pdf_train.columns[:-1]),
        outputCol="features",
    )
    df_train = vectorAssembler.transform(df_train)
    df_test = vectorAssembler.transform(df_test)

    lr = LinearRegression(
        featuresCol="features",
        labelCol="quality",
        predictionCol="prediction",
        maxIter=1000,
        regParam=regParam,
        elasticNetParam=elasticNetParam,
        fitIntercept=True,
        standardization=True,
    )

    # NOTE(review): the "mlruns" directory created here is never used — the
    # tracking URI below points at /workspace/mlflow/corridas instead.
    # Confirm whether this directory is still needed.
    if not os.path.exists("mlruns"):
        os.makedirs("mlruns")
    # Set the tracking directory (an absolute file:// URI in this example).
    mlflow.set_tracking_uri("file:///workspace/mlflow/corridas")
    print("Tracking directory:", mlflow.get_tracking_uri())

    #
    # Enable automatic logging of params/metrics/models for pyspark.ml
    # estimators.
    #
    mlflow.pyspark.ml.autolog()

    #
    # Store the runs under the indicated experiment.
    #
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run() as run:

        run = mlflow.active_run()
        print("Active run_id: {}".format(run.info.run_id))

        model = lr.fit(df_train)

        df_test = model.transform(df_test)

        mse, mae, r2 = eval_metrics(df_test)
        if verbose > 0:
            report(model, mse, mae, r2)

        # Log evaluation metrics explicitly (autolog covers training only).
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("r2", r2)

    spark.stop()
[3]:
#
# Run the first trial
#
make_experiment(
    experiment_name="red-wine",
    regParam=0.00001,
    elasticNetParam=0.00001,
    verbose=1,
)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Tracking directory: file:///workspace/mlflow/corridas
Active run_id: 2e5f343a65104d28866207a10a55334a

LinearRegressionModel: uid=LinearRegression_7c8a85716278, numFeatures=11:
  MSE: 0.45526360186470766
  MAE: 0.5292753571424285
  R2: -1.114457075123866
[4]:
#
# Run the second trial
#
make_experiment(
    experiment_name="red-wine",
    regParam=0.0005,
    elasticNetParam=0.0001,
    verbose=1,
)
Tracking directory: file:///workspace/mlflow/corridas
Active run_id: 5b49f614696847119bcfac1e8390121a

LinearRegressionModel: uid=LinearRegression_c74fc1588d38, numFeatures=11:
  MSE: 0.4552568409331781
  MAE: 0.5292824166305712
  R2: -1.116331285203061

MLflow ui#

Para visualizar la interfaz use:

mlflow ui

Nota: En docker usar:

mlflow ui --host 0.0.0.0

con:

http://127.0.0.1:5001

assets/mlflow-tracking-3-pyspark-part-0.png

Detalles de la corrida

assets/mlflow-tracking-3-pyspark-part-1.png assets/mlflow-tracking-3-pyspark-part-2.png assets/mlflow-tracking-3-pyspark-part-3.png

Chequeo#

[5]:
# def check_estimator():
#
#     def local_eval_metrics(y_true, y_pred):
#
#         from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
#
#         mse = mean_squared_error(y_true, y_pred)
#         mae = mean_absolute_error(y_true, y_pred)
#         r2 = r2_score(y_true, y_pred)
#
#         return mse, mae, r2
#
#     import pandas as pd
#     from pyspark.ml.feature import VectorAssembler
#     from pyspark.sql import SparkSession
#
#     import mlflow
#
#     x, y = load_data()
#     x_train, x_test, y_train, y_test = make_train_test_split(x, y)
#
#     # -----------------------------------------------------------------------------------
#     # Debe prepararse la data. Este es un buen ejemplo para mostrar las dificultades
#     # asociadas a no tener un feature store.
#     #
#     pdf_test = pd.concat(
#         [x_test, pd.to_numeric(y_test, downcast="float")], axis="columns"
#     )
#
#     spark = SparkSession.builder.getOrCreate()
#     df_test = spark.createDataFrame(pdf_test)
#
#     vectorAssembler = VectorAssembler(
#         inputCols=list(pdf_test.columns[:-1]),
#         outputCol="features",
#     )
#     df_test = vectorAssembler.transform(df_test)
#     x_test = df_test.toPandas()
#     spark.stop()
#     # -----------------------------------------------------------------------------------
#
#     # NOTA: este parámetro es copiado directamente de la interfase de MLflow
#     estimator_path = "runs:/02298313752742d6ae411626d5aff18e/model"
#     estimator = mlflow.pyfunc.load_model(estimator_path)
#
#     y_pred = estimator.predict(pd.DataFrame(x_test))
#
#     mse, mae, r2 = local_eval_metrics(y_test, y_pred)
#     report(estimator, mse, mae, r2)
#
#
# #
# # Debe coincidir con el mejor modelo encontrado en las celdas anteriores
# #
# check_estimator()
[6]:
# -----------------------------------------------------------------------------
# No se borran las corridas para comparar resultados con otras librerías
# -----------------------------------------------------------------------------
# %%bash
# rm -rf outputs mlruns models