Pipeline Validation

  • Última modificación: 14 de mayo de 2022

Pipeline simple

def pipeline(file_path):
    """Load from *file_path*, run both operations, then save the result.

    The four steps run strictly in order; each one receives the output
    of the step before it.
    """
    dataset = load_from_source(file_path)  # STEP 1: load
    dataset = operation_one(dataset)       # STEP 2: first operation
    dataset = operation_two(dataset)       # STEP 3: second operation
    save_to_destination(dataset)           # STEP 4: save
def pipeline(file_path):
    """Run the four pipeline steps with validation checks between them.

    Each ``validate`` call inspects the intermediate data before the
    following step runs.
    """
    dataset = load_from_source(file_path)  # STEP 1: load

    # Check the column names of the freshly loaded data.
    validate(dataset.columns, ['user_id', 'first_name', 'last_name'])

    dataset = operation_one(dataset)       # STEP 2: first operation

    # After step 2 the columns are expected to be user_id and full_name,
    # and the data is checked against the (int, str) requirement.
    validate(dataset.columns, ['user_id', 'full_name'])
    validate(dataset, (int, str))

    dataset = operation_two(dataset)       # STEP 3: second operation

    # Check the user_id values for uniqueness before saving.
    validate.unique(dataset['user_id'])

    save_to_destination(dataset)           # STEP 4: save

Uso de __debug__

def pipeline(file_path):
    """Run the pipeline, validating only when ``__debug__`` is true.

    Every check sits behind ``if __debug__:``, so starting Python with
    the ``-O`` option turns all validation off while the four steps
    themselves still run.
    """
    dataset = load_from_source(file_path)  # STEP 1: load

    if __debug__:
        # Check the column names of the freshly loaded data.
        validate(dataset.columns, ['user_id', 'first_name', 'last_name'])

    dataset = operation_one(dataset)       # STEP 2: first operation

    if __debug__:
        # Check the columns, then the data against (int, str).
        validate(dataset.columns, ['user_id', 'full_name'])
        validate(dataset, (int, str))

    dataset = operation_two(dataset)       # STEP 3: second operation

    if __debug__:
        # Check the user_id values for uniqueness before saving.
        validate.unique(dataset['user_id'])

    save_to_destination(dataset)           # STEP 4: save

Validation On

$ python3 simple_pipeline.py

Validation Off

$ python3 -O simple_pipeline.py

Datasets de gran tamaño

DataFrame.sample()

def pipeline(file_path):
    """Run the pipeline, validating a random sample of rows after each step.

    Only up to 100 sampled rows are validated after steps 2 and 3 instead
    of the whole dataset. The uniqueness check therefore only covers the
    sampled rows, not every row in the data.

    NOTE(review): assumes ``data`` is a pandas DataFrame (it must provide
    ``.columns``, ``.sample()`` and ``len()``) -- confirm against
    ``load_from_source``.
    """
    data = load_from_source(file_path)  # STEP 1

    validate(data.columns, ['user_id', 'first_name', 'last_name'])

    data = operation_one(data)          # STEP 2

    # Cap the sample size at the row count: DataFrame.sample() raises
    # ValueError when asked for more rows than exist (replace=False).
    sample = data.sample(n=min(100, len(data)))
    validate(sample.columns, ['user_id', 'full_name'])
    validate(sample, (int, str))

    data = operation_two(data)          # STEP 3

    # Same cap here; step 3 may have changed the number of rows.
    sample = data.sample(n=min(100, len(data)))
    validate.unique(sample['user_id'])

    save_to_destination(data)           # STEP 4

Iterator

import itertools

def pipeline(file_path):
    """Stream the pipeline, validating the first 100 items after each step.

    After steps 2 and 3 the leading items are pulled off the stream,
    validated, and then chained back in front of the remaining items so
    the next step still receives every item.

    NOTE(review): assumes each step returns a true iterator; with a
    re-iterable container the chained stream would repeat the sampled
    items -- confirm against the step implementations.
    """
    stream = load_from_source(file_path)  # STEP 1: load

    stream = operation_one(stream)        # STEP 2: first operation

    # Consume up to 100 items, check them, then put them back in front.
    head = list(itertools.islice(stream, 100))
    validate(head, (int, str))
    stream = itertools.chain(head, stream)

    stream = operation_two(stream)        # STEP 3: second operation

    # Uniqueness is checked on the first element of each sampled item.
    head = list(itertools.islice(stream, 100))
    validate.unique(row[0] for row in head)
    stream = itertools.chain(head, stream)

    save_to_destination(stream)           # STEP 4: save

Múltiples iteradores

import itertools

def get_sample(iterable, n=100):
    """Return the first *n* items of *iterable* and an equivalent iterator.

    Args:
        iterable: Any iterable; it is wrapped with ``iter()`` so that
            containers and iterators are handled the same way.
        n: Maximum number of leading items to collect (default 100).

    Returns:
        A ``(sample, iterator)`` tuple. ``sample`` is a list holding up
        to *n* leading items. ``iterator`` yields those same items first
        and then the rest of the input, so no item is lost by sampling.
    """
    remaining = iter(iterable)
    head = list(itertools.islice(remaining, n))
    # Re-attach the consumed items in front of what is left.
    return head, itertools.chain(head, remaining)

# Usage example: take a sample from the stream and rebind `iterator` to the
# re-chained iterator returned by get_sample(), then validate the sample
# against the (int, str) requirement.
# NOTE(review): `iterator` is not defined in this snippet -- presumably it
# comes from an earlier pipeline step; confirm against the caller.
sample, iterator = get_sample(iterator)
validate(sample, (int, str))