Conteo de palabras en Python usando Hadoop Streaming (Python avanzado)#
Última modificación: Noviembre 03, 2019
Definición del problema#
Se desea contar la frecuencia de ocurrencia de palabras en un conjunto de documentos. Debido a los requerimientos de diseño (gran volúmen de datos y tiempos rápidos de respuesta) se desea implementar una arquitectura Big Data. Se desea implementar una solución computacional eficiente en Python.
A continuación se generarán tres archivos de prueba para probar el sistema.
[1]:
#
# Se crea el directorio de entrada
#
!rm -rf /tmp/wordcount
!mkdir -p /tmp/wordcount/input
%cd /tmp/wordcount
!ls
/tmp/wordcount
input
[2]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.
Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Writing input/text0.txt
[3]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Writing input/text1.txt
[4]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Writing input/text2.txt
Paso 1 — Implementación del mapper#
[5]:
%%writefile mapper.py
#! /usr/bin/python3
#
# Esta es la funcion que mapea la entrada a parejas (clave, valor)
#
import sys
#
# Se usa una clase iterable para implementar el mapper.
#
class Mapper:
def __init__(self, stream):
#
# almacena el flujo de entrada como una variable del objeto
#
self.stream = stream
def emit(self, key, value):
#
# escribe al flujo estandar de salida
#
sys.stdout.write("{}\t{}\n".format(key, value))
def status(self, message):
#
# imprime un reporte en el flujo de error
# no se debe usar el stdout, ya que en este
# unicamente deben ir las parejas (key, value)
#
sys.stderr.write('reporter:status:{}\n'.format(message))
def counter(self, counter, amount=1, group="ApplicationCounter"):
#
# imprime el valor del contador
#
sys.stderr.write('reporter:counter:{},{},{}\n'.format(group, counter, amount))
def map(self):
word_counter = 0
#
# imprime un mensaje a la entrada
#
self.status('Iniciando procesamiento ')
for word in self:
#
# cuenta la cantidad de palabras procesadas
#
word_counter += 1
#
# por cada palabra del flujo de datos
# emite la pareja (word, 1)
#
self.emit(key=word, value=1)
#
# imprime un mensaje a la salida
#
self.counter('num_words', amount=word_counter)
self.status('Finalizadno procesamiento ')
def __iter__(self):
#
# itera sobre cada linea de codigo recibida
# a traves del flujo de entrada
#
for line in self.stream:
#
# itera sobre cada palabra de la linea
# (en los ciclos for, retorna las palabras
# una a una)
#
for word in line.split():
#
# retorna la palabra siguiente en el ciclo for
#
yield word
if __name__ == "__main__":
#
# inicializa el objeto con el flujo de entrada
#
mapper = Mapper(sys.stdin)
#
# ejecuta el mapper
#
mapper.map()
Writing mapper.py
Paso 2 — Verificación#
[6]:
#
# El programa anterior se hace ejecutable
#
!chmod +x /tmp/mapper.py
[7]:
#
# la salida de la función anterior es:
#
!cat input/text*.txt | python3 mapper.py | head
reporter:status:Iniciando procesamiento
reporter:counter:ApplicationCounter,num_words,252
reporter:status:Finalizadno procesamiento
Analytics 1
is 1
the 1
discovery, 1
interpretation, 1
and 1
communication 1
of 1
meaningful 1
patterns 1
Paso 3 —- Implementación del Reducer#
El reducer recibe las parejas (key, value) a través del flujo de salida. En los ejemplos anteriores, el reducer verifica si la clave cambia de un elemento al siguiente. Sin embargo, resulta más eficiente que se pueda iterar directamente sobre elementos consecutivos que tienen la misma clave. La función groupby
de la librería itertools
permite hacer esto. Dicha función recibe como argumentos los datos y una función que genera la clave para cada dato. Retorna una tupla con la clave y los
elementos consecutivos que contienen la misma clave. El siguiente ejemplo permite clarificar su operación.
[8]:
import itertools
#
# la letra es la clave y los números son los valores
#
data = [('A', 1), ('B', 10), ('A', 2), ('A', 3), ('A', 4) , ('B', 20)]
#
# retorna la parte correspondiente a la clave
#
def keyfun(x):
k, v = x
return k
#
# itera sobre la clave y los elementos que contiene la misma clave
#
for key, group in itertools.groupby(data, keyfun):
print(key)
for g in group:
print(' ', g)
A
('A', 1)
B
('B', 10)
A
('A', 2)
('A', 3)
('A', 4)
B
('B', 20)
A continuación se modifica el reducer para incoporar el uso de clases y de la función groupby
.
[9]:
%%writefile reducer.py
#! /usr/bin/python3
import sys
import itertools
class Reducer:
def __init__(self, stream):
self.stream = stream
def emit(self, key, value):
sys.stdout.write("{}\t{}\n".format(key, value))
def reduce(self):
#
# Esta funcion reduce los elementos que
# tienen la misma clave
#
for key, group in itertools.groupby(self, lambda x: x[0]):
total = 0
for _, val in group:
total += val
self.emit(key=key, value=total)
def __iter__(self):
for line in self.stream:
#
# Lee el stream de datos y lo parte
# en (clave, valor)
#
key, val = line.split("\t")
val = int(val)
#
# retorna la tupla (clave, valor)
# como el siguiente elemento del ciclo for
#
yield (key, val)
if __name__ == '__main__':
reducer = Reducer(sys.stdin)
reducer.reduce()
Writing reducer.py
Paso 4#
[10]:
#
# Se hace ejecutable el archivo
#
!chmod +x reducer.py
Paso 5#
Se prueba la implementación localmente.
[11]:
#
# La función sort hace que todos los elementos con
# la misma clave queden en lineas consecutivas.
# Hace el papel del módulo Shuffle & Sort
#
!cat input/text*.txt | python3 mapper.py | sort | python3 reducer.py | head
reporter:status:Iniciando procesamiento
reporter:counter:ApplicationCounter,num_words,252
reporter:status:Finalizadno procesamiento
(DA) 1
(see 1
Analytics 2
Analytics, 1
Big 1
Data 3
Especially 1
Organizations 1
Since 1
Specifically, 1
Paso 6 — Movimiento de los archivos al HDFS#
[12]:
!hadoop fs -mkdir input
!hadoop fs -copyFromLocal input/* input
!hadoop fs -ls input/*
-rw-r--r-- 1 root supergroup 1093 2022-05-26 02:31 input/text0.txt
-rw-r--r-- 1 root supergroup 352 2022-05-26 02:31 input/text1.txt
-rw-r--r-- 1 root supergroup 440 2022-05-26 02:31 input/text2.txt
Paso 7#
[13]:
%%writefile app.sh
#
# Se ejecuta en Hadoop.
# -input: archivo de entrada
# -output: directorio de salida
# -maper: programa que ejecuta el map
# -reducer: programa que ejecuta la reducción
#
hadoop jar \
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input input \
-output output \
-mapper mapper.py \
-reducer reducer.py
Writing app.sh
[14]:
!bash app.sh
packageJobJar: [/tmp/hadoop-unjar5327758262618317368/] [] /tmp/streamjob4110569942363235024.jar tmpDir=null
[15]:
#
# Contenido del directorio con los resultados de la corrida
#
!hadoop fs -ls output
Found 2 items
-rw-r--r-- 1 root supergroup 0 2022-05-26 02:32 output/_SUCCESS
-rw-r--r-- 1 root supergroup 1649 2022-05-26 02:32 output/part-00000
[16]:
#
# Contenido del directorio con los resultados de la corrida
#
!hadoop fs -ls output
Found 2 items
-rw-r--r-- 1 root supergroup 0 2022-05-26 02:32 output/_SUCCESS
-rw-r--r-- 1 root supergroup 1649 2022-05-26 02:32 output/part-00000
[17]:
#
# Se visualiza el archivo con los resultados de la corrida
#
!hadoop fs -cat output/part-00000 | head
(DA) 1
(see 1
Analytics 2
Analytics, 1
Big 1
Data 3
Especially 1
Organizations 1
Since 1
Specifically, 1
Paso 8 — Movimiento de los resultados al sistema local#
[18]:
!hadoop fs -copyToLocal output output
!ls output/*
output/_SUCCESS output/part-00000
Paso 9 — Limpieza del HDFS#
[19]:
#
# Se elimina el directorio de salida en el hdfs si existe
#
!hadoop fs -rm input/*
!hadoop fs -rm output/*
!hadoop fs -rmdir input output
Deleted input/text0.txt
Deleted input/text1.txt
Deleted input/text2.txt
Deleted output/_SUCCESS
Deleted output/part-00000
Limpieza de la máquina local#
[20]:
!rm reducer.py mapper.py
!rm -rf input output