Teen market segmentation in PySpark#
30 min | Last modified: June 22, 2019
In this tutorial, the K-means algorithm is applied in PySpark to segment a group of teenagers according to their interests, with the aim of designing advertising strategies and services targeted at each interest group. The tutorial focuses on PySpark programming rather than on the analysis of the problem itself. To follow it, the reader should be proficient in the corresponding predictive analytics modules.
Problem definition#
A marketer wants to send electronic advertising to a population of teenagers and young adults in order to maximize sales. To do so, they want to segment their potential customers into interest groups and then send specific advertising to each group.
The goal is to determine which interest groups exist in a customer population, based on the messages posted on a social networking service. The available data consist of 30,000 observations of 40 variables that may characterize the interests of the analyzed population. These variables correspond to words that can be associated with an interest; each variable measures the frequency with which a given word appears in a person's text messages. In addition, the data include the person's gender, age, and number of contacts.
Solution#
Preparation#
[1]:
##
## Auxiliary libraries that may be useful
## in developing the code
##
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
[2]:
##
## Load the Spark libraries
##
import findspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
findspark.init()
APP_NAME = "spark-kmeans-app"
conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
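As an aside, recent PySpark versions can also create the session in a single step with the builder pattern; a minimal, equivalent sketch:

## Equivalent one-step setup; getOrCreate() reuses an
## existing session if one is already running
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()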
Data loading#
The data file is downloaded to the current working directory on the local machine.
[3]:
!wget https://raw.githubusercontent.com/jdvelasq/datalabs/master/datasets/snsdata.csv
--2019-11-15 00:56:14-- https://raw.githubusercontent.com/jdvelasq/datalabs/master/datasets/snsdata.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2631136 (2.5M) [text/plain]
Saving to: 'snsdata.csv'
snsdata.csv 100%[===================>] 2.51M 1.97MB/s in 1.3s
2019-11-15 00:56:18 (1.97 MB/s) - 'snsdata.csv' saved [2631136/2631136]
[4]:
##
## File contents
##
!head snsdata.csv
gradyear,gender,age,friends,basketball,football,soccer,softball,volleyball,swimming,cheerleading,baseball,tennis,sports,cute,sex,sexy,hot,kissed,dance,band,marching,music,rock,god,church,jesus,bible,hair,dress,blonde,mall,shopping,clothes,hollister,abercrombie,die,death,drunk,drugs
2006,M,18.982,7,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2006,F,18.801,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,2,2,1,0,0,0,6,4,0,1,0,0,0,0,0,0,0,0
2006,M,18.335,69,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
2006,F,18.875,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2006,NA,18.995,10,0,0,0,0,0,0,0,0,0,0,0,1,0,0,5,1,1,0,3,0,1,0,0,0,1,0,0,0,2,0,0,0,0,0,1,1
2006,F,,142,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,2,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,1,0
2006,F,18.93,72,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,2,0,0,2,0,0,0,0,0
2006,M,18.322,17,0,0,0,1,0,0,0,0,0,0,0,2,1,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2006,F,19.055,52,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
[5]:
##
## Move the data file to HDFS
##
!hdfs dfs -copyFromLocal snsdata.csv /tmp/snsdata.csv
[6]:
##
## Load the file into PySpark
##
spark_df = spark.read.load("/tmp/snsdata.csv",
format="csv",
sep=",",
inferSchema="true",
header="true")
##
## Number of records loaded
##
spark_df.count()
[6]:
30000
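Note that `inferSchema="true"` triggers an extra pass over the file to deduce the column types. For larger files, an explicit schema can be supplied instead; a minimal sketch, where `interest_columns` is a hypothetical list holding the 36 interest column names from the CSV header (basketball, ..., drugs):

from pyspark.sql.types import (DoubleType, IntegerType,
                               StringType, StructField, StructType)

## interest_columns is an assumed list with the 36
## interest column names taken from the CSV header
schema = StructType(
    [StructField("gradyear", IntegerType()),
     StructField("gender", StringType()),
     StructField("age", DoubleType()),
     StructField("friends", IntegerType())]
    + [StructField(name, IntegerType()) for name in interest_columns])

spark_df = spark.read.csv("/tmp/snsdata.csv", header=True, schema=schema)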
[7]:
##
## Data types of the DataFrame fields
##
spark_df.printSchema()
root
|-- gradyear: integer (nullable = true)
|-- gender: string (nullable = true)
|-- age: double (nullable = true)
|-- friends: integer (nullable = true)
|-- basketball: integer (nullable = true)
|-- football: integer (nullable = true)
|-- soccer: integer (nullable = true)
|-- softball: integer (nullable = true)
|-- volleyball: integer (nullable = true)
|-- swimming: integer (nullable = true)
|-- cheerleading: integer (nullable = true)
|-- baseball: integer (nullable = true)
|-- tennis: integer (nullable = true)
|-- sports: integer (nullable = true)
|-- cute: integer (nullable = true)
|-- sex: integer (nullable = true)
|-- sexy: integer (nullable = true)
|-- hot: integer (nullable = true)
|-- kissed: integer (nullable = true)
|-- dance: integer (nullable = true)
|-- band: integer (nullable = true)
|-- marching: integer (nullable = true)
|-- music: integer (nullable = true)
|-- rock: integer (nullable = true)
|-- god: integer (nullable = true)
|-- church: integer (nullable = true)
|-- jesus: integer (nullable = true)
|-- bible: integer (nullable = true)
|-- hair: integer (nullable = true)
|-- dress: integer (nullable = true)
|-- blonde: integer (nullable = true)
|-- mall: integer (nullable = true)
|-- shopping: integer (nullable = true)
|-- clothes: integer (nullable = true)
|-- hollister: integer (nullable = true)
|-- abercrombie: integer (nullable = true)
|-- die: integer (nullable = true)
|-- death: integer (nullable = true)
|-- drunk: integer (nullable = true)
|-- drugs: integer (nullable = true)
Exploratory analysis#
Some typical computations for exploratory analysis are illustrated below.
[8]:
##
## Count by gender
##
spark_df.groupBy('gender').count().toPandas()
[8]:
|   | gender | count |
|---|--------|-------|
| 0 | F      | 22054 |
| 1 | NA     | 2724  |
| 2 | M      | 5222  |
[9]:
##
## The ranges of the variables are analyzed to
## check for values outside their valid ranges.
## The `age` variable contains values outside
## the population of interest.
##
spark_df.select('age').describe().toPandas()
[9]:
|   | summary | age                |
|---|---------|--------------------|
| 0 | count   | 24914              |
| 1 | mean    | 17.993949546439772 |
| 2 | stddev  | 7.858054477853863  |
| 3 | min     | 3.086              |
| 4 | max     | 106.927            |
[10]:
##
## A few records are printed to inspect
## the values of the age field
##
spark_df.select('age').show()
+------+
| age|
+------+
|18.982|
|18.801|
|18.335|
|18.875|
|18.995|
| null|
| 18.93|
|18.322|
|19.055|
|18.708|
|18.543|
|19.463|
|18.097|
| null|
|18.398|
| null|
| null|
|18.987|
|17.158|
|18.497|
+------+
only showing top 20 rows
[11]:
##
## Number of nulls in the age column
##
spark_df.filter(spark_df['age'].isNull()).count()
[11]:
5086
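Before filtering, it may also help to know how many nulls every column contains; a sketch that counts them in a single pass:

from pyspark.sql import functions as F

## Count, for every column, the rows where the value is null
spark_df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c)
     for c in spark_df.columns]).show()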
[12]:
##
## The population of interest is people between
## 13 and 20 years old; other ages are treated
## as missing values
##
from pyspark.sql.functions import lit
from pyspark.sql.functions import when
from pyspark.sql.types import DoubleType
##
## A new column is added with the ages in [13, 20);
## values outside this range are replaced by null
##
spark_df = spark_df.withColumn(
'age1319',
when((spark_df['age'] >= 13) & (spark_df['age'] < 20),
spark_df['age']
).otherwise(lit(None).cast(DoubleType())))
##
## The values of the new column are checked
##
spark_df.select('age1319').describe().toPandas()
[12]:
|   | summary | age1319            |
|---|---------|--------------------|
| 0 | count   | 24477              |
| 1 | mean    | 17.25242893328433  |
| 2 | stddev  | 1.1574649278955391 |
| 3 | min     | 13.027             |
| 4 | max     | 19.995             |
[13]:
##
## The average age per graduation year is
## computed for the sample in the considered
## age range
##
age_df = spark_df.groupby("gradyear").mean().select(['gradyear', 'avg(age1319)']).orderBy('gradyear')
age_df.show()
+--------+------------------+
|gradyear| avg(age1319)|
+--------+------------------+
| 2006|18.655857950872626|
| 2007| 17.70617237497992|
| 2008|16.767700737100785|
| 2009|15.819573344509596|
+--------+------------------+
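As expected, the average age decreases with the graduation year. A natural refinement, not applied in this tutorial, is to impute the missing ages with the mean of the corresponding graduation cohort; a sketch using the `age_df` computed above:

from pyspark.sql import functions as F

## Rename the mean column for a cleaner result
age_means = age_df.withColumnRenamed('avg(age1319)', 'meanAge')

## Replace null ages with the cohort mean
imputed_df = (spark_df
              .join(age_means, on='gradyear', how='left')
              .withColumn('ageImputed',
                          F.coalesce(F.col('age1319'), F.col('meanAge'))))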
Model training#
Names of the columns of interest#
[14]:
##
## The names of the feature columns are extracted.
## The slice drops `gradyear`, `gender`, and `age` at
## the start, and the auxiliary `age1319` column at
## the end
##
inputCols = [a for a, _ in spark_df.dtypes]
inputCols = inputCols[3:-1]
inputCols
[14]:
['friends',
'basketball',
'football',
'soccer',
'softball',
'volleyball',
'swimming',
'cheerleading',
'baseball',
'tennis',
'sports',
'cute',
'sex',
'sexy',
'hot',
'kissed',
'dance',
'band',
'marching',
'music',
'rock',
'god',
'church',
'jesus',
'bible',
'hair',
'dress',
'blonde',
'mall',
'shopping',
'clothes',
'hollister',
'abercrombie',
'die',
'death',
'drunk',
'drugs']
Assembling the columns used for clustering#
The feature columns are assembled into a single vector per record of the DataFrame.
[15]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(
inputCols = inputCols,
outputCol = 'rawFeatures')
spark_df = vectorAssembler.transform(spark_df)
spark_df.select('rawFeatures').show()
+--------------------+
| rawFeatures|
+--------------------+
|(37,[0,16],[7.0,1...|
|(37,[2,11,19,20,2...|
|(37,[0,2,17,19,34...|
|(37,[11,20],[1.0,...|
|(37,[0,12,15,16,1...|
|(37,[0,12,18,19,2...|
|(37,[0,17,18,28,3...|
|(37,[0,4,12,13,19...|
| (37,[0],[52.0])|
|(37,[0,11,14,19,2...|
| (37,[0],[8.0])|
|(37,[0,2,22],[21....|
|(37,[0,12,19],[87...|
| (37,[],[])|
|(37,[16,19,21,26,...|
| (37,[],[])|
|(37,[0,20,21],[13...|
|(37,[0,3,10,19],[...|
|(37,[0,1,2,10,22]...|
|(37,[0,7,19,21,28...|
+--------------------+
only showing top 20 rows
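Note that `rawFeatures` is stored as sparse vectors: `(37,[0,16],[7.0,1.0])` denotes a vector of length 37 whose only nonzero entries are 7.0 at position 0 (`friends`) and 1.0 at position 16 (`dance`).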
Data scaling#
The relevant columns are scaled for clustering. `MinMaxScaler` rescales each feature to the [0, 1] range via x' = (x - min) / (max - min), so that no single interest dominates the distances used by K-means.
[16]:
from pyspark.ml.feature import MinMaxScaler
##
## The scaler is built and fitted
##
scalerModel = MinMaxScaler(inputCol="rawFeatures", outputCol="features").fit(spark_df)
##
## It is applied to the original DataFrame
## to scale the data
##
spark_df = scalerModel.transform(spark_df)
##
## The scaled data are printed
##
spark_df.select(['features']).show()
+--------------------+
| features|
+--------------------+
|[0.00843373493975...|
|[0.0,0.0,0.066666...|
|[0.08313253012048...|
|[0.0,0.0,0.0,0.0,...|
|[0.01204819277108...|
|[0.17108433734939...|
|[0.08674698795180...|
|[0.02048192771084...|
|[0.06265060240963...|
|[0.04698795180722...|
|[0.00963855421686...|
|[0.02530120481927...|
|[0.10481927710843...|
|[0.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,0.0,...|
|[0.0,0.0,0.0,0.0,...|
|[0.16265060240963...|
|[0.03132530120481...|
|[0.03253012048192...|
|[0.14819277108433...|
+--------------------+
only showing top 20 rows
Finding the clusters#
[17]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
##
## The clustering is performed. setK() sets the
## number of clusters whose centers must be
## computed.
##
model = KMeans().setK(5).setSeed(1).fit(spark_df)
spark_df = model.transform(spark_df)
silhouette = ClusteringEvaluator().evaluate(spark_df)
print("Silhouette with squared euclidean distance = " + str(silhouette))
Silhouette with squared euclidean distance = 0.4111397632582284
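Here K=5 is fixed a priori. A common sanity check, sketched below under the assumption that it runs before `model.transform` adds the `prediction` column, is to sweep several values of K and compare the silhouette scores:

## Compare silhouette scores for several cluster counts
evaluator = ClusteringEvaluator()
for k in range(2, 9):
    km_model = KMeans().setK(k).setSeed(1).fit(spark_df)
    print(k, evaluator.evaluate(km_model.transform(spark_df)))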
[18]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
Cluster Centers:
[0.03319446 0.00894352 0.01386418 0.00704525 0.00718048 0.00752434
0.00329701 0.00325638 0.00601355 0.00510883 0.00979218 0.01168149
0.00109098 0.00634891 0.00791208 0.00176525 0.01047724 0.00405721
0.00361525 0.00989082 0.00927761 0.00515989 0.00441119 0.00343609
0.0017008 0.00643388 0.00628235 0.00016319 0.0098997 0.01184277
0. 0.0031591 0.00252869 0.00657909 0.00662576 0.
0.00188643]
[0.04732319 0.01824388 0.03681733 0.00983469 0.01404121 0.01513317
0.00610534 0.32093534 0.01156427 0.00508475 0.01400659 0.03185813
0.00263901 0.01129944 0.02429379 0.00727944 0.02231638 0.00395908
0.00218285 0.01167461 0.01715093 0.00770579 0.00834617 0.00386064
0.00243965 0.01786532 0.01647834 0.00088547 0.03495763 0.06471495
0.02683616 0.02354049 0.01977401 0.00866718 0.0090799 0.01059322
0.00512006]
[0.05157551 0.01875615 0.02536873 0.01379875 0.01931286 0.02498946
0.00851651 0.00940675 0.00704277 0.0094002 0.01499508 0.04242871
0.00224603 0.01052114 0.0319469 0.00502609 0.02823009 0.00519353
0.00362027 0.0150212 0.01591516 0.00712072 0.01041164 0.00486726
0.00182355 0.01958862 0.04018355 0.00048533 0.08043265 0.15253419
0.01924779 0.02713864 0.02297198 0.01016358 0.01093552 0.00567847
0.00431416]
[0.03767757 0.01132303 0.01799075 0.00882724 0.00908686 0.00860205
0.00518651 0.00525431 0.00669924 0.00592686 0.01718159 0.02360936
0.00719564 0.01492224 0.01664565 0.01581143 0.01784363 0.00648668
0.00338186 0.01583189 0.01948598 0.00930611 0.00600424 0.00405633
0.00298063 0.03004328 0.01800476 0.00117619 0.02690206 0.02361573
0.01875788 0.00735603 0.00732976 0.02117964 0.01657359 0.17946091
0.02088588]
[0.03914672 0.01744742 0.02414277 0.01028256 0.01484647 0.01313849
0.00653796 0.00662843 0.00901052 0.00724028 0.01889739 0.03089016
0.00402871 0.01155726 0.0214914 0.01281071 0.02217973 0.00666898
0.00500608 0.01794336 0.01884731 0.00778856 0.00869981 0.00452518
0.00330262 0.02917679 0.02183981 0.00053444 0.03384321 0.04029202
0.16935946 0.016826 0.01281071 0.01303668 0.01193663 0.00707457
0.00803059]
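The raw centers are hard to read. A sketch for interpreting them, reusing the `inputCols` list defined above: label each dimension of a center with its column name and list the dominant interests of each cluster.

## Top-5 features of each cluster center
centers_df = pd.DataFrame(model.clusterCenters(), columns=inputCols)
for cluster, row in centers_df.iterrows():
    print(cluster, row.nlargest(5).index.tolist())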
[19]:
spark_df.select('prediction').show()
+----------+
|prediction|
+----------+
| 0|
| 2|
| 0|
| 0|
| 3|
| 3|
| 0|
| 0|
| 0|
| 0|
| 0|
| 0|
| 0|
| 0|
| 4|
| 0|
| 0|
| 0|
| 0|
| 2|
+----------+
only showing top 20 rows
Model analysis#
[20]:
##
## Number of observations assigned to each cluster
##
spark_df.groupBy('prediction').count().toPandas()
[20]:
|   | prediction | count |
|---|------------|-------|
| 0 | 1          | 708   |
| 1 | 3          | 1592  |
| 2 | 4          | 2607  |
| 3 | 2          | 3361  |
| 4 | 0          | 21732 |
[21]:
##
## Clusters to which the first observations belong
##
spark_df.select(["prediction", "gender", "age", "friends"]).show()
+----------+------+------+-------+
|prediction|gender| age|friends|
+----------+------+------+-------+
| 0| M|18.982| 7|
| 2| F|18.801| 0|
| 0| M|18.335| 69|
| 0| F|18.875| 0|
| 3| NA|18.995| 10|
| 3| F| null| 142|
| 0| F| 18.93| 72|
| 0| M|18.322| 17|
| 0| F|19.055| 52|
| 0| F|18.708| 39|
| 0| F|18.543| 8|
| 0| F|19.463| 21|
| 0| F|18.097| 87|
| 0| NA| null| 0|
| 4| F|18.398| 0|
| 0| NA| null| 0|
| 0| NA| null| 135|
| 0| F|18.987| 26|
| 0| F|17.158| 27|
| 2| F|18.497| 123|
+----------+------+------+-------+
only showing top 20 rows
[22]:
##
## Note: the join result is not assigned; DataFrames
## are immutable, so spark_df keeps its schema below
##
spark_df.join(age_df, spark_df.gradyear == age_df.gradyear)
spark_df.printSchema()
root
|-- gradyear: integer (nullable = true)
|-- gender: string (nullable = true)
|-- age: double (nullable = true)
|-- friends: integer (nullable = true)
|-- basketball: integer (nullable = true)
|-- football: integer (nullable = true)
|-- soccer: integer (nullable = true)
|-- softball: integer (nullable = true)
|-- volleyball: integer (nullable = true)
|-- swimming: integer (nullable = true)
|-- cheerleading: integer (nullable = true)
|-- baseball: integer (nullable = true)
|-- tennis: integer (nullable = true)
|-- sports: integer (nullable = true)
|-- cute: integer (nullable = true)
|-- sex: integer (nullable = true)
|-- sexy: integer (nullable = true)
|-- hot: integer (nullable = true)
|-- kissed: integer (nullable = true)
|-- dance: integer (nullable = true)
|-- band: integer (nullable = true)
|-- marching: integer (nullable = true)
|-- music: integer (nullable = true)
|-- rock: integer (nullable = true)
|-- god: integer (nullable = true)
|-- church: integer (nullable = true)
|-- jesus: integer (nullable = true)
|-- bible: integer (nullable = true)
|-- hair: integer (nullable = true)
|-- dress: integer (nullable = true)
|-- blonde: integer (nullable = true)
|-- mall: integer (nullable = true)
|-- shopping: integer (nullable = true)
|-- clothes: integer (nullable = true)
|-- hollister: integer (nullable = true)
|-- abercrombie: integer (nullable = true)
|-- die: integer (nullable = true)
|-- death: integer (nullable = true)
|-- drunk: integer (nullable = true)
|-- drugs: integer (nullable = true)
|-- age1319: double (nullable = true)
|-- rawFeatures: vector (nullable = true)
|-- features: vector (nullable = true)
|-- prediction: integer (nullable = false)
[23]:
##
## Demographic characteristics of the clusters:
## average age per cluster
##
spark_df.groupby("prediction").mean().select(['prediction', 'avg(age1319)']).show()
+----------+------------------+
|prediction| avg(age1319)|
+----------+------------------+
| 1|16.982795681063113|
| 3|17.393787091988127|
| 4|17.115304945054945|
| 2|17.101725897255488|
| 0|17.292400662819276|
+----------+------------------+
[24]:
##
## An indicator column for gender is added
##
spark_df = spark_df.withColumn(
'isFemale',
when(spark_df['gender'] == 'F', 1).otherwise(0))
[25]:
##
## Proportion of women in each cluster
##
spark_df.groupby("prediction").mean().select(['prediction', 'avg(isFemale)']).show()
+----------+------------------+
|prediction| avg(isFemale)|
+----------+------------------+
| 1|0.9067796610169492|
| 3|0.7594221105527639|
| 4|0.8304564633678557|
| 2| 0.911335911930973|
| 0|0.6890760169335542|
+----------+------------------+
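Other profiles can be computed following the same pattern; for example, the average number of friends per cluster (a sketch, not executed here):

##
## Average number of friends per cluster
##
spark_df.groupby("prediction").mean().select(
    ['prediction', 'avg(friends)']).show()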