Forecasting book popularity#

  • Last modified: June 22, 2019

This tutorial presents several classification methods in PySpark, applied to a text-mining problem. After studying this document, the reader will be able to:

  • Apply text-preparation techniques in Spark.

  • Use logistic regression, classification tree, and random forest models, and compare their performance.

Problem definition#

The publisher O’Reilly wants to build an analytics tool that allows an editor to estimate the relative popularity of a new book before its release, in order to prioritize the titles to be published and even reject potential editorial projects.

To solve this problem, a dataset is available with the 100 best-selling O’Reilly books during 2011. It contains each book's title, its description, and its popularity ranking. The working hypothesis is that the occurrence of certain words in a book's description makes it possible to determine its popularity.

Solution#

[1]:
##
## Load the Spark libraries
##
import findspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

findspark.init()

APP_NAME = "spark-logreg-app"

conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Exploration#

[2]:
!wget https://raw.githubusercontent.com/jdvelasq/datalabs/master/datasets/oreilly.csv
--2020-11-01 03:09:34--  https://raw.githubusercontent.com/jdvelasq/datalabs/master/datasets/oreilly.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 203329 (199K) [text/plain]
Saving to: 'oreilly.csv.1'

oreilly.csv.1       100%[===================>] 198.56K   852KB/s    in 0.2s

2020-11-01 03:09:34 (852 KB/s) - 'oreilly.csv.1' saved [203329/203329]

[3]:
import pandas as pd
##
## This file turns out to be particularly hard to
## read in Spark, so it is read with Pandas and
## then loaded into Spark.
##
pdDF = pd.read_csv(
    "oreilly.csv",
    sep=',',             # field separator
    thousands=None,      # thousands separator for numbers
    decimal='.',         # decimal separator for numbers
    encoding='latin-1')  # character encoding
[4]:
##
## Create the schema of the Spark table
##
from pyspark.sql.types import *

mySchema = StructType([
    StructField("IP_Family", StringType(), True),
    StructField("BOOK_title", StringType(), True),
    StructField("BOOK_ISBN", StringType(), True),
    StructField("Rank", IntegerType(), True),
    StructField("Long_Desc", StringType(), True)])

##
## Create the Spark DataFrame from the
## Pandas DataFrame
##
df = spark.createDataFrame(pdDF, schema=mySchema)
[5]:
##
## Verify the types of the
## DataFrame fields
##
df.printSchema()
root
 |-- IP_Family: string (nullable = true)
 |-- BOOK_title: string (nullable = true)
 |-- BOOK_ISBN: string (nullable = true)
 |-- Rank: integer (nullable = true)
 |-- Long_Desc: string (nullable = true)

Building the models#

Transforming the text into numeric variables#

[6]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

##
## The tokenizer is used to split the text into words.
## Each element of the words column is a list with the
## words that make up the text.
##
tokenizer = Tokenizer(inputCol="Long_Desc", outputCol="words")
df = tokenizer.transform(df)
df.select('Long_Desc', 'words').show()
+--------------------+--------------------+
|           Long_Desc|               words|
+--------------------+--------------------+
|Perl is a powerfu...|[perl, is, a, pow...|
|JavaScript is a p...|[javascript, is, ...|
|You're not alone....|[you're, not, alo...|
|Learning a comple...|[learning, a, com...|
|With Leopard, App...|[with, leopard,, ...|
|Tired of reading ...|[tired, of, readi...|
|This bestselling ...|[this, bestsellin...|
|You may have seen...|[you, may, have, ...|
|You can set your ...|[you, can, set, y...|
|Once a little-kno...|[once, a, little-...|
|JavaScript is a p...|[javascript, is, ...|
|<i>Web Design in ...|[<i>web, design, ...|
|New York Times co...|[new, york, times...|
|In this update of...|[in, this, update...|
|The <i>Perl Cookb...|[the, <i>perl, co...|
|For a company tha...|[for, a, company,...|
|If you are a Web ...|[if, you, are, a,...|
|Apple says that M...|[apple, says, tha...|
|<p>This Fifth Edi...|[<p>this, fifth, ...|
|If you ask Perl p...|[if, you, ask, pe...|
+--------------------+--------------------+
only showing top 20 rows
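
Note that the plain Tokenizer splits on whitespace only, so punctuation and HTML remnants such as <i> stay attached to the tokens (e.g., leopard, and <i>web above). A minimal alternative sketch using RegexTokenizer (the output column words_clean is an assumption; this variant is not used in the rest of the tutorial):

[ ]:
from pyspark.ml.feature import RegexTokenizer

##
## Split on runs of non-word characters instead of whitespace,
## which also drops punctuation and HTML tags.
##
regex_tokenizer = RegexTokenizer(
    inputCol="Long_Desc",
    outputCol="words_clean",
    pattern="\\W+")

regex_tokenizer.transform(df).select("words_clean").show(5)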

[7]:
from pyspark.ml.feature import StopWordsRemover

##
## Remove the stop words from the text
##
df = StopWordsRemover(inputCol="words",
                      outputCol="filtered").transform(df)
df.select('words', 'filtered').show()
+--------------------+--------------------+
|               words|            filtered|
+--------------------+--------------------+
|[perl, is, a, pow...|[perl, powerful, ...|
|[javascript, is, ...|[javascript, powe...|
|[you're, not, alo...|[alone.<br, />, <...|
|[learning, a, com...|[learning, comple...|
|[with, leopard,, ...|[leopard,, apple,...|
|[tired, of, readi...|[tired, reading, ...|
|[this, bestsellin...|[bestselling, qui...|
|[you, may, have, ...|[may, seen, unix,...|
|[you, can, set, y...|[set, watch, it:,...|
|[once, a, little-...|[little-known, pr...|
|[javascript, is, ...|[javascript, powe...|
|[<i>web, design, ...|[<i>web, design, ...|
|[new, york, times...|[new, york, times...|
|[in, this, update...|[update, bestsell...|
|[the, <i>perl, co...|[<i>perl, cookboo...|
|[for, a, company,...|[company, promise...|
|[if, you, are, a,...|[web, content, de...|
|[apple, says, tha...|[apple, says, mac...|
|[<p>this, fifth, ...|[<p>this, fifth, ...|
|[if, you, ask, pe...|[ask, perl, progr...|
+--------------------+--------------------+
only showing top 20 rows

[8]:
##
## Once the stop words have been removed, the
## text is transformed into a bag-of-words.
##
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=50)

df = hashingTF.transform(df)

df.select(['filtered', 'rawFeatures']).show()
+--------------------+--------------------+
|            filtered|         rawFeatures|
+--------------------+--------------------+
|[perl, powerful, ...|(50,[0,1,2,3,4,5,...|
|[javascript, powe...|(50,[0,2,4,5,6,7,...|
|[alone.<br, />, <...|(50,[0,1,2,3,5,6,...|
|[learning, comple...|(50,[0,1,2,3,4,5,...|
|[leopard,, apple,...|(50,[0,1,2,3,4,5,...|
|[tired, reading, ...|(50,[0,1,2,3,4,5,...|
|[bestselling, qui...|(50,[0,1,2,3,4,5,...|
|[may, seen, unix,...|(50,[0,1,2,3,4,5,...|
|[set, watch, it:,...|(50,[0,1,2,3,4,5,...|
|[little-known, pr...|(50,[0,1,2,3,4,5,...|
|[javascript, powe...|(50,[0,2,3,4,5,6,...|
|[<i>web, design, ...|(50,[1,2,3,4,5,7,...|
|[new, york, times...|(50,[0,1,2,3,4,5,...|
|[update, bestsell...|(50,[0,1,2,3,5,6,...|
|[<i>perl, cookboo...|(50,[0,1,2,3,4,5,...|
|[company, promise...|(50,[0,1,2,3,4,5,...|
|[web, content, de...|(50,[0,1,2,3,4,5,...|
|[apple, says, mac...|(50,[0,1,2,3,4,5,...|
|[<p>this, fifth, ...|(50,[0,2,3,4,5,6,...|
|[ask, perl, progr...|(50,[0,2,3,5,6,7,...|
+--------------------+--------------------+
only showing top 20 rows
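
With only numFeatures=50 buckets, many distinct words necessarily share the same index (hash collisions), so each position of the vector aggregates several unrelated words. An illustrative check on a small hypothetical DataFrame:

[ ]:
##
## Illustrative only: hash a few words into the same 50-bucket
## space used above and inspect the resulting sparse vector.
##
demo = spark.createDataFrame(
    [(["perl", "python", "java", "ruby"],)], ["filtered"])
hashingTF.transform(demo).select("rawFeatures").first()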

[9]:
##
## The values are then rescaled using the
## IDF function to improve the model's
## performance.
##

idf = IDF(inputCol="rawFeatures",
          outputCol="features")

idfModel = idf.fit(df)

df = idfModel.transform(df)

df.select("filtered", "rawFeatures", "features").show()
+--------------------+--------------------+--------------------+
|            filtered|         rawFeatures|            features|
+--------------------+--------------------+--------------------+
|[perl, powerful, ...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[javascript, powe...|(50,[0,2,4,5,6,7,...|(50,[0,2,4,5,6,7,...|
|[alone.<br, />, <...|(50,[0,1,2,3,5,6,...|(50,[0,1,2,3,5,6,...|
|[learning, comple...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[leopard,, apple,...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[tired, reading, ...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[bestselling, qui...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[may, seen, unix,...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[set, watch, it:,...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[little-known, pr...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[javascript, powe...|(50,[0,2,3,4,5,6,...|(50,[0,2,3,4,5,6,...|
|[<i>web, design, ...|(50,[1,2,3,4,5,7,...|(50,[1,2,3,4,5,7,...|
|[new, york, times...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[update, bestsell...|(50,[0,1,2,3,5,6,...|(50,[0,1,2,3,5,6,...|
|[<i>perl, cookboo...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[company, promise...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[web, content, de...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[apple, says, mac...|(50,[0,1,2,3,4,5,...|(50,[0,1,2,3,4,5,...|
|[<p>this, fifth, ...|(50,[0,2,3,4,5,6,...|(50,[0,2,3,4,5,6,...|
|[ask, perl, progr...|(50,[0,2,3,5,6,7,...|(50,[0,2,3,5,6,7,...|
+--------------------+--------------------+--------------------+
only showing top 20 rows
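
For reference, Spark computes the inverse document frequency as

    idf(t, D) = log((|D| + 1) / (DF(t, D) + 1))

where |D| is the number of documents and DF(t, D) is the number of documents containing the term t, so words that appear in almost every description receive weights close to zero.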

Building the output variable#

Next, a binary output variable is built to indicate each book's ranking: the label is 1 when the book is among the top-ranked titles (Rank below 50) and 0 otherwise.

[10]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import when
from pyspark.sql.types import DoubleType

df = df.withColumn(
    'label',
    when(df['Rank'] >= 50, lit(0).cast(DoubleType())).otherwise(lit(1).cast(DoubleType())))

df.select('Rank').show()
+----+
|Rank|
+----+
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
|  20|
+----+
only showing top 20 rows
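
As a quick sanity check (not part of the original flow), the class balance can be inspected directly:

[ ]:
##
## Count the books in each class; with the rule above,
## label 1 covers ranks 1-49 and label 0 the rest.
##
df.groupBy('label').count().show()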

Logistic Regression#

Next, a logistic regression model is built that predicts whether or not a book belongs to the top-ranked group, based on the words of its description, which have already been represented as a bag-of-words.

[11]:
## Import the library
from pyspark.ml.classification import LogisticRegression

## Parameterize the model
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    rawPredictionCol='rawLR',
    probabilityCol='probLR',
    predictionCol='LR',
    maxIter=1000,
    regParam=0.1,
    elasticNetParam=0.8)

## Train the model
model = lr.fit(df)

## Prediction
df = model.transform(df)
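
Since elasticNetParam=0.8 puts most of the regularization weight on the L1 penalty, many coefficients are driven exactly to zero. A minimal way to inspect the fitted model:

[ ]:
##
## Sparse coefficient vector over the 50 hashed features,
## plus the fitted intercept.
##
print(model.coefficients)
print(model.intercept)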

Classification Tree#

[12]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    predictionCol='DTC',
    rawPredictionCol='rawDTC',
    probabilityCol='probDTC')

model = dt.fit(df)

df = model.transform(df)
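
The fitted tree can be rendered as text to see which hashed features drive the splits:

[ ]:
##
## Text dump of the learned decision rules.
##
print(model.toDebugString)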

Random Forest#

[13]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    predictionCol='RFC',
    rawPredictionCol='rawRFC',
    probabilityCol='probRFC',
    numTrees=10)

model = rf.fit(df)

df = model.transform(df)
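
For the forest, Gini-based feature importances summarize how much each of the 50 hashed features contributes across the 10 trees:

[ ]:
##
## Sparse vector of aggregated feature importances.
##
print(model.featureImportances)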

Summary of results#

[14]:
##
## Predictions from the models
##
df.select(['label', 'LR', 'DTC', 'RFC']).show()
+-----+---+---+---+
|label| LR|DTC|RFC|
+-----+---+---+---+
|  1.0|0.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|0.0|1.0|1.0|
|  1.0|0.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|0.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|0.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
|  1.0|1.0|1.0|1.0|
+-----+---+---+---+
only showing top 20 rows

[15]:
##
## Performance metrics
##

from pyspark.mllib.evaluation import BinaryClassificationMetrics

for m in ['LR', 'DTC', 'RFC']:

    ## BinaryClassificationMetrics expects (score, label) pairs
    data = df.select([m, 'label']).rdd.map(lambda x: (x[0], x[1]))
    metrics = BinaryClassificationMetrics(data)
    print(m)
    print('  areaUnderROC :', metrics.areaUnderROC)
    print('  areaUnderPR  :', metrics.areaUnderPR)
    print()
LR
  areaUnderROC : 0.6749999999999999
  areaUnderPR  : 0.5457142857142857

DTC
  areaUnderROC : 0.9102564102564101
  areaUnderPR  : 0.8805442176870748

RFC
  areaUnderROC : 0.9803921568627452
  areaUnderPR  : 0.9903921568627451
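
Note that all three models were fitted and evaluated on the same 100 records, so the metrics above are in-sample and therefore optimistic. A hedged sketch of an out-of-sample evaluation (the split must happen right after feature construction, before any model adds its prediction columns to df):

[ ]:
##
## Sketch only -- not executed here:
##
## train, test = df.randomSplit([0.7, 0.3], seed=12345)
## lr_model = lr.fit(train)
## predictions = lr_model.transform(test)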