TensorFlow Extended (TFX) provides ready-made components for typical steps in a machine learning workflow. Other courses in this specialization focus on those standard components; in this lab, you will learn how to make your own. This is useful when your project has needs that fall outside the standard TFX components, and it makes your pipelines more flexible while still leveraging the experiment tracking and orchestration that TFX provides. In particular, you will build a custom component using the Python function decorator, then build a custom ExampleGen that can consume its output.
To demonstrate, you will run the pipeline used in this official tutorial, then modify it to include a custom component. Some of the discussion here is also taken from that tutorial to explain the motivation and point to additional resources.
Let’s begin!
Note: If you haven’t taken other courses in this specialization and it’s the first time you’re using TFX, please see Understanding TFX Pipelines to get an overview of important concepts.
import os
from absl import logging
import pandas as pd
import glob
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
# Set default logging level.
logging.set_verbosity(logging.INFO)
TensorFlow version: 2.11.0
TFX version: 1.12.0
The variables below are used to define the pipeline. You can customize them as you want. By default, all output from the pipeline will be generated under the current directory.
# Pipeline name
PIPELINE_NAME = "penguin-simple"
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
# Dataset directory
DATA_ROOT = 'data'
You will use the Palmer Penguins dataset for this exercise. It has four numeric features, namely: culmen_length_mm, culmen_depth_mm, flipper_length_mm, and body_mass_g. All features were already normalized to the range [0, 1]. You will build a classification model which predicts the species of penguins.
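If you want a quick look at the raw data before wiring it into the pipeline, a minimal sketch like the one below works; it only assumes the CSV files already sit under DATA_ROOT as configured above, and it reuses the pandas and glob imports from the first cell.
# (Optional) Peek at the raw data. This is just a sanity check and assumes the
# CSV files are already present in DATA_ROOT.
preview_files = glob.glob(f'{DATA_ROOT}/*.csv')
preview_df = pd.read_csv(preview_files[0], index_col=False)
print(preview_df.columns.tolist())   # species plus the four numeric features
print(preview_df.describe())         # features are normalized to [0, 1]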
The pipeline will consist of three essential TFX components and the graph will look like this:
ExampleGen -> Trainer -> Pusher
The pipeline represents the most minimal ML workflow: importing data (ExampleGen), training a model (Trainer), and exporting the trained model (Pusher).
You will first define the trainer module so the Trainer component can build the model and train it.
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
for feature in _FEATURE_KEYS
},
_LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
schema: schema of the input data.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _build_keras_model() -> tf.keras.Model:
"""Creates a DNN Keras model for classifying penguin data.
Returns:
A Keras Model.
"""
# The model below is built with Functional API, please refer to
# https://www.tensorflow.org/guide/keras/overview for all API options.
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
for _ in range(2):
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3)(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=keras.optimizers.Adam(1e-2),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.summary(print_fn=logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# This schema is usually either an output of SchemaGen or a manually-curated
# version provided by the pipeline author. A schema can also be derived from the
# TFT graph if a Transform component is used. When either is missing,
# `schema_from_feature_spec` can be used to generate a schema from a very simple
# feature_spec, but the schema returned would be very primitive.
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
schema,
batch_size=_TRAIN_BATCH_SIZE)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
schema,
batch_size=_EVAL_BATCH_SIZE)
model = _build_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
# The result of the training should be saved in `fn_args.serving_model_dir`
# directory.
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
Next, you will define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline which can be run using one of the pipeline orchestration systems that TFX supports.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
metadata_path: str) -> tfx.dsl.Pipeline:
"""Creates a three component penguin pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(metadata_path),
components=components)
TFX supports multiple orchestrators to run pipelines. In this tutorial you will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment. TFX pipelines are often called “DAGs”, which stands for directed acyclic graph. LocalDagRunner provides fast iteration for development and debugging. TFX also supports other orchestrators, including Kubeflow Pipelines and Apache Airflow, which are suitable for production use cases. See TFX on Cloud AI Platform Pipelines or the TFX Airflow Tutorial to learn more about other orchestration systems.
The code below creates a LocalDagRunner and passes it the Pipeline object created from the function you defined earlier. The pipeline runs directly and you can see logs showing its progress, including the ML model training.
tfx.orchestration.LocalDagRunner().run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=_trainer_module_file,
serving_model_dir=SERVING_MODEL_DIR,
metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/home/jovyan/work/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc.
INFO:absl:Executing: ['/opt/conda/bin/python', '/tmp/tmp_xkdipl3/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpu5l6ph9u', '--dist-dir', '/tmp/tmp3oxck6hx']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl'
INFO:absl:Using deployment config:
executor_specs {
key: "CsvExampleGen"
value {
beam_executable_spec {
python_executor_spec {
class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
}
}
}
}
executor_specs {
key: "Pusher"
value {
python_class_executable_spec {
class_path: "tfx.components.pusher.executor.Executor"
}
}
}
executor_specs {
key: "Trainer"
value {
python_class_executable_spec {
class_path: "tfx.components.trainer.executor.GenericExecutor"
}
}
}
custom_driver_specs {
key: "CsvExampleGen"
value {
python_class_executable_spec {
class_path: "tfx.components.example_gen.driver.FileBasedDriver"
}
}
}
metadata_connection_config {
database_connection_config {
sqlite {
filename_uri: "metadata/penguin-simple/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
}
}
INFO:absl:Using connection config:
sqlite {
filename_uri: "metadata/penguin-simple/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
type {
name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
}
id: "CsvExampleGen"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CsvExampleGen"
}
}
}
}
outputs {
outputs {
key: "examples"
value {
artifact_spec {
type {
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data"
}
}
}
parameters {
key: "input_config"
value {
field_value {
string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}"
}
}
}
parameters {
key: "output_config"
value {
field_value {
string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}"
}
}
}
parameters {
key: "output_data_format"
value {
field_value {
int_value: 6
}
}
}
parameters {
key: "output_file_format"
value {
field_value {
int_value: 5
}
}
}
}
downstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CsvExampleGen] Resolved inputs: ({},)
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
, artifact_type: name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}), exec_properties={'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': 'data', 'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_file_format': 5, 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421'}, execution_output_uri='pipelines/penguin-simple/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/CsvExampleGen/.system/stateful_working_dir/2023-08-30T16:14:43.887481', tmp_dir='pipelines/penguin-simple/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
type {
name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
}
id: "CsvExampleGen"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CsvExampleGen"
}
}
}
}
outputs {
outputs {
key: "examples"
value {
artifact_spec {
type {
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data"
}
}
}
parameters {
key: "input_config"
value {
field_value {
string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}"
}
}
}
parameters {
key: "output_config"
value {
field_value {
string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}"
}
}
}
parameters {
key: "output_data_format"
value {
field_value {
int_value: 6
}
}
}
parameters {
key: "output_file_format"
value {
field_value {
int_value: 5
}
}
}
}
downstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:43.887481')
INFO:absl:Generating examples.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
INFO:absl:Processing input csv data data/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
, artifact_type: name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component Trainer is running.
INFO:absl:Running launcher for node_info {
type {
name: "tfx.components.trainer.component.Trainer"
base_type: TRAIN
}
id: "Trainer"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.Trainer"
}
}
}
}
inputs {
inputs {
key: "examples"
value {
channels {
producer_node_query {
id: "CsvExampleGen"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CsvExampleGen"
}
}
}
artifact_query {
type {
name: "Examples"
base_type: DATASET
}
}
output_key: "examples"
}
min_count: 1
}
}
}
outputs {
outputs {
key: "model"
value {
artifact_spec {
type {
name: "Model"
base_type: MODEL
}
}
}
}
outputs {
key: "model_run"
value {
artifact_spec {
type {
name: "ModelRun"
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "eval_args"
value {
field_value {
string_value: "{\n \"num_steps\": 5\n}"
}
}
}
parameters {
key: "module_path"
value {
field_value {
string_value: "penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl"
}
}
}
parameters {
key: "train_args"
value {
field_value {
string_value: "{\n \"num_steps\": 100\n}"
}
}
}
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[Trainer] Resolved inputs: ({'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
properties {
key: "split_names"
value {
string_value: "[\"train\", \"eval\"]"
}
}
custom_properties {
key: "file_format"
value {
string_value: "tfrecords_gzip"
}
}
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
}
}
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "payload_format"
value {
string_value: "FORMAT_TF_EXAMPLE"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.12.0"
}
}
state: LIVE
create_time_since_epoch: 1693412088158
last_update_time_since_epoch: 1693412088158
, artifact_type: id: 15
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
properties {
key: "split_names"
value {
string_value: "[\"train\", \"eval\"]"
}
}
custom_properties {
key: "file_format"
value {
string_value: "tfrecords_gzip"
}
}
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
}
}
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "payload_format"
value {
string_value: "FORMAT_TF_EXAMPLE"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.12.0"
}
}
state: LIVE
create_time_since_epoch: 1693412088158
last_update_time_since_epoch: 1693412088158
, artifact_type: id: 15
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/2"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/2"
, artifact_type: name: "Model"
base_type: MODEL
)]}), exec_properties={'eval_args': '{\n "num_steps": 5\n}', 'module_path': 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl', 'custom_config': 'null', 'train_args': '{\n "num_steps": 100\n}'}, execution_output_uri='pipelines/penguin-simple/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/Trainer/.system/stateful_working_dir/2023-08-30T16:14:43.887481', tmp_dir='pipelines/penguin-simple/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info {
type {
name: "tfx.components.trainer.component.Trainer"
base_type: TRAIN
}
id: "Trainer"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.Trainer"
}
}
}
}
inputs {
inputs {
key: "examples"
value {
channels {
producer_node_query {
id: "CsvExampleGen"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CsvExampleGen"
}
}
}
artifact_query {
type {
name: "Examples"
base_type: DATASET
}
}
output_key: "examples"
}
min_count: 1
}
}
}
outputs {
outputs {
key: "model"
value {
artifact_spec {
type {
name: "Model"
base_type: MODEL
}
}
}
}
outputs {
key: "model_run"
value {
artifact_spec {
type {
name: "ModelRun"
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "eval_args"
value {
field_value {
string_value: "{\n \"num_steps\": 5\n}"
}
}
}
parameters {
key: "module_path"
value {
field_value {
string_value: "penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl"
}
}
}
parameters {
key: "train_args"
value {
field_value {
string_value: "{\n \"num_steps\": 100\n}"
}
}
}
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
caching_options {
}
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:43.887481')
INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:udf_utils.get_fn {'eval_args': '{\n "num_steps": 5\n}', 'module_path': 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl', 'custom_config': 'null', 'train_args': '{\n "num_steps": 100\n}'} 'run_fn'
INFO:absl:Installing 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['/opt/conda/bin/python', '-m', 'pip', 'install', '--target', '/tmp/tmpsws5ijsb', 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl']
INFO:absl:Successfully installed 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
size: 1
}
. Setting to DenseTensor.
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow/python/autograph/pyct/static_analysis/liveness.py:83: Analyzer.lamba_check (from tensorflow.python.autograph.pyct.static_analysis.liveness) is deprecated and will be removed after 2023-09-23.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow/python/autograph/pyct/static_analysis/liveness.py:83: Analyzer.lamba_check (from tensorflow.python.autograph.pyct.static_analysis.liveness) is deprecated and will be removed after 2023-09-23.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
INFO:absl:Feature body_mass_g has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Model: "model"
INFO:absl:__________________________________________________________________________________________________
INFO:absl: Layer (type) Output Shape Param # Connected to
INFO:absl:==================================================================================================
INFO:absl: culmen_length_mm (InputLayer) [(None, 1)] 0 []
INFO:absl:
INFO:absl: culmen_depth_mm (InputLayer) [(None, 1)] 0 []
INFO:absl:
INFO:absl: flipper_length_mm (InputLayer) [(None, 1)] 0 []
INFO:absl:
INFO:absl: body_mass_g (InputLayer) [(None, 1)] 0 []
INFO:absl:
INFO:absl: concatenate (Concatenate) (None, 4) 0 ['culmen_length_mm[0][0]',
INFO:absl: 'culmen_depth_mm[0][0]',
INFO:absl: 'flipper_length_mm[0][0]',
INFO:absl: 'body_mass_g[0][0]']
INFO:absl:
INFO:absl: dense (Dense) (None, 8) 40 ['concatenate[0][0]']
INFO:absl:
INFO:absl: dense_1 (Dense) (None, 8) 72 ['dense[0][0]']
INFO:absl:
INFO:absl: dense_2 (Dense) (None, 3) 27 ['dense_1[0][0]']
INFO:absl:
INFO:absl:==================================================================================================
INFO:absl:Total params: 139
INFO:absl:Trainable params: 139
INFO:absl:Non-trainable params: 0
INFO:absl:__________________________________________________________________________________________________
100/100 [==============================] - 1s 3ms/step - loss: 0.4040 - sparse_categorical_accuracy: 0.8745 - val_loss: 0.1062 - val_sparse_categorical_accuracy: 0.9800
WARNING:absl:Found untraced functions such as _update_step_xla while saving (showing 1 of 1). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets
INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/penguin-simple/Trainer/model/2/Format-Serving. ModelRun written to pipelines/penguin-simple/Trainer/model_run/2
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/2"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/2"
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running launcher for node_info {
type {
name: "tfx.components.pusher.component.Pusher"
base_type: DEPLOY
}
id: "Pusher"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.Pusher"
}
}
}
}
inputs {
inputs {
key: "model"
value {
channels {
producer_node_query {
id: "Trainer"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.Trainer"
}
}
}
artifact_query {
type {
name: "Model"
base_type: MODEL
}
}
output_key: "model"
}
}
}
}
outputs {
outputs {
key: "pushed_model"
value {
artifact_spec {
type {
name: "PushedModel"
base_type: MODEL
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "push_destination"
value {
field_value {
string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/penguin-simple\"\n }\n}"
}
}
}
}
upstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[Pusher] Resolved inputs: ({'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/penguin-simple/Trainer/model/2"
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.12.0"
}
}
state: LIVE
create_time_since_epoch: 1693412094684
last_update_time_since_epoch: 1693412094684
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/penguin-simple/Trainer/model/2"
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.12.0"
}
}
state: LIVE
create_time_since_epoch: 1693412094684
last_update_time_since_epoch: 1693412094684
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/penguin-simple/Pusher/pushed_model/3"
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}), exec_properties={'custom_config': 'null', 'push_destination': '{\n "filesystem": {\n "base_directory": "serving_model/penguin-simple"\n }\n}'}, execution_output_uri='pipelines/penguin-simple/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/Pusher/.system/stateful_working_dir/2023-08-30T16:14:43.887481', tmp_dir='pipelines/penguin-simple/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info {
type {
name: "tfx.components.pusher.component.Pusher"
base_type: DEPLOY
}
id: "Pusher"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.Pusher"
}
}
}
}
inputs {
inputs {
key: "model"
value {
channels {
producer_node_query {
id: "Trainer"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:43.887481"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.Trainer"
}
}
}
artifact_query {
type {
name: "Model"
base_type: MODEL
}
}
output_key: "model"
}
}
}
}
outputs {
outputs {
key: "pushed_model"
value {
artifact_spec {
type {
name: "PushedModel"
base_type: MODEL
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "push_destination"
value {
field_value {
string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/penguin-simple\"\n }\n}"
}
}
}
}
upstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:43.887481')
WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.
INFO:absl:Model version: 1693412096
INFO:absl:Model written to serving path serving_model/penguin-simple/1693412096.
INFO:absl:Model pushed to pipelines/penguin-simple/Pusher/pushed_model/3.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/penguin-simple/Pusher/pushed_model/3"
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.
You should see “INFO:absl:Component Pusher is finished.” at the end of the logs if the pipeline finished successfully, because Pusher is the last component of the pipeline. You can also see that the metadata (metadata store), pipelines (pipeline root), and serving_model directories have been populated with the metadata, artifacts, and models that the pipeline generates.
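If you prefer to verify this programmatically, a minimal sketch like the one below lists what the run produced; it only uses the path variables defined at the top of this notebook.
# (Optional) Confirm that the pipeline populated the expected directories.
for path in [PIPELINE_ROOT, os.path.dirname(METADATA_PATH), SERVING_MODEL_DIR]:
    print(path, '->', os.listdir(path) if os.path.exists(path) else 'not found')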
Now that you’ve run this simple pipeline, you will see in the next sections how you can modify it to use custom components.
Let’s say you want to modify the pipeline above to filter the data before running the trainer. Without using a TFX component, you would run something like the code below. It just gets the rows where the culmen_length_mm feature is greater than 0.3.
# search directory for one or more CSVs
files = glob.glob(f'{DATA_ROOT}/*.csv')
# filter the dataset
for file in files:
df = pd.read_csv(file, index_col=False)
filtered_df = df[df['culmen_length_mm'] > 0.3].reset_index(drop=True)
# display the filtered dataframe (from the last CSV read)
filtered_df
|     | species | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g |
|-----|---------|------------------|-----------------|-------------------|-------------|
| 0   | 0       | 0.327273         | 0.535714        | 0.169492          | 0.138889    |
| 1   | 0       | 0.378182         | 0.904762        | 0.423729          | 0.500000    |
| 2   | 0       | 0.505455         | 1.000000        | 0.372881          | 0.416667    |
| 3   | 0       | 0.309091         | 0.654762        | 0.186441          | 0.236111    |
| 4   | 0       | 0.305455         | 0.571429        | 0.254237          | 0.138889    |
| ... | ...     | ...              | ...             | ...               | ...         |
| 227 | 2       | 0.549091         | 0.071429        | 0.711864          | 0.618056    |
| 228 | 2       | 0.534545         | 0.142857        | 0.728814          | 0.597222    |
| 229 | 2       | 0.665455         | 0.309524        | 0.847458          | 0.847222    |
| 230 | 2       | 0.476364         | 0.202381        | 0.677966          | 0.694444    |
| 231 | 2       | 0.647273         | 0.357143        | 0.694915          | 0.750000    |
232 rows × 5 columns
You can save the dataset above to a different directory and point the TFX pipeline to it. That definitely works, but you can also include this code in a custom component so it's part of the TFX pipeline.
As mentioned in the lectures, a TFX component has a driver, an executor, and a publisher. The driver and publisher interact with the metadata store while the executor runs the actual processing. More often than not, you only need to modify the executor, and that's what you'll be doing in the next sections.
TFX provides a way to define an executor by just using a component decorator on your function. If you’ve done the Kubeflow Pipelines ungraded lab earlier this week, this will look very familiar. It uses the same concepts, such as defining the inputs and outputs with type annotations and then using those parameters in the function body.
The component below basically uses the same filtering code you saw earlier but wraps it in a Python component. It defines two input parameters and outputs a dictionary with a String artifact (see the function docstring for details). Because this component uses the same publisher as a standard TFX component, the output artifact will be saved in the metadata store, as you’ll see later.
# import the artifact type that you will use
from tfx.types.standard_artifacts import String
# import the decorator
from tfx.dsl.component.experimental.decorators import component
# import type annotations
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact, Parameter, OutputDict
@component
def CustomFilterComponent(input_base: Parameter[str],
output_base: Parameter[str],
) -> OutputDict(output_path=str):
'''
Args:
input_base - location of the raw CSV
output_base - location where you want to save the filtered CSV
Returns:
OutputDict:
output_path - String artifact that just holds the `output_base` value
'''
import pandas as pd
import glob
import os
# create the output base if it does not exist yet
if not os.path.exists(output_base):
os.mkdir(output_base)
# search for CSVs in the input base
files = glob.glob(f'{input_base}/*.csv')
# loop through CSVs
for file in files:
# read the CSV
df = pd.read_csv(file, index_col=False)
# filter the data
filtered_df = df[df['culmen_length_mm'] > 0.3].reset_index(drop=True)
# compose output filename
filename = os.path.basename(file).replace('.csv','')
filtered_filename = f'{filename}_filtered.csv'
# save filtered CSV to output base
filtered_df.to_csv(f'{output_base}/{filtered_filename}', index=False)
# return the output dictionary; output_base becomes the String artifact's value
return {'output_path': output_base}
You can now run your newly built component. You will just run a single-node pipeline to prove that it works.
# define a filter task
filter_task = CustomFilterComponent(input_base=DATA_ROOT,
output_base=f'{DATA_ROOT}_filtered')
# include the task
components = [filter_task]
# define a pipeline with only the single component
pipeline = tfx.dsl.Pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(METADATA_PATH),
components=components)
# run the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)
INFO:absl:Using deployment config:
executor_specs {
key: "CustomFilterComponent"
value {
python_class_executable_spec {
class_path: "__main__.CustomFilterComponent_Executor"
}
}
}
metadata_connection_config {
database_connection_config {
sqlite {
filename_uri: "metadata/penguin-simple/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
}
}
INFO:absl:Using connection config:
sqlite {
filename_uri: "metadata/penguin-simple/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
INFO:absl:Component CustomFilterComponent is running.
INFO:absl:Running launcher for node_info {
type {
name: "__main__.CustomFilterComponent"
}
id: "CustomFilterComponent"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:56.923046"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CustomFilterComponent"
}
}
}
}
outputs {
outputs {
key: "output_path"
value {
artifact_spec {
type {
name: "String"
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data"
}
}
}
parameters {
key: "output_base"
value {
field_value {
string_value: "data_filtered"
}
}
}
}
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CustomFilterComponent] Resolved inputs: ({},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/4/value"
, artifact_type: name: "String"
)]}), exec_properties={'output_base': 'data_filtered', 'input_base': 'data'}, execution_output_uri='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/4/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/CustomFilterComponent/.system/stateful_working_dir/2023-08-30T16:14:56.923046', tmp_dir='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/4/.temp/', pipeline_node=node_info {
type {
name: "__main__.CustomFilterComponent"
}
id: "CustomFilterComponent"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:14:56.923046"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CustomFilterComponent"
}
}
}
}
outputs {
outputs {
key: "output_path"
value {
artifact_spec {
type {
name: "String"
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data"
}
}
}
parameters {
key: "output_base"
value {
field_value {
string_value: "data_filtered"
}
}
}
}
execution_options {
caching_options {
}
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:56.923046')
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/4/value"
, artifact_type: name: "String"
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CustomFilterComponent is finished.
You should now see the filtered CSV in the data_filtered folder in the file explorer. As expected, you can also see that it has fewer lines than the original because of the filtering.
# number of rows in original csv
!cat data/data.csv | wc -l
# number of rows in filtered csv
!cat data_filtered/data_filtered.csv | wc -l
335
233
If you navigate to pipelines/penguin-simple/CustomFilterComponent/output_path/, you will also see a directory named after the execution id of the run (e.g. 4). If you open the value file beneath it, you’ll see the string value you saved, which is just the output_base (i.e. data_filtered). You will want to feed this value to the CsvExampleGen component, and that’s what you’ll do next.
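As an optional sanity check (a sketch, not part of the lab), you can also read that value file directly from Python. The glob below simply grabs the latest execution directory since the exact id depends on how many runs you have made.
# (Optional) Read the String artifact's value file produced by CustomFilterComponent.
value_files = sorted(glob.glob(f'{PIPELINE_ROOT}/CustomFilterComponent/output_path/*/value'))
with open(value_files[-1]) as f:
    print(f.read())   # should print the output_base, i.e. 'data_filtered'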
If you try to use your new component with CsvExampleGen, you will encounter an error as shown below:
# Define filter task
filter_task = CustomFilterComponent(input_base=DATA_ROOT,
output_base=f'{DATA_ROOT}_filtered')
# Try using the custom component with CsvExampleGen. This code will expectedly throw an error.
try:
example_gen = tfx.components.CsvExampleGen(input_base=filter_task.outputs['output_path'])
except Exception as e:
print("Error thrown as expected!")
print(e)
Error thrown as expected!
Expected type <class 'str'> for parameter 'input_base' but got OutputChannel(artifact_type=String, producer_component_id=CustomFilterComponent, output_key=output_path, additional_properties={}, additional_custom_properties={}) instead.
As you can see, the output of your custom component cannot be accepted by CsvExampleGen because it is configured to accept a primitive string. TFX components generate and consume Channel objects, and that's what your custom component outputs. To fix this, you need to build a custom data ingestion component that reuses the CsvExampleGen code but accepts a Channel. You will do that in the following sections.
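To see this for yourself, you can print the output of the filter task; it is the same OutputChannel object shown in the error message above (a small optional check).
# (Optional) Inspect the custom component's output: it is a Channel, not a plain string.
print(type(filter_task.outputs['output_path']))
print(filter_task.outputs['output_path'])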
TFX is open source, so the code for the standard components can easily be found in the public repo. In the following sections, we placed links to the actual files that you'll be modifying/overriding in case you want to compare what was changed for your custom ExampleGen.
The class hierarchy for these components is pretty deep but, in summary, you will only need to modify three things: the component spec, the executor, and the component class itself.
First, you will need to modify the Component Spec. This file describes the parameters, inputs, and outputs of the standard components. Parameters are values supplied at runtime, while inputs and outputs are values read from the metadata store. The original spec for ExampleGen is found here.
You will need to implement the same for your custom ExampleGen, but it should accept a Channel parameter. The revised version is shown below. Notice that the INPUTS dictionary now has a ChannelParameter, whereas in the original, everything is just in the PARAMETERS dictionary.
from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter
from tfx.types.component_spec import ComponentSpec
from tfx.types import standard_artifacts
from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2
# Key for example_gen input that we want to use
INPUT_BASE_KEY = 'input_base'
# Other keys
INPUT_CONFIG_KEY = 'input_config'
OUTPUT_CONFIG_KEY = 'output_config'
OUTPUT_DATA_FORMAT_KEY = 'output_data_format'
RANGE_CONFIG_KEY = 'range_config'
CUSTOM_CONFIG_KEY = 'custom_config'
EXAMPLES_KEY = 'examples'
class MyCustomExampleGenSpec(ComponentSpec):
"""File-based ExampleGen component spec."""
PARAMETERS = {
INPUT_CONFIG_KEY:
ExecutionParameter(type=example_gen_pb2.Input),
OUTPUT_CONFIG_KEY:
ExecutionParameter(type=example_gen_pb2.Output),
OUTPUT_DATA_FORMAT_KEY:
ExecutionParameter(type=int),
CUSTOM_CONFIG_KEY:
ExecutionParameter(type=example_gen_pb2.CustomConfig, optional=True),
RANGE_CONFIG_KEY:
ExecutionParameter(type=range_config_pb2.RangeConfig, optional=True),
}
# Now accepts a channel
INPUTS = {
INPUT_BASE_KEY:
ChannelParameter(type=standard_artifacts.String),
}
OUTPUTS = {
EXAMPLES_KEY: ChannelParameter(type=standard_artifacts.Examples),
}
With that done, you should now modify the executor code to account for this change of input types. Instead of looking at just the parameters, it should also look into the Channel inputs passed into the component.
Executor classes are executed by TFX starting with the Do function, so that's what you will need to modify. The original executor for CsvExampleGen can be found here, and it inherits from the base class here. The base class includes the Do() function, and that's what you'll be overriding in your new custom executor below.
Basically, you're using all the functions defined in the standard component, but modifying it so it can find the input_base value from the Channel inputs.
Take note that this tutorial prioritizes code brevity. In your projects, you may take a different approach, such as modifying the _CsvToExample code to look for the string value in the input_dict instead of exec_properties. Doing that here would result in very long code blocks, so the shorter approach is taken.
from typing import Any, Dict, Iterable, List, Text, Union
from tfx.components.example_gen.csv_example_gen.executor import Executor as CsvExampleGenExecutor
from tfx.types import standard_component_specs
from tfx.types import artifact_utils
from tfx import types
class MyCustomExecutor(CsvExampleGenExecutor):
"""Generic TFX CSV example gen executor."""
def Do(
self,
input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any],
) -> None:
"""Take input data source and generates serialized data splits.
The output is intended to be serialized tf.train.Examples or
tf.train.SequenceExamples protocol buffer in gzipped TFRecord format,
but subclasses can choose to override to write to any serialized records
payload into gzipped TFRecord as specified, so long as downstream
component can consume it. The format of payload is added to
`payload_format` custom property of the output Example artifact.
Args:
input_dict: Input dict from input key to a list of Artifacts. Depends on
detailed example gen implementation.
- input_base: an external directory containing the data files.
output_dict: Output dict from output key to a list of Artifacts.
- examples: splits of serialized records.
exec_properties: A dict of execution properties. Depends on detailed
example gen implementation.
- input_config: JSON string of example_gen_pb2.Input instance,
providing input configuration.
- output_config: JSON string of example_gen_pb2.Output instance,
providing output configuration.
- output_data_format: Payload format of generated data in output
artifact, one of example_gen_pb2.PayloadFormat enum.
Returns:
None
"""
self._log_startup(input_dict, output_dict, exec_properties)
# Get the artifact from the Channel input
filter_component_artifact = artifact_utils.get_single_instance(
input_dict[standard_component_specs.INPUT_BASE_KEY])
# Put the input string value into the exec_properties dictionary
exec_properties[standard_component_specs.INPUT_BASE_KEY] = filter_component_artifact.value
# execute superclass
super(MyCustomExecutor, self).Do(input_dict=input_dict, output_dict=output_dict, exec_properties=exec_properties)
Lastly, you will need to put everything together in a class. This is the one you will instantiate so you can run the component later. For comparison, the original CsvExampleGen component class is found here and it inherits from FileBasedExampleGen here. The revised version is shown below:
from typing import Any, Dict, Optional, Text, Union
from tfx.dsl.components.base import base_beam_component
from tfx.dsl.components.base import executor_spec
from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2
from tfx import types
from tfx.types import standard_artifacts
from tfx.components.example_gen import utils
class MyCustomExampleGen(base_beam_component.BaseBeamComponent):
# Define the Spec class and executor spec using the functions and
# classes you defined earlier.
SPEC_CLASS = MyCustomExampleGenSpec
EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(MyCustomExecutor)
# Define init function. Notice that `input_base` now accepts a Channel.
def __init__(self,
input_base: types.Channel = None,
input_config: Optional[Union[example_gen_pb2.Input,
Dict[Text, Any]]] = None,
output_config: Optional[Union[example_gen_pb2.Output,
Dict[Text, Any]]] = None,
range_config: Optional[Union[range_config_pb2.RangeConfig,
Dict[Text, Any]]] = None,
output_data_format: Optional[int] = example_gen_pb2.FORMAT_TF_EXAMPLE):
"""Customized ExampleGen component.
Args:
input_base: an external directory containing the CSV files. Accepts a Channel
from a previous TFX component.
input_config: An example_gen_pb2.Input instance, providing input
configuration. If unset, the files under input_base will be treated as a
single split. If any field is provided as a RuntimeParameter,
input_config should be constructed as a dict with the same field names
as Input proto message.
output_config: An example_gen_pb2.Output instance, providing output
configuration. If unset, default splits will be 'train' and 'eval' with
size 2:1. If any field is provided as a RuntimeParameter, output_config
should be constructed as a dict with the same field names as Output
proto message.
range_config: An optional range_config_pb2.RangeConfig instance,
specifying the range of span values to consider. If unset, driver will
default to searching for latest span with no restrictions.
"""
# Configure inputs and outputs.
input_config = input_config or utils.make_default_input_config()
output_config = output_config or utils.make_default_output_config(
input_config)
# Define output type.
example_artifacts = types.Channel(type=standard_artifacts.Examples)
# Pass input arguments to your custom ExampleGen spec.
spec = MyCustomExampleGenSpec(
input_base=input_base,
input_config=input_config,
output_config=output_config,
range_config=range_config,
output_data_format=output_data_format,
examples=example_artifacts)
# This will check if the values passed are the correct type else
# it will throw the error you saw earlier.
super(MyCustomExampleGen, self).__init__(
spec=spec)
You can now use the custom component (MyCustomExampleGen) in your code as shown below. It will no longer throw an error because you reconfigured input_base to accept a Channel.
# Filter the dataset
filter_task = CustomFilterComponent(input_base=DATA_ROOT,
output_base=f'{DATA_ROOT}_filtered')
# Use the output of filter_task to know the input_base for this custom ExampleGen
custom_example_gen_task = MyCustomExampleGen(input_base=filter_task.outputs['output_path'])
# Define components to include
components = [filter_task,
custom_example_gen_task]
# Create the pipeline
pipeline = tfx.dsl.Pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(METADATA_PATH),
components=components)
# Run the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)
INFO:absl:Using deployment config:
executor_specs {
key: "CustomFilterComponent"
value {
python_class_executable_spec {
class_path: "__main__.CustomFilterComponent_Executor"
}
}
}
executor_specs {
key: "MyCustomExampleGen"
value {
beam_executable_spec {
python_executor_spec {
class_path: "__main__.MyCustomExecutor"
}
}
}
}
metadata_connection_config {
database_connection_config {
sqlite {
filename_uri: "metadata/penguin-simple/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
}
}
INFO:absl:Using connection config:
sqlite {
filename_uri: "metadata/penguin-simple/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
INFO:absl:Component CustomFilterComponent is running.
INFO:absl:Running launcher for node_info {
type {
name: "__main__.CustomFilterComponent"
}
id: "CustomFilterComponent"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:15:00.188180"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CustomFilterComponent"
}
}
}
}
outputs {
outputs {
key: "output_path"
value {
artifact_spec {
type {
name: "String"
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data"
}
}
}
parameters {
key: "output_base"
value {
field_value {
string_value: "data_filtered"
}
}
}
}
downstream_nodes: "MyCustomExampleGen"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CustomFilterComponent] Resolved inputs: ({},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={}, output_dict=defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
, artifact_type: name: "String"
)]}), exec_properties={'input_base': 'data', 'output_base': 'data_filtered'}, execution_output_uri='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/5/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/CustomFilterComponent/.system/stateful_working_dir/2023-08-30T16:15:00.188180', tmp_dir='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/5/.temp/', pipeline_node=node_info {
type {
name: "__main__.CustomFilterComponent"
}
id: "CustomFilterComponent"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:15:00.188180"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CustomFilterComponent"
}
}
}
}
outputs {
outputs {
key: "output_path"
value {
artifact_spec {
type {
name: "String"
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data"
}
}
}
parameters {
key: "output_base"
value {
field_value {
string_value: "data_filtered"
}
}
}
}
downstream_nodes: "MyCustomExampleGen"
execution_options {
caching_options {
}
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:15:00.188180')
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
, artifact_type: name: "String"
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CustomFilterComponent is finished.
INFO:absl:Component MyCustomExampleGen is running.
INFO:absl:Running launcher for node_info {
type {
name: "__main__.MyCustomExampleGen"
}
id: "MyCustomExampleGen"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:15:00.188180"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.MyCustomExampleGen"
}
}
}
}
inputs {
inputs {
key: "input_base"
value {
channels {
producer_node_query {
id: "CustomFilterComponent"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:15:00.188180"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CustomFilterComponent"
}
}
}
artifact_query {
type {
name: "String"
}
}
output_key: "output_path"
}
min_count: 1
}
}
}
outputs {
outputs {
key: "examples"
value {
artifact_spec {
type {
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
}
}
}
}
}
parameters {
parameters {
key: "input_config"
value {
field_value {
string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}"
}
}
}
parameters {
key: "output_config"
value {
field_value {
string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}"
}
}
}
parameters {
key: "output_data_format"
value {
field_value {
int_value: 6
}
}
}
}
upstream_nodes: "CustomFilterComponent"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[MyCustomExampleGen] Resolved inputs: ({'input_base': [Artifact(artifact: id: 6
type_id: 22
uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
custom_properties {
key: "__is_null__"
value {
int_value: 0
}
}
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.12.0"
}
}
state: LIVE
create_time_since_epoch: 1693412101448
last_update_time_since_epoch: 1693412101448
, artifact_type: id: 22
name: "String"
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'input_base': [Artifact(artifact: id: 6
type_id: 22
uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
custom_properties {
key: "__is_null__"
value {
int_value: 0
}
}
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.12.0"
}
}
state: LIVE
create_time_since_epoch: 1693412101448
last_update_time_since_epoch: 1693412101448
, artifact_type: id: 22
name: "String"
)]}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/MyCustomExampleGen/examples/6"
, artifact_type: name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}), exec_properties={'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}'}, execution_output_uri='pipelines/penguin-simple/MyCustomExampleGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/MyCustomExampleGen/.system/stateful_working_dir/2023-08-30T16:15:00.188180', tmp_dir='pipelines/penguin-simple/MyCustomExampleGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
type {
name: "__main__.MyCustomExampleGen"
}
id: "MyCustomExampleGen"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:15:00.188180"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.MyCustomExampleGen"
}
}
}
}
inputs {
inputs {
key: "input_base"
value {
channels {
producer_node_query {
id: "CustomFilterComponent"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "penguin-simple"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2023-08-30T16:15:00.188180"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "penguin-simple.CustomFilterComponent"
}
}
}
artifact_query {
type {
name: "String"
}
}
output_key: "output_path"
}
min_count: 1
}
}
}
outputs {
outputs {
key: "examples"
value {
artifact_spec {
type {
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
}
}
}
}
}
parameters {
parameters {
key: "input_config"
value {
field_value {
string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}"
}
}
}
parameters {
key: "output_config"
value {
field_value {
string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}"
}
}
}
parameters {
key: "output_data_format"
value {
field_value {
int_value: 6
}
}
}
}
upstream_nodes: "CustomFilterComponent"
execution_options {
caching_options {
}
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:15:00.188180')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data data_filtered/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/MyCustomExampleGen/examples/6"
, artifact_type: name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component MyCustomExampleGen is finished.
As a sanity check, you can compute the number of examples in the training and eval splits. The total should equal the number of examples in your filtered CSV. To use the code below, replace EXECUTION_ID
 with the ID shown in your latest run. You can find it in the last three lines of the output cell above. For example:
)]}) for execution 16 # ---> **16 is the EXECUTION ID**
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component MyCustomExampleGen is finished.
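Alternatively, if you would rather not copy the execution ID from the logs, you can look up the latest Examples artifact directly in the ML Metadata store. The sketch below is optional and just illustrative; it assumes the ml_metadata client that ships with TFX and reuses the METADATA_PATH defined earlier.
from ml_metadata.metadata_store import metadata_store
# Connect to the same SQLite-backed metadata store used by the pipeline.
connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
store = metadata_store.MetadataStore(connection_config)
# Fetch all Examples artifacts and pick the most recently created one.
# Its URI ends with the execution ID (e.g. .../MyCustomExampleGen/examples/<id>).
examples_artifacts = store.get_artifacts_by_type('Examples')
latest_examples = max(examples_artifacts, key=lambda a: a.create_time_since_epoch)
print(latest_examples.uri)
For this lab, though, it is enough to plug the execution ID directly into the cell below.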
EXECUTION_ID = 6 # PLACE THE EXECUTION ID HERE
# Create a `TFRecordDataset` to read these files
train_dataset = tf.data.TFRecordDataset(f'{PIPELINE_ROOT}/MyCustomExampleGen/examples/{EXECUTION_ID}/Split-train/data_tfrecord-00000-of-00001.gz', compression_type="GZIP")
eval_dataset = tf.data.TFRecordDataset(f'{PIPELINE_ROOT}/MyCustomExampleGen/examples/{EXECUTION_ID}/Split-eval/data_tfrecord-00000-of-00001.gz', compression_type="GZIP")
# Get number of records for each dataset (only use for small datasets to avoid memory issues)
num_train_data = len(list(train_dataset))
num_eval_data = len(list(eval_dataset))
# Get the total
total_examples = num_train_data + num_eval_data
print(f'total number of examples: {total_examples}')
total number of examples: 232
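To confirm that this total matches the filtered data, you can also count the data rows in the CSV written by CustomFilterComponent. This quick cross-check reuses the pandas and glob imports from earlier and assumes the filter component wrote CSV files (each with a single header row) under data_filtered; the exact filenames depend on how you implemented the filter.
# Count the data rows across the filtered CSV file(s).
# Each file is assumed to contain one header row, which pandas does not count.
filtered_files = glob.glob(f'{DATA_ROOT}_filtered/*')
num_filtered_rows = sum(len(pd.read_csv(filtered_file)) for filtered_file in filtered_files)
print(f'number of rows in the filtered CSV: {num_filtered_rows}')
The printed count should match the total number of examples computed above.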
In this lab, you used custom components to create a pipeline. This shows that you can go beyond the standard TFX components if your project calls for it. To learn more about custom components, you can read the guide here and see the examples here.