(Optional) Step 2: Data Pipelines in DVC#

  • Last modified: April 4, 2022

This project is built up in several steps.

It is not meant to run at large scale, but it is a good example of how DVC pipelines work.

[1]:
%cd dvcdemo
/workspace/dvcdemo

Downloading the project#

[2]:
!wget https://code.dvc.org/get-started/code.zip
!unzip code.zip
!rm -f code.zip
--2022-06-07 16:40:57--  https://code.dvc.org/get-started/code.zip
Resolving code.dvc.org (code.dvc.org)... 104.21.81.205, 172.67.164.76, 2606:4700:3036::6815:51cd, ...
Connecting to code.dvc.org (code.dvc.org)|104.21.81.205|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://s3-us-east-2.amazonaws.com/dvc-public/code/get-started/code.zip [following]
--2022-06-07 16:40:59--  https://s3-us-east-2.amazonaws.com/dvc-public/code/get-started/code.zip
Resolving s3-us-east-2.amazonaws.com (s3-us-east-2.amazonaws.com)... 52.219.103.65
Connecting to s3-us-east-2.amazonaws.com (s3-us-east-2.amazonaws.com)|52.219.103.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5800 (5.7K) [application/zip]
Saving to: ‘code.zip’

code.zip            100%[===================>]   5.66K  --.-KB/s    in 0.06s

2022-06-07 16:41:02 (93.3 KB/s) - ‘code.zip’ saved [5800/5800]

Archive:  code.zip
  inflating: params.yaml
  inflating: src/evaluate.py
  inflating: src/featurization.py
  inflating: src/prepare.py
  inflating: src/requirements.txt
  inflating: src/train.py
   creating: .github/workflows/
  inflating: .github/workflows/cml.yaml
  inflating: .devcontainer/Dockerfile
  inflating: .devcontainer/devcontainer.json
[5]:
!pip3 install --quiet -r src/requirements.txt
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

Stage prepare#

dvc stage add registers a stage in dvc.yaml without running it: -n names the stage, -p declares the parameters it reads from params.yaml, -d lists its input dependencies, -o its outputs, and everything after the options is the command DVC will execute.

[6]:
cmd = """
dvc stage add -n prepare \
              -p prepare.seed,prepare.split \
              -d src/prepare.py \
              -d data/data.xml \
              -o data/prepared \
              python3 src/prepare.py data/data.xml
"""
!{cmd}
Creating 'dvc.yaml'
Adding stage 'prepare' in 'dvc.yaml'

To track the changes with git, run:

    git add data/.gitignore dvc.yaml

To enable auto staging, run:

        dvc config core.autostage true

[7]:
!cat dvc.yaml
stages:
  prepare:
    cmd: python3 src/prepare.py data/data.xml
    deps:
    - data/data.xml
    - src/prepare.py
    params:
    - prepare.seed
    - prepare.split
    outs:
    - data/prepared
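
The parameters declared with -p come from params.yaml, which ships with the downloaded code. The file itself is not printed in this notebook, but judging from the values recorded later in dvc.lock it presumably looks like this:

prepare:
  split: 0.2
  seed: 20170428

featurize:
  max_features: 100
  ngrams: 1

train:
  seed: 20170428
  n_est: 50
  min_split: 0.01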

src/prepare.py#

[8]:
!pygmentize src/prepare.py
import io
import os
import random
import re
import sys
import xml.etree.ElementTree

import yaml

params = yaml.safe_load(open("params.yaml"))["prepare"]

if len(sys.argv) != 2:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython prepare.py data-file\n")
    sys.exit(1)

# Test data set split ratio
split = params["split"]
random.seed(params["seed"])

input = sys.argv[1]
output_train = os.path.join("data", "prepared", "train.tsv")
output_test = os.path.join("data", "prepared", "test.tsv")


def process_posts(fd_in, fd_out_train, fd_out_test, target_tag):
    num = 1
    for line in fd_in:
        try:
            fd_out = fd_out_train if random.random() > split else fd_out_test
            attr = xml.etree.ElementTree.fromstring(line).attrib

            pid = attr.get("Id", "")
            label = 1 if target_tag in attr.get("Tags", "") else 0
            title = re.sub(r"\s+", " ", attr.get("Title", "")).strip()
            body = re.sub(r"\s+", " ", attr.get("Body", "")).strip()
            text = title + " " + body

            fd_out.write("{}\t{}\t{}\n".format(pid, label, text))

            num += 1
        except Exception as ex:
            sys.stderr.write(f"Skipping the broken line {num}: {ex}\n")


os.makedirs(os.path.join("data", "prepared"), exist_ok=True)

with io.open(input, encoding="utf8") as fd_in:
    with io.open(output_train, "w", encoding="utf8") as fd_out_train:
        with io.open(output_test, "w", encoding="utf8") as fd_out_test:
            process_posts(fd_in, fd_out_train, fd_out_test, "<r>")
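
prepare.py reads data/data.xml line by line; each line is expected to be a self-contained XML element whose attributes include Id, Tags, Title and Body, as in a StackOverflow posts dump. Since split is 0.2 in this run, random.random() > split routes roughly 80% of the rows to train.tsv. A minimal sketch of the per-line parsing, using a made-up input row:

import re
import xml.etree.ElementTree

# A hypothetical line shaped like the attributes prepare.py reads;
# "<" inside an XML attribute must be escaped as "&lt;".
line = '<row Id="42" Tags="&lt;r&gt;&lt;dplyr&gt;" Title="Filter rows" Body="How do I  filter?" />'

attr = xml.etree.ElementTree.fromstring(line).attrib
label = 1 if "<r>" in attr.get("Tags", "") else 0  # 1 = post tagged with R
text = re.sub(r"\s+", " ", attr.get("Title", "") + " " + attr.get("Body", "")).strip()
print(attr.get("Id"), label, text)  # 42 1 Filter rows How do I filter?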

Stage featurize#

[9]:
cmd = """
dvc stage add -n featurize \
              -p featurize.max_features,featurize.ngrams \
              -d src/featurization.py \
              -d data/prepared \
              -o data/features \
              python3 src/featurization.py data/prepared data/features
"""
!{cmd}
Adding stage 'featurize' in 'dvc.yaml'

To track the changes with git, run:

    git add data/.gitignore dvc.yaml

To enable auto staging, run:

        dvc config core.autostage true

[10]:
!cat dvc.yaml
stages:
  prepare:
    cmd: python3 src/prepare.py data/data.xml
    deps:
    - data/data.xml
    - src/prepare.py
    params:
    - prepare.seed
    - prepare.split
    outs:
    - data/prepared
  featurize:
    cmd: python3 src/featurization.py data/prepared data/features
    deps:
    - data/prepared
    - src/featurization.py
    params:
    - featurize.max_features
    - featurize.ngrams
    outs:
    - data/features

src/featurization.py#

[11]:
!pygmentize src/featurization.py
import os
import pickle
import sys

import numpy as np
import pandas as pd
import scipy.sparse as sparse
import yaml
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer

params = yaml.safe_load(open("params.yaml"))["featurize"]

np.set_printoptions(suppress=True)

if len(sys.argv) != 3 and len(sys.argv) != 5:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython featurization.py data-dir-path features-dir-path\n")
    sys.exit(1)

train_input = os.path.join(sys.argv[1], "train.tsv")
test_input = os.path.join(sys.argv[1], "test.tsv")
train_output = os.path.join(sys.argv[2], "train.pkl")
test_output = os.path.join(sys.argv[2], "test.pkl")

max_features = params["max_features"]
ngrams = params["ngrams"]


def get_df(data):
    df = pd.read_csv(
        data,
        encoding="utf-8",
        header=None,
        delimiter="\t",
        names=["id", "label", "text"],
    )
    sys.stderr.write(f"The input data frame {data} size is {df.shape}\n")
    return df


def save_matrix(df, matrix, names, output):
    id_matrix = sparse.csr_matrix(df.id.astype(np.int64)).T
    label_matrix = sparse.csr_matrix(df.label.astype(np.int64)).T

    result = sparse.hstack([id_matrix, label_matrix, matrix], format="csr")

    msg = "The output matrix {} size is {} and data type is {}\n"
    sys.stderr.write(msg.format(output, result.shape, result.dtype))

    with open(output, "wb") as fd:
        pickle.dump((result, names), fd)
    pass


os.makedirs(sys.argv[2], exist_ok=True)

# Generate train feature matrix
df_train = get_df(train_input)
train_words = np.array(df_train.text.str.lower().values.astype("U"))

bag_of_words = CountVectorizer(
    stop_words="english", max_features=max_features, ngram_range=(1, ngrams)
)

bag_of_words.fit(train_words)
train_words_binary_matrix = bag_of_words.transform(train_words)
feature_names = bag_of_words.get_feature_names_out()
tfidf = TfidfTransformer(smooth_idf=False)
tfidf.fit(train_words_binary_matrix)
train_words_tfidf_matrix = tfidf.transform(train_words_binary_matrix)

save_matrix(df_train, train_words_tfidf_matrix, feature_names, train_output)

# Generate test feature matrix
df_test = get_df(test_input)
test_words = np.array(df_test.text.str.lower().values.astype("U"))
test_words_binary_matrix = bag_of_words.transform(test_words)
test_words_tfidf_matrix = tfidf.transform(test_words_binary_matrix)

save_matrix(df_test, test_words_tfidf_matrix, feature_names, test_output)
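
Each .pkl written by save_matrix holds a tuple of a scipy CSR matrix and the vectorizer's feature names; column 0 of the matrix is the post id, column 1 the label, and the remaining columns the TF-IDF features. A minimal sketch of reading one back, assuming dvc repro below has already produced data/features:

import pickle

import numpy as np

# Unpack the (matrix, feature_names) tuple written by save_matrix
with open("data/features/train.pkl", "rb") as fd:
    matrix, feature_names = pickle.load(fd)

ids = np.squeeze(matrix[:, 0].toarray())     # column 0: post id
labels = np.squeeze(matrix[:, 1].toarray())  # column 1: binary label
features = matrix[:, 2:]                     # columns 2+: TF-IDF values

print(matrix.shape, features.shape, len(feature_names))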

Stage train#

[12]:
cmd = """
dvc stage add -n train \
              -p train.seed,train.n_est,train.min_split \
              -d src/train.py \
              -d data/features \
              -o model.pkl \
              python3 src/train.py data/features model.pkl
"""
!{cmd}
Adding stage 'train' in 'dvc.yaml'

To track the changes with git, run:

    git add .gitignore dvc.yaml

To enable auto staging, run:

        dvc config core.autostage true

[13]:
!cat dvc.yaml
stages:
  prepare:
    cmd: python3 src/prepare.py data/data.xml
    deps:
    - data/data.xml
    - src/prepare.py
    params:
    - prepare.seed
    - prepare.split
    outs:
    - data/prepared
  featurize:
    cmd: python3 src/featurization.py data/prepared data/features
    deps:
    - data/prepared
    - src/featurization.py
    params:
    - featurize.max_features
    - featurize.ngrams
    outs:
    - data/features
  train:
    cmd: python3 src/train.py data/features model.pkl
    deps:
    - data/features
    - src/train.py
    params:
    - train.min_split
    - train.n_est
    - train.seed
    outs:
    - model.pkl

src/train.py#

[14]:
!pygmentize src/train.py
import os
import pickle
import sys

import numpy as np
import yaml
from sklearn.ensemble import RandomForestClassifier

params = yaml.safe_load(open("params.yaml"))["train"]

if len(sys.argv) != 3:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython train.py features model\n")
    sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]
seed = params["seed"]
n_est = params["n_est"]
min_split = params["min_split"]

with open(os.path.join(input, "train.pkl"), "rb") as fd:
    matrix, _ = pickle.load(fd)

labels = np.squeeze(matrix[:, 1].toarray())
x = matrix[:, 2:]

sys.stderr.write("Input matrix size {}\n".format(matrix.shape))
sys.stderr.write("X matrix size {}\n".format(x.shape))
sys.stderr.write("Y matrix size {}\n".format(labels.shape))

clf = RandomForestClassifier(
    n_estimators=n_est, min_samples_split=min_split, n_jobs=2, random_state=seed
)

clf.fit(x, labels)

with open(output, "wb") as fd:
    pickle.dump(clf, fd)
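
The downloaded code also includes src/evaluate.py (unpacked above but not shown here). A minimal sketch in the same spirit, scoring the trained model on the held-out features; it assumes dvc repro below has already produced model.pkl and data/features/test.pkl:

import pickle

import numpy as np
from sklearn.metrics import roc_auc_score

with open("model.pkl", "rb") as fd:
    model = pickle.load(fd)

with open("data/features/test.pkl", "rb") as fd:
    matrix, _ = pickle.load(fd)

labels = np.squeeze(matrix[:, 1].toarray())  # column 1 holds the label
x = matrix[:, 2:]                            # columns 2+ hold the features

predictions = model.predict_proba(x)[:, 1]   # probability of the positive class
print("test AUC:", roc_auc_score(labels, predictions))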

Data#

[15]:
#
# Download the data from the dvc example repository
#
repo = "https://github.com/iterative/dataset-registry"
src = "get-started/data.xml"
dst = "data/data.xml"

!dvc get {repo} {src} -o {dst}
ERROR: unexpected error - [Errno 17] File exists: 'data/data.xml'

Having any troubles? Hit us up at https://dvc.org/support, we are always happy to help!

The Errno 17 just means data/data.xml is already in place: it was downloaded and put under DVC tracking in the previous step, as the data/data.xml.dvc file verified during dvc repro below confirms, so the error can be safely ignored.

Reproduction#

[16]:
pwd
[16]:
'/workspace/dvcdemo'
[17]:
!dvc repro
Verifying data sources in stage: 'data/data.xml.dvc'

Running stage 'prepare':
> python3 src/prepare.py data/data.xml
Generating lock file 'dvc.lock'
Updating lock file 'dvc.lock'

Running stage 'featurize':
> python3 src/featurization.py data/prepared data/features
The input data frame data/prepared/train.tsv size is (16011, 3)
The output matrix data/features/train.pkl size is (16011, 102) and data type is float64
The input data frame data/prepared/test.tsv size is (3989, 3)
The output matrix data/features/test.pkl size is (3989, 102) and data type is float64
Updating lock file 'dvc.lock'

Running stage 'train':
> python3 src/train.py data/features model.pkl
Input matrix size (16011, 102)
X matrix size (16011, 100)
Y matrix size (16011,)
Updating lock file 'dvc.lock'

To track the changes with git, run:

    git add data/data.xml.dvc dvc.lock

To enable auto staging, run:

        dvc config core.autostage true
Use `dvc push` to send your updates to remote storage.

[18]:
!cat dvc.lock
schema: '2.0'
stages:
  prepare:
    cmd: python3 src/prepare.py data/data.xml
    deps:
    - path: data/data.xml
      md5: 079fbd15fa2c32c539c4c4e3675b514a
      size: 28890194
    - path: src/prepare.py
      md5: f09ea0c15980b43010257ccb9f0055e2
      size: 1576
    params:
      params.yaml:
        prepare.seed: 20170428
        prepare.split: 0.2
    outs:
    - path: data/prepared
      md5: 2fe72f304d64c28867d884e798568460.dir
      size: 16874726
      nfiles: 2
  featurize:
    cmd: python3 src/featurization.py data/prepared data/features
    deps:
    - path: data/prepared
      md5: 2fe72f304d64c28867d884e798568460.dir
      size: 16874726
      nfiles: 2
    - path: src/featurization.py
      md5: e0265fc22f056a4b86d85c3056bc2894
      size: 2490
    params:
      params.yaml:
        featurize.max_features: 100
        featurize.ngrams: 1
    outs:
    - path: data/features
      md5: 1e28c4afe2ae56365ae96716fff987a5.dir
      size: 3122242
      nfiles: 2
  train:
    cmd: python3 src/train.py data/features model.pkl
    deps:
    - path: data/features
      md5: 1e28c4afe2ae56365ae96716fff987a5.dir
      size: 3122242
      nfiles: 2
    - path: src/train.py
      md5: c3961d777cfbd7727f9fde4851896006
      size: 967
    params:
      params.yaml:
        train.min_split: 0.01
        train.n_est: 50
        train.seed: 20170428
    outs:
    - path: model.pkl
      md5: b1a7524da7b6b2552b6f3e402ac6aa77
      size: 1873894
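
dvc repro only reruns stages whose dependencies or parameters differ from what dvc.lock records. As a hypothetical follow-up, assuming params.yaml literally contains the line n_est: 50, bumping that value would invalidate only the train stage:

!sed -i 's/n_est: 50/n_est: 100/' params.yaml
!dvc status  # reports the train stage's params as changed
!dvc repro   # prepare and featurize are skipped; only 'train' reruns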

Visualization#

[19]:
!dvc dag
+-------------------+
| data/data.xml.dvc |
+-------------------+
          *
          *
          *
     +---------+
     | prepare |
     +---------+
          *
          *
          *
    +-----------+
    | featurize |
    +-----------+
          *
          *
          *
      +-------+
      | train |
      +-------+
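
Beyond this ASCII view, dvc dag can emit the graph in Graphviz DOT format via its --dot flag; a sketch, assuming the Graphviz dot tool is installed:

!dvc dag --dot > pipeline.dot
!dot -Tpng pipeline.dot -o pipeline.png  # render the pipeline graph as an image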