AWS Database Blog

Integrate your Amazon DynamoDB table with machine learning for sentiment analysis

Amazon DynamoDB is a non-relational database that delivers reliable performance at any scale. It’s a fully managed, multi-Region, multi-active database that provides consistent single-digit millisecond latency and offers built-in security, backup and restore, and in-memory caching. DynamoDB also supports a serverless, event-driven architecture that lets you extend its capabilities with other AWS services, using Kinesis Data Streams for DynamoDB and DynamoDB Streams with AWS Lambda. When you enable DynamoDB Streams on a table, DynamoDB captures a time-ordered sequence of item-level modifications and stores it in a change log for up to 24 hours. Downstream AWS services can read this change log through a Lambda function and view data items as they appeared before and after they were modified, in near-real time. This lets you integrate a DynamoDB table with additional use cases such as machine learning (ML), ad hoc queries, full text search, event alerting, and more, such as the following:

  • Processing DynamoDB data with Apache Hive on Amazon EMR for data warehousing use cases
  • Configuring AWS credentials using Amazon Cognito to authenticate access to DynamoDB tables for mobile applications
  • Using Amazon API Gateway and Lambda with DynamoDB for a front-end serverless architecture

In this post, I show you how to integrate ML with Amazon DynamoDB using Amazon Comprehend to analyze sentiments on incoming product reviews.

Serverless and event-driven architecture

When you perform create, update, or delete operations on a DynamoDB table, DynamoDB Streams keeps a 24-hour change log of those item-level changes. The stream offers four view types that determine which item attributes are captured in each log record. For more information, see Enabling a Stream.
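
For example, the following is a minimal boto3 sketch, assuming a table named AmazonReviews (the table this post’s solution uses), that enables a stream capturing both new and old images:

import boto3

dynamodb = boto3.client('dynamodb')

# Enable a stream that records both the before and after image of each item.
# The other view types are KEYS_ONLY, NEW_IMAGE, and OLD_IMAGE.
dynamodb.update_table(
    TableName='AmazonReviews',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'
    }
)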

Lambda functions process each incoming log event, extract the information required by downstream services, and invoke those services, as in the following situations:

  • A function can loop back to the DynamoDB table to create or update an aggregate item, such as a summary or a running average of statistics, in real time.
  • You can use a function to send specific attributes to Amazon OpenSearch Service for full text search use cases.
  • For historical analysis or ad hoc queries, the function can send the updates to Amazon Simple Storage Service (Amazon S3) in an optimized format through Amazon Kinesis Data Firehose, where you can run ad hoc queries or analytics using Amazon Athena.
  • You can use the recently announced option to export DynamoDB table data to Amazon S3 for the initial setup of data for analysis; however, you can’t use it for continuous read requirements. For continuous delivery, Kinesis Data Firehose produces output in optimized Apache Parquet format.
  • If you want to be notified of a specific event on the DynamoDB table, such as the deletion of a critical record, Lambda can notify you through Amazon Simple Notification Service (Amazon SNS), as sketched after this list.
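
For example, the last pattern could look like the following sketch, assuming a hypothetical SNS topic ARN and a stream view type that includes old images:

import json
import boto3

sns = boto3.client('sns')
TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789012:critical-deletes'  # hypothetical topic

def handler(event, context):
    """Publish an SNS alert whenever an item is deleted from the table."""
    for record in event['Records']:
        if record['eventName'] == 'REMOVE':
            # OldImage is present when the stream view type includes old images.
            deleted_item = record['dynamodb'].get('OldImage', {})
            sns.publish(
                TopicArn=TOPIC_ARN,
                Subject='Critical record deleted',
                Message=json.dumps(deleted_item)
            )
    return {'statusCode': 200}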

The following diagram illustrates this event-driven architecture.

DynamoDB event-driven architecture using DynamoDB Streams and Lambda to send events to other AWS services.

Using ML and analytics with the DynamoDB solution

You can configure Amazon review analysis and analytics by running AWS CloudFormation in your own account. This solution uses a serverless, event-driven architecture with automated steps: it checks the sentiment of each incoming Amazon product review, creates a product review summary based on the sentiment, and stores the updates in an optimized format for future ad hoc queries and analytics. The following diagram illustrates this architecture.

Diagram shows a Lambda function consuming the DynamoDB streams and interacting with Amazon Comprehend and with Kinesis Firehose.

For this post, the solution simulates incoming Amazon product reviews by putting a review object in the S3 bucket in the form of JSON records. The S3 bucket publishes the s3:ObjectCreated:Put event to Lambda by invoking the Lambda function specified in the bucket notification configuration. Lambda is meant for quick processing, so the solution limits product review objects on Amazon S3 to smaller than 5 KB.
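
The CloudFormation stack configures this notification for you; as a rough illustration, the equivalent boto3 call (with a hypothetical bucket name and function ARN) looks like the following:

import boto3

s3 = boto3.client('s3')

# Invoke the processing function whenever a review object is put in the bucket.
s3.put_bucket_notification_configuration(
    Bucket='my-amazon-reviews-bucket',  # hypothetical bucket name
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:amazon-review-processing',  # hypothetical ARN
            'Events': ['s3:ObjectCreated:Put']
        }]
    }
)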

If you want to run this solution in your account, follow these steps:

  1. Download the following artifacts:
  2. Deploy the CloudFormation Part 1 stack. See the AWS CloudFormation documentation for instructions.
  3. When the Part 1 stack deployment is complete, deploy the Part 2 stack.
  4. When the Part 2 stack deployment is complete, upload the sample .gz file to the S3 bucket (<stackname>-amazonreviewsbucket-*) with the server-side encryption option enabled. See the Amazon S3 documentation for instructions, or see the upload sketch after this list.
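
If you prefer to script the upload in step 4, here is a minimal boto3 sketch; the local file name and bucket name are placeholders you must replace:

import boto3

s3 = boto3.client('s3')

# Upload the sample reviews file with server-side encryption enabled.
with open('amazon_reviews.gz', 'rb') as f:  # placeholder local file name
    s3.put_object(
        Bucket='<stackname>-amazonreviewsbucket-xxxx',  # replace with your bucket name
        Key='amazon_reviews.gz',
        Body=f,
        ServerSideEncryption='AES256'
    )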

The CloudFormation Part 1 stack creates the following resources:

  • An AWS Glue database and table for cataloging
  • Two Lambda functions and associated permissions
  • Required IAM roles and policies
  • A CloudWatch log group and log stream
  • A Kinesis Data Firehose delivery stream for sending stream data to Amazon S3 for analysis
  • Two S3 buckets: one for incoming reviews and one for Parquet output for analysis
  • A DynamoDB table with an associated stream

The CloudFormation Part 2 stack imports the resources created by Part 1 and sets up a managed policy. You can run the stacks in any Region where these services are supported; I tested them in the US East (N. Virginia), US East (Ohio), and US West (Oregon) Regions. Note that running this solution in your account incurs costs.

The Lambda function amazon-review-processing runs and assumes the AWS Identity and Access Management (IAM) role created by AWS CloudFormation. The function reads the Amazon S3 event it receives as a parameter, determines where the review object is, reads the review object, and processes the records in it. The function breaks each incoming record into multiple DynamoDB items: a review item and a product item (if it doesn’t already exist). This allows you to retrieve an individual item by its partition key and sort key, or to query all of a product’s items by the partition key alone.
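
For example, the following sketch uses a product ID that appears later in this post to show both access patterns against the AmazonReviews table:

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource('dynamodb').Table('AmazonReviews')

# Retrieve one specific review item with the full primary key...
review = table.get_item(
    Key={'pk': 'B00004U2JW', 'sk': '2000-09-07#R3MA96SRZSSBER'}
)

# ...or retrieve every item for a product with just the partition key.
response = table.query(KeyConditionExpression=Key('pk').eq('B00004U2JW'))
for item in response['Items']:
    print(item['sk'])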

The following screenshot shows a product summary item.

Screenshot of a DynamoDB item with attributes pk, sk, positive sentiment count, and mixed sentiment count

The following screenshot shows the review items associated with the product.

screenshot of a DynamoDB table with many items.

The following Lambda code adds the review data to the DynamoDB table when a review file is uploaded to the S3 bucket.

import io
import os
import gzip
import json
import boto3

def get_records(session, bucket, key):
    """
    Read the gzipped review object from S3 and return its JSON records.

    :param session: boto3 session used to create the S3 client
    :param bucket: bucket name from the s3:ObjectCreated:Put notification event
    :param key: object key of the uploaded review file
    :return: list of review records
    """
    s3 = session.client('s3')
    response = s3.get_object(Bucket=bucket, Key=key)

    # The review file is gzip-compressed JSON, so decompress before parsing.
    with io.BytesIO(response['Body'].read()) as obj:
        with gzip.GzipFile(fileobj=obj) as logfile:
            records = json.load(logfile)
            return records

def handler(event, context):
    """
    Process S3 object-created events: read the uploaded review file and write
    a product summary item plus one item per review to the DynamoDB table.

    :return: 200, success if records process successfully
    """
    session = boto3.session.Session()
    dynamodb = boto3.resource("dynamodb", region_name='us-east-1')
    table = dynamodb.Table('AmazonReviews')

    # Get the S3 bucket and key for each log file contained in the event
    for event_record in event['Records']:
        try:
            bucket = event_record['s3']['bucket']['name']
            key = event_record['s3']['object']['key']
            print('Loading Amazon Reviews file s3://{}/{}'.format(bucket, key))
            records = get_records(session, bucket, key)
            print('Number of records in log file: {}'.format(len(records)))

            for record in records:
                response = table.get_item(Key={'pk': record['product_id'], 'sk': '2099-12-31#PRODUCTSUMMARY'})
                # get_item returns an 'Item' key only if the summary already exists.
                if 'Item' not in response:
                    table.put_item(
                        Item={
                            'pk': record['product_id'],
                            'sk': '2099-12-31#PRODUCTSUMMARY',
                            'marketplace': record['marketplace'],
                            'product_parent': record['product_parent'],
                            'product_title': record['product_title'],
                            'product_category': record['product_category'],
                        }
                    )
                table.put_item(
                    Item={
                        'pk': record['product_id'],
                        'sk': record['review_date'] + '#' + record['review_id'],
                        'customer_id': record['customer_id'],
                        'star_rating': record['star_rating'],
                        'helpful_votes': record['helpful_votes'],
                        'total_votes': record['total_votes'],
                        'vine': record['vine'],
                        'verified_purchase': record['verified_purchase'],
                        'review_headline': record['review_headline'],
                        'review_body': record['review_body']
                    }
                )
        except Exception as e:
            print(e)
            return {'Exception status': str(e)}
        else:
            print("records processed successfully!!")

    return {
        'statusCode': 200,
        'body': json.dumps('records inserted successfully to DynamoDB!!')
    }

After the review records are added to the DynamoDB table, the items enter the DynamoDB stream in real time with new and old images.

The amazon_reviews_summary Lambda function captures the stream records from the stream and processes them one by one. This Lambda function has multiple responsibilities:

  • Capture review text from the stream record and call Amazon Comprehend for sentiment analysis. Amazon Comprehend limits review strings to fewer than 5,000 characters, so the code truncates review text to 4,999 characters before calling the Amazon Comprehend sentiment API.
  • Add the sentiment response for the review record and create a product review summary record with sentiment counts.
  • Flatten the DynamoDB streaming JSON logs and add the record to the Kinesis Data Firehose delivery stream.
  • Invoke the Firehose delivery stream put_record API for putting updates in the S3 bucket.

The following Lambda code processes the incoming DynamoDB stream records for sentiment analysis and saves the results to Amazon S3 for analytics.

import json
import boto3
import os

def convert_file(f):
    """Flatten a nested dict/list structure into a single-level dict with
    underscore-separated key paths (for example, dynamodb_keys_pk_s)."""
    out = {}
    def convert(element, name=''):
        if type(element) is dict:
            for sub in element:
                convert(element[sub], name + sub + '_')
        elif type(element) is list:
            ctr = 0
            for sub in element:
                convert(sub, name + str(ctr) + '_')
                ctr += 1
        else:
            # Strip the trailing underscore from the accumulated key path.
            out[name[:-1]] = element
    convert(f)
    return out

# Map each Comprehend sentiment to the summary counter it increments.
SENTIMENT_COUNTERS = {
    'POSITIVE': 'positive_sentiment_count',
    'NEGATIVE': 'negative_sentiment_count',
    'NEUTRAL': 'neutral_sentiment_count',
    'MIXED': 'mixed_sentiment_count',
}

def handler(event, context):
    cmphd = boto3.client(service_name='comprehend', region_name='us-east-1')
    fh = boto3.client('firehose')
    ddb = boto3.resource('dynamodb', region_name='us-east-1')
    dt = ddb.Table('AmazonReviews')
    FIREHOSE_URL = os.environ['FIREHOSE_URL']  # Firehose delivery stream name
    for rec in event['Records']:
        # Only process newly inserted review items (they carry a review body).
        if rec['eventName'] == 'INSERT' and 'review_body' in rec['dynamodb']['NewImage']:
            # Flatten the stream record and send it to the delivery stream.
            convt = convert_file(rec)
            response = fh.put_record(
                DeliveryStreamName=FIREHOSE_URL,
                Record={'Data': json.dumps(convt)}
            )
            # Comprehend limits input size, so truncate the review text.
            review_body = rec['dynamodb']['NewImage']['review_body']['S'][:4999]
            pk = rec['dynamodb']['Keys']['pk']['S']
            sk = rec['dynamodb']['Keys']['sk']['S']
            res = cmphd.detect_sentiment(Text=review_body, LanguageCode='en')
            st = res['Sentiment']
            try:
                # Store the sentiment as its own item next to the review.
                d_response = dt.put_item(
                    Item={'pk': pk, 'sk': sk + '#' + 'SENTIMENT', 'sentiment': st}
                )
                counter = SENTIMENT_COUNTERS.get(st)
                if counter:
                    # Increment the matching counter on the review summary item,
                    # initializing it to 0 if it doesn't exist yet.
                    d_s_response = dt.update_item(
                        Key={'pk': pk, 'sk': '2099-12-31#REVIEWSUMMARY'},
                        UpdateExpression="set #c = if_not_exists(#c, :start) + :inc",
                        ExpressionAttributeNames={'#c': counter},
                        ExpressionAttributeValues={':inc': 1, ':start': 0},
                        ReturnValues="UPDATED_NEW"
                    )
                else:
                    print("No sentiment value: " + st)
            except Exception as e:
                return {'Exception status': str(e)}
            else:
                print("record processed successfully")

The amazon_reviews_summary Lambda function calls the Amazon Comprehend detect_sentiment API with the review text, and Amazon Comprehend returns one of the four sentiments: positive, negative, neutral, or mixed. If you want more granular sentiments, you can replace Amazon Comprehend with your own ML model.

The amazon_reviews_summary Lambda function calls the DynamoDB table to update the sentiment response on the review item and create product review summary items (or update them if they already exist).

The following screenshot shows the sentiment item associated with a review.

screenshot of a Dynamodb table with a "sentiment" attribute

The following screenshot shows the review summary item.

Screenshot of a DynamoDB item including the attributes positive sentiment count, mixed sentiment count, and negative sentiment count

The amazon_reviews_summary Lambda function flattens the DynamoDB streaming JSON payload and sends it to the Firehose delivery stream.

The delivery stream converts the record format to Parquet and adds these records to the S3 bucket.

Kinesis Firehose is configured with Record format conversion enabled and Output format as Apache Parquet.
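
The CloudFormation stack sets up this conversion for you; the following boto3 sketch shows the shape of the relevant delivery stream settings, with placeholder names and ARNs:

import boto3

firehose = boto3.client('firehose')

# Convert incoming flattened JSON records to Parquet, using the Glue table
# created by the Part 1 stack as the schema definition.
firehose.create_delivery_stream(
    DeliveryStreamName='amazon-reviews-delivery',  # placeholder name
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',  # placeholder
        'BucketARN': 'arn:aws:s3:::my-parquet-analytics-bucket',  # placeholder
        'DataFormatConversionConfiguration': {
            'Enabled': True,
            'InputFormatConfiguration': {'Deserializer': {'OpenXJsonSerDe': {}}},
            'OutputFormatConfiguration': {'Serializer': {'ParquetSerDe': {}}},
            'SchemaConfiguration': {
                'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',  # placeholder
                'DatabaseName': 'amazon_review',
                'TableName': 'amazon_reviews'
            }
        }
    }
)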

You can now catalog the S3 bucket on Athena and run ad hoc queries. See the following query and result:

SELECT * FROM "amazon_review"."amazon_reviews" 
where dynamodb_keys_pk_s ='B00004U2JW'

eventid,eventname,eventversion,eventsource,awsregion,dynamodb_approximatecreationdatetime,dynamodb_keys_sk_s,dynamodb_keys_pk_s,dynamodb_newimage_total_votes_n,dynamodb_newimage_star_rating_n,dynamodb_newimage_sk_s,dynamodb_newimage_verified_purchase_s,dynamodb_newimage_pk_s,dynamodb_newimage_customer_id_n,dynamodb_newimage_helpful_votes_n,dynamodb_newimage_vine_s,dynamodb_newimage_review_body_s,dynamodb_newimage_review_headline_s,dynamodb_sequencenumber,dynamodb_sizebytes,dynamodb_streamviewtype,eventsourcearn

35f3c7ce3083aa72eb6b39cd56f3681d,INSERT,1.1,aws:dynamodb,us-east-1,1598908834,2000-09-07#R3MA96SRZSSBER,B00004U2JW,27,4,2000-09-07#R3MA96SRZSSBER,N,B00004U2JW,48308880,5,N,"cool camera with functions whitch i do need, but I cant buy her /and  others from my wish list/ because I am not living in U.S. Amazon hurry, she  is one of my favourite...",cool but to long way to get her to me,8.53782E+25,393,NEW_AND_OLD_IMAGES,arn:aws:dynamodb:us-east-1:080931190378:table/amazon-reviews-v1/stream/2020-08-14T06:28:33.219
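
You can also launch the same query programmatically. The following is a minimal sketch using the boto3 Athena client; the results output location is a placeholder:

import boto3

athena = boto3.client('athena')

# Start the ad hoc query against the cataloged Parquet data.
execution = athena.start_query_execution(
    QueryString="SELECT * FROM amazon_reviews WHERE dynamodb_keys_pk_s = 'B00004U2JW'",
    QueryExecutionContext={'Database': 'amazon_review'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}  # placeholder
)
print(execution['QueryExecutionId'])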

Cleaning up

To avoid incurring future costs, delete the resources you created with the CloudFormation stacks:

  1. Delete the contents of the review bucket and Parquet analytics bucket.
  2. Delete the Part 2 CloudFormation stack first and then delete the Part 1 CloudFormation stack.

Conclusion

In this post, I showed how you can integrate your DynamoDB table with Amazon Comprehend for sentiment analysis and Athena for ad hoc queries across historical activities.

You can use your own ML model instead of Amazon Comprehend if you want more granular sentiment analysis. You can also use other AWS services to integrate with DynamoDB by using DynamoDB Streams to extend DynamoDB features.

I welcome your feedback, suggestions, and questions on this post; please leave your comments below.


About the author

Utsav Joshi is a Technical Account Manager at AWS. He lives in New Jersey and enjoys working with AWS customers to solve architectural, operational, and cost optimization challenges. In his spare time, he enjoys traveling, road trips, and playing with his kids.