Data labeling and human-in-the-loop pipelines with Amazon Augmented AI (A2I)

Introduction

In this lab you will create your own human workforce and a human task UI, then define a human review workflow for data labeling. You will first predict the labels with a custom ML model, and then start a human loop whenever a probability score falls below the preset threshold. After the human loop tasks are completed, you will review the results and prepare the data for re-training.

Let’s install and import the required modules.

# please ignore warning messages during the installation
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!pip install -q protobuf==3.20.*

import boto3
import sagemaker
import pandas as pd
from pprint import pprint
import botocore

config = botocore.config.Config(user_agent_extra='dlai-pds/c3/w3')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker', 
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

s3 = boto3.Session().client(service_name='s3', 
                            config=config)
cognito_idp = boto3.Session().client(service_name='cognito-idp', 
                                     config=config)
a2i = boto3.Session().client(service_name='sagemaker-a2i-runtime', 
                             config=config)

1. Set up Amazon Cognito user pool and define human workforce

The first step in the creation of the human-in-the-loop pipeline will be to create your own private workforce.

Amazon Cognito provides authentication, authorization, and user management for apps. This enables your workers to sign in directly to the labeling UI with a username and password.

You will construct an Amazon Cognito user pool and set up its client, domain, and group. Then you’ll create a SageMaker workforce linked to the Cognito user pool, followed by a SageMaker workteam linked to the Cognito user pool and group. Finally, you will create a pool user and add it to the group.

To get started, let’s construct the user pool and user pool client names.

import time
timestamp = int(time.time())

user_pool_name = 'groundtruth-user-pool-{}'.format(timestamp)
user_pool_client_name = 'groundtruth-user-pool-client-{}'.format(timestamp)

print("Amazon Cognito user pool name: {}".format(user_pool_name))
print("Amazon Cognito user pool client name: {}".format(user_pool_client_name))
Amazon Cognito user pool name: groundtruth-user-pool-1698229936
Amazon Cognito user pool client name: groundtruth-user-pool-client-1698229936

1.1. Create Amazon Cognito user pool

The function cognito_idp.create_user_pool creates a new Amazon Cognito user pool. It returns a dictionary that describes the new pool; store it in a variable to read fields such as the pool ID.

create_user_pool_response = cognito_idp.create_user_pool(PoolName=user_pool_name)
user_pool_id = create_user_pool_response['UserPool']['Id']

print("Amazon Cognito user pool ID: {}".format(user_pool_id))
Amazon Cognito user pool ID: us-east-1_fTkk2p8PL

Exercise 1

Pull the Amazon Cognito user pool name from its description.

Instructions: Print the keys of the user pool, choose the one that corresponds to the name and print its value.

print(create_user_pool_response['UserPool'].keys())
dict_keys(['Id', 'Name', 'Policies', 'DeletionProtection', 'LambdaConfig', 'LastModifiedDate', 'CreationDate', 'SchemaAttributes', 'VerificationMessageTemplate', 'UserAttributeUpdateSettings', 'MfaConfiguration', 'EstimatedNumberOfUsers', 'EmailConfiguration', 'AdminCreateUserConfig', 'Arn', 'AccountRecoverySetting'])
### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
user_pool_name = create_user_pool_response["UserPool"]["Name"]
### END SOLUTION - DO NOT delete this comment for grading purposes
print('Amazon Cognito user pool name: {}'.format(user_pool_name))
Amazon Cognito user pool name: groundtruth-user-pool-1698229936

1.2. Create Amazon Cognito user pool client

Now let’s set up the Amazon Cognito user pool client for the user pool created above.

The Amazon Cognito user pool client implements OAuth, an open standard for authorization. The standard enables apps to obtain limited access (scopes) to a user’s data without giving away the user’s password. It decouples authentication from authorization and supports multiple use cases addressing different device capabilities.

Exercise 2

Create the Amazon Cognito user pool client for the constructed user pool.

Instructions: Pass the user pool ID and the user pool client name into the function cognito_idp.create_user_pool_client. Review the other parameters of the function.

### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
create_user_pool_client_response = cognito_idp.create_user_pool_client(
    UserPoolId=user_pool_id,
    ClientName=user_pool_client_name,
### END SOLUTION - DO NOT delete this comment for grading purposes
    GenerateSecret=True, # boolean to specify whether you want to generate a secret
    # a list of provider names for the identity providers that are supported on this client, e.g. Cognito, Facebook, Google
    SupportedIdentityProviders=[
        'COGNITO' 
    ],
    # a list of the allowed OAuth flows, e.g. code, implicit, client_credentials
    AllowedOAuthFlows=[
        'code',
        'implicit'
    ],
    # a list of the allowed OAuth scopes, e.g. phone, email, openid, and profile
    AllowedOAuthScopes=[
        'email',
        'openid',
        'profile'
    ],
    # a list of allowed redirect (callback) URLs for the identity providers
    CallbackURLs=[
        'https://datascienceonaws.com', 
    ],
    # set to true if the client is allowed to follow the OAuth protocol when interacting with Cognito user pools
    AllowedOAuthFlowsUserPoolClient=True
)

client_id = create_user_pool_client_response['UserPoolClient']['ClientId']
print('Amazon Cognito user pool client ID: {}'.format(client_id))
Amazon Cognito user pool client ID: 19apctv0pmtceunu3ttp5rpkhu

1.3. Create Amazon Cognito user pool domain and group

Exercise 3

Set up the Amazon Cognito user pool domain for the constructed user pool.

Instructions: Pass the user pool ID and the user pool domain name into the function cognito_idp.create_user_pool_domain.

user_pool_domain_name = 'groundtruth-user-pool-domain-{}'.format(timestamp)

try:
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    cognito_idp.create_user_pool_domain(
        UserPoolId=user_pool_id,
        Domain=user_pool_domain_name 
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    )
    print("Created Amazon Cognito user pool domain: {}".format(user_pool_domain_name))
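except botocore.exceptions.ClientError as error:
    # report the actual error instead of assuming that the domain already exists
    print("Could not create Amazon Cognito user pool domain {}: {}".format(user_pool_domain_name, error))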
Created Amazon Cognito user pool domain: groundtruth-user-pool-domain-1698229936
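
To make the OAuth settings above more concrete, here is a minimal sketch of how a hosted UI login URL for the authorization code flow could be assembled from the domain prefix, the client ID, the callback URL, and the scopes. The helper name build_hosted_ui_login_url is hypothetical, and the sketch assumes a Cognito-prefixed domain. In this lab the workers sign in through the labeling portal, so the URL is for illustration only.

```python
from urllib.parse import urlencode


def build_hosted_ui_login_url(domain_prefix, region, client_id, callback_url,
                              scopes=("email", "openid", "profile")):
    """Build an authorization-code login URL for a Cognito-prefixed domain (illustrative helper)."""
    query = urlencode({
        "response_type": "code",      # matches the 'code' entry in AllowedOAuthFlows
        "client_id": client_id,
        "redirect_uri": callback_url,  # must be one of the CallbackURLs of the client
        "scope": " ".join(scopes),     # scopes are space-separated
    })
    return "https://{}.auth.{}.amazoncognito.com/oauth2/authorize?{}".format(
        domain_prefix, region, query)


# values copied from the cell outputs above
login_url = build_hosted_ui_login_url(
    domain_prefix="groundtruth-user-pool-domain-1698229936",
    region="us-east-1",
    client_id="19apctv0pmtceunu3ttp5rpkhu",
    callback_url="https://datascienceonaws.com",
)
print(login_url)
```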

You will use the following function to check if the Amazon Cognito user group already exists.

def check_user_pool_group_existence(user_pool_id, user_pool_group_name):  
    for group in cognito_idp.list_groups(UserPoolId=user_pool_id)['Groups']:
        if user_pool_group_name == group['GroupName']:
            return True
    return False
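
The check above reads only the first page of results, which is enough for a pool with a single group. For a pool with many groups, a paginated variant could look like the sketch below. It assumes the response carries a NextToken key while more pages remain. The FakeCognitoClient class is a hypothetical stand-in used so the example runs without AWS access.

```python
def group_exists(cognito_client, user_pool_id, group_name):
    """Return True if the group is found on any page of list_groups results."""
    kwargs = {"UserPoolId": user_pool_id}
    while True:
        page = cognito_client.list_groups(**kwargs)
        if any(group["GroupName"] == group_name for group in page.get("Groups", [])):
            return True
        next_token = page.get("NextToken")
        if not next_token:
            return False  # last page reached without a match
        kwargs["NextToken"] = next_token


class FakeCognitoClient:
    """Hypothetical stand-in that serves two pages of groups."""

    def list_groups(self, UserPoolId, NextToken=None):
        if NextToken is None:
            return {"Groups": [{"GroupName": "group-a"}], "NextToken": "page-2"}
        return {"Groups": [{"GroupName": "group-b"}]}


fake_client = FakeCognitoClient()
found_on_second_page = group_exists(fake_client, "example-pool-id", "group-b")
missing_group = group_exists(fake_client, "example-pool-id", "group-c")
print(found_on_second_page, missing_group)
```

In the lab, the real cognito_idp client would be passed in place of the stand-in.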

Exercise 4

Set up Amazon Cognito user group.

Instructions: Pass the user pool ID and the user group name into the function cognito_idp.create_group.

user_pool_group_name = 'groundtruth-user-pool-group-{}'.format(timestamp)

if not check_user_pool_group_existence(user_pool_id, user_pool_group_name):
    ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
    cognito_idp.create_group(
        UserPoolId=user_pool_id,
        GroupName=user_pool_group_name
    ### END SOLUTION - DO NOT delete this comment for grading purposes
    )
    print("Created Amazon Cognito user group: {}".format(user_pool_group_name))
else:
    print("Amazon Cognito user group {} already exists".format(user_pool_group_name))
Created Amazon Cognito user group: groundtruth-user-pool-group-1698229936

1.4. Create workforce and workteam

Use the following function to check if the workforce already exists. You can only create one workforce per region, so if a workforce with a different name is found, the function deletes it together with all of the existing workteams.

def check_workforce_existence(workforce_name):  
    for workforce in sm.list_workforces()['Workforces']:
        if workforce_name == workforce['WorkforceName']:
            return True
        else:
            for workteam in sm.list_workteams()['Workteams']:
                sm.delete_workteam(WorkteamName=workteam['WorkteamName'])
            sm.delete_workforce(WorkforceName=workforce['WorkforceName'])
    return False

Exercise 5

Create a workforce.

Instructions: Pass the Amazon Cognito user pool ID and client ID into the Cognito configuration of the function sm.create_workforce.

workforce_name = 'groundtruth-workforce-name-{}'.format(timestamp)

if not check_workforce_existence(workforce_name):
    create_workforce_response = sm.create_workforce(
        WorkforceName=workforce_name,
        CognitoConfig={
            ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
            'UserPool': user_pool_id,
            'ClientId': client_id
            ### END SOLUTION - DO NOT delete this comment for grading purposes
        }
    )
    print("Workforce name: {}".format(workforce_name))
    pprint(create_workforce_response)
else:
    print("Workforce {} already exists".format(workforce_name))
Workforce name: groundtruth-workforce-name-1698229936
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '107',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 25 Oct 2023 10:32:19 GMT',
                                      'x-amzn-requestid': '2683e1a6-6b5f-4ce7-8a20-e4498374d9ef'},
                      'HTTPStatusCode': 200,
                      'RequestId': '2683e1a6-6b5f-4ce7-8a20-e4498374d9ef',
                      'RetryAttempts': 0},
 'WorkforceArn': 'arn:aws:sagemaker:us-east-1:657781576110:workforce/groundtruth-workforce-name-1698229936'}

You can use the sm.describe_workforce function to get the information about the workforce.

describe_workforce_response = sm.describe_workforce(WorkforceName=workforce_name)
describe_workforce_response
{'Workforce': {'WorkforceName': 'groundtruth-workforce-name-1698229936',
  'WorkforceArn': 'arn:aws:sagemaker:us-east-1:657781576110:workforce/groundtruth-workforce-name-1698229936',
  'LastUpdatedDate': datetime.datetime(2023, 10, 25, 10, 32, 20, 42000, tzinfo=tzlocal()),
  'SourceIpConfig': {'Cidrs': []},
  'SubDomain': '9nj1o7kppv.labeling.us-east-1.sagemaker.aws',
  'CognitoConfig': {'UserPool': 'us-east-1_fTkk2p8PL',
   'ClientId': '19apctv0pmtceunu3ttp5rpkhu'},
  'CreateDate': datetime.datetime(2023, 10, 25, 10, 32, 19, 503000, tzinfo=tzlocal()),
  'Status': 'Initializing'},
 'ResponseMetadata': {'RequestId': '6ec6701f-ff6a-4c40-9b84-581bb404bedc',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6ec6701f-ff6a-4c40-9b84-581bb404bedc',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '445',
   'date': 'Wed, 25 Oct 2023 10:32:19 GMT'},
  'RetryAttempts': 0}}

Use the following function to check if the workteam already exists. If there are no workteams in the list, give some time for the workforce to set up.

def check_workteam_existence(workteam_name):  
    if sm.list_workteams()['Workteams']:
        for workteam in sm.list_workteams()['Workteams']:
            if workteam_name == workteam['WorkteamName']:
                return True
    else:
        time.sleep(60)
        return False
    return False

Exercise 6

Create a workteam.

Instructions: Pass the Amazon Cognito user pool ID, client ID, and group name into the Cognito member definition of the function sm.create_workteam.

This cell may take 1-2 minutes to run.

workteam_name = 'groundtruth-workteam-{}'.format(timestamp)

if not check_workteam_existence(workteam_name):
    create_workteam_response = sm.create_workteam(
        Description='groundtruth workteam',
        WorkforceName=workforce_name,
        WorkteamName=workteam_name,
        # objects that identify the workers that make up the work team
        MemberDefinitions=[{
            'CognitoMemberDefinition': {
                ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
                'UserPool': user_pool_id,
                'ClientId': client_id,
                'UserGroup': user_pool_group_name
                ### END SOLUTION - DO NOT delete this comment for grading purposes
            }
        }]
    )
    pprint(create_workteam_response)
else:
    print("Workteam {} already exists".format(workteam_name))
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '113',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 25 Oct 2023 10:33:21 GMT',
                                      'x-amzn-requestid': '1532cd79-81ca-43ef-a4f5-c133844d0762'},
                      'HTTPStatusCode': 200,
                      'RequestId': '1532cd79-81ca-43ef-a4f5-c133844d0762',
                      'RetryAttempts': 0},
 'WorkteamArn': 'arn:aws:sagemaker:us-east-1:657781576110:workteam/private-crowd/groundtruth-workteam-1698229936'}

You can use the sm.describe_workteam function to get information about the workteam.

describe_workteam_response = sm.describe_workteam(WorkteamName=workteam_name)
describe_workteam_response
{'Workteam': {'WorkteamName': 'groundtruth-workteam-1698229936',
  'MemberDefinitions': [{'CognitoMemberDefinition': {'UserPool': 'us-east-1_fTkk2p8PL',
     'UserGroup': 'groundtruth-user-pool-group-1698229936',
     'ClientId': '19apctv0pmtceunu3ttp5rpkhu'}}],
  'WorkteamArn': 'arn:aws:sagemaker:us-east-1:657781576110:workteam/private-crowd/groundtruth-workteam-1698229936',
  'Description': 'groundtruth workteam',
  'SubDomain': '9nj1o7kppv.labeling.us-east-1.sagemaker.aws',
  'CreateDate': datetime.datetime(2023, 10, 25, 10, 33, 20, 678000, tzinfo=tzlocal()),
  'LastUpdatedDate': datetime.datetime(2023, 10, 25, 10, 33, 21, 559000, tzinfo=tzlocal()),
  'NotificationConfiguration': {}},
 'ResponseMetadata': {'RequestId': '3af113f0-2c2b-4a2f-a03f-dc1b949fcd8b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3af113f0-2c2b-4a2f-a03f-dc1b949fcd8b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '544',
   'date': 'Wed, 25 Oct 2023 10:33:21 GMT'},
  'RetryAttempts': 0}}

Now you can pull the workteam ARN either from create_workteam_response or describe_workteam_response.

workteam_arn = describe_workteam_response['Workteam']['WorkteamArn']
workteam_arn
'arn:aws:sagemaker:us-east-1:657781576110:workteam/private-crowd/groundtruth-workteam-1698229936'

Review the created workteam in the AWS console.

from IPython.display import display, HTML

display(HTML('<b>Review <a target="_blank" href="https://{}.console.aws.amazon.com/sagemaker/groundtruth?region={}#/labeling-workforces/private-details/{}">workteam</a></b>'.format(region, region, workteam_name)))

Review workteam

1.5. Create an Amazon Cognito user and add the user to the group

Use the following function to check if the Amazon Cognito user already exists.

def check_user_existence(user_pool_id, user_name):  
    for user in cognito_idp.list_users(UserPoolId=user_pool_id)['Users']:
        if user_name == user['Username']:
            return True
    return False

Create a user passing the username, temporary password, and the Amazon Cognito user pool ID.

user_name = 'user-{}'.format(timestamp)

temporary_password = 'Password@420'

if not check_user_existence(user_pool_id, user_name):
    create_user_response = cognito_idp.admin_create_user(
        Username=user_name,
        UserPoolId=user_pool_id,
        TemporaryPassword=temporary_password,
        MessageAction='SUPPRESS' # do not send the invitation message to the new user
    )
    pprint(create_user_response)
else:
    print("Amazon Cognito user {} already exists".format(user_name))
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
                                      'content-length': '242',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 25 Oct 2023 10:33:22 GMT',
                                      'x-amzn-requestid': 'bfefdfe3-bb02-4575-a116-88d9668c3d6f'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'bfefdfe3-bb02-4575-a116-88d9668c3d6f',
                      'RetryAttempts': 0},
 'User': {'Attributes': [{'Name': 'sub',
                          'Value': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}],
          'Enabled': True,
          'UserCreateDate': datetime.datetime(2023, 10, 25, 10, 33, 22, 181000, tzinfo=tzlocal()),
          'UserLastModifiedDate': datetime.datetime(2023, 10, 25, 10, 33, 22, 181000, tzinfo=tzlocal()),
          'UserStatus': 'FORCE_CHANGE_PASSWORD',
          'Username': 'user-1698229936'}}
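
The fixed temporary password is convenient for a lab, but outside of one you would probably want a random value. The sketch below generates one with the standard secrets module. The helper name generate_temporary_password and the symbol set are assumptions, and the rules it enforces (at least 8 characters with an uppercase letter, a lowercase letter, a digit, and a symbol) are assumed to match the password policy of the user pool.

```python
import secrets
import string


def generate_temporary_password(length=12):
    """Return a random password with at least one character from each class (illustrative helper)."""
    if length < 8:
        raise ValueError("length must be at least 8")
    symbols = "!@#$%^&*"  # assumed to be accepted by the pool's password policy
    # one guaranteed character per class
    required = [
        secrets.choice(string.ascii_uppercase),
        secrets.choice(string.ascii_lowercase),
        secrets.choice(string.digits),
        secrets.choice(symbols),
    ]
    alphabet = string.ascii_letters + string.digits + symbols
    remaining = [secrets.choice(alphabet) for _ in range(length - len(required))]
    characters = required + remaining
    secrets.SystemRandom().shuffle(characters)  # avoid a predictable class order
    return "".join(characters)


example_password = generate_temporary_password()
print(len(example_password))
```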

Add the user to the Amazon Cognito user group.

cognito_idp.admin_add_user_to_group(
    UserPoolId=user_pool_id,
    Username=user_name,
    GroupName=user_pool_group_name
)
{'ResponseMetadata': {'RequestId': '9ef25502-ecbf-48da-9804-ab22965bc792',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 25 Oct 2023 10:33:22 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'connection': 'keep-alive',
   'x-amzn-requestid': '9ef25502-ecbf-48da-9804-ab22965bc792'},
  'RetryAttempts': 0}}

2. Create Human Task UI

Create a Human Task UI resource, using a worker task UI template. This template will be rendered to the human workers whenever human interaction is required.

A simple demo template is provided below. It is compatible with the current use case of classifying product reviews into three sentiment classes. For other pre-built UIs (there are 70+), check: https://github.com/aws-samples/amazon-a2i-sample-task-uis

template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier name="sentiment"
                      categories="['-1', '0', '1']"
                      initial-value="{{ task.input.initialValue }}"
                      header="Classify Reviews into Sentiment:  -1 (negative), 0 (neutral), and 1 (positive)">
      
        <classification-target>
            {{ task.input.taskObject }}
        </classification-target>
      
        <full-instructions header="Classify reviews into sentiment:  -1 (negative), 0 (neutral), and 1 (positive)">
            <p><strong>1</strong>: joy, excitement, delight</p>
            <p><strong>0</strong>: neither positive nor negative, such as stating a fact</p>
            <p><strong>-1</strong>: anger, sarcasm, anxiety</p>
        </full-instructions>

        <short-instructions>
            Classify reviews into sentiment:  -1 (negative), 0 (neutral), and 1 (positive)
        </short-instructions>
    </crowd-classifier>
</crowd-form>
"""

Exercise 7

Create a human task UI resource.

Instructions: Pass the worker task UI template defined above as the content of the UI template parameter to the function sm.create_human_task_ui.

# Task UI name - this value is unique per account and region. You can also provide your own value here.
task_ui_name = 'ui-{}'.format(timestamp)

human_task_ui_response = sm.create_human_task_ui(
    HumanTaskUiName=task_ui_name,
    UiTemplate={
        ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
        "Content": template
        ### END SOLUTION - DO NOT delete this comment for grading purposes
    }
)
human_task_ui_response
{'HumanTaskUiArn': 'arn:aws:sagemaker:us-east-1:657781576110:human-task-ui/ui-1698229936',
 'ResponseMetadata': {'RequestId': '08e48f98-1014-44e5-9ea1-939d426d0d5b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '08e48f98-1014-44e5-9ea1-939d426d0d5b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Wed, 25 Oct 2023 10:33:22 GMT'},
  'RetryAttempts': 0}}

Pull the ARN of the human task UI:

human_task_ui_arn = human_task_ui_response["HumanTaskUiArn"]
print(human_task_ui_arn)
arn:aws:sagemaker:us-east-1:657781576110:human-task-ui/ui-1698229936

3. Define human review workflow

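In this section, you are going to create a Flow Definition. A Flow Definition specifies the workteam that receives the tasks, the human task UI (worker task template) presented to the workers, and the S3 location where the results are stored.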

Here you are going to use the API, but you can optionally create this workflow definition in the console as well.

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

Let’s construct the S3 bucket output path.

output_path = 's3://{}/a2i-results-{}'.format(bucket, timestamp)
print(output_path)
s3://sagemaker-us-east-1-657781576110/a2i-results-1698229936

Exercise 8

Construct the Flow Definition with the workteam and human task UI in the human loop configurations that you created above.

Instructions: Pass the workteam and human task UI ARNs into the HumanLoopConfig dictionary within the function sm.create_flow_definition. Review the other parameters.

# Flow definition name - this value is unique per account and region
flow_definition_name = 'fd-{}'.format(timestamp)

create_workflow_definition_response = sm.create_flow_definition(
    FlowDefinitionName=flow_definition_name,
    RoleArn=role,
    HumanLoopConfig={
        ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
        "WorkteamArn": workteam_arn,
        "HumanTaskUiArn": human_task_ui_arn,
        ### END SOLUTION - DO NOT delete this comment for grading purposes
        "TaskCount": 1, # the number of workers that receive a task
        "TaskDescription": "Classify Reviews into sentiment:  -1 (negative), 0 (neutral), 1 (positive)",
        "TaskTitle": "Classify Reviews into sentiment:  -1 (negative), 0 (neutral), 1 (positive)",
    },
    OutputConfig={"S3OutputPath": output_path},
)

augmented_ai_flow_definition_arn = create_workflow_definition_response["FlowDefinitionArn"]

You can pull information about the Flow Definition with the function sm.describe_flow_definition and wait for its status value FlowDefinitionStatus to become Active.

for _ in range(60):
    describe_flow_definition_response = sm.describe_flow_definition(FlowDefinitionName=flow_definition_name)
    print(describe_flow_definition_response["FlowDefinitionStatus"])
    if describe_flow_definition_response["FlowDefinitionStatus"] == "Active":
        print("Flow Definition is active")
        break
    time.sleep(2)
Initializing
Active
Flow Definition is active
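
The loop above stops silently if the status never becomes Active. A slightly more defensive variant is sketched below: it raises an error when the resource reports a failure or when a timeout expires. The helper name wait_for_status and its parameters are hypothetical, and the describe call is passed in as a function so the example runs without AWS access.

```python
import time


def wait_for_status(describe, status_key, done="Active", failed="Failed",
                    timeout_s=120, poll_s=2, sleep=time.sleep):
    """Poll describe() until response[status_key] equals done; raise on failure or timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        response = describe()
        status = response[status_key]
        if status == done:
            return response
        if status == failed:
            reason = response.get("FailureReason", "unknown reason")
            raise RuntimeError("Resource failed: {}".format(reason))
        if time.monotonic() >= deadline:
            raise TimeoutError("Status is still {} after {} seconds".format(status, timeout_s))
        sleep(poll_s)


# simulated sequence of statuses instead of a real API call
simulated_statuses = iter(["Initializing", "Initializing", "Active"])
result = wait_for_status(
    describe=lambda: {"FlowDefinitionStatus": next(simulated_statuses)},
    status_key="FlowDefinitionStatus",
    sleep=lambda seconds: None,  # skip real waiting in the demonstration
)
print(result["FlowDefinitionStatus"])
```

In the lab, describe could be lambda: sm.describe_flow_definition(FlowDefinitionName=flow_definition_name).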

4. Start human loop with custom ML model

Deploy a custom ML model into an endpoint and call it to predict labels for some sample reviews. Check the confidence score for each prediction. If it is smaller than the threshold, engage your workforce for a human review, starting a human loop. Fix the labels by completing the human loop tasks and review the results.

4.1. Deploy a custom model

Set up a sentiment predictor class to be wrapped later into the PyTorch Model.

Exercise 9

Create a Sentiment Predictor class.

Instructions: Pass the JSON Lines serializer and deserializer objects, created by calling JSONLinesSerializer() and JSONLinesDeserializer(), respectively. More information about the serializers can be found in the SageMaker Python SDK documentation.

from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

class SentimentPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name, 
            sagemaker_session=sagemaker_session,
            ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
            serializer=JSONLinesSerializer(),
            deserializer=JSONLinesDeserializer()
            ### END SOLUTION - DO NOT delete this comment for grading purposes
        )
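
For intuition about the JSON Lines format used by this predictor, the sketch below encodes and decodes it with the standard json module only: one JSON object per line. It is not the SDK's implementation, the helper names are hypothetical, and the response string is an assumed example modeled on the prediction fields used later in this lab.

```python
import json


def to_json_lines(records):
    """Encode a list of objects as JSON Lines: one JSON document per line."""
    return "\n".join(json.dumps(record) for record in records)


def from_json_lines(payload):
    """Decode a JSON Lines string into a list of objects, skipping empty lines."""
    return [json.loads(line) for line in payload.splitlines() if line.strip()]


request_body = to_json_lines([{"features": ["I enjoy this product"]}])
print(request_body)

# assumed shape of a response body with a single prediction
response_rows = from_json_lines('{"probability": 0.9376369118690491, "predicted_label": 1}\n')
print(response_rows[0]["predicted_label"])
```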

Create a SageMaker model based on the model artifact saved in the S3 bucket.

from sagemaker.pytorch.model import PyTorchModel

pytorch_model_name = 'model-{}'.format(timestamp)

model = PyTorchModel(name=pytorch_model_name,
                     model_data='s3://dlai-practical-data-science/models/ab/variant_a/model.tar.gz',
                     predictor_cls=SentimentPredictor,
                     entry_point='inference.py',
                     source_dir='src',
                     framework_version='1.6.0',
                     py_version='py3',
                     role=role)

Now you will create a SageMaker Endpoint from the model. For the purposes of this lab, you will use a relatively small instance type. Refer to the Amazon SageMaker documentation for additional instance types that may work for your use cases outside of this lab.

This cell will take approximately 5-10 minutes to run.

%%time

pytorch_endpoint_name = 'endpoint-{}'.format(timestamp)

predictor = model.deploy(initial_instance_count=1, 
                         instance_type='ml.m5.large', 
                         endpoint_name=pytorch_endpoint_name)
----------!
CPU times: user 2min 22s, sys: 9.97 s, total: 2min 32s
Wall time: 8min 6s

You can review the endpoint in the AWS console and check its status.

from IPython.display import display, HTML

display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pytorch_endpoint_name)))

Review SageMaker REST Endpoint

4.2. Start the human loop

Let’s create a list of sample reviews.

reviews = ["I enjoy this product", 
           "I am unhappy with this product", 
           "It is okay", 
           "sometimes it works"]

Now you can send each of the sample reviews to the model via the predictor.predict() API call. Note that you need to pass the reviews in the JSON format that the model expects as input. Then you parse the model’s response to obtain the predicted label and the confidence score.

After that, you check the condition for when you want to engage a human for review. You can check whether the returned confidence score is under the defined threshold of 90%, which would mean that you would want to start the human loop with the predicted label and the review as inputs. Finally, you start the human loop passing the input content and Flow Definition defined above.

Exercise 10

Complete the dictionary input_content, which should contain the original prediction ('initialValue' key) and review text ('taskObject' key).

import json

human_loops_started = []

CONFIDENCE_SCORE_THRESHOLD = 0.90

for review in reviews:
    inputs = [
        {"features": [review]},
    ]

    response = predictor.predict(inputs)
    print(response)
    prediction = response[0]['predicted_label']
    confidence_score = response[0]['probability']

    print('Checking prediction confidence {} for sample review: "{}"'.format(confidence_score, review))

    # condition for when you want to engage a human for review
    if confidence_score < CONFIDENCE_SCORE_THRESHOLD:
        human_loop_name = str(time.time()).replace('.', '-') # timestamp with fractional seconds keeps the names distinct
        input_content = {
            ### BEGIN SOLUTION - DO NOT delete this comment for grading purposes
            "initialValue": prediction,
            "taskObject": review
            ### END SOLUTION - DO NOT delete this comment for grading purposes
        }
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=human_loop_name,
            FlowDefinitionArn=augmented_ai_flow_definition_arn,
            HumanLoopInput={"InputContent": json.dumps(input_content)},
        )

        human_loops_started.append(human_loop_name)

        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print(f"*** ==> Starting human loop with name: {human_loop_name}  \n")
    else:
        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is above threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print("Human loop not needed. \n")
[{'probability': 0.9376369118690491, 'predicted_label': 1}]
Checking prediction confidence 0.9376369118690491 for sample review: "I enjoy this product"
Confidence score of 93.76369118690491% for prediction of 1 is above threshold of 90.0%
Human loop not needed. 

[{'probability': 0.6340296864509583, 'predicted_label': -1}]
Checking prediction confidence 0.6340296864509583 for sample review: "I am unhappy with this product"
Confidence score of 63.402968645095825% for prediction of -1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1698230493-857691  

[{'probability': 0.5422114729881287, 'predicted_label': 1}]
Checking prediction confidence 0.5422114729881287 for sample review: "It is okay"
Confidence score of 54.221147298812866% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1698230494-3784657  

[{'probability': 0.3931102454662323, 'predicted_label': 1}]
Checking prediction confidence 0.3931102454662323 for sample review: "sometimes it works"
Confidence score of 39.31102454662323% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1698230494-8446784  

Review the results above. Three of the sample reviews with the probability scores lower than the threshold went into the human loop. The original predicted labels are passed together with the review text and will be seen in the task.
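
The routing decision can also be isolated into a small function, which makes it easy to check without an endpoint. The sketch below replays the four predictions printed above. The function name route_prediction is hypothetical. It returns the HumanLoopInput payload when a review needs a human, and None otherwise.

```python
import json

CONFIDENCE_SCORE_THRESHOLD = 0.90


def route_prediction(review, prediction, threshold=CONFIDENCE_SCORE_THRESHOLD):
    """Return the HumanLoopInput payload if the confidence is below the threshold, else None."""
    if prediction["probability"] < threshold:
        input_content = {
            "initialValue": prediction["predicted_label"],  # the original prediction
            "taskObject": review,                           # the review text shown to the worker
        }
        return {"InputContent": json.dumps(input_content)}
    return None


# predictions copied from the cell output above
recorded_predictions = [
    ("I enjoy this product", {"probability": 0.9376369118690491, "predicted_label": 1}),
    ("I am unhappy with this product", {"probability": 0.6340296864509583, "predicted_label": -1}),
    ("It is okay", {"probability": 0.5422114729881287, "predicted_label": 1}),
    ("sometimes it works", {"probability": 0.3931102454662323, "predicted_label": 1}),
]

needs_review = [review for review, prediction in recorded_predictions
                if route_prediction(review, prediction) is not None]
print(needs_review)
```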

4.3. Check status of the human loop

The function a2i.describe_human_loop can be used to pull information about the human loop.

completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")

    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
HumanLoop Name: 1698230493-857691
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230493-857691/output.json'}

HumanLoop Name: 1698230494-3784657
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-3784657/output.json'}

HumanLoop Name: 1698230494-8446784
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-8446784/output.json'}
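
When many loops are running, a per-status count can be easier to read than one block per loop. The sketch below is an illustration only: `summarize_statuses` and `fake_describe` are hypothetical names, and `fake_describe` stands in for `a2i.describe_human_loop` so the example runs without AWS access.

```python
from collections import Counter
from typing import Callable, Iterable


def summarize_statuses(loop_names: Iterable[str], describe: Callable[[str], dict]) -> Counter:
    """Count human loops per HumanLoopStatus value."""
    return Counter(describe(name)["HumanLoopStatus"] for name in loop_names)


def fake_describe(name: str) -> dict:
    # Stand-in for the real API call; every loop reports InProgress, as in the output above.
    return {"HumanLoopName": name, "HumanLoopStatus": "InProgress"}


loop_names = ["1698230493-857691", "1698230494-3784657", "1698230494-8446784"]
summary = summarize_statuses(loop_names, fake_describe)
print(dict(summary))
```

In the notebook, the second argument could be `lambda name: a2i.describe_human_loop(HumanLoopName=name)`.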

4.4. Complete the human loop tasks

Pull the labeling UI address from the workteam information to get into the human loop tasks in the AWS console.

labeling_ui = sm.describe_workteam(WorkteamName=workteam_name)["Workteam"]["SubDomain"]
print(labeling_ui)
9nj1o7kppv.labeling.us-east-1.sagemaker.aws

Navigate to the link below and log in with the defined username and password. Complete the human loop tasks following the provided instructions.

from IPython.display import display, HTML

display(HTML('Click <a target="_blank" href="https://{}"><b>here</b></a> to start labeling with username <b>{}</b> and temporary password <b>{}</b>'.format(labeling_ui, user_name, temporary_password)))

Click here to start labeling with username user-1698229936 and temporary password Password@420

Wait for workers to complete their human loop tasks.

4.5. Verify that the human loops were completed by the workforce

Note: This cell will not complete until you label the data following the instructions above.

import time

completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")
    # Keep polling only while the loop is still active; a Failed or Stopped loop ends the wait as well.
    while resp["HumanLoopStatus"] in ("InProgress", "Stopping"):
        print("Waiting for HumanLoop to complete.")
        time.sleep(10)
        resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
        print("Completed!")
        print("")
HumanLoop Name: 1698230493-857691
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230493-857691/output.json'}

Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Completed!

HumanLoop Name: 1698230494-3784657
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-3784657/output.json'}

Completed!

HumanLoop Name: 1698230494-8446784
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/fd-1698229936/2023/10/25/10/41/34/1698230494-8446784/output.json'}

Completed!

Note: The cell above will not complete until you label the data following the instructions above.
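
The polling cell above has no upper bound on how long it waits while tasks remain unlabeled. A bounded variant might look like the sketch below. The names `wait_for_human_loop`, `timeout_s`, `poll_s`, and `fake_describe` are hypothetical; the `describe` and `sleep` arguments are injected so the example runs without AWS access or real delays.

```python
import time
from typing import Callable

# Statuses after which a human loop will not change any further.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_human_loop(
    name: str,
    describe: Callable[[str], dict],
    timeout_s: float = 600.0,
    poll_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the loop reaches a terminal status or the timeout expires.

    Returns the last status seen, which may still be "InProgress" on timeout.
    """
    deadline = time.monotonic() + timeout_s
    status = describe(name)["HumanLoopStatus"]
    while status not in TERMINAL_STATUSES and time.monotonic() < deadline:
        sleep(poll_s)
        status = describe(name)["HumanLoopStatus"]
    return status


# Stand-in for a2i.describe_human_loop: two in-progress polls, then completion.
_responses = iter(["InProgress", "InProgress", "Completed"])


def fake_describe(name: str) -> dict:
    return {"HumanLoopStatus": next(_responses)}


final_status = wait_for_human_loop("demo-loop", fake_describe, sleep=lambda s: None)
print(final_status)
```

Returning the last status instead of raising lets the caller decide whether an unfinished loop should be retried later or skipped.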

4.6. View human labels and prepare the data for re-training

Once the work is complete, Amazon A2I stores the results in the specified S3 bucket and sends a CloudWatch Event. Let’s check the S3 contents.

import re
from pprint import pprint

fixed_items = []

for resp in completed_human_loops:
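    # Strip the "s3://<bucket>/" prefix to get the object key; escape it so the bucket name is matched literally.
    split_string = re.split(re.escape("s3://" + bucket + "/"), resp["HumanLoopOutput"]["OutputS3Uri"])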
    output_bucket_key = split_string[1]

    response = s3.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read().decode("utf-8")
    json_output = json.loads(content)
    pprint(json_output)

    input_content = json_output["inputContent"]
    human_answer = json_output["humanAnswers"][0]["answerContent"]
    fixed_item = {"input_content": input_content, "human_answer": human_answer}
    fixed_items.append(fixed_item)
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:657781576110:flow-definition/fd-1698229936',
 'humanAnswers': [{'acceptanceTime': '2023-10-25T10:43:47.979Z',
                   'answerContent': {'sentiment': {'label': '-1'}},
                   'submissionTime': '2023-10-25T10:43:50.706Z',
                   'timeSpentInSeconds': 2.727,
                   'workerId': 'e696ecc324cbfd32',
                   'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
                                                       'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_fTkk2p8PL',
                                                       'sub': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}}}],
 'humanLoopName': '1698230493-857691',
 'inputContent': {'initialValue': -1,
                  'taskObject': 'I am unhappy with this product'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:657781576110:flow-definition/fd-1698229936',
 'humanAnswers': [{'acceptanceTime': '2023-10-25T10:43:45.001Z',
                   'answerContent': {'sentiment': {'label': '1'}},
                   'submissionTime': '2023-10-25T10:43:47.895Z',
                   'timeSpentInSeconds': 2.894,
                   'workerId': 'e696ecc324cbfd32',
                   'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
                                                       'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_fTkk2p8PL',
                                                       'sub': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}}}],
 'humanLoopName': '1698230494-3784657',
 'inputContent': {'initialValue': 1, 'taskObject': 'It is okay'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:657781576110:flow-definition/fd-1698229936',
 'humanAnswers': [{'acceptanceTime': '2023-10-25T10:43:34.898Z',
                   'answerContent': {'sentiment': {'label': '0'}},
                   'submissionTime': '2023-10-25T10:43:44.907Z',
                   'timeSpentInSeconds': 10.009,
                   'workerId': 'e696ecc324cbfd32',
                   'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
                                                       'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_fTkk2p8PL',
                                                       'sub': '0dcd26c4-77c4-4bee-850a-2dcdb1bb5a8f'}}}],
 'humanLoopName': '1698230494-8446784',
 'inputContent': {'initialValue': 1, 'taskObject': 'sometimes it works'}}
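
The cell above recovers the object key by splitting the output URI on the bucket prefix. As an alternative sketch, the standard library's `urlparse` can separate the bucket and key without knowing the bucket name in advance. The helper name `split_s3_uri` is hypothetical; the URI is copied from the output shown earlier.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split an s3://bucket/key URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri}")
    # The path keeps its leading slash, which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


uri = (
    "s3://sagemaker-us-east-1-657781576110/a2i-results-1698229936/"
    "fd-1698229936/2023/10/25/10/41/34/1698230493-857691/output.json"
)
output_bucket, output_key = split_s3_uri(uri)
print(output_bucket)
print(output_key)
```

The resulting pair can be passed to `s3.get_object(Bucket=..., Key=...)` in the same way as in the cell above.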

Now you can prepare the data for re-training.

df_fixed_items = pd.DataFrame(fixed_items)  
df_fixed_items.head()
   input_content                                        human_answer
0  {'initialValue': -1, 'taskObject': 'I am unhap...   {'sentiment': {'label': '-1'}}
1  {'initialValue': 1, 'taskObject': 'It is okay'}     {'sentiment': {'label': '1'}}
2  {'initialValue': 1, 'taskObject': 'sometimes i...   {'sentiment': {'label': '0'}}
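
The two columns above still hold nested dictionaries, so a re-training job would probably need them flattened first. The sketch below shows one possible way to do that with plain Python. The column names `review_body`, `predicted_label`, and `human_label` and the helper `flatten_item` are hypothetical; the sample records are copied from the human loop output above.

```python
fixed_items = [
    {"input_content": {"initialValue": -1, "taskObject": "I am unhappy with this product"},
     "human_answer": {"sentiment": {"label": "-1"}}},
    {"input_content": {"initialValue": 1, "taskObject": "It is okay"},
     "human_answer": {"sentiment": {"label": "1"}}},
    {"input_content": {"initialValue": 1, "taskObject": "sometimes it works"},
     "human_answer": {"sentiment": {"label": "0"}}},
]


def flatten_item(item: dict) -> dict:
    """Turn one nested A2I result into a flat row with integer labels."""
    return {
        "review_body": item["input_content"]["taskObject"],
        "predicted_label": int(item["input_content"]["initialValue"]),
        # Human answers arrive as strings such as "-1", so convert them.
        "human_label": int(item["human_answer"]["sentiment"]["label"]),
    }


rows = [flatten_item(item) for item in fixed_items]

# Rows where the reviewer disagreed with the model's original prediction.
corrected = [row for row in rows if row["human_label"] != row["predicted_label"]]
print(corrected)
```

Passing `rows` to `pd.DataFrame` would give a flat table with one column per field, and `corrected` isolates the examples where human review changed the label.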

Upload the notebook to the S3 bucket for grading purposes.

Note: You may need to click the “Save” button before the upload.

!aws s3 cp ./C3_W3_Assignment.ipynb s3://$bucket/C3_W3_Assignment_Learner.ipynb
upload: ./C3_W3_Assignment.ipynb to s3://sagemaker-us-east-1-657781576110/C3_W3_Assignment_Learner.ipynb