Operaciones básicas en SQLAlchemy ORM#

  • Última modificación: Mar 6, 2024 | YouTube

Este tutorial está basado en https://es.hortonworks.com/tutorial/beginners-guide-to-apache-pig/

Fuentes de datos#

[1]:
# Name of the dataset file and base URL of the folder that hosts it.
filename = "truck_event_text_partition.csv"
url = "https://raw.githubusercontent.com/jdvelasq/datalabs/master/datasets/drivers/"

# Download the file into /tmp through an IPython shell escape
# (--silent suppresses curl's progress output).
!curl --silent -o /tmp/{filename} {url + filename}

# List the CSV files currently present in /tmp, one per line.
!ls -1 /tmp/*.csv
/tmp/drivers.csv
/tmp/timesheet.csv
/tmp/truck_event_text_partition.csv

Inspección de los datos#

[2]:
# Show the first lines of the downloaded file.
!head /tmp/{filename}
driverId,truckId,eventTime,eventType,longitude,latitude,eventKey,CorrelationId,driverName,routeId,routeName,eventDate
14,25,59:21.4,Normal,-94.58,37.03,14|25|9223370572464814373,3.66E+18,Adis Cesir,160405074,Joplin to Kansas City Route 2,2016-05-27-22
18,16,59:21.7,Normal,-89.66,39.78,18|16|9223370572464814089,3.66E+18,Grant Liu,1565885487,Springfield to KC Via Hanibal,2016-05-27-22
27,105,59:21.7,Normal,-90.21,38.65,27|105|9223370572464814070,3.66E+18,Mark Lochbihler,1325562373,Springfield to KC Via Columbia Route 2,2016-05-27-22
11,74,59:21.7,Normal,-90.2,38.65,11|74|9223370572464814123,3.66E+18,Jamie Engesser,1567254452,Saint Louis to Memphis Route2,2016-05-27-22
22,87,59:21.7,Normal,-90.04,35.19,22|87|9223370572464814101,3.66E+18,Nadeem Asghar,1198242881, Saint Louis to Chicago Route2,2016-05-27-22
22,87,59:22.3,Normal,-90.37,35.21,22|87|9223370572464813486,3.66E+18,Nadeem Asghar,1198242881, Saint Louis to Chicago Route2,2016-05-27-22
23,68,59:22.4,Normal,-89.91,40.86,23|68|9223370572464813450,3.66E+18,Adam Diaz,160405074,Joplin to Kansas City Route 2,2016-05-27-22
11,74,59:22.5,Normal,-89.74,39.1,11|74|9223370572464813355,3.66E+18,Jamie Engesser,1567254452,Saint Louis to Memphis Route2,2016-05-27-22
20,41,59:22.5,Normal,-93.36,41.69,20|41|9223370572464813344,3.66E+18,Chris Harris,160779139,Des Moines to Chicago Route 2,2016-05-27-22

Creación de la máquina#

[3]:
from sqlalchemy import create_engine

# In-memory SQLite database reached through the pysqlite driver; SQL echoing is
# disabled and the 2.0-style ("future") engine API is requested.
engine = create_engine("sqlite+pysqlite:///:memory:", echo=False, future=True)

Creación de la tabla usando el ORM#

[4]:
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Events(Base):
    """Abstract declarative base with the columns shared by the event tables.

    ``__abstract__ = True`` means this class is not mapped to a table itself;
    concrete subclasses supply ``__tablename__`` and inherit every column.
    """

    __abstract__ = True

    # Surrogate primary key (not present in the source CSV file).
    id = Column(Integer, primary_key=True)
    driverId = Column(Integer)
    truckId = Column(Integer)
    eventTime = Column(String)
    eventType = Column(String)
    longitude = Column(Float)
    latitude = Column(Float)
    eventKey = Column(String)
    correlationId = Column(String)
    driverName = Column(String)
    routeId = Column(String)
    routeName = Column(String)
    eventDate = Column(String)

    def __repr__(self):
        """Return a short summary of the main fields of the event.

        The closing parenthesis was missing in the previous version, which
        produced an unbalanced representation.
        """
        return (
            f"TruckEvent(driverId={self.driverId!r}, truckId={self.truckId!r}, "
            f"eventTime={self.eventTime!r}, eventType={self.eventType!r}, "
            f"routeId={self.routeId!r})"
        )


class TruckEvents(Events):
    """Concrete mapping of the shared event columns onto ``truck_eventts``."""

    # NOTE: the spelling "truck_eventts" is also used by the raw SQL statements
    # elsewhere in this file; rename both together if it is ever corrected.
    __tablename__ = "truck_eventts"


# Create in the database every table registered on Base.metadata.
Base.metadata.create_all(engine)

Carga de datos a la BD#

[5]:
import csv

from sqlalchemy.orm import Session

# Load every data row of the CSV file into the table mapped by TruckEvents.
with Session(engine) as session:

    # newline="" leaves newline handling to the csv module, as its
    # documentation requires for files passed to csv.reader.
    with open(
        "/tmp/truck_event_text_partition.csv", newline="", encoding="utf-8"
    ) as csvfile:
        reader = csv.reader(csvfile, delimiter=",")
        # Skip the header through the reader (not the raw file object) so the
        # CSV parser stays in sync; the default avoids StopIteration when the
        # file is empty.
        next(reader, None)
        for row in reader:
            # Columns are mapped by position, in the order of the CSV header.
            record = TruckEvents(
                driverId=row[0],
                truckId=row[1],
                eventTime=row[2],
                eventType=row[3],
                longitude=row[4],
                latitude=row[5],
                eventKey=row[6],
                correlationId=row[7],
                driverName=row[8],
                routeId=row[9],
                routeName=row[10],
                eventDate=row[11],
            )

            session.add(record)

    # Persist all pending records in a single transaction.
    session.commit()

Consulta de datos#

[6]:
from sqlalchemy import select

# Fetch the first three rows as ORM objects. The session is opened in a
# ``with`` block so it is closed after the results are consumed instead of
# being left open.
stmt = select(TruckEvents).limit(3)

with Session(engine) as session:
    for truckevent in session.scalars(stmt):
        print(truckevent)
TruckEvent(driverId=14, truckId=25, eventTime='59:21.4', eventType='Normal', routeId='160405074'
TruckEvent(driverId=18, truckId=16, eventTime='59:21.7', eventType='Normal', routeId='1565885487'
TruckEvent(driverId=27, truckId=105, eventTime='59:21.7', eventType='Normal', routeId='1325562373'

Obtención de un subconjunto de registros#

[7]:
class TruckEventsSubset(Events):
    """Mapping of the shared event columns onto ``truck_eventts_subset``."""

    __tablename__ = "truck_eventts_subset"
    # extend_existing allows a Table with this name that is already present in
    # the MetaData to be declared again — presumably so this cell can be
    # re-run without a "table already defined" error; confirm.
    __table_args__ = {"extend_existing": True}


from sqlalchemy import text

with Session(engine) as session:
    # Raw SQL is wrapped in text(): SQLAlchemy 2.0 rejects plain strings passed
    # to Session.execute(), and 1.4 only accepts them as a deprecated form.
    session.execute(
        text(
            """
        CREATE TABLE truck_eventts_subset AS
        SELECT * FROM truck_eventts
        LIMIT 100
       """
        )
    )

    session.commit()

    #
    # Check: read back the first rows of the new table through the ORM
    #
    stmt = select(TruckEventsSubset).limit(3)
    for truckevent in session.scalars(stmt):
        print(truckevent)
TruckEvent(driverId=14, truckId=25, eventTime='59:21.4', eventType='Normal', routeId='160405074'
TruckEvent(driverId=18, truckId=16, eventTime='59:21.7', eventType='Normal', routeId='1565885487'
TruckEvent(driverId=27, truckId=105, eventTime='59:21.7', eventType='Normal', routeId='1325562373'

Obtención de un subconjunto de campos#

[8]:
# Create a table named specific_columns with the driverId, eventTime and
# eventType columns of the truck_eventts_subset table.


class SpecificColumns(Base):
    """ORM mapping for the ``specific_columns`` table."""

    __tablename__ = "specific_columns"

    id = Column(Integer, primary_key=True)
    driverId = Column(Integer)
    eventTime = Column(String)
    eventType = Column(String)

    def __repr__(self):
        """Return ``SpecificColumns(...)`` listing the three data fields."""
        shown = ("driverId", "eventTime", "eventType")
        body = ", ".join(f"{name}={getattr(self, name)!r}" for name in shown)
        return f"SpecificColumns({body})"


from sqlalchemy import text

with Session(engine) as session:
    # Raw SQL is wrapped in text(): SQLAlchemy 2.0 rejects plain strings passed
    # to Session.execute(), and 1.4 only accepts them as a deprecated form.
    session.execute(
        text(
            """
        CREATE TABLE specific_columns AS
        SELECT id, driverId, eventTime, eventType FROM truck_eventts_subset
       """
        )
    )

    session.commit()

    #
    # Check: read back the first rows of the new table through the ORM
    #
    stmt = select(SpecificColumns).limit(3)
    for record in session.scalars(stmt):
        print(record)
SpecificColumns(driverId=14, eventTime='59:21.4', eventType='Normal')
SpecificColumns(driverId=18, eventTime='59:21.7', eventType='Normal')
SpecificColumns(driverId=27, eventTime='59:21.7', eventType='Normal')
[9]:
#
# SQL equivalent: SELECT * FROM specific_columns LIMIT 2,3;
# (skip the first two rows, then return at most three)
#
with Session(engine) as session:
    stmt = select(SpecificColumns).offset(2).limit(3)
    for record in session.scalars(stmt):
        print(record)
SpecificColumns(driverId=27, eventTime='59:21.7', eventType='Normal')
SpecificColumns(driverId=11, eventTime='59:21.7', eventType='Normal')
SpecificColumns(driverId=22, eventTime='59:21.7', eventType='Normal')
[10]:
#
# SQL equivalent: SELECT * FROM specific_columns WHERE driverId = 11;
#
with Session(engine) as session:
    driver_filter = SpecificColumns.driverId == 11
    stmt = select(SpecificColumns).where(driver_filter)
    for record in session.scalars(stmt):
        print(record)
SpecificColumns(driverId=11, eventTime='59:21.7', eventType='Normal')
SpecificColumns(driverId=11, eventTime='59:22.5', eventType='Normal')
SpecificColumns(driverId=11, eventTime='59:28.3', eventType='Normal')
SpecificColumns(driverId=11, eventTime='59:30.0', eventType='Normal')
SpecificColumns(driverId=11, eventTime='59:31.8', eventType='Normal')

Escritura de la tabla en el disco#

[11]:
#
# Write the contents of the truck_eventts_subset table to a file named
# truck_events_subset.csv
#

import csv

with Session(engine) as session:
    stmt = select(TruckEventsSubset)
    # Resolve the column names once: they serve both as the header row and as
    # the attribute names read from every record.
    column_names = TruckEventsSubset.__table__.columns.keys()
    # newline="" lets the csv writer control line endings itself, as the csv
    # module documentation requires (otherwise platforms that translate "\n"
    # on output end up with an extra "\r" per row).
    with open(
        "/tmp/truck_events_subset.csv", "w", newline="", encoding="utf-8"
    ) as csvfile:
        spamwriter = csv.writer(csvfile, delimiter=",")
        spamwriter.writerow(column_names)
        spamwriter.writerows(
            [getattr(record, name) for name in column_names]
            for record in session.scalars(stmt)
        )
[12]:
# Show the first lines of the exported CSV file.
!head /tmp/truck_events_subset.csv
id,driverId,truckId,eventTime,eventType,longitude,latitude,eventKey,correlationId,driverName,routeId,routeName,eventDate
1,14,25,59:21.4,Normal,-94.58,37.03,14|25|9223370572464814373,3.66E+18,Adis Cesir,160405074,Joplin to Kansas City Route 2,2016-05-27-22
2,18,16,59:21.7,Normal,-89.66,39.78,18|16|9223370572464814089,3.66E+18,Grant Liu,1565885487,Springfield to KC Via Hanibal,2016-05-27-22
3,27,105,59:21.7,Normal,-90.21,38.65,27|105|9223370572464814070,3.66E+18,Mark Lochbihler,1325562373,Springfield to KC Via Columbia Route 2,2016-05-27-22
4,11,74,59:21.7,Normal,-90.2,38.65,11|74|9223370572464814123,3.66E+18,Jamie Engesser,1567254452,Saint Louis to Memphis Route2,2016-05-27-22
5,22,87,59:21.7,Normal,-90.04,35.19,22|87|9223370572464814101,3.66E+18,Nadeem Asghar,1198242881, Saint Louis to Chicago Route2,2016-05-27-22
6,22,87,59:22.3,Normal,-90.37,35.21,22|87|9223370572464813486,3.66E+18,Nadeem Asghar,1198242881, Saint Louis to Chicago Route2,2016-05-27-22
7,23,68,59:22.4,Normal,-89.91,40.86,23|68|9223370572464813450,3.66E+18,Adam Diaz,160405074,Joplin to Kansas City Route 2,2016-05-27-22
8,11,74,59:22.5,Normal,-89.74,39.1,11|74|9223370572464813355,3.66E+18,Jamie Engesser,1567254452,Saint Louis to Memphis Route2,2016-05-27-22
9,20,41,59:22.5,Normal,-93.36,41.69,20|41|9223370572464813344,3.66E+18,Chris Harris,160779139,Des Moines to Chicago Route 2,2016-05-27-22