In this lab, you will build an end-to-end machine learning pipeline with Amazon SageMaker Pipelines, covering pipeline parameters, processing, training, evaluation, and conditional model registration.
BERT Pipeline
The pipeline that you will create follows a typical machine learning application pattern of pre-processing, training, evaluation, and model registration.
In the processing step, you will perform feature engineering to transform the review_body text into BERT embeddings using the pre-trained BERT model, and split the dataset into train, validation, and test files. The transformed dataset is stored in a feature store, and the transformed dataset files are also saved to Amazon S3 for use in training.
In the training step, you will fine-tune the BERT model on the customer reviews dataset, adding a new classification layer to predict the sentiment for a given review_body.
In the evaluation step, you will take the trained model and a test dataset as input, and produce a JSON file containing classification evaluation metrics.
In the condition step, you will register the trained model if the accuracy of the model, as determined by our evaluation step, exceeds a given threshold value.
First, install the required modules.
# please ignore warning messages during the installation
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!pip install -q protobuf==3.20.*
DEPRECATION: pyodbc 4.0.0-unsupported has a non-standard version number. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pyodbc or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sparkmagic 0.20.4 requires nest-asyncio==1.5.5, but you have nest-asyncio 1.5.8 which is incompatible.
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
import os
import json
import logging

import boto3
import botocore
from botocore.exceptions import ClientError
import pandas as pd
import sagemaker
config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w3')
# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker',
config=config)
sm_runtime = boto3.client('sagemaker-runtime',
config=config)
sess = sagemaker.Session(sagemaker_client=sm,
sagemaker_runtime_client=sm_runtime)
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
/opt/conda/lib/python3.7/site-packages/boto3/compat.py:82: PythonDeprecationWarning: Boto3 will no longer support Python 3.7 starting December 13, 2023. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.8 or later. More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/
warnings.warn(warning, PythonDeprecationWarning)
Set up the pipeline name.
import time
timestamp = int(time.time())
pipeline_name = 'BERT-pipeline-{}'.format(timestamp)
The raw dataset is in a public S3 bucket. Start by specifying its S3 location:
raw_input_data_s3_uri = 's3://dlai-practical-data-science/data/raw/'
print(raw_input_data_s3_uri)
s3://dlai-practical-data-science/data/raw/
List the files in the S3 bucket (in this case it will be just one file):
!aws s3 ls $raw_input_data_s3_uri
2021-04-30 02:21:06 8457214 womens_clothing_ecommerce_reviews.csv
For the pipeline workflow, you will need to create workflow parameters of a specific type: integer, string, or float.
from sagemaker.workflow.parameters import (
ParameterInteger,
ParameterString,
ParameterFloat,
)
Now set the parameters for the processing step.
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
default_value="ml.c5.2xlarge"
)
processing_instance_count = ParameterInteger(
name="ProcessingInstanceCount",
default_value=1
)
train_split_percentage = ParameterFloat(
name="TrainSplitPercentage",
default_value=0.90,
)
validation_split_percentage = ParameterFloat(
name="ValidationSplitPercentage",
default_value=0.05,
)
test_split_percentage = ParameterFloat(
name="TestSplitPercentage",
default_value=0.05,
)
balance_dataset = ParameterString(
name="BalanceDataset",
default_value="True",
)
max_seq_length = ParameterInteger(
name="MaxSeqLength",
default_value=128,
)
feature_store_offline_prefix = ParameterString(
name="FeatureStoreOfflinePrefix",
default_value="reviews-feature-store-" + str(timestamp),
)
feature_group_name = ParameterString(
name="FeatureGroupName",
default_value="reviews-feature-group-" + str(timestamp)
)
input_data = ParameterString(
name="InputData",
default_value=raw_input_data_s3_uri,
)
Set up the scikit-learn-based processor, passing in the SageMaker execution role, processing instance type, and instance count.
from sagemaker.sklearn.processing import SKLearnProcessor
processor = SKLearnProcessor(
framework_version='0.23-1',
role=role,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
env={'AWS_DEFAULT_REGION': region},
)
Now you will use the processor instance to construct a ProcessingStep, along with the input and output channels and the code that will be executed when the pipeline runs. This is very similar to a processor instance’s run method, for those familiar with the existing Python SDK (see the sketch after the step definition below).
Note the "sentiment-train", "sentiment-validation", and "sentiment-test" named channels specified in the output configuration for the processing job. Such step Properties can be used in subsequent steps and will resolve to their runtime values at execution. In particular, you will call out this usage when defining the training step.
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
processing_inputs=[
ProcessingInput(
input_name='raw-input-data',
source=input_data,
destination='/opt/ml/processing/input/data/',
s3_data_distribution_type='ShardedByS3Key'
)
]
processing_outputs=[
ProcessingOutput(output_name='sentiment-train',
source='/opt/ml/processing/output/sentiment/train',
s3_upload_mode='EndOfJob'),
ProcessingOutput(output_name='sentiment-validation',
source='/opt/ml/processing/output/sentiment/validation',
s3_upload_mode='EndOfJob'),
ProcessingOutput(output_name='sentiment-test',
source='/opt/ml/processing/output/sentiment/test',
s3_upload_mode='EndOfJob')
]
processing_step = ProcessingStep(
name='Processing',
code='src/prepare_data.py',
processor=processor,
inputs=processing_inputs,
outputs=processing_outputs,
job_arguments=['--train-split-percentage', str(train_split_percentage.default_value),
'--validation-split-percentage', str(validation_split_percentage.default_value),
'--test-split-percentage', str(test_split_percentage.default_value),
'--balance-dataset', str(balance_dataset.default_value),
'--max-seq-length', str(max_seq_length.default_value),
'--feature-store-offline-prefix', str(feature_store_offline_prefix.default_value),
'--feature-group-name', str(feature_group_name.default_value)
]
)
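For comparison, outside of a pipeline you could launch the same processing job immediately with the processor’s run method. The sketch below is illustrative only (wrapped in a function so that nothing runs here) and is not needed in this lab:
# Sketch: the direct, non-pipeline equivalent of the processing step above.
# Calling run() would start a processing job immediately, so it is wrapped
# in a function and shown for illustration only.
def run_processing_job_directly():
    processor.run(
        code='src/prepare_data.py',
        inputs=processing_inputs,
        outputs=processing_outputs,
        arguments=['--train-split-percentage', str(train_split_percentage.default_value),
                   '--balance-dataset', str(balance_dataset.default_value)],
        wait=False
    )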
print(processing_step)
ProcessingStep(name='Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>)
Now you can access the properties of the processing job as an object using processing_step.properties. To print out and explore the attributes, use the __dict__ attribute.
# print out the list of the processing job properties
print(json.dumps(
processing_step.properties.__dict__,
indent=4, sort_keys=True, default=str
))
{
"AppSpecification": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e53d0>",
"AutoMLJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d285250>",
"CreationTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d2fe290>",
"Environment": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5b50>",
"ExitMessage": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5f90>",
"ExperimentConfig": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5950>",
"FailureReason": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5fd0>",
"LastModifiedTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d2fe850>",
"MonitoringScheduleArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d285110>",
"NetworkConfig": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5750>",
"ProcessingEndTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5b90>",
"ProcessingInputs": "<sagemaker.workflow.properties.PropertiesList object at 0x7f618d0d5f90>",
"ProcessingJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5e90>",
"ProcessingJobName": "<sagemaker.workflow.properties.Properties object at 0x7f618d0d5b50>",
"ProcessingJobStatus": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5d90>",
"ProcessingOutputConfig": "<sagemaker.workflow.properties.Properties object at 0x7f618d0d5b90>",
"ProcessingResources": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5410>",
"ProcessingStartTime": "<sagemaker.workflow.properties.Properties object at 0x7f618d3b3350>",
"RoleArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e59d0>",
"StoppingCondition": "<sagemaker.workflow.properties.Properties object at 0x7f618d0e5810>",
"TrainingJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f618d285210>",
"_path": "Steps.Processing",
"_shape_name": "DescribeProcessingJobResponse"
}
Pull the channel sentiment-train from the output configuration of the processing job. Print out the attributes of the resulting object:
print(json.dumps(
processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].__dict__,
indent=4, sort_keys=True, default=str
))
{
"AppManaged": "<sagemaker.workflow.properties.Properties object at 0x7f618d100890>",
"FeatureStoreOutput": "<sagemaker.workflow.properties.Properties object at 0x7f618d1008d0>",
"OutputName": "<sagemaker.workflow.properties.Properties object at 0x7f618d100a90>",
"S3Output": "<sagemaker.workflow.properties.Properties object at 0x7f618d1007d0>",
"_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train']",
"_shape_name": "ProcessingOutput"
}
Now you can pull and print out attributes of the S3 output path related to the sentiment-train output channel:
print(json.dumps(
processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri.__dict__,
indent=4, sort_keys=True, default=str
))
{
"__str__": "S3Uri",
"_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri",
"_shape_name": "S3Uri"
}
Pull and print out attributes of the S3 output path object related to the sentiment-test output channel.
Instructions: Use the example in the cell above.
print(json.dumps(
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
processing_step.properties.ProcessingOutputConfig.Outputs["sentiment-test"].S3Output.S3Uri.__dict__,
### END SOLUTION - DO NOT delete this comment for grading purposes
indent=4, sort_keys=True, default=str
))
{
"__str__": "S3Uri",
"_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri",
"_shape_name": "S3Uri"
}
These objects can be passed into the next steps of the workflow. You can also pull the arguments of the processing step with the arguments attribute. The result is a dictionary. Review the keys of this dictionary:
processing_step.arguments.keys()
dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig', 'Environment'])
Pull and review processing inputs from the arguments of the processing step:
processing_step.arguments['ProcessingInputs']
[{'InputName': 'raw-input-data',
'AppManaged': False,
'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://dlai-practical-data-science/data/raw/'),
'LocalPath': '/opt/ml/processing/input/data/',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3DataDistributionType': 'ShardedByS3Key',
'S3CompressionType': 'None'}},
{'InputName': 'code',
'AppManaged': False,
'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-46-150/input/code/prepare_data.py',
'LocalPath': '/opt/ml/processing/input/code',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3DataDistributionType': 'FullyReplicated',
'S3CompressionType': 'None'}}]
Pull and review the configuration of the processing outputs from the arguments of the processing step.
Instructions: Find the required key in the arguments dictionary and pull the corresponding value following the example above.
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
processing_step.arguments["ProcessingOutputConfig"]
### END SOLUTION - DO NOT delete this comment for grading purposes
{'Outputs': [{'OutputName': 'sentiment-train',
'AppManaged': False,
'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train',
'LocalPath': '/opt/ml/processing/output/sentiment/train',
'S3UploadMode': 'EndOfJob'}},
{'OutputName': 'sentiment-validation',
'AppManaged': False,
'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-validation',
'LocalPath': '/opt/ml/processing/output/sentiment/validation',
'S3UploadMode': 'EndOfJob'}},
{'OutputName': 'sentiment-test',
'AppManaged': False,
'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-test',
'LocalPath': '/opt/ml/processing/output/sentiment/test',
'S3UploadMode': 'EndOfJob'}}]}
Set up the parameters for the training step of the workflow.
freeze_bert_layer = ParameterString(
name="FreezeBertLayer",
default_value="False",
)
epochs = ParameterInteger(
name="Epochs",
default_value=3
)
learning_rate = ParameterFloat(
name="LearningRate",
default_value=0.00001
)
train_batch_size = ParameterInteger(
name="TrainBatchSize",
default_value=64
)
train_steps_per_epoch = ParameterInteger(
name="TrainStepsPerEpoch",
default_value=50
)
validation_batch_size = ParameterInteger(
name="ValidationBatchSize",
default_value=64
)
validation_steps_per_epoch = ParameterInteger(
name="ValidationStepsPerEpoch",
default_value=50
)
seed = ParameterInteger(
name="Seed",
default_value=42
)
run_validation = ParameterString(
name="RunValidation",
default_value="True",
)
train_instance_count = ParameterInteger(
name="TrainInstanceCount",
default_value=1
)
train_instance_type = ParameterString(
name="TrainInstanceType",
default_value="ml.c5.9xlarge"
)
train_volume_size = ParameterInteger(
name="TrainVolumeSize",
default_value=256
)
input_mode = ParameterString(
name="InputMode",
default_value="File",
)
Set up the dictionary that will be passed into the hyperparameters argument of the estimator.
hyperparameters={
'max_seq_length': max_seq_length,
'freeze_bert_layer': freeze_bert_layer,
'epochs': epochs,
'learning_rate': learning_rate,
'train_batch_size': train_batch_size,
'train_steps_per_epoch': train_steps_per_epoch,
'validation_batch_size': validation_batch_size,
'validation_steps_per_epoch': validation_steps_per_epoch,
'seed': seed,
'run_validation': run_validation
}
Choose loss and accuracy as the evaluation metrics.
metric_definitions = [
{'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
{'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]
For example, these sample log lines…
[step: 100] val_loss: 0.55 - val_acc: 74.64%
…will produce the following metrics in CloudWatch:
validation:loss = 0.55
validation:accuracy = 74.64
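You can check how these regular expressions extract values by testing them locally against a sample log line (illustrative only; SageMaker applies the same patterns to the training logs):
import re
sample_log_line = '[step: 100] val_loss: 0.55 - val_acc: 74.64%'
for definition in metric_definitions:
    match = re.search(definition['Regex'], sample_log_line)
    if match:
        print('{} = {}'.format(definition['Name'], match.group(1)))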
PyTorchEstimator
Configure an estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to model_dir so that it can be hosted later. A minimal sketch of such a script is shown below.
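The following is a sketch of a SageMaker training-script skeleton, not the actual src/train.py that ships with the lab; the argument names mirror the hyperparameters dictionary above, and the SM_* environment variables are set by SageMaker inside the training container:
import argparse
import os

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # hyperparameters arrive as command-line arguments
    parser.add_argument('--epochs', type=int, default=3)
    parser.add_argument('--learning_rate', type=float, default=0.00001)
    # input channels and the model directory are exposed as environment variables
    parser.add_argument('--train_data', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation_data', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    args, _ = parser.parse_known_args()
    # ...load data from args.train_data, fine-tune the model,
    # then save the trained model under args.model_dir so it can be hosted later...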
from sagemaker.pytorch import PyTorch as PyTorchEstimator
estimator = PyTorchEstimator(
entry_point='train.py',
source_dir='src',
role=role,
instance_count=train_instance_count,
instance_type=train_instance_type,
volume_size=train_volume_size,
py_version='py3',
framework_version='1.6.0',
hyperparameters=hyperparameters,
metric_definitions=metric_definitions,
input_mode=input_mode
)
Step signature caching allows SageMaker Pipelines, before executing a step, to look for a previous execution of the step that was called with the same arguments. If one is found, a cache hit is created, and during execution the pipeline propagates the values from the cache hit instead of recomputing the step.
The timeout period is defined using the ISO 8601 duration format; it can contain year, month, week, day, hour, and minute values.
More details on SageMaker Pipelines step caching can be found in the SageMaker documentation.
from sagemaker.workflow.steps import CacheConfig
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") # PT1H represents `one hour`
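Other duration strings follow the same ISO 8601 format. For instance (illustrative only), a thirty-day cache window would be written as:
# illustrative: a thirty-day cache window
cache_config_30d = CacheConfig(enable_caching=True, expire_after="P30D")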
TrainingStep
Now configure the TrainingStep, referencing the outputs of the processing step:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
training_step = TrainingStep(
name='Train',
estimator=estimator,
inputs={
'train': TrainingInput(
s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
'sentiment-train'
].S3Output.S3Uri,
content_type='text/csv'
),
'validation': TrainingInput(
s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
'sentiment-validation'
].S3Output.S3Uri,
content_type='text/csv'
)
},
cache_config=cache_config
)
print(training_step)
TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>)
Use the __dict__ attribute to print out the attributes of the training step properties. Briefly review the result. The attributes match the object model of the DescribeTrainingJob response object.
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
training_step.properties.__dict__
### END SOLUTION - DO NOT delete this comment for grading purposes
{'_path': 'Steps.Train',
'_shape_name': 'DescribeTrainingJobResponse',
'TrainingJobName': <sagemaker.workflow.properties.Properties at 0x7f618d1442d0>,
'TrainingJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d144310>,
'TuningJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d144350>,
'LabelingJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d144390>,
'AutoMLJobArn': <sagemaker.workflow.properties.Properties at 0x7f618d1443d0>,
'ModelArtifacts': <sagemaker.workflow.properties.Properties at 0x7f618d144410>,
'TrainingJobStatus': <sagemaker.workflow.properties.Properties at 0x7f618d144490>,
'SecondaryStatus': <sagemaker.workflow.properties.Properties at 0x7f618d1444d0>,
'FailureReason': <sagemaker.workflow.properties.Properties at 0x7f618d144510>,
'HyperParameters': <sagemaker.workflow.properties.Properties at 0x7f618d144550>,
'AlgorithmSpecification': <sagemaker.workflow.properties.Properties at 0x7f618d144590>,
'RoleArn': <sagemaker.workflow.properties.Properties at 0x7f618d144890>,
'InputDataConfig': <sagemaker.workflow.properties.PropertiesList at 0x7f618d1448d0>,
'OutputDataConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144910>,
'ResourceConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144a10>,
'VpcConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144bd0>,
'StoppingCondition': <sagemaker.workflow.properties.Properties at 0x7f618d144c90>,
'CreationTime': <sagemaker.workflow.properties.Properties at 0x7f618d144d50>,
'TrainingStartTime': <sagemaker.workflow.properties.Properties at 0x7f618d144d90>,
'TrainingEndTime': <sagemaker.workflow.properties.Properties at 0x7f618d144dd0>,
'LastModifiedTime': <sagemaker.workflow.properties.Properties at 0x7f618d144e10>,
'SecondaryStatusTransitions': <sagemaker.workflow.properties.PropertiesList at 0x7f618d144e50>,
'FinalMetricDataList': <sagemaker.workflow.properties.PropertiesList at 0x7f618d144e90>,
'EnableNetworkIsolation': <sagemaker.workflow.properties.Properties at 0x7f618d144ed0>,
'EnableInterContainerTrafficEncryption': <sagemaker.workflow.properties.Properties at 0x7f618d144f10>,
'EnableManagedSpotTraining': <sagemaker.workflow.properties.Properties at 0x7f618d144f50>,
'CheckpointConfig': <sagemaker.workflow.properties.Properties at 0x7f618d144f90>,
'TrainingTimeInSeconds': <sagemaker.workflow.properties.Properties at 0x7f618d148090>,
'BillableTimeInSeconds': <sagemaker.workflow.properties.Properties at 0x7f618d1480d0>,
'DebugHookConfig': <sagemaker.workflow.properties.Properties at 0x7f618d148110>,
'ExperimentConfig': <sagemaker.workflow.properties.Properties at 0x7f618d148250>,
'DebugRuleConfigurations': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148390>,
'TensorBoardOutputConfig': <sagemaker.workflow.properties.Properties at 0x7f618d1483d0>,
'DebugRuleEvaluationStatuses': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148490>,
'ProfilerConfig': <sagemaker.workflow.properties.Properties at 0x7f618d1484d0>,
'ProfilerRuleConfigurations': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148610>,
'ProfilerRuleEvaluationStatuses': <sagemaker.workflow.properties.PropertiesList at 0x7f618d148650>,
'ProfilingStatus': <sagemaker.workflow.properties.Properties at 0x7f618d148690>,
'RetryStrategy': <sagemaker.workflow.properties.Properties at 0x7f618d1486d0>,
'Environment': <sagemaker.workflow.properties.Properties at 0x7f618d148750>,
'WarmPoolStatus': <sagemaker.workflow.properties.Properties at 0x7f618d148790>}
First, develop an evaluation script that will be specified in the model evaluation processing step. The evaluation script uses the trained model and the test dataset to produce a JSON file containing classification evaluation metrics such as accuracy.
After pipeline execution, you will examine the resulting evaluation.json for analysis.
The evaluation script loads the trained model, runs predictions on the test dataset, computes the classification metrics, and writes them to evaluation.json; a sketch of this flow is shown below.
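The following is a minimal sketch of what an evaluation script such as src/evaluate_model_metrics.py might do; the actual script is provided with the lab, and the helper below is illustrative. Note that the report shape matches the metrics.accuracy.value path that the condition step queries later:
import json
import os

def write_evaluation_report(y_true, y_pred, output_dir='/opt/ml/processing/output/metrics'):
    # compute classification accuracy on the test set
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    accuracy = correct / len(y_true)
    # write the report in the shape expected by the EvaluationReport PropertyFile
    report = {'metrics': {'accuracy': {'value': accuracy}}}
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, 'evaluation.json'), 'w') as f:
        json.dump(report, f)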
Create an instance of the SKLearnProcessor to run the evaluation script as a scikit-learn-based SageMaker processing job.
from sagemaker.sklearn.processing import SKLearnProcessor
evaluation_processor = SKLearnProcessor(
framework_version='0.23-1',
role=role,
instance_type=processing_instance_type,
instance_count=processing_instance_count,
env={'AWS_DEFAULT_REGION': region},
max_runtime_in_seconds=7200
)
Set up the output PropertyFile.
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(
name='EvaluationReport',
output_name='metrics',
path='evaluation.json'
)
Use the processor instance to construct a ProcessingStep, along with the input and output channels and the code that will be executed when the pipeline runs. This is very similar to a processor instance’s run method.
from sagemaker.processing import ProcessingInput, ProcessingOutput
evaluation_step = ProcessingStep(
name='EvaluateModel',
processor=evaluation_processor,
code='src/evaluate_model_metrics.py',
inputs=[
ProcessingInput(
source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
destination='/opt/ml/processing/input/model'
),
ProcessingInput(
source=processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri,
destination='/opt/ml/processing/input/data'
)
],
outputs=[
ProcessingOutput(output_name='metrics',
s3_upload_mode='EndOfJob',
source='/opt/ml/processing/output/metrics/'),
],
job_arguments=[
'--max-seq-length', str(max_seq_length.default_value),
],
property_files=[evaluation_report],
)
Use the estimator instance that was used for the training step to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.
A model package group is a collection of model packages. You can create a model package group for a specific ML business problem, and you can keep adding versions/model packages into it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker workflow pipeline so that they can keep adding versions/model packages to the group for every workflow pipeline run.
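For instance, once the pipeline has run, you could list the versions in a model package group with the low-level client. This is a usage sketch; model_package_group_name is defined further below:
response = sm.list_model_packages(ModelPackageGroupName=model_package_group_name)
for package in response['ModelPackageSummaryList']:
    print(package['ModelPackageArn'], package['ModelApprovalStatus'])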
The construction of RegisterModel is very similar to an estimator instance’s register method, for those familiar with the existing Python SDK. In particular, you will pass in the S3ModelArtifacts from the training_step properties.
Note that you will use a specific model package group name here, which will come up again in the Model Registry and Continuous Integration/Continuous Deployment (CI/CD) work later on. Let’s set up the variables.
model_approval_status = ParameterString(
name="ModelApprovalStatus",
default_value="PendingManualApproval"
)
deploy_instance_type = ParameterString(
name="DeployInstanceType",
default_value="ml.m5.large"
)
deploy_instance_count = ParameterInteger(
name="DeployInstanceCount",
default_value=1
)
model_package_group_name = f"BERT-Reviews-{timestamp}"
print(model_package_group_name)
BERT-Reviews-1698120918
Configure the ModelMetrics to be stored as metadata.
from sagemaker.model_metrics import MetricsSource, ModelMetrics
model_metrics = ModelMetrics(
model_statistics=MetricsSource(
s3_uri="{}/evaluation.json".format(
evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
),
content_type="application/json"
)
)
print(model_metrics)
<sagemaker.model_metrics.ModelMetrics object at 0x7f618cd031d0>
Define the deployment image for inference.
inference_image_uri = sagemaker.image_uris.retrieve(
framework="pytorch",
region=region,
version="1.6.0",
py_version="py36",
instance_type=deploy_instance_type,
image_scope="inference"
)
print(inference_image_uri)
763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36
Configure the register model step.
Instructions: Pass the inference image defined above into the image_uri argument of the RegisterModel function. Review the rest of the arguments.
from sagemaker.workflow.step_collections import RegisterModel
register_step = RegisterModel(
name="RegisterModel",
estimator=estimator,
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
image_uri=inference_image_uri,
### END SOLUTION - DO NOT delete this comment for grading purposes
model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["application/jsonlines"],
response_types=["application/jsonlines"],
inference_instances=[deploy_instance_type],
transform_instances=[deploy_instance_type], # batch transform is not used in this lab
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
model_metrics=model_metrics
)
Configure the model for deployment.
Instructions: Pass the same inference image into the image_uri argument of the Model function.
from sagemaker.model import Model
model_name = 'bert-model-{}'.format(timestamp)
model = Model(
name=model_name,
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
image_uri=inference_image_uri,
### END SOLUTION - DO NOT delete this comment for grading purposes
model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=sess,
role=role,
)
Now configure create model input:
from sagemaker.inputs import CreateModelInput
create_inputs = CreateModelInput(
instance_type=deploy_instance_type,
)
Configure the create model step for the workflow.
Instructions: Pass the model defined above (the model object, not its name) and the model inputs configuration into the related arguments of the CreateModelStep function.
from sagemaker.workflow.steps import CreateModelStep
create_step = CreateModelStep(
name="CreateModel",
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
model=model,
inputs=create_inputs,
### END SOLUTION - DO NOT delete this comment for grading purposes
)
Finally, you would like to register this model only if its accuracy, as determined by the evaluation step evaluation_step, exceeds a given threshold value. A ConditionStep allows pipelines to support conditional execution in the pipeline DAG based on conditions of step properties.
Below, you will:
- define a ConditionGreaterThanOrEqualTo condition on the accuracy value found in the output of the evaluation step, evaluation_step,
- use the condition in a ConditionStep,
- pass the RegisterModel step collection into the if_steps of the ConditionStep.
min_accuracy_value = ParameterFloat(
name="MinAccuracyValue",
default_value=0.33 # random choice from three classes
)
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
ConditionStep,
JsonGet,
)
minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
left=JsonGet(
step=evaluation_step,
property_file=evaluation_report,
json_path="metrics.accuracy.value",
),
right=min_accuracy_value # minimum accuracy threshold
)
minimum_accuracy_condition_step = ConditionStep(
name="AccuracyCondition",
conditions=[minimum_accuracy_condition],
if_steps=[register_step, create_step], # successfully exceeded or equaled the minimum accuracy, continue with model registration
else_steps=[], # did not exceed the minimum accuracy, the model will not be registered
)
Let’s tie it all together into a workflow pipeline so you can execute it, and even schedule it.
A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair, so you can append the timestamp to the name to reduce the chance of a name conflict.
Note: all of the parameters used in the step definitions must be passed into the pipeline.
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name=pipeline_name,
parameters=[
input_data,
processing_instance_count,
processing_instance_type,
max_seq_length,
balance_dataset,
train_split_percentage,
validation_split_percentage,
test_split_percentage,
feature_store_offline_prefix,
feature_group_name,
epochs,
learning_rate,
train_batch_size,
train_steps_per_epoch,
validation_batch_size,
validation_steps_per_epoch,
freeze_bert_layer,
seed,
train_instance_count,
train_instance_type,
train_volume_size,
input_mode,
run_validation,
min_accuracy_value,
model_approval_status,
deploy_instance_type,
deploy_instance_count
],
steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
sagemaker_session=sess,
)
Let’s examine the JSON of the pipeline definition that meets the SageMaker Workflow Pipeline DSL specification.
By examining the definition, you are also confirming that the pipeline was well-defined, and that the parameters and step properties resolve correctly.
import json
from pprint import pprint
definition = json.loads(pipeline.definition())
pprint(definition)
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
{'Metadata': {},
'Parameters': [{'DefaultValue': 's3://dlai-practical-data-science/data/raw/',
'Name': 'InputData',
'Type': 'String'},
{'DefaultValue': 1,
'Name': 'ProcessingInstanceCount',
'Type': 'Integer'},
{'DefaultValue': 'ml.c5.2xlarge',
'Name': 'ProcessingInstanceType',
'Type': 'String'},
{'DefaultValue': 128,
'Name': 'MaxSeqLength',
'Type': 'Integer'},
{'DefaultValue': 'True',
'Name': 'BalanceDataset',
'Type': 'String'},
{'DefaultValue': 0.9,
'Name': 'TrainSplitPercentage',
'Type': 'Float'},
{'DefaultValue': 0.05,
'Name': 'ValidationSplitPercentage',
'Type': 'Float'},
{'DefaultValue': 0.05,
'Name': 'TestSplitPercentage',
'Type': 'Float'},
{'DefaultValue': 'reviews-feature-store-1698120918',
'Name': 'FeatureStoreOfflinePrefix',
'Type': 'String'},
{'DefaultValue': 'reviews-feature-group-1698120918',
'Name': 'FeatureGroupName',
'Type': 'String'},
{'DefaultValue': 3, 'Name': 'Epochs', 'Type': 'Integer'},
{'DefaultValue': 1e-05,
'Name': 'LearningRate',
'Type': 'Float'},
{'DefaultValue': 64,
'Name': 'TrainBatchSize',
'Type': 'Integer'},
{'DefaultValue': 50,
'Name': 'TrainStepsPerEpoch',
'Type': 'Integer'},
{'DefaultValue': 64,
'Name': 'ValidationBatchSize',
'Type': 'Integer'},
{'DefaultValue': 50,
'Name': 'ValidationStepsPerEpoch',
'Type': 'Integer'},
{'DefaultValue': 'False',
'Name': 'FreezeBertLayer',
'Type': 'String'},
{'DefaultValue': 42, 'Name': 'Seed', 'Type': 'Integer'},
{'DefaultValue': 1,
'Name': 'TrainInstanceCount',
'Type': 'Integer'},
{'DefaultValue': 'ml.c5.9xlarge',
'Name': 'TrainInstanceType',
'Type': 'String'},
{'DefaultValue': 256,
'Name': 'TrainVolumeSize',
'Type': 'Integer'},
{'DefaultValue': 'File', 'Name': 'InputMode', 'Type': 'String'},
{'DefaultValue': 'True',
'Name': 'RunValidation',
'Type': 'String'},
{'DefaultValue': 0.33,
'Name': 'MinAccuracyValue',
'Type': 'Float'},
{'DefaultValue': 'PendingManualApproval',
'Name': 'ModelApprovalStatus',
'Type': 'String'},
{'DefaultValue': 'ml.m5.large',
'Name': 'DeployInstanceType',
'Type': 'String'},
{'DefaultValue': 1,
'Name': 'DeployInstanceCount',
'Type': 'Integer'}],
'Steps': [{'Arguments': {'AppSpecification': {'ContainerArguments': ['--train-split-percentage',
'0.9',
'--validation-split-percentage',
'0.05',
'--test-split-percentage',
'0.05',
'--balance-dataset',
'True',
'--max-seq-length',
'128',
'--feature-store-offline-prefix',
'reviews-feature-store-1698120918',
'--feature-group-name',
'reviews-feature-group-1698120918'],
'ContainerEntrypoint': ['python3',
'/opt/ml/processing/input/code/prepare_data.py'],
'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'},
'Environment': {'AWS_DEFAULT_REGION': 'us-east-1'},
'ProcessingInputs': [{'AppManaged': False,
'InputName': 'raw-input-data',
'S3Input': {'LocalPath': '/opt/ml/processing/input/data/',
'S3CompressionType': 'None',
'S3DataDistributionType': 'ShardedByS3Key',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': {'Get': 'Parameters.InputData'}}},
{'AppManaged': False,
'InputName': 'code',
'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-18-21-035/input/code/prepare_data.py'}}],
'ProcessingOutputConfig': {'Outputs': [{'AppManaged': False,
'OutputName': 'sentiment-train',
'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/train',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train'}},
{'AppManaged': False,
'OutputName': 'sentiment-validation',
'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/validation',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-validation'}},
{'AppManaged': False,
'OutputName': 'sentiment-test',
'S3Output': {'LocalPath': '/opt/ml/processing/output/sentiment/test',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-test'}}]},
'ProcessingResources': {'ClusterConfig': {'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
'VolumeSizeInGB': 30}},
'RoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role'},
'Name': 'Processing',
'Type': 'Processing'},
{'Arguments': {'AlgorithmSpecification': {'EnableSageMakerMetricsTimeSeries': True,
'MetricDefinitions': [{'Name': 'validation:loss',
'Regex': 'val_loss: '
'([0-9.]+)'},
{'Name': 'validation:accuracy',
'Regex': 'val_acc: '
'([0-9.]+)'}],
'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.6.0-cpu-py3',
'TrainingInputMode': {'Get': 'Parameters.InputMode'}},
'DebugHookConfig': {'CollectionConfigurations': [],
'S3OutputPath': 's3://sagemaker-us-east-1-440427790495/'},
'HyperParameters': {'epochs': '3',
'freeze_bert_layer': '"False"',
'learning_rate': '1e-05',
'max_seq_length': '128',
'run_validation': '"True"',
'sagemaker_container_log_level': '20',
'sagemaker_job_name': '"pytorch-training-2023-10-24-04-18-21-197"',
'sagemaker_program': '"train.py"',
'sagemaker_region': '"us-east-1"',
'sagemaker_submit_directory': '"s3://sagemaker-us-east-1-440427790495/pytorch-training-2023-10-24-04-18-21-197/source/sourcedir.tar.gz"',
'seed': '42',
'train_batch_size': '64',
'train_steps_per_epoch': '50',
'validation_batch_size': '64',
'validation_steps_per_epoch': '50'},
'InputDataConfig': [{'ChannelName': 'train',
'ContentType': 'text/csv',
'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri"}}}},
{'ChannelName': 'validation',
'ContentType': 'text/csv',
'DataSource': {'S3DataSource': {'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-validation'].S3Output.S3Uri"}}}}],
'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-440427790495/'},
'ProfilerConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-440427790495/'},
'ProfilerRuleConfigurations': [{'RuleConfigurationName': 'ProfilerReport-1698121101',
'RuleEvaluatorImage': '503895931360.dkr.ecr.us-east-1.amazonaws.com/sagemaker-debugger-rules:latest',
'RuleParameters': {'rule_to_invoke': 'ProfilerReport'}}],
'ResourceConfig': {'InstanceCount': {'Get': 'Parameters.TrainInstanceCount'},
'InstanceType': {'Get': 'Parameters.TrainInstanceType'},
'VolumeSizeInGB': {'Get': 'Parameters.TrainVolumeSize'}},
'RoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
'StoppingCondition': {'MaxRuntimeInSeconds': 86400}},
'CacheConfig': {'Enabled': True, 'ExpireAfter': 'PT1H'},
'Name': 'Train',
'Type': 'Training'},
{'Arguments': {'AppSpecification': {'ContainerArguments': ['--max-seq-length',
'128'],
'ContainerEntrypoint': ['python3',
'/opt/ml/processing/input/code/evaluate_model_metrics.py'],
'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3'},
'Environment': {'AWS_DEFAULT_REGION': 'us-east-1'},
'ProcessingInputs': [{'AppManaged': False,
'InputName': 'input-1',
'S3Input': {'LocalPath': '/opt/ml/processing/input/model',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}},
{'AppManaged': False,
'InputName': 'input-2',
'S3Input': {'LocalPath': '/opt/ml/processing/input/data',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': {'Get': "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri"}}},
{'AppManaged': False,
'InputName': 'code',
'S3Input': {'LocalPath': '/opt/ml/processing/input/code',
'S3CompressionType': 'None',
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3InputMode': 'File',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-18-21-691/input/code/evaluate_model_metrics.py'}}],
'ProcessingOutputConfig': {'Outputs': [{'AppManaged': False,
'OutputName': 'metrics',
'S3Output': {'LocalPath': '/opt/ml/processing/output/metrics/',
'S3UploadMode': 'EndOfJob',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-17-46-872/output/metrics'}}]},
'ProcessingResources': {'ClusterConfig': {'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
'VolumeSizeInGB': 30}},
'RoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
'StoppingCondition': {'MaxRuntimeInSeconds': 7200}},
'Name': 'EvaluateModel',
'PropertyFiles': [{'FilePath': 'evaluation.json',
'OutputName': 'metrics',
'PropertyFileName': 'EvaluationReport'}],
'Type': 'Processing'},
{'Arguments': {'Conditions': [{'LeftValue': {'Std:JsonGet': {'Path': 'metrics.accuracy.value',
'PropertyFile': {'Get': 'Steps.EvaluateModel.PropertyFiles.EvaluationReport'}}},
'RightValue': {'Get': 'Parameters.MinAccuracyValue'},
'Type': 'GreaterThanOrEqualTo'}],
'ElseSteps': [],
'IfSteps': [{'Arguments': {'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36',
'ModelDataUrl': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}],
'SupportedContentTypes': ['application/jsonlines'],
'SupportedRealtimeInferenceInstanceTypes': [{'Get': 'Parameters.DeployInstanceType'}],
'SupportedResponseMIMETypes': ['application/jsonlines'],
'SupportedTransformInstanceTypes': [{'Get': 'Parameters.DeployInstanceType'}]},
'ModelApprovalStatus': {'Get': 'Parameters.ModelApprovalStatus'},
'ModelMetrics': {'ModelQuality': {'Statistics': {'ContentType': 'application/json',
'S3Uri': 's3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-17-46-872/output/metrics/evaluation.json'}}},
'ModelPackageGroupName': 'BERT-Reviews-1698120918'},
'Name': 'RegisterModel',
'Type': 'RegisterModel'},
{'Arguments': {'ExecutionRoleArn': 'arn:aws:iam::440427790495:role/sagemaker-studio-vpc-firewall-us-east-1-sagemaker-execution-role',
'PrimaryContainer': {'Environment': {},
'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36',
'ModelDataUrl': {'Get': 'Steps.Train.ModelArtifacts.S3ModelArtifacts'}}},
'Name': 'CreateModel',
'Type': 'Model'}]},
'Name': 'AccuracyCondition',
'Type': 'Condition'}],
'Version': '2020-12-01'}
Ignore the WARNING below.
Create the pipeline using the create method and then print its Amazon Resource Name (ARN).
response = pipeline.create(role_arn=role)
pipeline_arn = response["PipelineArn"]
print(pipeline_arn)
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918
Ignore the WARNING ^^ above ^^.
The pipeline definition has now been submitted to the Amazon SageMaker Pipelines service; the role passed in will be used by the service to create all the jobs defined in the steps. Start the pipeline using the parameters passed into the start() function.
execution = pipeline.start(
parameters=dict(
InputData=raw_input_data_s3_uri,
ProcessingInstanceCount=1,
ProcessingInstanceType='ml.c5.2xlarge',
MaxSeqLength=128,
BalanceDataset='True',
TrainSplitPercentage=0.9,
ValidationSplitPercentage=0.05,
TestSplitPercentage=0.05,
FeatureStoreOfflinePrefix='reviews-feature-store-'+str(timestamp),
FeatureGroupName='reviews-feature-group-'+str(timestamp),
Epochs=3,
LearningRate=0.000012,
TrainBatchSize=64,
TrainStepsPerEpoch=50,
ValidationBatchSize=64,
ValidationStepsPerEpoch=64,
FreezeBertLayer='False',
Seed=42,
TrainInstanceCount=1,
TrainInstanceType='ml.c5.9xlarge',
TrainVolumeSize=256,
InputMode='File',
RunValidation='True',
MinAccuracyValue=0.01,
ModelApprovalStatus='PendingManualApproval',
DeployInstanceType='ml.m5.large',
DeployInstanceCount=1
)
)
print(execution.arn)
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b
Now you can describe the execution instance and list the steps in the execution to find out more about it.
from pprint import pprint
execution_run = execution.describe()
pprint(execution_run)
{'CreatedBy': {'DomainId': 'd-hxd7ruk4hbjz',
'UserProfileArn': 'arn:aws:sagemaker:us-east-1:440427790495:user-profile/d-hxd7ruk4hbjz/sagemaker-user-profile-us-east-1',
'UserProfileName': 'sagemaker-user-profile-us-east-1'},
'CreationTime': datetime.datetime(2023, 10, 24, 4, 18, 35, 584000, tzinfo=tzlocal()),
'LastModifiedBy': {'DomainId': 'd-hxd7ruk4hbjz',
'UserProfileArn': 'arn:aws:sagemaker:us-east-1:440427790495:user-profile/d-hxd7ruk4hbjz/sagemaker-user-profile-us-east-1',
'UserProfileName': 'sagemaker-user-profile-us-east-1'},
'LastModifiedTime': datetime.datetime(2023, 10, 24, 4, 18, 35, 584000, tzinfo=tzlocal()),
'PipelineArn': 'arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918',
'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b',
'PipelineExecutionDisplayName': 'execution-1698121115636',
'PipelineExecutionStatus': 'Executing',
'ResponseMetadata': {'HTTPHeaders': {'content-length': '815',
'content-type': 'application/x-amz-json-1.1',
'date': 'Tue, 24 Oct 2023 04:18:37 GMT',
'x-amzn-requestid': '1abf8f02-b30a-4382-a1d5-b0c04e9d6fc1'},
'HTTPStatusCode': 200,
'RequestId': '1abf8f02-b30a-4382-a1d5-b0c04e9d6fc1',
'RetryAttempts': 0}}
Print the execution display name and its ARN:
execution_run_name = execution_run['PipelineExecutionDisplayName']
print(execution_run_name)
execution-1698121115636
pipeline_execution_arn = execution_run['PipelineExecutionArn']
print(pipeline_execution_arn)
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b
Wait for the first step to start running and print the information about it:
import time
time.sleep(30)
execution.list_steps()
[{'StepName': 'Processing',
'StartTime': datetime.datetime(2023, 10, 24, 4, 18, 36, 93000, tzinfo=tzlocal()),
'StepStatus': 'Executing',
'AttemptCount': 0,
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:processing-job/pipelines-7j3goj91hb3b-Processing-gkscLfBKVl'}}}]
To get information about the pipeline execution, you can use the low-level service client of the boto3 session. It is also useful for other operations that you will see below.
In the code below you will observe the pipeline execution summary, waiting for the execution status to change from Executing to Succeeded.
%%time
import time
from pprint import pprint
sm = boto3.Session().client(service_name='sagemaker', region_name=region)
executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)
while pipeline_execution_status=='Executing':
try:
executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
except Exception as e:
print('Please wait...')
time.sleep(30)
pprint(executions_response)
Executing
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
Please wait...
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b',
'PipelineExecutionDisplayName': 'execution-1698121115636',
'PipelineExecutionStatus': 'Succeeded',
'StartTime': datetime.datetime(2023, 10, 24, 4, 18, 35, 584000, tzinfo=tzlocal())}]
CPU times: user 22 s, sys: 1.01 s, total: 23 s
Wall time: 32min 15s
Wait for the pipeline ^^ above ^^ to complete.
You can list the execution steps to check out the status and artifacts:
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)
Succeeded
pipeline_execution_arn = executions_response[0]['PipelineExecutionArn']
print(pipeline_execution_arn)
arn:aws:sagemaker:us-east-1:440427790495:pipeline/BERT-pipeline-1698120918/execution/7j3goj91hb3b
Examine the resulting model evaluation after the pipeline completes. Download the resulting evaluation.json file from S3 and print the report.
processing_job_name = None
# pull the processing job name from the processing step
for execution_step in reversed(execution.list_steps()):
if execution_step['StepName'] == 'Processing':
processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
# get the description of the processing job
describe_transform_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)
# get the output S3 path
transform_output_s3_uri = describe_transform_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Transform output {}'.format(transform_output_s3_uri))
Transform output s3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train
# list the files in the resulting output S3 path
!aws s3 ls --recursive $transform_output_s3_uri
2023-10-24 04:30:31 4889808 sagemaker-scikit-learn-2023-10-24-04-15-39-910/output/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv
Pull the name of the model-evaluation step and then get the S3 path of the evaluation metrics, which will contain the evaluation report.
Instructions: Find the execution step with the step name EvaluateModel following the example above.
processing_job_name = None
for execution_step in reversed(execution.list_steps()):
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
if execution_step["StepName"] == "EvaluateModel":
### END SOLUTION - DO NOT delete this comment for grading purposes
processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)
evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Evaluation output {}'.format(evaluation_metrics_s3_uri))
Evaluation output s3://sagemaker-us-east-1-440427790495/sagemaker-scikit-learn-2023-10-24-04-17-46-872/output/metrics
Download the evaluation report and print the accuracy.
from pprint import pprint
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
evaluation_metrics_s3_uri
))
pprint(json.loads(evaluation_json))
{'metrics': {'accuracy': {'value': 0.7443365695792881}}}
Find and print the ARN and job name of the training job.
Instructions: Find the execution step with the step name Train following the example above.
training_job_arn=None
for execution_step in execution.list_steps():
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
if execution_step["StepName"] == "Train":
### END SOLUTION - DO NOT delete this comment for grading purposes
training_job_arn = execution_step['Metadata']['TrainingJob']['Arn']
pprint(execution_step)
break
print('Training job ARN: {}'.format(training_job_arn))
training_job_name = training_job_arn.split('/')[-1]
print('Training job Name: {}'.format(training_job_name))
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 44, 13, 903000, tzinfo=tzlocal()),
'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:training-job/pipelines-7j3goj91hb3b-Train-BuWywrPxXU'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 30, 38, 821000, tzinfo=tzlocal()),
'StepName': 'Train',
'StepStatus': 'Succeeded'}
Training job ARN: arn:aws:sagemaker:us-east-1:440427790495:training-job/pipelines-7j3goj91hb3b-Train-BuWywrPxXU
Training job Name: pipelines-7j3goj91hb3b-Train-BuWywrPxXU
Using a similar approach, you can find and print the pipeline artifacts.
processing_job_name=None
training_job_name=None
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer
viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
pprint(execution_step)
if execution_step['StepName'] == 'Processing':
processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
print('Processing job name: {}'.format(processing_job_name))
display(viz.show(processing_job_name=processing_job_name))
elif execution_step['StepName'] == 'Train':
training_job_name=execution_step['Metadata']['TrainingJob']['Arn'].split('/')[-1]
print('Training job name: {}'.format(training_job_name))
display(viz.show(training_job_name=training_job_name))
else:
display(viz.show(pipeline_execution_step=execution_step))
time.sleep(5)
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 30, 38, 191000, tzinfo=tzlocal()),
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:processing-job/pipelines-7j3goj91hb3b-Processing-gkscLfBKVl'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 18, 36, 93000, tzinfo=tzlocal()),
'StepName': 'Processing',
'StepStatus': 'Succeeded'}
Processing job name: pipelines-7j3goj91hb3b-Processing-gkscLfBKVl
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...-04-18-30-260/input/code/prepare_data.py | Input | DataSet | ContributedTo | artifact |
1 | s3://dlai-practical-data-science/data/raw/ | Input | DataSet | ContributedTo | artifact |
2 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
3 | s3://...10-24-04-15-39-910/output/sentiment-test | Output | DataSet | Produced | artifact |
4 | s3://...04-15-39-910/output/sentiment-validation | Output | DataSet | Produced | artifact |
5 | s3://...0-24-04-15-39-910/output/sentiment-train | Output | DataSet | Produced | artifact |
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 44, 13, 903000, tzinfo=tzlocal()),
'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:training-job/pipelines-7j3goj91hb3b-Train-BuWywrPxXU'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 30, 38, 821000, tzinfo=tzlocal()),
'StepName': 'Train',
'StepStatus': 'Succeeded'}
Training job name: pipelines-7j3goj91hb3b-Train-BuWywrPxXU
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...04-15-39-910/output/sentiment-validation | Input | DataSet | ContributedTo | artifact |
1 | s3://...0-24-04-15-39-910/output/sentiment-train | Input | DataSet | ContributedTo | artifact |
2 | 76310...onaws.com/pytorch-training:1.6.0-cpu-py3 | Input | Image | ContributedTo | artifact |
3 | s3://...b3b-Train-BuWywrPxXU/output/model.tar.gz | Output | Model | Produced | artifact |
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 30, 33000, tzinfo=tzlocal()),
'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:processing-job/pipelines-7j3goj91hb3b-EvaluateModel-TzjiP6obCC'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 44, 15, 150000, tzinfo=tzlocal()),
'StepName': 'EvaluateModel',
'StepStatus': 'Succeeded'}
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...596/input/code/evaluate_model_metrics.py | Input | DataSet | ContributedTo | artifact |
1 | s3://...10-24-04-15-39-910/output/sentiment-test | Input | DataSet | ContributedTo | artifact |
2 | s3://...b3b-Train-BuWywrPxXU/output/model.tar.gz | Input | Model | ContributedTo | artifact |
3 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
4 | s3://...n-2023-10-24-04-17-46-872/output/metrics | Output | DataSet | Produced | artifact |
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 31, 983000, tzinfo=tzlocal()),
'Metadata': {'Condition': {'Outcome': 'True'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 51, 31, 621000, tzinfo=tzlocal()),
'StepName': 'AccuracyCondition',
'StepStatus': 'Succeeded'}
None
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 34, 630000, tzinfo=tzlocal()),
'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:model/pipelines-7j3goj91hb3b-createmodel-2mt9xrqqbg'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 51, 33, 118000, tzinfo=tzlocal()),
'StepName': 'CreateModel',
'StepStatus': 'Succeeded'}
None
{'AttemptCount': 0,
'EndTime': datetime.datetime(2023, 10, 24, 4, 51, 34, 605000, tzinfo=tzlocal()),
'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1'}},
'StartTime': datetime.datetime(2023, 10, 24, 4, 51, 33, 118000, tzinfo=tzlocal()),
'StepName': 'RegisterModel',
'StepStatus': 'Succeeded'}
Name/Source | Direction | Type | Association Type | Lineage Type | |
---|---|---|---|---|---|
0 | s3://...b3b-Train-BuWywrPxXU/output/model.tar.gz | Input | Model | ContributedTo | artifact |
1 | 76310...aws.com/pytorch-inference:1.6.0-cpu-py36 | Input | Image | ContributedTo | artifact |
2 | BERT-Reviews-1698120918-1-PendingManualApprova... | Input | Approval | ContributedTo | action |
3 | BERT-Reviews-1698120918-1698123094-aws-model-p... | Output | ModelGroup | AssociatedWith | context |
The pipeline created a model package version within the specified model package group, with an approval status of PendingManualApproval. This requires a separate step to manually approve the model before deploying it to production.
You can approve the model using the SageMaker Studio UI or programmatically as shown below.
Get the model package ARN.
for execution_step in execution.list_steps():
if execution_step['StepName'] == 'RegisterModel':
model_package_arn = execution_step['Metadata']['RegisterModel']['Arn']
break
print(model_package_arn)
arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1
Update the model package with the Approved status to prepare for deployment. The model must be Approved before it can be deployed.
model_package_update_response = sm.update_model_package(
ModelPackageArn=model_package_arn,
ModelApprovalStatus="Approved",
)
pprint(model_package_update_response)
{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:440427790495:model-package/BERT-Reviews-1698120918/1',
'ResponseMetadata': {'HTTPHeaders': {'content-length': '102',
'content-type': 'application/x-amz-json-1.1',
'date': 'Tue, 24 Oct 2023 04:51:59 GMT',
'x-amzn-requestid': 'e41e1142-7749-44dd-ac9e-92f11e322fad'},
'HTTPStatusCode': 200,
'RequestId': 'e41e1142-7749-44dd-ac9e-92f11e322fad',
'RetryAttempts': 0}}
Get the model ARN and the model name from it.
for execution_step in execution.list_steps():
print(execution_step['StepName'])
if execution_step['StepName'] == 'CreateModel':
model_arn = execution_step['Metadata']['Model']['Arn']
break
print(model_arn)
model_name = model_arn.split('/')[-1]
print(model_name)
RegisterModel
CreateModel
arn:aws:sagemaker:us-east-1:440427790495:model/pipelines-7j3goj91hb3b-createmodel-2mt9xrqqbg
pipelines-7j3goj91hb3b-createmodel-2mt9xrqqbg
Configure the endpoint.
endpoint_config_name = 'bert-model-epc-{}'.format(timestamp)
print(endpoint_config_name)
create_endpoint_config_response = sm.create_endpoint_config(
EndpointConfigName = endpoint_config_name,
ProductionVariants=[{
'InstanceType':'ml.m5.xlarge',
'InitialVariantWeight':1,
'InitialInstanceCount':1,
'ModelName': model_name,
'VariantName':'AllTraffic'}])
bert-model-epc-1698120918
Create the endpoint.
pipeline_endpoint_name = 'bert-model-ep-{}'.format(timestamp)
print("EndpointName={}".format(pipeline_endpoint_name))
create_endpoint_response = sm.create_endpoint(
EndpointName=pipeline_endpoint_name,
EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])
EndpointName=bert-model-ep-1698120918
arn:aws:sagemaker:us-east-1:440427790495:endpoint/bert-model-ep-1698120918
from IPython.core.display import display, HTML
display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pipeline_endpoint_name)))
Review SageMaker REST Endpoint
Wait until the endpoint is deployed.
%%time
while True:
try:
waiter = sm.get_waiter('endpoint_in_service')
print('Waiting for endpoint to be in `InService`...')
waiter.wait(EndpointName=pipeline_endpoint_name)
break
except:
print('Waiting for endpoint...')
endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
print('Endpoint status: {}'.format(endpoint_status))
if endpoint_status == 'Failed':
break
time.sleep(30)
print('Endpoint deployed.')
Waiting for endpoint to be in `InService`...
Endpoint deployed.
CPU times: user 51.9 ms, sys: 15.9 ms, total: 67.9 ms
Wall time: 4min 31s
Wait until the endpoint ^^ above ^^ is deployed.
Predict the sentiment with review_body samples and review the result:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer
inputs = [
{"features": ["I love this product!"]},
{"features": ["OK, but not great."]},
{"features": ["This is not the right product."]},
]
predictor = Predictor(
endpoint_name=pipeline_endpoint_name,
serializer=JSONLinesSerializer(),
deserializer=JSONLinesDeserializer(),
sagemaker_session=sess
)
predicted_classes = predictor.predict(inputs)
for predicted_class in predicted_classes:
print("Predicted class {} with probability {}".format(predicted_class['predicted_label'], predicted_class['probability']))
Predicted class 1 with probability 0.9319869875907898
Predicted class 0 with probability 0.3933015763759613
Predicted class -1 with probability 0.6994612812995911
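For reference, the same request can be made with the low-level sm_runtime client created at the top of the notebook. This is a usage sketch; the Predictor above is the simpler path:
# Sketch: the low-level equivalent of the Predictor call above.
payload = '\n'.join(json.dumps(example) for example in inputs)  # JSON Lines body
response = sm_runtime.invoke_endpoint(
    EndpointName=pipeline_endpoint_name,
    ContentType='application/jsonlines',
    Accept='application/jsonlines',
    Body=payload,
)
print(response['Body'].read().decode('utf-8'))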
SageMaker Studio provides a rich set of features to visually inspect SageMaker resources, including pipelines, training jobs, and endpoints. Please take time to explore these views in the Studio UI.
Congratulations! You have just deployed an end-to-end pipeline with BERT and SageMaker Pipelines.
Upload the notebook to the S3 bucket for grading purposes.
Note: you may need to click the “Save” button before the upload.
!aws s3 cp ./C2_W3_Assignment.ipynb s3://$bucket/C2_W3_Assignment_Learner.ipynb
upload: ./C2_W3_Assignment.ipynb to s3://sagemaker-us-east-1-440427790495/C2_W3_Assignment_Learner.ipynb
Please go to the main lab window and click the Submit button (see the Finish the lab section of the instructions).