Operaciones sobre RDD (resilient distributed datasets)#
Última modificación: Junio 22, 2019
Spark (mediante sus interfaces en Scala, Java, Python) permite ejecutar operaciones de transformación en paralelo sobre conjuntos de datos que se encuentran particionados sobre los nodos del cluster. Estas operaciones son usadas principalmente en tareas de extracción, transformación y preparación de datos. Los datos procesados son usados en aplicaciones de aprendizaje automático e inteligencia de negocios.
Al finalizar este tutorial, el lector estará en capacidad de:
Crear colecciones paralelizadas en Spark a partir de datos existentes en Python.
Cargar archivos de datos en diferentes formatos.
Aplicar transformaciones y acciones sobre Datasets.
Usar algoritmos simples basados en transformaciones y acciones sobre conjuntos de datos.
Archivos de datos de ejemplo para las corridas#
En este tutorial se usan archivos de texto para demostrar algunas de las características de Spark. A coninuación se crean dichos archivos de texto y se mueven al HDFS.
[1]:
# Se borra la carpeta y su contenido en la máquina local si existe
!rm -rf wordcount/
# Se crea la carpeta vacía
!mkdir -p wordcount/
[2]:
%%writefile wordcount/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns
in data. Especially valuable in areas rich with recorded information, analytics relies
on the simultaneous application of statistics, computer programming and operations research
to quantify performance.
Organizations may apply analytics to business data to describe, predict, and improve business
performance. Specifically, areas within analytics include predictive analytics, prescriptive
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization,
marketing optimization and marketing mix modeling, web analytics, call analytics, speech
analytics, sales force sizing and optimization, price and promotion modeling, predictive
science, credit risk analysis, and fraud analytics. Since analytics can require extensive
computation (see big data), the algorithms and software used for analytics harness the most
current methods in computer science, statistics, and mathematics.
Writing wordcount/text0.txt
[3]:
%%writefile wordcount/text1.txt
The field of data analysis. Analytics often involves studying past historical data to
research potential trends, to analyze the effects of certain decisions or events, or to
evaluate the performance of a given tool or scenario. The goal of analytics is to improve
the business by gaining knowledge which can be used to make improvements or changes.
Writing wordcount/text1.txt
[4]:
%%writefile wordcount/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions
about the information they contain, increasingly with the aid of specialized systems
and software. Data analytics technologies and techniques are widely used in commercial
industries to enable organizations to make more-informed business decisions and by
scientists and researchers to verify or disprove scientific models, theories and
hypotheses.
Writing wordcount/text2.txt
[5]:
# crea la carpeta /tmp/wordcount en el hdfs
!hdfs dfs -mkdir /tmp/wordcount
# copia los archvios del directorio local wordcount/
# al directorio /tmp/wordcount/ en el hdfs
!hdfs dfs -copyFromLocal wordcount/* /tmp/wordcount/
Inicialización de la aplicación#
A continuación se realiza la inicialización del sistema. findspark
permite la conexión de Python con Spark.
[6]:
#
# findspark permite usar pyspark (interfaz de Python a Spark),
# desde cualquier programa escrito en Python.
#
import findspark
findspark.init()
#
# A continuación se inicializan las variables obligatorias
# requeridas para trabajar con Spark desde Python:
#
# SparkContext representa la conexión al cluster de Spark.
# SparkConf representa la configuración particular de una aplicación
# escrita en Spark.
# SparkSession representa la conexión para trabajar con SQL.
#
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
sparkConf = SparkConf().setAppName("My SparkQL Application")
sc = SparkContext(conf=sparkConf)
spark = SparkSession(sc)
Creación de colecciones paralelizadas a partir de datos nativos en Python#
Se crean mediante la función parallelize
del SparkContext
. Sobre estas colecciones se pueden aplicar operaciones en paralelo. Se crean a partir de datos nativos de Python.
[7]:
#
# crea una colección a partir de una lista
#
rdd = sc.parallelize([1, 2, 3, 4, 5])
#
# rdd contiene un objeto en memoria donde
# se almacena la lista, no la lista como tal.
#
rdd
[7]:
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
[8]:
#
# Retorna una lista que contiene todos los
# elementos en un RDD
#
rdd.collect()
[8]:
[1, 2, 3, 4, 5]
[9]:
#
# En el siguiente ejemplo se paraleliza un diccionario.
# collect() retorna las claves.
#
rdd = sc.parallelize({"a":1, "b":2, "c":3, "d":4})
rdd.collect()
[9]:
['a', 'b', 'c', 'd']
Lectura de archivos con formato <clave, valor>#
Spark permite la carga de datos desde el sistema local, servicios web y formato (key, value)
de Hadoop.
[10]:
#
# crea una secuencia de parejas clave-valor
#
rdd = sc.parallelize([1, 2, 3, 4, 5])
#
# Una vez se tiene un objeto RDD, es posible aplicar operaciones
# en paralelo sobre él. En la siguiente línea de código, se aplica
# la función lambda x:(x, x**2) a cada elemento del RDD, mediante
# la función map. Como resultado se genera una secuencia de pares
# [(1, 1), (2, 4), ..., (5, 25)]
#
rdd = rdd.map(lambda x:(x, x**2))
#
# Salva la secuencia al hdfs
#
rdd.saveAsSequenceFile('/tmp/out')
[11]:
#
# El archivo se almacena en /tmp/out/. Aquí se usa
# ls para listar los archivos de la carpeta /tmp/
# en el sistema hdfs. Note se creó la carpeta
# /tmp/out como resultado del paso anterior
#
!hdfs dfs -ls /tmp/out/
Found 5 items
-rw-r--r-- 1 root supergroup 0 2019-11-15 00:44 /tmp/out/_SUCCESS
-rw-r--r-- 1 root supergroup 108 2019-11-15 00:44 /tmp/out/part-00000
-rw-r--r-- 1 root supergroup 108 2019-11-15 00:44 /tmp/out/part-00001
-rw-r--r-- 1 root supergroup 108 2019-11-15 00:44 /tmp/out/part-00002
-rw-r--r-- 1 root supergroup 124 2019-11-15 00:44 /tmp/out/part-00003
[12]:
#
# Una vez se tiene una secuencia de pares almacenada en el hdfs,
# es posible volverla a cargar mediante sequenceFile().
#
sc.sequenceFile("/tmp/out").collect()
[12]:
[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
[13]:
# Se limpia el hdfs
!hdfs dfs -rm -r -f /tmp/out
Deleted /tmp/out
Lectura de archivos de texto con textFile y wholeTextFiles#
[14]:
#
# textFile() puede leer un archivo de texto o los archivos
# especificados. A continuación, se leen todos los archivos
# de texto de la carpeta /tmp/wordcount/ en el hdfs
#
rdd = sc.textFile("/tmp/wordcount/")
rdd
[14]:
/tmp/wordcount/ MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0
[15]:
#
# Se obtiene el timpo de dato almacenado en la variable rdd
#
type(rdd)
[15]:
pyspark.rdd.RDD
[16]:
#
# rdd almacena como strings, el contenido de los archivos
# text0.txt, text1.txt y text2.txt
#
rdd.collect()
[16]:
['Analytics is the discovery, interpretation, and communication of meaningful patterns ',
'in data. Especially valuable in areas rich with recorded information, analytics relies ',
'on the simultaneous application of statistics, computer programming and operations research ',
'to quantify performance.',
'',
'Organizations may apply analytics to business data to describe, predict, and improve business ',
'performance. Specifically, areas within analytics include predictive analytics, prescriptive ',
'analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big ',
'Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, ',
'marketing optimization and marketing mix modeling, web analytics, call analytics, speech ',
'analytics, sales force sizing and optimization, price and promotion modeling, predictive ',
'science, credit risk analysis, and fraud analytics. Since analytics can require extensive ',
'computation (see big data), the algorithms and software used for analytics harness the most ',
'current methods in computer science, statistics, and mathematics.',
'The field of data analysis. Analytics often involves studying past historical data to ',
'research potential trends, to analyze the effects of certain decisions or events, or to ',
'evaluate the performance of a given tool or scenario. The goal of analytics is to improve ',
'the business by gaining knowledge which can be used to make improvements or changes.',
'Data analytics (DA) is the process of examining data sets in order to draw conclusions ',
'about the information they contain, increasingly with the aid of specialized systems ',
'and software. Data analytics technologies and techniques are widely used in commercial ',
'industries to enable organizations to make more-informed business decisions and by ',
'scientists and researchers to verify or disprove scientific models, theories and ',
'hypotheses.']
[17]:
#
# wholeTextFiles() retorna una lista de pares que contienen el nombre
# del archivo y su correspondiente texto
#
rdd = sc.wholeTextFiles("/tmp/wordcount/")
for row in rdd.collect():
print(row)
print('----')
('hdfs://0.0.0.0:9000/tmp/wordcount/text0.txt', 'Analytics is the discovery, interpretation, and communication of meaningful patterns \nin data. Especially valuable in areas rich with recorded information, analytics relies \non the simultaneous application of statistics, computer programming and operations research \nto quantify performance.\n\nOrganizations may apply analytics to business data to describe, predict, and improve business \nperformance. Specifically, areas within analytics include predictive analytics, prescriptive \nanalytics, enterprise decision management, descriptive analytics, cognitive analytics, Big \nData Analytics, retail analytics, store assortment and stock-keeping unit optimization, \nmarketing optimization and marketing mix modeling, web analytics, call analytics, speech \nanalytics, sales force sizing and optimization, price and promotion modeling, predictive \nscience, credit risk analysis, and fraud analytics. Since analytics can require extensive \ncomputation (see big data), the algorithms and software used for analytics harness the most \ncurrent methods in computer science, statistics, and mathematics.\n')
----
('hdfs://0.0.0.0:9000/tmp/wordcount/text1.txt', 'The field of data analysis. Analytics often involves studying past historical data to \nresearch potential trends, to analyze the effects of certain decisions or events, or to \nevaluate the performance of a given tool or scenario. The goal of analytics is to improve \nthe business by gaining knowledge which can be used to make improvements or changes.\n')
----
('hdfs://0.0.0.0:9000/tmp/wordcount/text2.txt', 'Data analytics (DA) is the process of examining data sets in order to draw conclusions \nabout the information they contain, increasingly with the aid of specialized systems \nand software. Data analytics technologies and techniques are widely used in commercial \nindustries to enable organizations to make more-informed business decisions and by \nscientists and researchers to verify or disprove scientific models, theories and \nhypotheses.\n')
----
Operaciones sobre RDD#
Spark soporta dos tipos de transformaciones sobre RDD:
Transformaciones: Son funciones que crean un nuevo RDD a partir de uno existente. Estas pueden entenderse como equivalentes a la función Map en el algorimto MapReduce.
Acciones: Retornan un valor después de realizar una computación sobre el RDD. Pueden entenderse como un reducer.
Ver http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
En este ejemplo se computa la cantidad de caracteres leídos de un grupo de archivos.
[18]:
#
# Carga de los archivos.
#
rdd = sc.textFile("/tmp/wordcount/")
#
# Calculo de la longitud de cada línea usando la función len().
# Se aplica la función len() a cada elemento del rdd
#
rdd = rdd.map(len)
print(rdd.collect())
#
# Calculo del total de caracteres
#
rdd = rdd.reduce(lambda a, b: a+b)
rdd
[85, 87, 92, 24, 0, 94, 93, 91, 88, 89, 89, 90, 92, 65, 86, 88, 90, 84, 87, 85, 87, 83, 81, 11]
[18]:
1861
[19]:
#
# En este ejemplo se pasa una función arbitraria a `map`.
# La función add es equivalente a `lambda a, b: a+b`
#
from operator import add
rdd = sc.textFile("/tmp/wordcount/")
rdd = rdd.map(len)
print(rdd.collect())
rdd = rdd.reduce(add)
rdd
[85, 87, 92, 24, 0, 94, 93, 91, 88, 89, 89, 90, 92, 65, 86, 88, 90, 84, 87, 85, 87, 83, 81, 11]
[19]:
1861
Transformaciones#
Las transformaciones son las siguientes:
map( f ): Aplica la función f a cada elemento del dataset.
filter( f ): retorna un dataset con los elementos para los cuales f retorna
True
.flatMap( f ): Es similar a
map
pero cada item de entrada es mapeado a los items de salida.mapPattitions( f ): Similar a
map
pero se ejecuta separadamente en cada partición.sample( withReplacement, fraction, seed ): muestrea una fracción de los datos.
union( otherRDD ): retorna la unión de los dos RDD.
intersection( otherRDD ): retorna la intersección de los dos RDD.
distinct(): retorna los elementos diferentes en el RDD.
groupByKey(): retorna los elementos agrupados por clave.
reduceByKey( f ): reduce por clave usando la función f.
aggregateByKey(): agrega los elementos por clave.
sortByKey(): retorna RDD ordenado por clave.
join( otherRDD ): para los RDD (K, V) y (K, W) retorna (K, (V, W)).
cogroup( otherRDD ): para los RDD (K, V) y (K, W) retorna (K, (iterable(V), iterable(K))).
cartesian( otherRDD ): retorna el producto cartesiano de los dos RDD.
Acciones#
reduce( f ): agrega los elementos con la misma clave usando f.
collect(): retorna todos los elementos del RDD.
count(): Retorna el número de elementos del RDD.
first(): Retorna el primer elemento del RDD.
take( n ): Retorna los primeros n elementos del RDD.
takeSample(withReplacement, n): Retorna una muestra aleatoria del dataset.
takeOrdered(n): Retorna los primeros n elementos del RDD ordenado.
saveAsText(path): Salva el RDD como un arhivo en disco.
saveAsSequenceFile(path): Salva como clave-valor a Hadoop.
countByKey(): cuenta los elementos por clave.
foreach( f ): le aplica la función f a cada elemento del RDD.
Ejemplos#
A continuación se presentan varios ejemplos de cálculos comúnes, implementados como operaciones sobre RDDs.
Ejemplo 1#
Conteo de frecuencia de palabras.
[20]:
#
# creación de parejas (key, value). Se desea
# contar el número de ocurrencias de cada letra
# en la siguiente lista.
#
rdd = sc.parallelize(["a", "b", "a", "c", "d", "a", "b"])
##
# Se crean las parejas <clave, valor>
##
rdd = rdd.map(lambda s: (s, 1))
rdd.collect()
[20]:
[('a', 1), ('b', 1), ('a', 1), ('c', 1), ('d', 1), ('a', 1), ('b', 1)]
[21]:
#
# Se reducen las parejas con la misma clave, sumando
# los valores.
#
rdd = rdd.reduceByKey(lambda a, b: a + b)
print(rdd.collect())
[('b', 2), ('c', 1), ('a', 3), ('d', 1)]
Ejemplo 2#
Para este ejemplo, se genera el siguiente archivo de datos separados por tabuladores.
[22]:
%%writefile data.txt
E 1 b,g,f jjj:3,bbb:0,ddd:9,ggg:8,hhh:2
A 2 a,f,c ccc:2,ddd:0,aaa:3,hhh:9
B 5 f,e,a,c ddd:2,ggg:5,ccc:6,jjj:1
A 3 a,b hhh:9,iii:5,eee:7,bbb:1
C 6 f,g,d,a iii:6,ddd:5,eee:4,jjj:3
A 7 c,d bbb:2,hhh:0,ccc:4,fff:1,aaa:7
A 9 g,d,a aaa:5,fff:8,ddd:2,iii:0,jjj:7,ccc:1
B 1 b,a fff:3,hhh:1,ddd:2
E 2 d,e,a,f eee:4,ccc:5,iii:9,fff:7,ggg:6,bbb:0
B 3 d,b,g,f bbb:7,jjj:9,fff:5,iii:4,ggg:2,eee:3
C 7 d,c,f,b hhh:6,eee:4,iii:0,fff:2,jjj:1
C 5 d,e,a,c bbb:7,iii:6,ggg:9
D 3 g,e,f,b bbb:9,aaa:3,ccc:6,fff:4,eee:2
E 8 c,f aaa:8,ddd:5,jjj:1
B 9 d,b ccc:0,jjj:6,fff:7,ddd:3,aaa:2
D 1 f,e ccc:0,eee:6,bbb:9,ddd:3
E 3 e,b,f bbb:6,iii:3,hhh:5,fff:4,ggg:9,ddd:2
D 5 g,a hhh:4,jjj:5,ccc:9
E 8 e,c,f,a ccc:1,iii:6,fff:9
E 9 e,a bbb:9,aaa:3,fff:1
E 7 e,f ddd:9,iii:2,aaa:4
E 3 c,b,g ccc:5,fff:8,iii:7
D 5 c,f,a eee:3,jjj:2,ddd:7
A 1 f,a,d jjj:1,ggg:0,ccc:7,ddd:9,bbb:3
E 4 c,d jjj:6,ccc:0,aaa:1,hhh:9,iii:7,ggg:8
E 6 e,d,c fff:3,eee:6,iii:4,bbb:7,ddd:0,ccc:1
A 8 a,e,f fff:0,ddd:5,ccc:4
E 5 c,a,g ggg:6,hhh:3,ddd:9,ccc:0,jjj:7
A 6 f,e hhh:6,jjj:0,eee:5,iii:7,ccc:3
C 0 f,c,a,g eee:1,fff:4,aaa:2,ccc:7,ggg:0,ddd:6
A 1 b,f ccc:6,aaa:9,eee:5,ddd:0,bbb:3
D 2 b,f bbb:7,hhh:1,aaa:6,iii:4,fff:9,ddd:5
E 5 a,c fff:3,ccc:1,ggg:2,eee:5
B 4 b,f,c iii:7,ggg:3,ddd:0,jjj:8,hhh:5,ccc:1
B 6 f,a,e hhh:6,ccc:3,jjj:0,bbb:8,ddd:7
D 7 a,f aaa:0,fff:5,ddd:3
B 8 c,a ddd:5,jjj:2,iii:7,ccc:0,bbb:4
C 9 c,a,e,f eee:0,fff:2,hhh:6
E 1 e,d fff:9,iii:2,eee:0
E 5 f,a,d hhh:8,ggg:3,jjj:5
Overwriting data.txt
[23]:
# Copia el archivo del sistema local al hdfs
!hdfs dfs -copyFromLocal data.txt /tmp/data.txt
Parte a
Compute la suma de la columna 2.
[24]:
# carga los datos
rdd = sc.textFile("/tmp/data.txt")
# separa los datos por los tabuladores
rdd = rdd.map(lambda x: x.split('\t'))
rdd.collect()
[24]:
[['E', '1', 'b,g,f', 'jjj:3,bbb:0,ddd:9,ggg:8,hhh:2'],
['A', '2', 'a,f,c', 'ccc:2,ddd:0,aaa:3,hhh:9'],
['B', '5', 'f,e,a,c', 'ddd:2,ggg:5,ccc:6,jjj:1'],
['A', '3', 'a,b', 'hhh:9,iii:5,eee:7,bbb:1'],
['C', '6', 'f,g,d,a', 'iii:6,ddd:5,eee:4,jjj:3'],
['A', '7', 'c,d', 'bbb:2,hhh:0,ccc:4,fff:1,aaa:7'],
['A', '9', 'g,d,a', 'aaa:5,fff:8,ddd:2,iii:0,jjj:7,ccc:1'],
['B', '1', 'b,a', 'fff:3,hhh:1,ddd:2'],
['E', '2', 'd,e,a,f', 'eee:4,ccc:5,iii:9,fff:7,ggg:6,bbb:0'],
['B', '3', 'd,b,g,f', 'bbb:7,jjj:9,fff:5,iii:4,ggg:2,eee:3'],
['C', '7', 'd,c,f,b', 'hhh:6,eee:4,iii:0,fff:2,jjj:1'],
['C', '5', 'd,e,a,c', 'bbb:7,iii:6,ggg:9'],
['D', '3', 'g,e,f,b', 'bbb:9,aaa:3,ccc:6,fff:4,eee:2'],
['E', '8', 'c,f', 'aaa:8,ddd:5,jjj:1'],
['B', '9', 'd,b', 'ccc:0,jjj:6,fff:7,ddd:3,aaa:2'],
['D', '1', 'f,e', 'ccc:0,eee:6,bbb:9,ddd:3'],
['E', '3', 'e,b,f', 'bbb:6,iii:3,hhh:5,fff:4,ggg:9,ddd:2'],
['D', '5', 'g,a', 'hhh:4,jjj:5,ccc:9'],
['E', '8', 'e,c,f,a', 'ccc:1,iii:6,fff:9'],
['E', '9', 'e,a', 'bbb:9,aaa:3,fff:1'],
['E', '7', 'e,f', 'ddd:9,iii:2,aaa:4'],
['E', '3', 'c,b,g', 'ccc:5,fff:8,iii:7'],
['D', '5', 'c,f,a', 'eee:3,jjj:2,ddd:7'],
['A', '1', 'f,a,d', 'jjj:1,ggg:0,ccc:7,ddd:9,bbb:3'],
['E', '4', 'c,d', 'jjj:6,ccc:0,aaa:1,hhh:9,iii:7,ggg:8'],
['E', '6', 'e,d,c', 'fff:3,eee:6,iii:4,bbb:7,ddd:0,ccc:1'],
['A', '8', 'a,e,f', 'fff:0,ddd:5,ccc:4'],
['E', '5', 'c,a,g', 'ggg:6,hhh:3,ddd:9,ccc:0,jjj:7'],
['A', '6', 'f,e', 'hhh:6,jjj:0,eee:5,iii:7,ccc:3'],
['C', '0', 'f,c,a,g', 'eee:1,fff:4,aaa:2,ccc:7,ggg:0,ddd:6'],
['A', '1', 'b,f', 'ccc:6,aaa:9,eee:5,ddd:0,bbb:3'],
['D', '2', 'b,f', 'bbb:7,hhh:1,aaa:6,iii:4,fff:9,ddd:5'],
['E', '5', 'a,c', 'fff:3,ccc:1,ggg:2,eee:5'],
['B', '4', 'b,f,c', 'iii:7,ggg:3,ddd:0,jjj:8,hhh:5,ccc:1'],
['B', '6', 'f,a,e', 'hhh:6,ccc:3,jjj:0,bbb:8,ddd:7'],
['D', '7', 'a,f', 'aaa:0,fff:5,ddd:3'],
['B', '8', 'c,a', 'ddd:5,jjj:2,iii:7,ccc:0,bbb:4'],
['C', '9', 'c,a,e,f', 'eee:0,fff:2,hhh:6'],
['E', '1', 'e,d', 'fff:9,iii:2,eee:0'],
['E', '5', 'f,a,d', 'hhh:8,ggg:3,jjj:5']]
[25]:
# extrae la segunda columna y la convierte en número
rdd = rdd.map(lambda x: int(x[1]))
rdd.collect()[0:5] # se imprimen los primeros cinco valores
[25]:
[1, 2, 5, 3, 6]
[26]:
from operator import add
rdd.reduce(add) # Se reduce sumando los valores. Note que no hay parejas clave-valor
[26]:
190
[27]:
#
# El codigo se puede reescribir como:
#
sc.textFile("/tmp/data.txt") \
.map(lambda x: x.split('\t')) \
.map(lambda x: int(x[1])) \
.reduce(add)
[27]:
190
Parte b
Genere un RDD donde la clave sea el número de la columna 2, y el valor sean las letras correspondientes de la columna 1 del archivo original.
[28]:
# Carga el archivo y lo separa por tabuladores
rdd = sc.textFile("/tmp/data.txt").map(lambda x: x.split('\t'))
#
# Se forman parejas <columna 2, [columna 1]> puesto que
# se desea agrupar por la columna 2. Esto es,
# [ (1, E), (2, A), ..., (1, E), (5, E)]
# Note que el segundo elemento del par es una lista.
#
rdd = rdd.map(lambda x: (x[1], [x[0]]))
#
rdd.reduceByKey(add).collect()
[28]:
[('1', ['E', 'B', 'D', 'A', 'A', 'E']),
('9', ['A', 'B', 'E', 'C']),
('8', ['E', 'E', 'A', 'B']),
('4', ['E', 'B']),
('0', ['C']),
('2', ['A', 'E', 'D']),
('5', ['B', 'C', 'D', 'D', 'E', 'E', 'E']),
('3', ['A', 'B', 'D', 'E', 'E']),
('6', ['C', 'E', 'A', 'B']),
('7', ['A', 'C', 'E', 'D'])]
Parte c
Calcule la cantidad de registros por clave de la columna 4. En otras palabras, ¿cuántos registros hay que tengan la clave aaa
?
[29]:
# Carga el archivo y lo separa por tabuladores
rdd = sc.textFile("/tmp/data.txt").map(lambda x: x.split('\t'))
#
# Se obtiene un rdd que contenga únicamente la columna 4.
#
# ['jjj:3,bbb:0,ddd:9,ggg:8,hhh:2',
# ...
# 'hhh:8,ggg:3,jjj:5']
#
rdd = rdd.map(lambda x: x[3])
#
# Se parte por la ','. flatMap() genera un nuevo registro
# en el rdd por cada valor en la lista retornada por
# la función lambda x: x.split(',')
#
# ['jjj:3',
# 'bbb:0',
# ......
# 'ggg:3',
# 'jjj:5']
#
rdd = rdd.flatMap(lambda x: x.split(','))
#
# Separa por los ':' y toma el primer elemento.
#
# ['jjj',
# 'bbb',
# .....
# 'ggg',
# 'jjj']
#
rdd = rdd.map(lambda x: x.split(':')[0])
#
# Se aplica el algoritmo de conteo de palabras
#
rdd = rdd.map(lambda s: (s, 1)).reduceByKey(add)
rdd.collect()
[29]:
[('jjj', 18),
('ccc', 23),
('aaa', 13),
('iii', 18),
('eee', 15),
('bbb', 16),
('ddd', 23),
('ggg', 13),
('hhh', 16),
('fff', 20)]
Parte d
Genere una tabla que contenga la primera columna, la cantidad de elementos en la columna 3 y la cantidad de elementos en la columna 4. La columna 4 es una lista de claves y valores separados por comas.
[30]:
# Carga el archivo y lo separa por tabuladores
rdd = sc.textFile("/tmp/data.txt").map(lambda x: x.split('\t'))
#
# Elimina la columna 2. Queda así:
#
# [['E', 'b,g,f', 'jjj:3,bbb:0,ddd:9,ggg:8,hhh:2'],
# ['A', 'a,f,c', 'ccc:2,ddd:0,aaa:3,hhh:9'],
# ....
# ['E', 'e,d', 'fff:9,iii:2,eee:0'],
# ['E', 'f,a,d', 'hhh:8,ggg:3,jjj:5']]
#
rdd = rdd.map(lambda x: [x[0], x[2], x[3]])
#
# Parte las columnas 2 y 3 por la coma y cuenta los elementos
#
rdd = rdd.map(lambda x: [x[0], \
len(x[1].split(',')), \
len(x[2].split(','))])
rdd.collect()
[30]:
[['E', 3, 5],
['A', 3, 4],
['B', 4, 4],
['A', 2, 4],
['C', 4, 4],
['A', 2, 5],
['A', 3, 6],
['B', 2, 3],
['E', 4, 6],
['B', 4, 6],
['C', 4, 5],
['C', 4, 3],
['D', 4, 5],
['E', 2, 3],
['B', 2, 5],
['D', 2, 4],
['E', 3, 6],
['D', 2, 3],
['E', 4, 3],
['E', 2, 3],
['E', 2, 3],
['E', 3, 3],
['D', 3, 3],
['A', 3, 5],
['E', 2, 6],
['E', 3, 6],
['A', 3, 3],
['E', 3, 5],
['A', 2, 5],
['C', 4, 6],
['A', 2, 5],
['D', 2, 6],
['E', 2, 4],
['B', 3, 6],
['B', 3, 5],
['D', 2, 3],
['B', 2, 5],
['C', 4, 3],
['E', 2, 3],
['E', 3, 3]]
Parte e
Compute la cantidad de registros por letra de la columna 3 y clave de al columna 4; esto es, por ejemplo, la cantidad de registros en tienen la letra a
en la columna 3 y la clave aaa
en la columna 4 es:
((a,aaa), 5)
[31]:
# Carga el archivo y lo separa por tabuladores
rdd = sc.textFile("/tmp/data.txt").map(lambda x: x.split('\t'))
#
# Elimina las dos primeras columnas.
#
# Resultado:
#
# [['b,g,f', 'jjj:3,bbb:0,ddd:9,ggg:8,hhh:2'],
# ['a,f,c', 'ccc:2,ddd:0,aaa:3,hhh:9'],
# ...
# ['e,d', 'fff:9,iii:2,eee:0'],
# ['f,a,d', 'hhh:8,ggg:3,jjj:5']]
#
rdd = rdd.map(lambda x: [x[2], x[3]])
#
# Expande el dataset generando un registro por cada
# elemento de la columna 1.
#
# Resultado:
#
# [['b', 'jjj:3,bbb:0,ddd:9,ggg:8,hhh:2'],
# ['g', 'jjj:3,bbb:0,ddd:9,ggg:8,hhh:2'],
# ...
# ['a', 'hhh:8,ggg:3,jjj:5'],
# ['d', 'hhh:8,ggg:3,jjj:5']]
#
rdd = rdd.flatMap(lambda x: [[e, x[1]] for e in x[0].split(',')])
#
# Cambia la segunda columna por una lista
# partiendo los elementos por las comas
##
# Resultado:
#
# [['b', ['jjj:3', 'bbb:0', 'ddd:9', 'ggg:8', 'hhh:2']],
# ['g', ['jjj:3', 'bbb:0', 'ddd:9', 'ggg:8', 'hhh:2']],
# ....
# ['a', ['hhh:8', 'ggg:3', 'jjj:5']],
# ['d', ['hhh:8', 'ggg:3', 'jjj:5']]]
#
rdd = rdd.map(lambda x: [x[0], x[1].split(',')])
#
# Parte cada elemento por los ':' y toma el primer elemento
#
# Resultado:
#
# [['b', ['jjj', 'bbb', 'ddd', 'ggg', 'hhh']],
# ['g', ['jjj', 'bbb', 'ddd', 'ggg', 'hhh']],
# ...
# ['a', ['hhh', 'ggg', 'jjj']],
# ['d', ['hhh', 'ggg', 'jjj']]]
#
rdd = rdd.map(lambda x: [x[0], [a.split(':')[0] for a in x[1] ]])
#
# Genera las parejas clave valor.
#
# Resultado:
#
# [(('b', 'jjj'), 1),
# (('b', 'bbb'), 1),
# ...
# (('d', 'ggg'), 1),
# (('d', 'jjj'), 1)]
#
rdd = rdd.flatMap(lambda x: [((x[0], a), 1) for a in x[1]])
#
# Reduce por clave para hacer el conteo.
# Se usa sortByKey() para ordenas la salida
# por claves
#
rdd.reduceByKey(add).sortByKey().collect()
[31]:
[(('a', 'aaa'), 5),
(('a', 'bbb'), 7),
(('a', 'ccc'), 13),
(('a', 'ddd'), 13),
(('a', 'eee'), 7),
(('a', 'fff'), 10),
(('a', 'ggg'), 8),
(('a', 'hhh'), 8),
(('a', 'iii'), 7),
(('a', 'jjj'), 10),
(('b', 'aaa'), 4),
(('b', 'bbb'), 7),
(('b', 'ccc'), 5),
(('b', 'ddd'), 7),
(('b', 'eee'), 5),
(('b', 'fff'), 8),
(('b', 'ggg'), 4),
(('b', 'hhh'), 7),
(('b', 'iii'), 7),
(('b', 'jjj'), 5),
(('c', 'aaa'), 5),
(('c', 'bbb'), 4),
(('c', 'ccc'), 12),
(('c', 'ddd'), 9),
(('c', 'eee'), 6),
(('c', 'fff'), 8),
(('c', 'ggg'), 7),
(('c', 'hhh'), 7),
(('c', 'iii'), 8),
(('c', 'jjj'), 8),
(('d', 'aaa'), 4),
(('d', 'bbb'), 6),
(('d', 'ccc'), 7),
(('d', 'ddd'), 5),
(('d', 'eee'), 6),
(('d', 'fff'), 8),
(('d', 'ggg'), 6),
(('d', 'hhh'), 4),
(('d', 'iii'), 9),
(('d', 'jjj'), 8),
(('e', 'aaa'), 3),
(('e', 'bbb'), 8),
(('e', 'ccc'), 9),
(('e', 'ddd'), 7),
(('e', 'eee'), 7),
(('e', 'fff'), 9),
(('e', 'ggg'), 4),
(('e', 'hhh'), 4),
(('e', 'iii'), 8),
(('e', 'jjj'), 3),
(('f', 'aaa'), 8),
(('f', 'bbb'), 10),
(('f', 'ccc'), 13),
(('f', 'ddd'), 17),
(('f', 'eee'), 11),
(('f', 'fff'), 11),
(('f', 'ggg'), 9),
(('f', 'hhh'), 10),
(('f', 'iii'), 10),
(('f', 'jjj'), 12),
(('g', 'aaa'), 3),
(('g', 'bbb'), 3),
(('g', 'ccc'), 6),
(('g', 'ddd'), 5),
(('g', 'eee'), 4),
(('g', 'fff'), 5),
(('g', 'ggg'), 4),
(('g', 'hhh'), 3),
(('g', 'iii'), 4),
(('g', 'jjj'), 6)]
Limpieza de la carpeta de trabajo
[32]:
!hdfs dfs -rm -r -f /tmp/wordcount
!hdfs dfs -rm /tmp/data.txt
!hdfs dfs -ls /tmp
Deleted /tmp/wordcount
Deleted /tmp/data.txt