Adding CDK Constructs to the AWS Analytics Reference Architecture

In 2021, we released the AWS Analytics Reference Architecture, an end-to-end AWS Cloud Development Kit (AWS CDK) application example, as open source (docs are CC-BY-SA 4.0 International, sample code is MIT-0). It shows how our customers can use the available AWS products and features to implement well-architected analytics solutions. It also gathers AWS best practices for designing, implementing, and operating analytics solutions through different purpose-built patterns. Altogether, the AWS Analytics Reference Architecture answers common requirements and solves customer challenges.

In 2022, we extended the scope of this project with AWS CDK constructs to provide more granular and reusable examples. This project is now composed of:

Reusable core components exposed in an AWS CDK library currently available in TypeScript and Python. This library contains the AWS CDK constructs that can be used to quickly provision prepackaged analytics solutions.
Reference architectures consuming the reusable components in AWS CDK applications, and demonstrating end-to-end examples in a business context. Currently, only the AWS native reference architecture is available but others will follow.

In this blog post, we will first show how to consume the core library to quickly provision analytics solutions using CDK Constructs and experiment with AWS analytics products.

Building solutions with the Core Library

To illustrate how to use the core components, let’s see how we can quickly build a data lake, a central piece of most analytics projects. The storage layer is implemented with the DataLakeStorage CDK construct, which relies on Amazon Simple Storage Service (Amazon S3), a durable, scalable, and cost-effective object storage service. The query layer is implemented with the AthenaDemoSetup construct using Amazon Athena, an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. The data catalog is implemented with the DataLakeCatalog construct using the AWS Glue Data Catalog.

Before getting started, please make sure to follow the instructions available here for setting up the prerequisites:

Install the necessary build dependencies
Bootstrap the AWS account
Initialize the CDK application.

This architecture diagram depicts the data lake building blocks we are going to deploy using the AWS Analytics Reference Architecture library. These are higher level constructs (commonly called L3 constructs) as they integrate several AWS services together in patterns.

To assemble these components, you can add this code snippet in your app.py file:

import aws_analytics_reference_architecture as ara
# L1 Glue constructs from the AWS CDK standard library
from aws_cdk import aws_glue as glue

# Create a new DataLakeStorage with Raw, Clean and Transform buckets
storage = ara.DataLakeStorage(scope=self, id="storage")

# Create a new DataLakeCatalog with Raw, Clean and Transform databases
catalog = ara.DataLakeCatalog(scope=self, id="catalog")

# Configure a new Athena Workgroup
athena_defaults = ara.AthenaDemoSetup(scope=self, id="demo_setup")

# Generate data from Customer TPC dataset
data_generator = ara.BatchReplayer(
    scope=self,
    id="customer-data",
    dataset=ara.PreparedDataset.RETAIL_1_GB_CUSTOMER,
    sink_object_key="customer",
    sink_bucket=storage.raw_bucket,
)

# Role with default permissions for any Glue service
glue_role = ara.GlueDemoRole.get_or_create(self)

# Crawler to create tables automatically
crawler = glue.CfnCrawler(self, id='ara-crawler', name='ara-crawler',
    role=glue_role.iam_role.role_arn, database_name='raw',
    targets={'s3Targets': [{"path": f"s3://{storage.raw_bucket.bucket_name}/{data_generator.sink_object_key}/"}],}
)

# Trigger to kick off the crawler
cfn_trigger = glue.CfnTrigger(self, id="MyCfnTrigger",
    actions=[{'crawlerName': crawler.name}],
    type="SCHEDULED", description="ara_crawler_trigger",
    name="min_based_trigger", schedule="cron(0/5 * * * ? *)", start_on_creation=True,
)

In addition to the library constructs, the example also includes lower level constructs (commonly called L1 constructs) from the AWS CDK standard library. This shows that you can combine constructs from any CDK library interchangeably.

When you need to adjust the default configuration to meet organization-specific requirements (for example, data retention rules), you can change the constructs through their class parameters, as shown in this example:

storage = ara.DataLakeStorage(scope=self, id="storage", raw_archive_delay=180, clean_archive_delay=1095)

Finally, you can deploy the solution using the AWS CDK CLI from the root of the application with this command: cdk deploy. Once you deploy the solution, AWS CDK provisions the AWS resources included in the Constructs and you can log into your AWS account.

Go to the Athena console and start querying the data. The AthenaDemoSetup provides an Athena workgroup called “demo” that you can select to start querying the BatchReplayer data very quickly. Data is stored in the DataLakeStorage and registered in the DataLakeCatalog. Here is an example of an Athena query accessing the customer data from the BatchReplayer:

Accelerate the implementation

Earlier in the post we pointed out that the library simplifies and accelerates the development process. First, writing Python code is more appealing than writing CloudFormation markup, either in JSON or YAML. Second, the CloudFormation template generated by the AWS CDK for the data lake example is about 16 times more verbose than the Python script, as the following word counts show.

❯ cdk synth | wc -w
2483

❯ wc -w ara_demo/ara_demo_stack.py
154
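As a quick check of the "16 times" figure, the ratio can be recomputed from the two word counts reported above. This is only arithmetic on the numbers shown; the variable names are illustrative.

```python
# Word counts reported by the two `wc -w` commands above.
template_words = 2483  # synthesized CloudFormation template (cdk synth)
stack_words = 154      # Python stack definition (ara_demo_stack.py)

# Verbosity ratio between the generated template and the Python source.
ratio = template_words / stack_words
print(f"The template is about {ratio:.1f}x longer than the Python code")
```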

Demonstrating end-to-end examples with reference architectures

The AWS native reference architecture is the first reference architecture available. It explains the journey of a fictional company, MyStore Inc., as it implements its data platform solution with AWS products and services. Deploying the AWS native reference architecture demonstrates a fully working example of a data platform from data ingestion to business analysis. AWS customers can learn from it, see analytics solutions in action, and experiment with the retail dataset and business analysis.

More reference architectures will be added to this project on GitHub later.

Business Story

The AWS native reference architecture simulates a fictional retail company called MyStore Inc. that is building a new analytics platform on top of AWS products. This example shows how retail data can be ingested, processed, and analyzed in streaming and batch processes to provide business insights like sales analysis. The platform is built on top of the CDK Constructs from the core library to minimize development effort and inherit AWS best practices.

Here is the architecture deployed by the AWS native reference architecture:

The platform is implemented in purpose-built modules. They are decoupled and can be independently provisioned but still integrate with each other. MyStore’s analytics platform is composed of the following modules:

Data Lake foundations: This mandatory module (based on DataLakeCatalog and DataLakeStorage core constructs) is the core of the analytics platform. It contains the data lake storage and associated metadata for both batch and streaming data. The data lake is organized in multiple Amazon S3 buckets representing different versions of the data. (a) The raw layer contains the data coming from the data sources in the raw format. (b) The cleaned layer contains the raw data that has been cleaned and parsed to a consumable schema. (c) And the curated layer contains refactored data based on business requirements.

Batch analytics: This module is in charge of ingesting and processing data from a Stores channel generated by the legacy systems in batch mode. Data is then exposed to other modules for downstream consumption. The data preparation process leverages various features of AWS Glue, a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development via the Apache Spark framework. The orchestration of the preparation is handled using AWS Glue Workflows, which allow managing and monitoring executions of Extract, Transform, and Load (ETL) activities involving multiple crawlers, jobs, and triggers. The metadata management is implemented via AWS Glue Crawlers, serverless processes that crawl data sources and sinks to extract metadata, including schemas, statistics, and partitions, and save it in the AWS Glue Data Catalog.

Streaming analytics: This module ingests and processes real-time data from the Web channel generated by cloud native systems. The solution minimizes data analysis latency and also feeds the data lake for downstream consumption.

Data Warehouse: This module is ingesting data from the data lake to support reporting, dashboarding and ad hoc querying capabilities. The module is using an Extract, Load, and Transform (ELT) process to transform the data from the Data Lake foundations module. Here are the steps that outline the data pipeline from the data lake into the data warehouse. 1. AWS Glue Workflow reads CSV files from the Raw layer of the data lake and writes them to the Clean layer as Parquet files. 2. Stored procedures in Amazon Redshift’s stg_mystore schema extract data from the Clean layer of the data lake using Amazon Redshift Spectrum. 3. The stored procedures then transform and load the data into a star schema model.

Data Visualization: This module provides dashboarding capabilities to business users like data analysts on top of the Data Warehouse module, and also provides data exploration on top of the Data Lake module. It is implemented with Amazon QuickSight, a scalable, serverless, embeddable, and machine learning-powered business intelligence tool. Amazon QuickSight is connected to the data lake via Amazon Athena and to the data warehouse via Amazon Redshift using direct query mode, as opposed to the caching mode with SPICE.
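The three pipeline steps listed for the Data Warehouse module (raw CSV to clean records, extraction from the clean layer, loading into a star schema) can be illustrated with a small, self-contained Python sketch. This is not the project's code: plain Python functions stand in for the AWS Glue workflow and the Amazon Redshift stored procedures, and the sample data, column names, and function names are all hypothetical.

```python
import csv
import io
from datetime import date

# Hypothetical raw CSV extract; the columns are made up for illustration.
RAW_CSV = """sale_id,customer_id,customer_name,sale_date,amount
1,c1,Alice,2021-01-03,10.50
2,c2,Bob,2021-01-03,7.25
3,c1,Alice,2021-01-04,3.00
"""


def clean(raw_text):
    """Step 1 stand-in: parse raw CSV rows into typed records (the Clean layer)."""
    rows = csv.DictReader(io.StringIO(raw_text))
    return [
        {
            "sale_id": int(r["sale_id"]),
            "customer_id": r["customer_id"],
            "customer_name": r["customer_name"],
            "sale_date": date.fromisoformat(r["sale_date"]),
            "amount": float(r["amount"]),
        }
        for r in rows
    ]


def to_star_schema(clean_rows):
    """Steps 2-3 stand-in: read clean records, split into a dimension and a fact table."""
    dim_customer = {}
    fact_sales = []
    for r in clean_rows:
        # One dimension row per customer, keyed by customer_id.
        dim_customer[r["customer_id"]] = {
            "customer_id": r["customer_id"],
            "name": r["customer_name"],
        }
        # The fact row keeps measures plus the foreign key to the dimension.
        fact_sales.append(
            {
                "sale_id": r["sale_id"],
                "customer_id": r["customer_id"],
                "sale_date": r["sale_date"],
                "amount": r["amount"],
            }
        )
    return list(dim_customer.values()), fact_sales


dim_customer, fact_sales = to_star_schema(clean(RAW_CSV))
print(len(dim_customer), "customers,", len(fact_sales), "sales")
```

In the reference architecture the transformation runs inside the warehouse after loading (ELT); the sketch only shows the shape of the data at each step.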

Project Materials

The AWS native reference architecture provides both code and documentation about MyStore’s analytics platform:

Documentation is available on GitHub and comes in two different parts:

The high level design describes the overall data platform implemented by MyStore, and the different components involved. This is the recommended entry point to discover the solution.
The analytics solutions provide fine-grained solutions to the challenges MyStore met during the project. These technical patterns can help you choose the right solution for common challenges in analytics.

The code is publicly available here and can be reused as an example for other analytics platform implementations. The code can be deployed in an AWS account by following the getting started guide.

Conclusion

In this blog post, we introduced new AWS CDK content available for customers and partners to easily implement AWS analytics solutions with the AWS Analytics Reference Architecture. The core library provides reusable building blocks with best practices to accelerate the development life cycle on AWS and the reference architecture demonstrates running examples with end-to-end integration in a business context.

Because of its reusable nature, this project will be the foundation for a lot of additional content. We plan to extend its technical scope with Constructs and reference architectures for a data mesh. We’ll also expand the business scope with industry-focused examples. In a future blog post, we will go deeper into the constructs related to Amazon EMR Studio and Amazon EMR on EKS to demonstrate how customers can easily bootstrap an efficient data platform based on Amazon EMR Spark and notebooks.

Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink

Apache Flink is a popular open source framework for stateful computations over data streams. It allows you to formulate queries that are continuously evaluated in near real time against an incoming stream of events. To persist derived insights from these queries in downstream systems, Apache Flink comes with a rich connector ecosystem that supports a wide range of sources and destinations. However, the existing connectors may not always be enough to support all conceivable use cases. Our customers and the community kept asking for more connectors and better integrations with various open source tools and services.

But that’s not an easy problem to solve. Creating and maintaining production-ready sinks for a new destination is a lot of work. For critical use cases, it’s undesirable to lose messages or to compromise on performance when writing into a destination. However, sinks have commonly been developed and maintained independently of each other. This further adds to the complexity and cost of adding sinks to Apache Flink, as more functionality had to be independently reimplemented and optimized for each sink.

To better support our customers and the entire Apache Flink community, we set out to make it easier and less time consuming to build and maintain sinks. We contributed the Async Sink to the Flink 1.15 release, which improved cloud interoperability and added more sink connectors and formats, among other updates. The Async Sink is an abstraction for building sinks with at-least-once semantics. Instead of reimplementing the same core functionality for every new sink that is created, the Async Sink provides common sink functionality that can be extended upon. In the remainder of this post, we’ll explain how the Async Sink works, how you can build a new sink based on the Async Sink, and discuss our plans to continue our contributions to Apache Flink.

Abstracting away common components with the Async Sink

Although sinks have been commonly developed in isolation, their basic functionality is often similar. Sinks buffer multiple messages to send them in a single batch request to improve efficiency. They check completed requests for success and resend messages that were not persisted successfully at a later point. They participate in Flink’s checkpointing mechanism to avoid losing any messages in case the Flink application fails and needs to recover. Lastly, sinks monitor and control the throughput to the destination to not overload it and to fairly divide the capacity amongst multiple concurrent producers. There are usually only two main things that differ between destinations: the structure and information contained in both the destination requests and responses.

Instead of creating independent sinks and duplicating all of this common functionality for every sink, we can abstract away and implement these common requirements once. To implement a new sink, developers then only need to specify those aspects that are specific to the sink: how to build and send requests, and how to identify from the response which records were not persisted successfully and need to be resent. In this way, building a new sink just requires the creation of a lightweight shim that is specific to the destination.
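The split between common and destination-specific logic described above can be sketched in a few lines. The following is a language-agnostic illustration written in Python, not the Flink API: the class name BufferingSink, its parameters, and the flaky submit function are all hypothetical. The generic class owns buffering, batching, and requeueing, while the caller supplies only the two destination-specific pieces.

```python
from typing import Callable, Generic, List, TypeVar

InputT = TypeVar("InputT")
EntryT = TypeVar("EntryT")


class BufferingSink(Generic[InputT, EntryT]):
    """Destination-agnostic part: buffering, batching, and requeueing failures."""

    def __init__(
        self,
        to_entry: Callable[[InputT], EntryT],
        submit: Callable[[List[EntryT]], List[EntryT]],
        max_batch_size: int,
    ) -> None:
        self._to_entry = to_entry  # destination-specific: event -> request entry
        self._submit = submit      # destination-specific: send batch, return failed entries
        self._max_batch_size = max_batch_size
        self._buffer: List[EntryT] = []

    def write(self, event: InputT) -> None:
        self._buffer.append(self._to_entry(event))
        if len(self._buffer) >= self._max_batch_size:
            self._send_one_batch()

    def flush(self) -> None:
        # Drain the buffer, for example before a checkpoint completes.
        # A real sink would add back-off between retries.
        while self._buffer:
            self._send_one_batch()

    def _send_one_batch(self) -> None:
        batch = self._buffer[: self._max_batch_size]
        self._buffer = self._buffer[self._max_batch_size:]
        failed = self._submit(batch)
        # Requeue failed entries at the front so they are retried first.
        self._buffer = list(failed) + self._buffer


# Hypothetical destination that rejects one entry of the first request.
persisted: List[str] = []
attempts = {"count": 0}


def submit(batch: List[str]) -> List[str]:
    attempts["count"] += 1
    if attempts["count"] == 1:
        persisted.extend(batch[:-1])
        return batch[-1:]  # report the last entry as not persisted
    persisted.extend(batch)
    return []


sink = BufferingSink(to_entry=str, submit=submit, max_batch_size=3)
for event in range(5):
    sink.write(event)
sink.flush()
print(persisted)
```

The rejected entry is delivered with the next batch, so every event is persisted at least once, which is the guarantee the abstraction aims for.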

Building a new sink with the Async Sink abstraction

Let’s look at what it takes to build a new sink based on the Async Sink abstraction. For this example, we’ll implement a simplified sink for Amazon Kinesis Data Streams. Kinesis Data Streams is a streaming data service to capture and store data streams. Data is persisted into a Kinesis data stream by means of the PutRecords API, which can persist multiple records with a single batch request.

There are three main aspects that are specific to our sink that we need to implement. First, we need to specify how to extract the information required to make a batch request from the event. In Kinesis Data Streams, this includes the actual payload and a partition key. Second, we need to specify how to construct and make a batch request. And third, we need to inspect the response of the request to know whether all elements of the batch request have been persisted successfully.

Let’s start with extracting the required information from an event. We need to specify how to convert an event to a so-called request entry that forms a batch request. The following code example shows what this looks like for our Kinesis Data Streams sink. The code simply specifies how to extract the actual payload and a partition key from the event and return a PutRecordsRequestEntry object. In this simplified example, we use the string representation of the event as the payload and the hash code of the event as partition key. For a more sophisticated implementation, it may be more desirable to use a serializer that is configurable and provides more flexibility on how to construct the payload and partition key to end users of the sink.

@Override
public PutRecordsRequestEntry apply(InputT event, SinkWriter.Context context) {
    return PutRecordsRequestEntry.builder()
            .data(SdkBytes.fromUtf8String(event.toString()))
            .partitionKey(String.valueOf(event.hashCode()))
            .build();
}

The sink will buffer these objects until it has collected enough of them according to the buffering hints. These buffering hints include a limit on the number of messages, total size of messages, and a timeout condition.
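The three buffering hints mentioned above (message count, total size, and timeout) amount to a simple flush condition. The sketch below illustrates that logic in Python; the names BufferingHints, should_flush, and the field names are hypothetical and do not correspond to the Flink configuration options.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class BufferingHints:
    max_batch_size: int           # flush after this many buffered messages
    max_batch_size_in_bytes: int  # flush once the buffered payload reaches this size
    max_time_in_buffer_s: float   # flush when the oldest message has waited this long


def should_flush(
    hints: BufferingHints,
    entry_sizes: List[int],
    oldest_entry_ts: float,
    now: float,
) -> bool:
    """Return True as soon as any one of the three hints is satisfied."""
    if not entry_sizes:
        return False  # nothing buffered, nothing to send
    return (
        len(entry_sizes) >= hints.max_batch_size
        or sum(entry_sizes) >= hints.max_batch_size_in_bytes
        or now - oldest_entry_ts >= hints.max_time_in_buffer_s
    )


hints = BufferingHints(max_batch_size=3, max_batch_size_in_bytes=100, max_time_in_buffer_s=5.0)
print(should_flush(hints, [10, 10], oldest_entry_ts=0.0, now=1.0))      # no hint met yet
print(should_flush(hints, [10, 10, 10], oldest_entry_ts=0.0, now=1.0))  # count limit reached
```

The timeout matters for low-traffic streams: without it, a few buffered messages could wait indefinitely for the count or size limit to be reached.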

Next, we need to specify how to construct and make the actual batch request. This is, again, specific to the destination we are writing to, and therefore something we need to implement as part of the submitRequestEntries method that you can see in the code example below. The Async Sink invokes this method with a set of buffered request entries that should form the batch request.

For the Kinesis Data Streams sink, we need to specify how to construct and run the PutRecords request from a set of PutRecordsRequestEntry objects (the part under the comment "construct and run the PutRecords request" in the example below). In addition to making the batch request, we also need to check the response of the PutRecords request for entries that were not persisted successfully. These entries need to be requeued in the internal buffer so the Async Sink can retry them at a later point (the part under the comment "check the response of the PutRecords request").

@Override
protected void submitRequestEntries(
        List<PutRecordsRequestEntry> requestEntriesToSend,
        Consumer<List<PutRecordsRequestEntry>> requestEntriesToRetry) {

    //construct and run the PutRecords request
    PutRecordsRequest batchRequest =
            PutRecordsRequest.builder().records(requestEntriesToSend).streamName(streamName).build();

    CompletableFuture<PutRecordsResponse> future = kinesisClient.putRecords(batchRequest);

    //check the response of the PutRecords request
    future.whenComplete(
            (response, err) -> {
                if (err != null) {
                    // entire batch request failed, all request entries need to be retried
                    requestEntriesToRetry.accept(requestEntriesToSend);
                } else if (response.failedRecordCount() > 0) {
                    // some request entries in the batch request were not persisted and need to be retried
                    List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List<PutRecordsResultEntry> records = response.records();

                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntriesToSend.get(i));
                        }
                    }
                    requestEntriesToRetry.accept(failedRequestEntries);
                } else {
                    // all request entries of the batch request have been successfully persisted
                    requestEntriesToRetry.accept(Collections.emptyList());
                }
            });
}

That’s basically it. These are the main components of the sink you need to implement for a basic Kinesis Data Streams sink. These are parts that are specific to the destination and cannot be abstracted away.

For each event the sink receives, it applies the conversion and buffers the result. Once the conditions of the buffering hints are met, the sink will then construct and send a batch request. The buffering hints also help to satisfy constraints of the destination. For instance, the PutRecords API supports up to 500 records with a total size of 5 MiB and the buffering hints help to enforce these limits. From the response of the request, the sink identifies which request entries were not persisted correctly and requeues them in the internal queue. In addition, the sink will automatically adapt the throughput to the limits of the destination and slow down the entire Flink application by applying back pressure in case the destination becomes overloaded.
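To make the role of the destination limits concrete, here is a minimal Python sketch that splits buffered payloads into batches respecting the PutRecords limits quoted above (500 records, 5 MiB). It is an illustration only: the function name is hypothetical, and it counts payload bytes alone as a simplifying assumption.

```python
from typing import List

MAX_RECORDS = 500            # record limit per PutRecords request, as stated above
MAX_BYTES = 5 * 1024 * 1024  # 5 MiB total size limit per request, as stated above


def split_into_batches(
    entries: List[bytes],
    max_records: int = MAX_RECORDS,
    max_bytes: int = MAX_BYTES,
) -> List[List[bytes]]:
    """Greedily group entries so that no batch exceeds either limit."""
    batches: List[List[bytes]] = []
    current: List[bytes] = []
    current_bytes = 0
    for entry in entries:
        size = len(entry)
        if size > max_bytes:
            raise ValueError("a single entry exceeds the batch size limit")
        # Close the current batch if adding this entry would break a limit.
        if current and (len(current) >= max_records or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(entry)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


batches = split_into_batches([b"x" * 10] * 1200)
print([len(b) for b in batches])
```

With 1,200 small entries the record limit is the binding constraint; with large payloads the size limit would close batches first.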

However, we left out a couple of details for the sake of simplicity. Some additional boilerplate code is required to assemble these individual pieces into a complete sink. For a production-ready sink, we would also need to extract the message size to support size-based buffering hints, implement serialization for request entries to obtain exactly once semantics, and add support for Flink’s Python and Table API. In addition, adding tests to the implementation is highly encouraged to obtain a well-tested implementation.

We have just used Kinesis Data Streams as an example here to explain the basic components that are required to create a simplified sink. We have implemented a complete and production-ready Kinesis Data Streams sink in Flink 1.15. If you want to sink data into a Kinesis data stream or are interested in a complete example, you can find the sources in the official Apache Flink GitHub repository. If you are curious to see additional examples, you can refer to the Amazon Kinesis Data Firehose sink that is also part of Flink 1.15 or a sample implementation of an Amazon CloudWatch sink.

What’s next?

We started work on the Async Sink to make it easier to build integrations with AWS services. But we soon realized that our contributions could be generalized to be useful to a much wider set of use cases. We are excited to see how the community is already using the Async Sink since it became available with the Flink 1.15 release. In addition to the sinks for Kinesis Data Streams and Amazon Kinesis Data Firehose that we have contributed, the community has been working on sinks for Amazon DynamoDB and Redis Streams. There are also efforts planned to refactor the Apache Cassandra sink implementation with the Async Sink.

We have been working on additional improvements for the Async Sink since the initial release. We’ve implemented a rate-limiting strategy that is slowing down the sink (and the entire Flink application) if the destination becomes overloaded. For the initial release, this strategy cannot be adapted easily and we are currently working to make it easier to configure the default strategy (FLIP-242: Introduce configurable RateLimitingStrategy for Async Sink). We are also seeking feedback from the community on potential future extensions.
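One common way to implement a rate-limiting strategy of this kind is additive increase, multiplicative decrease (AIMD): grow the allowed number of in-flight messages slowly while requests succeed, and cut it sharply when the destination signals overload. The Python sketch below illustrates that idea only; it is an assumption for illustration, not the Flink implementation, and the class name, parameters, and default values are hypothetical.

```python
class AimdRateLimiter:
    """Additive-increase / multiplicative-decrease limit on in-flight messages."""

    def __init__(self, initial_limit: int, max_limit: int,
                 increase: int = 10, decrease_factor: float = 0.5) -> None:
        self.limit = initial_limit
        self.max_limit = max_limit
        self.increase = increase
        self.decrease_factor = decrease_factor

    def on_success(self) -> None:
        # Probe for more capacity in small, fixed steps.
        self.limit = min(self.max_limit, self.limit + self.increase)

    def on_failure(self) -> None:
        # Back off quickly when the destination is overloaded, but never below 1.
        self.limit = max(1, int(self.limit * self.decrease_factor))

    def can_send(self, in_flight: int, batch_size: int) -> bool:
        # When this returns False the sink stops sending, which is what
        # eventually applies back pressure to the rest of the application.
        return in_flight + batch_size <= self.limit


limiter = AimdRateLimiter(initial_limit=100, max_limit=200)
limiter.on_success()  # limit grows by the additive step
limiter.on_failure()  # limit is cut by the decrease factor
print(limiter.limit)
```

Making the increase step and decrease factor configurable is the sort of tuning a pluggable strategy would allow.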

Beyond connectors, we want to continue contributing to Apache Flink. There have been efforts in the community to create a Flink Kubernetes operator. We are currently looking to extend the capabilities of that operator with support for additional deployment modes (FLIP-225: Implement standalone mode support in the kubernetes operator). These efforts will help to improve the security posture of Flink deployments in a multi-tenant environment. Moreover, we are adding support for asynchronous job submission (FLIP-236: Asynchronous Job Submission). This will help to reduce friction when deploying Flink applications with expensive initialization work as part of their main method.

We are excited to continue to work with the open source community to improve Apache Flink. It’s great to be part of the journey to make Apache Flink even more powerful and to enable more stream processing use cases. We are curious to see how the contributions will be used by others to get value from their streaming data. If you are using the Async Sink to create a sink of your own, please let us know on the Flink mailing list or by creating a ticket on the Apache Flink Jira. We’d love to get your feedback and thoughts.
