Operaciones SQL en PySpark#

  • 30 min | Última modificación: Junio 22, 2019

Spark SQL es una interfaz para el procesamiento de datos estructurados usando el lenguaje SQL. En adición, Spark SQL también puede ser usado para leer datos de Apache Hive. Spark SQL opera sobre DataFrames, los cuales son Datasets (RDD) organizado por columnas identificadas por nombres, los cuales equivalen a tablas en los sistemas de bases de datos relacionales.

Al finalizar este tutorial, el lector estará en capacidad de:

  • Crear DataFrames a partir de archivos en distintos formatos.

  • Aplicar operaciones de selección, filtrado y agregación a un DataFrame.

  • Aplicar consultas en SQL sobre un DataFrame.

  • Aplicar consultas en SQL directamente sobre archivos.

  • Escribir los resultados de operaciones al disco.

Preparación

[1]:
#
# findspark permite usar pyspark (interfaz de Python a Spark),
# desde cualquier programa escrito en Python.
#
import findspark

findspark.init()

from pyspark.sql import SparkSession

#
# A continuación se inicializan las variables obligatorias
# requeridas para trabajar con Spark desde Python:
#
#  SparkContext representa la conexión al cluster de Spark.
#  SparkConf representa la configuración particular de una aplicación
#     escrita en Spark.
#  SparkSession representa la conexión para trabajar con SQL.
#
from pyspark import SparkConf, SparkContext

sparkConf = SparkConf().setAppName("My SparkQL Application")
sc = SparkContext(conf=sparkConf)
spark = SparkSession(sc)
22/05/18 04:45:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/18 04:45:14 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

Creación de DataFrames#

A continuación se presenta la carga de DataFrames desde diferentes formatos.

Formato JSON#

Se crea un archivo en formato JSON en la máquina local.

[2]:
%%writefile /tmp/people.json
{
    "id": 1,
    "firstname": "Vivian",
    "surname": "Hamilton",
    "birthdate": "1971-07-08",
    "color": "green",
    "quantity": 1,
}
{
    "id": 2,
    "firstname": "Karen",
    "surname": "Holcomb",
    "birthdate": "1974-05-23",
    "color": "green",
    "quantity": 4,
}
{
    "id": 3,
    "firstname": "Cody",
    "surname": "Garrett",
    "birthdate": "1973-04-22",
    "color": "orange",
    "quantity": 1,
}
{
    "id": 4,
    "firstname": "Roth",
    "surname": "Fry",
    "birthdate": "1975-01-29",
    "color": "black",
    "quantity": 1,
}
{
    "id": 5,
    "firstname": "Zoe",
    "surname": "Conway",
    "birthdate": "1974-07-03",
    "color": "blue",
    "quantity": 2,
}
{
    "id": 6,
    "firstname": "Gretchen",
    "surname": "Kinney",
    "birthdate": "1974-10-18",
    "color": "violet",
    "quantity": 1,
}
{
    "id": 7,
    "firstname": "Driscoll",
    "surname": "Klein",
    "birthdate": "1970-10-05",
    "color": "blue",
    "quantity": 5,
}
{
    "id": 8,
    "firstname": "Karyn",
    "surname": "Diaz",
    "birthdate": "1969-02-24",
    "color": "red",
    "quantity": 1,
}
{
    "id": 9,
    "firstname": "Merritt",
    "surname": "Guy",
    "birthdate": "1974-10-17",
    "color": "indigo",
    "quantity": 4,
}
{
    "id": 10,
    "firstname": "Kylan",
    "surname": "Sexton",
    "birthdate": "1975-02-28",
    "color": "black",
    "quantity": 4,
}
{
    "id": 11,
    "firstname": "Jordan",
    "surname": "Estes",
    "birthdate": "1969-12-07",
    "color": "indigo",
    "quantity": 4,
}
{
    "id": 12,
    "firstname": "Hope",
    "surname": "Coffey",
    "birthdate": "1973-12-24",
    "color": "green",
    "quantity": 5,
}
{
    "id": 13,
    "firstname": "Vivian",
    "surname": "Crane",
    "birthdate": "1970-08-27",
    "color": "gray",
    "quantity": 5,
}
{
    "id": 14,
    "firstname": "Clio",
    "surname": "Noel",
    "birthdate": "1972-12-12",
    "color": "red",
    "quantity": 5,
}
{
    "id": 15,
    "firstname": "Hope",
    "surname": "Silva",
    "birthdate": "1970-07-01",
    "color": "blue",
    "quantity": 5,
}
{
    "id": 16,
    "firstname": "Ayanna",
    "surname": "Jarvis",
    "birthdate": "1974-02-11",
    "color": "orange",
    "quantity": 5,
}
{
    "id": 17,
    "firstname": "Chanda",
    "surname": "Boyer",
    "birthdate": "1973-04-01",
    "color": "green",
    "quantity": 4,
}
{
    "id": 18,
    "firstname": "Chadwick",
    "surname": "Knight",
    "birthdate": "1973-04-29",
    "color": "yellow",
    "quantity": 1,
}
Writing /tmp/people.json
[3]:
# Copia el archivo al HDFS
!hdfs dfs -copyFromLocal  /tmp/people.json /tmp/people.json
[4]:
#
# La función spark.read.json() carga directamente
# el archivo en JSON con un DataFrame.
#
df = spark.read.json("/tmp/people.json")

#
# La función show() permite imprimirlo en pantalla
#
df.show()

+----------+------+---------+---+--------+--------+
| birthdate| color|firstname| id|quantity| surname|
+----------+------+---------+---+--------+--------+
|1971-07-08| green|   Vivian|  1|       1|Hamilton|
|1974-05-23| green|    Karen|  2|       4| Holcomb|
|1973-04-22|orange|     Cody|  3|       1| Garrett|
|1975-01-29| black|     Roth|  4|       1|     Fry|
|1974-07-03|  blue|      Zoe|  5|       2|  Conway|
|1974-10-18|violet| Gretchen|  6|       1|  Kinney|
|1970-10-05|  blue| Driscoll|  7|       5|   Klein|
|1969-02-24|   red|    Karyn|  8|       1|    Diaz|
|1974-10-17|indigo|  Merritt|  9|       4|     Guy|
|1975-02-28| black|    Kylan| 10|       4|  Sexton|
|1969-12-07|indigo|   Jordan| 11|       4|   Estes|
|1973-12-24| green|     Hope| 12|       5|  Coffey|
|1970-08-27|  gray|   Vivian| 13|       5|   Crane|
|1972-12-12|   red|     Clio| 14|       5|    Noel|
|1970-07-01|  blue|     Hope| 15|       5|   Silva|
|1974-02-11|orange|   Ayanna| 16|       5|  Jarvis|
|1973-04-01| green|   Chanda| 17|       4|   Boyer|
|1973-04-29|yellow| Chadwick| 18|       1|  Knight|
+----------+------+---------+---+--------+--------+

Formato CSV y TXT#

A continuación se ejemplifica como procesar un archivo de texto para convertirlo en un DataFrame.

[5]:
%%writefile  /tmp/people.csv
1,Vivian,Hamilton,1971-07-08,green,1
2,Karen,Holcomb,1974-05-23,green,4
3,Cody,Garrett,1973-04-22,orange,1
4,Roth,Fry,1975-01-29,black,1
5,Zoe,Conway,1974-07-03,blue,2
6,Gretchen,Kinney,1974-10-18,violet,1
7,Driscoll,Klein,1970-10-05,blue,5
8,Karyn,Diaz,1969-02-24,red,1
9,Merritt,Guy,1974-10-17,indigo,4
10,Kylan,Sexton,1975-02-28,black,4
11,Jordan,Estes,1969-12-07,indigo,4
12,Hope,Coffey,1973-12-24,green,5
13,Vivian,Crane,1970-08-27,gray,5
14,Clio,Noel,1972-12-12,red,5
15,Hope,Silva,1970-07-01,blue,5
16,Ayanna,Jarvis,1974-02-11,orange,5
17,Chanda,Boyer,1973-04-01,green,4
18,Chadwick,Knight,1973-04-29,yellow,1
Writing /tmp/people.csv
[6]:
# copia el archivo al HDFS
!hdfs dfs -rm /tmp/people.csv
!hdfs dfs -copyFromLocal  /tmp/people.csv /tmp/people.csv
rm: `/tmp/people.csv': No such file or directory
[7]:
#
# Row representa una fila en un RDD
#
from pyspark.sql import Row

#
# Lectura del archivo como lineas de texto
#
rdd = sc.textFile("/tmp/people.csv")
rdd.collect()
[7]:
['1,Vivian,Hamilton,1971-07-08,green,1',
 '2,Karen,Holcomb,1974-05-23,green,4',
 '3,Cody,Garrett,1973-04-22,orange,1',
 '4,Roth,Fry,1975-01-29,black,1',
 '5,Zoe,Conway,1974-07-03,blue,2',
 '6,Gretchen,Kinney,1974-10-18,violet,1',
 '7,Driscoll,Klein,1970-10-05,blue,5',
 '8,Karyn,Diaz,1969-02-24,red,1',
 '9,Merritt,Guy,1974-10-17,indigo,4',
 '10,Kylan,Sexton,1975-02-28,black,4',
 '11,Jordan,Estes,1969-12-07,indigo,4',
 '12,Hope,Coffey,1973-12-24,green,5',
 '13,Vivian,Crane,1970-08-27,gray,5',
 '14,Clio,Noel,1972-12-12,red,5',
 '15,Hope,Silva,1970-07-01,blue,5',
 '16,Ayanna,Jarvis,1974-02-11,orange,5',
 '17,Chanda,Boyer,1973-04-01,green,4',
 '18,Chadwick,Knight,1973-04-29,yellow,1']
[8]:
#
# Partición de los strings por las comas
#
rdd = rdd.map(lambda row: row.split(","))
rdd.collect()
[8]:
[['1', 'Vivian', 'Hamilton', '1971-07-08', 'green', '1'],
 ['2', 'Karen', 'Holcomb', '1974-05-23', 'green', '4'],
 ['3', 'Cody', 'Garrett', '1973-04-22', 'orange', '1'],
 ['4', 'Roth', 'Fry', '1975-01-29', 'black', '1'],
 ['5', 'Zoe', 'Conway', '1974-07-03', 'blue', '2'],
 ['6', 'Gretchen', 'Kinney', '1974-10-18', 'violet', '1'],
 ['7', 'Driscoll', 'Klein', '1970-10-05', 'blue', '5'],
 ['8', 'Karyn', 'Diaz', '1969-02-24', 'red', '1'],
 ['9', 'Merritt', 'Guy', '1974-10-17', 'indigo', '4'],
 ['10', 'Kylan', 'Sexton', '1975-02-28', 'black', '4'],
 ['11', 'Jordan', 'Estes', '1969-12-07', 'indigo', '4'],
 ['12', 'Hope', 'Coffey', '1973-12-24', 'green', '5'],
 ['13', 'Vivian', 'Crane', '1970-08-27', 'gray', '5'],
 ['14', 'Clio', 'Noel', '1972-12-12', 'red', '5'],
 ['15', 'Hope', 'Silva', '1970-07-01', 'blue', '5'],
 ['16', 'Ayanna', 'Jarvis', '1974-02-11', 'orange', '5'],
 ['17', 'Chanda', 'Boyer', '1973-04-01', 'green', '4'],
 ['18', 'Chadwick', 'Knight', '1973-04-29', 'yellow', '1']]
[9]:
#
# Se transforma cada elemento del RDD usando la función Row().
# Note que esta función agrega el nombre de la columna a la
# que pertenece cada dato.
#
rdd = rdd.map(
    lambda p: Row(
        id=p[0],
        firstname=p[1],
        surname=p[2],
        birthdate=p[3],
        color=p[4],
        quantity=int(p[5]),
    )
)
rdd.collect()
[9]:
[Row(id='1', firstname='Vivian', surname='Hamilton', birthdate='1971-07-08', color='green', quantity=1),
 Row(id='2', firstname='Karen', surname='Holcomb', birthdate='1974-05-23', color='green', quantity=4),
 Row(id='3', firstname='Cody', surname='Garrett', birthdate='1973-04-22', color='orange', quantity=1),
 Row(id='4', firstname='Roth', surname='Fry', birthdate='1975-01-29', color='black', quantity=1),
 Row(id='5', firstname='Zoe', surname='Conway', birthdate='1974-07-03', color='blue', quantity=2),
 Row(id='6', firstname='Gretchen', surname='Kinney', birthdate='1974-10-18', color='violet', quantity=1),
 Row(id='7', firstname='Driscoll', surname='Klein', birthdate='1970-10-05', color='blue', quantity=5),
 Row(id='8', firstname='Karyn', surname='Diaz', birthdate='1969-02-24', color='red', quantity=1),
 Row(id='9', firstname='Merritt', surname='Guy', birthdate='1974-10-17', color='indigo', quantity=4),
 Row(id='10', firstname='Kylan', surname='Sexton', birthdate='1975-02-28', color='black', quantity=4),
 Row(id='11', firstname='Jordan', surname='Estes', birthdate='1969-12-07', color='indigo', quantity=4),
 Row(id='12', firstname='Hope', surname='Coffey', birthdate='1973-12-24', color='green', quantity=5),
 Row(id='13', firstname='Vivian', surname='Crane', birthdate='1970-08-27', color='gray', quantity=5),
 Row(id='14', firstname='Clio', surname='Noel', birthdate='1972-12-12', color='red', quantity=5),
 Row(id='15', firstname='Hope', surname='Silva', birthdate='1970-07-01', color='blue', quantity=5),
 Row(id='16', firstname='Ayanna', surname='Jarvis', birthdate='1974-02-11', color='orange', quantity=5),
 Row(id='17', firstname='Chanda', surname='Boyer', birthdate='1973-04-01', color='green', quantity=4),
 Row(id='18', firstname='Chadwick', surname='Knight', birthdate='1973-04-29', color='yellow', quantity=1)]
[10]:
#
# createDataFrame() permite crear un DataFrame a partir
# de un RDD, una lista o un pandas.DataFrame
#
df = spark.createDataFrame(rdd)

#
# Crea o reemplaza una vista local del DataFrame
# para poder aplicar funciones como show()
#
df.createOrReplaceTempView("miVista")
# también podría usarse createTempView()
df.show()

spark.catalog.dropTempView("miVista")
+---+---------+--------+----------+------+--------+
| id|firstname| surname| birthdate| color|quantity|
+---+---------+--------+----------+------+--------+
|  1|   Vivian|Hamilton|1971-07-08| green|       1|
|  2|    Karen| Holcomb|1974-05-23| green|       4|
|  3|     Cody| Garrett|1973-04-22|orange|       1|
|  4|     Roth|     Fry|1975-01-29| black|       1|
|  5|      Zoe|  Conway|1974-07-03|  blue|       2|
|  6| Gretchen|  Kinney|1974-10-18|violet|       1|
|  7| Driscoll|   Klein|1970-10-05|  blue|       5|
|  8|    Karyn|    Diaz|1969-02-24|   red|       1|
|  9|  Merritt|     Guy|1974-10-17|indigo|       4|
| 10|    Kylan|  Sexton|1975-02-28| black|       4|
| 11|   Jordan|   Estes|1969-12-07|indigo|       4|
| 12|     Hope|  Coffey|1973-12-24| green|       5|
| 13|   Vivian|   Crane|1970-08-27|  gray|       5|
| 14|     Clio|    Noel|1972-12-12|   red|       5|
| 15|     Hope|   Silva|1970-07-01|  blue|       5|
| 16|   Ayanna|  Jarvis|1974-02-11|orange|       5|
| 17|   Chanda|   Boyer|1973-04-01| green|       4|
| 18| Chadwick|  Knight|1973-04-29|yellow|       1|
+---+---------+--------+----------+------+--------+

A continuación se crea un DataFrame a partir de un archivo CSV.

[11]:
%%writefile  /tmp/people.csv
id,firstname,surname,birthdate,color,quantity
1,Vivian,Hamilton,1971-07-08,green,1
2,Karen,Holcomb,1974-05-23,green,4
3,Cody,Garrett,1973-04-22,orange,1
4,Roth,Fry,1975-01-29,black,1
5,Zoe,Conway,1974-07-03,blue,2
6,Gretchen,Kinney,1974-10-18,violet,1
7,Driscoll,Klein,1970-10-05,blue,5
8,Karyn,Diaz,1969-02-24,red,1
9,Merritt,Guy,1974-10-17,indigo,4
10,Kylan,Sexton,1975-02-28,black,4
11,Jordan,Estes,1969-12-07,indigo,4
12,Hope,Coffey,1973-12-24,green,5
13,Vivian,Crane,1970-08-27,gray,5
14,Clio,Noel,1972-12-12,red,5
15,Hope,Silva,1970-07-01,blue,5
16,Ayanna,Jarvis,1974-02-11,orange,5
17,Chanda,Boyer,1973-04-01,green,4
18,Chadwick,Knight,1973-04-29,yellow,1
Overwriting /tmp/people.csv
[12]:
# Mueve el archivo al sistema hdfs
!hdfs dfs -rm /tmp/people.csv
!hdfs dfs -copyFromLocal  /tmp/people.csv /tmp/people.csv
Deleted /tmp/people.csv
[13]:
#
# Crea un DataFrame a partir del archivo con
# formato CSV
#
df = spark.read.load(
    "/tmp/people.csv", format="csv", sep=",", inferSchema="true", header="true",
)
df.show()
+---+---------+--------+----------+------+--------+
| id|firstname| surname| birthdate| color|quantity|
+---+---------+--------+----------+------+--------+
|  1|   Vivian|Hamilton|1971-07-08| green|       1|
|  2|    Karen| Holcomb|1974-05-23| green|       4|
|  3|     Cody| Garrett|1973-04-22|orange|       1|
|  4|     Roth|     Fry|1975-01-29| black|       1|
|  5|      Zoe|  Conway|1974-07-03|  blue|       2|
|  6| Gretchen|  Kinney|1974-10-18|violet|       1|
|  7| Driscoll|   Klein|1970-10-05|  blue|       5|
|  8|    Karyn|    Diaz|1969-02-24|   red|       1|
|  9|  Merritt|     Guy|1974-10-17|indigo|       4|
| 10|    Kylan|  Sexton|1975-02-28| black|       4|
| 11|   Jordan|   Estes|1969-12-07|indigo|       4|
| 12|     Hope|  Coffey|1973-12-24| green|       5|
| 13|   Vivian|   Crane|1970-08-27|  gray|       5|
| 14|     Clio|    Noel|1972-12-12|   red|       5|
| 15|     Hope|   Silva|1970-07-01|  blue|       5|
| 16|   Ayanna|  Jarvis|1974-02-11|orange|       5|
| 17|   Chanda|   Boyer|1973-04-01| green|       4|
| 18| Chadwick|  Knight|1973-04-29|yellow|       1|
+---+---------+--------+----------+------+--------+

Operaciones sobre DataFrames#

[14]:
#
# Imprime el esquema en formato de arbol
#
df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- firstname: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- color: string (nullable = true)
 |-- quantity: integer (nullable = true)

[15]:
#
# Selección de una columna en particular
#
df.select("firstname").show()
+---------+
|firstname|
+---------+
|   Vivian|
|    Karen|
|     Cody|
|     Roth|
|      Zoe|
| Gretchen|
| Driscoll|
|    Karyn|
|  Merritt|
|    Kylan|
|   Jordan|
|     Hope|
|   Vivian|
|     Clio|
|     Hope|
|   Ayanna|
|   Chanda|
| Chadwick|
+---------+

[16]:
#
# Selección de varias columnas
#
df.select(["firstname", "surname"]).show()
+---------+--------+
|firstname| surname|
+---------+--------+
|   Vivian|Hamilton|
|    Karen| Holcomb|
|     Cody| Garrett|
|     Roth|     Fry|
|      Zoe|  Conway|
| Gretchen|  Kinney|
| Driscoll|   Klein|
|    Karyn|    Diaz|
|  Merritt|     Guy|
|    Kylan|  Sexton|
|   Jordan|   Estes|
|     Hope|  Coffey|
|   Vivian|   Crane|
|     Clio|    Noel|
|     Hope|   Silva|
|   Ayanna|  Jarvis|
|   Chanda|   Boyer|
| Chadwick|  Knight|
+---------+--------+

[17]:
#
# Filtrado de registros usando condicionales
#
df.filter(df["color"] == "blue").show()
+---+---------+-------+----------+-----+--------+
| id|firstname|surname| birthdate|color|quantity|
+---+---------+-------+----------+-----+--------+
|  5|      Zoe| Conway|1974-07-03| blue|       2|
|  7| Driscoll|  Klein|1970-10-05| blue|       5|
| 15|     Hope|  Silva|1970-07-01| blue|       5|
+---+---------+-------+----------+-----+--------+

[18]:
#
# Consultas
#   Se crea una vista temporal
#   que desaparece cuando se cierra la
#   sesión actual de Spark
#
df.createOrReplaceTempView("peopleview")  # este es el nombre de la tabla

# Se realiza la consulta usando directamente SQL
spark.sql("SELECT * FROM peopleview").show()
+---+---------+--------+-------------------+------+--------+
| id|firstname| surname|          birthdate| color|quantity|
+---+---------+--------+-------------------+------+--------+
|  1|   Vivian|Hamilton|1971-07-08 00:00:00| green|       1|
|  2|    Karen| Holcomb|1974-05-23 00:00:00| green|       4|
|  3|     Cody| Garrett|1973-04-22 00:00:00|orange|       1|
|  4|     Roth|     Fry|1975-01-29 00:00:00| black|       1|
|  5|      Zoe|  Conway|1974-07-03 00:00:00|  blue|       2|
|  6| Gretchen|  Kinney|1974-10-18 00:00:00|violet|       1|
|  7| Driscoll|   Klein|1970-10-05 00:00:00|  blue|       5|
|  8|    Karyn|    Diaz|1969-02-24 00:00:00|   red|       1|
|  9|  Merritt|     Guy|1974-10-17 00:00:00|indigo|       4|
| 10|    Kylan|  Sexton|1975-02-28 00:00:00| black|       4|
| 11|   Jordan|   Estes|1969-12-07 00:00:00|indigo|       4|
| 12|     Hope|  Coffey|1973-12-24 00:00:00| green|       5|
| 13|   Vivian|   Crane|1970-08-27 00:00:00|  gray|       5|
| 14|     Clio|    Noel|1972-12-12 00:00:00|   red|       5|
| 15|     Hope|   Silva|1970-07-01 00:00:00|  blue|       5|
| 16|   Ayanna|  Jarvis|1974-02-11 00:00:00|orange|       5|
| 17|   Chanda|   Boyer|1973-04-01 00:00:00| green|       4|
| 18| Chadwick|  Knight|1973-04-29 00:00:00|yellow|       1|
+---+---------+--------+-------------------+------+--------+

[19]:
#
# En el siguiente fragmento de código se
# crea una vista temporal que existe entre
# sesiones y solo desaparece cuando se cierra
# la aplicación actual de Spark
#
df.createGlobalTempView("peopleview")

# percatese de la forma de nombrar la tabla
# en la consulta SQL
spark.sql("SELECT * FROM global_temp.peopleview").show()
+---+---------+--------+-------------------+------+--------+
| id|firstname| surname|          birthdate| color|quantity|
+---+---------+--------+-------------------+------+--------+
|  1|   Vivian|Hamilton|1971-07-08 00:00:00| green|       1|
|  2|    Karen| Holcomb|1974-05-23 00:00:00| green|       4|
|  3|     Cody| Garrett|1973-04-22 00:00:00|orange|       1|
|  4|     Roth|     Fry|1975-01-29 00:00:00| black|       1|
|  5|      Zoe|  Conway|1974-07-03 00:00:00|  blue|       2|
|  6| Gretchen|  Kinney|1974-10-18 00:00:00|violet|       1|
|  7| Driscoll|   Klein|1970-10-05 00:00:00|  blue|       5|
|  8|    Karyn|    Diaz|1969-02-24 00:00:00|   red|       1|
|  9|  Merritt|     Guy|1974-10-17 00:00:00|indigo|       4|
| 10|    Kylan|  Sexton|1975-02-28 00:00:00| black|       4|
| 11|   Jordan|   Estes|1969-12-07 00:00:00|indigo|       4|
| 12|     Hope|  Coffey|1973-12-24 00:00:00| green|       5|
| 13|   Vivian|   Crane|1970-08-27 00:00:00|  gray|       5|
| 14|     Clio|    Noel|1972-12-12 00:00:00|   red|       5|
| 15|     Hope|   Silva|1970-07-01 00:00:00|  blue|       5|
| 16|   Ayanna|  Jarvis|1974-02-11 00:00:00|orange|       5|
| 17|   Chanda|   Boyer|1973-04-01 00:00:00| green|       4|
| 18| Chadwick|  Knight|1973-04-29 00:00:00|yellow|       1|
+---+---------+--------+-------------------+------+--------+

[20]:
#
# Cuenta cuandos registros hay por cada valor en
# la columna quantity
#
df.groupBy("quantity").count().show()
+--------+-----+
|quantity|count|
+--------+-----+
|       1|    6|
|       5|    6|
|       4|    5|
|       2|    1|
+--------+-----+

Funciones#

[21]:
#
# La función agg recibe un diccionario que indica
# con que función (valor del diccionario) se agrega
# una determinada columna (clave del diccionario)
#
df.agg({"quantity": "max"}).collect()
[21]:
[Row(max(quantity)=5)]
[22]:
#
# Tmabien pueden usarse las funciones implementadas
# en el modulo sql de pyspark. En el siguiente ejemplo
# se carga el módulo de funciones con el nombre de F.
# Se obtiene el valor máximo de la columna quantity
#
from pyspark.sql import functions as F

df.agg(F.max(df.quantity)).collect()
[22]:
[Row(max(quantity)=5)]

Las funciones implementadas en el módulo pyspark.sql.functions se encuentran documentadas en http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

Escritura de un resultado al HDFS#

[23]:
#
# Los resultados se escriben al disco con write.save().
# El parámetro es el nombre del directorio. La llamada a
# la función crea el archivo _SUCCESS que indica que la
# función se ejecutó correctamente, y los archivos
# con el resultado.
#
df.filter(df["color"] == "blue").write.save("/tmp/demo")
[24]:
!hdfs dfs -ls /tmp/demo/*
-rw-r--r--   1 root supergroup          0 2019-11-15 00:45 /tmp/demo/_SUCCESS
-rw-r--r--   1 root supergroup       1553 2019-11-15 00:45 /tmp/demo/part-00000-8ed4efa5-937d-4917-83a0-e3e4d3a0924a-c000.snappy.parquet

Ejecución de SQL directamente sobre archivos#

Spark SQL permite ejecutar directamente SQL sobre archivos indicando el tipo de archivo.

[25]:
#
# SQL sobre un archivo en formato JSON. Note que
# en lugar de la tabla se especifica el formato del
# archivo (json) y entre comillas `` el nombre del
# archivo.
#
spark.sql("SELECT * FROM json.`/tmp/people.json`").show()
+----------+------+---------+---+--------+--------+
| birthdate| color|firstname| id|quantity| surname|
+----------+------+---------+---+--------+--------+
|1971-07-08| green|   Vivian|  1|       1|Hamilton|
|1974-05-23| green|    Karen|  2|       4| Holcomb|
|1973-04-22|orange|     Cody|  3|       1| Garrett|
|1975-01-29| black|     Roth|  4|       1|     Fry|
|1974-07-03|  blue|      Zoe|  5|       2|  Conway|
|1974-10-18|violet| Gretchen|  6|       1|  Kinney|
|1970-10-05|  blue| Driscoll|  7|       5|   Klein|
|1969-02-24|   red|    Karyn|  8|       1|    Diaz|
|1974-10-17|indigo|  Merritt|  9|       4|     Guy|
|1975-02-28| black|    Kylan| 10|       4|  Sexton|
|1969-12-07|indigo|   Jordan| 11|       4|   Estes|
|1973-12-24| green|     Hope| 12|       5|  Coffey|
|1970-08-27|  gray|   Vivian| 13|       5|   Crane|
|1972-12-12|   red|     Clio| 14|       5|    Noel|
|1970-07-01|  blue|     Hope| 15|       5|   Silva|
|1974-02-11|orange|   Ayanna| 16|       5|  Jarvis|
|1973-04-01| green|   Chanda| 17|       4|   Boyer|
|1973-04-29|yellow| Chadwick| 18|       1|  Knight|
+----------+------+---------+---+--------+--------+

[26]:
#
# SQL sobre un archivo en formato CSV. Note que en este
# caso, los nombres de las columnas se leen como parte
# de la tabla.
#
spark.sql("SELECT * FROM csv.`/tmp/people.csv`").show()
+---+---------+--------+----------+------+--------+
|_c0|      _c1|     _c2|       _c3|   _c4|     _c5|
+---+---------+--------+----------+------+--------+
| id|firstname| surname| birthdate| color|quantity|
|  1|   Vivian|Hamilton|1971-07-08| green|       1|
|  2|    Karen| Holcomb|1974-05-23| green|       4|
|  3|     Cody| Garrett|1973-04-22|orange|       1|
|  4|     Roth|     Fry|1975-01-29| black|       1|
|  5|      Zoe|  Conway|1974-07-03|  blue|       2|
|  6| Gretchen|  Kinney|1974-10-18|violet|       1|
|  7| Driscoll|   Klein|1970-10-05|  blue|       5|
|  8|    Karyn|    Diaz|1969-02-24|   red|       1|
|  9|  Merritt|     Guy|1974-10-17|indigo|       4|
| 10|    Kylan|  Sexton|1975-02-28| black|       4|
| 11|   Jordan|   Estes|1969-12-07|indigo|       4|
| 12|     Hope|  Coffey|1973-12-24| green|       5|
| 13|   Vivian|   Crane|1970-08-27|  gray|       5|
| 14|     Clio|    Noel|1972-12-12|   red|       5|
| 15|     Hope|   Silva|1970-07-01|  blue|       5|
| 16|   Ayanna|  Jarvis|1974-02-11|orange|       5|
| 17|   Chanda|   Boyer|1973-04-01| green|       4|
| 18| Chadwick|  Knight|1973-04-29|yellow|       1|
+---+---------+--------+----------+------+--------+

[27]:
# SQL sobre un archivo en formato CSV
spark.sql("SELECT DISTINCT(_c4)  FROM csv.`/tmp/people.csv`").show()
+------+
|   _c4|
+------+
|violet|
|orange|
| green|
|yellow|
|indigo|
|  gray|
|   red|
| color|
| black|
|  blue|
+------+

Ejemplos#

Los siguientes ejemplos son realizados usando el archivo people.json creado al principio de este tutorial.

[28]:
df = spark.read.json("/tmp/people.json")

Ejemplo 1#

Seleccione las personas cuya fecha de nacimiento sea del año 1974 en adelante.

[29]:
#
# Se usa la función filter() del DataFrame
#
df.filter(df["birthdate"] >= "1974").show()
+----------+------+---------+---+--------+-------+
| birthdate| color|firstname| id|quantity|surname|
+----------+------+---------+---+--------+-------+
|1974-05-23| green|    Karen|  2|       4|Holcomb|
|1975-01-29| black|     Roth|  4|       1|    Fry|
|1974-07-03|  blue|      Zoe|  5|       2| Conway|
|1974-10-18|violet| Gretchen|  6|       1| Kinney|
|1974-10-17|indigo|  Merritt|  9|       4|    Guy|
|1975-02-28| black|    Kylan| 10|       4| Sexton|
|1974-02-11|orange|   Ayanna| 16|       5| Jarvis|
+----------+------+---------+---+--------+-------+

[30]:
#
# Se crea una vista temporal para ejecutar
# una consulta SQL sobre ella
#
df.createOrReplaceTempView("peopleview")
spark.sql("SELECT * FROM peopleview WHERE YEAR(birthdate) >= 1974").show()
+----------+------+---------+---+--------+-------+
| birthdate| color|firstname| id|quantity|surname|
+----------+------+---------+---+--------+-------+
|1974-05-23| green|    Karen|  2|       4|Holcomb|
|1975-01-29| black|     Roth|  4|       1|    Fry|
|1974-07-03|  blue|      Zoe|  5|       2| Conway|
|1974-10-18|violet| Gretchen|  6|       1| Kinney|
|1974-10-17|indigo|  Merritt|  9|       4|    Guy|
|1975-02-28| black|    Kylan| 10|       4| Sexton|
|1974-02-11|orange|   Ayanna| 16|       5| Jarvis|
+----------+------+---------+---+--------+-------+

Ejemplo 2#

Obtenga una lista de colores únicos.

[31]:
#
# Se usa la función distinct() del DataFrame
#
df.select("color").distinct().show()
+------+
| color|
+------+
|violet|
|orange|
| green|
|yellow|
|indigo|
|  gray|
|   red|
| black|
|  blue|
+------+

[32]:
#
# Como una consulta
#
spark.sql("SELECT DISTINCT(color) FROM peopleview").show()
+------+
| color|
+------+
|violet|
|orange|
| green|
|yellow|
|indigo|
|  gray|
|   red|
| black|
|  blue|
+------+

Ejemplo 3#

Ordene la tabla por cantidad y luego por color.

[33]:
#
# Note que las funciones se aplican de derecha
# a izquierda
#
df.orderBy("color").orderBy("quantity").show()
+----------+------+---------+---+--------+--------+
| birthdate| color|firstname| id|quantity| surname|
+----------+------+---------+---+--------+--------+
|1969-02-24|   red|    Karyn|  8|       1|    Diaz|
|1973-04-29|yellow| Chadwick| 18|       1|  Knight|
|1971-07-08| green|   Vivian|  1|       1|Hamilton|
|1975-01-29| black|     Roth|  4|       1|     Fry|
|1974-10-18|violet| Gretchen|  6|       1|  Kinney|
|1973-04-22|orange|     Cody|  3|       1| Garrett|
|1974-07-03|  blue|      Zoe|  5|       2|  Conway|
|1975-02-28| black|    Kylan| 10|       4|  Sexton|
|1973-04-01| green|   Chanda| 17|       4|   Boyer|
|1974-10-17|indigo|  Merritt|  9|       4|     Guy|
|1969-12-07|indigo|   Jordan| 11|       4|   Estes|
|1974-05-23| green|    Karen|  2|       4| Holcomb|
|1970-10-05|  blue| Driscoll|  7|       5|   Klein|
|1972-12-12|   red|     Clio| 14|       5|    Noel|
|1973-12-24| green|     Hope| 12|       5|  Coffey|
|1970-07-01|  blue|     Hope| 15|       5|   Silva|
|1970-08-27|  gray|   Vivian| 13|       5|   Crane|
|1974-02-11|orange|   Ayanna| 16|       5|  Jarvis|
+----------+------+---------+---+--------+--------+

[34]:
#
# Como una consulta de SQL
#
spark.sql("SELECT * FROM peopleview ORDER BY quantity, color").show()
+----------+------+---------+---+--------+--------+
| birthdate| color|firstname| id|quantity| surname|
+----------+------+---------+---+--------+--------+
|1975-01-29| black|     Roth|  4|       1|     Fry|
|1971-07-08| green|   Vivian|  1|       1|Hamilton|
|1973-04-22|orange|     Cody|  3|       1| Garrett|
|1969-02-24|   red|    Karyn|  8|       1|    Diaz|
|1974-10-18|violet| Gretchen|  6|       1|  Kinney|
|1973-04-29|yellow| Chadwick| 18|       1|  Knight|
|1974-07-03|  blue|      Zoe|  5|       2|  Conway|
|1975-02-28| black|    Kylan| 10|       4|  Sexton|
|1973-04-01| green|   Chanda| 17|       4|   Boyer|
|1974-05-23| green|    Karen|  2|       4| Holcomb|
|1969-12-07|indigo|   Jordan| 11|       4|   Estes|
|1974-10-17|indigo|  Merritt|  9|       4|     Guy|
|1970-10-05|  blue| Driscoll|  7|       5|   Klein|
|1970-07-01|  blue|     Hope| 15|       5|   Silva|
|1970-08-27|  gray|   Vivian| 13|       5|   Crane|
|1973-12-24| green|     Hope| 12|       5|  Coffey|
|1974-02-11|orange|   Ayanna| 16|       5|  Jarvis|
|1972-12-12|   red|     Clio| 14|       5|    Noel|
+----------+------+---------+---+--------+--------+

Limpieza del directorio de trabajo

[35]:
!rm people.*
[36]:
!hdfs dfs -rm /tmp/people*
!hdfs dfs -rm -r -f /tmp/demo/
Deleted /tmp/people.csv
Deleted /tmp/people.json
Deleted /tmp/demo