Configuration files (revisar)
Última modificación: 14 de mayo de 2022
[1]:
# Work in /tmp, delete outputs left by a previous run (per-file copies and the
# concatenated result), and make sure the input folder exists.
%cd /tmp
!rm -rf output/ all_sales.csv
!mkdir -p input/
/tmp
[2]:
%%writefile /tmp/luigi.cfg
[DownloadSalesData]
params={"input_folder": "input", "output": "all_sales.csv"}
Overwriting /tmp/luigi.cfg
[3]:
%%writefile input/sales1.csv
Fance,May,100
NA
Germany,May,200
Overwriting input/sales1.csv
[4]:
%%writefile input/sales2.csv
Fance,June,140
Germany,June,180
UK,June,180
Overwriting input/sales2.csv
[5]:
%%writefile input/sales3.csv
France,June,240
None
Germany,June,180
Overwriting input/sales3.csv
[6]:
%%writefile __init__.py
#
Overwriting __init__.py
[7]:
%%writefile pipeline.py
import os
import luigi
from luigi import IntParameter, LocalTarget, Parameter, Task, DictParameter
# Root directory (relative to the working directory) for the per-file copies;
# DownloadFile writes each copy to OUTPUT_FOLDER/<index>/<file_name>.
OUTPUT_FOLDER = "output"
class DownloadFile(Task):
    """Copy one input file into ``OUTPUT_FOLDER/<index>/<file_name>``.

    Only lines that contain a comma are copied; any other line (for example
    the ``NA`` / ``None`` placeholder rows in the sample inputs) is dropped.

    Parameters:
        input_folder: directory that contains ``file_name``.
        file_name: name of the file to copy; also used as the output file name.
        index: position of the file, used as the output subdirectory name so
            every file gets its own folder (the caller in this module counts
            from 1).
    """

    input_folder = Parameter()
    file_name = Parameter()
    index = IntParameter()

    def output(self):
        """Return the target ``OUTPUT_FOLDER/<index>/<file_name>``."""
        return LocalTarget(
            os.path.join(OUTPUT_FOLDER, str(self.index), self.file_name)
        )

    def run(self):
        """Write every comma-containing input line to the output target.

        Every written line is forced to end with a newline: otherwise a source
        file without a trailing newline would leave its last row unterminated,
        and concatenating the copies downstream would glue that row onto the
        first row of the next file.
        """
        input_path = os.path.join(self.input_folder, self.file_name)
        with open(input_path) as in_file, self.output().open("w") as out_file:
            for line in in_file:
                if "," not in line:
                    # Skip rows that are not comma-separated records.
                    continue
                if not line.endswith("\n"):
                    line += "\n"
                out_file.write(line)
class DownloadSalesData(Task):
    """Copy every file of an input folder, then concatenate the copies.

    ``params`` is a dict parameter (e.g. supplied through ``luigi.cfg``) with
    the keys:
        input_folder: directory whose regular files are processed, in sorted
            name order.
        output: path of the concatenated result file.
    """

    params = DictParameter()

    def output(self):
        """Return the target for the concatenated result file."""
        return LocalTarget(self.params["output"])

    def run(self):
        """Schedule one DownloadFile per input file and join their outputs."""
        input_folder = self.params["input_folder"]

        # Keep regular files only: a subdirectory in the input folder would
        # make DownloadFile fail when it tries to open() it.
        file_names = [
            name
            for name in sorted(os.listdir(input_folder))
            if os.path.isfile(os.path.join(input_folder, name))
        ]

        # Yield all dynamic dependencies in a single list. Luigi restarts run()
        # from the top whenever a yield has unfinished dependencies, so
        # yielding one task per loop iteration re-ran this method once per
        # file; one list yield needs at most one restart and lets several
        # workers process the files in parallel. The yield returns the list of
        # output targets in the same order as the tasks.
        targets = yield [
            DownloadFile(
                input_folder=input_folder,
                file_name=name,
                index=index,
            )
            for index, name in enumerate(file_names, start=1)
        ]

        with self.output().open("w") as out_file:
            for target in targets:
                with target.open() as in_file:
                    for line in in_file:
                        out_file.write(line)
Overwriting pipeline.py
[8]:
# Run DownloadSalesData from pipeline.py with the in-process (local) scheduler;
# its `params` value is read from /tmp/luigi.cfg via LUIGI_CONFIG_PATH.
!LUIGI_CONFIG_PATH=/tmp/luigi.cfg python3 -m luigi --module pipeline DownloadSalesData --local-scheduler
DEBUG: Checking if DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"}) is complete
INFO: Informed scheduler that task DownloadSalesData___input_folder___85c746fcb6 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) new requirements DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if DownloadFile(input_folder=input, file_name=sales1.csv, index=1) is complete
INFO: Informed scheduler that task DownloadFile_sales1_csv_1_input_c3a4c863ff has status PENDING
INFO: Informed scheduler that task DownloadSalesData___input_folder___85c746fcb6 has status PENDING
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadFile(input_folder=input, file_name=sales1.csv, index=1)
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) done DownloadFile(input_folder=input, file_name=sales1.csv, index=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DownloadFile_sales1_csv_1_input_c3a4c863ff has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) new requirements DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if DownloadFile(input_folder=input, file_name=sales2.csv, index=2) is complete
INFO: Informed scheduler that task DownloadFile_sales2_csv_2_input_83e73fcbf0 has status PENDING
INFO: Informed scheduler that task DownloadSalesData___input_folder___85c746fcb6 has status PENDING
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadFile(input_folder=input, file_name=sales2.csv, index=2)
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) done DownloadFile(input_folder=input, file_name=sales2.csv, index=2)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DownloadFile_sales2_csv_2_input_83e73fcbf0 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) new requirements DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Checking if DownloadFile(input_folder=input, file_name=sales3.csv, index=3) is complete
INFO: Informed scheduler that task DownloadFile_sales3_csv_3_input_18bd499a8c has status PENDING
INFO: Informed scheduler that task DownloadSalesData___input_folder___85c746fcb6 has status PENDING
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadFile(input_folder=input, file_name=sales3.csv, index=3)
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) done DownloadFile(input_folder=input, file_name=sales3.csv, index=3)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DownloadFile_sales3_csv_3_input_18bd499a8c has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) running DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
INFO: [pid 1252] Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) done DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DownloadSalesData___input_folder___85c746fcb6 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=391761500, workers=1, host=b148b7d2dd75, username=root, pid=1252) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 ran successfully:
- 3 DownloadFile(input_folder=input, file_name=sales1.csv, index=1) ...
- 1 DownloadSalesData(params={"input_folder": "input", "output": "all_sales.csv"})
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
[9]:
!cat all_sales.csv
Fance,May,100
Germany,May,200
Fance,June,140
Germany,June,180
UK,June,180
France,June,240
Germany,June,180
[10]:
!ls -1 /tmp/output
1
2
3
[11]:
!ls -1 /tmp/output/1/
sales1.csv
[12]:
!ls -1 /tmp/output/2/
sales2.csv
[13]:
!ls -1 /tmp/output/3/
sales3.csv
[14]:
!cat /tmp/output/1/sales1.csv
Fance,May,100
Germany,May,200
[ ]: