How Zomato Boosted Performance 25% and Cut Compute Cost 30% Migrating Trino and Druid Workloads to AWS Graviton

Zomato is an India-based restaurant aggregator, food delivery, and dining-out company with over 350,000 listed restaurants across more than 1,000 cities in India. The company relies heavily on data analytics to enrich the customer experience and improve business efficiency. Zomato’s engineering and product teams use data insights to refine their platform’s restaurant and cuisine recommendations, improve the accuracy of restaurant waiting times, speed up the matching of delivery partners with orders, and improve the overall food delivery process.

At Zomato, different teams have different data discovery requirements based on their business functions: for example, the number of orders placed in a specific area for a city lead team, queries resolved per minute for the customer support team, or the most searched dishes on special events or days for the marketing and other teams. Zomato’s Data Platform team is responsible for building and maintaining a reliable platform that serves these data insights to all business units.

Zomato’s Data Platform is powered by AWS services including Amazon EMR, Amazon Aurora MySQL-Compatible Edition and Amazon DynamoDB, along with the open source software Trino (formerly PrestoSQL) and Apache Druid, for serving the previously mentioned business metrics to different teams. Every week, Trino clusters process over 250K queries scanning 2 PB of data, and Apache Druid ingests over 20 billion events and serves 8 million queries. To deliver performance at Zomato’s scale, these massively parallel systems scale horizontally across nodes running on Amazon Elastic Compute Cloud (Amazon EC2) instances. The performance of both of these data platform components is critical to supporting all business functions reliably and efficiently at Zomato. To improve performance in a cost-effective manner, Zomato migrated these Trino and Druid workloads onto AWS Graviton-based Amazon EC2 instances.

Graviton-based EC2 instances are powered by Arm-based AWS Graviton processors. They deliver up to 40% better price performance than comparable x86-based Amazon EC2 instances. CPU- and memory-intensive Java-based applications such as Trino and Druid are suitable candidates for AWS Graviton-based instances to optimize price-performance, as Java is well supported and generally performant out of the box on arm64.

In this blog, we will walk you through an overview of Trino and Druid, how they fit into the overall Data Platform architecture, and the migration journey of these workloads onto AWS Graviton-based instances. We will also cover the challenges faced during migration, how the Zomato team overcame them, the business gains in terms of cost savings and better performance, and Zomato’s future plans for Graviton adoption across more workloads.

Trino overview

Trino is a fast, distributed SQL query engine that implements a massively parallel processing (MPP) architecture for querying petabyte-scale data. It was designed as an alternative to tools that query the Apache Hadoop Distributed File System (HDFS) using pipelines of MapReduce jobs, such as Apache Hive or Apache Pig, but Trino is not limited to querying HDFS. It has been extended to operate over a multitude of data sources, including Amazon Simple Storage Service (Amazon S3), traditional relational databases, and distributed data stores including Apache Cassandra, Apache Druid, MongoDB and more. When Trino executes a query, it breaks the execution into a hierarchy of stages, which are implemented as a series of tasks distributed over a network of Trino workers. This reduces end-to-end latency and makes Trino a fast tool for ad hoc data exploration over very large data sets.

Figure 1 – Trino architecture overview

The Trino coordinator is responsible for parsing statements, planning queries, and managing Trino worker nodes. Every Trino installation must have a coordinator alongside one or more Trino workers. Client applications, including Apache Superset and Redash, connect to the coordinator via Presto Gateway to submit statements for execution. The coordinator creates a logical model of a query involving a series of stages, which is then translated into a series of connected tasks running on a cluster of Trino workers. Presto Gateway acts as a proxy/load balancer for multiple Trino clusters.

Druid overview

Apache Druid is a real-time database that powers modern analytics applications for use cases where real-time ingestion, fast query performance and high uptime are important. Druid processes are deployed on three types of server nodes: Master nodes govern data availability and ingestion; Query nodes accept queries, execute them across the system, and return the results; and Data nodes ingest and store queryable data. Broker processes receive queries from external clients and forward them to Data servers. Historicals are the workhorses that handle storage and querying of “historical” data. MiddleManager processes handle ingestion of new data into the cluster. Please refer here to learn more about the detailed Druid architecture design.

Figure 2 – Druid architecture overview

Zomato’s Data Platform Architecture on AWS

Figure 3 – Zomato’s Data Platform landscape on AWS

Zomato’s Data Platform covers data ingestion, storage, distributed processing (enrichment and enhancement), unification of batch and real-time data pipelines, and a robust consumption layer through which petabytes of data are queried daily for ad hoc and near real-time analytics. In this section, we explain the data flow of the pipelines serving data to the Trino and Druid clusters in the overall Data Platform architecture.

Data Pipeline-1: An Amazon Aurora MySQL-Compatible database is used to store data by various microservices at Zomato. Apache Sqoop on Amazon EMR runs Extract, Transform, Load (ETL) jobs at scheduled intervals to fetch data from Aurora MySQL-Compatible and transfer it to Amazon S3 in the Optimized Row Columnar (ORC) format, which is then queried by Trino clusters.

Data Pipeline-2: A Debezium Kafka connector deployed on Amazon Elastic Container Service (Amazon ECS) acts as a producer and continuously polls data from the Aurora MySQL-Compatible database. On detecting changes in the data, it identifies the change type and publishes the change data event to Apache Kafka in Avro format. Apache Flink on Amazon EMR consumes data from the Kafka topic, performs data enrichment and transformation, and writes it in ORC format to Iceberg tables on Amazon S3. Trino clusters then query the data from Amazon S3.

Data Pipeline-3: Moving away from other databases, Zomato decided to go serverless with Amazon DynamoDB for its business-critical apps, including Food Cart, Product Catalog and Customer Preferences, because of its high performance (single-digit millisecond latency), request rate (millions of requests per second), extreme scale matching Zomato’s peak expectations, economics (pay as you go) and data volume (TB, PB, EB). Amazon DynamoDB Streams publishes data from these apps to Amazon S3 in JSON format to serve this data pipeline. Apache Spark on Amazon EMR reads the JSON data, performs transformations including conversion into ORC format, and writes the data back to Amazon S3, which is then used by Trino clusters for querying.

Data Pipeline-4: Zomato’s core business applications serving end users include microservices, web and mobile applications. Getting near real-time insights from these core applications is critical to serving customers and continuously winning their trust. Services use a custom SDK developed by the data platform team to publish events to an Apache Kafka topic. Two downstream data pipelines then consume these application events from Kafka via Apache Flink on Amazon EMR. Flink converts the data into ORC format and publishes it to Amazon S3; in a parallel data pipeline, Flink also publishes enriched data onto another Kafka topic, which in turn serves data to an Apache Druid cluster deployed on Amazon EC2 instances.

Performance requirements for querying at scale

All of the described data pipelines ingest data into an Amazon S3-based data lake, which is then leveraged by three types of Trino clusters: Ad-hoc clusters for ad-hoc query use cases, with a maximum query runtime of 20 minutes; ETL clusters for creating materialized views to enhance the performance of dashboard queries; and Reporting clusters to run queries for dashboards with various Key Performance Indicators (KPIs), with query runtimes of up to 3 minutes. ETL queries are run via Apache Airflow with a built-in query retry mechanism and a runtime of up to 3 hours.

Druid is used to serve two types of queries: computing aggregated metrics based on recent events, and comparing aggregated metrics to historical data, for example, how a specific metric in the current hour compares to the same hour last week. Depending on the use case, the service-level objective for Druid query response time ranges from a few milliseconds to a few seconds.
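As an illustration of the second query type, a week-over-week comparison can be expressed in Druid SQL roughly as shown below; the orders datasource and order_count metric are hypothetical, and only the overall shape of the query is intended:

-- Compare the current hour against the same hour last week (hypothetical names)
SELECT
  SUM(order_count) FILTER (
    WHERE __time >= TIMESTAMPADD(HOUR, -1, CURRENT_TIMESTAMP)
  ) AS current_hour,
  SUM(order_count) FILTER (
    WHERE __time >= TIMESTAMPADD(HOUR, -1, TIMESTAMPADD(DAY, -7, CURRENT_TIMESTAMP))
      AND __time <  TIMESTAMPADD(DAY, -7, CURRENT_TIMESTAMP)
  ) AS same_hour_last_week
FROM orders
WHERE __time >= TIMESTAMPADD(DAY, -8, CURRENT_TIMESTAMP)

Scoping the outer WHERE clause to the last eight days keeps the scan narrow, which matters for the millisecond-to-second response-time objectives mentioned above.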

Graviton migration of Druid cluster

Zomato first moved Druid nodes to AWS Graviton-based instances in their test cluster environment to evaluate query performance. Nodes running Brokers and MiddleManagers were moved from R5 to R6g instances, and nodes running Historicals were migrated from I3 to R6gd instances. Zomato logged real-world queries from their production cluster and replayed them in the test cluster to validate performance. Post validation, Zomato saw significant performance gains and reduced cost:

Performance gains

For Druid, query performance was measured under a typical business-hours (12:00 to 22:00) load of 14K queries. As shown here, p99 query runtime was reduced by 25%.

Figure 4 – Overall Druid query performance (Intel x86-64 vs. AWS Graviton)

Also, the query performance improvement on the Historical nodes of the Druid cluster is shown here, where p95 query runtime was reduced by 66%.

Figure 5 – Query performance on Druid Historicals (Intel x86-64 vs. AWS Graviton)

Under peak load during business hours (12:00 to 22:00, as shown in the provided graph), with increasingly loaded CPUs, Graviton-based instances demonstrated close to linear performance, resulting in better query runtimes than equivalent Intel x86-based instances. This gave Zomato headroom to reduce the overall node count in the Druid cluster while serving the same peak query traffic.

Figure 6 – CPU utilization (Intel x86-64 vs. AWS Graviton)

Cost savings

A cost comparison of Intel x86-based vs. AWS Graviton-based instances running Druid in the test environment, along with the instance counts, instance types and hourly On-Demand prices in the Singapore Region, is shown here. Running the same number of Graviton-based instances yields cost savings of ~24%. Further, the Druid cluster auto scales in the production environment based on performance metrics, so average cost savings with Graviton-based instances are even higher, at ~30%, because of the better performance.

Figure 7 – Cost savings analysis (Intel x86-64 vs. AWS Graviton)

Graviton migration of Trino clusters

Zomato also moved the Trino cluster in their test environment to AWS Graviton-based instances and monitored query performance for different short- and long-running queries. As shown here, the mean wall (elapsed) time for most Trino queries is lower on AWS Graviton-based instances than on equivalent Intel x86-based instances (lower is better).

Figure 8 – Mean Wall Time for Trino queries (Intel x86-64 vs. AWS Graviton)

Also, p99 query runtime was reduced by ~33% after migrating the Trino cluster to AWS Graviton-based instances, measured over a typical business day’s (7am to 7pm) mixed query load of ~15K queries.

Figure 9 – Query performance for a typical day (7am – 7pm) load

Zomato’s team further optimized overall Trino query performance by improving Advanced Encryption Standard (AES) performance on Graviton for TLS negotiation with Amazon S3. This was achieved by enabling the -XX:+UnlockDiagnosticVMOptions and -XX:+UseAESCTRIntrinsics JVM flags. As shown here, the mean CPU time for most queries is lower after enabling the extra JVM flags.
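As a minimal sketch, assuming these options are appended to the Trino JVM configuration on each node (for example, the standard etc/jvm.config file, which lists one JVM option per line), the additions would look like this:

-XX:+UnlockDiagnosticVMOptions
-XX:+UseAESCTRIntrinsics

The unlock flag comes first because the source pairs the intrinsics option with unlocking diagnostic VM options; the exact file location depends on how the Trino nodes are provisioned.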

Figure 10 – Query performance after enabling extra JVM options with Graviton instances

Migration challenges and approach

The Zomato team uses Trino version 359, for which no multi-arch (ARM64-compatible) Docker image was available. As the team wanted to migrate their Trino cluster to Graviton-based instances with minimal engineering effort and time, they backported the multi-arch, UBI8-based Trino Docker image to version 359. This approach enabled faster adoption of Graviton-based instances and eliminated the heavy lift of upgrading, testing and benchmarking the workload on a newer Trino version.

Next Steps

Zomato has already migrated AWS managed services, including Amazon EMR and the Amazon Aurora MySQL-Compatible database, to AWS Graviton-based instances. With the successful migration of two main open source components of their data platform (Trino and Druid) to AWS Graviton, with visible and immediate price-performance gains, the Zomato team plans to replicate that success with other open source applications running on Amazon EC2, including Apache Kafka and Apache Pinot.

Conclusion

This post demonstrated the price/performance benefits of adopting AWS Graviton-based instances for high-throughput, near real-time big data analytics workloads running on the Java-based, open source Apache Druid and Trino applications. Overall, Zomato reduced the cost of its Amazon EC2 usage by 30% while improving performance for both time-critical and ad-hoc querying by as much as 25%. Due to the better performance, Zomato was also able to right-size the compute footprint for these workloads onto a smaller number of Amazon EC2 instances, reducing the peak capacity of the Apache Druid and Trino clusters by 25% and 20%, respectively.

Zomato migrated these open source software applications quickly by rapidly implementing the customizations needed for optimum performance and compatibility with Graviton-based instances. Zomato’s mission is “better food for more people,” and Graviton adoption is helping with this mission by providing a more sustainable, performant, and cost-effective compute platform on AWS. This is certainly food for thought for customers looking to improve price-performance and sustainability for their business-critical workloads running on open source software (OSS).


Adding CDK Constructs to the AWS Analytics Reference Architecture

In 2021, we released the AWS Analytics Reference Architecture, a new end-to-end AWS Cloud Development Kit (AWS CDK) application example, as open source (docs are CC-BY-SA 4.0 International, sample code is MIT-0). It shows how our customers can use the available AWS products and features to implement well-architected analytics solutions. It also brings together AWS best practices for designing, implementing and operating analytics solutions through different purpose-built patterns. Altogether, the AWS Analytics Reference Architecture addresses common requirements and solves customer challenges.

In 2022, we extended the scope of this project with AWS CDK constructs to provide more granular and reusable examples. This project is now composed of:

Reusable core components exposed in an AWS CDK library, currently available in TypeScript and Python. This library contains the AWS CDK constructs that can be used to quickly provision prepackaged analytics solutions.
Reference architectures consuming the reusable components in AWS CDK applications and demonstrating end-to-end examples in a business context. Currently, only the AWS native reference architecture is available, but others will follow.

In this blog post, we will first show how to consume the core library to quickly provision analytics solutions using CDK Constructs and experiment with AWS analytics products.

Building solutions with the Core Library

To illustrate how to use the core components, let’s see how we can quickly build a data lake, a central piece of most analytics projects. The storage layer is implemented with the DataLakeStorage CDK construct, relying on Amazon Simple Storage Service (Amazon S3), a durable, scalable and cost-effective object storage service. The query layer is implemented with the AthenaDemoSetup construct using Amazon Athena, an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. The data catalog is implemented with the DataLakeCatalog construct using the AWS Glue Data Catalog.

Before getting started, please make sure to follow the instructions available here for setting up the prerequisites (a command-line sketch follows this list):

Install the necessary build dependencies
Bootstrap the AWS account
Initialize the CDK application.
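As a minimal command-line sketch, assuming a Python CDK application and that the core library is published on PyPI as aws-analytics-reference-architecture (the package name is an assumption), these steps could look like this:

# Install the build dependencies: the AWS CDK CLI and the core library
npm install -g aws-cdk
pip install aws-analytics-reference-architecture

# Bootstrap the AWS account and Region for CDK deployments
cdk bootstrap

# Initialize the CDK application in an empty project directory
cdk init app --language python

The official instructions remain the reference; versions and package names may differ depending on the release you use.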

This architecture diagram depicts the data lake building blocks we are going to deploy using the AWS Analytics Reference Architecture library. These are higher level constructs (commonly called L3 constructs) as they integrate several AWS services together in patterns.

To assemble these components, you can add this code snippet in your app.py file:

import aws_analytics_reference_architecture as ara
# The crawler and trigger below use the AWS Glue L1 constructs from the CDK standard library
from aws_cdk import aws_glue as glue

# Create a new DataLakeStorage with Raw, Clean and Transform buckets
storage = ara.DataLakeStorage(scope=self, id="storage")

# Create a new DataLakeCatalog with Raw, Clean and Transform databases
catalog = ara.DataLakeCatalog(scope=self, id="catalog")

# Configure a new Athena Workgroup
athena_defaults = ara.AthenaDemoSetup(scope=self, id="demo_setup")

# Generate data from Customer TPC dataset
data_generator = ara.BatchReplayer(
    scope=self,
    id="customer-data",
    dataset=ara.PreparedDataset.RETAIL_1_GB_CUSTOMER,
    sink_object_key="customer",
    sink_bucket=storage.raw_bucket,
)

# Role with default permissions for any Glue service
glue_role = ara.GlueDemoRole.get_or_create(self)

# Crawler to create tables automatically
crawler = glue.CfnCrawler(
    self, id="ara-crawler", name="ara-crawler",
    role=glue_role.iam_role.role_arn, database_name="raw",
    targets={"s3Targets": [{"path": f"s3://{storage.raw_bucket.bucket_name}/{data_generator.sink_object_key}/"}]},
)

# Trigger to kick off the crawler every 5 minutes
cfn_trigger = glue.CfnTrigger(
    self, id="MyCfnTrigger",
    actions=[{"crawlerName": crawler.name}],
    type="SCHEDULED", description="ara_crawler_trigger",
    name="min_based_trigger", schedule="cron(0/5 * * * ? *)", start_on_creation=True,
)

In addition to this library construct, the example also includes lower level constructs (commonly called L1 constructs) from the AWS CDK standard library. This shows that you can combine constructs from any CDK library interchangeably.

For use cases where customers need to adjust the default configurations to align with organization-specific requirements (e.g., data retention rules), the constructs can be customized through their class parameters, as shown in this example:

storage = ara.DataLakeStorage(scope=self, id="storage", raw_archive_delay=180, clean_archive_delay=1095)

Finally, you can deploy the solution using the AWS CDK CLI from the root of the application with this command: cdk deploy. Once you deploy the solution, AWS CDK provisions the AWS resources included in the constructs, and you can log in to your AWS account to explore them.

Go to the Athena console and start querying the data. The AthenaDemoSetup construct provides an Athena workgroup called “demo” that you can select to start querying the BatchReplayer data very quickly. Data is stored in the DataLakeStorage and registered in the DataLakeCatalog. As an example, you can run an Athena query that accesses the customer data generated by the BatchReplayer.
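A minimal version of such a query, assuming the Glue crawler above registered the BatchReplayer output as a customer table in the raw database (both names are inferred from the code snippet and may differ in your deployment), would be:

SELECT *
FROM "raw"."customer"
LIMIT 10;

Run it from the “demo” workgroup created by AthenaDemoSetup.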

Accelerate the implementation

Earlier in the post we pointed out that the library simplifies and accelerates the development process. First, writing Python code is more appealing than writing CloudFormation markup, whether in JSON or YAML. Second, the CloudFormation template generated by the AWS CDK for the data lake example is 16 times more verbose than the Python script.

❯ cdk synth | wc -w
2483

❯ wc -w ara_demo/ara_demo_stack.py
154

Demonstrating end-to-end examples with reference architectures

The AWS native reference architecture is the first reference architecture available. It describes the journey of a fictional company, MyStore Inc., as it implements its data platform solution with AWS products and services. Deploying the AWS native reference architecture demonstrates a fully working example of a data platform, from data ingestion to business analysis. AWS customers can learn from it, see analytics solutions in action, and play with the retail dataset and business analysis.

More reference architectures will be added to this project on GitHub later.

Business Story

The AWS native reference architecture simulates a retail company called MyStore Inc. that is building a new analytics platform on top of AWS products. This example shows how retail data can be ingested, processed, and analyzed in streaming and batch processes to provide business insights like sales analysis. The platform is built on top of the CDK constructs from the core library to minimize development effort and inherit AWS best practices.

Here is the architecture deployed by the AWS native reference architecture:

The platform is implemented in purpose-built modules. They are decoupled and can be independently provisioned but still integrate with each other. MyStore’s analytics platform is composed of the following modules:

Data Lake foundations: This mandatory module (based on the DataLakeCatalog and DataLakeStorage core constructs) is the core of the analytics platform. It contains the data lake storage and associated metadata for both batch and streaming data. The data lake is organized in multiple Amazon S3 buckets representing different versions of the data: (a) the raw layer contains the data coming from the data sources in their raw format, (b) the cleaned layer contains the raw data after it has been cleaned and parsed into a consumable schema, and (c) the curated layer contains data refactored based on business requirements.

Batch analytics: This module is in charge of ingesting and processing data from the Stores channel generated by legacy systems in batch mode. Data is then exposed to other modules for downstream consumption. The data preparation process leverages various features of AWS Glue, a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development via the Apache Spark framework. The orchestration of the preparation is handled with AWS Glue Workflows, which allow managing and monitoring the execution of Extract, Transform, and Load (ETL) activities involving multiple crawlers, jobs, and triggers. Metadata management is implemented via AWS Glue crawlers, a serverless process that crawls data sources and sinks to extract metadata including schemas, statistics and partitions, and saves it in the AWS Glue Data Catalog.

Streaming analytics: This module ingests and processes real-time data from the Web channel generated by cloud-native systems. The solution minimizes data analysis latency while also feeding the data lake for downstream consumption.

Data Warehouse: This module ingests data from the data lake to support reporting, dashboarding and ad hoc querying capabilities. The module uses an Extract, Load, and Transform (ELT) process to transform the data from the Data Lake foundations module. The data pipeline from the data lake into the data warehouse follows these steps:
1. An AWS Glue Workflow reads CSV files from the Raw layer of the data lake and writes them to the Clean layer as Parquet files.
2. Stored procedures in Amazon Redshift’s stg_mystore schema extract data from the Clean layer of the data lake using Amazon Redshift Spectrum.
3. The stored procedures then transform and load the data into a star schema model.

Data Visualization: This module provides dashboarding capabilities to business users like data analysts on top of the Data Warehouse module, and also provides data exploration on top of the Data Lake module. It is implemented with Amazon QuickSight, a scalable, serverless, embeddable, and machine learning-powered business intelligence tool. Amazon QuickSight is connected to the data lake via Amazon Athena and to the data warehouse via Amazon Redshift, using direct query mode as opposed to the caching mode with SPICE.

Project Materials

The AWS native reference architecture provides both code and documentation about MyStore’s analytics platform:

Documentation is available on GitHub and comes in two different parts:

The high level design describes the overall data platform implemented by MyStore, and the different components involved. This is the recommended entry point to discover the solution.
The analytics solutions provide fine-grained solutions to the challenges MyStore met during the project. These technical patterns can help you choose the right solution for common challenges in analytics.

The code is publicly available here and can be reused as an example for other analytics platform implementations. The code can be deployed in an AWS account by following the getting started guide.

Conclusion

In this blog post, we introduced new AWS CDK content available for customers and partners to easily implement AWS analytics solutions with the AWS Analytics Reference Architecture. The core library provides reusable building blocks with best practices to accelerate the development life cycle on AWS and the reference architecture demonstrates running examples with end-to-end integration in a business context.

Because of its reusable nature, this project will be the foundation for lots of additional content. We plan to extend the technical scope of it with Constructs and reference architectures for a data mesh. We’ll also expand the business scope with industry focused examples. In a future blog post, we will go deeper into the constructs related to Amazon EMR Studio and Amazon EMR on EKS to demonstrate how customers can easily bootstrap an efficient data platform based on Amazon EMR Spark and notebooks.


Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink

Apache Flink is a popular open source framework for stateful computations over data streams. It allows you to formulate queries that are continuously evaluated in near real time against an incoming stream of events. To persist derived insights from these queries in downstream systems, Apache Flink comes with a rich connector ecosystem that supports a wide range of sources and destinations. However, the existing connectors may not always be enough to support all conceivable use cases. Our customers and the community kept asking for more connectors and better integrations with various open source tools and services.

But that’s not an easy problem to solve. Creating and maintaining production-ready sinks for a new destination is a lot of work. For critical use cases, it’s undesirable to lose messages or to compromise on performance when writing into a destination. However, sinks have commonly been developed and maintained independently of each other. This further adds to the complexity and cost of adding sinks to Apache Flink, as more functionality had to be independently reimplemented and optimized for each sink.

To better support our customers and the entire Apache Flink community, we set out to make it easier and less time consuming to build and maintain sinks. We contributed the Async Sink to the Flink 1.15 release, which improved cloud interoperability and added more sink connectors and formats, among other updates. The Async Sink is an abstraction for building sinks with at-least-once semantics. Instead of reimplementing the same core functionality for every new sink that is created, the Async Sink provides common sink functionality that can be extended upon. In the remainder of this post, we’ll explain how the Async Sink works, how you can build a new sink based on the Async Sink, and discuss our plans to continue our contributions to Apache Flink.

Abstracting away common components with the Async Sink

Although sinks have been commonly developed in isolation, their basic functionality is often similar. Sinks buffer multiple messages to send them in a single batch request to improve efficiency. They check completed requests for success and resend messages that were not persisted successfully at a later point. They participate in Flink’s checkpointing mechanism to avoid losing any messages in case the Flink application fails and needs to recover. Lastly, sinks monitor and control the throughput to the destination to not overload it and to fairly divide the capacity amongst multiple concurrent producers. There are usually only two main things that differ between destinations: the structure and information contained in both the destination requests and responses.

Instead of creating independent sinks and duplicating all of this common functionality for every sink, we can abstract away and implement these common requirements once. To implement a new sink, developers then only need to specify those aspects that are specific to the sink: how to build and send requests, and how to identify from the response which records were not persisted successfully and need to be resent. In this way, building a new sink just requires the creation of a lightweight shim that is specific to the destination.

Building a new sink with the Async Sink abstraction

Let’s look at what it takes to build a new sink based on the Async Sink abstraction. For this example, we’ll implement a simplified sink for Amazon Kinesis Data Streams. Kinesis Data Streams is a streaming data service to capture and store data streams. Data is persisted into a Kinesis data stream by means of the PutRecords API, which can persist multiple records in a single batch request.

There are three main aspects that are specific to our sink that we need to implement. First, we need to specify how to extract the information required to make a batch request from the event. In Kinesis Data Streams, this includes the actual payload and a partition key. Second, we need to specify how to construct and make a batch request. And third, we need to inspect the response of the request to know whether all elements of the batch request have been persisted successfully.

Let’s start with extracting the required information from an event. We need to specify how to convert an event to a so-called request entry that forms a batch request. The following code example shows what this looks like for our Kinesis Data Streams sink. The code simply specifies how to extract the actual payload and a partition key from the event and return a PutRecordsRequestEntry object. In this simplified example, we use the string representation of the event as the payload and the hash code of the event as partition key. For a more sophisticated implementation, it may be more desirable to use a serializer that is configurable and provides more flexibility on how to construct the payload and partition key to end users of the sink.

@Override
public PutRecordsRequestEntry apply(InputT event, SinkWriter.Context context) {
    return PutRecordsRequestEntry.builder()
        .data(SdkBytes.fromUtf8String(event.toString()))
        .partitionKey(String.valueOf(event.hashCode()))
        .build();
}

The sink will buffer these objects until it has collected enough of them according to the buffering hints. These buffering hints include a limit on the number of messages, total size of messages, and a timeout condition.

Next, we need to specify how to construct and make the actual batch request. This is, again, specific to the destination we are writing to, and therefore something we need to implement as part of the submitRequestEntries method that you can see in the code example below. The Async Sink invokes this method with a set of buffered request entries that should form the batch request.

For the Kinesis Data Streams sink, we need to specify how to construct and run the PutRecords request from a set of PutRecordsRequestEntry objects (Lines 6-9 in the example below). In addition to making the batch request, we also need to check the response of the PutRecords request for entries that were not persisted successfully. These entries need to be requeued in the internal buffer so the Async Sink can retry them at a later point (Lines 11-31).

@Override
protected void submitRequestEntries(
        List<PutRecordsRequestEntry> requestEntriesToSend,
        Consumer<List<PutRecordsRequestEntry>> requestEntriesToRetry) {

    // construct and run the PutRecords request
    PutRecordsRequest batchRequest =
            PutRecordsRequest.builder().records(requestEntriesToSend).streamName(streamName).build();

    CompletableFuture<PutRecordsResponse> future = kinesisClient.putRecords(batchRequest);

    // check the response of the PutRecords request
    future.whenComplete(
            (response, err) -> {
                if (err != null) {
                    // entire batch request failed, all request entries need to be retried
                    requestEntriesToRetry.accept(requestEntriesToSend);
                } else if (response.failedRecordCount() > 0) {
                    // some request entries in the batch request were not persisted and need to be retried
                    List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List<PutRecordsResultEntry> records = response.records();

                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntriesToSend.get(i));
                        }
                    }
                    requestEntriesToRetry.accept(failedRequestEntries);
                } else {
                    // all request entries of the batch request have been successfully persisted
                    requestEntriesToRetry.accept(Collections.emptyList());
                }
            });
}

That’s basically it. These are the main components of the sink you need to implement for a basic Kinesis Data Streams sink. These are parts that are specific to the destination and cannot be abstracted away.

For each event the sink receives, it applies the conversion and buffers the result. Once the conditions of the buffering hints are met, the sink will then construct and send a batch request. The buffering hints also help to satisfy constraints of the destination. For instance, the PutRecords API supports up to 500 records with a total size of 5 MiB and the buffering hints help to enforce these limits. From the response of the request, the sink identifies which request entries were not persisted correctly and requeues them in the internal queue. In addition, the sink will automatically adapt the throughput to the limits of the destination and slow down the entire Flink application by applying back pressure in case the destination becomes overloaded.
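To give a sense of how these buffering hints surface to an end user of a finished sink, here is a hedged sketch of configuring the Kinesis Data Streams sink that shipped with Flink 1.15; the builder method names follow the Flink 1.15 documentation, and the Region, stream name and limits are placeholder values:

Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "ap-south-1"); // placeholder Region

KinesisStreamsSink<String> kdsSink =
    KinesisStreamsSink.<String>builder()
        .setKinesisClientProperties(sinkProperties)
        .setStreamName("your-stream-name")                        // placeholder stream name
        .setSerializationSchema(new SimpleStringSchema())
        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
        .setMaxBatchSize(500)                     // at most 500 records per PutRecords request
        .setMaxBatchSizeInBytes(5 * 1024 * 1024)  // stay under the 5 MiB PutRecords limit
        .setMaxTimeInBufferMS(5000)               // flush buffered records at least every 5 seconds
        .build();

The batching, retrying and back-pressure behavior described above then applies without any additional code in the sink implementation.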

However, we left out a couple of details for the sake of simplicity. Some additional boilerplate code is required to assemble these individual pieces into a complete sink. For a production-ready sink, we would also need to extract the message size to support size-based buffering hints, implement serialization for request entries so that buffered requests can be persisted in checkpoints and recovered, and add support for Flink’s Python and Table API. In addition, adding tests to the implementation is highly encouraged to obtain a well-tested implementation.
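As a sketch of the first of these points, and assuming the sink writer extends Flink’s AsyncSinkWriter (which exposes a getSizeInBytes extension point), the message-size extraction for our simplified Kinesis sink could approximate the entry size by its payload length:

@Override
protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) {
    // Approximate the buffered entry size by its payload length; the Async Sink
    // uses this value to enforce the size-based buffering hints.
    return requestEntry.data().asByteArrayUnsafe().length;
}

This ignores the partition key overhead, which is acceptable for a rough estimate but could be refined in a production implementation.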

We have just used Kinesis Data Streams as an example here to explain the basic components that are required to create a simplified sink. We have implemented a complete and production-ready Kinesis Data Streams sink in Flink 1.15. If you want to sink data into a Kinesis data stream or are interested in a complete example, you can find the sources in the official Apache Flink GitHub repository. If you are curious to see additional examples, you can refer to the Amazon Kinesis Data Firehose sink that is also part of Flink 1.15 or a sample implementation of an Amazon CloudWatch sink.

What’s next?

We’ve started the work on the Async Sink to make it easier to build integrations with AWS services. But we soon realized that our contributions could be generalized to be useful to a much wider set of use cases. We are excited to see how the community is already using the Async Sink since it became available with the Flink 1.15 release. In addition to the sinks for Kinesis Data Streams and Amazon Kinesis Data Firehose that we have contributed, the community has been working on a sink for Amazon DynamoDB and Redis Streams. There are also efforts planned to refactor the Apache Cassandra sink implementation with the Async Sink.

We have been working on additional improvements for the Async Sink since the initial release. We’ve implemented a rate-limiting strategy that is slowing down the sink (and the entire Flink application) if the destination becomes overloaded. For the initial release, this strategy cannot be adapted easily and we are currently working to make it easier to configure the default strategy (FLIP-242: Introduce configurable RateLimitingStrategy for Async Sink). We are also seeking feedback from the community on potential future extensions.

Beyond connectors, we want to continue contributing to Apache Flink. There have been efforts in the community to create a Flink Kubernetes operator. We are currently looking to extend the capabilities of that operator with support for additional deployment modes (FLIP-225: Implement standalone mode support in the kubernetes operator). These efforts will help to improve the security posture of Flink deployments in a multi-tenant environment. Moreover, we are adding support for asynchronous job submission (FLIP-236: Asynchronous Job Submission). This will help to reduce friction when deploying Flink applications with expensive initialization work as part of their main method.

We are excited to continue to work with the open source community to improve Apache Flink. It’s great to be part of the journey to make Apache Flink even more powerful and to enable more stream processing use cases. We are curious to see how the contributions will be used by others to get value from their streaming data. If you are using the Async Sink to create a sink of your own, please let us know on the Flink mailing list or by creating a ticket on the Apache Flink Jira. We’d love to get your feedback and thoughts.
