Parametrización (1)

  • Última modificación: Mayo 14, 2022

[3]:
%cd /tmp
!mkdir -p input/
/tmp
[4]:
%%writefile input/sales1.csv
Fance,May,100
NA
Germany,May,200
Writing input/sales1.csv
[5]:
%%writefile input/sales2.csv
Fance,June,140
Germany,June,180
UK,June,180
Writing input/sales2.csv
[6]:
%%writefile input/sales3.csv
France,June,240
None
Germany,June,180
Writing input/sales3.csv
[9]:
%%writefile pipeline.py

import os
import luigi
from luigi import LocalTarget, Task, Parameter


INPUT_FOLDER = 'input'
OUTPUT_FOLDER = 'output'

class DownloadFile(Task):
    """Copy one file from INPUT_FOLDER to OUTPUT_FOLDER, keeping only lines with a comma.

    Lines that contain no comma (for example a lone ``NA`` or ``None`` row)
    are dropped; every other line is written through unchanged.
    """

    # Base name of the file; joined with INPUT_FOLDER to read and with
    # OUTPUT_FOLDER to write.
    file_name = Parameter()

    def output(self):
        """Return the local target located at ``OUTPUT_FOLDER/<file_name>``."""
        return LocalTarget(os.path.join(OUTPUT_FOLDER, self.file_name))

    def run(self):
        """Stream the input file into the output target, skipping comma-less lines."""
        source_path = os.path.join(INPUT_FOLDER, self.file_name)
        # Input is opened first, then the output target, mirroring the read -> write flow.
        with open(source_path) as source, self.output().open('w') as sink:
            for row in source:
                if ',' not in row:
                    continue
                sink.write(row)


class DownloadSalesData(Task):
    """Merge every file found in INPUT_FOLDER into a single ``all_sales.csv``.

    Each entry of INPUT_FOLDER is first processed by a ``DownloadFile`` task,
    requested from ``run`` as a dynamic dependency; the processed outputs are
    then concatenated, in sorted file-name order, into the merged file.
    """

    def output(self):
        """Return the local target for the merged file, ``all_sales.csv``."""
        return LocalTarget('all_sales.csv')

    def run(self):
        """Process each input file and append its contents to the merged output."""
        processed_files = []
        # os.listdir() returns entries in arbitrary order; sorting makes the row
        # order of the merged file reproducible across runs and platforms.
        for file_name in sorted(os.listdir(INPUT_FOLDER)):
            # Yielding a task from run() declares it as a dynamic dependency;
            # the value received back is used below as that task's output target.
            target = yield DownloadFile(file_name)
            processed_files.append(target)

        with self.output().open('w') as out_file:
            for processed in processed_files:
                with processed.open() as in_file:
                    for line in in_file:
                        out_file.write(line)


if __name__ == "__main__":
    # Command-line style arguments: the root task to run, using the local scheduler.
    cli_args = ["DownloadSalesData", "--local-scheduler"]
    luigi.run(cli_args)
Overwriting pipeline.py
[10]:
!python3 pipeline.py
DEBUG: Checking if DownloadSalesData() is complete
INFO: Informed scheduler that task   DownloadSalesData__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadSalesData()
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) new requirements      DownloadSalesData()
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if DownloadFile(file_name=sales3.csv) is complete
INFO: Informed scheduler that task   DownloadFile_sales3_csv_3fb93ee78b   has status   PENDING
INFO: Informed scheduler that task   DownloadSalesData__99914b932b   has status   PENDING
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadFile(file_name=sales3.csv)
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) done      DownloadFile(file_name=sales3.csv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DownloadFile_sales3_csv_3fb93ee78b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadSalesData()
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) new requirements      DownloadSalesData()
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if DownloadFile(file_name=sales2.csv) is complete
INFO: Informed scheduler that task   DownloadFile_sales2_csv_b92d8cd1c2   has status   PENDING
INFO: Informed scheduler that task   DownloadSalesData__99914b932b   has status   PENDING
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadFile(file_name=sales2.csv)
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) done      DownloadFile(file_name=sales2.csv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DownloadFile_sales2_csv_b92d8cd1c2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadSalesData()
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) new requirements      DownloadSalesData()
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if DownloadFile(file_name=sales1.csv) is complete
INFO: Informed scheduler that task   DownloadFile_sales1_csv_955381c3e3   has status   PENDING
INFO: Informed scheduler that task   DownloadSalesData__99914b932b   has status   PENDING
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadFile(file_name=sales1.csv)
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) done      DownloadFile(file_name=sales1.csv)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DownloadFile_sales1_csv_955381c3e3   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) running   DownloadSalesData()
INFO: [pid 235] Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) done      DownloadSalesData()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DownloadSalesData__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=659667897, workers=1, host=b148b7d2dd75, username=root, pid=235) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 3 DownloadFile(file_name=sales1.csv,sales2.csv,sales3.csv)
    - 1 DownloadSalesData()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

[12]:
!cat all_sales.csv
France,June,240
Germany,June,180
Fance,June,140
Germany,June,180
UK,June,180
Fance,May,100
Germany,May,200