Reliably Upgrading Apache Airflow at Slack’s Scale


Apache Airflow is a tool for describing, executing, and monitoring workflows. At Slack, we use Airflow to orchestrate and manage our data warehouse workflows, which include product and business metrics as well as workflows for various engineering use cases (e.g. search and offline indexing). We had been running Airflow 1.8 for two years, and it was time to catch up with recent releases and upgrade to Airflow 1.10. In this post we describe the problems we encountered and the solutions we put in place.

As of September 2019, Slack has over 12 million daily active users performing 5 billion actions on average every week. This results in over 700 billion records loaded daily into our S3 Data Warehouse. This data is then processed by hundreds of workflows running Apache Hive, Apache Spark, and Presto on Airflow that submit jobs to our clusters running on thousands of EC2 instances.

We had the following requirements for the upgrade:

  1. Reliability: The Airflow scheduler and webserver should run without issues after the upgrade. We run hundreds of workflows that manage the state of thousands of tasks, and all of them must be scheduled and executed successfully.
  2. Fast rollback: Besides the bug fixes and improvements it brings, the new version involves a backwards-incompatible schema upgrade of Airflow’s metadata database. If things go wrong, we want to be able to roll back to the previous Airflow version and schema quickly.
  3. Minimized downtime: We want to reduce the time Airflow is down during the upgrade, so we don’t affect our Airflow users and don’t miss SLAs, as folks rely on having data on time.
  4. History preserved: The metadata of previous runs should be preserved, so that we could run backfills and don’t have to update on the DAGs.

We considered two strategies for the Airflow upgrade:

  1. Red-Black upgrade: We run the old and new versions of Airflow side by side and move a small set of workflows over at a time. This is the more reliable option, but it wasn’t feasible for us. Each version of Airflow would need its own metadata database, because sharing one database can cause the same tasks to be scheduled by both Airflows (resulting in duplicates). With a separate database per version and DAGs moved piecewise, we would lose the run history of the migrated DAGs.
  2. Big-Bang upgrade: We test as much as possible in dev and move all DAGs to the new version in one big bang! If there are issues, either fix forward or roll back the upgrade. The challenge here was that we didn’t have very good test data and had only a handful of DAGs in dev.

Since the red-black upgrade was not feasible and involved more risk to the data quality, we went ahead with the big-bang upgrade. We added some of our critical DAGs from prod to dev for testing.

Following are the high-level steps for the upgrade. We performed these steps in dev first and then in our prod environment:

  1. Launch an instance with Airflow 1.10 installed
  2. Fix incompatibilities with libraries that are installed on top of Airflow
  3. Back up the Airflow metadata DB
  4. Upgrade the schema
  5. Start Airflow services
  6. Test, and either fix forward or roll back
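A big-bang upgrade has to end in one of two outcomes: fix forward or roll back. It can therefore help to drive these steps from a script that stops at the first failure. The sketch below is a hypothetical illustration, not our actual tooling. `Step`, `run_upgrade`, and the `rollback` callback are invented names, and each step is a stand-in callable that reports whether it succeeded.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Step:
    """One runbook step; `run` returns True on success (hypothetical shape)."""
    name: str
    run: Callable[[], bool]


def run_upgrade(steps: List[Step], rollback: Callable[[], None]) -> List[str]:
    """Execute steps in order. On the first failure, call `rollback` and stop,
    so later steps (e.g. starting services) never run against a bad state."""
    log: List[str] = []
    for step in steps:
        ok = step.run()
        log.append(f"{step.name}: {'ok' if ok else 'FAILED'}")
        if not ok:
            rollback()
            log.append("rolled back")
            break
    return log


if __name__ == "__main__":
    # Stand-in steps mirroring the list above; the schema upgrade is made to fail.
    names = ["launch 1.10 instance", "fix library incompatibilities",
             "back up metadata DB", "upgrade schema", "start services", "test"]
    steps = [Step(n, (lambda n=n: n != "upgrade schema")) for n in names]
    for line in run_upgrade(steps, rollback=lambda: None):
        print(line)
```

Deciding whether to fix forward is a human call in practice. The sketch models only the roll-back path.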

To meet these requirements, we considered several approaches for the database backup and the schema upgrade, described next.

We wanted to upgrade the database in a way that lets us roll back quickly and also minimizes overall downtime. We considered two approaches:

  1. Snapshot: This is a straightforward approach: first we take a snapshot of the master, then upgrade to the new schema, and roll back to the snapshot if we see issues. However, this results in more catch-up time and more downtime, because taking and restoring a snapshot takes a non-trivial amount of time (our Airflow metadata DB is quite large).
  2. Use replicas as the new master: At the start of this process, we had one master and one replica (Master, Replica-1). In this approach, we create two more replicas (Replica-2, Replica-3), replicating from Master. When we are ready to upgrade the schema, we cut Replica-2 off from replication and run the upgrade schema script; this replica then becomes the new Master. If the upgrade fails, we roll back immediately to the old Master.

We went with the second approach for the DB migration because it lets us roll back quickly with less downtime: there is no snapshot to take or restore, which reduces catch-up time.
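The replica-promotion approach can be modeled as a small state machine. The sketch below is a toy model with hypothetical names (`MetadataDbTopology`, `cut_over`, `rollback`). It doesn't issue any MySQL commands, and it assumes Airflow services are stopped before the cut so that no writes reach the old Master afterwards.

```python
from typing import Callable, Dict, Optional


class MetadataDbTopology:
    """Toy model of promoting a detached, upgraded replica to master."""

    def __init__(self) -> None:
        self.master = "Master"
        # Replica-1 already existed; Replica-2 and Replica-3 were added for the upgrade.
        self.replicates_from: Dict[str, str] = {
            "Replica-1": "Master", "Replica-2": "Master", "Replica-3": "Master",
        }
        self.previous_master: Optional[str] = None

    def cut_over(self, replica: str, upgrade_schema: Callable[[str], bool]) -> bool:
        """Detach `replica`, upgrade its schema, and promote it on success."""
        # Stop replication so the replica holds a frozen copy of the 1.8 data.
        del self.replicates_from[replica]
        if not upgrade_schema(replica):
            # The old Master was never modified, so it keeps serving as-is.
            return False
        self.previous_master, self.master = self.master, replica
        return True

    def rollback(self) -> None:
        """Point back at the untouched old Master. Metadata written to the
        new master after the cut-over is not copied back."""
        if self.previous_master is None:
            raise RuntimeError("nothing to roll back")
        self.master, self.previous_master = self.previous_master, None


if __name__ == "__main__":
    topo = MetadataDbTopology()
    topo.cut_over("Replica-2", upgrade_schema=lambda r: True)
    print(topo.master)  # Replica-2
    topo.rollback()
    print(topo.master)  # Master
```

The old Master is never modified, so rolling back is a switch of which database Airflow points at rather than a restore. This is what makes the approach faster than the snapshot one.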

Airflow 1.10’s metadata DB schema has undergone many changes since version 1.8. To get from 1.8 to 1.10, the general recommendation is to upgrade to 1.9 first; otherwise, the out-of-the-box command included with Airflow 1.10 cannot be used directly to upgrade the DB. However, we didn’t want to spend the time on two upgrades and instead wanted to go directly to 1.10. We also observed that the upgrade script, written in Python using the Alembic package, took a long time to run, likely because some tables in our production metadata DB have over ten million rows. To resolve all of this, we wrote our own MySQL schema upgrade script: running the same queries directly in MySQL was much faster than running the Python migration scripts included with Airflow. We have contributed this script back to the community (see https://issues.apache.org/jira/browse/AIRFLOW-6094).
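We haven't reproduced the contributed script here; the JIRA issue above links to the real one. What follows is a hedged illustration of one possible reason a hand-written MySQL script can beat a long chain of migrations. Many `ALTER TABLE` operations rebuild the whole table, which is expensive on a table with millions of rows. Folding several changes to the same table into a single `ALTER TABLE` statement can therefore save time. The helper `combine_alters` is hypothetical, and the column changes in the example are illustrative rather than copied from Airflow's migrations.

```python
from typing import Dict, Iterable, List, Tuple


def combine_alters(changes: Iterable[Tuple[str, str]]) -> List[str]:
    """Group (table, clause) pairs into one ALTER TABLE statement per table.

    Tables appear in the order they were first mentioned, and clauses keep
    their original relative order within each table."""
    grouped: Dict[str, List[str]] = {}
    for table, clause in changes:
        grouped.setdefault(table, []).append(clause)
    return [f"ALTER TABLE {table} {', '.join(clauses)};"
            for table, clauses in grouped.items()]


if __name__ == "__main__":
    # Illustrative changes only, not the actual 1.8 -> 1.10 migration.
    changes = [
        ("task_instance", "MODIFY execution_date TIMESTAMP(6) NOT NULL"),
        ("dag_run", "MODIFY execution_date TIMESTAMP(6) NULL"),
        ("task_instance", "MODIFY start_date TIMESTAMP(6) NULL"),
    ]
    for stmt in combine_alters(changes):
        print(stmt)
```

Grouping reorders statements across tables. It is only safe when changes to different tables don't depend on each other, for example through foreign keys.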

We found a few issues during testing and some after the upgrade.

Issues found during testing

1. attribute removal

The attribute was removed from the task object in 1.10. This previously told Airflow not to run a task in a scheduled manner, but rather marked it as a task that will only be run manually. We had a few DAGs with tasks. The solution was to consolidate those tasks in a new DAG and to mark that DAG as .

2. The Presto operator stopped working

This is caused by a more recent “future” package (0.16.0) being incompatible with the Presto hook in Airflow when running on Python 2.7. To resolve this, we could either convert the field in the Presto hook to Unicode or move Airflow to Python 3. We went with the former, since a Python 3 migration is a big effort on its own and we didn’t want too many moving parts in the version upgrade.
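The post doesn't name the exact field, so the helper below is a generic, hypothetical sketch of the "convert to Unicode" fix. It decodes byte strings to text before they reach code that expects Unicode. On Python 2.7, `bytes` is an alias of `str`, so the same check also catches native Python 2 strings. The `six` library ships a similar `six.ensure_text`.

```python
def ensure_text(value, encoding="utf-8"):
    """Return byte strings decoded as text; return other values unchanged.

    Works on both Python 2.7 (where `bytes is str`) and Python 3."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    return value


# Hypothetical usage: normalize a hook attribute before it is used.
# some_field = ensure_text(some_field)
```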

3. UI issues that could create data quality issues

In 1.10.3, clicking “Mark Success” on a task marks not just that task but also all of its downstream dependencies as “success”. This could cause dependent DAGs to kick off and produce incorrect data, since a user could mistakenly mark tasks they did not intend to. We disabled “Mark Success” and now use the “Task Instance” admin page to do the same.

Issues found after the upgrade

1. HiveServer2Hook stopped allowing queries

This issue has been reported upstream in AIRFLOW-3633. To fix this, we wrote our own SlackHiveServer2Hook, which allows these queries.

2. boto2 vs boto3

There was a change in behavior of the AWS S3 API in boto3, which caused some tasks to fail. This was because started to ignore S3 folder markers ($folder$ files). We resolved this by handling the new behavior appropriately in our library code.
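The exact library change isn't described here, but the general idea can be sketched. Code that decides whether an S3 "directory" exists shouldn't depend solely on folder-marker objects, and it shouldn't mistake them for data. The functions below are hypothetical. They work on a plain list of object keys, however those were listed, and they assume markers follow the Hadoop/EMR-style `<prefix>_$folder$` naming.

```python
from typing import Iterable, List

FOLDER_MARKER_SUFFIX = "_$folder$"  # assumed Hadoop/EMR-style marker naming


def is_folder_marker(key: str) -> bool:
    """True for 'directory' marker objects such as 'logs/day_$folder$'."""
    return key.endswith(FOLDER_MARKER_SUFFIX)


def prefix_exists(keys: Iterable[str], prefix: str) -> bool:
    """Treat `prefix` as existing if it has a marker *or* any object under it,
    so the check keeps working whether or not markers are visible."""
    base = prefix.rstrip("/")
    for key in keys:
        if key == base + FOLDER_MARKER_SUFFIX or key.startswith(base + "/"):
            return True
    return False


def data_keys(keys: Iterable[str]) -> List[str]:
    """Drop marker objects so they are never processed as data files."""
    return [k for k in keys if not is_folder_marker(k)]
```

Matching on `base + "/"` rather than on the bare prefix keeps a partition like `ds=2019-09-01` from matching `ds=2019-09-010`.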

3. Timezone issues

Airflow 1.10 became timezone aware. As a result, execution dates read from the metadata DB were returned in the time zone that the MySQL server or host was configured with, which was Pacific time. Some of our dashboards broke because they expected data in UTC. To fix this, we changed the default time zone of the Airflow metadata DB to UTC.
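On MySQL, `TIMESTAMP` values are stored in UTC and converted to the session's time zone when read. Airflow 1.10 uses `TIMESTAMP` columns for its dates on MySQL, so a server configured for Pacific time hands back Pacific-time values. The server-side fix is to set the server's default time zone to UTC, for example via MySQL's `time_zone` system variable. The Python sketch below only shows the equivalent client-side conversion. `to_utc` is a hypothetical helper, and it assumes a fixed UTC-7 (Pacific daylight) offset to stay deterministic.

```python
from datetime import datetime, timedelta, timezone

# Assumption: values were rendered in US Pacific daylight time (UTC-7).
# Real code should use a DST-aware zone, e.g. zoneinfo.ZoneInfo("America/Los_Angeles").
PDT = timezone(timedelta(hours=-7), "PDT")


def to_utc(naive_local: datetime, local_tz: timezone = PDT) -> datetime:
    """Interpret a naive timestamp in `local_tz` and convert it to UTC."""
    return naive_local.replace(tzinfo=local_tz).astimezone(timezone.utc)


if __name__ == "__main__":
    # 5 PM Pacific daylight time on Sept 1 is midnight UTC on Sept 2.
    print(to_utc(datetime(2019, 9, 1, 17, 0)))  # 2019-09-02 00:00:00+00:00
```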

After the upgrade was complete, we did a post-mortem. Here are some of the lessons learnt from the upgrade.

Positive takeaways

  • Runbook: We created a runbook for the upgrade with the exact steps and commands that needed to be executed, allowing us to simply copy and paste them. This ensured we didn’t make mistakes during the upgrade.
  • Tech debt: As part of the upgrade, we reduced a large amount of tech debt that had been lingering in our code base.
  • Communication: We sent out clear communication, well in advance, to our stakeholders about the upgrade, the downtime, and what to expect.

Things we can do better

  • Frequent upgrades: Instead of doing upgrades once every two years, we would like to do them more frequently so that we don’t introduce large changes in one shot.
  • More dev DAGs: We had issues caught post-upgrade that didn’t show up in dev because we had only a few dev DAGs. We would like to increase that number so we get adequate coverage during testing.
  • Better monitoring and alerting: We had a number of internal user reports after the upgrade with questions like, “Hey, my DAG is not scheduled” or “Where is my data?”. We want to know about these issues before users report them to us; to solve this problem, we recently created an alerting framework to monitor whether Airflow is scheduling DAGs on time and alert us if it is not.
  • Strategy to restart tasks: Right after the upgrade, we bulk cleared a number of tasks, which overloaded our EMR cluster. For the next upgrade, we’d like to have a strategy to clear critical tasks first, followed by lower priority tasks.
  • Internal code freeze: During the upgrade, we made some unrelated changes to our EMR code that could have waited a day, and these changes ended up breaking some DAGs. We plan to have an internal code freeze during major upgrades and allow only upgrade-related changes to be committed.
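The alerting framework isn't described here, so the following is only a hypothetical sketch of its core check. It flags a DAG when its most recent run started longer ago than its schedule interval plus a grace period. `late_dags`, the grace period, and the input dictionaries are assumptions. In practice the last-run times might come from the `dag_run` table in the metadata DB.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional


def late_dags(
    last_run_start: Dict[str, Optional[datetime]],
    schedule_interval: Dict[str, timedelta],
    now: datetime,
    grace: timedelta = timedelta(minutes=30),
) -> List[str]:
    """Return sorted DAG ids that have never run, or whose latest run started
    more than one schedule interval plus `grace` before `now`."""
    late = []
    for dag_id, interval in schedule_interval.items():
        last = last_run_start.get(dag_id)
        if last is None or now - last > interval + grace:
            late.append(dag_id)
    return sorted(late)


if __name__ == "__main__":
    now = datetime(2019, 9, 2, 12, 0)
    print(late_dags(
        {"hourly_metrics": datetime(2019, 9, 2, 10, 0),
         "daily_rollup": datetime(2019, 9, 2, 1, 0),
         "new_dag": None},
        {"hourly_metrics": timedelta(hours=1),
         "daily_rollup": timedelta(days=1),
         "new_dag": timedelta(days=1)},
        now,
    ))  # ['hourly_metrics', 'new_dag']
```

A production version would also skip paused DAGs, which would otherwise always look late.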

Despite a few hiccups, the upgrade was overall successful. The issues that we found post-upgrade were fixed forward and most of the critical DAGs caught up quickly. The next step is to move Airflow into Kubernetes so that we can scale tasks horizontally.

This upgrade wouldn’t have been possible without efforts from multiple teams at Slack. Special thanks to Deepak Barge, Ananth Packkildurai, Derek Smith, and Atl Arredondo.

We have a lot more challenging problems in scale, reliability, and security coming up in the Data Infrastructure team. If you’d like to help us, please check out https://slack.com/careers.


