(Optional) Step 2: Data Pipelines in DVC#
Last modified: April 4, 2022
This project is built up in several steps.
It is not meant for large-scale use, but it is a good example of how pipelines work.
[1]:
%cd dvcdemo
/workspace/dvcdemo
Downloading the project#
[2]:
!wget https://code.dvc.org/get-started/code.zip
!unzip code.zip
!rm -f code.zip
--2022-06-07 16:40:57-- https://code.dvc.org/get-started/code.zip
Resolving code.dvc.org (code.dvc.org)... 104.21.81.205, 172.67.164.76, 2606:4700:3036::6815:51cd, ...
Connecting to code.dvc.org (code.dvc.org)|104.21.81.205|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://s3-us-east-2.amazonaws.com/dvc-public/code/get-started/code.zip [following]
--2022-06-07 16:40:59-- https://s3-us-east-2.amazonaws.com/dvc-public/code/get-started/code.zip
Resolving s3-us-east-2.amazonaws.com (s3-us-east-2.amazonaws.com)... 52.219.103.65
Connecting to s3-us-east-2.amazonaws.com (s3-us-east-2.amazonaws.com)|52.219.103.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5800 (5.7K) [application/zip]
Saving to: ‘code.zip’
code.zip 100%[===================>] 5.66K --.-KB/s in 0.06s
2022-06-07 16:41:02 (93.3 KB/s) - ‘code.zip’ saved [5800/5800]
Archive: code.zip
inflating: params.yaml
inflating: src/evaluate.py
inflating: src/featurization.py
inflating: src/prepare.py
inflating: src/requirements.txt
inflating: src/train.py
creating: .github/workflows/
inflating: .github/workflows/cml.yaml
inflating: .devcontainer/Dockerfile
inflating: .devcontainer/devcontainer.json
[5]:
!pip3 install --quiet -r src/requirements.txt
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
Stage prepare#
[6]:
cmd = """
dvc stage add -n prepare \
-p prepare.seed,prepare.split \
-d src/prepare.py \
-d data/data.xml \
-o data/prepared \
python3 src/prepare.py data/data.xml
"""
!{cmd}
Creating 'dvc.yaml'
Adding stage 'prepare' in 'dvc.yaml'
To track the changes with git, run:
git add data/.gitignore dvc.yaml
To enable auto staging, run:
dvc config core.autostage true
[7]:
!cat dvc.yaml
stages:
prepare:
cmd: python3 src/prepare.py data/data.xml
deps:
- data/data.xml
- src/prepare.py
params:
- prepare.seed
- prepare.split
outs:
- data/prepared
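The `dvc stage add` flags map one-to-one onto the `dvc.yaml` fields above: `-n` gives the stage name, `-p` the params, `-d` the deps, `-o` the outs, and the trailing positional argument becomes `cmd`. A minimal sketch of that mapping (the dict literal below is illustrative, not DVC's internal representation):

```python
# Illustrative mapping of the `dvc stage add` flags used above
# onto the fields DVC writes into dvc.yaml.
stage = {
    "prepare": {
        "cmd": "python3 src/prepare.py data/data.xml",      # positional arg
        "deps": ["data/data.xml", "src/prepare.py"],        # -d flags
        "params": ["prepare.seed", "prepare.split"],        # -p flag
        "outs": ["data/prepared"],                          # -o flag
    }
}

assert stage["prepare"]["outs"] == ["data/prepared"]
```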
src/prepare.py#
[8]:
!pygmentize src/prepare.py
import io
import os
import random
import re
import sys
import xml.etree.ElementTree

import yaml

params = yaml.safe_load(open("params.yaml"))["prepare"]

if len(sys.argv) != 2:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython prepare.py data-file\n")
    sys.exit(1)

# Test data set split ratio
split = params["split"]
random.seed(params["seed"])

input = sys.argv[1]
output_train = os.path.join("data", "prepared", "train.tsv")
output_test = os.path.join("data", "prepared", "test.tsv")


def process_posts(fd_in, fd_out_train, fd_out_test, target_tag):
    num = 1
    for line in fd_in:
        try:
            fd_out = fd_out_train if random.random() > split else fd_out_test
            attr = xml.etree.ElementTree.fromstring(line).attrib

            pid = attr.get("Id", "")
            label = 1 if target_tag in attr.get("Tags", "") else 0
            title = re.sub(r"\s+", " ", attr.get("Title", "")).strip()
            body = re.sub(r"\s+", " ", attr.get("Body", "")).strip()
            text = title + " " + body

            fd_out.write("{}\t{}\t{}\n".format(pid, label, text))

            num += 1
        except Exception as ex:
            sys.stderr.write(f"Skipping the broken line {num}: {ex}\n")


os.makedirs(os.path.join("data", "prepared"), exist_ok=True)

with io.open(input, encoding="utf8") as fd_in:
    with io.open(output_train, "w", encoding="utf8") as fd_out_train:
        with io.open(output_test, "w", encoding="utf8") as fd_out_test:
            process_posts(fd_in, fd_out_train, fd_out_test, "<r>")
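The split logic in `process_posts` routes each line with a single `random.random()` draw: with `split = 0.2`, roughly 80% of rows land in the train file. A standalone sketch of that behavior, reusing the seed and ratio from `params.yaml`:

```python
import random

random.seed(20170428)  # same seed as params.yaml
split = 0.2            # test-set ratio, as in params.yaml

# Simulate the per-line routing decision from process_posts().
draws = [random.random() > split for _ in range(100_000)]
train_fraction = sum(draws) / len(draws)

# With split = 0.2, about 80% of rows go to the train file.
assert 0.79 < train_fraction < 0.81
```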
Stage featurize#
[9]:
cmd = """
dvc stage add -n featurize \
-p featurize.max_features,featurize.ngrams \
-d src/featurization.py \
-d data/prepared \
-o data/features \
python3 src/featurization.py data/prepared data/features
"""
!{cmd}
Adding stage 'featurize' in 'dvc.yaml'
To track the changes with git, run:
git add data/.gitignore dvc.yaml
To enable auto staging, run:
dvc config core.autostage true
[10]:
!cat dvc.yaml
stages:
prepare:
cmd: python3 src/prepare.py data/data.xml
deps:
- data/data.xml
- src/prepare.py
params:
- prepare.seed
- prepare.split
outs:
- data/prepared
featurize:
cmd: python3 src/featurization.py data/prepared data/features
deps:
- data/prepared
- src/featurization.py
params:
- featurize.max_features
- featurize.ngrams
outs:
- data/features
src/featurization.py#
[11]:
!pygmentize src/featurization.py
import os
import pickle
import sys

import numpy as np
import pandas as pd
import scipy.sparse as sparse
import yaml
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer

params = yaml.safe_load(open("params.yaml"))["featurize"]

np.set_printoptions(suppress=True)

if len(sys.argv) != 3 and len(sys.argv) != 5:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython featurization.py data-dir-path features-dir-path\n")
    sys.exit(1)

train_input = os.path.join(sys.argv[1], "train.tsv")
test_input = os.path.join(sys.argv[1], "test.tsv")
train_output = os.path.join(sys.argv[2], "train.pkl")
test_output = os.path.join(sys.argv[2], "test.pkl")

max_features = params["max_features"]
ngrams = params["ngrams"]


def get_df(data):
    df = pd.read_csv(
        data,
        encoding="utf-8",
        header=None,
        delimiter="\t",
        names=["id", "label", "text"],
    )
    sys.stderr.write(f"The input data frame {data} size is {df.shape}\n")
    return df


def save_matrix(df, matrix, names, output):
    id_matrix = sparse.csr_matrix(df.id.astype(np.int64)).T
    label_matrix = sparse.csr_matrix(df.label.astype(np.int64)).T

    result = sparse.hstack([id_matrix, label_matrix, matrix], format="csr")

    msg = "The output matrix {} size is {} and data type is {}\n"
    sys.stderr.write(msg.format(output, result.shape, result.dtype))

    with open(output, "wb") as fd:
        pickle.dump((result, names), fd)


os.makedirs(sys.argv[2], exist_ok=True)

# Generate train feature matrix
df_train = get_df(train_input)
train_words = np.array(df_train.text.str.lower().values.astype("U"))

bag_of_words = CountVectorizer(
    stop_words="english", max_features=max_features, ngram_range=(1, ngrams)
)
bag_of_words.fit(train_words)
train_words_binary_matrix = bag_of_words.transform(train_words)
feature_names = bag_of_words.get_feature_names_out()

tfidf = TfidfTransformer(smooth_idf=False)
tfidf.fit(train_words_binary_matrix)
train_words_tfidf_matrix = tfidf.transform(train_words_binary_matrix)

save_matrix(df_train, train_words_tfidf_matrix, feature_names, train_output)

# Generate test feature matrix
df_test = get_df(test_input)
test_words = np.array(df_test.text.str.lower().values.astype("U"))
test_words_binary_matrix = bag_of_words.transform(test_words)
test_words_tfidf_matrix = tfidf.transform(test_words_binary_matrix)

save_matrix(df_test, test_words_tfidf_matrix, feature_names, test_output)
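`featurization.py` fits a bag-of-words `CountVectorizer` on the train split and rescales the counts with `TfidfTransformer(smooth_idf=False)`, whose unsmoothed idf term is ln(n/df) + 1. A dependency-free sketch of just that idf term (sklearn additionally l2-normalizes each row, which this sketch omits):

```python
import math

def idf_unsmoothed(n_docs, doc_freq):
    # TfidfTransformer(smooth_idf=False) uses idf(t) = ln(n / df(t)) + 1
    return math.log(n_docs / doc_freq) + 1

# Two toy documents; "dvc" appears in both, the other terms in one each.
n = 2
df = {"dvc": 2, "pipeline": 1, "stage": 1}

# A term present in every document collapses to idf = 1 (no boost).
assert idf_unsmoothed(n, df["dvc"]) == 1.0
# A rarer term gets a larger weight: ln(2) + 1.
assert abs(idf_unsmoothed(n, df["pipeline"]) - (math.log(2) + 1)) < 1e-12
```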
Stage train#
[12]:
cmd = """
dvc stage add -n train \
-p train.seed,train.n_est,train.min_split \
-d src/train.py \
-d data/features \
-o model.pkl \
python3 src/train.py data/features model.pkl
"""
!{cmd}
Adding stage 'train' in 'dvc.yaml'
To track the changes with git, run:
git add .gitignore dvc.yaml
To enable auto staging, run:
dvc config core.autostage true
[13]:
!cat dvc.yaml
stages:
prepare:
cmd: python3 src/prepare.py data/data.xml
deps:
- data/data.xml
- src/prepare.py
params:
- prepare.seed
- prepare.split
outs:
- data/prepared
featurize:
cmd: python3 src/featurization.py data/prepared data/features
deps:
- data/prepared
- src/featurization.py
params:
- featurize.max_features
- featurize.ngrams
outs:
- data/features
train:
cmd: python3 src/train.py data/features model.pkl
deps:
- data/features
- src/train.py
params:
- train.min_split
- train.n_est
- train.seed
outs:
- model.pkl
src/train.py#
[14]:
!pygmentize src/train.py
import os
import pickle
import sys

import numpy as np
import yaml
from sklearn.ensemble import RandomForestClassifier

params = yaml.safe_load(open("params.yaml"))["train"]

if len(sys.argv) != 3:
    sys.stderr.write("Arguments error. Usage:\n")
    sys.stderr.write("\tpython train.py features model\n")
    sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]
seed = params["seed"]
n_est = params["n_est"]
min_split = params["min_split"]

with open(os.path.join(input, "train.pkl"), "rb") as fd:
    matrix, _ = pickle.load(fd)

labels = np.squeeze(matrix[:, 1].toarray())
x = matrix[:, 2:]

sys.stderr.write("Input matrix size {}\n".format(matrix.shape))
sys.stderr.write("X matrix size {}\n".format(x.shape))
sys.stderr.write("Y matrix size {}\n".format(labels.shape))

clf = RandomForestClassifier(
    n_estimators=n_est, min_samples_split=min_split, n_jobs=2, random_state=seed
)

clf.fit(x, labels)

with open(output, "wb") as fd:
    pickle.dump(clf, fd)
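`train.py` relies on the column layout produced by `save_matrix`: column 0 holds the post id, column 1 the label, and columns 2 onward the tf-idf features. A plain-list sketch of that slicing (illustrative rows, not real data):

```python
# Column layout written by save_matrix(): [id, label, feat_0, feat_1, ...]
rows = [
    [101, 1, 0.5, 0.0],  # id=101, label=1
    [102, 0, 0.0, 0.3],  # id=102, label=0
]

# train.py takes labels from column 1 and features from column 2 onwards.
labels = [row[1] for row in rows]
features = [row[2:] for row in rows]

assert labels == [1, 0]
assert features == [[0.5, 0.0], [0.0, 0.3]]
```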
Data#
[15]:
#
# Download the data from the dvc example repository
#
repo = "https://github.com/iterative/dataset-registry"
src = "get-started/data.xml"
dst = "data/data.xml"
!dvc get {repo} {src} -o {dst}
ERROR: unexpected error - [Errno 17] File exists: 'data/data.xml'
Having any troubles? Hit us up at https://dvc.org/support, we are always happy to help!
Reproduction#
[16]:
pwd
[16]:
'/workspace/dvcdemo'
[17]:
!dvc repro
Verifying data sources in stage: 'data/data.xml.dvc'
Running stage 'prepare':
> python3 src/prepare.py data/data.xml
Generating lock file 'dvc.lock'
Updating lock file 'dvc.lock'
Running stage 'featurize':
> python3 src/featurization.py data/prepared data/features
The input data frame data/prepared/train.tsv size is (16011, 3)
The output matrix data/features/train.pkl size is (16011, 102) and data type is float64
The input data frame data/prepared/test.tsv size is (3989, 3)
The output matrix data/features/test.pkl size is (3989, 102) and data type is float64
Updating lock file 'dvc.lock'
Running stage 'train':
> python3 src/train.py data/features model.pkl
Input matrix size (16011, 102)
X matrix size (16011, 100)
Y matrix size (16011,)
Updating lock file 'dvc.lock'
To track the changes with git, run:
git add data/data.xml.dvc dvc.lock
To enable auto staging, run:
dvc config core.autostage true
Use `dvc push` to send your updates to remote storage.
[18]:
!cat dvc.lock
schema: '2.0'
stages:
prepare:
cmd: python3 src/prepare.py data/data.xml
deps:
- path: data/data.xml
md5: 079fbd15fa2c32c539c4c4e3675b514a
size: 28890194
- path: src/prepare.py
md5: f09ea0c15980b43010257ccb9f0055e2
size: 1576
params:
params.yaml:
prepare.seed: 20170428
prepare.split: 0.2
outs:
- path: data/prepared
md5: 2fe72f304d64c28867d884e798568460.dir
size: 16874726
nfiles: 2
featurize:
cmd: python3 src/featurization.py data/prepared data/features
deps:
- path: data/prepared
md5: 2fe72f304d64c28867d884e798568460.dir
size: 16874726
nfiles: 2
- path: src/featurization.py
md5: e0265fc22f056a4b86d85c3056bc2894
size: 2490
params:
params.yaml:
featurize.max_features: 100
featurize.ngrams: 1
outs:
- path: data/features
md5: 1e28c4afe2ae56365ae96716fff987a5.dir
size: 3122242
nfiles: 2
train:
cmd: python3 src/train.py data/features model.pkl
deps:
- path: data/features
md5: 1e28c4afe2ae56365ae96716fff987a5.dir
size: 3122242
nfiles: 2
- path: src/train.py
md5: c3961d777cfbd7727f9fde4851896006
size: 967
params:
params.yaml:
train.min_split: 0.01
train.n_est: 50
train.seed: 20170428
outs:
- path: model.pkl
md5: b1a7524da7b6b2552b6f3e402ac6aa77
size: 1873894
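`dvc.lock` records an md5 checksum and a size for every dependency and output, which is how `dvc repro` decides to skip stages whose inputs are unchanged. A minimal sketch of that fingerprinting idea using `hashlib` (DVC's actual hashing handles directories and large files differently):

```python
import hashlib

def file_fingerprint(data: bytes):
    # dvc.lock stores (md5, size) per path; a changed checksum
    # is what triggers a stage to re-run on the next repro.
    return hashlib.md5(data).hexdigest(), len(data)

digest, size = file_fingerprint(b"hello")
assert digest == "5d41402abc4b2a76b9719d911017c592"
assert size == 5
```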
Visualization#
[19]:
!dvc dag
+-------------------+
| data/data.xml.dvc |
+-------------------+
*
*
*
+---------+
| prepare |
+---------+
*
*
*
+-----------+
| featurize |
+-----------+
*
*
*
+-------+
| train |
+-------+
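DVC derives this DAG by matching each stage's `deps` against the `outs` of the other stages, then runs the stages in topological order during `dvc repro`. A minimal sketch of that ordering for the pipeline above (the dict literal mirrors `dvc.yaml`, not DVC's internal data structures):

```python
# Each stage declares what it reads (deps) and what it writes (outs).
stages = {
    "prepare":   {"deps": ["data/data.xml"], "outs": ["data/prepared"]},
    "featurize": {"deps": ["data/prepared"], "outs": ["data/features"]},
    "train":     {"deps": ["data/features"], "outs": ["model.pkl"]},
}

# A dep produced by another stage creates an edge in the DAG.
produced_by = {out: name for name, s in stages.items() for out in s["outs"]}

# Repeatedly schedule any stage whose upstream stages are all done.
order, done = [], set()
while len(order) < len(stages):
    for name, s in stages.items():
        if name in done:
            continue
        upstream = {produced_by[d] for d in s["deps"] if d in produced_by}
        if upstream <= done:
            order.append(name)
            done.add(name)

assert order == ["prepare", "featurize", "train"]
```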