Pipeline con muchas tareas (WrapperTask)

  • Última modificación: 14 de mayo de 2022

[1]:
%cd /tmp
/tmp
[2]:
%%writefile pipeline.py


import luigi
from luigi import LocalTarget, Task, WrapperTask


class DownloadFranceSales(Task):
    """Produce the raw France sales file ``france.csv``.

    Each line is a ``month,amount`` pair. Luigi considers this task complete
    as soon as ``france.csv`` exists, so an existing file is not rewritten.
    """

    def output(self):
        """Return the local target that receives the raw rows."""
        return LocalTarget("france.csv")

    def run(self):
        """Write the hard-coded sample rows, one per line."""
        rows = ("May,100", "May,200", "Jun,140", "Jun,150")
        with self.output().open("w") as sink:
            sink.write("".join(f"{row}\n" for row in rows))


class DownloadGermanySales(Task):
    """Produce the raw Germany sales file ``germany.csv``.

    Each line is a ``month,amount`` pair. Luigi considers this task complete
    as soon as ``germany.csv`` exists, so a file left over from an earlier
    run (possibly with different contents) is kept and ``run`` is skipped.
    """

    def output(self):
        """Return the local target that receives the raw rows."""
        return LocalTarget("germany.csv")

    def run(self):
        """Write the hard-coded sample rows, one per line."""
        rows = ("May,120", "May,210", "Jun,140", "Jun,150")
        with self.output().open("w") as sink:
            sink.write("".join(f"{row}\n" for row in rows))


class SummarizeFranceSales(Task):
    """Sum the amounts in ``france.csv`` into ``summary_france_sales.csv``.

    The input is read as ``month,amount`` lines and the second field of each
    non-blank line is added up. The summary file receives the total as a
    single float with no trailing newline.
    """

    def requires(self):
        """Depend on the raw France sales download."""
        return DownloadFranceSales()

    def output(self):
        """Return the local target that receives the total."""
        return LocalTarget("summary_france_sales.csv")

    def run(self):
        """Add up the second column of every non-blank input line."""
        # Start from a float so an empty input still writes "0.0", matching
        # the format of every other total.
        total = 0.0
        with self.input().open() as in_file:
            for line in in_file:
                # A blank line (e.g. a stray empty line in a hand-edited
                # file) has no second field and would raise IndexError.
                if not line.strip():
                    continue
                total += float(line.split(",")[1])

        with self.output().open("w") as out_file:
            out_file.write(str(total))


class SummarizeGermanySales(Task):
    """Sum the amounts in ``germany.csv`` into ``summary_germany_sales.csv``.

    The input is read as ``month,amount`` lines and the second field of each
    non-blank line is added up. The summary file receives the total as a
    single float with no trailing newline.
    """

    def requires(self):
        """Depend on the raw Germany sales download."""
        return DownloadGermanySales()

    def output(self):
        """Return the local target that receives the total."""
        return LocalTarget("summary_germany_sales.csv")

    def run(self):
        """Add up the second column of every non-blank input line."""
        # Start from a float so an empty input still writes "0.0", matching
        # the format of every other total.
        total = 0.0
        with self.input().open() as in_file:
            for line in in_file:
                # A blank line (e.g. a stray empty line in a hand-edited
                # file) has no second field and would raise IndexError.
                if not line.strip():
                    continue
                total += float(line.split(",")[1])

        with self.output().open("w") as out_file:
            out_file.write(str(total))


class Final(WrapperTask):
    """Entry-point task that only groups the per-country summaries.

    As a ``WrapperTask`` it does no work of its own; it is considered done
    once every task returned by ``requires`` is done.
    """

    def requires(self):
        """Return one summary task per country, France first."""
        summary_classes = (SummarizeFranceSales, SummarizeGermanySales)
        return [task_cls() for task_cls in summary_classes]


if __name__ == "__main__":
    # Schedule the Final task using an in-process scheduler, so no central
    # luigid daemon is needed.
    cli_args = ["Final", "--local-scheduler"]
    luigi.run(cli_args)
Overwriting pipeline.py
[3]:
!python3 pipeline.py
DEBUG: Checking if Final() is complete
DEBUG: Checking if SummarizeFranceSales() is complete
DEBUG: Checking if SummarizeGermanySales() is complete
INFO: Informed scheduler that task   Final__99914b932b   has status   PENDING
DEBUG: Checking if DownloadGermanySales() is complete
INFO: Informed scheduler that task   SummarizeGermanySales__99914b932b   has status   PENDING
INFO: Informed scheduler that task   DownloadGermanySales__99914b932b   has status   DONE
DEBUG: Checking if DownloadFranceSales() is complete
INFO: Informed scheduler that task   SummarizeFranceSales__99914b932b   has status   PENDING
INFO: Informed scheduler that task   DownloadFranceSales__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) running   SummarizeFranceSales()
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) done      SummarizeFranceSales()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SummarizeFranceSales__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) running   SummarizeGermanySales()
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) done      SummarizeGermanySales()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   SummarizeGermanySales__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) running   Final()
INFO: [pid 200] Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) done      Final()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Final__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=131464293, workers=1, host=b148b7d2dd75, username=root, pid=200) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 2 complete ones were encountered:
    - 1 DownloadFranceSales()
    - 1 DownloadGermanySales()
* 3 ran successfully:
    - 1 Final()
    - 1 SummarizeFranceSales()
    - 1 SummarizeGermanySales()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

[5]:
!ls -1 *.csv
france.csv
germany.csv
summary_france_sales.csv
summary_germany_sales.csv
[4]:
!cat summary_france_sales.csv
590.0
[4]:
!cat summary_germany_sales.csv
590.0