DynamoDB Streams enable you to track and respond to item-level changes in a table. You can use this functionality to build applications that react to changes, for example by keeping data synchronized across sources, replicating data for the thousands of users of a large multi-user system, or sending notifications to users when items are updated. DynamoDB Streams are the primary tool for achieving this kind of change-driven functionality.
A stream captures a time-ordered sequence of item modifications in a table and retains this data for a maximum of 24 hours. Applications use it to view the original and modified items in near-real time.
A stream enabled on a table captures all data modifications. Whenever an item is created, updated, or deleted, DynamoDB writes a stream record containing the primary key attributes of the modified item. You can configure the stream to include additional information, such as the before and after images of the item.
Streams provide two guarantees −
Each stream record appears exactly once in the stream, and
For each modified item, the stream records appear in the same order as the actual modifications.
Stream records are available in near-real time, which lets you consume them for change-driven functionality in your applications.
You can enable a stream when you create a table, and you can also enable, disable, or change stream settings on an existing table. Streams operate asynchronously, so they have no impact on table performance.
Use the AWS Management Console for simple stream management. First, open the console and choose Tables. On the Overview tab, choose Manage Stream. In the window that appears, select the information to add to the stream when table data is modified. After entering all settings, choose Enable.
To disable an existing stream, choose Manage Stream, and then Disable.
You can also use the CreateTable and UpdateTable APIs to enable or change a stream. Use the StreamSpecification parameter to configure the stream. StreamEnabled specifies the status: true for enabled and false for disabled.
StreamViewType specifies information added to the stream: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, and NEW_AND_OLD_IMAGES.
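For example, the following is a minimal sketch of enabling a stream on an existing table through UpdateTable, assuming the AWS SDK for Java (v1); the table name "Orders" is a placeholder.

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;

public class EnableStreamExample {
   public static void main(String[] args) {
      AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());

      // Describe the desired stream behavior
      StreamSpecification streamSpec = new StreamSpecification()
         .withStreamEnabled(true)                                  // true enables the stream
         .withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);   // capture before and after images

      // Apply the stream settings to an existing table ("Orders" is a placeholder name)
      UpdateTableRequest request = new UpdateTableRequest()
         .withTableName("Orders")
         .withStreamSpecification(streamSpec);
      client.updateTable(request);
   }
}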
Read and process a stream by connecting to its endpoint and making API requests. A stream consists of stream records, and each record represents a single modification to an item in the table that owns the stream. Each stream record includes a sequence number revealing the order of publication. Records are grouped into shards. A shard serves as a container for several records and also holds the information needed to access and traverse those records. Records are automatically deleted after 24 hours.
Shards are created and deleted automatically as needed, and they are short-lived. They also split into multiple new shards automatically, typically in response to spikes in write activity. When you disable a stream, any open shards are closed. Because shards are related hierarchically (a child shard continues a parent shard), applications must process parent shards before their children to preserve the correct processing order. The DynamoDB Streams Kinesis Adapter handles this for you automatically, and a manual sketch of the ordering appears below.
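If you process shards yourself instead of using the Kinesis Adapter, you need to respect that lineage. The following is one hedged way to order shards parents-first, assuming the Shard objects come from a DescribeStream call as in the full example later in this section.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.amazonaws.services.dynamodbv2.model.Shard;

public class ShardOrdering {
   // Returns the shards ordered so that every parent appears before its children.
   public static List<Shard> orderParentsFirst(List<Shard> shards) {
      Set<String> shardIds = new HashSet<String>();
      for (Shard shard : shards) {
         shardIds.add(shard.getShardId());
      }
      List<Shard> ordered = new ArrayList<Shard>();
      Set<String> processed = new HashSet<String>();
      // Repeatedly pick shards whose parent is absent or already placed in the ordering
      while (ordered.size() < shards.size()) {
         for (Shard shard : shards) {
            if (processed.contains(shard.getShardId())) {
               continue;
            }
            String parent = shard.getParentShardId();
            if (parent == null || !shardIds.contains(parent) || processed.contains(parent)) {
               ordered.add(shard);
               processed.add(shard.getShardId());
            }
         }
      }
      return ordered;
   }
}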
Note − Operations that result in no change do not write stream records.
Accessing and processing records requires performing the following tasks −
Determine the unique ARN of the stream.
Determine which shards in the stream hold the records of interest.
Access the shards and retrieve the desired records.
Note − A maximum of two processes should read from the same shard at once. Exceeding two readers per shard can result in throttling.
The stream API actions available include ListStreams, DescribeStream, GetShardIterator, and GetRecords.
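As a brief illustration of the first of these actions, the following sketch lists the streams associated with a table, assuming the v1 Java SDK streams client; the table name and endpoint are placeholders.

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
import com.amazonaws.services.dynamodbv2.model.Stream;

public class ListStreamsExample {
   public static void main(String[] args) {
      AmazonDynamoDBStreamsClient streamsClient =
         new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());
      streamsClient.setEndpoint("InsertStreamEndpointHere");

      // List all streams for a given table ("MyTestingTable" is a placeholder)
      ListStreamsResult result = streamsClient.listStreams(
         new ListStreamsRequest().withTableName("MyTestingTable"));
      for (Stream stream : result.getStreams()) {
         System.out.println("Stream ARN: " + stream.getStreamArn());
      }
   }
}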
You can review the following example of reading a stream −
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());
   private static AmazonDynamoDBStreamsClient streamsClient =
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());

   public static void main(String args[]) {
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");
      streamsClient.setEndpoint("InsertStreamEndpointHere");

      // Table creation
      String tableName = "MyTestingTable";
      ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID")
         .withAttributeType("N"));

      ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
      keySchema.add(new KeySchemaElement()
         .withAttributeName("ID")
         .withKeyType(KeyType.HASH));   // Partition key

      StreamSpecification streamSpecification = new StreamSpecification();
      streamSpecification.setStreamEnabled(true);
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);

      CreateTableRequest createTableRequest = new CreateTableRequest()
         .withTableName(tableName)
         .withKeySchema(keySchema)
         .withAttributeDefinitions(attributeDefinitions)
         .withProvisionedThroughput(new ProvisionedThroughput()
            .withReadCapacityUnits(1L)
            .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);

      System.out.println("Executing CreateTable for " + tableName);
      dynamoDBClient.createTable(createTableRequest);
      System.out.println("Creating " + tableName);

      try {
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }

      // Get the table's stream settings
      DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName);
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
      StreamSpecification myStreamSpec =
         describeTableResult.getTable().getStreamSpecification();

      System.out.println("Current stream ARN for " + tableName + ": " + myStreamArn);
      System.out.println("Stream enabled: " + myStreamSpec.getStreamEnabled());
      System.out.println("Update view type: " + myStreamSpec.getStreamViewType());

      // Add an item
      int numChanges = 0;
      System.out.println("Making some changes to table data");
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
      item.put("ID", new AttributeValue().withN("222"));
      item.put("Alert", new AttributeValue().withS("item!"));
      dynamoDBClient.putItem(tableName, item);
      numChanges++;

      // Update the item
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
      key.put("ID", new AttributeValue().withN("222"));
      Map<String, AttributeValueUpdate> attributeUpdates =
         new HashMap<String, AttributeValueUpdate>();
      attributeUpdates.put("Alert", new AttributeValueUpdate()
         .withAction(AttributeAction.PUT)
         .withValue(new AttributeValue().withS("modified item")));
      dynamoDBClient.updateItem(tableName, key, attributeUpdates);
      numChanges++;

      // Delete the item
      dynamoDBClient.deleteItem(tableName, key);
      numChanges++;

      // Get stream shards
      DescribeStreamResult describeStreamResult =
         streamsClient.describeStream(new DescribeStreamRequest()
            .withStreamArn(myStreamArn));
      String streamArn = describeStreamResult.getStreamDescription().getStreamArn();
      List<Shard> shards = describeStreamResult.getStreamDescription().getShards();

      // Process shards
      for (Shard shard : shards) {
         String shardId = shard.getShardId();
         System.out.println("Processing " + shardId + " in " + streamArn);

         // Get shard iterator
         GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
            .withStreamArn(myStreamArn)
            .withShardId(shardId)
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
         GetShardIteratorResult getShardIteratorResult =
            streamsClient.getShardIterator(getShardIteratorRequest);
         String nextItr = getShardIteratorResult.getShardIterator();

         while (nextItr != null && numChanges > 0) {
            // Read data records with the iterator
            GetRecordsResult getRecordsResult =
               streamsClient.getRecords(new GetRecordsRequest()
                  .withShardIterator(nextItr));
            List<Record> records = getRecordsResult.getRecords();
            System.out.println("Pulling records...");

            for (Record record : records) {
               System.out.println(record);
               numChanges--;
            }
            nextItr = getRecordsResult.getNextShardIterator();
         }
      }
   }
}