Fresher Data Lake on AWS S3
Robinhood’s mission is to democratize finance for all. Continuous data analysis and data driven decision making at different levels within Robinhood are fundamental to achieving this. We have a variety of data sources — OLTP databases, event streams and various 3rd party sources — that need a fast, reliable, secure, and privacy-centric Data Lake ingestion service to power various reporting, business critical pipelines and dashboards.
We’ve come a long way from our initial version of Data Lake not only in terms of the scale of data stored and queries made, but also the use cases that we support in Data Lake. In this blog, we describe one such journey of how we built our Change Data Capture based incremental ingestion using various open source tools to reduce the data freshness latency for our core datasets from one day to under 15 minutes. We will also describe the limitations we had with the big batch ingestion model as well as lessons we learned operating incremental ingestion pipelines at massive scale.
Data Lake & Ecosystem
Robinhood’s Data Lake storage and compute infrastructure is the bedrock that powers many of our data driven functions such as business analytics dashboards and insights for product improvements. It is also the data source for running large scale data processing for business and ad hoc reporting and analysis. In addition, the ecosystem factors in privacy-centric primitives like anonymization and access control designed to protect user privacy.
The primary OLTP (Online Transaction Processing) databases are managed by Postgres RDS. Amazon S3 is the Data Lake storage, which provides a cost efficient and scalable storage tier for our Data Lake. We run production batch processing pipelines predominantly using Apache Spark. Our dashboards are powered by Trino distributed SQL query engine. Apache Hadoop Yarn manages the computing cluster for running Apache Spark jobs. Apache Hive Metastore manages and provides table schema for the query engine. Apache Airflow is the workflow orchestration service.
Data Lake with Compute Ecosystem
Throughout the article, we use the metric “data freshness” to compare different data ingestion architectures below. This metric gives time lag for a change that happened in a table in the source database to be visible in the corresponding Data Lake table.
Limitations of Large Batch Ingestion
As an initial step in the data-lake evolution, we began with ingesting daily snapshots of online databases using their read replicas. Ingesting complete snapshots of these tables results in high write amplification on the Data Lake tables. Even if only a few hundred thousand rows changed in a day for a table with billions of rows, ingesting a complete snapshot of the table would result in reading and writing the entire table. In addition, when using a live replica (instead of a database backup as upstream), the bottlenecks develop around read replica I/O performance which results in excessively longer snapshotting time, resulting in large ingestion delays. Even after employing techniques like parallelizing I/O by partitioned reads, this ingestion architecture was not capable of delivering data under an hour.
Robinhood has a genuine need to keep data freshness low for the Data Lake. Many of the batch processing pipelines that used to run on daily cadence after or before market hours had to be run at hourly or higher frequency to support evolving use-cases. It was clear we needed a faster ingestion pipeline to replicate online databases to the data-lake.
A better approach to achieve lower data freshness to Data Lake is incremental ingestion. Incremental ingestion is a well-known technique employed to build efficient ingestion pipelines for data lakes. Here, instead of taking snapshots and dumping them as a whole to Data Lake, the ingestion pipeline consumes the write ahead logs of the OLTP database in a streaming fashion and ingests them to the Data Lake tables, much like how database-to-database replication works.
Conceptually, we have a 2 stage pipeline.
- A Change Data Capture (CDC) service consumes Write-Ahead Log (WAL) data from OLTP database and buffers them in a changelog queue.
- A data ingestion job either periodically or in continuous fashion tails the queue and updates the Data Lake “raw” table.
Incremental Ingestion Components
The intermediate changelog queue allows for separation of concerns between the two stages. The two stages will be able to operate independently and each can be paused without affecting the other. The queue provides the necessary isolation so that any delay in ingesting data to data-lake does not cause back-pressure on the CDC.
For the first stage, we chose Debezium as a Change Data Capture (CDC) provider. Debezium is an open source distributed change data capture platform built on top of Kafka Connect. Debezium comes with a well-documented first class Postgres CDC connector that’s battle-tested. Based on our benchmarking, we found Debezium to comfortably handle our projected load volumes. We’ve set up Debezium to write change records in avro encoded format to Kafka using the open source Confluent Schema Registry. Avro encoding provided better performance when compared to json encoding.
For the second stage, we are using Apache Hudi to incrementally ingest changelogs from Kafka to create data-lake tables. Apache Hudi is a unified Data Lake platform for performing both batch and stream processing over Data Lakes. Apache Hudi comes with a full-featured out-of-box Spark based ingestion system called Deltastreamer with first-class Kafka integration, and exactly-once writes. Unlike immutable data, our CDC data have a fairly significant proportion of updates and deletes. Hudi Deltastreamer takes advantage of its pluggable, record-level indexes to perform fast and efficient upserts on the Data Lake table. Hudi self-manages its tables with automatic cleaning of old file versions, data clustering, hive schema syncing and file sizing to write well-sized files. The raw tables are currently stored in Hudi’s copy-on-write mode which provides native columnar read performance.
We’ve deployed the incremental ingestion pipeline to ingest 1000s of Postgres tables to the Data Lake. Prior to the new architecture, these tables were only guaranteed to be able to be snapshotted at a daily cadence due to the limitations of snapshotting and the costs involved. Data Lake users are happy to see the data freshness for critical tables go down from 24 hours to under 15 minutes with this new architecture.
Large Batch Snapshot Run Times showing long running times for snapshotting tables. Note that snapshotting for many of these tables need to run in sequence due to Read Replica I/O bottlenecks.
Large Batch Snapshot Running Schedule showing large batch snapshotting is run only once per day. This is due to the large turnaround time to snapshot all tables from a database.
New Incremental Ingestion Data Freshness showing the end to end data freshness to be around 5 minutes for the new ingestion system.
In this section, we’ll share the lessons we learned when building and operating incremental ingestion pipelines at scale. We hope this will be valuable for any one who wishes to embark on a similar journey for their Data Lake.
Scaling initial bootstrap
Incremental ingestion to the Data Lake still requires an initial snapshot of the source table. Debezium does provide initial snapshot mode but requires querying the master RDS instance. We did not want to query master RDS instances for snapshotting to avoid any contention between production OLTP queries and the initial snapshot ones. Additionally, we needed the ability to optimize initial snapshot time by running concurrent partitioned queries in a lock-free manner and also to take snapshots from DB backups.
For these reasons, we provisioned dedicated read-replicas and implemented a custom snapshotter on top of Apache Hudi Deltastreamer, which leverages Spark to run concurrent partitioned snapshot queries to take the initial snapshot of the table. Apache Hudi’s pluggable source framework allowed us to seamlessly implement this with a few lines of code.
With out-of-band initial snapshotting, we need to carefully track correct watermarks in CDC streams when switching between incremental ingestion and snapshotting. With Kafka, the CDC watermarks for the data ingestion job translate to Kafka offsets which marks the beginning changelog event to apply on the snapshotted table. If we choose an arbitrary Kafka offset, we could potentially end up missing some change events for applying to the Data Lake table.
Conceptually, we need 3 phases to perform the correct snapshotting and transitioning to incremental ingestion:
- Save the latest Kafka offsets to be used for replaying changelogs when switching to incremental ingestion. Let “Tₛ” be the source time of latest event.
- Ensure the read replica is up to date as of time “Tₛ + Δ” where Δ accounts for Debezium delay when kafka offsets are captured along with extra buffer time. Otherwise, the overall equation would not guarantee 0% data loss. Take the initial snapshot of the tables from read-replica and create Data Lake table
- Start consuming from the kafka offsets stored earlier and perform incremental ingestion of the tables. Once the incremental ingestion has started happening, sync the hive table definition to the latest location of the data. Downstream consumers will now be able to query the newly bootstrapped table.
Incremental Ingestion with Bootstrapping Architecture
Snapshotting from a dedicated read replica has limitations, such as I/O bottleneck on the replica side and also cost overhead of maintaining the read-replica online 24/7. We are exploring an approach which takes on-demand backup of OLTP databases and uses AWS S3 export to publish to S3. We can then rely on processing these S3 exports at scale and build the initial snapshot. This mechanism potentially allows for quicker snapshotting and overcoming some I/O bottlenecks on the read-replica side.
Monitoring the back-pressure risk with Postgres logical replication
Postgres logical replication requires CDC connector to talk directly to master RDS. Postgres logical replication protocol guarantees retention of WAL log files till Debezium has fully processed them.
This could cause WAL log files to accumulate and eat up free disk space if Debezium is stuck or unable to keep up with consuming the WAL logs. Debezium community recommends closely monitoring and alerting for lag build up which is what we implemented. Our Debezium load testing also gave us confidence about Debezium ability to handle projected increase in change velocity.
Automation to the rescue
One of the side-effects of switching from daily snapshotting to incremental ingestion is that the ingestion workflow has become stateful. The pipeline could be in either snapshotting or incremental ingestion state.
In addition, other operations such as schema upgrades, monitoring, and data quality validations need to be performed. New tables and databases need to be onboarded fairly regularly.
The end-to-end pipeline involves disparate systems — the online CDC world and batch/stream ingestion to Data Lake. Performing onboarding and regular operations for 1000s of tables requires proper state management and automation.
We realized that we needed a first-class orchestration service to be built in-house that would leverage Apache Airflow for managing the ingestion pipelines, keeping track of onboarding and table states and automatically handling state transitions and other maintenance. This has helped us in operating the pipelines at scale without getting bogged down with oncall maintenance.
Not all tables are created equal
When it comes to the importance of these tables for our critical use-cases, the pareto principle holds good. We have a smaller proportion of critical tables that need data freshness to be guaranteed under 15 minutes. We took an approach to classify the tables to different tiers based on their criticality. The highly critical tables are marked as tier-0. For these tables, we provision a separate CDC replication slot to isolate the CDC channel for these critical tables from that of other tables. Also, we provided dedicated resources to Hudi deltastreamer to continually ingest incremental change-logs and was able to keep the data fresh between 5 -15 minutes. For lower priority tables, Hudi deltastreamer is configured to run every 15 minutes in batch mode.
Managing Postgres Schema updates
We are in the business of replicating tables from the online OLTP world to that of Data Lake. The data replicated are not opaque blobs but have proper schema to them and the replication pipeline guarantees well defined behavior for translating online table schema to that of the Data Lake.
There are differences in what backwards compatibility means for the online and Data Lake world, given Data Lakes are also capable of storing the entire history of the data changes. For instance, in the online world adding a non-nullable column to postgres is perfectly fine but would not adhere to schema evolution rules for Avro (or Protobuf) which is being used to store changelog in motion. Having well defined schema evolution contracts helps keep the Data Lake pipelines more stable.
We found that most of the time, the schema changes involved adding new columns. We are using the Debezium feature to freeze the set of columns we read from Postgres tables and rely on re-bootstrapping tables to handle schema upgrades. We plan to add a detection mechanism for schema compatibility for the end-to-end pipeline to reduce the number of re-bootstrapping.
We are seeing faster adoption to use the incrementally ingested raw Data Lake tables and we are continuously working on improving the reliability of the pipelines. Here are a few next steps we are embarking on:
- Data Quality Assurance: We implemented both generic and custom data quality and integrity checks that run at different frequencies to catch discrepancies in replicated data. We are working towards leveraging Apache Hudi’s pre-commit validation support to run custom validations before every batch is committed.
- Reduce data freshness lag even further: We are currently using Apache Hudi Copy-On-Write format. With this mode, we are seeing data freshness around the 5–15 minute range. We are planning to explore Apache Hudi’s Merge-On-Read format to reduce the data freshness further.
- Streaming Data Lake: Apache Hudi provides incremental processing capabilities just like database changelogs. Our future work involves using this primitive and building end to end streaming pipelines to efficiently percolate the change to downstream tables. This will also allow us to perform privacy preserving operations like masking and anonymization in a real-time streaming fashion.
- CDC Service for inter-service data exchange: CDC has been used within Robinhood to provide the change stream for incremental ingestion to Data Lake. We are investigating using the CDC stream for reliable data exchanges between various online microservices.
- Data Compute: We are constantly working on improving usability, efficiency and performance of our data compute platform built on top of Apache Spark and Trino to power critical data computing workloads.
These are exciting times to be working in the Robinhood data infra team as we have started on building the next generation of Robinhood data-lake. If you are excited to be part of this journey, we would like to invite you to apply for relevant job opportunities.
Robinhood Markets Inc. and Medium are separate and unique companies and are not responsible for one another’s views or services.
© 2022 Robinhood Markets, Inc.