
Build a SageMaker Pipeline to train and deploy a BERT-Based text classifier

Introduction

In this lab, you will do the following:

Terminology

This notebook focuses on the following features of Amazon SageMaker Pipelines:

BERT Pipeline

The pipeline that you will create follows a typical machine learning application pattern of pre-processing, training, evaluation, and model registration.

In the processing step, you will perform feature engineering to transform the review_body text into BERT embeddings using the pre-trained BERT model and split the dataset into train, validation and test files. The transformed dataset is stored in a feature store. To optimize for TensorFlow training, the transformed dataset files are saved using the TFRecord format in Amazon S3.

In the training step, you will fine-tune the BERT model to the customer reviews dataset and add a new classification layer to predict the sentiment for a given review_body.

In the evaluation step, you will take the trained model and a test dataset as input, and produce a JSON file containing classification evaluation metrics.

In the condition step, you will register the trained model if the accuracy of the model, as determined by our evaluation step, exceeds a given threshold value.

First, install the required modules.

# please ignore warning messages during the installation
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!pip install -q protobuf==3.20.*

import os
import sagemaker
import logging
import boto3
import pandas as pd
import json
import botocore
from botocore.exceptions import ClientError

config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w3')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker', 
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

Set up the pipeline name.

import time
timestamp = int(time.time())

pipeline_name = 'BERT-pipeline-{}'.format(timestamp)

1. Configure the dataset and processing step

1.1. Configure S3 path for raw input data

The raw dataset is stored in a public S3 bucket. Start by specifying its S3 location:

raw_input_data_s3_uri = 's3://dlai-practical-data-science/data/raw/'
print(raw_input_data_s3_uri)
s3://dlai-practical-data-science/data/raw/

List the files in the S3 bucket (in this case it will be just one file):

!aws s3 ls $raw_input_data_s3_uri
2021-04-30 02:21:06    8457214 womens_clothing_ecommerce_reviews.csv

1.2. Configure processing step

For the pipeline workflow you will need to create workflow parameters of a specific type: integer, string, or float.

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

Now set the parameters for the processing step.

processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge"
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

balance_dataset = ParameterString(
    name="BalanceDataset",
    default_value="True",
)

max_seq_length = ParameterInteger(
    name="MaxSeqLength",
    default_value=128,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="reviews-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="reviews-feature-group-" + str(timestamp)
)

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)
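The three split parameters above are fractions that should sum to 1.0. As a rough illustration of how a preparation script could apply them, here is a minimal sketch that shuffles row indices and cuts them into three groups. The function name split_indices and the shuffling strategy are assumptions made for illustration; the actual logic in src/prepare_data.py is not shown in this notebook.

```python
import math
import random


def split_indices(n_rows, train_pct, validation_pct, test_pct, seed=42):
    """Shuffle row indices and split them by the given fractions (hypothetical helper)."""
    # Floating-point sums are rarely exact, so compare with a tolerance.
    if not math.isclose(train_pct + validation_pct + test_pct, 1.0, abs_tol=1e-9):
        raise ValueError("split percentages must sum to 1.0")

    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # seeded for reproducibility

    n_train = round(n_rows * train_pct)
    n_validation = round(n_rows * validation_pct)

    train = indices[:n_train]
    validation = indices[n_train:n_train + n_validation]
    test = indices[n_train + n_validation:]  # the remainder goes to the test set
    return train, validation, test


train_idx, validation_idx, test_idx = split_indices(1000, 0.90, 0.05, 0.05)
print(len(train_idx), len(validation_idx), len(test_idx))
```

Assigning the remainder to the test set avoids losing rows to rounding when the fractions do not divide the row count evenly.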

Set up a scikit-learn-based processor, passing the SageMaker execution role, processing instance type, and instance count.

from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},                             
)

Now use the processor instance to construct a ProcessingStep, along with the input and output channels and the code that runs when the pipeline executes this step. This is very similar to a processor instance’s run method, for those familiar with the existing Python SDK.

Note the "sentiment-train", "sentiment-validation" and "sentiment-test" named channels specified in the output configuration for the processing job. Such step Properties can be used in subsequent steps and resolve to their runtime values at execution. In particular, you will use them when defining the training step.

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs=[
    ProcessingInput(
        input_name='raw-input-data',
        source=input_data,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key'
    )
]

processing_outputs=[
    ProcessingOutput(output_name='sentiment-train',
                     source='/opt/ml/processing/output/sentiment/train',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-validation',
                     source='/opt/ml/processing/output/sentiment/validation',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-test',
                     source='/opt/ml/processing/output/sentiment/test',
                     s3_upload_mode='EndOfJob')
]        

processing_step = ProcessingStep(
    name='Processing', 
    code='src/prepare_data.py',
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=['--train-split-percentage', str(train_split_percentage.default_value),                   
                   '--validation-split-percentage', str(validation_split_percentage.default_value),
                   '--test-split-percentage', str(test_split_percentage.default_value),
                   '--balance-dataset', str(balance_dataset.default_value),
                   '--max-seq-length', str(max_seq_length.default_value),                   
                   '--feature-store-offline-prefix', str(feature_store_offline_prefix.default_value),
                   '--feature-group-name', str(feature_group_name.default_value)
                  ]
)        

print(processing_step)
ProcessingStep(name='Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>)
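One detail worth noticing in the cell above: each entry in job_arguments is built with str(parameter.default_value), which is an ordinary Python string computed when the cell runs. The sketch below uses a hypothetical stand-in class (FakeParameter, which is not part of the SageMaker SDK) to show the plain-Python consequence: the argument list keeps the value it was built with, regardless of any value chosen later.

```python
from dataclasses import dataclass


@dataclass
class FakeParameter:
    """Hypothetical stand-in for a workflow parameter, for illustration only."""
    name: str
    default_value: object


max_len = FakeParameter(name="MaxSeqLength", default_value=128)

# str(...) is evaluated immediately, so the list stores the literal text "128".
job_arguments = ["--max-seq-length", str(max_len.default_value)]

# A different value picked later (hypothetical override) lives in a separate object
# and does not change the strings already stored in job_arguments.
execution_overrides = {"MaxSeqLength": 64}

print(job_arguments)
```

If the processing script needs to see a value other than the default, the simplest option under this pattern is to change the default and rebuild the step.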

Now you can access the properties of the processing job as an object through processing_step.properties. To print and explore its attributes, use the __dict__ attribute.

# print out the list of the processing job properties
print(json.dumps(
    processing_step.properties.__dict__,
    indent=4, sort_keys=True, default=str
))
{
    "AppSpecification": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e53d0>",
    "AutoMLJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d285250>",
    "CreationTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d2fe290>",
    "Environment": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5b50>",
    "ExitMessage": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5f90>",
    "ExperimentConfig": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5950>",
    "FailureReason": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5fd0>",
    "LastModifiedTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d2fe850>",
    "MonitoringScheduleArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d285110>",
    "NetworkConfig": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5750>",
    "ProcessingEndTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5b90>",
    "ProcessingInputs": "<sagemaker.workflow.properties.PropertiesList object at 0x7f618d0d5f90>",
    "ProcessingJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5e90>",
    "ProcessingJobName": "<sagemaker.workflow.properties.Properties object at 0x7f618d0d5b50>",
    "ProcessingJobStatus": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5d90>",
    "ProcessingOutputConfig": "<sagemaker.workflow.properties.Properties object at 0x7f618d0d5b90>",
    "ProcessingResources": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5410>",
    "ProcessingStartTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d3b3350>",
    "RoleArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e59d0>",
    "StoppingCondition": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5810>",
    "TrainingJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d285210>",
    "_path": "Steps.Processing",
    "_shape_name": "DescribeProcessingJobResponse"
}

Pull the channel sentiment-train from the output configuration of the processing job. Print out the attributes of the resulting object:

print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].__dict__, 
    indent=4, sort_keys=True, default=str
))
{
    "AppManaged": "<sagemaker.workflow.properties.Properties object at 0x7f618d100890>",
    "FeatureStoreOutput": "<sagemaker.workflow.properties.Properties object at 0x7f618d1008d0>",
    "OutputName": "<sagemaker.workflow.properties.Properties object at 0x7f618d100a90>",
    "S3Output": "<sagemaker.workflow.properties.Properties object at 0x7f618d1007d0>",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train']",
    "_shape_name": "ProcessingOutput"
}

Now you can pull and print out attributes of the S3 output path related to the sentiment-train output channel:

print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri.__dict__,
    indent=4, sort_keys=True, default=str
))
{
    "__str__": "S3Uri",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri",
    "_shape_name": "S3Uri"
}
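The _path values printed above suggest that these property objects are placeholders: they record where a value will be found in the step's response and do not hold the value itself. A minimal sketch of that idea, using a hypothetical LazyProperty class (an illustration, not the SDK's actual implementation):

```python
class LazyProperty:
    """Hypothetical placeholder that records an access path instead of a value."""

    def __init__(self, path):
        self._path = path

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so _path itself is unaffected.
        if name.startswith("_"):
            raise AttributeError(name)
        return LazyProperty(f"{self._path}.{name}")

    def __getitem__(self, key):
        # Indexing extends the path with the key's repr, e.g. ['sentiment-train'].
        return LazyProperty(f"{self._path}[{key!r}]")


properties = LazyProperty("Steps.Processing")
s3_uri = properties.ProcessingOutputConfig.Outputs["sentiment-train"].S3Output.S3Uri
print(s3_uri._path)
```

Because only the path is stored, such an object can be handed to a later step before any job has run; the real value is looked up once the earlier step has finished.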

Exercise 1

Pull and print out attributes of the S3 output path object related to the sentiment-test output channel.

Instructions: Use the example in the cell above.

print(json.dumps(
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    processing_step.properties.ProcessingOutputConfig.Outputs["sentiment-test"].S3Output.S3Uri.__dict__,
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    indent=4, sort_keys=True, default=str
))
{
    "__str__": "S3Uri",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri",
    "_shape_name": "S3Uri"
}

These objects can be passed into the next steps of the workflow. You can also pull the arguments of the processing step through its arguments property. The result is a dictionary. Review its keys:

processing_step.arguments.keys()
dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig', 'Environment'])

Pull and review processing inputs from the arguments of the processing step:

processing_step.arguments['ProcessingInputs']
[{'InputName': 'raw-input-data',
  'AppManaged': False,
  'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://dlai-practical-data-science/data/raw/'),
   'LocalPath': '/opt/ml/processing/input/data/',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'ShardedByS3Key',
   'S3CompressionType': 'None'}},
 {'InputName': 'code',
  'AppManaged': False,
  'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-46-150/input/code/prepare_data.py',
   'LocalPath': '/opt/ml/processing/input/code',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'FullyReplicated',
   'S3CompressionType': 'None'}}]

Exercise 2

Pull and review configuration of the processing outputs from the arguments of the processing step.

Instructions: Find the required key in the arguments dictionary and pull the corresponding value following the example above.

### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
processing_step.arguments["ProcessingOutputConfig"]
### END SOLUTION - DO NOT delete this comment for grading purposes
{'Outputs': [{'OutputName': 'sentiment-train',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train',
    'LocalPath': '/opt/ml/processing/output/sentiment/train',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-validation',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-validation',
    'LocalPath': '/opt/ml/processing/output/sentiment/validation',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-test',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-test',
    'LocalPath': '/opt/ml/processing/output/sentiment/test',
    'S3UploadMode': 'EndOfJob'}}]}

2. Configure training step

2.1. Define parameters

Set up the parameters for the workflow.

freeze_bert_layer = ParameterString(
    name="FreezeBertLayer",
    default_value="False",
)

epochs = ParameterInteger(
    name="Epochs",
    default_value=3
)
    
learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.00001
) 
    
train_batch_size = ParameterInteger(
    name="TrainBatchSize",
    default_value=64
)

train_steps_per_epoch = ParameterInteger(
    name="TrainStepsPerEpoch",
    default_value=50
)

validation_batch_size = ParameterInteger(
    name="ValidationBatchSize",
    default_value=64
)

validation_steps_per_epoch = ParameterInteger(
    name="ValidationStepsPerEpoch",
    default_value=50
)

seed = ParameterInteger(
    name="Seed",
    default_value=42
)

run_validation = ParameterString(
    name="RunValidation",
    default_value="True",
)

train_instance_count = ParameterInteger(
    name="TrainInstanceCount",
    default_value=1
)

train_instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.c5.9xlarge"
)

train_volume_size = ParameterInteger(
    name="TrainVolumeSize",
    default_value=256
) 

input_mode = ParameterString(
    name="InputMode",
    default_value="File",
)

2.2. Configure hyper-parameters

Set up the dictionary that will be passed into the hyperparameters argument.

hyperparameters={
    'max_seq_length': max_seq_length,
    'freeze_bert_layer': freeze_bert_layer,
    'epochs': epochs,
    'learning_rate': learning_rate,
    'train_batch_size': train_batch_size,
    'train_steps_per_epoch': train_steps_per_epoch,
    'validation_batch_size': validation_batch_size,
    'validation_steps_per_epoch': validation_steps_per_epoch,
    'seed': seed,
    'run_validation': run_validation
}

2.3. Configure model-evaluation metrics

Choose loss and accuracy as the evaluation metrics.

metric_definitions = [
     {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
     {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

For example, this sample log line…

[step: 100] val_loss: 0.55 - val_acc: 74.64%

…will produce the following metrics in CloudWatch:

validation:loss = 0.55

validation:accuracy = 74.64
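To see how the regular expressions above pick values out of a log line, here is a small sketch using Python's re module on the sample line. It only mimics the matching locally; how CloudWatch ingests the values is outside the scope of this sketch.

```python
import re

metric_definitions = [
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

log_line = "[step: 100] val_loss: 0.55 - val_acc: 74.64%"

extracted = {}
for definition in metric_definitions:
    match = re.search(definition['Regex'], log_line)
    if match:
        # The first capture group holds the numeric text; the trailing % is not captured.
        extracted[definition['Name']] = float(match.group(1))

print(extracted)
```

The character class [0-9.]+ stops at the first character that is neither a digit nor a dot, which is why the percent sign is left out of the captured value.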

2.4. Configure the PyTorchEstimator

Configure an estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to model_dir so that it can be hosted later.

from sagemaker.pytorch import PyTorch as PyTorchEstimator

estimator = PyTorchEstimator(
    entry_point='train.py',
    source_dir='src',
    role=role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version='py3',
    framework_version='1.6.0',
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    input_mode=input_mode
)

2.5. Set up pipeline step caching

Step signature caching lets SageMaker Pipelines look, before executing a step, for a previous execution of that step that was called with the same arguments. If one is found, it counts as a cache hit: instead of recomputing the step, Pipelines propagates the values from the cached execution.

The timeout period is defined in ISO 8601 duration format and can contain a year, month, week, day, hour, and minute value.

More details on SageMaker Pipeline step caching can be found in the SageMaker documentation.
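The caching idea described above can be sketched in plain Python: derive a signature from the step name and its arguments, and reuse a stored result while it is still within the expiry window. The names step_signature and run_with_cache, and the use of a SHA-256 hash over JSON, are assumptions for illustration and not a description of how SageMaker computes its signatures.

```python
import hashlib
import json

_cache = {}  # signature -> (time the result was stored, result)


def step_signature(step_name, arguments):
    """Build a deterministic key from the step name and its arguments."""
    payload = json.dumps({"step": step_name, "arguments": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_with_cache(step_name, arguments, compute, expire_after_seconds, now):
    """Return (result, cache_hit). Recompute when there is no fresh cached entry."""
    key = step_signature(step_name, arguments)
    hit = _cache.get(key)
    if hit is not None and now - hit[0] <= expire_after_seconds:
        return hit[1], True
    result = compute(arguments)
    _cache[key] = (now, result)
    return result, False


calls = []


def fake_training(arguments):
    # Stand-in for an expensive step; records each real execution.
    calls.append(arguments)
    return {"model": "artifact-for-epochs-%d" % arguments["epochs"]}


first = run_with_cache("Train", {"epochs": 3}, fake_training, 3600, now=0)
second = run_with_cache("Train", {"epochs": 3}, fake_training, 3600, now=1800)
third = run_with_cache("Train", {"epochs": 3}, fake_training, 3600, now=7200)
print(first[1], second[1], third[1], len(calls))
```

The second call falls inside the one-hour window and reuses the stored result, while the third call arrives after expiry and runs the step again.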

from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") # PT1H represents `one hour`
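The expire_after value is an ISO 8601 duration string. To make the notation concrete, the sketch below converts a few such strings into timedelta objects. The helper parse_duration is hypothetical and handles only weeks, days, hours, minutes, and seconds; years and months are left out because their length depends on the calendar.

```python
import re
from datetime import timedelta

# P introduces the duration; T separates the date part from the time part.
_DURATION = re.compile(
    r"^P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration(text):
    """Convert a simple ISO 8601 duration such as 'PT1H' into a timedelta."""
    match = _DURATION.match(text)
    if match is None:
        raise ValueError("unsupported duration: %r" % text)
    # Keep only the components that were present in the string.
    parts = {name: int(value) for name, value in match.groupdict().items() if value}
    return timedelta(**parts)


print(parse_duration("PT1H"))
print(parse_duration("P1DT30M"))
```

Note that M means minutes only after the T separator; before it, M would denote months, which this sketch rejects.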

2.6. Configure the TrainingStep

Now configure the TrainingStep calling the outputs of the processing step:

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name='Train',
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-train'
            ].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-validation'
            ].S3Output.S3Uri,
            content_type='text/csv'
        )
    },
    cache_config=cache_config
)

print(training_step)
TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>)

Exercise 3

Use the __dict__ attribute to print out the attributes of the training step properties. Briefly review the result. The attributes match the object model of the DescribeTrainingJob response object.

### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
training_step.properties.__dict__
### END SOLUTION - DO NOT delete this comment for grading purposes
{'_path': 'Steps.Train',
 '_shape_name': 'DescribeTrainingJobResponse',
 'TrainingJobName': <sagemaker.workflow.properties.Properties at 0x7f618d1442d0>,
 'TrainingJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d144310>,
 'TuningJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d144350>,
 'LabelingJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d144390>,
 'AutoMLJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d1443d0>,
 'ModelArtifacts': <sagemaker.workflow.properties.Properties at 0x7f618d144410>,
 'TrainingJobStatus': <sagemaker.workflow.properties.Properties at 0x7f618d144490>,
 'SecondaryStatus': <sagemaker.workflow.properties.Properties at 0x7f618d1444d0>,
 'FailureReason': <sagemaker.workflow.properties.Properties at 0x7f618d144510>,
 'HyperParameters': <sagemaker.workflow.properties.Properties at 0x7f618d144550>,
 'AlgorithmSpecification': <sagemaker.workflow.properties.Properties at 0x7f618d144590>,
 'RoleArn': <sagemaker.workflow.properties.Properties at 0x7f618d144890>,
 'InputDataConfig': <sagemaker.workflow.properties.PropertiesList at 0x7f618d1448d0>,
 'OutputDataConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144910>,
 'ResourceConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144a10>,
 'VpcConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144bd0>,
 'StoppingCondition': <sagemaker.workflow.properties.Properties at 0x7f618d144c90>,
 'CreationTime': <sagemaker.workflow.properties.Properties at 0x7f618d144d50>,
 'TrainingStartTime': <sagemaker.workflow.properties.Properties at 0x7f618d144d90>,
 'TrainingEndTime': <sagemaker.workflow.properties.Properties at 0x7f618d144dd0>,
 'LastModifiedTime': <sagemaker.workflow.properties.Properties at 0x7f618d144e10>,
 'SecondaryStatusTransitions': <sagemaker.workflow.properties.PropertiesList at 0x7f618d144e50>,
 'FinalMetricDataList': <sagemaker.workflow.properties.PropertiesList at 0x7f618d144e90>,
 'EnableNetworkIsolation': <sagemaker.workflow.properties.Properties at 0x7f618d144ed0>,
 'EnableInterContainerTrafficEncryption': <sagemaker.workflow.properties.Properties at 0x7f618d144f10>,
 'EnableManagedSpotTraining': <sagemaker.workflow.properties.Properties at 0x7f618d144f50>,
 'CheckpointConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144f90>,
 'TrainingTimeInSeconds': <sagemaker.workflow.properties.Properties at 0x7f618d148090>,
 'BillableTimeInSeconds': <sagemaker.workflow.properties.Properties at 0x7f618d1480d0>,
 'DebugHookConfig': <sagemaker.workflow.properties.Properties at 0x7f618d148110>,
 'ExperimentConfig': <sagemaker.workflow.properties.Properties at 0x7f618d148250>,
 'DebugRuleConfigurations': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148390>,
 'TensorBoardOutputConfig': <sagemaker.workflow.properties.Properties at 0x7f618d1483d0>,
 'DebugRuleEvaluationStatuses': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148490>,
 'ProfilerConfig': <sagemaker.workflow.properties.Properties at 0x7f618d1484d0>,
 'ProfilerRuleConfigurations': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148610>,
 'ProfilerRuleEvaluationStatuses': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148650>,
 'ProfilingStatus': <sagemaker.workflow.properties.Properties at 0x7f618d148690>,
 'RetryStrategy': <sagemaker.workflow.properties.Properties at 0x7f618d1486d0>,
 'Environment': <sagemaker.workflow.properties.Properties at 0x7f618d148750>,
 'WarmPoolStatus': <sagemaker.workflow.properties.Properties at 0x7f618d148790>}

3. Configure model-evaluation step

First, develop an evaluation script that will be specified in the model evaluation processing step. The evaluation script uses the trained model and the test dataset to produce a JSON file with classification evaluation metrics such as accuracy.

After pipeline execution, you will examine the resulting evaluation.json for analysis.
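As a rough picture of what such a report could contain, the sketch below computes accuracy from two short label lists and serializes it as JSON. The nesting metrics → accuracy → value is inferred from the JSON path used later in this notebook ("metrics.accuracy.value"); the labels are made-up sample data and the helper name build_report is hypothetical.

```python
import json


def build_report(true_labels, predicted_labels):
    """Compute accuracy and wrap it in a nested report dictionary (hypothetical helper)."""
    if len(true_labels) != len(predicted_labels) or not true_labels:
        raise ValueError("label lists must be non-empty and of equal length")
    correct = sum(1 for t, p in zip(true_labels, predicted_labels) if t == p)
    accuracy = correct / len(true_labels)
    return {"metrics": {"accuracy": {"value": accuracy}}}


# Made-up sentiment labels for illustration: -1 negative, 0 neutral, 1 positive.
y_true = [1, 0, -1, 1]
y_pred = [1, 0, 1, 1]

report = build_report(y_true, y_pred)
evaluation_json = json.dumps(report, indent=4)
print(evaluation_json)
```

In a processing job, a string like evaluation_json would be written to a file under the job's output directory so it can be uploaded to S3 when the job ends.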

The evaluation script performs the following steps:

Create an instance of the SKLearnProcessor to run our evaluation script as a scikit-learn-based SageMaker processing job.

from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
    max_runtime_in_seconds=7200
)

Set up the output PropertyFile.

from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='metrics',
    path='evaluation.json'
)

Use the processor instance to construct a ProcessingStep, along with the input and output channels and the code that runs when the pipeline executes this step. This is very similar to a processor instance’s run method.

from sagemaker.processing import ProcessingInput, ProcessingOutput

evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    code='src/evaluate_model_metrics.py',
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/input/model'
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri,
            destination='/opt/ml/processing/input/data'
        )
    ],
    outputs=[
        ProcessingOutput(output_name='metrics', 
                         s3_upload_mode='EndOfJob',
                         source='/opt/ml/processing/output/metrics/'),
    ],
    job_arguments=[
        '--max-seq-length', str(max_seq_length.default_value),
    ],
    property_files=[evaluation_report],
)

4. Configure and register model step

4.1. Configure the model for deployment

Use the estimator instance that was used for the training step to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. You can create a model package group for a specific ML business problem, and you can keep adding versions/model packages into it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker workflow pipeline so that they can keep adding versions/model packages to the group for every workflow pipeline run.

The construction of RegisterModel is very similar to an estimator instance’s register method, for those familiar with the existing Python SDK.

In particular, you will pass in the S3ModelArtifacts from the training_step properties.

Note that you are given a specific model package group name here, which will be used in the Model Registry and in Continuous Integration/Continuous Deployment (CI/CD) work later on. Let’s set up the variables.

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

deploy_instance_type = ParameterString(
    name="DeployInstanceType",
    default_value="ml.m5.large"
)

deploy_instance_count = ParameterInteger(
    name="DeployInstanceCount",
    default_value=1
)

model_package_group_name = f"BERT-Reviews-{timestamp}"

print(model_package_group_name)
BERT-Reviews-1698120918

Configure the ModelMetrics to be stored as metadata.

from sagemaker.model_metrics import MetricsSource, ModelMetrics 

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

print(model_metrics)
<sagemaker.model_metrics.ModelMetrics object at 0x7f618cd031d0>

Define deployment image for inference.

inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.6.0",
    py_version="py36",
    instance_type=deploy_instance_type,
    image_scope="inference"
)
print(inference_image_uri)
763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36

4.2. Register the model for deployment

Exercise 4

Configure the register model step.

Instructions: Pass the inference image defined above into the image_uri argument of the function RegisterModel. Review the rest of the arguments.

from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    image_uri=inference_image_uri,
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type], # batch transform is not used in this lab
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

5. Create model for deployment step

Exercise 5

Configure model for deployment.

Instructions: Pass the same inference image into the image_uri argument of the function Model.

from sagemaker.model import Model

model_name = 'bert-model-{}'.format(timestamp)

model = Model(
    name=model_name,
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    image_uri=inference_image_uri,
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

Now configure create model input:

from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type, 
)

Exercise 6

Configure create model step for the workflow.

Instructions: Pass the model defined above (the model object, not its name) and the model input configuration into the corresponding arguments of CreateModelStep.

from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    model=model,
    inputs=create_inputs,
    ### END SOLUTION - DO NOT delete this comment for grading purposes
)

6. Check accuracy condition step

Finally, you want to register this model only if its accuracy, as determined by the evaluation step evaluation_step, meets or exceeds a threshold value. A ConditionStep lets a pipeline support conditional execution in the pipeline DAG based on conditions of step properties.

Below, you will:

min_accuracy_value = ParameterFloat(
    name="MinAccuracyValue",
    default_value=0.33 # random choice from three classes
)

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value # minimum accuracy threshold
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step], # accuracy met or exceeded the minimum: register the model and create it for deployment
    else_steps=[], # accuracy fell below the minimum: nothing runs, so the model is not registered
)
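
To make the condition concrete, here is a minimal pure-Python sketch of what the accuracy check amounts to: look up the dotted path metrics.accuracy.value in the evaluation report and compare it with the threshold. The helper names json_get and meets_min_accuracy are hypothetical and only illustrate the logic; this is not how SageMaker implements JsonGet, and the sample report uses an illustrative accuracy value.

```python
import json


def json_get(document, json_path):
    """Resolve a dotted path such as 'metrics.accuracy.value' in a nested dict."""
    value = document
    for key in json_path.split("."):
        value = value[key]  # raises KeyError if the report lacks the key
    return value


def meets_min_accuracy(report, min_accuracy):
    """Mirror the 'greater than or equal to' comparison used by the condition."""
    return json_get(report, "metrics.accuracy.value") >= min_accuracy


# Sample report with the same layout as evaluation.json (value is illustrative)
sample_report = json.loads('{"metrics": {"accuracy": {"value": 0.74}}}')

print(meets_min_accuracy(sample_report, 0.33))  # threshold met: if_steps would run
print(meets_min_accuracy(sample_report, 0.90))  # threshold not met: else_steps would run
```

Because the comparison is "greater than or equal to", an accuracy exactly equal to the threshold also passes.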

7. Create pipeline

7.1. Define a pipeline of parameters, steps, and conditions

Let’s tie it all up into a workflow pipeline so you can execute it, and even schedule it.

A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair, so appending the timestamp to the name reduces the chance of a conflict.

Note:

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        max_seq_length,
        balance_dataset,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
        feature_store_offline_prefix,
        feature_group_name,
        epochs,
        learning_rate,
        train_batch_size,
        train_steps_per_epoch,
        validation_batch_size,
        validation_steps_per_epoch,
        freeze_bert_layer,
        seed,
        train_instance_count,
        train_instance_type,
        train_volume_size,        
        input_mode,
        run_validation,
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess,
)

Let’s examine the JSON of the pipeline definition that meets the SageMaker Workflow Pipeline DSL specification.

By examining the definition, you are also confirming that the pipeline was well-defined, and that the parameters and step properties resolve correctly.

import json
from pprint import pprint

definition = json.loads(pipeline.definition())

pprint(definition)
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://dlai-practical-data-science/data/raw/',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.2xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 128,
                 'Name': 'MaxSeqLength',
                 'Type': 'Integer'},
                {'DefaultValue': 'True',
                 'Name': 'BalanceDataset',
                 'Type': 'String'},
                {'DefaultValue': 0.9,
                 'Name': 'TrainSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'ValidationSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'TestSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 'reviews-feature-store-1698120918',
                 'Name': 'FeatureStoreOfflinePrefix',
                 'Type': 'String'},
                {'DefaultValue': 'reviews-feature-group-1698120918',
                 'Name': 'FeatureGroupName',
                 'Type': 'String'},
                {'DefaultValue': 3, 'Name': 'Epochs', 'Type': 'Integer'},
                {'DefaultValue': 1e-05,
                 'Name': 'LearningRate',
                 'Type': 'Float'},
                {'DefaultValue': 64,
                 'Name': 'TrainBatchSize',
                 'Type': 'Integer'},
                {'DefaultValue': 50,
                 'Name': 'TrainStepsPerEpoch',
                 'Type': 'Integer'},
                {'DefaultValue': 64,
                 'Name': 'ValidationBatchSize',
                 'Type': 'Integer'},
                {'DefaultValue': 50,
                 'Name': 'ValidationStepsPerEpoch',
                 'Type': 'Integer'},
                {'DefaultValue': 'False',
                 'Name': 'FreezeBertLayer',
                 'Type': 'String'},
                {'DefaultValue': 42, 'Name': 'Seed', 'Type': 'Integer'},
                {'DefaultValue': 1,
                 'Name': 'TrainInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.9xlarge',
                 'Name': 'TrainInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 256,
                 'Name': 'TrainVolumeSize',
                 'Type': 'Integer'},
                {'DefaultValue': 'File', 'Name': 'InputMode', 'Type': 'String'},
                {'DefaultValue': 'True',
                 'Name': 'RunValidation',
                 'Type': 'String'},
                {'DefaultValue': 0.33,
                 'Name': 'MinAccuracyValue',
                 'Type': 'Float'},
                {'DefaultValue': 'PendingManualApproval',
                 'Name': 'ModelApprovalStatus',
                 'Type': 'String'},
                {'DefaultValue': 'ml.m5.large',
                 'Name': 'DeployInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'DeployInstanceCount',
                 'Type': 'Integer'}],
 'Steps': [{'Arguments': {'AppSpecification': {'ContainerArguments': ['--train-split-percentage',
                                                                      '0.9',
                                                                      '--validation-split-percentage',
                                                                      '0.05',
                                                                      '--test-split-percentage',
                                                                      '0.05',
                                                                      '--balance-dataset',
                                                                      'True',
                                                                      '--max-seq-length',
                                                                      '128',
                                                                      '--feature-store-offline-prefix',
                                                                      'reviews-feature-store-1698120918',
                                                                      '--feature-group-name',
                                                                      'reviews-feature-group-1698120918'],
                                               'ContainerEntrypoint': ['python3',
                                                                       '/opt/ml/processing/input/code/prepare_data.py'],
                                               'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'},
                          'Environment': {'AWS_DEFAULT_REGION': 'us-east-1'},
                          'ProcessingInputs': [{'AppManaged': False,
                                                'InputName': 'raw-input-data',
                                                'S3Input': {'LocalPath': '/opt/ml/processing/input/data/',
                                                            'S3CompressionType': 'None',
                                                            'S3DataDistributionType': 'ShardedByS3Key',
                                                            'S3DataType': 'S3Prefix',
                                                            'S3InputMode': 'File',
                                                            'S3Uri': {'Get': 'Parameters.InputData'}}},
                                               {'AppManaged': False,
                                                'InputName': 'code',
                                                'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
                                                            'S3CompressionType': 'None',
                                                            'S3DataDistributionType': 'FullyReplicated',
                                                            'S3DataType': 'S3Prefix',
                                                            'S3InputMode': 'File',
                                                            'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-18-21-035/input/code/prepare_data.py'}}],
                          'ProcessingOutputConfig': {'Outputs': [{'AppManaged': False,
                                                                  'OutputName': 'sentiment-train',
                                                                  'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/train',
                                                                               'S3UploadMode': 'EndOfJob',
                                                                               'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train'}},
                                                                 {'AppManaged': False,
                                                                  'OutputName': 'sentiment-validation',
                                                                  'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/validation',
                                                                               'S3UploadMode': 'EndOfJob',
                                                                               'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-validation'}},
                                                                 {'AppManaged': False,
                                                                  'OutputName': 'sentiment-test',
                                                                  'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/test',
                                                                               'S3UploadMode': 'EndOfJob',
                                                                               'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-test'}}]},
                          'ProcessingResources': {'ClusterConfig': {'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
                                                                    'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
                                                                    'VolumeSizeInGB': 30}},
                          'RoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role'},
            'Name': 'Processing',
            'Type': 'Processing'},
           {'Arguments': {'AlgorithmSpecification': {'EnableSageMakerMetricsTimeSeries': True,
                                                     'MetricDefinitions': [{'Name': 'validation:loss',
                                                                            'Regex': 'val_loss: '
                                                                                     '([0-9.]+)'},
                                                                           {'Name': 'validation:accuracy',
                                                                            'Regex': 'val_acc: '
                                                                                     '([0-9.]+)'}],
                                                     'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.6.0-cpu-py3',
                                                     'TrainingInputMode': {'Get': 'Parameters.InputMode'}},
                          'DebugHookConfig': {'CollectionConfigurations': [],
                                              'S3OutputPath': 's3://sagemaker-us-east-1-440427790495/'},
                          'HyperParameters': {'epochs': '3',
                                              'freeze_bert_layer': '"False"',
                                              'learning_rate': '1e-05',
                                              'max_seq_length': '128',
                                              'run_validation': '"True"',
                                              'sagemaker_container_log_level': '20',
                                              'sagemaker_job_name': '"pytorch-training-2023-10-24-04-18-21-197"',
                                              'sagemaker_program': '"train.py"',
                                              'sagemaker_region': '"us-east-1"',
                                              'sagemaker_submit_directory': '"s3://sagemaker-us-east-1-440427790495/pytorch-training-2023-10-24-04-18-21-197/source/sourcedir.tar.gz"',
                                              'seed': '42',
                                              'train_batch_size': '64',
                                              'train_steps_per_epoch': '50',
                                              'validation_batch_size': '64',
                                              'validation_steps_per_epoch': '50'},
                          'InputDataConfig': [{'ChannelName': 'train',
                                               'ContentType': 'text/csv',
                                               'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
                                                                               'S3DataType': 'S3Prefix',
                                                                               'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri"}}}},
                                              {'ChannelName': 'validation',
                                               'ContentType': 'text/csv',
                                               'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
                                                                               'S3DataType': 'S3Prefix',
                                                                               'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-validation'].S3Output.S3Uri"}}}}],
                          'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-440427790495/'},
                          'ProfilerConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-440427790495/'},
                          'ProfilerRuleConfigurations': [{'RuleConfigurationName': 'ProfilerReport-1698121101',
                                                          'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
                                                          'RuleParameters': {'rule_to_invoke': 'ProfilerReport'}}],
                          'ResourceConfig': {'InstanceCount': {'Get': 'Parameters.TrainInstanceCount'},
                                             'InstanceType': {'Get': 'Parameters.TrainInstanceType'},
                                             'VolumeSizeInGB': {'Get': 'Parameters.TrainVolumeSize'}},
                          'RoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
                          'StoppingCondition': {'MaxRuntimeInSeconds': 86400}},
            'CacheConfig': {'Enabled': True, 'ExpireAfter': 'PT1H'},
            'Name': 'Train',
            'Type': 'Training'},
           {'Arguments': {'AppSpecification': {'ContainerArguments': ['--max-seq-length',
                                                                      '128'],
                                               'ContainerEntrypoint': ['python3',
                                                                       '/opt/ml/processing/input/code/evaluate_model_metrics.py'],
                                               'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'},
                          'Environment': {'AWS_DEFAULT_REGION': 'us-east-1'},
                          'ProcessingInputs': [{'AppManaged': False,
                                                'InputName': 'input-1',
                                                'S3Input': {'LocalPath': '/opt/ml/processing/input/model',
                                                            'S3CompressionType': 'None',
                                                            'S3DataDistributionType': 'FullyReplicated',
                                                            'S3DataType': 'S3Prefix',
                                                            'S3InputMode': 'File',
                                                            'S3Uri': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}},
                                               {'AppManaged': False,
                                                'InputName': 'input-2',
                                                'S3Input': {'LocalPath': '/opt/ml/processing/input/data',
                                                            'S3CompressionType': 'None',
                                                            'S3DataDistributionType': 'FullyReplicated',
                                                            'S3DataType': 'S3Prefix',
                                                            'S3InputMode': 'File',
                                                            'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri"}}},
                                               {'AppManaged': False,
                                                'InputName': 'code',
                                                'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
                                                            'S3CompressionType': 'None',
                                                            'S3DataDistributionType': 'FullyReplicated',
                                                            'S3DataType': 'S3Prefix',
                                                            'S3InputMode': 'File',
                                                            'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-18-21-691/input/code/evaluate_model_metrics.py'}}],
                          'ProcessingOutputConfig': {'Outputs': [{'AppManaged': False,
                                                                  'OutputName': 'metrics',
                                                                  'S3Output': {'LocalPath': '/opt/ml/processing/output/metrics/',
                                                                               'S3UploadMode': 'EndOfJob',
                                                                               'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-17-46-872/output/metrics'}}]},
                          'ProcessingResources': {'ClusterConfig': {'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
                                                                    'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
                                                                    'VolumeSizeInGB': 30}},
                          'RoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
                          'StoppingCondition': {'MaxRuntimeInSeconds': 7200}},
            'Name': 'EvaluateModel',
            'PropertyFiles': [{'FilePath': 'evaluation.json',
                               'OutputName': 'metrics',
                               'PropertyFileName': 'EvaluationReport'}],
            'Type': 'Processing'},
           {'Arguments': {'Conditions': [{'LeftValue': {'Std:JsonGet': {'Path': 'metrics.accuracy.value',
                                                                        'PropertyFile': {'Get': 'Steps.EvaluateModel.PropertyFiles.EvaluationReport'}}},
                                          'RightValue': {'Get': 'Parameters.MinAccuracyValue'},
                                          'Type': 'GreaterThanOrEqualTo'}],
                          'ElseSteps': [],
                          'IfSteps': [{'Arguments': {'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36',
                                                                                                'ModelDataUrl': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}],
                                                                                'SupportedContentTypes': ['application/jsonlines'],
                                                                                'SupportedRealtimeInferenceInstanceTypes': [{'Get': 'Parameters.DeployInstanceType'}],
                                                                                'SupportedResponseMIMETypes': ['application/jsonlines'],
                                                                                'SupportedTransformInstanceTypes': [{'Get': 'Parameters.DeployInstanceType'}]},
                                                     'ModelApprovalStatus': {'Get': 'Parameters.ModelApprovalStatus'},
                                                     'ModelMetrics': {'ModelQuality': {'Statistics': {'ContentType': 'application/json',
                                                                                                      'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-17-46-872/output/metrics/evaluation.json'}}},
                                                     'ModelPackageGroupName': 'BERT-Reviews-1698120918'},
                                       'Name': 'RegisterModel',
                                       'Type': 'RegisterModel'},
                                      {'Arguments': {'ExecutionRoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
                                                     'PrimaryContainer': {'Environment': {},
                                                                          'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36',
                                                                          'ModelDataUrl': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}},
                                       'Name': 'CreateModel',
                                       'Type': 'Model'}]},
            'Name': 'AccuracyCondition',
            'Type': 'Condition'}],
 'Version': '2020-12-01'}
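
One way to sanity-check a definition like the one above is to walk it and list each step together with the steps it reads from. The sketch below is an illustration only: the helper names are hypothetical, and sample_definition is a hand-trimmed dictionary that keeps the same keys (Steps, Arguments, IfSteps, ElseSteps, Get) as the printed output but drops most of the detail. In the notebook you could pass the definition variable instead.

```python
import re

BRANCH_KEYS = ("IfSteps", "ElseSteps")


def iter_steps(steps):
    """Yield every step, descending into the branches of condition steps."""
    for step in steps:
        yield step
        arguments = step.get("Arguments", {})
        for branch in BRANCH_KEYS:
            yield from iter_steps(arguments.get(branch, []))


def referenced_steps(node):
    """Collect step names mentioned in {'Get': 'Steps.<name>...'} references."""
    found = set()
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "Get" and isinstance(value, str):
                match = re.match(r"Steps\.(\w+)", value)
                if match:
                    found.add(match.group(1))
            else:
                found |= referenced_steps(value)
    elif isinstance(node, list):
        for item in node:
            found |= referenced_steps(item)
    return found


def step_dependencies(definition):
    """Map each step name to the sorted list of steps its own arguments read from."""
    deps = {}
    for step in iter_steps(definition["Steps"]):
        # Ignore nested branch steps here; they are visited separately
        own = {k: v for k, v in step.get("Arguments", {}).items() if k not in BRANCH_KEYS}
        deps[step["Name"]] = sorted(referenced_steps(own))
    return deps


# Hand-trimmed stand-in for the printed definition (not the full structure)
sample_definition = {
    "Steps": [
        {"Name": "Processing", "Arguments": {"S3Uri": {"Get": "Parameters.InputData"}}},
        {"Name": "Train", "Arguments": {"S3Uri": {
            "Get": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri"}}},
        {"Name": "EvaluateModel", "Arguments": {"Inputs": [
            {"S3Uri": {"Get": "Steps.Train.ModelArtifacts.S3ModelArtifacts"}},
            {"S3Uri": {"Get": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri"}},
        ]}},
        {"Name": "AccuracyCondition", "Arguments": {
            "Conditions": [{"LeftValue": {"Std:JsonGet": {"PropertyFile": {
                "Get": "Steps.EvaluateModel.PropertyFiles.EvaluationReport"}}}}],
            "IfSteps": [
                {"Name": "RegisterModel", "Arguments": {
                    "ModelDataUrl": {"Get": "Steps.Train.ModelArtifacts.S3ModelArtifacts"}}},
                {"Name": "CreateModel", "Arguments": {
                    "ModelDataUrl": {"Get": "Steps.Train.ModelArtifacts.S3ModelArtifacts"}}},
            ],
            "ElseSteps": [],
        }},
    ]
}

dependencies = step_dependencies(sample_definition)
for name, needs in dependencies.items():
    print(name, "<-", needs)
```

References that start with Parameters are ignored on purpose, since they point at pipeline parameters and not at other steps.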

Ignore the WARNING below

Create the pipeline with the create method and print its Amazon Resource Name (ARN).

response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918

Ignore the WARNING ^^ above ^^

7.2. Start Pipeline

The pipeline definition was submitted to the Amazon SageMaker Pipelines service when you created the pipeline, and the service uses the role passed to create() to run all the jobs defined in the steps. Now start an execution: values passed in the parameters argument of start() override the pipeline defaults for this run.

execution = pipeline.start(
    parameters=dict(
        InputData=raw_input_data_s3_uri,
        ProcessingInstanceCount=1,
        ProcessingInstanceType='ml.c5.2xlarge',
        MaxSeqLength=128,
        BalanceDataset='True',
        TrainSplitPercentage=0.9,
        ValidationSplitPercentage=0.05,
        TestSplitPercentage=0.05,
        FeatureStoreOfflinePrefix='reviews-feature-store-'+str(timestamp),
        FeatureGroupName='reviews-feature-group-'+str(timestamp),
        Epochs=3,
        LearningRate=0.000012,
        TrainBatchSize=64,
        TrainStepsPerEpoch=50,
        ValidationBatchSize=64,
        ValidationStepsPerEpoch=64,
        FreezeBertLayer='False',
        Seed=42,         
        TrainInstanceCount=1,
        TrainInstanceType='ml.c5.9xlarge',
        TrainVolumeSize=256,
        InputMode='File',
        RunValidation='True',
        MinAccuracyValue=0.01,
        ModelApprovalStatus='PendingManualApproval', 
        DeployInstanceType='ml.m5.large',
        DeployInstanceCount=1 
    )
)

print(execution.arn)
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b
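
Values passed to start() replace the defaults declared on the pipeline parameters for this execution only. As a quick illustration, the sketch below compares a few of the defaults printed in the pipeline definition with the values passed above and reports which ones differ. The two dictionaries are copied by hand from this notebook, and the helper name changed_parameters is hypothetical.

```python
def changed_parameters(defaults, overrides):
    """Return {name: (default, override)} for every override that differs."""
    return {
        name: (defaults.get(name), value)
        for name, value in overrides.items()
        if defaults.get(name) != value
    }


# Subset of the defaults shown in the pipeline definition
defaults = {
    "Epochs": 3,
    "LearningRate": 1e-05,
    "ValidationStepsPerEpoch": 50,
    "MinAccuracyValue": 0.33,
}

# Matching subset of the values passed to pipeline.start()
overrides = {
    "Epochs": 3,
    "LearningRate": 0.000012,
    "ValidationStepsPerEpoch": 64,
    "MinAccuracyValue": 0.01,
}

for name, (default, override) in changed_parameters(defaults, overrides).items():
    print("{}: default={} -> execution={}".format(name, default, override))
```

Note in particular that this run lowers MinAccuracyValue from 0.33 to 0.01, so the accuracy condition is much easier to satisfy than with the default.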

7.3. Wait for pipeline execution

Now you can describe the execution and list its steps to find out more about the run.

from pprint import pprint

execution_run = execution.describe()
pprint(execution_run)
{'CreatedBy': {'DomainId': 'd-hxd7ruk4hbjz',
               'UserProfileArn': 'arn:aws:sagemaker:us-east-1:440427790495:user-profile/d-hxd7ruk4hbjz/sagemaker-user-profile-us-east-1',
               'UserProfileName': 'sagemaker-user-profile-us-east-1'},
 'CreationTime': datetime.datetime(2023, 10, 24, 4, 18, 35, 584000, tzinfo=tzlocal()),
 'LastModifiedBy': {'DomainId': 'd-hxd7ruk4hbjz',
                    'UserProfileArn': 'arn:aws:sagemaker:us-east-1:440427790495:user-profile/d-hxd7ruk4hbjz/sagemaker-user-profile-us-east-1',
                    'UserProfileName': 'sagemaker-user-profile-us-east-1'},
 'LastModifiedTime': datetime.datetime(2023, 10, 24, 4, 18, 35, 584000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b',
 'PipelineExecutionDisplayName': 'execution-1698121115636',
 'PipelineExecutionStatus': 'Executing',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '815',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Tue, 24 Oct 2023 04:18:37 GMT',
                                      'x-amzn-requestid': '1abf8f02-b30a-4382-a1d5-b0c04e9d6fc1'},
                      'HTTPStatusCode': 200,
                      'RequestId': '1abf8f02-b30a-4382-a1d5-b0c04e9d6fc1',
                      'RetryAttempts': 0}}

Print the execution display name and its ARN:

execution_run_name = execution_run['PipelineExecutionDisplayName']
print(execution_run_name)
execution-1698121115636
pipeline_execution_arn = execution_run['PipelineExecutionArn']
print(pipeline_execution_arn)
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b

7.4. Describe completed pipeline

Wait for the first step to start running and print the information about it:

import time

time.sleep(30)

execution.list_steps()
[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2023, 10, 24, 4, 18, 36, 93000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:processing-job/pipelines-7j3goj91hb3b-Processing-gkscLfBKVl'}}}]
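
The value returned by list_steps() is a plain list of dictionaries, so it is easy to condense. The following sketch builds one tuple per step with its status and, once the step has finished, its duration in seconds. summarize_steps is a hypothetical helper, and sample_steps is hand-written data shaped like the output above (with made-up times), not a live response.

```python
from datetime import datetime


def summarize_steps(steps):
    """Return (name, status, duration_seconds or None) for each step."""
    summary = []
    for step in steps:
        start, end = step.get("StartTime"), step.get("EndTime")
        # EndTime is absent while a step is still running
        duration = (end - start).total_seconds() if start and end else None
        summary.append((step["StepName"], step["StepStatus"], duration))
    return summary


# Illustrative data only; the timestamps are invented for the example
sample_steps = [
    {"StepName": "Train", "StepStatus": "Executing",
     "StartTime": datetime(2023, 10, 24, 4, 30, 38)},
    {"StepName": "Processing", "StepStatus": "Succeeded",
     "StartTime": datetime(2023, 10, 24, 4, 18, 36),
     "EndTime": datetime(2023, 10, 24, 4, 30, 36)},
]

for name, status, duration in summarize_steps(sample_steps):
    print(name, status, duration)
```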

7.5. Wait for the pipeline to complete

To get information about the pipeline execution, you can use the low-level SageMaker service client from the boto3 session. It is also useful for other operations that you will see below.

In the code below, you will poll the pipeline execution summary and wait for the execution status to change from Executing to Succeeded.

This cell will take approximately 30-45 minutes to run.

%%time

import time
from pprint import pprint

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

while pipeline_execution_status=='Executing':
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
        pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
    except Exception as e:
        # transient API error (for example, throttling): report it and retry
        print('Please wait...')
    # pause between polls so the loop does not call the API continuously
    time.sleep(30)
    
pprint(executions_response)
Executing
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b',
  'PipelineExecutionDisplayName': 'execution-1698121115636',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2023, 10, 24, 4, 18, 35, 584000, tzinfo=tzlocal())}]
CPU times: user 22 s, sys: 1.01 s, total: 23 s
Wall time: 32min 15s
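
If you want the wait to be reusable and to give up eventually, the polling logic can be factored into a small function. The sketch below is one possible shape and not part of the lab: wait_for_status and its parameters are hypothetical names, get_status is any zero-argument callable that returns the current status string, and the demonstration uses a fake status source so it runs without AWS.

```python
import time


def wait_for_status(get_status, poll_seconds=30, timeout_seconds=3600,
                    in_progress=("Executing",)):
    """Poll get_status() until it leaves the in-progress states or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status in in_progress:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "still {} after {} seconds".format(status, timeout_seconds)
            )
        time.sleep(poll_seconds)  # pause between polls
        status = get_status()
    return status


# Fake status source: reports 'Executing' twice, then 'Succeeded'
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
final_status = wait_for_status(lambda: next(fake_statuses), poll_seconds=0.01)
print(final_status)
```

With the client from the cell above, get_status could be a lambda that calls sm.list_pipeline_executions(PipelineName=pipeline_name) and returns the PipelineExecutionStatus of the first summary.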

Wait for the pipeline ^^ above ^^ to complete.

Confirm the final execution status and print the execution ARN:

pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)
Succeeded
pipeline_execution_arn = executions_response[0]['PipelineExecutionArn']
print(pipeline_execution_arn)
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b

8. Evaluate the model

8.1. Describe evaluation metrics

Examine the resulting model evaluation after the pipeline completes. Download the resulting evaluation.json file from S3 and print the report.

processing_job_name = None

# pull the processing job name from the Processing step
for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'Processing':
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

# get the description of the processing job
describe_transform_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

# get the output S3 path
transform_output_s3_uri = describe_transform_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Transform output {}'.format(transform_output_s3_uri))
Transform output s3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train
# list the files in the resulting output S3 path
!aws s3 ls --recursive $transform_output_s3_uri
2023-10-24 04:30:31    4889808 sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv

Exercise 7

Pull the processing job name of the model-evaluation step and then get the S3 path of the evaluation metrics, which contains the evaluation report.

Instructions: Find the execution step with the step name EvaluateModel following the example above.

processing_job_name = None

for execution_step in reversed(execution.list_steps()):
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    if execution_step["StepName"] == "EvaluateModel":
    ### END SOLUTION - DO NOT delete this comment for grading purposes
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Evaluation output {}'.format(evaluation_metrics_s3_uri))
Evaluation output s3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-17-46-872/output/metrics

8.2. Review the evaluation report

Download the evaluation report and print the accuracy.

from pprint import pprint

evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    evaluation_metrics_s3_uri
))

pprint(json.loads(evaluation_json))
{'metrics': {'accuracy': {'value': 0.7443365695792881}}}
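The report is a nested dictionary, so the accuracy can be read directly and compared against a threshold, in the spirit of the condition step that gates model registration. A minimal sketch, assuming the report text shown above; `min_accuracy` is a hypothetical threshold chosen for illustration, not the value configured in the pipeline.

```python
import json

# report text copied from the output above
sample_report = '{"metrics": {"accuracy": {"value": 0.7443365695792881}}}'

accuracy = json.loads(sample_report)['metrics']['accuracy']['value']

# hypothetical threshold, for illustration only
min_accuracy = 0.5

print('Accuracy: {:.4f}'.format(accuracy))
print('Exceeds threshold: {}'.format(accuracy > min_accuracy))
```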

8.3. List pipeline artifacts

Exercise 8

Find and print the ARN and job name of the training job.

Instructions: Find the execution step with the step name Train following the example above.

training_job_arn=None

for execution_step in execution.list_steps():
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    if execution_step["StepName"] == "Train":
    ### END SOLUTION - DO NOT delete this comment for grading purposes
        training_job_arn = execution_step['Metadata']['TrainingJob']['Arn']        
        pprint(execution_step)
        break
print('Training job ARN: {}'.format(training_job_arn))
        
training_job_name = training_job_arn.split('/')[-1]
print('Training job Name: {}'.format(training_job_name))
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 44, 13, 903000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:training-job/pipelines-7j3goj91hb3b-Train-BuWywrPxXU'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 30, 38, 821000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job ARN: arn:aws:sagemaker:us-east-1:440427790495:training-job/pipelines-7j3goj91hb3b-Train-BuWywrPxXU
Training job Name: pipelines-7j3goj91hb3b-Train-BuWywrPxXU
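Each step dictionary also carries `StartTime` and `EndTime`, so the run time of a step is a simple subtraction. A minimal sketch using the timestamps printed above; it assumes UTC instead of `tzlocal()` so that it runs without `dateutil`.

```python
from datetime import datetime, timezone

# timestamps copied from the Train step above; UTC is an assumption
sample_train_step = {
    'StepName': 'Train',
    'StartTime': datetime(2023, 10, 24, 4, 30, 38, 821000, tzinfo=timezone.utc),
    'EndTime': datetime(2023, 10, 24, 4, 44, 13, 903000, tzinfo=timezone.utc),
}

# subtracting two datetimes yields a timedelta
duration = sample_train_step['EndTime'] - sample_train_step['StartTime']
print('{} took {}'.format(sample_train_step['StepName'], duration))
```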

Using a similar approach, you can find and print the pipeline artifacts.

processing_job_name=None
training_job_name=None
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    pprint(execution_step)
    if execution_step['StepName'] == 'Processing':
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
        print('Processing job name: {}'.format(processing_job_name))
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step['StepName'] == 'Train':
        training_job_name=execution_step['Metadata']['TrainingJob']['Arn'].split('/')[-1]
        print('Training job name: {}'.format(training_job_name))
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 30, 38, 191000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:processing-job/pipelines-7j3goj91hb3b-Processing-gkscLfBKVl'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 18, 36, 93000, tzinfo=tzlocal()),
 'StepName': 'Processing',
 'StepStatus': 'Succeeded'}
Processing job name: pipelines-7j3goj91hb3b-Processing-gkscLfBKVl
   Name/Source                                         Direction  Type        Association Type  Lineage Type
0  s3://...-04-18-30-260/input/code/prepare_data.py    Input      DataSet     ContributedTo     artifact
1  s3://dlai-practical-data-science/data/raw/          Input      DataSet     ContributedTo     artifact
2  68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3    Input      Image       ContributedTo     artifact
3  s3://...10-24-04-15-39-910/output/sentiment-test    Output     DataSet     Produced          artifact
4  s3://...04-15-39-910/output/sentiment-validation    Output     DataSet     Produced          artifact
5  s3://...0-24-04-15-39-910/output/sentiment-train    Output     DataSet     Produced          artifact
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 44, 13, 903000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:training-job/pipelines-7j3goj91hb3b-Train-BuWywrPxXU'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 30, 38, 821000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job name: pipelines-7j3goj91hb3b-Train-BuWywrPxXU
   Name/Source                                         Direction  Type        Association Type  Lineage Type
0  s3://...04-15-39-910/output/sentiment-validation    Input      DataSet     ContributedTo     artifact
1  s3://...0-24-04-15-39-910/output/sentiment-train    Input      DataSet     ContributedTo     artifact
2  76310...onaws.com/pytorch-training:1.6.0-cpu-py3    Input      Image       ContributedTo     artifact
3  s3://...b3b-Train-BuWywrPxXU/output/model.tar.gz    Output     Model       Produced          artifact
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 30, 33000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:processing-job/pipelines-7j3goj91hb3b-EvaluateModel-TzjiP6obCC'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 44, 15, 150000, tzinfo=tzlocal()),
 'StepName': 'EvaluateModel',
 'StepStatus': 'Succeeded'}
   Name/Source                                         Direction  Type        Association Type  Lineage Type
0  s3://...596/input/code/evaluate_model_metrics.py    Input      DataSet     ContributedTo     artifact
1  s3://...10-24-04-15-39-910/output/sentiment-test    Input      DataSet     ContributedTo     artifact
2  s3://...b3b-Train-BuWywrPxXU/output/model.tar.gz    Input      Model       ContributedTo     artifact
3  68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3    Input      Image       ContributedTo     artifact
4  s3://...n-2023-10-24-04-17-46-872/output/metrics    Output     DataSet     Produced          artifact
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 31, 983000, tzinfo=tzlocal()),
 'Metadata': {'Condition': {'Outcome': 'True'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 51, 31, 621000, tzinfo=tzlocal()),
 'StepName': 'AccuracyCondition',
 'StepStatus': 'Succeeded'}
None
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 34, 630000, tzinfo=tzlocal()),
 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:model/pipelines-7j3goj91hb3b-createmodel-2mt9xrqqbg'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 51, 33, 118000, tzinfo=tzlocal()),
 'StepName': 'CreateModel',
 'StepStatus': 'Succeeded'}
None
{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 34, 605000, tzinfo=tzlocal()),
 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1'}},
 'StartTime': datetime.datetime(2023, 10, 24, 4, 51, 33, 118000, tzinfo=tzlocal()),
 'StepName': 'RegisterModel',
 'StepStatus': 'Succeeded'}
   Name/Source                                         Direction  Type        Association Type  Lineage Type
0  s3://...b3b-Train-BuWywrPxXU/output/model.tar.gz    Input      Model       ContributedTo     artifact
1  76310...aws.com/pytorch-inference:1.6.0-cpu-py36    Input      Image       ContributedTo     artifact
2  BERT-Reviews-1698120918-1-PendingManualApprova...   Input      Approval    ContributedTo     action
3  BERT-Reviews-1698120918-1698123094-aws-model-p...   Output     ModelGroup  AssociatedWith    context

9. Deploy and test the model

9.1. Approve trained model

The pipeline created a model package version within the specified model package group, with an approval status of PendingManualApproval. The model must therefore be approved manually, in a separate step, before it is deployed to production.

You can approve the model using the SageMaker Studio UI or programmatically as shown below.

Get the model package ARN.

for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'RegisterModel':
        model_package_arn = execution_step['Metadata']['RegisterModel']['Arn']
        break
print(model_package_arn)
arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1
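The ARN encodes both the model package group and the version number. A minimal sketch that splits the ARN printed above; `parse_model_package_arn` is a hypothetical helper, and it assumes the versioned `model-package/<group>/<version>` layout seen in this output.

```python
def parse_model_package_arn(arn):
    """Split a versioned model package ARN into (group name, version)."""
    # the resource part follows the fifth ':' of the ARN
    resource = arn.split(':', 5)[5]
    resource_type, group_name, version = resource.split('/')
    if resource_type != 'model-package':
        raise ValueError('not a model package ARN: {}'.format(arn))
    return group_name, int(version)


sample_arn = 'arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1'
group_name, version = parse_model_package_arn(sample_arn)
print('Group: {}, version: {}'.format(group_name, version))
```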

Update the model package with the Approved status to prepare for deployment.

The model must be Approved before it can be deployed.

model_package_update_response = sm.update_model_package(
    ModelPackageArn=model_package_arn,
    ModelApprovalStatus="Approved",
)

pprint(model_package_update_response)
{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '102',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Tue, 24 Oct 2023 04:51:59 GMT',
                                      'x-amzn-requestid': 'e41e1142-7749-44dd-ac9e-92f11e322fad'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'e41e1142-7749-44dd-ac9e-92f11e322fad',
                      'RetryAttempts': 0}}
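A 200 status code only confirms that the request was accepted. To confirm the new status, you can read it back with `describe_model_package`, which accepts the ARN as `ModelPackageName`. A minimal sketch: `approval_status` is a hypothetical helper, and `FakeSageMakerClient` is a stand-in so the example runs without AWS credentials; in the notebook you would pass the real `sm` client instead.

```python
def approval_status(sm_client, model_package_arn):
    """Read back the approval status of a model package version."""
    response = sm_client.describe_model_package(ModelPackageName=model_package_arn)
    return response['ModelApprovalStatus']


class FakeSageMakerClient:
    """Stand-in for the boto3 SageMaker client, for illustration only."""

    def describe_model_package(self, ModelPackageName):
        return {'ModelPackageArn': ModelPackageName,
                'ModelApprovalStatus': 'Approved'}


sample_arn = 'arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1'
status = approval_status(FakeSageMakerClient(), sample_arn)
print('Approval status: {}'.format(status))
```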

9.2. Deploy model

Get the model ARN and extract the model name from it.

for execution_step in execution.list_steps():
    print(execution_step['StepName'])
    if execution_step['StepName'] == 'CreateModel':
        model_arn = execution_step['Metadata']['Model']['Arn']
        break
print(model_arn)

model_name = model_arn.split('/')[-1]
print(model_name)
RegisterModel
CreateModel
arn:aws:sagemaker:us-east-1:440427790495:model/pipelines-7j3goj91hb3b-createmodel-2mt9xrqqbg
pipelines-7j3goj91hb3b-createmodel-2mt9xrqqbg

9.3. Create endpoint from registry

Configure the endpoint.

endpoint_config_name = 'bert-model-epc-{}'.format(timestamp)
print(endpoint_config_name)

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m5.xlarge',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName': model_name,
        'VariantName':'AllTraffic'}])
bert-model-epc-1698120918
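`InitialVariantWeight` is relative: each production variant receives its weight divided by the sum of all weights, so the single `AllTraffic` variant above gets every request. A minimal sketch of that arithmetic; `traffic_shares` is a hypothetical helper, and the model name is a placeholder.

```python
def traffic_shares(production_variants):
    """Map each variant name to its fraction of the endpoint traffic."""
    total = sum(v['InitialVariantWeight'] for v in production_variants)
    return {v['VariantName']: v['InitialVariantWeight'] / total
            for v in production_variants}


# same variant settings as the endpoint configuration above
sample_variants = [{
    'InstanceType': 'ml.m5.xlarge',
    'InitialVariantWeight': 1,
    'InitialInstanceCount': 1,
    'ModelName': 'placeholder-model-name',  # placeholder, not a real model
    'VariantName': 'AllTraffic',
}]

shares = traffic_shares(sample_variants)
print(shares)
```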

Create the endpoint.

pipeline_endpoint_name = 'bert-model-ep-{}'.format(timestamp)
print("EndpointName={}".format(pipeline_endpoint_name))

create_endpoint_response = sm.create_endpoint(
    EndpointName=pipeline_endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])
EndpointName=bert-model-ep-1698120918
arn:aws:sagemaker:us-east-1:440427790495:endpoint/bert-model-ep-1698120918
from IPython.display import display, HTML

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pipeline_endpoint_name)))

Review SageMaker REST Endpoint

Wait until the endpoint is deployed.

This cell will take approximately 5-10 minutes to run.

%%time

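waiter = sm.get_waiter('endpoint_in_service')
endpoint_status = None

while True:
    try:
        print('Waiting for endpoint to be in `InService`...')
        waiter.wait(EndpointName=pipeline_endpoint_name)
        endpoint_status = 'InService'
        break
    except Exception:
        # the waiter can time out or fail; check the status before retrying
        print('Waiting for endpoint...')
        endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
        print('Endpoint status: {}'.format(endpoint_status))
        if endpoint_status == 'Failed':
            break
        time.sleep(30)

# report success only if the endpoint actually reached InService
if endpoint_status == 'InService':
    print('Endpoint deployed.')
else:
    print('Endpoint deployment failed.')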
Waiting for endpoint to be in `InService`...
Endpoint deployed.
CPU times: user 51.9 ms, sys: 15.9 ms, total: 67.9 ms
Wall time: 4min 31s

Wait until the endpoint above is deployed before you continue.
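The wait loop follows a common polling pattern: check the status, stop on a terminal state, otherwise sleep and retry until a deadline. A minimal generic sketch; `wait_for_status` and its parameters are hypothetical, and the status sequence is simulated so the example runs without an endpoint. `Creating`, `InService`, and `Failed` are endpoint status values reported by `describe_endpoint`.

```python
import time


def wait_for_status(get_status, target='InService', failed='Failed',
                    poll_seconds=30, timeout_seconds=900):
    """Poll get_status() until it returns target or failed, or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in (target, failed):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError('last status: {}'.format(status))
        time.sleep(poll_seconds)


# simulated status sequence instead of a real describe_endpoint call
simulated = iter(['Creating', 'Creating', 'InService'])
final_status = wait_for_status(lambda: next(simulated), poll_seconds=0)
print('Final status: {}'.format(final_status))
```

In the notebook, `get_status` could be a function that returns `sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']`.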

9.4. Test model

Predict the sentiment with review_body samples and review the result:

from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]

predictor = Predictor(
    endpoint_name=pipeline_endpoint_name,
    serializer=JSONLinesSerializer(),
    deserializer=JSONLinesDeserializer(),
    sagemaker_session=sess
)

predicted_classes = predictor.predict(inputs)

for predicted_class in predicted_classes:
    print("Predicted class {} with probability {}".format(predicted_class['predicted_label'], predicted_class['probability']))
Predicted class 1 with probability 0.9319869875907898
Predicted class 0 with probability 0.3933015763759613
Predicted class -1 with probability 0.6994612812995911
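The JSON Lines format puts one JSON document on each line, which is how the three inputs travel to the endpoint in a single request. A minimal sketch of the encoding and decoding with the standard library; it approximates what the serializer and deserializer do rather than reproducing the SDK code. The response text is an assumed raw form of the predictions printed above, and the sentiment names are an assumed reading of the labels -1, 0, and 1.

```python
import json

sample_inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]

# encode: one JSON document per line
payload = "\n".join(json.dumps(item) for item in sample_inputs)

# decode a JSON Lines response; values copied from the output above
sample_response = (
    '{"predicted_label": 1, "probability": 0.9319869875907898}\n'
    '{"predicted_label": 0, "probability": 0.3933015763759613}\n'
    '{"predicted_label": -1, "probability": 0.6994612812995911}'
)
predictions = [json.loads(line) for line in sample_response.splitlines()]

# assumed mapping from class label to sentiment name
sentiment_names = {-1: 'negative', 0: 'neutral', 1: 'positive'}

for prediction in predictions:
    print(sentiment_names[prediction['predicted_label']], prediction['probability'])
```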

9.5. SageMaker Studio extensions

SageMaker Studio provides a rich set of features to visually inspect SageMaker resources, including pipelines, training jobs, and endpoints. Take some time to explore them by opening the panel shown in the following image.

Congratulations! You have just deployed an end-to-end pipeline with BERT and SageMaker Pipelines.

Upload the notebook to the S3 bucket for grading purposes.

Note: you may need to click the “Save” button before the upload.

!aws s3 cp ./C2_W3_Assignment.ipynb s3://$bucket/C2_W3_Assignment_Learner.ipynb
upload: ./C2_W3_Assignment.ipynb to s3://sagemaker-us-east-1-440427790495/C2_W3_Assignment_Learner.ipynb

Please go to the main lab window and click the Submit button (see the Finish the lab section of the instructions).