Tareas en paralelo

  • Última modificación: 14 de mayo de 2022

[1]:
%cd /tmp
/tmp
[2]:
# Start from a clean slate: Luigi skips any task whose output already exists,
# and CreateDatabase (luigi.contrib.sqla) records its completion inside demo.db,
# so the database must be removed too for the pipeline to re-run. `-f` avoids
# errors when the files are not there yet.
!rm -f france.csv germany.csv demo.db
[3]:
%%writefile pipeline.py

import sqlalchemy

import luigi
import luigi.contrib.sqla
from luigi import LocalTarget, Task


class DownloadFranceSales(Task):
    """Simulate downloading the French sales by writing a small CSV file."""

    def output(self):
        """Return the target file that holds the French ``month,amount`` records."""
        return LocalTarget("france.csv")

    def run(self):
        """Write the hard-coded French sales, one ``month,amount`` record per line."""
        records = ("May,100", "May,200", "Jun,140", "Jun,150")
        with self.output().open("w") as sink:
            for record in records:
                sink.write(record + "\n")


class DownloadGermanySales(Task):
    """Simulate downloading the German sales by writing a small CSV file."""

    def output(self):
        """Return the target file that holds the German ``month,amount`` records."""
        return LocalTarget("germany.csv")

    def run(self):
        """Write the hard-coded German sales, one ``month,amount`` record per line."""
        records = ("May,120", "May,210", "Jun,140", "Jun,150")
        with self.output().open("w") as sink:
            for record in records:
                sink.write(record + "\n")


class CreateDatabase(luigi.contrib.sqla.CopyToTable):
    """Load the sales produced by every upstream download into one SQLite table.

    Each line of the required CSV files has the form ``month,amount`` and
    becomes one row of the ``sales`` table.
    """

    columns = [
        (["month", sqlalchemy.String(64)], {}),
        (["amount", sqlalchemy.Float], {}),
    ]
    # File-backed SQLite database at /tmp/demo.db (NOT in memory; an
    # in-memory database would be "sqlite://"). Four slashes = absolute path.
    connection_string = "sqlite:////tmp/demo.db"
    table = "sales"  # name of the table to store data
    column_separator = ","

    def requires(self):
        """Depend on both downloads.

        They do not depend on each other, so Luigi may run them concurrently
        when more than one worker is configured.
        """
        return [
            DownloadFranceSales(),
            DownloadGermanySales(),
        ]

    def rows(self):
        """Yield ``[month, amount]`` lists from every input file, in order.

        The trailing newline is stripped (as luigi's default ``rows`` does) so
        it does not end up inside the ``amount`` value; blank lines are skipped.
        """
        for target in self.input():
            with target.open("r") as f:
                for line in f:
                    line = line.strip("\n")
                    if not line:
                        continue
                    yield line.split(self.column_separator)


if __name__ == "__main__":
    # Request only the final task; Luigi schedules its dependencies first.
    # This runs a single worker; adding "--workers", "2" would let the two
    # independent downloads execute in parallel.
    cli_args = ["CreateDatabase", "--local-scheduler"]
    luigi.run(cli_args)
Overwriting pipeline.py

https://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html

[4]:
!python3 pipeline.py
DEBUG: Checking if CreateDatabase() is complete
DEBUG: Checking if DownloadFranceSales() is complete
DEBUG: Checking if DownloadGermanySales() is complete
INFO: Informed scheduler that task   CreateDatabase__99914b932b   has status   PENDING
INFO: Informed scheduler that task   DownloadGermanySales__99914b932b   has status   PENDING
INFO: Informed scheduler that task   DownloadFranceSales__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 112] Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) running   DownloadFranceSales()
INFO: [pid 112] Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) done      DownloadFranceSales()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DownloadFranceSales__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 112] Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) running   DownloadGermanySales()
INFO: [pid 112] Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) done      DownloadGermanySales()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DownloadGermanySales__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 112] Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) running   CreateDatabase()
INFO: Running task copy to table for update id CreateDatabase__99914b932b for table sales
INFO: Finished inserting 0 rows into SQLAlchemy target
INFO: Finished inserting rows into SQLAlchemy target
INFO: [pid 112] Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) done      CreateDatabase()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CreateDatabase__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=940235631, workers=1, host=b148b7d2dd75, username=root, pid=112) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 CreateDatabase()
    - 1 DownloadFranceSales()
    - 1 DownloadGermanySales()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Consulta de la tabla desde la línea de comandos

$ sqlite3 demo.db
sqlite> SELECT * FROM sales;

assets/luigi_gui_1_02-0.png