Sequential tasks
Last modified: May 14, 2022
Tasks to run
ProcessOrders: Generates a CSV file with the order amounts ($) for each month.
GenerateReport: Computes the total sales per month.
SummarizeReport: Computes the overall total by adding up the monthly sales.
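Before turning these steps into Luigi tasks, it may help to see the computation on its own. The following is a minimal plain-Python sketch, without Luigi, that assumes the same four hard-coded orders that ProcessOrders writes; the function names are illustrative only.

```python
# Plain-Python model of the three pipeline steps (no Luigi involved).
# The function names are hypothetical; they mirror ProcessOrders,
# GenerateReport and SummarizeReport.


def process_orders():
    """Return the raw orders as (month, amount) pairs."""
    return [("May", 100.0), ("May", 100.0), ("Jun", 200.0), ("Jun", 150.0)]


def generate_report(orders):
    """Add up the amounts for each month, keeping first-seen month order."""
    report = {}
    for month, amount in orders:
        report[month] = report.get(month, 0.0) + amount
    return report


def summarize_report(report):
    """Add up the monthly totals to get the overall total."""
    return sum(report.values())


orders = process_orders()
report = generate_report(orders)
total = summarize_report(report)
print(report)  # {'May': 200.0, 'Jun': 350.0}
print(total)   # 550.0
```

Each function only consumes the result of the previous one, which is exactly the chain of dependencies the Luigi version expresses with requires().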
Working directory
[1]:
%cd /tmp
/tmp
ProcessOrders()
[2]:
%%writefile /tmp/pipeline.py
import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    # Writes the raw orders (month, amount in $) to orders.csv;
    # the relative path is resolved against the working directory (/tmp)
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)


if __name__ == '__main__':
    luigi.run(["ProcessOrders", "--local-scheduler"])
Overwriting /tmp/pipeline.py
[3]:
!python3 /tmp/pipeline.py
DEBUG: Checking if ProcessOrders() is complete
INFO: Informed scheduler that task ProcessOrders__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=378523441, workers=1, host=e3fdad83036a, username=root, pid=661) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 complete ones were encountered:
- 1 ProcessOrders()
Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
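The summary reports "Did not run any tasks" because orders.csv already existed in /tmp, presumably from an earlier run of the notebook: by default a Luigi task counts as complete when all of its outputs exist, and a complete task is not run again. The sketch below models that rule in plain Python with a hypothetical CachedStep class; it is a simplification, not Luigi's implementation.

```python
import os
import tempfile


class CachedStep:
    """Hypothetical stand-in for a Luigi task with a single file output."""

    def __init__(self, path, lines):
        self.path = path
        self.lines = lines
        self.runs = 0  # how many times run() actually executed

    def complete(self):
        # Simplified version of Luigi's default rule: done if the output exists.
        return os.path.exists(self.path)

    def run(self):
        self.runs += 1
        with open(self.path, "w") as f:
            for line in self.lines:
                print(line, file=f)


def build(step):
    """Run the step only if its output is missing."""
    if not step.complete():
        step.run()


workdir = tempfile.mkdtemp()
step = CachedStep(os.path.join(workdir, "orders.csv"), ["May,100", "Jun,200"])
build(step)  # output missing: run() executes
build(step)  # output present: skipped, like "Did not run any tasks"
print(step.runs)  # 1

os.remove(step.path)  # deleting the output forces a rerun
build(step)
print(step.runs)  # 2
```

This is also why the last part of the notebook deletes summary.txt, report.csv and orders.csv before running the pipeline again.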
[4]:
!cat orders.csv
May,100
May,100
Jun,200
Jun,150
GenerateReport()
[5]:
%%writefile /tmp/pipeline.py
import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    # Writes the raw orders (month, amount in $) to orders.csv
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)


class GenerateReport(Task):
    def requires(self):
        #
        # Dependency on the previous task
        #
        return ProcessOrders()

    def output(self):
        return LocalTarget('report.csv')

    def run(self):
        # Add up the order amounts for each month
        report = {}
        with self.input().open('r') as infile:
            for line in infile:
                month, amount = line.split(',')
                if month in report:
                    report[month] += float(amount)
                else:
                    report[month] = float(amount)
        # Write one "month,total" line per month
        with self.output().open('w') as file:
            for month in report:
                print(month + ',' + str(report[month]), file=file)


if __name__ == '__main__':
    luigi.run(["GenerateReport", "--local-scheduler"])
Overwriting /tmp/pipeline.py
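The accumulation loop in GenerateReport.run() can also be written with the standard library's csv module and collections.defaultdict. A sketch of that variant, assuming the same two-column month,amount format without a header row; the helper names are hypothetical, and since they work on any iterable of lines or any text stream they can be tried without touching the filesystem.

```python
import csv
import io
from collections import defaultdict


def monthly_totals(lines):
    """Sum the amount column per month from 'month,amount' lines."""
    totals = defaultdict(float)
    for month, amount in csv.reader(lines):
        totals[month] += float(amount)
    return dict(totals)  # dicts keep first-seen month order


def write_report(totals, out):
    """Write one 'month,total' row per month to a text stream."""
    writer = csv.writer(out, lineterminator="\n")
    for month, total in totals.items():
        writer.writerow([month, total])


orders = io.StringIO("May,100\nMay,100\nJun,200\nJun,150\n")
totals = monthly_totals(orders)
buffer = io.StringIO()
write_report(totals, buffer)
print(buffer.getvalue())  # same two rows as report.csv below
```

Inside a Luigi task, the same helpers could receive the file objects returned by self.input().open('r') and self.output().open('w').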
[6]:
!python3 pipeline.py
DEBUG: Checking if GenerateReport() is complete
INFO: Informed scheduler that task GenerateReport__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=108316347, workers=1, host=e3fdad83036a, username=root, pid=667) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 complete ones were encountered:
- 1 GenerateReport()
Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
[7]:
!cat report.csv
May,200.0
Jun,350.0
SummarizeReport()
[8]:
%%writefile pipeline.py
import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    # Writes the raw orders (month, amount in $) to orders.csv
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)


class GenerateReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return ProcessOrders()

    def output(self):
        return LocalTarget('report.csv')

    def run(self):
        # Add up the order amounts for each month
        report = {}
        with self.input().open('r') as infile:
            for line in infile:
                month, amount = line.split(',')
                if month in report:
                    report[month] += float(amount)
                else:
                    report[month] = float(amount)
        with self.output().open('w') as file:
            for month in report:
                print(month + ',' + str(report[month]), file=file)


class SummarizeReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return GenerateReport()

    def output(self):
        return LocalTarget('summary.txt')

    def run(self):
        # Add up the monthly totals into a single overall total
        total = 0.0
        with self.input().open('r') as infile:
            for line in infile:
                month, amount = line.split(',')
                total += float(amount)
        with self.output().open('w') as file:
            file.write(str(total))


if __name__ == '__main__':
    luigi.run(["SummarizeReport", "--local-scheduler"])
Overwriting pipeline.py
Luigi GUI
[9]:
!python3 pipeline.py
DEBUG: Checking if SummarizeReport() is complete
INFO: Informed scheduler that task SummarizeReport__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=084282489, workers=1, host=e3fdad83036a, username=root, pid=673) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 complete ones were encountered:
- 1 SummarizeReport()
Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
[10]:
!cat summary.txt
550.0
Luigi GUI help
[11]:
!luigid --help
usage: luigid [-h] [--background] [--pidfile PIDFILE] [--logdir LOGDIR]
              [--state-path STATE_PATH] [--address ADDRESS]
              [--unix-socket UNIX_SOCKET] [--port PORT]
Central luigi server
optional arguments:
  -h, --help            show this help message and exit
  --background          Run in background mode
  --pidfile PIDFILE     Write pidfile
  --logdir LOGDIR       log directory
  --state-path STATE_PATH
                        Pickled state file
  --address ADDRESS     Listening interface
  --unix-socket UNIX_SOCKET
                        Unix socket path
  --port PORT           Listening port
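The options listed above can be combined, for example to start the central scheduler in the background with its pid file, logs and saved state under one directory. The sketch below only builds the argument list for such a call from the flags shown in the help text; the helper name, the directory and the file names are illustrative assumptions, and the commented-out subprocess call would require luigid to be on the PATH.

```python
import os
import shlex


def luigid_command(base_dir, port=None, background=True):
    """Build a luigid command line from the options shown by `luigid --help`.

    Hypothetical helper: file names under base_dir are arbitrary choices.
    """
    cmd = ["luigid"]
    if background:
        cmd.append("--background")
    cmd += [
        "--pidfile", os.path.join(base_dir, "luigid.pid"),
        "--logdir", os.path.join(base_dir, "logs"),
        "--state-path", os.path.join(base_dir, "luigi-state.pickle"),
    ]
    if port is not None:
        cmd += ["--port", str(port)]
    return cmd


cmd = luigid_command("/tmp/luigi", port=8082)
print(shlex.join(cmd))
# To actually start the server (requires luigid to be installed):
# import subprocess; subprocess.run(cmd, check=True)
```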
Running and reporting from the GUI
Starting the GUI from the command prompt
$ luigid
To view the interface, open:
or
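Before opening the browser, a plain TCP connection attempt is a quick way to check from Python that the scheduler is listening. A minimal sketch using only the socket module; the helper name is hypothetical, and the default port assumes luigid's standard port 8082, which the --port option changes.

```python
import socket


def is_listening(host="localhost", port=8082, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


print(is_listening())  # True only while luigid is running on this machine
```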
Modifying luigi.run()
[12]:
%%writefile pipeline.py
import luigi
from luigi import Task, LocalTarget


class ProcessOrders(Task):
    # Writes the raw orders (month, amount in $) to orders.csv
    def output(self):
        return LocalTarget('orders.csv')

    def run(self):
        with self.output().open('w') as file:
            print('May,100', file=file)
            print('May,100', file=file)
            print('Jun,200', file=file)
            print('Jun,150', file=file)


class GenerateReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return ProcessOrders()

    def output(self):
        return LocalTarget('report.csv')

    def run(self):
        # Add up the order amounts for each month
        report = {}
        with self.input().open('r') as infile:
            for line in infile:
                month, amount = line.split(',')
                if month in report:
                    report[month] += float(amount)
                else:
                    report[month] = float(amount)
        with self.output().open('w') as file:
            for month in report:
                print(month + ',' + str(report[month]), file=file)


class SummarizeReport(Task):
    def requires(self):
        #
        # Dependency
        #
        return GenerateReport()

    def output(self):
        return LocalTarget('summary.txt')

    def run(self):
        # Add up the monthly totals into a single overall total
        total = 0.0
        with self.input().open('r') as infile:
            for line in infile:
                month, amount = line.split(',')
                total += float(amount)
        with self.output().open('w') as file:
            file.write(str(total))


if __name__ == '__main__':
    #
    # Changed here: without --local-scheduler the tasks are sent
    # to the central scheduler (luigid) and show up in the GUI
    #
    # luigi.run(["SummarizeReport", "--local-scheduler"])
    luigi.run(["SummarizeReport"])
Overwriting pipeline.py
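Without --local-scheduler, luigi.run() sends the tasks to the central scheduler, which must already be running (luigid); that is what makes the run visible in the GUI. Luigi also accepts --scheduler-host and --scheduler-port when the server is not on the default localhost:8082. A sketch of a hypothetical helper that switches between both modes, so the same script can still be run without luigid; it only builds the argument list that would be passed to luigi.run().

```python
def run_args(task_name, local=False, host=None, port=None):
    """Build the argument list for luigi.run() (hypothetical helper).

    local=True reproduces the earlier cells (--local-scheduler);
    otherwise the task is sent to a central luigid server.
    """
    args = [task_name]
    if local:
        args.append("--local-scheduler")
    else:
        if host is not None:
            args += ["--scheduler-host", host]
        if port is not None:
            args += ["--scheduler-port", str(port)]
    return args


print(run_args("SummarizeReport", local=True))
# ['SummarizeReport', '--local-scheduler']
print(run_args("SummarizeReport"))
# ['SummarizeReport']
```

In pipeline.py the result would be passed as luigi.run(run_args("SummarizeReport")), which is equivalent to the changed line in the cell above.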
Running the pipeline
[13]:
!rm summary.txt report.csv orders.csv
!python3 pipeline.py
DEBUG: Checking if SummarizeReport() is complete
DEBUG: Checking if GenerateReport() is complete
INFO: Informed scheduler that task SummarizeReport__99914b932b has status PENDING
DEBUG: Checking if ProcessOrders() is complete
INFO: Informed scheduler that task GenerateReport__99914b932b has status PENDING
INFO: Informed scheduler that task ProcessOrders__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) running ProcessOrders()
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) done ProcessOrders()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ProcessOrders__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) running GenerateReport()
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) done GenerateReport()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task GenerateReport__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) running SummarizeReport()
INFO: [pid 684] Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) done SummarizeReport()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task SummarizeReport__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=656333087, workers=1, host=e3fdad83036a, username=root, pid=684) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 3 ran successfully:
- 1 GenerateReport()
- 1 ProcessOrders()
- 1 SummarizeReport()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
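The log shows how the chain declared with requires() is resolved: the scheduler checks SummarizeReport, finds it incomplete, then checks GenerateReport and ProcessOrders, and the worker runs them in dependency order (ProcessOrders, GenerateReport, SummarizeReport). In the earlier runs, where the top task was already complete, its requirements were never checked. The following is a simplified single-process model of that behavior in plain Python; the REQUIRES dictionary stands in for requires(), the `done` set for the existing output files, and it is not Luigi's actual scheduler.

```python
# Simplified model of Luigi's dependency resolution (not the real scheduler).
REQUIRES = {
    "SummarizeReport": ["GenerateReport"],
    "GenerateReport": ["ProcessOrders"],
    "ProcessOrders": [],
}


def build(task, done, log):
    """Run `task` after its requirements, skipping anything already complete.

    `done` holds the tasks whose outputs exist; `log` records
    the order of checks and runs.
    """
    log.append(("check", task))
    if task in done:
        return  # complete: its requirements are not even checked
    for dep in REQUIRES[task]:
        build(dep, done, log)
    log.append(("run", task))
    done.add(task)


# Fresh start, as after `rm summary.txt report.csv orders.csv`.
log = []
build("SummarizeReport", set(), log)
print([t for action, t in log if action == "run"])
# ['ProcessOrders', 'GenerateReport', 'SummarizeReport']

# Second run: every output exists, so only the top task is checked.
log2 = []
build("SummarizeReport", {"SummarizeReport", "GenerateReport", "ProcessOrders"}, log2)
print(log2)  # [('check', 'SummarizeReport')]
```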
GUI