Application Development in PySpark#

  • 30 min | Last modified: June 22, 2019

This tutorial describes how to implement and run an application using PySpark. By the end of this tutorial, the reader will be able to:

  • Describe the general process of developing an application.

  • Manage the application's input and output files.

  • Run the application on Spark.

Description of the application#

The application to be developed is the word frequency count presented in the 'WordCount en Spark' tutorial.
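
To make the expected result concrete before moving to Spark, the following is a minimal pure-Python sketch of the same computation. It is for illustration only and is not part of the application; the Spark implementation appears later in this tutorial.

#
# Minimal pure-Python sketch of the word count (illustration only).
#
from collections import Counter

text = "analytics is the discovery of patterns in data and analytics relies on data"
print(Counter(text.split()))
# Counter({'analytics': 2, 'data': 2, 'is': 1, ...})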

Data preparation#

In this process, it is assumed that the original data resides in a folder on the user's local machine. For this example, the wordcount directory is created in the current folder, and three files are created inside it.

[1]:
#
# Creation of the wordcount folder on the local machine.
#
!mkdir -p wordcount/

Next, the three test files are created.

[2]:
%%writefile wordcount/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Writing wordcount/text0.txt
[3]:
%%writefile wordcount/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Writing wordcount/text1.txt
[4]:
%%writefile wordcount/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Writing wordcount/text2.txt

Copying the input data to HDFS#

This application assumes that the data will always be located in the wordcount folder of the current working directory on the local machine. The first step is to move the files from the local machine to HDFS. For now, this step is done manually (a sketch of how it could be automated is shown after the verification below). The application is defined to always use the /tmp/wordcount/ folder in HDFS.

[5]:
#
# Create the /tmp/wordcount and /tmp/wordcount/input folders in HDFS.
#
!hdfs dfs -mkdir /tmp/wordcount
!hdfs dfs -mkdir /tmp/wordcount/input
[6]:
#
# Copy the files from the local wordcount/ directory
# to the /tmp/wordcount/input directory in HDFS
#
!hdfs dfs -copyFromLocal wordcount/* /tmp/wordcount/input/
[7]:
#
# Verify that the files were copied
# to HDFS
#
!hdfs dfs -ls /tmp/wordcount/input
Found 3 items
-rw-r--r--   1 root supergroup       1093 2022-05-18 17:24 /tmp/wordcount/input/text0.txt
-rw-r--r--   1 root supergroup        352 2022-05-18 17:24 /tmp/wordcount/input/text1.txt
-rw-r--r--   1 root supergroup        440 2022-05-18 17:24 /tmp/wordcount/input/text2.txt
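
The copy above was done by hand. As a reference, the following is a minimal sketch of how this step could be automated from Python. It assumes the hdfs command-line client is available on the PATH and uses only the paths defined in this tutorial.

#
# Minimal sketch (assumption: the hdfs command-line client is on the PATH).
# Automates the manual copy of the local wordcount/ files to HDFS.
#
import glob
import subprocess


def copy_input_to_hdfs(local_dir="wordcount", hdfs_dir="/tmp/wordcount/input"):
    # Create the destination folder (-p: do not fail if it already exists).
    subprocess.run(["hdfs", "dfs", "-mkdir", "-p", hdfs_dir], check=True)
    # Upload every local text file (-f: overwrite files already in HDFS).
    local_files = glob.glob(f"{local_dir}/*.txt")
    subprocess.run(
        ["hdfs", "dfs", "-copyFromLocal", "-f", *local_files, hdfs_dir],
        check=True,
    )


copy_input_to_hdfs()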

Implementation of the program in PySpark#

The wordcount.py file contains the implementation of the application. The code is as follows:

[8]:
%%writefile wordcount.py

import findspark
from pyspark import SparkConf, SparkContext
from operator import add

APP_NAME = "wordcount-app"

findspark.init()
conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)

# Read the files from the input folder
text = sc.textFile("/tmp/wordcount/input/*.txt")

# This is the word frequency counting algorithm
words = text.flatMap(lambda x: x.split())
wc = words.map(lambda x: (x, 1))
counts = wc.reduceByKey(add)

# Write the results to the output folder.
counts.saveAsTextFile("/tmp/wordcount/output")
Writing wordcount.py
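
Before submitting the job, the counting logic can be checked locally with a small in-memory RDD. The following is a minimal sketch, assuming a local Spark installation initialized through findspark; it is not part of wordcount.py.

#
# Minimal local sketch (assumes a local Spark installation): verifies the
# flatMap / map / reduceByKey pipeline on an in-memory RDD instead of HDFS.
#
from operator import add

import findspark
from pyspark import SparkConf, SparkContext

findspark.init()
conf = SparkConf().setAppName("wordcount-local-test").setMaster("local[*]")
sc = SparkContext(conf=conf)

lines = sc.parallelize(["big data analytics", "data analytics with spark"])
counts = (
    lines.flatMap(lambda x: x.split())
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)
print(sorted(counts.collect()))
# [('analytics', 2), ('big', 1), ('data', 2), ('spark', 1), ('with', 1)]

sc.stop()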

Running the application#

[9]:
#
# The application is executed using spark-submit,
# which runs the wordcount.py program on Spark
#
!spark-submit wordcount.py
22/05/18 17:24:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/18 17:24:36 INFO spark.SparkContext: Running Spark version 3.1.3
22/05/18 17:24:36 INFO resource.ResourceUtils: ==============================================================
22/05/18 17:24:36 INFO resource.ResourceUtils: No custom resources configured for spark.driver.
22/05/18 17:24:36 INFO resource.ResourceUtils: ==============================================================
22/05/18 17:24:36 INFO spark.SparkContext: Submitted application: wordcount-app
22/05/18 17:24:36 INFO resource.ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/05/18 17:24:36 INFO resource.ResourceProfile: Limiting resource is cpu
22/05/18 17:24:36 INFO resource.ResourceProfileManager: Added ResourceProfile id: 0
22/05/18 17:24:36 INFO spark.SecurityManager: Changing view acls to: root
22/05/18 17:24:36 INFO spark.SecurityManager: Changing modify acls to: root
22/05/18 17:24:36 INFO spark.SecurityManager: Changing view acls groups to:
22/05/18 17:24:36 INFO spark.SecurityManager: Changing modify acls groups to:
22/05/18 17:24:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/05/18 17:24:36 INFO util.Utils: Successfully started service 'sparkDriver' on port 36305.
22/05/18 17:24:36 INFO spark.SparkEnv: Registering MapOutputTracker
22/05/18 17:24:36 INFO spark.SparkEnv: Registering BlockManagerMaster
22/05/18 17:24:36 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/05/18 17:24:36 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/05/18 17:24:36 INFO spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/05/18 17:24:36 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-54ca9483-950c-4038-a4dc-1bd364951418
22/05/18 17:24:36 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MiB
22/05/18 17:24:36 INFO spark.SparkEnv: Registering OutputCommitCoordinator
22/05/18 17:24:36 INFO util.log: Logging initialized @2240ms to org.sparkproject.jetty.util.log.Slf4jLog
22/05/18 17:24:36 INFO server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_312-8u312-b07-0ubuntu1~20.04-b07
22/05/18 17:24:36 INFO server.Server: Started @2320ms
22/05/18 17:24:37 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/18 17:24:37 INFO server.AbstractConnector: Started ServerConnector@3814a513{HTTP/1.1, (http/1.1)}{0.0.0.0:4041}
22/05/18 17:24:37 INFO util.Utils: Successfully started service 'SparkUI' on port 4041.
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@15bfe4c4{/jobs,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@52588fc9{/jobs/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@66f1af51{/jobs/job,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2be720e5{/jobs/job/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3a1a5b4c{/stages,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@54b78325{/stages/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4c500d1c{/stages/stage,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@665ecf71{/stages/stage/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3e4d1140{/stages/pool,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@54569fd1{/stages/pool/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2fb11d69{/storage,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@146acc9b{/storage/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4ce8e870{/storage/rdd,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@698f87e6{/storage/rdd/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@262f82da{/environment,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@43e107c{/environment/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@9a60124{/executors,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@341583ec{/executors/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@75b8817a{/executors/threadDump,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4be30dc9{/executors/threadDump/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@46c4274a{/static,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@666ebb66{/,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@232e2c26{/api,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4df4077b{/jobs/job/kill,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@176291c4{/stages/stage/kill,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://43d9326c5b5a:4041
22/05/18 17:24:37 INFO executor.Executor: Starting executor ID driver on host 43d9326c5b5a
22/05/18 17:24:37 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43145.
22/05/18 17:24:37 INFO netty.NettyBlockTransferService: Server created on 43d9326c5b5a:43145
22/05/18 17:24:37 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/05/18 17:24:37 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 43d9326c5b5a, 43145, None)
22/05/18 17:24:37 INFO storage.BlockManagerMasterEndpoint: Registering block manager 43d9326c5b5a:43145 with 366.3 MiB RAM, BlockManagerId(driver, 43d9326c5b5a, 43145, None)
22/05/18 17:24:37 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 43d9326c5b5a, 43145, None)
22/05/18 17:24:37 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 43d9326c5b5a, 43145, None)
22/05/18 17:24:37 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@400090da{/metrics/json,null,AVAILABLE,@Spark}
22/05/18 17:24:37 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 418.9 KiB, free 365.9 MiB)
22/05/18 17:24:38 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 37.9 KiB, free 365.9 MiB)
22/05/18 17:24:38 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 43d9326c5b5a:43145 (size: 37.9 KiB, free: 366.3 MiB)
22/05/18 17:24:38 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0
22/05/18 17:24:38 INFO mapred.FileInputFormat: Total input files to process : 3
22/05/18 17:24:38 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
22/05/18 17:24:38 INFO io.HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
22/05/18 17:24:38 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/05/18 17:24:38 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
22/05/18 17:24:38 INFO spark.SparkContext: Starting job: runJob at SparkHadoopWriter.scala:83
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Registering RDD 3 (reduceByKey at /workspace/pyspark/wordcount.py:18) as input to shuffle 0
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Got job 0 (runJob at SparkHadoopWriter.scala:83) with 4 output partitions
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (runJob at SparkHadoopWriter.scala:83)
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at /workspace/pyspark/wordcount.py:18), which has no missing parents
22/05/18 17:24:38 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 11.8 KiB, free 365.8 MiB)
22/05/18 17:24:38 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.1 KiB, free 365.8 MiB)
22/05/18 17:24:38 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 43d9326c5b5a:43145 (size: 7.1 KiB, free: 366.3 MiB)
22/05/18 17:24:38 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1433
22/05/18 17:24:38 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 0 (PairwiseRDD[3] at reduceByKey at /workspace/pyspark/wordcount.py:18) (first 15 tasks are for partitions Vector(0, 1, 2, 3))
22/05/18 17:24:38 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 4 tasks resource profile 0
22/05/18 17:24:38 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (43d9326c5b5a, executor driver, partition 0, NODE_LOCAL, 4507 bytes) taskResourceAssignments Map()
22/05/18 17:24:38 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (43d9326c5b5a, executor driver, partition 1, NODE_LOCAL, 4507 bytes) taskResourceAssignments Map()
22/05/18 17:24:38 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (43d9326c5b5a, executor driver, partition 2, NODE_LOCAL, 4507 bytes) taskResourceAssignments Map()
22/05/18 17:24:38 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (43d9326c5b5a, executor driver, partition 3, NODE_LOCAL, 4507 bytes) taskResourceAssignments Map()
22/05/18 17:24:38 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
22/05/18 17:24:38 INFO executor.Executor: Running task 2.0 in stage 0.0 (TID 2)
22/05/18 17:24:38 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
22/05/18 17:24:38 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3)
22/05/18 17:24:39 INFO rdd.HadoopRDD: Input split: hdfs://0.0.0.0:9000/tmp/wordcount/input/text2.txt:0+440
22/05/18 17:24:39 INFO rdd.HadoopRDD: Input split: hdfs://0.0.0.0:9000/tmp/wordcount/input/text0.txt:942+151
22/05/18 17:24:39 INFO rdd.HadoopRDD: Input split: hdfs://0.0.0.0:9000/tmp/wordcount/input/text1.txt:0+352
22/05/18 17:24:39 INFO rdd.HadoopRDD: Input split: hdfs://0.0.0.0:9000/tmp/wordcount/input/text0.txt:0+942
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 660, boot = 527, init = 132, finish = 1
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 694, boot = 514, init = 179, finish = 1
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 640, boot = 506, init = 131, finish = 3
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 576, boot = 447, init = 127, finish = 2
22/05/18 17:24:40 INFO executor.Executor: Finished task 2.0 in stage 0.0 (TID 2). 1657 bytes result sent to driver
22/05/18 17:24:40 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1614 bytes result sent to driver
22/05/18 17:24:40 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1657 bytes result sent to driver
22/05/18 17:24:40 INFO executor.Executor: Finished task 3.0 in stage 0.0 (TID 3). 1657 bytes result sent to driver
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 1401 ms on 43d9326c5b5a (executor driver) (1/4)
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1404 ms on 43d9326c5b5a (executor driver) (2/4)
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1424 ms on 43d9326c5b5a (executor driver) (3/4)
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 1403 ms on 43d9326c5b5a (executor driver) (4/4)
22/05/18 17:24:40 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/05/18 17:24:40 INFO python.PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 55311
22/05/18 17:24:40 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (reduceByKey at /workspace/pyspark/wordcount.py:18) finished in 1.538 s
22/05/18 17:24:40 INFO scheduler.DAGScheduler: looking for newly runnable stages
22/05/18 17:24:40 INFO scheduler.DAGScheduler: running: Set()
22/05/18 17:24:40 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
22/05/18 17:24:40 INFO scheduler.DAGScheduler: failed: Set()
22/05/18 17:24:40 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:0), which has no missing parents
22/05/18 17:24:40 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 118.5 KiB, free 365.7 MiB)
22/05/18 17:24:40 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 44.8 KiB, free 365.7 MiB)
22/05/18 17:24:40 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 43d9326c5b5a:43145 (size: 44.8 KiB, free: 366.2 MiB)
22/05/18 17:24:40 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1433
22/05/18 17:24:40 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1, 2, 3))
22/05/18 17:24:40 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 4 tasks resource profile 0
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 4) (43d9326c5b5a, executor driver, partition 0, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 5) (43d9326c5b5a, executor driver, partition 1, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 6) (43d9326c5b5a, executor driver, partition 2, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 7) (43d9326c5b5a, executor driver, partition 3, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
22/05/18 17:24:40 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 4)
22/05/18 17:24:40 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 5)
22/05/18 17:24:40 INFO executor.Executor: Running task 2.0 in stage 1.0 (TID 6)
22/05/18 17:24:40 INFO executor.Executor: Running task 3.0 in stage 1.0 (TID 7)
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Getting 4 (813.0 B) non-empty blocks including 4 (813.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Getting 4 (760.0 B) non-empty blocks including 4 (760.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Getting 4 (997.0 B) non-empty blocks including 4 (997.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Getting 4 (819.0 B) non-empty blocks including 4 (819.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 13 ms
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
22/05/18 17:24:40 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
22/05/18 17:24:40 INFO io.HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
22/05/18 17:24:40 INFO io.HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
22/05/18 17:24:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/05/18 17:24:40 INFO io.HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
22/05/18 17:24:40 INFO io.HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
22/05/18 17:24:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/05/18 17:24:40 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
22/05/18 17:24:40 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
22/05/18 17:24:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/05/18 17:24:40 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
22/05/18 17:24:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
22/05/18 17:24:40 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 29, boot = -470, init = 499, finish = 0
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 14, boot = -464, init = 478, finish = 0
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 14, boot = -494, init = 508, finish = 0
22/05/18 17:24:40 INFO python.PythonRunner: Times: total = 14, boot = -461, init = 475, finish = 0
22/05/18 17:24:40 INFO output.FileOutputCommitter: Saved output of task 'attempt_202205181724388371454551214250831_0008_m_000001_0' to hdfs://0.0.0.0:9000/tmp/wordcount/output/_temporary/0/task_202205181724388371454551214250831_0008_m_000001
22/05/18 17:24:40 INFO output.FileOutputCommitter: Saved output of task 'attempt_202205181724388371454551214250831_0008_m_000000_0' to hdfs://0.0.0.0:9000/tmp/wordcount/output/_temporary/0/task_202205181724388371454551214250831_0008_m_000000
22/05/18 17:24:40 INFO mapred.SparkHadoopMapRedUtil: attempt_202205181724388371454551214250831_0008_m_000001_0: Committed
22/05/18 17:24:40 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 5). 1952 bytes result sent to driver
22/05/18 17:24:40 INFO mapred.SparkHadoopMapRedUtil: attempt_202205181724388371454551214250831_0008_m_000000_0: Committed
22/05/18 17:24:40 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 4). 1909 bytes result sent to driver
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 5) in 394 ms on 43d9326c5b5a (executor driver) (1/4)
22/05/18 17:24:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 4) in 414 ms on 43d9326c5b5a (executor driver) (2/4)
22/05/18 17:24:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_202205181724388371454551214250831_0008_m_000003_0' to hdfs://0.0.0.0:9000/tmp/wordcount/output/_temporary/0/task_202205181724388371454551214250831_0008_m_000003
22/05/18 17:24:41 INFO mapred.SparkHadoopMapRedUtil: attempt_202205181724388371454551214250831_0008_m_000003_0: Committed
22/05/18 17:24:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_202205181724388371454551214250831_0008_m_000002_0' to hdfs://0.0.0.0:9000/tmp/wordcount/output/_temporary/0/task_202205181724388371454551214250831_0008_m_000002
22/05/18 17:24:41 INFO mapred.SparkHadoopMapRedUtil: attempt_202205181724388371454551214250831_0008_m_000002_0: Committed
22/05/18 17:24:41 INFO executor.Executor: Finished task 2.0 in stage 1.0 (TID 6). 1909 bytes result sent to driver
22/05/18 17:24:41 INFO executor.Executor: Finished task 3.0 in stage 1.0 (TID 7). 1909 bytes result sent to driver
22/05/18 17:24:41 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 7) in 954 ms on 43d9326c5b5a (executor driver) (3/4)
22/05/18 17:24:41 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 6) in 956 ms on 43d9326c5b5a (executor driver) (4/4)
22/05/18 17:24:41 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
22/05/18 17:24:41 INFO scheduler.DAGScheduler: ResultStage 1 (runJob at SparkHadoopWriter.scala:83) finished in 0.992 s
22/05/18 17:24:41 INFO scheduler.DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
22/05/18 17:24:41 INFO scheduler.TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
22/05/18 17:24:41 INFO scheduler.DAGScheduler: Job 0 finished: runJob at SparkHadoopWriter.scala:83, took 2.706868 s
22/05/18 17:24:41 INFO io.SparkHadoopWriter: Job job_202205181724388371454551214250831_0008 committed.
22/05/18 17:24:41 INFO spark.SparkContext: Invoking stop() from shutdown hook
22/05/18 17:24:41 INFO server.AbstractConnector: Stopped Spark@3814a513{HTTP/1.1, (http/1.1)}{0.0.0.0:4041}
22/05/18 17:24:41 INFO ui.SparkUI: Stopped Spark web UI at http://43d9326c5b5a:4041
22/05/18 17:24:41 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/05/18 17:24:41 INFO memory.MemoryStore: MemoryStore cleared
22/05/18 17:24:41 INFO storage.BlockManager: BlockManager stopped
22/05/18 17:24:41 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
22/05/18 17:24:41 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/05/18 17:24:41 INFO spark.SparkContext: Successfully stopped SparkContext
22/05/18 17:24:41 INFO util.ShutdownHookManager: Shutdown hook called
22/05/18 17:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3eb68ca-3abb-44db-b04b-acb2600fae89/pyspark-c4d72c4f-34d6-4979-961a-9c3c08659dd4
22/05/18 17:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-452e9c44-fe8c-4971-ae67-67596f9c1868
22/05/18 17:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3eb68ca-3abb-44db-b04b-acb2600fae89

Result files#

The /tmp/wordcount/output folder contains the results of the program execution.

[10]:
# Files with the results. Note that several result files
# are generated (one part-* file per RDD partition).
!hdfs dfs -ls /tmp/wordcount/output/
Found 5 items
-rw-r--r--   1 root supergroup          0 2022-05-18 17:24 /tmp/wordcount/output/_SUCCESS
-rw-r--r--   1 root supergroup        778 2022-05-18 17:24 /tmp/wordcount/output/part-00000
-rw-r--r--   1 root supergroup        562 2022-05-18 17:24 /tmp/wordcount/output/part-00001
-rw-r--r--   1 root supergroup        510 2022-05-18 17:24 /tmp/wordcount/output/part-00002
-rw-r--r--   1 root supergroup        594 2022-05-18 17:24 /tmp/wordcount/output/part-00003

The output/_SUCCESS file is an empty file indicating that the program ran successfully.

[11]:
!hdfs dfs -cat /tmp/wordcount/output/part-00000 | head
('interpretation,', 1)
('of', 8)
('in', 5)
('data.', 1)
('Especially', 1)
('analytics', 8)
('simultaneous', 1)
('operations', 1)
('research', 2)
('quantify', 1)
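
Each part file stores one (word, count) tuple per line as plain text. The following is a minimal sketch for reading the results back into an RDD for further inspection; it assumes no other SparkContext is active and uses ast.literal_eval to parse each line.

#
# Minimal sketch (assumes no other SparkContext is active): read the part
# files back from HDFS and list the most frequent words.
#
import ast

import findspark
from pyspark import SparkConf, SparkContext

findspark.init()
sc = SparkContext(conf=SparkConf().setAppName("wordcount-results"))

pairs = (
    sc.textFile("/tmp/wordcount/output/part-*")
    .map(ast.literal_eval)  # each line is the text "('word', count)"
)
print(pairs.takeOrdered(10, key=lambda wc: -wc[1]))

sc.stop()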

Operating system command script#

Finally, a script is created that copies the files and runs the application.

[12]:
%%writefile wordcountapp
#! /bin/bash

# Delete the input and output folders in HDFS if they exist
# (wordcount.py fails if the output folder already exists)
hdfs dfs -rm -r -f /tmp/wordcount/input
hdfs dfs -rm -r -f /tmp/wordcount/output

# Create the input folder
hdfs dfs -mkdir -p /tmp/wordcount/input

# Copy the input files from the local
# machine to HDFS
hdfs dfs -copyFromLocal wordcount/* /tmp/wordcount/input/

# Run the Spark application
spark-submit wordcount.py
Writing wordcountapp

This application would be run from the terminal with the following command:

$ bash wordcountapp

Cleanup of the working folders#

[13]:
!rm wordcountapp
!rm -rf wordcount
!hdfs dfs -rm -r -f /tmp/wordcount/
Deleted /tmp/wordcount