Word count in Apache Hive (script)#

  • Last modified: May 17, 2022

Data#

[21]:
!mkdir -p /tmp/wordcount/
[22]:
%%writefile /tmp/wordcount/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Overwriting /tmp/wordcount/text0.txt
[23]:
%%writefile /tmp/wordcount/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Overwriting /tmp/wordcount/text1.txt
[24]:
%%writefile /tmp/wordcount/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Overwriting /tmp/wordcount/text2.txt

Production version#

In this second part, the application is taken to production with the following changes (summarized in the sketch after this list):

  • The data is read from Hadoop's HDFS file system.

  • The results are saved to a folder in the Hadoop file system.

  • The script is stored in a file on the local disk for later use.
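
In outline, the production run reduces to two commands, developed step by step in the following sections; a minimal sketch:

$ hdfs dfs -copyFromLocal /tmp/wordcount/ /tmp/    # the input data lives in HDFS
$ hive -f /tmp/wordcount.hql                       # the stored script writes its results to HDFS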

Copying the data to HDFS#

[25]:
#
# A temporary directory in HDFS is used. The following
# command lists the contents of that directory
#
!hdfs dfs -ls /tmp
Found 3 items
drwxrwx---   - root supergroup          0 2022-05-27 04:19 /tmp/hadoop-yarn
drwxrwxrwx   - root supergroup          0 2022-05-27 04:19 /tmp/hive
drwxrwxrwx   - root supergroup          0 2022-05-27 04:25 /tmp/output
[26]:
#
# Create the wordcount folder in HDFS
#
!hdfs dfs -mkdir /tmp/wordcount
[27]:
#
# Verify that the folder was created
#
!hdfs dfs -ls /tmp/
Found 4 items
drwxrwx---   - root supergroup          0 2022-05-27 04:19 /tmp/hadoop-yarn
drwxrwxrwx   - root supergroup          0 2022-05-27 04:19 /tmp/hive
drwxrwxrwx   - root supergroup          0 2022-05-27 04:25 /tmp/output
drwxr-xr-x   - root supergroup          0 2022-05-27 04:29 /tmp/wordcount
[28]:
#
# Copy the files from the local directory /tmp/wordcount/
# to the /tmp/wordcount/ directory in HDFS
#
!hdfs dfs -copyFromLocal /tmp/wordcount/*  /tmp/wordcount/
[29]:
#
# Verify that the files were copied
# to HDFS
#
!hdfs dfs -ls /tmp/wordcount
Found 3 items
-rw-r--r--   1 root supergroup       1093 2022-05-27 04:29 /tmp/wordcount/text0.txt
-rw-r--r--   1 root supergroup        352 2022-05-27 04:29 /tmp/wordcount/text1.txt
-rw-r--r--   1 root supergroup        440 2022-05-27 04:29 /tmp/wordcount/text2.txt

Generating the script and adjusting the code#

Two changes are made. First, the line

LOAD DATA LOCAL INPATH "wordcount/" OVERWRITE INTO TABLE docs;

is replaced with:

LOAD DATA INPATH "/tmp/wordcount/" OVERWRITE INTO TABLE docs;

so that Hive reads the data from the /tmp/wordcount/ directory in HDFS. Note that, unlike LOAD DATA LOCAL INPATH, which copies files from the local file system, LOAD DATA INPATH moves the files within HDFS into the Hive warehouse, so the source directory must be repopulated before each run. Second, the following block is added:

INSERT OVERWRITE DIRECTORY '/tmp/output'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT * FROM word_counts;

so that the results are stored in the /tmp/output folder as a file in CSV format. The program is saved as wordcount.hql on the local machine.
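
Since the load moves rather than copies, a quick sanity check after an execution is to list the source directory; a minimal sketch:

$ hdfs dfs -ls /tmp/wordcount    # errors or comes back empty once the files have been moved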

[30]:
%%writefile /tmp/wordcount.hql

-- Drop tables left over from a previous run
DROP TABLE IF EXISTS docs;
DROP TABLE IF EXISTS word_counts;

-- Each line of the input files becomes one row of this table
CREATE TABLE docs (line STRING);

-- Move the files in /tmp/wordcount/ (HDFS) into the docs table
LOAD DATA INPATH "/tmp/wordcount/" OVERWRITE INTO TABLE docs;

-- Split each line on whitespace and count how often each word appears
CREATE TABLE word_counts
AS
    SELECT word, count(1) AS count
    FROM
        (SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY
    word
ORDER BY
    word;

-- Write the result to /tmp/output in HDFS as comma-delimited text
INSERT OVERWRITE DIRECTORY '/tmp/output'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT * FROM word_counts;

Overwriting /tmp/wordcount.hql
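
The counting logic combines two Hive functions: split breaks each line into an array of tokens using a regular expression ('\\s' matches a single whitespace character, so consecutive spaces produce empty tokens), and explode turns that array into one row per token. A minimal illustration, runnable from the Hive CLI:

SELECT explode(split('Analytics is the discovery', '\\s')) AS word;
-- produces four rows: Analytics, is, the, discovery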

Execution#

[31]:
!hive -f /tmp/wordcount.hql

Logging initialized using configuration in jar:file:/opt/hive/lib/hive-common-2.3.9.jar!/hive-log4j2.properties Async: true
OK
Time taken: 6.613 seconds
OK
Time taken: 0.049 seconds
OK
Time taken: 0.421 seconds
Loading data to table default.docs
OK
Time taken: 0.53 seconds
Query ID = root_20220527042936_e173e339-063c-4111-ae65-3cc9d15c8d89
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1653625133568_0008, Tracking URL = http://10e0f132c3fa:8088/proxy/application_1653625133568_0008/
Kill Command = /opt/hadoop/bin/hadoop job  -kill job_1653625133568_0008
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2022-05-27 04:29:43,190 Stage-1 map = 0%,  reduce = 0%
2022-05-27 04:29:46,333 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.05 sec
2022-05-27 04:29:51,482 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.35 sec
MapReduce Total cumulative CPU time: 3 seconds 350 msec
Ended Job = job_1653625133568_0008
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1653625133568_0009, Tracking URL = http://10e0f132c3fa:8088/proxy/application_1653625133568_0009/
Kill Command = /opt/hadoop/bin/hadoop job  -kill job_1653625133568_0009
Hadoop job information for Stage-2: number of mappers: 1; number of reducers: 1
2022-05-27 04:30:02,186 Stage-2 map = 0%,  reduce = 0%
2022-05-27 04:30:06,309 Stage-2 map = 100%,  reduce = 0%, Cumulative CPU 1.15 sec
2022-05-27 04:30:11,448 Stage-2 map = 100%,  reduce = 100%, Cumulative CPU 2.71 sec
MapReduce Total cumulative CPU time: 2 seconds 710 msec
Ended Job = job_1653625133568_0009
Moving data to directory hdfs://0.0.0.0:9000/user/hive/warehouse/word_counts
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 3.35 sec   HDFS Read: 10194 HDFS Write: 4345 SUCCESS
Stage-Stage-2: Map: 1  Reduce: 1   Cumulative CPU: 2.71 sec   HDFS Read: 9491 HDFS Write: 1731 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 60 msec
OK
Time taken: 36.562 seconds
Query ID = root_20220527043012_82339861-3111-4ce0-9d68-48576cc592a7
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1653625133568_0010, Tracking URL = http://10e0f132c3fa:8088/proxy/application_1653625133568_0010/
Kill Command = /opt/hadoop/bin/hadoop job  -kill job_1653625133568_0010
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2022-05-27 04:30:21,678 Stage-1 map = 0%,  reduce = 0%
2022-05-27 04:30:25,783 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.5 sec
MapReduce Total cumulative CPU time: 1 seconds 500 msec
Ended Job = job_1653625133568_0010
Stage-3 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
Stage-4 is filtered out by condition resolver.
Moving data to directory hdfs://0.0.0.0:9000/tmp/output/.hive-staging_hive_2022-05-27_04-30-12_746_4329887753979883221-1/-ext-10000
Moving data to directory /tmp/output
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1   Cumulative CPU: 1.5 sec   HDFS Read: 5028 HDFS Write: 1653 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 500 msec
OK
Time taken: 14.14 seconds

Hive on Tez#

[32]:
%%writefile /tmp/wordcount.hql

-- Run on the Tez execution engine instead of MapReduce
set hive.execution.engine = tez;

DROP TABLE IF EXISTS docs;
DROP TABLE IF EXISTS word_counts;

CREATE TABLE docs (line STRING);

LOAD DATA INPATH "/tmp/wordcount/" OVERWRITE INTO TABLE docs;

CREATE TABLE word_counts
AS
    SELECT word, count(1) AS count
    FROM
        (SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY
    word
ORDER BY
    word;

INSERT OVERWRITE DIRECTORY '/tmp/output'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT * FROM word_counts;
Overwriting /tmp/wordcount.hql
[37]:
# The previous run's LOAD DATA INPATH moved /tmp/wordcount into the
# Hive warehouse, so the input data must be copied to HDFS again
!hdfs dfs -mkdir /tmp/wordcount
!hdfs dfs -copyFromLocal /tmp/wordcount/*  /tmp/wordcount/
!hive -f /tmp/wordcount.hql

Logging initialized using configuration in jar:file:/opt/hive/lib/hive-common-2.3.9.jar!/hive-log4j2.properties Async: true
OK
Time taken: 6.397 seconds
OK
Time taken: 0.049 seconds
OK
Time taken: 0.412 seconds
Loading data to table default.docs
OK
Time taken: 0.497 seconds
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/tez/runtime/api/Event
        at org.apache.hadoop.hive.ql.parse.TezCompiler.runStatsDependentOptimizations(TezCompiler.java:336)
        at org.apache.hadoop.hive.ql.parse.TezCompiler.optimizeOperatorPlan(TezCompiler.java:104)
        at org.apache.hadoop.hive.ql.parse.TaskCompiler.compile(TaskCompiler.java:140)
        at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:11273)
        at org.apache.hadoop.hive.ql.parse.CalcitePlanner.analyzeInternal(CalcitePlanner.java:286)
        at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:258)
        at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:512)
        at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1317)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1457)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1227)
        at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
        at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
        at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
        at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
        at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:793)
        at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
        at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:244)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:158)
Caused by: java.lang.ClassNotFoundException: org.apache.tez.runtime.api.Event
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 26 more
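
The run aborts during query compilation: NoClassDefFoundError: org/apache/tez/runtime/api/Event means the Tez runtime jars are not on Hive's classpath in this environment. Making the Tez engine usable requires installing Apache Tez and exposing it to Hadoop and Hive; a hedged sketch, assuming Tez is unpacked under /opt/tez (all paths are illustrative):

$ export TEZ_HOME=/opt/tez                                        # assumed install location
$ export HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH
$ hdfs dfs -mkdir -p /apps/tez && hdfs dfs -put $TEZ_HOME/share/tez.tar.gz /apps/tez/
# tez.lib.uris in tez-site.xml must then point at hdfs:///apps/tez/tez.tar.gz
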
[36]:
#
# Note that /tmp/wordcount no longer appears: LOAD DATA INPATH
# moved it into the Hive warehouse
#
!hdfs dfs -ls /tmp/

Found 3 items
drwxrwx---   - root supergroup          0 2022-05-27 04:19 /tmp/hadoop-yarn
drwxrwxrwx   - root supergroup          0 2022-05-27 04:19 /tmp/hive
drwxrwxrwx   - root supergroup          0 2022-05-27 04:30 /tmp/output

Hive on Spark#

[42]:
%%writefile /tmp/wordcount.hql

-- Run on the Spark execution engine instead of MapReduce
set hive.execution.engine = spark;

DROP TABLE IF EXISTS docs;
DROP TABLE IF EXISTS word_counts;

CREATE TABLE docs (line STRING);

LOAD DATA INPATH "/tmp/wordcount/" OVERWRITE INTO TABLE docs;

CREATE TABLE word_counts
AS
    SELECT word, count(1) AS count
    FROM
        (SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY
    word
ORDER BY
    word;

INSERT OVERWRITE DIRECTORY '/tmp/output'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT * FROM word_counts;
Overwriting /tmp/wordcount.hql
[43]:
# Make sure the input data is present in HDFS before running the script
!hdfs dfs -mkdir /tmp/wordcount
!hdfs dfs -copyFromLocal /tmp/wordcount/*  /tmp/wordcount/
!hive -f /tmp/wordcount.hql
mkdir: `/tmp/wordcount': File exists
copyFromLocal: `/tmp/wordcount/text0.txt': File exists
copyFromLocal: `/tmp/wordcount/text1.txt': File exists
copyFromLocal: `/tmp/wordcount/text2.txt': File exists

Logging initialized using configuration in jar:file:/opt/hive/lib/hive-common-2.3.9.jar!/hive-log4j2.properties Async: true
OK
Time taken: 6.46 seconds
OK
Time taken: 0.05 seconds
OK
Time taken: 0.42 seconds
Loading data to table default.docs
OK
Time taken: 0.513 seconds
Query ID = root_20220527044041_9f5e46a6-f837-42de-9047-ec329e89722a
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = c2fec351-64c0-4fbc-85f5-ee9f8946b148
Job failed with java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaSparkContext.accumulator(Ljava/lang/Object;Ljava/lang/String;Lorg/apache/spark/AccumulatorParam;)Lorg/apache/spark/Accumulator;
        at org.apache.hive.spark.counter.SparkCounter.<init>(SparkCounter.java:60)
        at org.apache.hive.spark.counter.SparkCounterGroup.createCounter(SparkCounterGroup.java:52)
        at org.apache.hive.spark.counter.SparkCounters.createCounter(SparkCounters.java:71)
        at org.apache.hive.spark.counter.SparkCounters.createCounter(SparkCounters.java:67)
        at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:337)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaSparkContext.accumulator(Ljava/lang/Object;Ljava/lang/String;Lorg/apache/spark/AccumulatorParam;)Lorg/apache/spark/Accumulator;
        at org.apache.hive.spark.counter.SparkCounter.<init>(SparkCounter.java:60)
        at org.apache.hive.spark.counter.SparkCounterGroup.createCounter(SparkCounterGroup.java:52)
        at org.apache.hive.spark.counter.SparkCounters.createCounter(SparkCounters.java:71)
        at org.apache.hive.spark.counter.SparkCounters.createCounter(SparkCounters.java:67)
        at org.apache.hadoop.hive.ql.exec.spark.RemoteHiveSparkClient$JobStatusJob.call(RemoteHiveSparkClient.java:337)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:358)
        at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:323)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
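
This run fails as well: the NoSuchMethodError on JavaSparkContext.accumulator indicates a Spark version mismatch, since the accumulator API that Hive 2.3.9 calls was removed from newer Spark releases. Hive on Spark requires a Spark build matching the Hive version; the installed version can be checked with:

$ spark-submit --version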

Viewing the results#

The results are stored in the /tmp/output folder of the HDFS file system. In the output below, the first row (",20") counts the empty tokens produced by the whitespace split.

[12]:
!hdfs dfs -ls /tmp/output
Found 1 items
-rwxrwxrwx   1 root supergroup       1653 2022-05-27 04:25 /tmp/output/000000_0
[13]:
!hdfs dfs -cat /tmp/output/000000_0 | head
,20
(DA),1
(see,1
Analytics,2
Analytics,,1
Big,1
Data,3
Especially,1
Organizations,1
Since,1

Copying the results to the local machine#

[14]:
!hadoop fs -copyToLocal /tmp/output /tmp/output
!ls /tmp/output/*
/tmp/output/000000_0
[15]:
!cat /tmp/output/000000_0
,20
(DA),1
(see,1
Analytics,2
Analytics,,1
Big,1
Data,3
Especially,1
Organizations,1
Since,1
Specifically,,1
The,2
a,1
about,1
aid,1
algorithms,1
analysis,,1
analysis.,1
analytics,8
analytics,,8
analytics.,1
analyze,1
and,15
application,1
apply,1
are,1
areas,2
assortment,1
be,1
big,1
business,4
by,2
call,1
can,2
certain,1
changes.,1
cognitive,1
commercial,1
communication,1
computation,1
computer,2
conclusions,1
contain,,1
credit,1
current,1
data,4
data),,1
data.,1
decision,1
decisions,2
describe,,1
descriptive,1
discovery,,1
disprove,1
draw,1
effects,1
enable,1
enterprise,1
evaluate,1
events,,1
examining,1
extensive,1
field,1
for,1
force,1
fraud,1
gaining,1
given,1
goal,1
harness,1
historical,1
hypotheses.,1
improve,2
improvements,1
in,5
include,1
increasingly,1
industries,1
information,1
information,,1
interpretation,,1
involves,1
is,3
knowledge,1
make,2
management,,1
marketing,2
mathematics.,1
may,1
meaningful,1
methods,1
mix,1
modeling,,2
models,,1
more-informed,1
most,1
of,8
often,1
on,1
operations,1
optimization,1
optimization,,2
or,5
order,1
organizations,1
past,1
patterns,1
performance,1
performance.,2
potential,1
predict,,1
predictive,2
prescriptive,1
price,1
process,1
programming,1
promotion,1
quantify,1
recorded,1
relies,1
require,1
research,2
researchers,1
retail,1
rich,1
risk,1
sales,1
scenario.,1
science,,2
scientific,1
scientists,1
sets,1
simultaneous,1
sizing,1
software,1
software.,1
specialized,1
speech,1
statistics,,2
stock-keeping,1
store,1
studying,1
systems,1
techniques,1
technologies,1
the,10
theories,1
they,1
to,12
tool,1
trends,,1
unit,1
used,3
valuable,1
verify,1
web,1
which,1
widely,1
with,2
within,1
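
Because some tokens keep punctuation attached, a few words themselves contain a comma (Analytics, for example), so the file is not strictly two-column CSV. With that caveat, the most frequent words can be read directly from the local copy; a minimal sketch:

$ sort -t',' -k2,2 -nr /tmp/output/000000_0 | head    # rows whose word contains a comma sort as 0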

Another option for extracting the results is to run

$ hive -S -e 'SELECT * FROM word_counts;' > result.csv

where -S puts Hive in silent mode and the file result.csv is written on the local machine.

[16]:
!rm -rf *.log