Sequential tasks#

  • Last modified: May 14, 2022

Tasks to run#

  • ProcessOrders: Generates the CSV file of individual orders (month, sales amount in $).

  • GenerateReport: Computes the total sales for each month.

  • SummarizeReport: Computes the grand total of sales across all months.
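The three tasks form a linear chain: each one requires the output of the previous one. The following is a minimal sketch of how such a chain resolves into an execution order. It is a simplified, hypothetical model (the `REQUIRES` dictionary and `execution_order` function are illustrative names), not Luigi's actual scheduler.

```python
# Hypothetical, simplified model of the dependency chain; not Luigi's scheduler.
REQUIRES = {
    "ProcessOrders": [],
    "GenerateReport": ["ProcessOrders"],
    "SummarizeReport": ["GenerateReport"],
}


def execution_order(task, requires=REQUIRES, done=None):
    """Return the tasks in the order they must run, dependencies first."""
    if done is None:
        done = []
    # Resolve every dependency before the task itself.
    for dep in requires[task]:
        execution_order(dep, requires, done)
    if task not in done:
        done.append(task)
    return done


order = execution_order("SummarizeReport")
print(" -> ".join(order))
```

Asking for the last task is enough to pull in the whole chain, which is why the pipeline scripts in this notebook only name one task in the `luigi.run` call.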

Working directory#

[1]:
%cd /tmp
/tmp

ProcessOrders()#

[2]:
%%writefile /tmp/pipeline.py

import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)

if __name__ == '__main__':
    luigi.run(["ProcessOrders", "--local-scheduler"])
Overwriting /tmp/pipeline.py
[3]:
!python3 /tmp/pipeline.py
DEBUG: Checking if ProcessOrders() is complete
INFO: Informed scheduler that task   ProcessOrders__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=378523441, workers=1, host=e3fdad83036a, username=root, pid=661) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 ProcessOrders()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

[4]:
!cat orders.csv
May,100
May,100
Jun,200
Jun,150
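The "Did not run any tasks" line in the execution summary is consistent with orders.csv already existing from an earlier run: by default, Luigi considers a task complete when its output target exists. The sketch below models that rule with the standard library only. `MiniTask` and `run_if_needed` are hypothetical names used for illustration, not part of Luigi.

```python
import os
import tempfile


class MiniTask:
    """Hypothetical stand-in for a task whose only output is one local file."""

    def __init__(self, path):
        self.path = path
        self.runs = 0  # counts how many times run() actually executed

    def complete(self):
        # The task counts as done as soon as its output file exists.
        return os.path.exists(self.path)

    def run(self):
        self.runs += 1
        with open(self.path, "w") as file:
            print("May,100", file=file)


def run_if_needed(task):
    """Run the task only when its output is missing; report whether it ran."""
    if task.complete():
        return False
    task.run()
    return True


with tempfile.TemporaryDirectory() as workdir:
    task = MiniTask(os.path.join(workdir, "orders.csv"))
    first = run_if_needed(task)   # output missing, so the task runs
    second = run_if_needed(task)  # output present, so the task is skipped
    os.remove(task.path)          # deleting the output forces a rerun
    third = run_if_needed(task)

print(first, second, third)
```

Under this model, deleting the output files is what makes the tasks run again.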

GenerateReport()#

[5]:
%%writefile /tmp/pipeline.py

import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)

class GenerateReport(Task):
    def requires(self):
        #
        # Dependency on the previous task
        #
        return ProcessOrders()


    def output(self):
        return LocalTarget('report.csv')

    def run(self):
        # Accumulate the sales amount of each order under its month.
        report = {}
        with self.input().open() as infile:
            for line in infile:
                month, amount = line.split(',')
                if month in report:
                    report[month] += float(amount)
                else:
                    report[month] = float(amount)

        # Write one "month,total" line per month.
        with self.output().open('w') as file:
            for month in report:
                print(month + ',' + str(report[month]), file=file)



if __name__ == '__main__':
    luigi.run(["GenerateReport", "--local-scheduler"])
Overwriting /tmp/pipeline.py
[6]:
!python3 pipeline.py
DEBUG: Checking if GenerateReport() is complete
INFO: Informed scheduler that task   GenerateReport__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=108316347, workers=1, host=e3fdad83036a, username=root, pid=667) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 GenerateReport()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

[7]:
!cat report.csv
May,200.0
Jun,350.0
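The per-month aggregation done by GenerateReport can also be checked outside Luigi. The following is a minimal sketch using only the standard library; `ORDERS_CSV` and `monthly_totals` are illustrative names, and the input is the same four orders written by ProcessOrders.

```python
import csv
import io
from collections import defaultdict

# Same rows that ProcessOrders writes to orders.csv.
ORDERS_CSV = "May,100\nMay,100\nJun,200\nJun,150\n"


def monthly_totals(lines):
    """Sum the sales amount per month from 'month,amount' CSV rows."""
    totals = defaultdict(float)
    for month, amount in csv.reader(lines):
        totals[month] += float(amount)
    return dict(totals)


totals = monthly_totals(io.StringIO(ORDERS_CSV))
for month, amount in totals.items():
    print(f"{month},{amount}")
```

Using `defaultdict(float)` removes the explicit "is the month already in the dictionary" branch, and `csv.reader` handles the line splitting.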

SummarizeReport()#

[8]:
%%writefile pipeline.py

import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)

class GenerateReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return ProcessOrders()


    def output(self):
        return LocalTarget('report.csv')

    def run(self):
        # Accumulate the sales amount of each order under its month.
        report = {}
        with self.input().open() as infile:
            for line in infile:
                month, amount = line.split(',')
                if month in report:
                    report[month] += float(amount)
                else:
                    report[month] = float(amount)

        # Write one "month,total" line per month.
        with self.output().open('w') as file:
            for month in report:
                print(month + ',' + str(report[month]), file=file)


class SummarizeReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return GenerateReport()

    def output(self):
        return LocalTarget('summary.txt')

    def run(self):
        total = 0.0
        for line in self.input().open():
            month, amount = line.split(',')
            total += float(amount)

        with self.output().open('w') as file:
            file.write(str(total))


if __name__ == '__main__':
    luigi.run(["SummarizeReport", "--local-scheduler"])

Overwriting pipeline.py

Luigi GUI#

[9]:
!python3 pipeline.py
DEBUG: Checking if SummarizeReport() is complete
INFO: Informed scheduler that task   SummarizeReport__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=084282489, workers=1, host=e3fdad83036a, username=root, pid=673) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 SummarizeReport()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

[10]:
!cat summary.txt
550.0

Luigi GUI help#

[11]:
!luigid --help
usage: luigid [-h] [--background] [--pidfile PIDFILE] [--logdir LOGDIR]
              [--state-path STATE_PATH] [--address ADDRESS]
              [--unix-socket UNIX_SOCKET] [--port PORT]

Central luigi server

optional arguments:
  -h, --help            show this help message and exit
  --background          Run in background mode
  --pidfile PIDFILE     Write pidfile
  --logdir LOGDIR       log directory
  --state-path STATE_PATH
                        Pickled state file
  --address ADDRESS     Listening interface
  --unix-socket UNIX_SOCKET
                        Unix socket path
  --port PORT           Listening port

Running and reporting from the GUI#

Starting the GUI from the command prompt

$ luigid

To view the interface, open:

http://localhost:8082

or

http://127.0.0.1:8082
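Before running the pipeline against the central scheduler, it can help to confirm that something is listening on the port. The sketch below is one way to do that with the standard library. `scheduler_is_up` is a hypothetical helper, and it only checks that the TCP port accepts connections, not that the process behind it is luigid.

```python
import socket


def scheduler_is_up(host="localhost", port=8082, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable.
        return False


print("luigid reachable:", scheduler_is_up())
```

The default port 8082 matches the URLs shown above.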

Modifying luigi.run()

[12]:
%%writefile pipeline.py

import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)

class GenerateReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return ProcessOrders()


    def output(self):
        return LocalTarget('report.csv')

    def run(self):
        # Accumulate the sales amount of each order under its month.
        report = {}
        with self.input().open() as infile:
            for line in infile:
                month, amount = line.split(',')
                if month in report:
                    report[month] += float(amount)
                else:
                    report[month] = float(amount)

        # Write one "month,total" line per month.
        with self.output().open('w') as file:
            for month in report:
                print(month + ',' + str(report[month]), file=file)


class SummarizeReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return GenerateReport()

    def output(self):
        return LocalTarget('summary.txt')

    def run(self):
        total = 0.0
        for line in self.input().open():
            month, amount = line.split(',')
            total += float(amount)

        with self.output().open('w') as file:
            file.write(str(total))


if __name__ == '__main__':
    #
    # Change here: without --local-scheduler, Luigi connects to the
    # central scheduler started with luigid.
    #
    # luigi.run(["SummarizeReport", "--local-scheduler"])
    luigi.run(["SummarizeReport"])

Overwriting pipeline.py

Running the pipeline

[13]:
!rm summary.txt report.csv orders.csv
!python3 pipeline.py
DEBUG: Checking if SummarizeReport() is complete
DEBUG: Checking if GenerateReport() is complete
INFO: Informed scheduler that task   SummarizeReport__99914b932b   has status   PENDING
DEBUG: Checking if ProcessOrders() is complete
INFO: Informed scheduler that task   GenerateReport__99914b932b   has status   PENDING
INFO: Informed scheduler that task   ProcessOrders__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) running   ProcessOrders()
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) done      ProcessOrders()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ProcessOrders__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) running   GenerateReport()
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) done      GenerateReport()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GenerateReport__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) running   SummarizeReport()
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) done      SummarizeReport()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SummarizeReport__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 GenerateReport()
    - 1 ProcessOrders()
    - 1 SummarizeReport()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

GUI

assets/luigi_gui_1_01-0.png

assets/luigi_gui_1_01-1.png