In this lab you will create your own human workforce, a human task UI, and then define the human review workflow to perform data labeling. You will make the original predictions of the labels with the custom ML model, and then create a human loop if the probability scores are lower than the preset threshold. After the completion of the human loop tasks, you will review the results and prepare data for re-training.
Let’s install and import the required modules.
# please ignore warning messages during the installation
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!pip install -q protobuf==3.20.*
import boto3
import sagemaker
import pandas as pd
from pprint import pprint
import botocore
config = botocore.config.Config(user_agent_extra='dlai-pds/c3/w3')
# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker',
config=config)
sm_runtime = boto3.client('sagemaker-runtime',
config=config)
sess = sagemaker.Session(sagemaker_client=sm,
sagemaker_runtime_client=sm_runtime)
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
s3 = boto3.Session().client(service_name='s3',
config=config)
cognito_idp = boto3.Session().client(service_name='cognito-idp',
config=config)
a2i = boto3.Session().client(service_name='sagemaker-a2i-runtime',
config=config)
The first step in the creation of the human-in-the-loop pipeline will be to create your own private workforce.
Amazon Cognito provides authentication, authorization, and user management for apps. This enables your workers to sign in directly to the labeling UI with a username and password.
You will construct an Amazon Cognito user pool and set up its client, domain, and group. Then you’ll create a SageMaker workforce and link it to the Cognito user pool, followed by a SageMaker workteam linked to the Cognito user pool and group. Finally, you will create a pool user and add it to the group.
To get started, let’s construct the user pool and user pool client names.
import time
timestamp = int(time.time())
user_pool_name = 'groundtruth-user-pool-{}'.format(timestamp)
user_pool_client_name = 'groundtruth-user-pool-client-{}'.format(timestamp)
print("Amazon Cognito user pool name: {}".format(user_pool_name))
print("Amazon Cognito user pool client name: {}".format(user_pool_client_name))
Amazon Cognito user pool name: groundtruth-user-pool-1698229936
Amazon Cognito user pool client name: groundtruth-user-pool-client-1698229936
The function cognito_idp.create_user_pool creates a new Amazon Cognito user pool. Assign the result of the function to a variable to inspect the response, which is returned as a dictionary.
create_user_pool_response = cognito_idp.create_user_pool(PoolName=user_pool_name)
user_pool_id = create_user_pool_response['UserPool']['Id']
print("Amazon Cognito user pool ID: {}".format(user_pool_id))
Amazon Cognito user pool ID: us-east-1_fTkk2p8PL
Pull the Amazon Cognito user pool name from its description.
Instructions: Print the keys of the user pool, choose the one that corresponds to the name and print its value.
print(create_user_pool_response['UserPool'].keys())
dict_keys(['Id', 'Name', 'Policies', 'DeletionProtection', 'LambdaConfig', 'LastModifiedDate', 'CreationDate', 'SchemaAttributes', 'VerificationMessageTemplate', 'UserAttributeUpdateSettings', 'MfaConfiguration', 'EstimatedNumberOfUsers', 'EmailConfiguration', 'AdminCreateUserConfig', 'Arn', 'AccountRecoverySetting'])
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
user_pool_name = create_user_pool_response["UserPool"]["Name"]
### END SOLUTION - DO NOT delete this comment for grading purposes
print('Amazon Cognito user pool name: {}'.format(user_pool_name))
Amazon Cognito user pool name: groundtruth-user-pool-1698229936
Now let’s set up the Amazon Cognito user pool client for the user pool created above.
The Amazon Cognito user pool client implements OAuth, an open standard for authorization. The standard enables apps to obtain limited access (scopes) to a user’s data without giving away the user’s password. It decouples authentication from authorization and supports multiple use cases addressing different device capabilities.
Create the Amazon Cognito user pool client for the constructed user pool.
Instructions: Pass the user pool ID and the user pool client name into the function cognito_idp.create_user_pool_client. Review the other parameters of the function.
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
create_user_pool_client_response = cognito_idp.create_user_pool_client(
UserPoolId=user_pool_id,
ClientName=user_pool_client_name,
### END SOLUTION - DO NOT delete this comment for grading purposes
GenerateSecret=True, # boolean to specify whether you want to generate a secret
# a list of provider names for the identity providers that are supported on this client, e.g. Cognito, Facebook, Google
SupportedIdentityProviders=[
'COGNITO'
],
# a list of the allowed OAuth flows, e.g. code, implicit, client_credentials
AllowedOAuthFlows=[
'code',
'implicit'
],
# a list of the allowed OAuth scopes, e.g. phone, email, openid, and profile
AllowedOAuthScopes=[
'email',
'openid',
'profile'
],
# a list of allowed redirect (callback) URLs for the identity providers
CallbackURLs=[
'https://datascienceonaws.com',
],
# set to true if the client is allowed to follow the OAuth protocol when interacting with Cognito user pools
AllowedOAuthFlowsUserPoolClient=True
)
client_id = create_user_pool_client_response['UserPoolClient']['ClientId']
print('Amazon Cognito user pool client ID: {}'.format(client_id))
Amazon Cognito user pool client ID: 19apctv0pmtceunu3ttp5rpkhu
Set up the Amazon Cognito user pool domain for the constructed user pool.
Instructions: Pass the user pool ID and the user pool domain name into the function cognito_idp.create_user_pool_domain.
user_pool_domain_name = 'groundtruth-user-pool-domain-{}'.format(timestamp)
try:
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    cognito_idp.create_user_pool_domain(
        UserPoolId=user_pool_id,
        Domain=user_pool_domain_name
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    )
    print("Created Amazon Cognito user pool domain: {}".format(user_pool_domain_name))
except:
    print("Amazon Cognito user pool domain {} already exists".format(user_pool_domain_name))
Created Amazon Cognito user pool domain: groundtruth-user-pool-domain-1698229936
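To make the OAuth settings above more concrete, here is a minimal sketch of how an app could assemble an authorization request URL from the domain, client ID, callback URL, and scopes configured so far. The helper name build_authorize_url is hypothetical, and the sketch assumes the domain is a Cognito prefix domain served at <prefix>.auth.<region>.amazoncognito.com with an /oauth2/authorize endpoint. The lab itself does not need this URL, because workers sign in through the labeling portal.

```python
from urllib.parse import urlencode


def build_authorize_url(domain_prefix, region, client_id, redirect_uri,
                        scopes=("email", "openid", "profile"),
                        response_type="code"):
    """Hypothetical helper: build an OAuth authorization URL for a Cognito prefix domain."""
    # Assumption: prefix domains are served under <prefix>.auth.<region>.amazoncognito.com
    base = "https://{}.auth.{}.amazoncognito.com/oauth2/authorize".format(domain_prefix, region)
    query = urlencode({
        "response_type": response_type,  # 'code' matches the 'code' flow allowed on the client
        "client_id": client_id,
        "redirect_uri": redirect_uri,    # must be one of the client's CallbackURLs
        "scope": " ".join(scopes),       # scopes are space-separated in OAuth
    })
    return "{}?{}".format(base, query)


# Values copied from the outputs above, used purely for illustration
url = build_authorize_url(
    domain_prefix="groundtruth-user-pool-domain-1698229936",
    region="us-east-1",
    client_id="19apctv0pmtceunu3ttp5rpkhu",
    redirect_uri="https://datascienceonaws.com",
)
print(url)
```

The scopes and the callback URL in the request have to match what was registered on the user pool client, which is why the same values appear in both places.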
You will use the following function to check if the Amazon Cognito user group already exists.
def check_user_pool_group_existence(user_pool_id, user_pool_group_name):
    for group in cognito_idp.list_groups(UserPoolId=user_pool_id)['Groups']:
        if user_pool_group_name == group['GroupName']:
            return True
    return False
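The check above inspects a single list_groups response, which is enough for the one group created in this lab. For a pool with many groups the response can be paginated, so a more defensive variant would follow the NextToken field across pages. The sketch below shows that idea; group_exists and FakeCognitoClient are hypothetical names, and the fake client only stands in for cognito_idp so that the example runs without AWS access.

```python
def group_exists(client, user_pool_id, group_name):
    """Return True if group_name exists in the pool, following NextToken across pages."""
    kwargs = {"UserPoolId": user_pool_id}
    while True:
        page = client.list_groups(**kwargs)
        if any(group["GroupName"] == group_name for group in page.get("Groups", [])):
            return True
        token = page.get("NextToken")
        if not token:
            # No more pages and no match found
            return False
        kwargs["NextToken"] = token


class FakeCognitoClient:
    """Hypothetical stand-in for cognito_idp that serves two pages of groups."""

    def __init__(self):
        self.pages = {
            None: {"Groups": [{"GroupName": "group-a"}], "NextToken": "page-2"},
            "page-2": {"Groups": [{"GroupName": "group-b"}]},
        }

    def list_groups(self, UserPoolId, NextToken=None):
        return self.pages[NextToken]


fake_client = FakeCognitoClient()
found_on_second_page = group_exists(fake_client, "pool-id", "group-b")
missing = group_exists(fake_client, "pool-id", "group-c")
print(found_on_second_page, missing)
```

In the notebook you would pass cognito_idp instead of the fake client.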
Set up the Amazon Cognito user group.
Instructions: Pass the user pool ID and the user group name into the function cognito_idp.create_group.
user_pool_group_name = 'groundtruth-user-pool-group-{}'.format(timestamp)
if not check_user_pool_group_existence(user_pool_id, user_pool_group_name):
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    cognito_idp.create_group(
        UserPoolId=user_pool_id,
        GroupName=user_pool_group_name
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    )
    print("Created Amazon Cognito user group: {}".format(user_pool_group_name))
else:
    print("Amazon Cognito user group {} already exists".format(user_pool_group_name))
Created Amazon Cognito user group: groundtruth-user-pool-group-1698229936
Use the following function to check if the workforce already exists. You can only create one workforce per region, therefore you’ll have to delete any other existing workforce, together with all of the related workteams.
def check_workforce_existence(workforce_name):
    for workforce in sm.list_workforces()['Workforces']:
        if workforce_name == workforce['WorkforceName']:
            return True
        else:
            # a different workforce exists: delete its workteams first, then the workforce itself
            for workteam in sm.list_workteams()['Workteams']:
                sm.delete_workteam(WorkteamName=workteam['WorkteamName'])
            sm.delete_workforce(WorkforceName=workforce['WorkforceName'])
    return False
Create a workforce.
Instructions: Pass the Amazon Cognito user pool ID and client ID into the Cognito configuration of the function sm.create_workforce.
workforce_name = 'groundtruth-workforce-name-{}'.format(timestamp)
if not check_workforce_existence(workforce_name):
    create_workforce_response = sm.create_workforce(
        WorkforceName=workforce_name,
        CognitoConfig={
            ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
            'UserPool': user_pool_id,
            'ClientId': client_id
            ### END SOLUTION - DO NOT delete this comment for grading purposes
        }
    )
    print("Workforce name: {}".format(workforce_name))
    pprint(create_workforce_response)
else:
    print("Workforce {} already exists".format(workforce_name))
Workforce name: groundtruth-workforce-name-1698229936
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '107',
'content-type': 'application/x-amz-json-1.1',
'date': 'Wed, 25 Oct 2023 10:32:19 GMT',
'x-amzn-requestid': '2683e1a6-6b5f-4ce7-8a20-e4498374d9ef'},
'HTTPStatusCode': 200,
'RequestId': '2683e1a6-6b5f-4ce7-8a20-e4498374d9ef',
'RetryAttempts': 0},
'WorkforceArn': 'arn:aws:sagemaker:us-east-1:657781576110:workforce/groundtruth-workforce-name-1698229936'}
You can use the sm.describe_workforce function to get information about the workforce.
describe_workforce_response = sm.describe_workforce(WorkforceName=workforce_name)
describe_workforce_response
{'Workforce': {'WorkforceName': 'groundtruth-workforce-name-1698229936',
'WorkforceArn': 'arn:aws:sagemaker:us-east-1:657781576110:workforce/groundtruth-workforce-name-1698229936',
'LastUpdatedDate': datetime.datetime(2023, 10, 25, 10, 32, 20, 42000, tzinfo=tzlocal()),
'SourceIpConfig': {'Cidrs': []},
'SubDomain': '9nj1o7kppv.labeling.us-east-1.sagemaker.aws',
'CognitoConfig': {'UserPool': 'us-east-1_fTkk2p8PL',
'ClientId': '19apctv0pmtceunu3ttp5rpkhu'},
'CreateDate': datetime.datetime(2023, 10, 25, 10, 32, 19, 503000, tzinfo=tzlocal()),
'Status': 'Initializing'},
'ResponseMetadata': {'RequestId': '6ec6701f-ff6a-4c40-9b84-581bb404bedc',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '6ec6701f-ff6a-4c40-9b84-581bb404bedc',
'content-type': 'application/x-amz-json-1.1',
'content-length': '445',
'date': 'Wed, 25 Oct 2023 10:32:19 GMT'},
'RetryAttempts': 0}}
Use the following function to check if the workteam already exists. If there are no workteams in the list, give some time for the workforce to set up.
def check_workteam_existence(workteam_name):
    if sm.list_workteams()['Workteams']:
        for workteam in sm.list_workteams()['Workteams']:
            if workteam_name == workteam['WorkteamName']:
                return True
    else:
        # no workteams yet: give the workforce some time to finish setting up
        time.sleep(60)
        return False
    return False
Create a workteam.
Instructions: Pass the Amazon Cognito user pool ID, client ID, and group name into the Cognito member definition of the function sm.create_workteam.
workteam_name = 'groundtruth-workteam-{}'.format(timestamp)
if not check_workteam_existence(workteam_name):
    create_workteam_response = sm.create_workteam(
        Description='groundtruth workteam',
        WorkforceName=workforce_name,
        WorkteamName=workteam_name,
        # objects that identify the workers that make up the work team
        MemberDefinitions=[{
            'CognitoMemberDefinition': {
                ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
                'UserPool': user_pool_id,
                'ClientId': client_id,
                'UserGroup': user_pool_group_name
                ### END SOLUTION - DO NOT delete this comment for grading purposes
            }
        }]
    )
    pprint(create_workteam_response)
else:
    print("Workteam {} already exists".format(workteam_name))
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '113',
'content-type': 'application/x-amz-json-1.1',
'date': 'Wed, 25 Oct 2023 10:33:21 GMT',
'x-amzn-requestid': '1532cd79-81ca-43ef-a4f5-c133844d0762'},
'HTTPStatusCode': 200,
'RequestId': '1532cd79-81ca-43ef-a4f5-c133844d0762',
'RetryAttempts': 0},
'WorkteamArn': 'arn:aws:sagemaker:us-east-1:657781576110:workteam/private-crowd/groundtruth-workteam-1698229936'}
You can use the sm.describe_workteam function to get information about the workteam.
describe_workteam_response = sm.describe_workteam(WorkteamName=workteam_name)
describe_workteam_response
{'Workteam': {'WorkteamName': 'groundtruth-workteam-1698229936',
'MemberDefinitions': [{'CognitoMemberDefinition': {'UserPool': 'us-east-1_fTkk2p8PL',
'UserGroup': 'groundtruth-user-pool-group-1698229936',
'ClientId': '19apctv0pmtceunu3ttp5rpkhu'}}],
'WorkteamArn': 'arn:aws:sagemaker:us-east-1:657781576110:workteam/private-crowd/groundtruth-workteam-1698229936',
'Description': 'groundtruth workteam',
'SubDomain': '9nj1o7kppv.labeling.us-east-1.sagemaker.aws',
'CreateDate': datetime.datetime(2023, 10, 25, 10, 33, 20, 678000, tzinfo=tzlocal()),
'LastUpdatedDate': datetime.datetime(2023, 10, 25, 10, 33, 21, 559000, tzinfo=tzlocal()),
'NotificationConfiguration': {}},
'ResponseMetadata': {'RequestId': '3af113f0-2c2b-4a2f-a03f-dc1b949fcd8b',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '3af113f0-2c2b-4a2f-a03f-dc1b949fcd8b',
'content-type': 'application/x-amz-json-1.1',
'content-length': '544',
'date': 'Wed, 25 Oct 2023 10:33:21 GMT'},
'RetryAttempts': 0}}
Now you can pull the workteam ARN from either create_workteam_response or describe_workteam_response.
workteam_arn = describe_workteam_response['Workteam']['WorkteamArn']
workteam_arn
'arn:aws:sagemaker:us-east-1:657781576110:workteam/private-crowd/groundtruth-workteam-1698229936'
Review the created workteam in the AWS console.
Instructions:
from IPython.display import display, HTML
display(HTML('<b>Review <a target="_blank" href="https://{}.console.aws.amazon.com/sagemaker/groundtruth?region={}#/labeling-workforces/private-details/{}">workteam</a></b>'.format(region, region, workteam_name)))
Review workteam
Use the following function to check if the Amazon Cognito user already exists.
def check_user_existence(user_pool_id, user_name):
    for user in cognito_idp.list_users(UserPoolId=user_pool_id)['Users']:
        if user_name == user['Username']:
            return True
    return False
Create a user passing the username, temporary password, and the Amazon Cognito user pool ID.
user_name = 'user-{}'.format(timestamp)
temporary_password = 'Password@420'
if not check_user_existence(user_pool_id, user_name):
    create_user_response=cognito_idp.admin_create_user(
        Username=user_name,
        UserPoolId=user_pool_id,
        TemporaryPassword=temporary_password,
        MessageAction='SUPPRESS' # suppress sending the invitation message to the newly created user
    )
    pprint(create_user_response)
else:
    print("Amazon Cognito user {} already exists".format(user_name))
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
'content-length': '242',
'content-type': 'application/x-amz-json-1.1',
'date': 'Wed, 25 Oct 2023 10:33:22 GMT',
'x-amzn-requestid': 'bfefdfe3-bb02-4575-a116-88d9668c3d6f'},
'HTTPStatusCode': 200,
'RequestId': 'bfefdfe3-bb02-4575-a116-88d9668c3d6f',
'RetryAttempts': 0},
'User': {'Attributes': [{'Name': 'sub',
'Value': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}],
'Enabled': True,
'UserCreateDate': datetime.datetime(2023, 10, 25, 10, 33, 22, 181000, tzinfo=tzlocal()),
'UserLastModifiedDate': datetime.datetime(2023, 10, 25, 10, 33, 22, 181000, tzinfo=tzlocal()),
'UserStatus': 'FORCE_CHANGE_PASSWORD',
'Username': 'user-1698229936'}}
Add the user to the Amazon Cognito user group.
cognito_idp.admin_add_user_to_group(
UserPoolId=user_pool_id,
Username=user_name,
GroupName=user_pool_group_name
)
{'ResponseMetadata': {'RequestId': '9ef25502-ecbf-48da-9804-ab22965bc792',
'HTTPStatusCode': 200,
'HTTPHeaders': {'date': 'Wed, 25 Oct 2023 10:33:22 GMT',
'content-type': 'application/x-amz-json-1.1',
'content-length': '0',
'connection': 'keep-alive',
'x-amzn-requestid': '9ef25502-ecbf-48da-9804-ab22965bc792'},
'RetryAttempts': 0}}
Create a Human Task UI resource, using a worker task UI template. This template will be rendered to the human workers whenever human interaction is required.
The simple demo template below is compatible with the current use case of classifying product reviews into three sentiment classes. For other pre-built UIs (there are 70+), check: https://github.com/aws-samples/amazon-a2i-sample-task-uis
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier name="sentiment"
                      categories="['-1', '0', '1']"
                      initial-value="{{ task.input.initialValue }}"
                      header="Classify Reviews into Sentiment: -1 (negative), 0 (neutral), and 1 (positive)">

        <classification-target>
            {{ task.input.taskObject }}
        </classification-target>

        <full-instructions header="Classify reviews into sentiment: -1 (negative), 0 (neutral), and 1 (positive)">
            <p><strong>1</strong>: joy, excitement, delight</p>
            <p><strong>0</strong>: neither positive nor negative, such as stating a fact</p>
            <p><strong>-1</strong>: anger, sarcasm, anxiety</p>
        </full-instructions>

        <short-instructions>
            Classify reviews into sentiment: -1 (negative), 0 (neutral), and 1 (positive)
        </short-instructions>
    </crowd-classifier>
</crowd-form>
"""
Create a human task UI resource.
Instructions: Pass the worker task UI template defined above as the content of the UI template parameter to the function sm.create_human_task_ui.
# Task UI name - this value is unique per account and region. You can also provide your own value here.
task_ui_name = 'ui-{}'.format(timestamp)
human_task_ui_response = sm.create_human_task_ui(
HumanTaskUiName=task_ui_name,
UiTemplate={
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
"Content": template
### END SOLUTION - DO NOT delete this comment for grading purposes
}
)
human_task_ui_response
{'HumanTaskUiArn': 'arn:aws:sagemaker:us-east-1:657781576110:human-task-ui/ui-1698229936',
'ResponseMetadata': {'RequestId': '08e48f98-1014-44e5-9ea1-939d426d0d5b',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '08e48f98-1014-44e5-9ea1-939d426d0d5b',
'content-type': 'application/x-amz-json-1.1',
'content-length': '89',
'date': 'Wed, 25 Oct 2023 10:33:22 GMT'},
'RetryAttempts': 0}}
Pull the ARN of the human task UI:
human_task_ui_arn = human_task_ui_response["HumanTaskUiArn"]
print(human_task_ui_arn)
arn:aws:sagemaker:us-east-1:657781576110:human-task-ui/ui-1698229936
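In this section, you are going to create a Flow Definition. Flow Definitions allow you to specify the workteam that your tasks are sent to, the human task UI that your workers see, the task configuration (such as the number of workers that receive a task), and the S3 location where the output data is stored.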
Here you are going to use the API, but you can optionally create this workflow definition in the console as well.
For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.
Let’s construct the S3 bucket output path.
output_path = 's3://{}/a2i-results-{}'.format(bucket, timestamp)
print(output_path)
s3://sagemaker-us-east-1-657781576110/a2i-results-1698229936
Construct the Flow Definition with the workteam and human task UI in the human loop configurations that you created above.
Instructions: Pass the workteam and human task UI ARNs into the HumanLoopConfig dictionary within the function sm.create_flow_definition. Review the other parameters.
# Flow definition name - this value is unique per account and region
flow_definition_name = 'fd-{}'.format(timestamp)
create_workflow_definition_response = sm.create_flow_definition(
FlowDefinitionName=flow_definition_name,
RoleArn=role,
HumanLoopConfig={
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
"WorkteamArn": workteam_arn,
"HumanTaskUiArn": human_task_ui_arn,
### END SOLUTION - DO NOT delete this comment for grading purposes
"TaskCount": 1, # the number of workers that receive a task
"TaskDescription": "Classify Reviews into sentiment: -1 (negative), 0 (neutral), 1 (positive)",
"TaskTitle": "Classify Reviews into sentiment: -1 (negative), 0 (neutral), 1 (positive)",
},
OutputConfig={"S3OutputPath": output_path},
)
augmented_ai_flow_definition_arn = create_workflow_definition_response["FlowDefinitionArn"]
You can pull information about the Flow Definition with the function sm.describe_flow_definition and wait for its status value FlowDefinitionStatus to become Active.
for _ in range(60):
    describe_flow_definition_response = sm.describe_flow_definition(FlowDefinitionName=flow_definition_name)
    print(describe_flow_definition_response["FlowDefinitionStatus"])
    if describe_flow_definition_response["FlowDefinitionStatus"] == "Active":
        print("Flow Definition is active")
        break
    time.sleep(2)
Initializing
Active
Flow Definition is active
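The polling loop above exits silently if the status never becomes Active within 60 attempts. A slightly more defensive version could raise on timeout and stop early on a failure status. The following is a minimal sketch of that pattern; wait_for_status is a hypothetical helper (not part of boto3), the assumption that "Failed" is the terminal error status is stated in the code, and canned responses replace the real describe call so the example runs without AWS access.

```python
import time


def wait_for_status(describe, status_key, target="Active", failed=("Failed",),
                    attempts=60, delay=2.0, sleep=time.sleep):
    """Hypothetical helper: call describe() until status_key equals target.

    Raises RuntimeError if a status listed in `failed` is seen (assumed terminal),
    and TimeoutError if the target is not reached within `attempts` polls.
    """
    for _ in range(attempts):
        status = describe()[status_key]
        if status == target:
            return status
        if status in failed:
            raise RuntimeError("Resource entered status {}".format(status))
        sleep(delay)
    raise TimeoutError("Status did not become {} after {} attempts".format(target, attempts))


# Canned responses standing in for sm.describe_flow_definition(...)
canned = iter([
    {"FlowDefinitionStatus": "Initializing"},
    {"FlowDefinitionStatus": "Active"},
])
final_status = wait_for_status(lambda: next(canned), "FlowDefinitionStatus",
                               sleep=lambda seconds: None)  # skip real waiting in the demo
print(final_status)
```

In the notebook, the describe argument could be lambda: sm.describe_flow_definition(FlowDefinitionName=flow_definition_name).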
Deploy a custom ML model into an endpoint and call it to predict labels for some sample reviews. Check the confidence score for each prediction. If it is smaller than the threshold, engage your workforce for a human review, starting a human loop. Fix the labels by completing the human loop tasks and review the results.
Set up a sentiment predictor class to be wrapped later into the PyTorch Model.
Create a Sentiment Predictor class.
Instructions: Pass the JSON Lines serializer and deserializer objects here by calling JSONLinesSerializer() and JSONLinesDeserializer(), respectively. More information about the serializers can be found in the SageMaker Python SDK documentation.
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer
class SentimentPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
            serializer=JSONLinesSerializer(),
            deserializer=JSONLinesDeserializer()
            ### END SOLUTION - DO NOT delete this comment for grading purposes
        )
Create a SageMaker model based on the model artifact saved in the S3 bucket.
from sagemaker.pytorch.model import PyTorchModel
pytorch_model_name = 'model-{}'.format(timestamp)
model = PyTorchModel(name=pytorch_model_name,
model_data='s3://dlai-practical-data-science/models/ab/variant_a/model.tar.gz',
predictor_cls=SentimentPredictor,
entry_point='inference.py',
source_dir='src',
framework_version='1.6.0',
py_version='py3',
role=role)
Now you will create a SageMaker Endpoint from the model. For the purposes of this lab, you will use a relatively small instance type. Refer to the Amazon SageMaker documentation for additional instance types that may work for your use cases outside of this lab.
%%time
pytorch_endpoint_name = 'endpoint-{}'.format(timestamp)
predictor = model.deploy(initial_instance_count=1,
instance_type='ml.m5.large',
endpoint_name=pytorch_endpoint_name)
----------!CPU times: user 2min 22s, sys: 9.97 s, total: 2min 32s
Wall time: 8min 6s
You can review the endpoint in the AWS console and check its status.
from IPython.display import display, HTML
display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pytorch_endpoint_name)))
Review SageMaker REST Endpoint
Let’s create a list of sample reviews.
reviews = ["I enjoy this product",
"I am unhappy with this product",
"It is okay",
"sometimes it works"]
Now you can send each of the sample reviews to the model via the predictor.predict() API call. Note that you need to pass the reviews in the JSON format that the model expects as input. Then, you parse the model’s response to obtain the predicted label and the confidence score.
After that, you check the condition for when you want to engage a human for review. You can check whether the returned confidence score is under the defined threshold of 90%, which would mean that you would want to start the human loop with the predicted label and the review as inputs. Finally, you start the human loop passing the input content and Flow Definition defined above.
Complete the dictionary input_content, which should contain the original prediction ('initialValue' key) and the review text ('taskObject' key).
import json
human_loops_started = []
CONFIDENCE_SCORE_THRESHOLD = 0.90
for review in reviews:
    inputs = [
        {"features": [review]},
    ]

    response = predictor.predict(inputs)
    print(response)
    prediction = response[0]['predicted_label']
    confidence_score = response[0]['probability']

    print('Checking prediction confidence {} for sample review: "{}"'.format(confidence_score, review))

    # condition for when you want to engage a human for review
    if confidence_score < CONFIDENCE_SCORE_THRESHOLD:
        human_loop_name = str(time.time()).replace('.', '-') # using milliseconds
        input_content = {
            ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
            "initialValue": prediction,
            "taskObject": review
            ### END SOLUTION - DO NOT delete this comment for grading purposes
        }
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=human_loop_name,
            FlowDefinitionArn=augmented_ai_flow_definition_arn,
            HumanLoopInput={"InputContent": json.dumps(input_content)},
        )

        human_loops_started.append(human_loop_name)

        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print(f"*** ==> Starting human loop with name: {human_loop_name} \n")
    else:
        print(
            f"Confidence score of {confidence_score * 100}% for star rating of {prediction} is above threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print("Human loop not needed. \n")
[{'probability': 0.9376369118690491, 'predicted_label': 1}]
Checking prediction confidence 0.9376369118690491 for sample review: "I enjoy this product"
Confidence score of 93.76369118690491% for star rating of 1 is above threshold of 90.0%
Human loop not needed.
[{'probability': 0.6340296864509583, 'predicted_label': -1}]
Checking prediction confidence 0.6340296864509583 for sample review: "I am unhappy with this product"
Confidence score of 63.402968645095825% for prediction of -1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1698230493-857691
[{'probability': 0.5422114729881287, 'predicted_label': 1}]
Checking prediction confidence 0.5422114729881287 for sample review: "It is okay"
Confidence score of 54.221147298812866% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1698230494-3784657
[{'probability': 0.3931102454662323, 'predicted_label': 1}]
Checking prediction confidence 0.3931102454662323 for sample review: "sometimes it works"
Confidence score of 39.31102454662323% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1698230494-8446784
Review the results above. Three of the sample reviews with the probability scores lower than the threshold went into the human loop. The original predicted labels are passed together with the review text and will be seen in the task.
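The routing decision itself is independent of the endpoint and of Amazon A2I, so it can be isolated and checked offline. The sketch below replays the four responses printed above through a small pure function; route_prediction is a hypothetical name, and it only builds the input content rather than starting a human loop.

```python
CONFIDENCE_SCORE_THRESHOLD = 0.90


def route_prediction(review, response, threshold=CONFIDENCE_SCORE_THRESHOLD):
    """Return the human loop input content if the prediction needs review, else None."""
    prediction = response[0]["predicted_label"]
    confidence_score = response[0]["probability"]
    if confidence_score < threshold:
        return {"initialValue": prediction, "taskObject": review}
    return None


# Reviews and model responses copied from the output above
samples = [
    ("I enjoy this product", [{"probability": 0.9376369118690491, "predicted_label": 1}]),
    ("I am unhappy with this product", [{"probability": 0.6340296864509583, "predicted_label": -1}]),
    ("It is okay", [{"probability": 0.5422114729881287, "predicted_label": 1}]),
    ("sometimes it works", [{"probability": 0.3931102454662323, "predicted_label": 1}]),
]

to_review = []
for review, response in samples:
    content = route_prediction(review, response)
    if content is not None:
        to_review.append(content)

print(len(to_review))  # 3 reviews fall below the 90% threshold
print([item["taskObject"] for item in to_review])
```

Keeping the decision in a function like this makes it easy to experiment with other thresholds before any human tasks are created.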
The function a2i.describe_human_loop can be used to pull information about the human loop.
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")

    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
HumanLoop Name: 1698230493-857691
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230493-857691/output.json'}
HumanLoop Name: 1698230494-3784657
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-3784657/output.json'}
HumanLoop Name: 1698230494-8446784
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-8446784/output.json'}
Pull the labeling UI address from the workteam information. Workers use it to access the human loop tasks.
labeling_ui = sm.describe_workteam(WorkteamName=workteam_name)["Workteam"]["SubDomain"]
print(labeling_ui)
9nj1o7kppv.labeling.us-east-1.sagemaker.aws
Navigate to the link below and log in with the defined username and password. Complete the human loop following the provided instructions.
from IPython.display import display, HTML
display(HTML('Click <a target="_blank" href="https://{}"><b>here</b></a> to start labeling with username <b>{}</b> and temporary password <b>{}</b>'.format(labeling_ui, user_name, temporary_password)))
Click here to start labeling with username user-1698229936 and temporary password Password@420
Wait for the workers to complete the human loop tasks started above.
import time
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")
    while resp["HumanLoopStatus"] != "Completed":
        print(f"Waiting for HumanLoop to complete.")
        time.sleep(10)
        resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
        print(f"Completed!")
        print("")
HumanLoop Name: 1698230493-857691
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230493-857691/output.json'}
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Completed!
HumanLoop Name: 1698230494-3784657
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-3784657/output.json'}
Completed!
HumanLoop Name: 1698230494-8446784
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-8446784/output.json'}
Completed!
Once the work is complete, Amazon A2I stores the results in the specified S3 bucket and sends a CloudWatch Event. Let’s check the S3 contents.
import re
from pprint import pprint
fixed_items = []
for resp in completed_human_loops:
    split_string = re.split("s3://" + bucket + "/", resp["HumanLoopOutput"]["OutputS3Uri"])
    output_bucket_key = split_string[1]

    response = s3.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read().decode("utf-8")
    json_output = json.loads(content)
    pprint(json_output)

    input_content = json_output["inputContent"]
    human_answer = json_output["humanAnswers"][0]["answerContent"]
    fixed_item = {"input_content": input_content, "human_answer": human_answer}
    fixed_items.append(fixed_item)
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:657781576110:flow-definition/fd-1698229936',
'humanAnswers': [{'acceptanceTime': '2023-10-25T10:43:47.979Z',
'answerContent': {'sentiment': {'label': '-1'}},
'submissionTime': '2023-10-25T10:43:50.706Z',
'timeSpentInSeconds': 2.727,
'workerId': 'e696ecc324cbfd32',
'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_fTkk2p8PL',
'sub': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}}}],
'humanLoopName': '1698230493-857691',
'inputContent': {'initialValue': -1,
'taskObject': 'I am unhappy with this product'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:657781576110:flow-definition/fd-1698229936',
'humanAnswers': [{'acceptanceTime': '2023-10-25T10:43:45.001Z',
'answerContent': {'sentiment': {'label': '1'}},
'submissionTime': '2023-10-25T10:43:47.895Z',
'timeSpentInSeconds': 2.894,
'workerId': 'e696ecc324cbfd32',
'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_fTkk2p8PL',
'sub': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}}}],
'humanLoopName': '1698230494-3784657',
'inputContent': {'initialValue': 1, 'taskObject': 'It is okay'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:657781576110:flow-definition/fd-1698229936',
'humanAnswers': [{'acceptanceTime': '2023-10-25T10:43:34.898Z',
'answerContent': {'sentiment': {'label': '0'}},
'submissionTime': '2023-10-25T10:43:44.907Z',
'timeSpentInSeconds': 10.009,
'workerId': 'e696ecc324cbfd32',
'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_fTkk2p8PL',
'sub': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}}}],
'humanLoopName': '1698230494-8446784',
'inputContent': {'initialValue': 1, 'taskObject': 'sometimes it works'}}
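The cell above recovers the object key by splitting the output URI on the known bucket name with re.split. An alternative that does not depend on knowing the bucket in advance is to parse the URI with the standard library. This is a minimal sketch; split_s3_uri is a hypothetical helper name, and the URI is one of the output locations printed earlier.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/key URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("Not an S3 URI: {}".format(uri))
    # netloc holds the bucket name; the path carries the key with a leading slash
    return parsed.netloc, parsed.path.lstrip("/")


output_uri = ("s3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/"
              "fd-1698229936/2023/10/25/10/41/34/1698230493-857691/output.json")
output_bucket, output_key = split_s3_uri(output_uri)
print(output_bucket)
print(output_key)
```

The resulting pair can be passed directly as the Bucket and Key arguments of s3.get_object.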
Now you can prepare the data for re-training.
df_fixed_items = pd.DataFrame(fixed_items)
df_fixed_items.head()
| | input_content | human_answer |
|---|---|---|
| 0 | {'initialValue': -1, 'taskObject': 'I am unhap... | {'sentiment': {'label': '-1'}} |
| 1 | {'initialValue': 1, 'taskObject': 'It is okay'} | {'sentiment': {'label': '1'}} |
| 2 | {'initialValue': 1, 'taskObject': 'sometimes i... | {'sentiment': {'label': '0'}} |
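Both columns in the table above still hold nested dictionaries, so a re-training job would usually need them flattened into plain columns first. The sketch below shows one possible way to do that with the three results from this run. The function name to_training_rows and the column names review_body, predicted_sentiment, and sentiment are assumptions chosen for illustration, not a format required by the lab.

```python
def to_training_rows(fixed_items):
    """Flatten human loop results into one row per review (hypothetical column names)."""
    rows = []
    for item in fixed_items:
        rows.append({
            "review_body": item["input_content"]["taskObject"],
            "predicted_sentiment": int(item["input_content"]["initialValue"]),
            # The human answer arrives as a string label, so convert it to int
            "sentiment": int(item["human_answer"]["sentiment"]["label"]),
        })
    return rows


# The three completed human loops shown above
fixed_items = [
    {"input_content": {"initialValue": -1, "taskObject": "I am unhappy with this product"},
     "human_answer": {"sentiment": {"label": "-1"}}},
    {"input_content": {"initialValue": 1, "taskObject": "It is okay"},
     "human_answer": {"sentiment": {"label": "1"}}},
    {"input_content": {"initialValue": 1, "taskObject": "sometimes it works"},
     "human_answer": {"sentiment": {"label": "0"}}},
]

rows = to_training_rows(fixed_items)
corrected = [row for row in rows if row["sentiment"] != row["predicted_sentiment"]]
print(rows)
print(corrected)
```

In the notebook, pd.DataFrame(rows) would turn these rows into a data frame, and the corrected list shows which labels the human reviewer changed.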
Upload the notebook to the S3 bucket for grading purposes.
Note: you may need to click the “Save” button before the upload.
!aws s3 cp ./C3_W3_Assignment.ipynb s3://$bucket/C3_W3_Assignment_Learner.ipynb
upload: ./C3_W3_Assignment.ipynb to s3://sagemaker-us-east-1-657781576110/C3_W3_Assignment_Learner.ipynb