Upgrading & Scaling Airflow at Robinhood
We have been using Airflow as a key workflow management tool at Robinhood for a while. In addition to the traditional use cases like analytics and metrics aggregation, we use Airflow for certain brokerage operations as well, including our clearing system.
However, as Airflow usage within the company has grown, we’ve had instances of reliability and performance issues. The most common issues we saw were:
- Tasks getting stuck in the “QUEUED” state and never running unless manually triggered.
- DAG SLA miss alerts not being triggered.
- DAG version inconsistencies between the webserver and the workers, causing tasks to fail silently.
At the start of 2019, we decided to invest in improving our understanding of Airflow and improving the reliability of our Airflow cluster. In this article, we share how we upgraded our Airflow cluster in production, improved the observability and reliability, and the lessons we learned along the way.
Upgrading Airflow in Production
At the time we started working on this project, Airflow 1.10 was the latest stable Airflow version available, but we were using 1.7 within Robinhood. We spent some time researching and looking into what changes had been made from 1.7 to 1.10 and decided to upgrade our cluster. The main reasons for upgrading our Airflow cluster were:
- Airflow has a very active community, and a lot of bug fixes and performance improvements had been made over the last couple of versions. These changes would help us make our clusters more resilient.
- Airflow 1.7 was no longer publicly available, which caused issues when we tried to deploy internally. We ended up maintaining an internal fork of Airflow to be able to support Airflow 1.7.
As with any critical production migration, there were a few challenges:
- Airflow 1.7 used local time as the default, so date macros (execution_date, ds, etc.) were in local time. As a result of where we run our Airflow clusters, all downstream services had an implicit assumption that dates being passed around would be in Eastern Time (ET). However, Airflow 1.10 breaks this assumption. Even though Airflow 1.10 is timezone aware when it comes to DAG scheduling, Airflow automatically updates date macros to UTC time. Thus, we had to handle the conversion of these datetime objects from UTC to ET before passing them to downstream tasks and services.
- We wanted a zero downtime migration in production that would be as transparent to the application developers as possible.
- We had gaps in observability in the current cluster, which made debugging hard.
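A minimal sketch of the UTC-to-ET conversion described in the first challenge above, using only the Python standard library. The helper names `to_eastern` and `eastern_ds` are hypothetical and are not part of Airflow. The sketch assumes that a naive datetime should be read as UTC.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library in Python 3.9+

EASTERN = ZoneInfo("America/New_York")


def to_eastern(execution_date: datetime) -> datetime:
    """Convert a UTC execution date to Eastern Time (handles EST/EDT)."""
    if execution_date.tzinfo is None:
        # Assumption: naive datetimes are treated as UTC.
        execution_date = execution_date.replace(tzinfo=timezone.utc)
    return execution_date.astimezone(EASTERN)


def eastern_ds(execution_date: datetime) -> str:
    """Return a YYYY-MM-DD string in Eastern Time, analogous to the `ds` macro."""
    return to_eastern(execution_date).strftime("%Y-%m-%d")
```

The case that matters most is a run shortly after midnight UTC: its UTC date is one day ahead of the Eastern date, so a downstream service that expects ET would otherwise process the wrong day.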
We decided not to upgrade our Airflow cluster in place since we didn’t want to affect the already running production cluster. Instead, we spun up a new Airflow cluster in a “staging” (production-like) environment first. We used this cluster to test changes before rolling them out to production. Some changes we tested in this environment were:
- Airflow Config Changes: Examples include changing the default Timezone, increasing parallelism, disabling backfilling by default.
- Infrastructure changes: Examples include any changes to either the webserver or workers.
- Abstraction Changes: Examples include changes to Robinhood-specific Airflow abstractions, such as custom sensors and operators.
Being able to test these changes in the staging environment was important, as we wanted to establish confidence in those changes before we migrated our production cluster. The migration took a couple of weeks, and we transitioned to the new cluster without any major issues or downtime.
These steps were key to our successful migration process:
- We spun up a new Airflow 1.10 cluster in the staging environment, which mimicked our production environment as closely as possible. This made it easier to test these changes in a similar environment without exposing key business workflows to risk.
- Once the cluster was tested in staging, we spun up a new Airflow 1.10 cluster in production. Since we had already tested the cluster in staging, this step went pretty smoothly.
- We migrated the DAGs in batches from the old cluster to the new cluster. During the migration phase, each DAG would exist in both clusters. We decided on this approach because it gave us the flexibility to run a DAG on the old cluster if there were any issues on the new cluster.
- To ensure we didn’t accidentally miss a DAG run or include multiple DAG runs (one in each cluster), we would pause a DAG in the old cluster after its latest run finished, add the DAG to the new cluster with a static start_date set to the next execution_date, and then unpause the DAG in the new cluster. We also disabled backfills by default in the Airflow config (airflow.cfg) for the new cluster so we didn’t accidentally backfill DAGs when we turned them on.
- Ideally, every task in a DAG would be idempotent and we wouldn’t need to worry about multiple DAG runs. However, during our migration, we found out that certain tasks weren’t idempotent, and we had to be careful about how we migrated them. We also worked on making these tasks idempotent.
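The hand-off between clusters can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the tooling used for the migration: the function names are hypothetical, and the schedule is assumed to be a fixed `timedelta` interval rather than a cron expression. The idea is that the new cluster's static start_date equals the execution date that follows the last run completed on the old cluster, so no run is skipped or duplicated.

```python
from datetime import datetime, timedelta
from typing import List


def next_start_date(last_execution_date: datetime, interval: timedelta) -> datetime:
    """Static start_date for the new cluster: the execution date right after
    the last run that finished on the old cluster."""
    return last_execution_date + interval


def planned_execution_dates(start_date: datetime, interval: timedelta, count: int) -> List[datetime]:
    """Execution dates the new cluster would produce for the first `count` runs."""
    return [start_date + i * interval for i in range(count)]


# Example: a daily DAG whose last run on the old cluster covered 2019-05-01.
last_old_run = datetime(2019, 5, 1)
daily = timedelta(days=1)
new_start = next_start_date(last_old_run, daily)
new_runs = planned_execution_dates(new_start, daily, 3)
```

In the DAG definition on the new cluster, `new_start` would be passed as the DAG's start_date. Disabling backfills by default corresponds to the `catchup_by_default = False` setting in airflow.cfg.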
Running Airflow @ Scale
As part of this migration, we also wanted to improve the observability of our Airflow cluster. Airflow comes with minimal metrics out of the box (https://airflow.apache.org/metrics.html). Engineers often ran into various issues with their DAGs, but we weren’t able to conclusively diagnose the issues because we didn’t have extensive monitoring and metrics. This resulted in teams spending more time investigating because we weren’t able to quickly identify if the issue was with the application code or Airflow itself.
With this improvement, we wanted to easily answer questions like:
- Is the Airflow scheduler running and scheduling DAGs & task instances? Even though Airflow includes a scheduler_heartbeat metric, we saw multiple instances where the scheduler would send a heartbeat, but not schedule any tasks or DAGs.
- What is the scheduling delay for DAGs & tasks?
- Which are the slowest DAGs and tasks?
- Are there tasks stuck in the “QUEUED” state?
We also wanted to come up with Service Level Agreements (SLAs) around Airflow itself so that we could have a clear boundary between infrastructure and application issues. The SLAs we came up with are mentioned in the next section.
We use Prometheus for monitoring and alerting within Robinhood, so we wrote a Prometheus exporter for Airflow that exposes a set of metrics about DAGs and task instances. The library is available here.
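To give a sense of what such an exporter produces, here is a minimal sketch that renders task-instance counts by state in the Prometheus text exposition format. It is not the exporter's actual code: the metric name `airflow_task_instances` and the function name are hypothetical, and a real exporter would read states from the Airflow metadata database and serve the output over HTTP.

```python
from collections import Counter
from typing import Iterable


def render_task_state_metrics(states: Iterable[str]) -> str:
    """Render a gauge with one sample per task state (hypothetical metric name)."""
    counts = Counter(states)
    lines = [
        "# HELP airflow_task_instances Number of task instances by state.",
        "# TYPE airflow_task_instances gauge",
    ]
    for state in sorted(counts):
        # Doubled braces in the f-string emit literal { and } around the label.
        lines.append(f'airflow_task_instances{{state="{state}"}} {counts[state]}')
    return "\n".join(lines) + "\n"


metrics_text = render_task_state_metrics(["queued", "running", "queued", "success"])
```

A gauge labeled by state makes it straightforward to alert on a growing number of queued tasks, which was one of the questions listed above.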
These metrics have helped us quickly understand the state of the Airflow cluster and identify issues.
We’ve also used these metrics to track the performance of some of our most critical Airflow workflows.
Improving Performance & Reliability
While doing research about how other companies manage their Airflow clusters at scale, we came across Running Apache Airflow at Lyft, which gave us a lot of good ideas about how we could improve the performance & reliability of our Airflow cluster.
The following have been the most useful for us:
- Adding a canary DAG: We added a simple DAG that runs every 10 minutes. If this DAG fails or doesn’t run because it wasn’t scheduled, an oncall engineer gets alerted. This usually happens when the Airflow scheduler gets into a bad state.
- Restarting the Airflow scheduler: Based on the metrics we added earlier, we noticed that we would often have tasks stuck in the “QUEUED” state. They would never run, causing many DAGs to miss their SLA. We investigated this, but couldn’t identify the root cause. However, we noticed that restarting the Airflow scheduler would generally fix this, so we added a cron job (a slightly hacky approach, but quick to implement) to restart the Airflow scheduler frequently. We also found 7 Common Errors to Check when Debugging Airflow DAGs from Astronomer, which mentioned that Airflow scheduler performance degrades over time and that restarts are required for optimal performance.
- Version skews: In our current Airflow setup, the cluster is shared between many different workers, which run different microservices with different deployment schedules. This led to scenarios where a small subset of workers would get deployed and the DAG files would potentially change on these workers, but the DAG files on the webserver would be stale. Another scenario we ran into was when only the webserver gets deployed and has a newer DAG definition that hasn’t been deployed to the workers. We have mitigated this problem by deploying all the workers and the webserver atomically via our CI/CD pipeline.
As part of improving and tracking our reliability, we came up with the following SLAs for Airflow:
- The scheduler will be down for 10 minutes at most. We decide if the scheduler is available by looking at the Canary DAG metrics.
- Tasks will be stuck in the queued state for at most 15 minutes.
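The two SLAs above can be expressed as simple checks. The sketch below is illustrative only: the function names and input shapes are hypothetical, and in practice these conditions would more likely be written as alerting rules over the exported metrics. Because the canary DAG runs every 10 minutes, the sketch assumes the scheduler SLA is breached once the last canary success is older than one canary interval plus the 10-minute allowance.

```python
from datetime import datetime, timedelta
from typing import Dict, List

CANARY_INTERVAL = timedelta(minutes=10)
SCHEDULER_DOWNTIME_SLA = timedelta(minutes=10)
QUEUED_SLA = timedelta(minutes=15)


def scheduler_sla_breached(
    last_canary_success: datetime,
    now: datetime,
    max_age: timedelta = CANARY_INTERVAL + SCHEDULER_DOWNTIME_SLA,
) -> bool:
    """True if the canary DAG has not succeeded within `max_age` (an assumption)."""
    return now - last_canary_success > max_age


def stuck_queued_tasks(queued_since: Dict[str, datetime], now: datetime) -> List[str]:
    """Return IDs of tasks that have been queued longer than the 15-minute SLA."""
    return sorted(
        task_id for task_id, since in queued_since.items() if now - since > QUEUED_SLA
    )
```

Keeping both thresholds as named constants makes the boundary between infrastructure issues and application issues explicit and easy to adjust.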
Better understanding of Airflow internals
A goal we had when we started working on this project was to get a better understanding of Airflow internals. This understanding has been useful in dealing with issues that come up, and we no longer have to treat the system as a black box.
We also found a couple of bugs and submitted pull requests to Airflow’s source code as a result of this process:
- AIRFLOW-4510: Timezone set incorrectly if multiple DAGs defined in the same file
- AIRFLOW-4452: Webserver and Scheduler keep crashing because of Slack client update
We hope this visibility into how we upgraded our Airflow cluster in production, as well as how we operate our Airflow clusters at scale, can serve as a useful reference to others looking into migrating and running Airflow clusters in production. To recap, some of the lessons we learned from this project were:
- Test infrastructure changes in a safe environment as much as possible.
- Monitoring and alerting for production systems is critical.
- Understand the internals of important systems and don’t treat them as a black box.
Huge thanks to the Data Platform team at Airbnb for taking the time to speak with us about how to manage Airflow at scale. We learned a lot from that conversation and it helped drive some of the improvements we made internally!