Pipeline Validation#
Última modificación: Mayo 14, 2022
Pipeline simple#
def pipeline(file_path):
    """Run the four pipeline steps on the file at *file_path*.

    The data is loaded with ``load_from_source``, passed through
    ``operation_one`` and then ``operation_two``, and the result is handed
    to ``save_to_destination``. Nothing is returned.
    """
    loaded = load_from_source(file_path)  # step 1: load
    transformed = operation_two(operation_one(loaded))  # steps 2 and 3
    save_to_destination(transformed)  # step 4: save
def pipeline(file_path):
    """Run the pipeline on *file_path*, validating the data between steps.

    After each step the intermediate result is passed to ``validate`` so a
    failing check stops the run before later steps execute. Nothing is
    returned.
    """
    # Step 1: load, then check the column labels of the loaded data.
    frame = load_from_source(file_path)
    validate(frame.columns, ['user_id', 'first_name', 'last_name'])

    # Step 2: first operation, then check the new column labels and the
    # data itself against the (int, str) requirement.
    frame = operation_one(frame)
    validate(frame.columns, ['user_id', 'full_name'])
    validate(frame, (int, str))

    # Step 3: second operation, then check that 'user_id' values are unique.
    frame = operation_two(frame)
    validate.unique(frame['user_id'])

    # Step 4: save the result.
    save_to_destination(frame)
Uso de __debug__#
def pipeline(file_path):
    """Run the pipeline on *file_path* with validation that can be disabled.

    Every validation call sits behind ``if __debug__:``. ``__debug__`` is
    true in a normal run and false when Python is started with ``-O``, so
    the checks are skipped entirely in optimized mode. Nothing is returned.
    """
    # Step 1: load.
    frame = load_from_source(file_path)
    if __debug__:
        validate(frame.columns, ['user_id', 'first_name', 'last_name'])

    # Step 2: first operation.
    frame = operation_one(frame)
    if __debug__:
        validate(frame.columns, ['user_id', 'full_name'])
        validate(frame, (int, str))

    # Step 3: second operation.
    frame = operation_two(frame)
    if __debug__:
        validate.unique(frame['user_id'])

    # Step 4: save the result.
    save_to_destination(frame)
Validation On
$ python3 simple_pipeline.py
Validation Off
$ python3 -O simple_pipeline.py
Datasets de gran tamaño#
DataFrame.sample()
def pipeline(file_path, sample_size=100):
    """Run the pipeline on *file_path*, validating random samples of the data.

    Validating a sample instead of the whole frame keeps the checks cheap on
    large datasets. The full data, not the sample, is what moves on to the
    next step and is finally saved.

    Args:
        file_path: Location passed to ``load_from_source``.
        sample_size: Maximum number of rows to draw for each sampled
            validation. Defaults to 100.
    """
    data = load_from_source(file_path)  # STEP 1
    validate(data.columns, ['user_id', 'first_name', 'last_name'])

    data = operation_one(data)  # STEP 2
    # DataFrame.sample() raises ValueError when n exceeds the number of rows
    # (sampling without replacement), so cap n at the frame's length.
    sample = data.sample(n=min(sample_size, len(data)))
    validate(sample.columns, ['user_id', 'full_name'])
    validate(sample, (int, str))

    data = operation_two(data)  # STEP 3
    sample = data.sample(n=min(sample_size, len(data)))
    validate.unique(sample['user_id'])

    save_to_destination(data)  # STEP 4
Iterator
import itertools
def pipeline(file_path):
    """Run the pipeline on *file_path* as a stream, validating leading items.

    After steps 2 and 3 the first 100 items are pulled off the stream,
    validated, and then chained back in front of the remaining items so the
    next step still sees every item exactly once.
    """
    iterator = load_from_source(file_path)  # STEP 1

    # iter() guarantees a one-shot iterator. Without it, a step that returns
    # a re-iterable object (e.g. a list) would restart from the beginning
    # when chained, and the sampled items would be emitted twice.
    iterator = iter(operation_one(iterator))  # STEP 2
    sample = list(itertools.islice(iterator, 100))
    validate(sample, (int, str))
    iterator = itertools.chain(sample, iterator)  # put the sample back

    iterator = iter(operation_two(iterator))  # STEP 3
    sample = list(itertools.islice(iterator, 100))
    validate.unique(item[0] for item in sample)
    iterator = itertools.chain(sample, iterator)  # put the sample back

    save_to_destination(iterator)  # STEP 4
Múltiples iterators
import itertools
def get_sample(iterable, n=100):
    """Return the first *n* items of *iterable* plus an iterator over all items.

    Args:
        iterable: Any iterable; it is wrapped with ``iter()`` so both
            one-shot iterators and re-iterable containers are handled.
        n: Maximum number of leading items to collect. Defaults to 100.

    Returns:
        A ``(sample, iterator)`` tuple. ``sample`` is a list holding up to
        *n* leading items. ``iterator`` yields those same items followed by
        the rest of the input, so no item is lost by sampling.
    """
    iterator = iter(iterable)
    sample = list(itertools.islice(iterator, n))
    # Chain the consumed items back in front of the unconsumed remainder.
    return sample, itertools.chain(sample, iterator)
…
# Take the leading items for validation; `iterator` is rebound to the
# iterator returned by get_sample() so later steps use that one.
sample, iterator = get_sample(iterator)
# Only the sampled items are passed to validate().
validate(sample, (int, str))
…