Running Apache Airflow At Lyft – Lyft Engineering


By Tao Feng, Andrew Stahlman, and Junda Yang

ETL is a process that extracts data from various raw events, transforms it for analysis, and loads the derived data into a queryable data store. Data engineers and scientists at Lyft build various ETL pipelines, each running on its own schedule, to gain insight into topics ranging from the current ridesharing market to the driver and passenger experience. A reliable, efficient, and trustworthy workflow management system is crucial to make sure these pipelines run successfully and deliver their data on schedule.

Apache Airflow is a workflow orchestration and management system that allows users to programmatically author, schedule, and monitor data pipelines. Lyft was the very first Airflow adopter in production after the project was open sourced around three years ago. Today, Airflow has become one of the most important pieces of infrastructure at Lyft, serving various use cases: from powering executive dashboards to metrics aggregation, derived data generation, and machine learning feature computation.

In this post, we will share our experiences on how we run Airflow at Lyft.

Airflow under the hood

For context around the terms used in this blog post, here are a few key concepts for Airflow:

  • DAG (Directed Acyclic Graph): a workflow which glues all the tasks with inter-dependencies.
  • Operator: a template for a specific type of work to be executed. For example, BashOperator represents how to execute a bash script while PythonOperator represents how to execute a Python function.
  • Sensor: a special type of operator that waits until a certain condition is met, so that downstream tasks run only after that.
  • Task: a parameterized instance of an operator/sensor which represents a unit of actual work to be executed.
  • Plugin: an extension to allow users to easily extend Airflow with various custom hooks, operators, sensors, macros, and web views.
  • Pools: concurrency limit configuration for a set of Airflow tasks.
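To make the DAG concept concrete, here is a minimal sketch in plain Python of tasks glued together by dependencies. It does not use Airflow's API; the task names are hypothetical, and the standard library's graphlib module stands in for the ordering logic a scheduler needs.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dependencies = {
    "extract_events": set(),
    "transform_rides": {"extract_events"},
    "transform_payments": {"extract_events"},
    "load_warehouse": {"transform_rides", "transform_payments"},
}


def execution_order(deps):
    """Return one valid run order.

    Raises graphlib.CycleError if the dependencies contain a cycle,
    which is why the graph has to be acyclic.
    """
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

The two transform tasks have no dependency on each other, so either may come first in the returned order; in a real scheduler they could run in parallel.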

For other Airflow terminology, please see the Airflow documentation.

Airflow Architecture @Lyft

The graph shows the Airflow architecture at Lyft:

As illustrated in the above graph, there are four main architecture components:

  • WebUI: the portal for users to view the related status of the DAGs.
  • Metadata DB: the metastore of Airflow for storing various metadata including job status, task instance status, etc.
  • Scheduler: a multi-process component which parses the DAG bag, creates DAG objects, and triggers the executor to run the tasks whose dependencies are met.
  • Executor: a message queuing process that orchestrates worker processes to execute tasks. Airflow supports quite a few executors. For example, the Kubernetes (k8s) operator and executor were added in Airflow 1.10, providing native Kubernetes execution support. At Lyft, we leverage CeleryExecutor to scale out Airflow task execution across different Celery workers in production.

Here is how we deploy Airflow in production at Lyft:

Configuration: Apache Airflow 1.8.2 with cherry-picks, and numerous in-house Lyft customized patches.

Scale: Three Amazon Auto Scaling groups (ASGs) for Celery workers, each of which is associated with one Celery queue:

  • ASG #1: 15 worker nodes each of which is the r5.4xlarge type. This fleet of workers is for processing low-priority memory intensive tasks.
  • ASG #2: 3 worker nodes each of which is the m4.4xlarge type. This fleet of workers is dedicated for those DAGs with a strict SLA.
  • ASG #3: 1 worker node which is the m4.10xlarge type. The single node is used to process the compute-intensive workloads from a critical team’s DAGs.

Numbers of DAGs / Tasks: 500 DAGs, 800 DagRuns, 2300+ TaskInstances running on Airflow platform at Lyft daily.

Airflow Monitoring And Alerting @Lyft

There are nearly five hundred DAGs running daily on Airflow. It is crucial to maintain the SLA and uptime for Airflow. At Lyft, we leverage various technologies including Datadog, Statsd, Grafana, and PagerDuty to monitor the Airflow system.

the overall system health dashboard for Airflow

Previously, we had a production issue which caused Airflow not to schedule any tasks for an hour at Lyft. At that time, we didn’t have a good monitoring system to tell us whether Airflow was scheduling tasks or not. Hence we built the Airflow “canary” monitoring system, which treats Airflow as a black box and verifies that it schedules and executes tasks in a reasonable amount of time. If Airflow doesn’t schedule a task within a threshold (10 minutes), the oncall immediately gets paged.

We monitor Airflow overall system health in three aspects:

Airflow scheduler and worker availability health check

We run an Airflow “canary” monitoring DAG in production which performs:

  • A connection check with a simple SQL query (e.g. “SELECT 1”) for all the critical data sources, including Redshift and Postgres.
  • A celery queue check by scheduling a dummy task to every queue.

The “canary” DAG helps the oncall to answer the following questions:

  • how long it takes for the Airflow scheduler to schedule the task (the gap between the scheduled execution_time and current_time).
  • how long it takes for celery worker to pick up the task.
  • how long the task runs.
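The three questions above reduce to differences between timestamps. The following is a minimal sketch, not Lyft's actual canary code: the function names and the four timestamps are assumptions chosen for illustration, and only the 10-minute paging threshold comes from this post.

```python
from datetime import datetime, timedelta

# Paging threshold mentioned in the post.
SCHEDULE_DELAY_THRESHOLD = timedelta(minutes=10)


def canary_timings(expected_at, queued_at, started_at, finished_at):
    """Split a canary task's lifetime into the three delays of interest.

    expected_at: when the task was due to be scheduled (hypothetical input)
    queued_at:   when the scheduler actually queued it
    started_at:  when a Celery worker picked it up
    finished_at: when the task completed
    """
    return {
        "schedule_delay": queued_at - expected_at,
        "pickup_delay": started_at - queued_at,
        "run_duration": finished_at - started_at,
    }


def should_page(timings, threshold=SCHEDULE_DELAY_THRESHOLD):
    """Page the oncall when scheduling took longer than the threshold."""
    return timings["schedule_delay"] > threshold


due = datetime(2018, 12, 1, 12, 0, 0)
healthy = canary_timings(
    due,
    due + timedelta(seconds=30),
    due + timedelta(seconds=45),
    due + timedelta(seconds=60),
)
stalled = canary_timings(
    due,
    due + timedelta(minutes=12),
    due + timedelta(minutes=13),
    due + timedelta(minutes=14),
)
print(should_page(healthy), should_page(stalled))
```

Keeping the three delays separate helps the oncall tell a slow scheduler apart from saturated workers or a slow task.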

Airflow UI / Web server availability

We monitor the Airflow web server health check endpoint and trigger a page notification if the number of healthy hosts falls below a certain threshold.

Airflow Uptime for 7 days, 30 days, and 90 days

Uptime is measured as 100% minus the percentage of downtime. Airflow counts as down when any of the Airflow scheduler, the workers, or the web server is down.
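As a worked example of this definition, the sketch below computes uptime over a window from a list of outage intervals. It assumes outages are recorded per component as (start, end) pairs, which is a hypothetical input format; overlapping outages of different components are merged so that downtime is not counted twice.

```python
from datetime import datetime, timedelta


def merge_intervals(intervals):
    """Merge overlapping (start, end) intervals into disjoint ones."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def uptime_percent(outages, window_start, window_end):
    """Return 100% minus the percentage of the window spent down."""
    # Keep only the part of each outage that falls inside the window.
    clipped = [
        (max(start, window_start), min(end, window_end))
        for start, end in outages
        if start < window_end and end > window_start
    ]
    down = sum((end - start for start, end in merge_intervals(clipped)), timedelta())
    return 100.0 * (1 - down / (window_end - window_start))


window_start = datetime(2018, 12, 1)
window_end = window_start + timedelta(days=7)
outages = [
    # Scheduler down for one hour.
    (datetime(2018, 12, 2, 0, 0), datetime(2018, 12, 2, 1, 0)),
    # Web server down for one hour, overlapping the scheduler outage.
    (datetime(2018, 12, 2, 0, 30), datetime(2018, 12, 2, 1, 30)),
]
seven_day_uptime = uptime_percent(outages, window_start, window_end)
print(round(seven_day_uptime, 3))
```

Here the two outages overlap by 30 minutes, so the total downtime is 1.5 hours out of 168, not 2 hours.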

various metrics for the system

Other important metrics for monitoring:

  • Schedule delay: this metric, as mentioned above, gives us insight not only into the latency of the Airflow scheduler but also into the overall end-to-end availability of the system.
  • The number of tasks in the queued state vs the running state for every Celery queue.
  • The number of occupied slots for every Celery worker. This stat shows whether the worker is fully occupied and the fleet needs to scale out.
  • The number of DAGs and how long the Airflow scheduler takes to parse them.

Airflow Customization @Lyft

UI Auditing

The multi-tenant isolation of the UI in Airflow 1.8.2 has documented limitations. It is hard for us to answer users’ questions like “who paused my DAG?”, “who marked the task for my DAG as failed / success?”, and “who changed the state of my DagRun?”.

We leverage the existing Airflow log model and Flask signal to implement an audit log for actions taken via the Airflow UI. It will send a UI signal which triggers a callback to log the related information (who did the action, what was the action) whenever a Flask UI endpoint is accessed.
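The signal-and-callback pattern described above can be sketched in a few lines of plain Python. This is not the Flask signal API or Airflow's log model: the Signal class, the in-memory audit_log list, and the handle_request function are hypothetical stand-ins that only illustrate the flow from a UI request to an audit record.

```python
from datetime import datetime, timezone


class Signal:
    """A tiny stand-in for a signal: receivers are called on send()."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)
        return receiver  # lets connect() be used as a decorator

    def send(self, **payload):
        for receiver in self._receivers:
            receiver(**payload)


ui_action = Signal()
audit_log = []  # stand-in for rows written to a log table


@ui_action.connect
def record_action(user, endpoint, params):
    """Callback: record who did what, and when."""
    audit_log.append(
        {
            "when": datetime.now(timezone.utc).isoformat(),
            "user": user,
            "action": endpoint,
            "params": dict(params),
        }
    )


def handle_request(user, endpoint, params):
    """Hypothetical UI endpoint wrapper that emits the signal on every access."""
    ui_action.send(user=user, endpoint=endpoint, params=params)


handle_request("alice", "paused", {"dag_id": "fact_user_messages", "is_paused": "true"})
print(audit_log[0]["user"], audit_log[0]["action"], audit_log[0]["params"]["dag_id"])
```

Because the endpoint code only emits a signal, the audit logic stays in one callback rather than being repeated in every view.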

The above figure shows someone (removed from the picture) turning off a DAG named “fact_user_messages”. This feature helps us to answer these kinds of questions easily.

Extra link for task instance UI panel

At Lyft, we would like to customize the task instance model view UI panel based on the task’s operator. For example, we would like to display a Qubole (a third-party Hive computation platform) query link for the QuboleOperator, show the Kibana logs for all the operators, and link to the internal data portal table detail page for the HiveOperator.

The above graph shows that we provide a Kibana log link for the Airflow task instance in the UI panel. The feature is very generic and allows developers to customize and associate various links with the operators. We have contributed the feature back upstream.

DAG dependency graph

The Graph View tab in the Airflow UI is great for visualizing dependencies within a DAG. However, some DAGs at Lyft have dependencies on other DAGs, and the Airflow UI doesn’t provide a good visualization of these inter-DAG dependencies. Our intern, Tirath, built a tool that visualizes the DAG lineage/dependency and allows a user to filter the upstream/downstream dependency for a given specific DAG.

It looks for inter-DAG dependencies expressed via ExternalTaskSensors to compute the dependency graph and leverages an Airflow web view plugin for display. In the future, we will enhance the tool to show the DAG state in the dependency graph in real time, which will help data engineers understand any ongoing issues related to their DAGs.
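A minimal sketch of the graph computation, assuming the sensor declarations have already been collected into a mapping from each DAG to the external DAGs its ExternalTaskSensors wait on. The DAG names and the mapping format are hypothetical; the actual tool reads this information from the DAG definitions.

```python
from collections import defaultdict, deque

# Hypothetical input: dag_id -> external DAG ids referenced by its sensors.
sensors = {
    "raw_events": [],
    "fact_rides": ["raw_events"],
    "fact_payments": ["raw_events"],
    "exec_dashboard": ["fact_rides", "fact_payments"],
}


def build_edges(sensor_map):
    """Return (upstream, downstream) adjacency maps between DAGs."""
    upstream = defaultdict(set)
    downstream = defaultdict(set)
    for dag_id, external_dag_ids in sensor_map.items():
        for external_dag_id in external_dag_ids:
            upstream[dag_id].add(external_dag_id)
            downstream[external_dag_id].add(dag_id)
    return upstream, downstream


def reachable(start, edges):
    """Breadth-first search: every DAG reachable from start along edges."""
    seen = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for neighbor in edges.get(node, ()):
            if neighbor not in seen:
                seen.add(neighbor)
                queue.append(neighbor)
    return seen


upstream, downstream = build_edges(sensors)
print(sorted(reachable("raw_events", downstream)))
print(sorted(reachable("exec_dashboard", upstream)))
```

Filtering by upstream or downstream for a given DAG is then a single reachable() call on the corresponding adjacency map.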

Improving Airflow Performance And Reliability @Lyft

Reduce Airflow UI page load time

One pain point we have with Airflow at Lyft is that it takes a very long time to load the UI for certain DAGs. We have hundreds of DAGs running in production, some of which have hundreds of tasks. By default, the Airflow UI loads the DAG tree view with the information for every task across the past 25 DagRuns. This quite often triggers a UI timeout, which prevents users from seeing their DAG’s execution details.

Although we could certainly rewrite the UI for better performance, we found that we could simply reduce the number of DagRuns displayed from the default of 25 to 5. This significantly reduces the page load time. We contributed the change back upstream; it allows the user to set default_dag_run_display_number, the number of DagRuns to display, in the Airflow configuration file (airflow.cfg).
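As an illustration, the sketch below parses a configuration fragment with Python's configparser and reads the setting back. It assumes the option lives in the [webserver] section of airflow.cfg; the helper name is hypothetical, and the fallback of 25 mirrors the default mentioned above.

```python
import configparser

# Assumed fragment of airflow.cfg with the display number lowered to 5.
AIRFLOW_CFG_FRAGMENT = """
[webserver]
default_dag_run_display_number = 5
"""


def dag_run_display_number(cfg_text, fallback=25):
    """Read the number of DagRuns to display, falling back to the default."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getint(
        "webserver", "default_dag_run_display_number", fallback=fallback
    )


print(dag_run_display_number(AIRFLOW_CFG_FRAGMENT))
```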

Execute with higher parallelism for the DAG

Airflow provides various configurables to tune the DAG performance. At Lyft, we suggest users tune the following variables:

  1. Parallelism: This variable controls the maximum number of task instances that can run simultaneously across the Airflow installation. Users can increase the parallelism variable in airflow.cfg. We normally suggest users increase this value when doing a backfill.
  2. Concurrency: The Airflow scheduler will run no more than concurrency task instances for your DAG at any given time. Concurrency is defined in your Airflow DAG as a DAG input argument. If you do not set the concurrency on your DAG, the scheduler will use the default value from the dag_concurrency entry in your airflow.cfg.
  3. max_active_runs: Airflow will run no more than max_active_runs DagRuns of your DAG at a given time. If you do not set max_active_runs on your DAG, Airflow will use the default value from the max_active_runs_per_dag entry in your airflow.cfg. During a backfill, we suggest that users avoid setting depends_on_past to true and that they increase max_active_runs.
  4. Pool: An Airflow pool limits the execution parallelism of the tasks assigned to it. Users can increase the priority_weight of a task if it is a critical one.
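These limits apply together, so the tightest one decides how many more task instances can start. The sketch below is a deliberately simplified model, not the scheduler's actual algorithm: the function and its arguments are hypothetical, and it ignores max_active_runs and priority_weight.

```python
def startable_tasks(
    parallelism,
    running_total,
    concurrency,
    running_in_dag,
    pool_slots,
    used_pool_slots,
):
    """Return how many more task instances of one DAG could start right now.

    Simplified model: the effective headroom is the smallest remaining
    budget among the installation-wide, per-DAG, and per-pool limits.
    """
    headroom = min(
        parallelism - running_total,   # installation-wide limit
        concurrency - running_in_dag,  # per-DAG limit
        pool_slots - used_pool_slots,  # per-pool limit
    )
    return max(0, headroom)


# The pool is the bottleneck here: only 2 of its 5 slots are free.
before = startable_tasks(32, 20, 16, 10, 5, 3)
# Raising parallelism alone does not help while the pool stays at 5 slots.
after = startable_tasks(64, 20, 16, 10, 5, 3)
print(before, after)
```

This is why tuning a single variable during a backfill may have no visible effect: the other limits still cap the DAG.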

Reduce Airflow scheduling latency

Airflow scheduling latency can be measured as the delay between the time when the dependencies of a task are met and the time when the task actually starts. At Lyft, we tune the following configs to reduce the latency:

  • max_threads: The scheduler spawns multiple threads in parallel to schedule DAGs. This is controlled by max_threads, which has a default value of 2. Users should increase it in production (e.g. to the number of CPUs on the scheduler host minus 1).
  • scheduler_heartbeat_sec: This config controls how frequently the Airflow scheduler heartbeats and updates the job’s entry in the Airflow metastore. Users should consider increasing it to a higher value (e.g. 60 secs).
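A small sketch of how these two values might be derived and rendered as a configuration fragment. It assumes both options live in the [scheduler] section of airflow.cfg; the helper name is hypothetical, and the values follow the suggestions above (CPU count minus 1, and a 60-second heartbeat).

```python
import configparser
import io
import os


def scheduler_overrides(cpu_count=None, heartbeat_sec=60):
    """Render a [scheduler] fragment with the suggested settings."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    cfg = configparser.ConfigParser()
    cfg["scheduler"] = {
        # Leave one CPU free, but never go below one thread.
        "max_threads": str(max(1, cpus - 1)),
        "scheduler_heartbeat_sec": str(heartbeat_sec),
    }
    buffer = io.StringIO()
    cfg.write(buffer)
    return buffer.getvalue()


fragment = scheduler_overrides(cpu_count=8)
print(fragment)
```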

Developers from the community have contributed a couple of fixes to Airflow upstream to further reduce the latency.

Improve Airflow reliability

We have spent considerable effort on improving Airflow’s reliability. Here are a few things worth mentioning:

  • Source Control For Pools: We maintain the Airflow pool configuration in source control and review each team’s pool request along with their estimate of the maximum task slots needed. The updated pool configuration is applied to Airflow at runtime.
  • Integration Test For DAG: We run integration tests in the Continuous Integration phase to enforce Airflow best practices. They include a sanity check on all the DAG definitions, a start_date check to guarantee that all DAGs have a fixed start_date, a pool check to ensure there is no unused pool, and a check to ensure that any pool specified in a DAG actually exists.
  • Secure UI access: We disable write access on a couple of important UI ModelViews (e.g., PoolModelView, VariableView, DagRunModelView) in Airflow. This prevents users from accidentally modifying the Pool, Variable, and DagRun tables in the Airflow metastore from the UI.
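The pool and start_date checks from the integration tests can be sketched as plain set operations. This is an illustration under stated assumptions, not Lyft's test suite: the pool names, the DAG summaries, and the helper functions are hypothetical, and a real test would load this information from the DAG definitions and the source-controlled pool configuration.

```python
from datetime import datetime

# Hypothetical pools from the source-controlled pool configuration.
configured_pools = {"etl_pool", "ml_pool", "legacy_pool"}

# Hypothetical per-DAG summaries: start_date and the pools its tasks use.
dag_summaries = {
    "fact_rides": {"start_date": datetime(2018, 1, 1), "pools": {"etl_pool"}},
    "feature_gen": {"start_date": None, "pools": {"ml_pool", "adhoc_pool"}},
}


def used_pools(dags):
    """Collect every pool referenced by any DAG."""
    return set().union(*(summary["pools"] for summary in dags.values()))


def unused_pools(pools, dags):
    """Pools that are configured but referenced by no DAG."""
    return pools - used_pools(dags)


def missing_pools(pools, dags):
    """Pools referenced by a DAG but absent from the configuration."""
    return used_pools(dags) - pools


def dags_without_fixed_start_date(dags):
    """DAG ids whose start_date is missing or not a concrete datetime."""
    return sorted(
        dag_id
        for dag_id, summary in dags.items()
        if not isinstance(summary["start_date"], datetime)
    )


print(unused_pools(configured_pools, dag_summaries))
print(missing_pools(configured_pools, dag_summaries))
print(dags_without_fixed_start_date(dag_summaries))
```

In a CI run, each of these helpers would be expected to return an empty result, and a non-empty one would fail the build.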

Conclusion

In this post, we shared how we operate Airflow at Lyft. We described:

  1. What the Airflow architecture looks like in general
  2. How we monitor Airflow to maintain a high SLA
  3. How we customize Airflow for Lyft use cases
  4. How we improve Airflow performance and reliability in production

In the future, we are going to focus on improving Airflow security for the multi-tenant environment. We also plan to have a second blog post to share how we build various toolkits for Airflow to boost internal ETL developer productivity at Lyft.

Acknowledgments

Special thanks to all the team members on the Lyft Data Platform team, especially Maxime Beauchemin, Tao Feng, Andrew Stahlman, Junda Yang, Max Payton, Chao-Han Tsai, Alagappan Sethuraman, and Jinhyuk Chang for their contributions on maintaining Airflow’s SLA at Lyft and Shenghu Yang, Yuko Yamazaki, Prashant Kommireddi, and Guy Bayes for their guidance.

Thanks to Maxime Beauchemin and Mark Grover for the review.


