(Deprecated) WordCount in PySpark (standalone mode)

  • 30 min | Last modified: November 16, 2019

Docker container

  • Using the local machine (the current directory is mounted at /datalake):

docker run --rm -it -v "$PWD":/datalake --name pyspark -p 8888:8888 jdvelasq/pyspark:2.4.4-standalone
  • Using a Docker volume (named datalake, mounted at /datalake):

docker run --rm -it -v datalake:/datalake --name pyspark -p 8888:8888 jdvelasq/pyspark:2.4.4-standalone
  • Accessing a running container:

docker exec -it pyspark bash

Problem definition

The goal is to count how many times each word appears in a collection of text files.

[1]:
#
# Creates the wordcount directory in the current working directory;
# the following cells write three files into it.
#
!mkdir -p wordcount/
[2]:
%%writefile wordcount/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Writing wordcount/text0.txt
[3]:
%%writefile wordcount/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Writing wordcount/text1.txt
[4]:
%%writefile wordcount/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Writing wordcount/text2.txt

Solution

[5]:
#
# A temporary directory in HDFS is used. The following
# command lists the contents of that directory
#
!hdfs dfs -ls /tmp
Found 1 items
drwxr-xr-x   - root root       4096 2019-11-16 11:27 /tmp/hsperfdata_root
[6]:
#
# Creates the wordcount folder in HDFS
#
!hdfs dfs -mkdir /tmp/wordcount
[7]:
#
# Verifies that the folder was created
#
!hdfs dfs -ls /tmp/
Found 2 items
drwxr-xr-x   - root root       4096 2019-11-16 11:28 /tmp/hsperfdata_root
drwxr-xr-x   - root root       4096 2019-11-16 11:28 /tmp/wordcount
[8]:
#
# Copies the files from the local wordcount/ directory
# to the /tmp/wordcount/ directory in HDFS
#
!hdfs dfs -copyFromLocal wordcount/* /tmp/wordcount/
[9]:
#
# Verifies that the files were copied
# to HDFS
#
!hdfs dfs -ls /tmp/wordcount
Found 3 items
-rw-r--r--   1 root root       1093 2019-11-16 11:28 /tmp/wordcount/text0.txt
-rw-r--r--   1 root root        352 2019-11-16 11:28 /tmp/wordcount/text1.txt
-rw-r--r--   1 root root        440 2019-11-16 11:28 /tmp/wordcount/text2.txt
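The PySpark cells in the next section follow the MapReduce pattern: split each line into words, emit a (word, 1) pair per word, and sum the values that share a key. As a reference point, here is a plain-Python sketch of the same three steps on an in-memory list of lines. It assumes the data fits in memory and only illustrates the logic; the function name word_count is hypothetical and not part of PySpark.

```python
from collections import defaultdict
from operator import add


def word_count(lines):
    """Plain-Python analogue of flatMap -> map -> reduceByKey(add)."""
    # flatMap step: one record per whitespace-separated word
    words = [word for line in lines for word in line.split()]
    # map step: emit a (key, value) pair for every word
    pairs = [(word, 1) for word in words]
    # reduceByKey step: combine the values that share the same key
    counts = defaultdict(int)
    for key, value in pairs:
        counts[key] = add(counts[key], value)
    return dict(counts)


lines = ["the goal of analytics", "the field of data analysis."]
print(word_count(lines))
# {'the': 2, 'goal': 1, 'of': 2, 'analytics': 1, 'field': 1, 'data': 1, 'analysis.': 1}
```

Spark distributes exactly these steps across partitions, which is what lets the same logic scale to files that do not fit in a single process.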

PySpark implementation

The PySpark implementation is as follows.

[10]:
#
# findspark: makes PySpark importable as a regular Python library
#
import findspark
findspark.init()

#
# Imports the classes required to connect
# Python with Spark
#
from pyspark import SparkConf, SparkContext

#
# Aggregation operator used in the reduce step (MapReduce)
#
from operator import add

#
# Name of the application in the cluster
#
APP_NAME = "My Spark Application"

#
# Configures Spark and creates the SparkContext
#
conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)
sc.setLogLevel('ERROR')
[13]:
#
# Reads the files from HDFS and loads them
# into the RDD text
#
text = sc.textFile("/tmp/wordcount/*.txt")

# Shows the first 10 lines (take() avoids collecting the whole RDD on the driver)
text.take(10)
[13]:
['Analytics is the discovery, interpretation, and communication of meaningful patterns ',
 'in data. Especially valuable in areas rich with recorded information, analytics relies ',
 'on the simultaneous application of statistics, computer programming and operations research ',
 'to quantify performance.',
 '',
 'Organizations may apply analytics to business data to describe, predict, and improve business ',
 'performance. Specifically, areas within analytics include predictive analytics, prescriptive ',
 'analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big ',
 'Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, ',
 'marketing optimization and marketing mix modeling, web analytics, call analytics, speech ']
[14]:
#
# Splits each line into words (split),
# producing one word per record
#
words = text.flatMap(lambda x: x.split())

# Shows the first 10 words
words.take(10)
[14]:
['Analytics',
 'is',
 'the',
 'discovery,',
 'interpretation,',
 'and',
 'communication',
 'of',
 'meaningful',
 'patterns']
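Note that split() only breaks on whitespace, so punctuation stays attached to the words ('discovery,', 'data.') and case is preserved; this is why 'Analytics' and 'analytics' end up as different keys in the counts. If that is not desired, a normalizing function can be passed to flatMap instead, e.g. text.flatMap(tokenize). The helper tokenize and its regular expression below are hypothetical choices, not part of the original notebook.

```python
import re

# Hypothetical token pattern: runs of lowercase letters/digits, optionally
# joined by internal hyphens or apostrophes ("stock-keeping" stays one token).
TOKEN_RE = re.compile(r"[a-z0-9]+(?:[-'][a-z0-9]+)*")


def tokenize(line):
    """Lowercase a line and return its words without surrounding punctuation."""
    return TOKEN_RE.findall(line.lower())


print(tokenize("Analytics is the discovery, interpretation, and communication"))
# ['analytics', 'is', 'the', 'discovery', 'interpretation', 'and', 'communication']
print(tokenize("store assortment and stock-keeping unit optimization,"))
# ['store', 'assortment', 'and', 'stock-keeping', 'unit', 'optimization']
```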
[15]:
#
# Generates the <key, value> pairs, represented
# as the tuple (word, 1)
#
wc = words.map(lambda x: (x, 1))
wc.take(10)
[15]:
[('Analytics', 1),
 ('is', 1),
 ('the', 1),
 ('discovery,', 1),
 ('interpretation,', 1),
 ('and', 1),
 ('communication', 1),
 ('of', 1),
 ('meaningful', 1),
 ('patterns', 1)]
[16]:
#
# Sums the values for each key. reduceByKey groups the pairs by
# hash-partitioning the keys; it does NOT sort them (use sortByKey()
# if ordered output is needed)
#
counts = wc.reduceByKey(add)
counts.take(10)
[16]:
[('interpretation,', 1),
 ('of', 8),
 ('in', 5),
 ('data.', 1),
 ('Especially', 1),
 ('analytics', 8),
 ('simultaneous', 1),
 ('operations', 1),
 ('research', 2),
 ('quantify', 1)]
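The pairs above are not in alphabetical order: reduceByKey assigns each key to a partition by hashing it and then combines the values inside each partition, so the result follows partition order, not key order. The plain-Python sketch below imitates that idea. NUM_PARTITIONS and partition_of are hypothetical, and crc32 is only a deterministic stand-in for Spark's own partitioning hash; in PySpark an ordered result requires an explicit sortByKey() or sortBy().

```python
import zlib
from operator import add

NUM_PARTITIONS = 4  # hypothetical; this notebook's run wrote four part files


def partition_of(key, num_partitions=NUM_PARTITIONS):
    """Deterministic stand-in for Spark's hash partitioner (crc32, not Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def reduce_by_key(pairs, func, num_partitions=NUM_PARTITIONS):
    """Route each pair to a partition by key hash, then reduce values per key."""
    partitions = [{} for _ in range(num_partitions)]
    for key, value in pairs:
        bucket = partitions[partition_of(key, num_partitions)]
        bucket[key] = func(bucket[key], value) if key in bucket else value
    # Concatenate partitions in order, as collect() does; keys are NOT sorted.
    return [item for bucket in partitions for item in bucket.items()]


pairs = [("of", 1), ("in", 1), ("of", 1), ("data", 1), ("in", 1), ("of", 1)]
counts = reduce_by_key(pairs, add)
print(counts)          # order depends on the hash, not on the keys
print(sorted(counts))  # [('data', 1), ('in', 2), ('of', 3)]
```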
[17]:
#
# Writes the results to the `/tmp/output` directory; each element is
# stored as its string representation, one file per partition
#
counts.saveAsTextFile("/tmp/output")

Result files

[18]:
!hdfs dfs -ls /tmp/
Found 7 items
drwxr-xr-x   - root root       4096 2019-11-16 11:29 /tmp/blockmgr-dc0a35be-c1e7-4952-88ad-e823aebb8ef3
drwxr-xr-x   - root root       4096 2019-11-16 11:29 /tmp/hsperfdata_root
-rw-r--r--   1 root root      59545 2019-11-16 11:28 /tmp/liblz4-java2548858282861632294.so
drwxr-xr-x   - root root       4096 2019-11-16 11:29 /tmp/output
drwx------   - root root       4096 2019-11-16 11:28 /tmp/spark-5bea8c51-1e31-400b-a7a9-57118c19e7cb
drwxr-xr-x   - root root       4096 2019-11-16 11:28 /tmp/spark-d68ac211-749a-41c9-b07f-69ccfbde47ef
drwxr-xr-x   - root root       4096 2019-11-16 11:28 /tmp/wordcount
[19]:
# Result files. Note that several result files are generated:
# one part-NNNNN file per partition of the RDD.
!hdfs dfs -ls /tmp/output/
Found 5 items
-rw-r--r--   1 root root          0 2019-11-16 11:29 /tmp/output/_SUCCESS
-rw-r--r--   1 root root        778 2019-11-16 11:29 /tmp/output/part-00000
-rw-r--r--   1 root root        562 2019-11-16 11:29 /tmp/output/part-00001
-rw-r--r--   1 root root        510 2019-11-16 11:29 /tmp/output/part-00002
-rw-r--r--   1 root root        594 2019-11-16 11:29 /tmp/output/part-00003

The file /tmp/output/_SUCCESS is an empty marker file indicating that the job finished writing its output successfully.

[20]:
!hdfs dfs -cat /tmp/output/part-00000
('interpretation,', 1)
('of', 8)
('in', 5)
('data.', 1)
('Especially', 1)
('analytics', 8)
('simultaneous', 1)
('operations', 1)
('research', 2)
('quantify', 1)
('Organizations', 1)
('may', 1)
('business', 4)
('predict,', 1)
('include', 1)
('decision', 1)
('descriptive', 1)
('store', 1)
('optimization,', 2)
('modeling,', 2)
('speech', 1)
('promotion', 1)
('risk', 1)
('fraud', 1)
('Since', 1)
('algorithms', 1)
('used', 3)
('harness', 1)
('current', 1)
('field', 1)
('involves', 1)
('studying', 1)
('potential', 1)
('trends,', 1)
('performance', 1)
('goal', 1)
('changes.', 1)
('process', 1)
('draw', 1)
('specialized', 1)
('systems', 1)
('software.', 1)
('techniques', 1)
('are', 1)
('commercial', 1)
('organizations', 1)
('disprove', 1)
('scientific', 1)
('hypotheses.', 1)
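Each line of a part-* file is simply the string form of one (word, count) tuple, because saveAsTextFile writes str() of every element. Since Python's tuple representation quotes strings as valid literals, ast.literal_eval can turn such lines back into Python data. The function name parse_part_lines is hypothetical.

```python
import ast


def parse_part_lines(lines):
    """Convert lines such as "('of', 8)" back into a {word: count} dict."""
    counts = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        word, count = ast.literal_eval(line)
        counts[word] = count
    return counts


sample = ["('interpretation,', 1)", "('of', 8)", "('in', 5)"]
print(parse_part_lines(sample))  # {'interpretation,': 1, 'of': 8, 'in': 5}
```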

Copying the result files to the local machine

[21]:
# Creates the local folder and copies the result files from HDFS into it
!mkdir -p output
!hdfs dfs -copyToLocal /tmp/output/* output/
[22]:
!ls -l output/*
-rw-r--r-- 1 root root   0 Nov 16 11:29 output/_SUCCESS
-rw-r--r-- 1 root root 778 Nov 16 11:29 output/part-00000
-rw-r--r-- 1 root root 562 Nov 16 11:29 output/part-00001
-rw-r--r-- 1 root root 510 Nov 16 11:29 output/part-00002
-rw-r--r-- 1 root root 594 Nov 16 11:29 output/part-00003
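Once the files are on the local disk, they can be combined without Spark. The sketch below assumes the output/ layout listed above (a _SUCCESS marker plus part-* files); the function load_word_counts is hypothetical. It refuses to read a directory without _SUCCESS, since a missing marker suggests an incomplete write, and then merges every part file. Because reduceByKey places each key in exactly one partition, a word should appear in only one part file, so the summation is just a safety net.

```python
import ast
import glob
import os
from collections import Counter


def load_word_counts(output_dir="output"):
    """Merge the (word, count) tuples from every part-* file in output_dir."""
    if not os.path.exists(os.path.join(output_dir, "_SUCCESS")):
        raise FileNotFoundError(f"{output_dir}/_SUCCESS not found; the write may be incomplete")
    counts = Counter()
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                if line.strip():
                    word, count = ast.literal_eval(line)
                    counts[word] += count
    return counts


# Usage after the copy above: the five most frequent words
# print(load_word_counts("output").most_common(5))
```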
[24]:
# Stops the SparkContext and releases its resources
sc.stop()

Cleaning up the working folders

[25]:
!rm -rf wordcount
!rm -rf output
!hdfs dfs -rm -r -f /tmp/wordcount/
!hdfs dfs -rm -r -f /tmp/output/
Deleted /tmp/wordcount
Deleted /tmp/output
[26]:
!hdfs dfs -ls /tmp
Found 5 items
drwxr-xr-x   - root root       4096 2019-11-16 11:29 /tmp/blockmgr-dc0a35be-c1e7-4952-88ad-e823aebb8ef3
drwxr-xr-x   - root root       4096 2019-11-16 11:30 /tmp/hsperfdata_root
-rw-r--r--   1 root root      59545 2019-11-16 11:28 /tmp/liblz4-java2548858282861632294.so
drwx------   - root root       4096 2019-11-16 11:28 /tmp/spark-5bea8c51-1e31-400b-a7a9-57118c19e7cb
drwxr-xr-x   - root root       4096 2019-11-16 11:28 /tmp/spark-d68ac211-749a-41c9-b07f-69ccfbde47ef