Pipeline con muchas tareas (WrapperTask)
Última modificación: 14 de mayo de 2022
[1]:
%cd /tmp
/tmp
[2]:
%%writefile pipeline.py
import luigi
from luigi import LocalTarget, Task, WrapperTask
class DownloadFranceSales(Task):
    """Produce ``france.csv`` with a small hard-coded sample of sales rows.

    Each row is ``month,amount`` with no header line.
    """

    def output(self):
        """Return the local file target written by :meth:`run`."""
        return LocalTarget("france.csv")

    def run(self):
        """Write the sample rows, one per line, to the output target."""
        rows = ("May,100", "May,200", "Jun,140", "Jun,150")
        with self.output().open("w") as sink:
            for row in rows:
                print(row, file=sink)
class DownloadGermanySales(Task):
    """Produce ``germany.csv`` with a small hard-coded sample of sales rows.

    Each row is ``month,amount`` with no header line.
    """

    def output(self):
        """Return the local file target written by :meth:`run`."""
        return LocalTarget("germany.csv")

    def run(self):
        """Write the sample rows, one per line, to the output target."""
        rows = ("May,120", "May,210", "Jun,140", "Jun,150")
        with self.output().open("w") as sink:
            for row in rows:
                print(row, file=sink)
def _sum_second_column(in_file):
    """Return the sum of the second comma-separated field of each line.

    Shared by the per-country summary tasks so the parsing rule lives in
    one place. Blank lines (for example a trailing empty line) are skipped
    instead of raising ``IndexError``. A non-blank line with fewer than two
    fields still raises ``IndexError``, and a non-numeric amount raises
    ``ValueError``.

    Returns a float; input with no data lines yields ``0.0``.
    """
    return sum(
        (float(line.split(",")[1]) for line in in_file if line.strip()),
        0.0,  # float start so an empty input is written as "0.0", not "0"
    )


class SummarizeFranceSales(Task):
    """Total the France sales amounts into ``summary_france_sales.csv``."""

    def requires(self):
        """Depend on the raw France sales file."""
        return DownloadFranceSales()

    def output(self):
        """Return the local target that holds the France total."""
        return LocalTarget("summary_france_sales.csv")

    def run(self):
        """Read the upstream CSV and write its total as a single value."""
        with self.input().open() as in_file:
            total = _sum_second_column(in_file)
        with self.output().open("w") as out_file:
            out_file.write(str(total))


class SummarizeGermanySales(Task):
    """Total the Germany sales amounts into ``summary_germany_sales.csv``."""

    def requires(self):
        """Depend on the raw Germany sales file."""
        return DownloadGermanySales()

    def output(self):
        """Return the local target that holds the Germany total."""
        return LocalTarget("summary_germany_sales.csv")

    def run(self):
        """Read the upstream CSV and write its total as a single value."""
        with self.input().open() as in_file:
            total = _sum_second_column(in_file)
        with self.output().open("w") as out_file:
            out_file.write(str(total))
class Final(WrapperTask):
    """Entry-point task that groups both country summaries.

    Running ``Final`` schedules each summary task returned by
    :meth:`requires` (and, through them, their download tasks).
    """

    def requires(self):
        """Return the summary tasks this wrapper depends on, France first."""
        summary_tasks = (SummarizeFranceSales, SummarizeGermanySales)
        return [task_cls() for task_cls in summary_tasks]
if __name__ == "__main__":
    # Launch the Final wrapper task using luigi's in-process local scheduler.
    cli_args = ["Final", "--local-scheduler"]
    luigi.run(cli_args)
Overwriting pipeline.py
[3]:
!python3 pipeline.py
DEBUG: Checking if Final() is complete
DEBUG: Checking if SummarizeFranceSales() is complete
DEBUG: Checking if SummarizeGermanySales() is complete
INFO: Informed scheduler that task Final__99914b932b has status PENDING
DEBUG: Checking if DownloadGermanySales() is complete
INFO: Informed scheduler that task SummarizeGermanySales__99914b932b has status PENDING
INFO: Informed scheduler that task DownloadGermanySales__99914b932b has status DONE
DEBUG: Checking if DownloadFranceSales() is complete
INFO: Informed scheduler that task SummarizeFranceSales__99914b932b has status PENDING
INFO: Informed scheduler that task DownloadFranceSales__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) running SummarizeFranceSales()
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) done SummarizeFranceSales()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task SummarizeFranceSales__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) running SummarizeGermanySales()
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) done SummarizeGermanySales()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task SummarizeGermanySales__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) running Final()
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) done Final()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Final__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 2 complete ones were encountered:
- 1 DownloadFranceSales()
- 1 DownloadGermanySales()
* 3 ran successfully:
- 1 Final()
- 1 SummarizeFranceSales()
- 1 SummarizeGermanySales()
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
[5]:
!ls -1 *.csv
france.csv
germany.csv
summary_france_sales.csv
summary_germany_sales.csv
[4]:
!cat summary_france_sales.csv
590.0
[4]:
!cat summary_germany_sales.csv
590.0