Conteo de palabras en Python usando Hadoop Streaming pseudo-distribuido#
Última modificación: Noviembre 05, 2019
Definición del problema#
Se desea contar la frecuencia de ocurrencia de palabras en un conjunto de documentos. Debido a los requerimientos de diseño (gran volúmen de datos y tiempos rápidos de respuesta) se desea implementar una arquitectura Big Data. Se desea implementar la solución en Python3 y ejecutar Hadoop en modo streamming.
Archivos de prueba#
A continuación se generarán tres archivos de prueba para probar el sistema. Puede usar directamente comandos del sistema operativo en el Terminal y el editor de texto pico
para crear los archivos.
[1]:
#
# Se crea el directorio de entrada
#
!rm -rf /tmp/wordcount
!mkdir -p /tmp/wordcount/input
%cd /tmp/wordcount
!ls
/tmp/wordcount
input
[2]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.
Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Writing input/text0.txt
[3]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Writing input/text1.txt
[4]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Writing input/text2.txt
Prueba de la implementación fuera de Hadoop#
Este es el mismo código del tutorial “Conteo de palabras en Python usando el algortitmo MapReduce”.
[5]:
!which python3
/usr/bin/python3
[6]:
%%writefile mapper.py
#! /usr/bin/python3
import sys
if __name__ == "__main__":
for line in sys.stdin:
for word in line.split():
sys.stdout.write("{}\t1\n".format(word))
Writing mapper.py
[7]:
%%writefile reducer.py
#! /usr/bin/python3
import sys
if __name__ == '__main__':
curkey = None
total = 0
for line in sys.stdin:
key, val = line.split("\t")
val = int(val)
if key == curkey:
total += val
else:
if curkey is not None:
sys.stdout.write("{}\t{}\n".format(curkey, total))
curkey = key
total = val
sys.stdout.write("{}\t{}\n".format(curkey, total))
Writing reducer.py
[8]:
!chmod +x mapper.py reducer.py
[9]:
!ls -1
input
mapper.py
reducer.py
[10]:
!ls -1 input/
text0.txt
text1.txt
text2.txt
Paso 1. Movimiento de los datos al HDFS#
Se copian los archivos de entrada del sistema local (o del datalake) al HDFS.
[11]:
!hadoop fs -mkdir input
!hadoop fs -copyFromLocal input/* input
!hadoop fs -ls input/*
mkdir: `input': File exists
-rw-r--r-- 1 root supergroup 1093 2022-05-26 19:08 input/text0.txt
-rw-r--r-- 1 root supergroup 352 2022-05-26 19:08 input/text1.txt
-rw-r--r-- 1 root supergroup 440 2022-05-26 19:08 input/text2.txt
Vaya a la pantalla de monitoreo del NameNode (http://127.0.0.1:50070/) y ubique los archivos en el HDFS.
Comando para la gestión de archivos y directorios#
La gestión de archivos entre el sistema local y el HDFS se realiza mediante comandos similares a los del sistema operativo Unix en Terminal. A continuación se resumen los principales comandos.
hadoop fs -help
: Imprime la ayuda en pantalla para todos los comandos.hadoop fs -ls <path>
hadoop fs -mkdir <path>
hadoop fs -rmdir <path>
hadoop fs -cp <src> <dest>
hadoop fs -mv <src> <dest>
hadoop fs -rm <path>
hadoop fs -cat <path>
hadoop fs -head <path>
hadoop fs -tail <path>
hadoop fs -text <path>
. Imprime el arachivo en<path>
y lo imprime en formato texto. Soporta archivos zip, TextRecordInputStream y Avro.hadoop fs -stat <path>
: Imprime estadísticos de<path>
.
Transferencia de información entre el sistema local y el HDFS.
hadoop fs -get <src> <localdest>
/hadoop fs -copyToLocal <src> <localdest>
. Copia el contenido de<src>
en el HDFS en<localdest>
en el sistema local.hadoop fs -put <localsrc> <dest>
/hadoop fs -copyFromLocal <src> <localdest>
. Copia el contenido de<localsrc>
en el sistema local a<dest>
en el HDFS.hadoop fs -count <path>
. Cuenta el número de directorios, archivos y bytes en<path>
.hadoop fs -appendToFile <localsrc> <dest>
: pega al final de<dest>
el contenido de los archivos en<localsrc>
.
Paso 2. Ejecución#
[12]:
%%writefile app.sh
#
# Se ejecuta en Hadoop.
# -files: archivos a copiar al hdfs
# -input: archivo de entrada
# -output: directorio de salida
# -file: archivos a copiar de la maquina local al hdfs
# -maper: programa que ejecuta el map
# -reducer: programa que ejecuta la reducción
#
hdfs dfs -rm -r output
hadoop jar \
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input input \
-output output \
-mapper mapper.py \
-reducer reducer.py
Writing app.sh
[13]:
!bash app.sh
Deleted output
packageJobJar: [/tmp/hadoop-unjar1274556947240167022/] [] /tmp/streamjob4380793870891757123.jar tmpDir=null
[14]:
#
# Contenido del directorio con los resultados de la corrida
#
!hadoop fs -ls output
Found 2 items
-rw-r--r-- 1 root supergroup 0 2022-05-26 19:08 output/_SUCCESS
-rw-r--r-- 1 root supergroup 14300 2022-05-26 19:08 output/part-00000
[15]:
#
# Se visualiza el archivo con los resultados de la corrida
#
!hadoop fs -cat output/part-00000 | head
"*" 19
"AS 5
"License"); 5
"alice,bob 19
"clumping" 1
"full_queue_name" 1
"priority". 1
"workflowId" 1
"kerberos". 1
"simple" 1
Paso 3. Movimiento de los resultados al sistema local#
[16]:
!hadoop fs -copyToLocal output output
!ls -1 output/*
output/_SUCCESS
output/part-00000
Paso 4. Limpieza del hdfs#
[17]:
#
# Se elimina el directorio de salida en el hdfs si existe
#
!hadoop fs -rm -r input/*
!hadoop fs -rm -r output/*
!hadoop fs -rmdir input output
Deleted input/text0.txt
Deleted input/text1.txt
Deleted input/text2.txt
Deleted output/_SUCCESS
Deleted output/part-00000
rmdir: `input': Directory is not empty
Limpieza de la máquina local#
[18]:
!rm reducer.py mapper.py
!rm -rf input output
Notas.#
Combiners.– Los combiners son reducers que se ejecutan sobre los resultdos que produce cada mapper antes de pasar al modulo de suffle-&-sort, con el fin de reducir la carga computacional. Suelen ser identicos a los reducers. Una llamada típica sería:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input \
-output output \
-mapper mapper.py \
-reducer reducer.py \
-combiner combiner.py
Partitioners.– Son rutinas que controlan como se enviar las parejas (clave, valor) a cada reducers, tal que elementos con la misma clave son enviados al mismo reducer.
Job Chain.– Se refiere al encadenamiento de varias tareas cuando el cómputo que se desea realizar es muy complejo para que pueda realizarse en un MapReduce.