Word count in Hadoop and loading the results into MariaDB
Last modified: May 26, 2022
Data files
[1]:
!rm -rf /tmp/wordcount
!mkdir -p /tmp/wordcount/input
%cd /tmp/wordcount
!ls
/tmp/wordcount
input
[2]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.
Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Writing input/text0.txt
[3]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Writing input/text1.txt
[4]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Writing input/text2.txt
Moving the data to HDFS
[5]:
!hadoop fs -rm -r input
!hadoop fs -mkdir input
!hadoop fs -copyFromLocal input/* input
!hadoop fs -ls input/*
Deleted input
-rw-r--r-- 1 root supergroup 1093 2022-05-26 19:52 input/text0.txt
-rw-r--r-- 1 root supergroup 352 2022-05-26 19:52 input/text1.txt
-rw-r--r-- 1 root supergroup 440 2022-05-26 19:52 input/text2.txt
Important:
The data needed to run the application is assumed to already be in HDFS. That is, the application is not responsible for ingesting data into HDFS.
Step 1: Creating the mapper and reducer
[6]:
%%writefile mapper.py
#! /usr/bin/python3
import sys

if __name__ == "__main__":
    # Emit one "word<TAB>1" pair per whitespace-separated token.
    for line in sys.stdin:
        for word in line.split():
            sys.stdout.write("{}\t1\n".format(word))
Writing mapper.py
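The mapper's logic can be tried on an in-memory line without starting Hadoop. The following is a minimal sketch of that idea; `map_line` is a hypothetical helper name and is not part of mapper.py.

```python
def map_line(line):
    """Return the (word, 1) pairs that the mapper logic would emit for one line."""
    # str.split() with no arguments splits on any run of whitespace,
    # so punctuation stays attached to the words.
    return [(word, 1) for word in line.split()]


pairs = map_line("Data analytics (DA) is the process")
for word, count in pairs:
    print("{}\t{}".format(word, count))
```

Note that `(DA)` is emitted with its parentheses, because the split is purely on whitespace.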
[7]:
%%writefile reducer.py
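#! /usr/bin/python3
import sys

if __name__ == "__main__":
    curkey = None
    total = 0
    # Input arrives sorted by key, so equal keys are adjacent.
    for line in sys.stdin:
        key, val = line.split("\t")
        val = int(val)
        if key == curkey:
            total += val
        else:
            # Key changed: flush the previous key's total.
            if curkey is not None:
                sys.stdout.write("{}\t{}\n".format(curkey, total))
            curkey = key
            total = val
    # Flush the last key, unless the input was empty.
    if curkey is not None:
        sys.stdout.write("{}\t{}\n".format(curkey, total))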
Writing reducer.py
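The reducer only compares each key with the previous one, so it relies on Hadoop Streaming delivering the mapper output sorted by key. A rough local equivalent, assuming the pairs are already in memory, can be sketched with `itertools.groupby`; the name `reduce_pairs` and the sample pairs are illustrative only.

```python
from itertools import groupby
from operator import itemgetter


def reduce_pairs(pairs):
    """Sum the counts of consecutive pairs that share the same key."""
    # groupby only merges *adjacent* equal keys, which mirrors the
    # previous-key comparison done by the reducer.
    return [
        (key, sum(count for _, count in group))
        for key, group in groupby(pairs, key=itemgetter(0))
    ]


mapped = [("data", 1), ("analytics", 1), ("data", 1)]

# Sorting plays the role of Hadoop's shuffle-and-sort phase.
sorted_result = reduce_pairs(sorted(mapped))
# Without sorting, "data" shows up twice in the result.
unsorted_result = reduce_pairs(mapped)
print(sorted_result)
print(unsorted_result)
```

The unsorted case shows why the same script would give wrong totals if it were fed unsorted input.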
[8]:
!chmod +x mapper.py reducer.py
[9]:
!ls -1
Step 2: Hadoop application
[10]:
%%writefile hadoop_app.sh
#
# Runs the word count as a Hadoop Streaming job.
#   -files:   local files shipped to the cluster nodes with the job
#   -input:   input path in HDFS
#   -output:  output directory in HDFS (must not exist yet)
#   -mapper:  program that runs the map step
#   -reducer: program that runs the reduce step
#
if hdfs dfs -test -d output; then
    hdfs dfs -rm -r output
fi
hadoop jar \
    /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -input input \
    -output output \
    -mapper mapper.py \
    -reducer reducer.py
# Replace any previous local copy of the results.
rm -rf output
hdfs dfs -copyToLocal output output
Writing hadoop_app.sh
Step 3: Application to populate the database
[11]:
%%writefile populate_db.py
import fileinput
import glob

import mariadb


def run():
    conn = create_connection()
    create_table(conn)
    populate_table(conn)
    conn.close()


def populate_table(conn):
    cur = conn.cursor()
    # Read only the reducer output files (part-*), not _SUCCESS.
    files = glob.glob("output/part-*")
    with fileinput.input(files=files) as f:
        for line in f:
            word, count = line.rstrip("\n").split("\t")
            # "?" is the default placeholder style of MariaDB Connector/Python.
            cur.execute("INSERT INTO words VALUES (?, ?)", (word, int(count)))
    conn.commit()


def create_table(conn):
    cur = conn.cursor()
    # Start from a clean database on every run.
    cur.execute("DROP DATABASE IF EXISTS wordcount;")
    cur.execute("CREATE DATABASE wordcount;")
    cur.execute("USE wordcount;")
    cur.execute("DROP TABLE IF EXISTS words;")
    cur.execute(
        """
        CREATE TABLE words (
            word VARCHAR(20),
            frequency INT
        );
        """
    )
    conn.commit()


def create_connection():
    return mariadb.connect(
        user="root",
        password="",
    )


if __name__ == "__main__":
    run()
Writing populate_db.py
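Each line of the job output has the form `word<TAB>count`. The sketch below shows one way to turn such lines into typed rows before inserting them, assuming well-formed output. `parse_rows` is a hypothetical helper, and the sample lines only mirror the output format.

```python
def parse_rows(lines):
    """Convert 'word<TAB>count' lines into (word, int) tuples."""
    rows = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        # rpartition splits on the last tab, so the count is always
        # taken from the final field.
        word, _, count = line.rpartition("\t")
        rows.append((word, int(count)))
    return rows


sample = ["(DA)\t1\n", "Analytics\t2\n", "\n"]
rows = parse_rows(sample)
print(rows)
```

A list of tuples like this could be handed to the cursor's `executemany` method to insert all rows in one call instead of one `execute` per line.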
Coordinator
[12]:
%%writefile my_program.sh
bash hadoop_app.sh \
&& hdfs dfs -ls output/ \
&& python3 populate_db.py
Writing my_program.sh
[13]:
!bash my_program.sh
Deleted output
packageJobJar: [/tmp/hadoop-unjar1956316352292756910/] [] /tmp/streamjob5088729400220191662.jar tmpDir=null
Found 2 items
-rw-r--r-- 1 root supergroup 0 2022-05-26 19:52 output/_SUCCESS
-rw-r--r-- 1 root supergroup 1649 2022-05-26 19:52 output/part-00000
Verification
[14]:
!mariadb -u root -e "USE wordcount; SELECT * FROM words LIMIT 5;"
+------------+-----------+
| word       | frequency |
+------------+-----------+
| (DA)       |         1 |
| (see       |         1 |
| Analytics  |         2 |
| Analytics, |         1 |
| Big        |         1 |
+------------+-----------+
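The result lists `Analytics` and `Analytics,` as different words because the mapper splits only on whitespace. If case-insensitive and punctuation-insensitive counts are wanted, one possible variant (not what this notebook runs) would normalize each token before emitting it. The sketch below uses an illustrative helper named `normalize`.

```python
import string


def normalize(token):
    """Lowercase a token and strip leading/trailing ASCII punctuation."""
    return token.strip(string.punctuation).lower()


tokens = ["Analytics", "Analytics,", "(DA)", "(see", "Big"]
cleaned = [normalize(t) for t in tokens]
print(cleaned)
```

Tokens made up only of punctuation normalize to an empty string, so a mapper using this approach would need to skip empty results.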