Coursera

Ungraded Lab: Developing Custom TFX Components

Tensorflow Extended (TFX) provides ready made components for typical steps in a machine learning workflow. Other courses in this specialization focus on specific components and in this lab, you will learn how to make your own. This will be useful in case your project has specific needs that fall outside the standard TFX components. It will make your pipelines more flexible while still leveraging the experiment tracking and orchestration that TFX provides. In particular, you will:

To demonstrate, you will run the pipeline used in this official tutorial then modify it to have a custom component. Some of the discussions here are also taken from that tutorial to explain the motivation and point to additional resources.

Let’s begin!

Note: If you haven’t taken other courses in this specialization and it’s the first time you’re using TFX, please see Understanding TFX Pipelines to get an overview of important concepts.

Imports

import os
from absl import logging
import pandas as pd
import glob

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

# Set default logging level.
logging.set_verbosity(logging.INFO) 
TensorFlow version: 2.11.0
TFX version: 1.12.0

Set up variables

There are some variables used to define a pipeline. You can customize these variables as you want. By default all output from the pipeline will be generated under the current directory.

# Pipeline label
PIPELINE_NAME = "penguin-simple"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

# Dataset directory
DATA_ROOT = 'data'

Dataset

You will use the Palmer Penguins dataset for this exercise. It has four numeric features, namely:

All features were already normalized to have range [0,1]. You will build a classification model which predicts the species of penguins.

Running the pipeline using standard components

The pipeline will consist of three essential TFX components and the graph will look like this:

ExampleGen -> Trainer -> Pusher

The pipeline includes the most minimal ML workflow which is importing data (ExampleGen), training a model (Trainer) and exporting the trained model (Pusher).

Model training code

You will first define the trainer module so the Trainer component can build the model and train it.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Write a pipeline definition

Next, you will define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline which can be run using one of pipeline orchestration systems that TFX supports.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

Run the pipeline

TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner which is included in the TFX Python package and runs pipelines on local environment. We often call TFX pipelines “DAGs” which stands for directed acyclic graph.

LocalDagRunner provides fast iterations for developemnt and debugging. TFX also supports other orchestrators including Kubeflow Pipelines and Apache Airflow which are suitable for production use cases.

See TFX on Cloud AI Platform Pipelines or TFX Airflow Tutorial to learn more about other orchestration systems.

The code below creates a LocalDagRunner and passes a Pipeline object created from the function you defined earlier. The pipeline runs directly and you can see logs for the progress of the pipeline including ML model training.

tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/home/jovyan/work/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc.
INFO:absl:Executing: ['/opt/conda/bin/python', '/tmp/tmp_xkdipl3/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpu5l6ph9u', '--dist-dir', '/tmp/tmp3oxck6hx']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "Pusher"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.pusher.executor.Executor"
    }
  }
}
executor_specs {
  key: "Trainer"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.trainer.executor.GenericExecutor"
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metadata/penguin-simple/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "metadata/penguin-simple/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:43.887481"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CsvExampleGen] Resolved inputs: ({},)
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': 'data', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_file_format': 5, 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421'}, execution_output_uri='pipelines/penguin-simple/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/CsvExampleGen/.system/stateful_working_dir/2023-08-30T16:14:43.887481', tmp_dir='pipelines/penguin-simple/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:43.887481"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:43.887481')
INFO:absl:Generating examples.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.




INFO:absl:Processing input csv data data/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component Trainer is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.trainer.component.Trainer"
    base_type: TRAIN
  }
  id: "Trainer"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:43.887481"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.Trainer"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "penguin-simple"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2023-08-30T16:14:43.887481"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "penguin-simple.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "model"
    value {
      artifact_spec {
        type {
          name: "Model"
          base_type: MODEL
        }
      }
    }
  }
  outputs {
    key: "model_run"
    value {
      artifact_spec {
        type {
          name: "ModelRun"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "eval_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 5\n}"
      }
    }
  }
  parameters {
    key: "module_path"
    value {
      field_value {
        string_value: "penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl"
      }
    }
  }
  parameters {
    key: "train_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 100\n}"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[Trainer] Resolved inputs: ({'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.12.0"
  }
}
state: LIVE
create_time_since_epoch: 1693412088158
last_update_time_since_epoch: 1693412088158
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1683279421,sum_checksum:1683279421"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.12.0"
  }
}
state: LIVE
create_time_since_epoch: 1693412088158
last_update_time_since_epoch: 1693412088158
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/2"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/2"
, artifact_type: name: "Model"
base_type: MODEL
)]}), exec_properties={'eval_args': '{\n  "num_steps": 5\n}', 'module_path': 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl', 'custom_config': 'null', 'train_args': '{\n  "num_steps": 100\n}'}, execution_output_uri='pipelines/penguin-simple/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/Trainer/.system/stateful_working_dir/2023-08-30T16:14:43.887481', tmp_dir='pipelines/penguin-simple/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.trainer.component.Trainer"
    base_type: TRAIN
  }
  id: "Trainer"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:43.887481"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.Trainer"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "penguin-simple"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2023-08-30T16:14:43.887481"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "penguin-simple.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "model"
    value {
      artifact_spec {
        type {
          name: "Model"
          base_type: MODEL
        }
      }
    }
  }
  outputs {
    key: "model_run"
    value {
      artifact_spec {
        type {
          name: "ModelRun"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "eval_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 5\n}"
      }
    }
  }
  parameters {
    key: "module_path"
    value {
      field_value {
        string_value: "penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl"
      }
    }
  }
  parameters {
    key: "train_args"
    value {
      field_value {
        string_value: "{\n  \"num_steps\": 100\n}"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:43.887481')
INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:udf_utils.get_fn {'eval_args': '{\n  "num_steps": 5\n}', 'module_path': 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl', 'custom_config': 'null', 'train_args': '{\n  "num_steps": 100\n}'} 'run_fn'
INFO:absl:Installing 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['/opt/conda/bin/python', '-m', 'pip', 'install', '--target', '/tmp/tmpsws5ijsb', 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl']
INFO:absl:Successfully installed 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+a7e2e8dccbb913b74904edeec5549d868a2ea392bcd84fbc1965aba698dce3fc-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.


WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow/python/autograph/pyct/static_analysis/liveness.py:83: Analyzer.lamba_check (from tensorflow.python.autograph.pyct.static_analysis.liveness) is deprecated and will be removed after 2023-09-23.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


WARNING:tensorflow:From /opt/conda/lib/python3.7/site-packages/tensorflow/python/autograph/pyct/static_analysis/liveness.py:83: Analyzer.lamba_check (from tensorflow.python.autograph.pyct.static_analysis.liveness) is deprecated and will be removed after 2023-09-23.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Model: "model"
INFO:absl:__________________________________________________________________________________________________
INFO:absl: Layer (type)                   Output Shape         Param #     Connected to                     
INFO:absl:==================================================================================================
INFO:absl: culmen_length_mm (InputLayer)  [(None, 1)]          0           []                               
INFO:absl:                                                                                                  
INFO:absl: culmen_depth_mm (InputLayer)   [(None, 1)]          0           []                               
INFO:absl:                                                                                                  
INFO:absl: flipper_length_mm (InputLayer)  [(None, 1)]         0           []                               
INFO:absl:                                                                                                  
INFO:absl: body_mass_g (InputLayer)       [(None, 1)]          0           []                               
INFO:absl:                                                                                                  
INFO:absl: concatenate (Concatenate)      (None, 4)            0           ['culmen_length_mm[0][0]',       
INFO:absl:                                                                  'culmen_depth_mm[0][0]',        
INFO:absl:                                                                  'flipper_length_mm[0][0]',      
INFO:absl:                                                                  'body_mass_g[0][0]']            
INFO:absl:                                                                                                  
INFO:absl: dense (Dense)                  (None, 8)            40          ['concatenate[0][0]']            
INFO:absl:                                                                                                  
INFO:absl: dense_1 (Dense)                (None, 8)            72          ['dense[0][0]']                  
INFO:absl:                                                                                                  
INFO:absl: dense_2 (Dense)                (None, 3)            27          ['dense_1[0][0]']                
INFO:absl:                                                                                                  
INFO:absl:==================================================================================================
INFO:absl:Total params: 139
INFO:absl:Trainable params: 139
INFO:absl:Non-trainable params: 0
INFO:absl:__________________________________________________________________________________________________


100/100 [==============================] - 1s 3ms/step - loss: 0.4040 - sparse_categorical_accuracy: 0.8745 - val_loss: 0.1062 - val_sparse_categorical_accuracy: 0.9800


WARNING:absl:Found untraced functions such as _update_step_xla while saving (showing 1 of 1). These functions will not be directly callable after loading.


INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/penguin-simple/Trainer/model/2/Format-Serving. ModelRun written to pipelines/penguin-simple/Trainer/model_run/2
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/2"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/2"
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:43.887481"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.Pusher"
      }
    }
  }
}
inputs {
  inputs {
    key: "model"
    value {
      channels {
        producer_node_query {
          id: "Trainer"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "penguin-simple"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2023-08-30T16:14:43.887481"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "penguin-simple.Trainer"
            }
          }
        }
        artifact_query {
          type {
            name: "Model"
            base_type: MODEL
          }
        }
        output_key: "model"
      }
    }
  }
}
outputs {
  outputs {
    key: "pushed_model"
    value {
      artifact_spec {
        type {
          name: "PushedModel"
          base_type: MODEL
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "push_destination"
    value {
      field_value {
        string_value: "{\n  \"filesystem\": {\n    \"base_directory\": \"serving_model/penguin-simple\"\n  }\n}"
      }
    }
  }
}
upstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[Pusher] Resolved inputs: ({'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/penguin-simple/Trainer/model/2"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.12.0"
  }
}
state: LIVE
create_time_since_epoch: 1693412094684
last_update_time_since_epoch: 1693412094684
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/penguin-simple/Trainer/model/2"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.12.0"
  }
}
state: LIVE
create_time_since_epoch: 1693412094684
last_update_time_since_epoch: 1693412094684
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/penguin-simple/Pusher/pushed_model/3"
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}), exec_properties={'custom_config': 'null', 'push_destination': '{\n  "filesystem": {\n    "base_directory": "serving_model/penguin-simple"\n  }\n}'}, execution_output_uri='pipelines/penguin-simple/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/Pusher/.system/stateful_working_dir/2023-08-30T16:14:43.887481', tmp_dir='pipelines/penguin-simple/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:43.887481"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.Pusher"
      }
    }
  }
}
inputs {
  inputs {
    key: "model"
    value {
      channels {
        producer_node_query {
          id: "Trainer"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "penguin-simple"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2023-08-30T16:14:43.887481"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "penguin-simple.Trainer"
            }
          }
        }
        artifact_query {
          type {
            name: "Model"
            base_type: MODEL
          }
        }
        output_key: "model"
      }
    }
  }
}
outputs {
  outputs {
    key: "pushed_model"
    value {
      artifact_spec {
        type {
          name: "PushedModel"
          base_type: MODEL
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "custom_config"
    value {
      field_value {
        string_value: "null"
      }
    }
  }
  parameters {
    key: "push_destination"
    value {
      field_value {
        string_value: "{\n  \"filesystem\": {\n    \"base_directory\": \"serving_model/penguin-simple\"\n  }\n}"
      }
    }
  }
}
upstream_nodes: "Trainer"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:43.887481')
WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.
INFO:absl:Model version: 1693412096
INFO:absl:Model written to serving path serving_model/penguin-simple/1693412096.
INFO:absl:Model pushed to pipelines/penguin-simple/Pusher/pushed_model/3.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/penguin-simple/Pusher/pushed_model/3"
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.

You should see “INFO:absl:Component Pusher is finished.” at the end of the logs if the pipeline finished successfully. Because Pusher component is the last component of the pipeline. You can also see that the metadata (metadata store), pipelines (pipeline root), and serving_model directories have been populated with the metadata, artifacts, and models that the pipeline generates.

Now that you’ve ran this simple pipeline, you will see in the next sections how you can modify it to use custom components.

Building Custom Components

Let’s say you want to modify the pipeline above to filter the data first before running the trainer. Without using a TFX component, you would run something like the code below. This will just get the rows where the culmen_length_mm feature is greater than 0.3.

# search directory for one or more CSVs
files = glob.glob(f'{DATA_ROOT}/*.csv')

# filter the dataset
for file in files:
  df = pd.read_csv(file, index_col=False)
  filtered_df = df[df['culmen_length_mm'] > 0.3].reset_index(drop=True)

# print latest modified file
filtered_df
<style scoped> .dataframe tbody tr th:only-of-type { vertical-align: middle; }
.dataframe tbody tr th {
    vertical-align: top;
}

.dataframe thead th {
    text-align: right;
}
species culmen_length_mm culmen_depth_mm flipper_length_mm body_mass_g
0 0 0.327273 0.535714 0.169492 0.138889
1 0 0.378182 0.904762 0.423729 0.500000
2 0 0.505455 1.000000 0.372881 0.416667
3 0 0.309091 0.654762 0.186441 0.236111
4 0 0.305455 0.571429 0.254237 0.138889
... ... ... ... ... ...
227 2 0.549091 0.071429 0.711864 0.618056
228 2 0.534545 0.142857 0.728814 0.597222
229 2 0.665455 0.309524 0.847458 0.847222
230 2 0.476364 0.202381 0.677966 0.694444
231 2 0.647273 0.357143 0.694915 0.750000

232 rows × 5 columns

You can save the dataset above to a different directory and point the TFX pipeline to it. That definitely works but you can include this code in a custom component as well so it’s part of the TFX pipeline.

As mentioned in the lectures, a TFX component has a driver, executor, and publisher. The driver and publisher interacts with the metadata store while the executor runs the actual processing. More often than not, you only need to modify the executor and that’s what you’ll be doing in the next sections.

Custom components through Python functions

TFX provides a way to define an executor by just using a component decorator on your function. If you’ve done the Kubeflow Pipelines ungraded lab earlier this week, this will look very familiar. It uses the same concepts such as defining the inputs and outputs using type annotations then using those parameters in the function body.

The component below basically uses the same filtering code you saw earlier but wraps it in a Python component. It defines two input parameters and outputs a dictionary with a String artifact (see function docstring for description). Because this component uses the same publisher as a standard TFX component, the output artifact will be saved in the metadata store as you’ll see later.

#import artifact type that you will use
from tfx.types.standard_artifacts import String

# import the decorator
from tfx.dsl.component.experimental.decorators import component

# import type annotations
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact, Parameter, OutputDict

@component
def CustomFilterComponent(input_base: Parameter[str], 
                    output_base: Parameter[str],
                    ) -> OutputDict(output_path=str):
  '''
  Args:
    input_base - location of the raw CSV
    output_base - location where you want to save the filtered CSV
  
  Returns:
    OutputDict:
      output_path - String artifact that just holds the `output_base` value
  '''
  import pandas as pd
  import glob
  import os

  # create the output base if it does not exist yet
  if not os.path.exists(output_base):
      os.mkdir(output_base)
  
  # search for CSVs in the input base
  files = glob.glob(f'{input_base}/*.csv')

  # loop through CSVs
  for file in files:

    # read the CSV
    df = pd.read_csv(file, index_col=False)

    # filter the data
    filtered_df = df[df['culmen_length_mm'] > 0.3].reset_index(drop=True)

    # compose output filename
    filename = os.path.basename(file).replace('.csv','')
    filtered_filename = f'{filename}_filtered.csv'
  
    # save filtered CSV to output base
    filtered_df.to_csv(f'{output_base}/{filtered_filename}', index=False)

  # define the output artifact
  return {'output_path': output_base}

You can now run your newly built component. You will just run a single node pipeline to prove that it works.

# define a filter task
filter_task = CustomFilterComponent(input_base=DATA_ROOT, 
                                        output_base=f'{DATA_ROOT}_filtered')

# include the task
components = [filter_task]

# define a pipeline with only the single component
pipeline = tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(METADATA_PATH),
      components=components)

# run the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)
INFO:absl:Using deployment config:
 executor_specs {
  key: "CustomFilterComponent"
  value {
    python_class_executable_spec {
      class_path: "__main__.CustomFilterComponent_Executor"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metadata/penguin-simple/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "metadata/penguin-simple/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CustomFilterComponent is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "__main__.CustomFilterComponent"
  }
  id: "CustomFilterComponent"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:56.923046"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.CustomFilterComponent"
      }
    }
  }
}
outputs {
  outputs {
    key: "output_path"
    value {
      artifact_spec {
        type {
          name: "String"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data"
      }
    }
  }
  parameters {
    key: "output_base"
    value {
      field_value {
        string_value: "data_filtered"
      }
    }
  }
}
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CustomFilterComponent] Resolved inputs: ({},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/4/value"
, artifact_type: name: "String"
)]}), exec_properties={'output_base': 'data_filtered', 'input_base': 'data'}, execution_output_uri='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/4/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/CustomFilterComponent/.system/stateful_working_dir/2023-08-30T16:14:56.923046', tmp_dir='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "__main__.CustomFilterComponent"
  }
  id: "CustomFilterComponent"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:14:56.923046"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.CustomFilterComponent"
      }
    }
  }
}
outputs {
  outputs {
    key: "output_path"
    value {
      artifact_spec {
        type {
          name: "String"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data"
      }
    }
  }
  parameters {
    key: "output_base"
    value {
      field_value {
        string_value: "data_filtered"
      }
    }
  }
}
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:14:56.923046')
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/4/value"
, artifact_type: name: "String"
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CustomFilterComponent is finished.

You should now see the filtered CSV in the data_filtered folder in the file explorer. As expected, you can also see that it has less lines than the original because of the filtering.

# number of rows in original csv
!cat data/data.csv | wc -l

# number of rows in filtered csv
!cat data_filtered/data_filtered.csv | wc -l
335
233

If you navigate to pipelines/penguin-simple/CustomFilterComponent/output_path/, you will also see a directory with the run id of the pipeline (e.g. 4). If you click on the value file beneath it, you’ll see the string value you saved which is just the output_base (i.e. data_filtered). You will want to feed this value to the CsvExampleGen component and that’s what you’ll do next.

Building a custom component using standard components

If we try to use our new component with CsvExampleGen, you will encounter an error as shown below:

# Define filter task
filter_task = CustomFilterComponent(input_base=DATA_ROOT, 
                                        output_base=f'{DATA_ROOT}_filtered')

# Try using the custom component with CsvExampleGen. This code will expectedly throw an error.
try:
  example_gen = tfx.components.CsvExampleGen(input_base=filter_task.outputs['output_path'])

except Exception as e:
  print("Error thrown as expected!")
  print(e)
Error thrown as expected!
Expected type <class 'str'> for parameter 'input_base' but got OutputChannel(artifact_type=String, producer_component_id=CustomFilterComponent, output_key=output_path, additional_properties={}, additional_custom_properties={}) instead.

As you can see, the output of our custom component cannot be accepted into CsvExampleGen because it is configured to accept a primitive string. TFX components generate and consume Channel objects and that’s what our custom component outputs. For that, we need to build a custom data ingestion component that reuses the CsvExampleGen code but accepts a Channel. You will do that in the following sections.

Standard ExampleGen code

TFX is open source so the code for standard components can easily be found the public repo. We placed links in the following sections of the actual files that you’ll be modifying/overriding in case you want to compare what was changed for your custom ExampleGen.

The class heirarchy for these components is pretty deep but in summary, you will only need to modify three:

Modify the Component Spec

First, you will need to modify the Component Spec. This file describes the parameters, inputs, and outputs of the standard components. Parameters are values supplied at runtime while inputs and outputs are values read from the metadata store. The original for ExampleGen is found here.

You will need to implement the same for our custom ExampleGen but it should accept a Channel parameter The revised version is shown below. Notice that we revised the INPUTS dictionary to have a ChannelParameter whereas in the original, all are just in the PARAMETERS dictionary.

from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter
from tfx.types.component_spec import ComponentSpec
from tfx.types import standard_artifacts
from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2


# Key for example_gen input that we want to use
INPUT_BASE_KEY = 'input_base'

# Other keys
INPUT_CONFIG_KEY = 'input_config'
OUTPUT_CONFIG_KEY = 'output_config'
OUTPUT_DATA_FORMAT_KEY = 'output_data_format'
RANGE_CONFIG_KEY = 'range_config'
CUSTOM_CONFIG_KEY = 'custom_config'
EXAMPLES_KEY = 'examples'

class MyCustomExampleGenSpec(ComponentSpec):
  """File-based ExampleGen component spec."""
  
  PARAMETERS = {
      INPUT_CONFIG_KEY:
          ExecutionParameter(type=example_gen_pb2.Input),
      OUTPUT_CONFIG_KEY:
          ExecutionParameter(type=example_gen_pb2.Output),
      OUTPUT_DATA_FORMAT_KEY:
          ExecutionParameter(type=int),
      CUSTOM_CONFIG_KEY:
          ExecutionParameter(type=example_gen_pb2.CustomConfig, optional=True),
      RANGE_CONFIG_KEY:
          ExecutionParameter(type=range_config_pb2.RangeConfig, optional=True),
  }

  # Now accepts a channel
  INPUTS = {
      INPUT_BASE_KEY:
          ChannelParameter(type=standard_artifacts.String),
  }
  
  OUTPUTS = {
      EXAMPLES_KEY: ChannelParameter(type=standard_artifacts.Examples),
  }

Customize the Executor

With that, you should now modify the executor code to take note of this change of input types. Instead of looking at just the parameters, it should also look into Channel inputs passed onto the component.

Executor classes are executed by TFX starting with the Do function so you will need to modify that. The original Executor for CsvExampleGen can be found here and it inherits the base class here. The base class includes the Do() function and that’s what you’ll be overriding in your new custom executor below.

Basically, you’re using all the functions defined in the standard component but you’re modifying it so it can find the input_base value from the Channel inputs.

Take note that this tutorial prioritizes code brevity. In your projects, you may take a different approach such as modifying the _CsvToExample code to look for the string value in the input_dict instead of exec_properties. Doing that here will result in very long code blocks so the shorter approach is taken.

from typing import Any, Dict, Iterable, List, Text, Union

from tfx.components.example_gen.csv_example_gen.executor import Executor as CsvExampleGenExecutor
from tfx.types import standard_component_specs
from tfx.types import artifact_utils
from tfx import types

class MyCustomExecutor(CsvExampleGenExecutor):
  """Generic TFX CSV example gen executor."""

  def Do(
      self,
      input_dict: Dict[Text, List[types.Artifact]],
      output_dict: Dict[Text, List[types.Artifact]],
      exec_properties: Dict[Text, Any],
  ) -> None:
    """Take input data source and generates serialized data splits.
    The output is intended to be serialized tf.train.Examples or
    tf.train.SequenceExamples protocol buffer in gzipped TFRecord format,
    but subclasses can choose to override to write to any serialized records
    payload into gzipped TFRecord as specified, so long as downstream
    component can consume it. The format of payload is added to
    `payload_format` custom property of the output Example artifact.
    Args:
      input_dict: Input dict from input key to a list of Artifacts. Depends on
        detailed example gen implementation.
        - input_base: an external directory containing the data files.
      output_dict: Output dict from output key to a list of Artifacts.
        - examples: splits of serialized records.
      exec_properties: A dict of execution properties. Depends on detailed
        example gen implementation.
        - input_config: JSON string of example_gen_pb2.Input instance,
          providing input configuration.
        - output_config: JSON string of example_gen_pb2.Output instance,
          providing output configuration.
        - output_data_format: Payload format of generated data in output
          artifact, one of example_gen_pb2.PayloadFormat enum.
    Returns:
      None
    """
    self._log_startup(input_dict, output_dict, exec_properties)

    # Get the artifact from the Channel input
    filter_component_artifact = artifact_utils.get_single_instance(
        input_dict[standard_component_specs.INPUT_BASE_KEY])
    
    # Put the input string value into the exec_properties fictionary
    exec_properties[standard_component_specs.INPUT_BASE_KEY] = filter_component_artifact.value
    
    # execute superclass
    super(MyCustomExecutor, self).Do(input_dict=input_dict, output_dict=output_dict, exec_properties=exec_properties)

Define the Component class

Lastly, you will need to put everything together in a class. This will be the one you instantiate so you can run the component later. For comparison, the original CsvExampleGen component class is found here and it inherits the FileBasedExampleGen from here. The revised version is shown below:

from typing import Any, Dict, Optional, Text, Union

from tfx.dsl.components.base import base_beam_component
from tfx.dsl.components.base import executor_spec

from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2

from tfx import types
from tfx.components.example_gen import utils

class MyCustomExampleGen(base_beam_component.BaseBeamComponent):

  # Define the Spec class and executor spec using the functions and
  # classes you defined earlier.
  SPEC_CLASS = MyCustomExampleGenSpec
  EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(MyCustomExecutor)

  # Define init function. Notice that `input_base` now accepts a Channel.
  def __init__(self,
               input_base: types.Channel = None,
               input_config: Optional[Union[example_gen_pb2.Input,
                                            Dict[Text, Any]]] = None,
               output_config: Optional[Union[example_gen_pb2.Output,
                                             Dict[Text, Any]]] = None,
               range_config: Optional[Union[range_config_pb2.RangeConfig,
                                            Dict[Text, Any]]] = None,
               output_data_format: Optional[int] = example_gen_pb2.FORMAT_TF_EXAMPLE):
    """Customized ExampleGen component.
    Args:
      input_base: an external directory containing the CSV files. Accepts a Channel
        from a previous TFX component.
      input_config: An example_gen_pb2.Input instance, providing input
        configuration. If unset, the files under input_base will be treated as a
        single split. If any field is provided as a RuntimeParameter,
        input_config should be constructed as a dict with the same field names
        as Input proto message.
      output_config: An example_gen_pb2.Output instance, providing output
        configuration. If unset, default splits will be 'train' and 'eval' with
        size 2:1. If any field is provided as a RuntimeParameter, output_config
        should be constructed as a dict with the same field names as Output
        proto message.
      range_config: An optional range_config_pb2.RangeConfig instance,
        specifying the range of span values to consider. If unset, driver will
        default to searching for latest span with no restrictions.
    """
    
    # Configure inputs and outputs.
    input_config = input_config or utils.make_default_input_config()
    output_config = output_config or utils.make_default_output_config(
        input_config)
    
    # Define output type.
    example_artifacts = types.Channel(type=standard_artifacts.Examples)
    
    # Pass input arguments to your custom ExampleGen spec.
    spec = MyCustomExampleGenSpec(
        input_base=input_base,
        input_config=input_config,
        output_config=output_config,
        range_config=range_config,
        output_data_format=output_data_format,
        examples=example_artifacts)
    
    # This will check if the values passed are the correct type else
    # it will throw the error you saw earlier.
    super(MyCustomExampleGen, self).__init__(
        spec=spec)
  

You can now use the custom component (MyCustomExampleGen) in your code as shown below. It will no longer get an error because you reconfigured the input_base to accept a channel.

# Filter the dataset
filter_task = CustomFilterComponent(input_base=DATA_ROOT, 
                                        output_base=f'{DATA_ROOT}_filtered')

# Use the output of filter_task to know the input_base for this custom ExampleGen
custom_example_gen_task = MyCustomExampleGen(input_base=filter_task.outputs['output_path'])

# Define components to include
components = [filter_task,
              custom_example_gen_task]

# Create the pipeline
pipeline = tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(METADATA_PATH),
      components=components)

# Run the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)
INFO:absl:Using deployment config:
 executor_specs {
  key: "CustomFilterComponent"
  value {
    python_class_executable_spec {
      class_path: "__main__.CustomFilterComponent_Executor"
    }
  }
}
executor_specs {
  key: "MyCustomExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "__main__.MyCustomExecutor"
      }
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metadata/penguin-simple/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "metadata/penguin-simple/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CustomFilterComponent is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "__main__.CustomFilterComponent"
  }
  id: "CustomFilterComponent"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:15:00.188180"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.CustomFilterComponent"
      }
    }
  }
}
outputs {
  outputs {
    key: "output_path"
    value {
      artifact_spec {
        type {
          name: "String"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data"
      }
    }
  }
  parameters {
    key: "output_base"
    value {
      field_value {
        string_value: "data_filtered"
      }
    }
  }
}
downstream_nodes: "MyCustomExampleGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CustomFilterComponent] Resolved inputs: ({},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={}, output_dict=defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
, artifact_type: name: "String"
)]}), exec_properties={'input_base': 'data', 'output_base': 'data_filtered'}, execution_output_uri='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/5/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/CustomFilterComponent/.system/stateful_working_dir/2023-08-30T16:15:00.188180', tmp_dir='pipelines/penguin-simple/CustomFilterComponent/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "__main__.CustomFilterComponent"
  }
  id: "CustomFilterComponent"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:15:00.188180"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.CustomFilterComponent"
      }
    }
  }
}
outputs {
  outputs {
    key: "output_path"
    value {
      artifact_spec {
        type {
          name: "String"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "data"
      }
    }
  }
  parameters {
    key: "output_base"
    value {
      field_value {
        string_value: "data_filtered"
      }
    }
  }
}
downstream_nodes: "MyCustomExampleGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:15:00.188180')
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'output_path': [Artifact(artifact: uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
, artifact_type: name: "String"
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CustomFilterComponent is finished.
INFO:absl:Component MyCustomExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "__main__.MyCustomExampleGen"
  }
  id: "MyCustomExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:15:00.188180"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.MyCustomExampleGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "input_base"
    value {
      channels {
        producer_node_query {
          id: "CustomFilterComponent"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "penguin-simple"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2023-08-30T16:15:00.188180"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "penguin-simple.CustomFilterComponent"
            }
          }
        }
        artifact_query {
          type {
            name: "String"
          }
        }
        output_key: "output_path"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
}
upstream_nodes: "CustomFilterComponent"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[MyCustomExampleGen] Resolved inputs: ({'input_base': [Artifact(artifact: id: 6
type_id: 22
uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
custom_properties {
  key: "__is_null__"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.12.0"
  }
}
state: LIVE
create_time_since_epoch: 1693412101448
last_update_time_since_epoch: 1693412101448
, artifact_type: id: 22
name: "String"
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'input_base': [Artifact(artifact: id: 6
type_id: 22
uri: "pipelines/penguin-simple/CustomFilterComponent/output_path/5/value"
custom_properties {
  key: "__is_null__"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.12.0"
  }
}
state: LIVE
create_time_since_epoch: 1693412101448
last_update_time_since_epoch: 1693412101448
, artifact_type: id: 22
name: "String"
)]}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/MyCustomExampleGen/examples/6"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}'}, execution_output_uri='pipelines/penguin-simple/MyCustomExampleGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='pipelines/penguin-simple/MyCustomExampleGen/.system/stateful_working_dir/2023-08-30T16:15:00.188180', tmp_dir='pipelines/penguin-simple/MyCustomExampleGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "__main__.MyCustomExampleGen"
  }
  id: "MyCustomExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2023-08-30T16:15:00.188180"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "penguin-simple.MyCustomExampleGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "input_base"
    value {
      channels {
        producer_node_query {
          id: "CustomFilterComponent"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "penguin-simple"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2023-08-30T16:15:00.188180"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "penguin-simple.CustomFilterComponent"
            }
          }
        }
        artifact_query {
          type {
            name: "String"
          }
        }
        output_key: "output_path"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
}
upstream_nodes: "CustomFilterComponent"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "penguin-simple"
, pipeline_run_id='2023-08-30T16:15:00.188180')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data data_filtered/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/MyCustomExampleGen/examples/6"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component MyCustomExampleGen is finished.

As a sanity check, you can compute the number of examples for both the training and eval splits. It should equal the number of examples found in your filtered CSV. You can use the code below by replacing the EXECUTION_ID with the ID shown in your latest run. You can see it in the last three lines of the output cell above. For example:

)]}) for execution 16  # ---> **16 is the EXECUTION ID**
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component MyCustomExampleGen is finished.
EXECUTION_ID = 6 # PLACE THE EXECUTION ID HERE

# Create a `TFRecordDataset` to read these files
train_dataset = tf.data.TFRecordDataset(f'{PIPELINE_ROOT}/MyCustomExampleGen/examples/{EXECUTION_ID}/Split-train/data_tfrecord-00000-of-00001.gz', compression_type="GZIP")
eval_dataset = tf.data.TFRecordDataset(f'{PIPELINE_ROOT}/MyCustomExampleGen/examples/{EXECUTION_ID}/Split-eval/data_tfrecord-00000-of-00001.gz', compression_type="GZIP")

# Get number of records for each dataset (only use for small datasets to avoid memory issues)
num_train_data = len(list(train_dataset))
num_eval_data = len(list(eval_dataset))

# Get the total
total_examples = num_train_data + num_eval_data

print(f'total number of examples: {total_examples}')
total number of examples: 232

Wrap Up

In this lab, you were able to use custom components to create a pipeline. This shows that you can go outside the standard TFX components if your project calls for it. To know more about custom components, you can read more here and see the examples here.