Capturing Data Evolution in a Service-Oriented Architecture


Building Airbnb’s Change Data Capture system (SpinalTap), to enable propagating & reacting to data mutations in real time.

Labor Day weekend is just around the corner! Karim is amped for a well-deserved vacation. He logs in to Airbnb to start planning a trip to San Francisco, and stumbles upon a great listing hosted by Dany. He books it.

A moment later, Dany receives a notification that his home has been booked. He checks his listing calendar and sure enough, those dates are reserved. He also notices the recommended daily price has increased for that time period. “Hmm, must be a lot of folks looking to visit the city over that time” he mumbles. Dany marks his listing as available for the rest of that week…

All the way on the East Coast, Sara is sipping tea in her cozy Chelsea apartment in New York, preparing for a business trip to her company’s HQ in San Francisco. She’s been out of luck for a while and is about to take a break, when Dany’s listing pops up on her search map. She checks out the details and it looks great! She starts writing a message: “Dear Dany, I’m traveling to San Francisco and your place looks perfect for my stay…”

Adapting to data evolution has presented itself as a recurrent need for many emerging applications at Airbnb over the last few years. The above scenario depicts examples of that, where dynamic pricing, availability, and reservation workflows need to react to changes from different components in our system in near real-time. From an infrastructure perspective, designing our architecture to scale is a necessity as we continually grow both in terms of data and number of services. Yet, as part of striving towards a service-oriented architecture, an efficient manner of propagating meaningful data model mutations between microservices, while maintaining a decoupled architecture that preserves data ownership boundaries, was just as important.

In response, we created SpinalTap: a scalable, performant, reliable, lossless Change Data Capture service capable of detecting data mutations with low latency across different data source types, and propagating them as standardized events to consumers downstream. SpinalTap has become an integral component in Airbnb’s infrastructure and derived data processing platform, on which several critical pipelines rely. In this blog post, we will present an overview of the system architecture, use cases, guarantees, and how it was designed to scale.

Background

Change Data Capture (CDC) is a design pattern that enables capturing changes to data and notifying actors so they can react accordingly. This follows a publish-subscribe model where a change to a data set is the topic of interest.

Requirements

Certain high-level requirements for the system were desirable to accommodate our use cases:

  • Lossless: Zero tolerance for data loss, a requirement for on-boarding critical applications such as a stream-based accounting audit pipeline
  • Scalable: Horizontally scalable with increased load and data cluster size, to avoid recurrent re-design of the system with incremental growth
  • Performant: Changes are propagated to subscribed consumers in near real-time (sub-second)
  • Consistent: Ordering and timeline consistency are enforced to retain the sequence of changes for a specific data record
  • Fault Tolerant: Highly available with a configurable degree of redundancy to be resilient to failure
  • Extensible: A generic framework that can accommodate different data source and sink types

Solutions Considered

There are several solutions promoted in literature for building a CDC system, the most referenced of which are:

  • Polling: A time-driven strategy can be used to periodically check whether any changes have been committed to the records of a data store, by keeping track of a status attribute (such as last updated or version)
  • Triggers: For storage engines that support database triggers (e.g. MySQL), stored procedures fired on row-level operations can be employed to propagate changes to other data tables in a seamless manner
  • Dual Writes: Data changes can be communicated to subscribed consumers in the application layer during the request, such as by emitting an event or scheduling an RPC after write commit
  • Audit Trail: Most data storage solutions maintain a transaction log (or changelog) to record and track changes committed to the database. This is commonly used for replication between cluster nodes, and recovery operations (such as unexpected server shutdown or failover).

There are several desirable features of employing the database changelog for detecting changes: reading from the logs allows for an asynchronous, non-intrusive approach to capturing changes, as compared to triggers and polling strategies. It also supports strong consistency and ordering guarantees on commit time, and retains transaction boundary information, neither of which is achievable with dual writes. Because the log is durable and ordered, events can also be replayed from a certain point in time. With this in mind, SpinalTap was designed based on this approach.

Architecture

High level workflow overview

At a high level, SpinalTap was designed to be a general-purpose solution that abstracts the change capture workflow enough to be easily adaptable to different infrastructure dependencies (data stores, event bus, consumer services). The architecture comprises three main components that provide sufficient abstraction to achieve these qualities:

Source

Left: Source component diagram; Right: Source event workflow

The source represents the origin of the change event stream from a specific data store. The source abstraction can be easily extended with different data source types, as long as there is an accessible changelog to stream events from. Events parsed from the changelog are filtered, processed, and transformed to corresponding mutations. A mutation is an application layer construct that represents a single change (insert, update, or delete) to a data entity. It includes the entity values before & after the change, a globally unique identifier, transaction information, and metadata derived from the originating source event. The source is also responsible for detecting data schema evolution, and propagating the schema information accordingly with the corresponding mutations. This is important to ensure consistency when deserializing the entity values on the client side or replaying events from an earlier state.
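
To make the shape of a mutation concrete, here is a minimal sketch of such a record. It is not SpinalTap's actual schema (which the post says is defined in Thrift); the class and field names (`Mutation`, `kind`, `schema_version`, and so on) are assumptions chosen to mirror the attributes listed above.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid


class MutationKind(Enum):
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"


@dataclass(frozen=True)
class Mutation:
    """One change to one data entity. All field names are hypothetical."""
    kind: MutationKind
    table: str
    before: Optional[Dict[str, Any]]  # entity values before the change (None for inserts)
    after: Optional[Dict[str, Any]]   # entity values after the change (None for deletes)
    transaction_id: str               # transaction information
    schema_version: int               # lets consumers deserialize with the right schema
    mutation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def changed_columns(self) -> List[str]:
        """Columns whose values differ between the before and after images."""
        before, after = self.before or {}, self.after or {}
        return sorted(k for k in before.keys() | after.keys()
                      if before.get(k) != after.get(k))


update = Mutation(
    kind=MutationKind.UPDATE,
    table="reservations",
    before={"id": 7, "status": "pending"},
    after={"id": 7, "status": "accepted"},
    transaction_id="tx-1",
    schema_version=3,
)
print(update.changed_columns())  # ['status']
```

Carrying both images and a schema version in every record is what allows a consumer to interpret an old event correctly when it is replayed after the table has changed.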

Destination

Left: Destination component diagram; Right: Destination event workflow

The destination represents a sink for mutations, after they have been processed and converted to a standardized event. The destination also keeps track of the last successfully published mutation, which is employed to derive the source state position to checkpoint on. The component abstracts away the transport medium and format used. At Airbnb, we employ Apache Kafka as the event bus, given its wide usage within our infrastructure. Apache Thrift is used as the data format to offer a standardized mutation schema definition and cross-language support (Ruby & Java).
 
A major performance bottleneck identified through benchmarking on the system was mutation publishing. The situation was aggravated given our system settings were chosen to favor strong consistency over latency. To relieve the situation, we incorporated a few optimizations:

Buffered Destination: To avoid the source being blocked while waiting for mutations to be published, we employ an in-memory bounded queue to buffer events emitted from the source (consumer-producer pattern). The source would add events to the buffer while the destination is publishing the mutations. Once available, the destination would drain the buffer and process the next batch of mutations.
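
The buffering idea can be sketched with a bounded queue and a single publisher thread. This is an illustration under stated assumptions, not SpinalTap's implementation: `BufferedDestination`, `send`, `flush`, and the `publish` callback are hypothetical names, and a real publisher would write to the event bus rather than a list.

```python
import queue
import threading


class BufferedDestination:
    """Decouples the source from publishing via a bounded in-memory buffer."""

    def __init__(self, publish, capacity=1000, max_batch=100):
        self._publish = publish                      # callback that takes a list of mutations
        self._buffer = queue.Queue(maxsize=capacity)
        self._max_batch = max_batch
        threading.Thread(target=self._run, daemon=True).start()

    def send(self, mutation):
        # Called by the source; blocks only when the buffer is full.
        self._buffer.put(mutation)

    def _run(self):
        while True:
            batch = [self._buffer.get()]             # wait for at least one mutation
            while len(batch) < self._max_batch:      # then drain whatever else is ready
                try:
                    batch.append(self._buffer.get_nowait())
                except queue.Empty:
                    break
            self._publish(batch)
            for _ in batch:
                self._buffer.task_done()

    def flush(self):
        # Returns once every buffered mutation has been published.
        self._buffer.join()


published = []
dest = BufferedDestination(publish=published.extend, capacity=8, max_batch=4)
for i in range(20):
    dest.send(i)
dest.flush()
```

Because a single thread drains a FIFO queue, the publish order matches the order in which the source emitted the mutations, and the bounded capacity applies back-pressure to the source instead of letting memory grow without limit.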

Destination Pool: For sources with erratic, spiky incoming event rates, the in-memory buffer occasionally gets saturated, causing intermittent degradation in performance. To relieve the system from irregular load patterns, we employed application-level partitioning of the source events to a configurable set of buffered destinations managed by a thread pool. Events are multiplexed to thread destinations while retaining the ordering scheme. This enabled us to achieve high throughput while not compromising latency or consistency.
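
One way to multiplex events across several destinations without breaking per-record ordering is to route each record key to a fixed lane. The sketch below assumes that approach; `DestinationPool`, `lane_for`, and the use of CRC32 as the partitioning hash are illustrative choices, not details taken from SpinalTap.

```python
import queue
import threading
import zlib


class DestinationPool:
    """Partitions mutations by key across several single-threaded lanes."""

    def __init__(self, publish, size=4, capacity=1000):
        self._publish = publish  # callback taking (key, mutation)
        self._lanes = [queue.Queue(maxsize=capacity) for _ in range(size)]
        for lane in self._lanes:
            threading.Thread(target=self._drain, args=(lane,), daemon=True).start()

    def lane_for(self, key):
        # Stable hash: the same key always maps to the same lane.
        return zlib.crc32(key.encode("utf-8")) % len(self._lanes)

    def send(self, key, mutation):
        self._lanes[self.lane_for(key)].put((key, mutation))

    def _drain(self, lane):
        while True:
            key, mutation = lane.get()
            self._publish(key, mutation)
            lane.task_done()

    def flush(self):
        for lane in self._lanes:
            lane.join()


received = {}
lock = threading.Lock()


def record(key, mutation):
    with lock:
        received.setdefault(key, []).append(mutation)


rows = ("listing:1", "listing:2", "listing:3", "listing:4")
pool = DestinationPool(publish=record, size=3)
for version in range(50):
    for row in rows:
        pool.send(row, version)
pool.flush()
```

Mutations for different rows may be published in any relative order, but every mutation for a given row passes through the same FIFO lane and the same thread, so its per-row sequence is preserved.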

Pipe

Left: Pipe component diagram; Right: Pipe manager coordinating the pipe lifecycles

The pipe coordinates the workflow between a given source and destination. It represents the basic unit of parallelism. It’s also responsible for periodically checkpointing source state, and managing the lifecycle of event streaming. In case of erroneous behavior, the pipe performs a graceful shutdown and initiates the failure recovery process. A keep-alive mechanism is employed to ensure source streaming is restarted in the event of failure, according to the last state checkpoint. This allows the system to auto-remediate from intermittent failures while maintaining data integrity. The pipe manager is responsible for creating, updating, and removing pipes, as well as the pipe lifecycle (start/stop), on a given cluster node. It also ensures any changes to pipe configuration are propagated accordingly at run-time.
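
The interplay of checkpointing and restart can be shown with a toy pipe that reads an in-memory changelog. This is a simplified sketch: the `Pipe` class, its `checkpoint_every` parameter, and the retry loop standing in for the keep-alive mechanism are all assumptions made for illustration.

```python
class Pipe:
    """Streams a changelog to a publisher, checkpointing its position periodically."""

    def __init__(self, changelog, publish, checkpoint_every=3):
        self.changelog = changelog            # list of events; the index is the position
        self.publish = publish                # may raise OSError on transient failure
        self.checkpoint_every = checkpoint_every
        self.checkpoint = 0                   # last saved source position

    def run_once(self):
        position = self.checkpoint            # always resume from the last checkpoint
        while position < len(self.changelog):
            self.publish(self.changelog[position])
            position += 1
            if position % self.checkpoint_every == 0:
                self.checkpoint = position
        self.checkpoint = position

    def run(self, max_restarts=5):
        # Stand-in for the keep-alive: restart streaming after a failure.
        for _ in range(max_restarts + 1):
            try:
                self.run_once()
                return
            except OSError:
                continue
        raise RuntimeError("pipe did not recover")


delivered = []
failures = {"remaining": 1}


def flaky_publish(event):
    if event == "e4" and failures["remaining"] > 0:
        failures["remaining"] -= 1
        raise OSError("transient broker error")
    delivered.append(event)


pipe = Pipe(changelog=[f"e{i}" for i in range(6)], publish=flaky_publish)
pipe.run()
print(delivered)  # ['e0', 'e1', 'e2', 'e3', 'e3', 'e4', 'e5']
```

The failure occurs after the checkpoint at position 3, so `e3` is published a second time on restart. Nothing is lost, but consumers see a duplicate, which is the at-least-once behavior described under Guarantees.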

Cluster Management

Left: Cluster resource management; Middle: Node failure recovery; Right: Isolation with instance tagging

To achieve certain desirable architectural aspects — such as scalability, fault-tolerance, and isolation — we adopted a cluster management framework (Apache Helix) to coordinate distribution of stream processing across compute resources. This helped us achieve deterministic load balancing, and horizontal scaling with automatic redistribution of source processors across the cluster.
 
To promote high availability with configurable fault tolerance, each source is appointed a certain subset of cluster nodes to process event streaming. We use a Leader-Standby state model, where only one node streams events from a source at any given point, while the remaining nodes in the sub cluster are on standby. If the leader is down, then one of the standby nodes will assume leadership.

To support isolation between source type processing, each node in the cluster is tagged with the source type(s) that can be delegated to it. Stream processing is distributed across cluster nodes while maintaining these isolation criteria.

To resolve inconsistencies from network partitions, in particular the case where more than one node assumes leadership over streaming from a specific source (split brain), we maintain a global leader epoch per source that is atomically incremented on leader transition. The leader epoch is propagated with each mutation, and inconsistencies are consequently mitigated with client-side filtering, by disregarding events that have a smaller epoch than the latest observed.
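
The client-side filter described above amounts to remembering the highest epoch seen per source and dropping anything older. A minimal sketch, with `EpochFilter` and `accept` as hypothetical names:

```python
class EpochFilter:
    """Drops events produced by a leader that has since been superseded."""

    def __init__(self):
        self._latest = {}  # source name -> highest leader epoch observed so far

    def accept(self, source, epoch):
        latest = self._latest.get(source, -1)
        if epoch < latest:
            return False   # stale leader: disregard the event
        self._latest[source] = epoch
        return True


# Events as (source, leader_epoch, payload). The third one comes from an
# old leader that kept streaming after epoch 2 had already been observed.
events = [
    ("reservations_db", 1, "a"),
    ("reservations_db", 2, "b"),
    ("reservations_db", 1, "c"),
    ("reservations_db", 2, "d"),
]

epoch_filter = EpochFilter()
accepted = [payload for source, epoch, payload in events
            if epoch_filter.accept(source, epoch)]
print(accepted)  # ['a', 'b', 'd']
```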

Guarantees

Certain guarantees were essential for the system to uphold, to accommodate all downstream use cases.

Data Integrity: The system maintains an at-least-once delivery guarantee, where any change to the underlying data store is eventually propagated to clients. This dictates that no event present in the changelog is permanently lost, and that each is delivered within the time window specified by our SLA. We also ensure that no data corruption is incurred, and that mutation content maintains parity with that of the source event.

Event Ordering: Ordering is enforced according to the defined partitioning scheme. We maintain ordering per data record (row), i.e. all changes to a specific row in a given database table will be received in commit order.

Timeline Consistency: Being consistent across a timeline demands that changes are received chronologically within a given time frame, i.e. two sequences of a given mutation set are not sent interleaved. A split brain scenario can potentially compromise this guarantee, but is mitigated with epoch fencing as explained earlier.

Validation

The SpinalTap validation framework

Justifying there is no breach in SpinalTap’s guarantees by virtue of design was not sufficient, and we wanted a more pragmatic data-driven approach to validate our assumptions. To address this, we developed a continuous online end-to-end validation pipeline, responsible for validating the mutations received on the consumer side against the source of truth, and asserting no erroneous behavior is detected in both pre-production and production environments.

To achieve a reliable validation workflow, consumed mutations are partitioned and stored on local disk, with the same partitioning scheme applied to source events. Once all mutations corresponding to events of a partition are received, the partition file is validated against the originating source partition through a list of tests that assert the guarantees described earlier. For MySQL specifically, the binlog file was considered a clean partition boundary.
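
As a rough idea of what such per-partition tests could look like, the sketch below compares consumed mutations against the source events of one partition. The function name `validate_partition` and the `(row_key, sequence_number)` representation are assumptions; the actual framework's checks are not described in more detail here.

```python
def validate_partition(source_events, consumed):
    """Compare consumed mutations with the source events of one partition.

    Both arguments are lists of (row_key, sequence_number) pairs.
    Returns a list of human-readable violations (empty when all checks pass).
    """
    violations = []
    expected = set(source_events)
    seen = set()
    last_seq = {}  # row_key -> highest sequence number seen so far

    for row, seq in consumed:
        if (row, seq) not in expected:
            violations.append(f"unknown: {row}@{seq}")      # parity with source broken
            continue
        if (row, seq) in seen:
            continue                                        # duplicates are allowed (at-least-once)
        seen.add((row, seq))
        if seq < last_seq.get(row, -1):
            violations.append(f"out of order: {row}@{seq}")  # per-row ordering broken
        last_seq[row] = max(seq, last_seq.get(row, -1))

    for row, seq in sorted(expected - seen):
        violations.append(f"missing: {row}@{seq}")          # data loss
    return violations


source = [("r1", 1), ("r1", 2), ("r2", 1)]
print(validate_partition(source, [("r1", 1), ("r2", 1), ("r1", 1), ("r1", 2)]))  # []
print(validate_partition(source, [("r1", 2), ("r1", 1), ("r2", 1)]))  # ['out of order: r1@1']
```

Ordering is checked on the first occurrence of each mutation, so a replay after a checkpoint restart (which re-delivers earlier mutations) is not flagged as a violation.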
 
We set up offline integration testing in a sandbox environment to prevent any regression from being deployed to production. The validator is also employed online in production by consuming live events for each source stream. This serves as a safeguard to detect any breaches that are not caught within our testing pipeline, and to automatically remediate by rolling back source state to a previous checkpoint. This enforces that streaming does not proceed until any issues are resolved, and eventually guarantees consistency and data integrity.

Model Mutations

Left: Sync vs Async app workflow; Right: Propagating model mutations to downstream consumers

A shortcoming of consumer services tapping directly into SpinalTap events for a given service’s database is that the data schema is leaked, creating unnecessary coupling. Furthermore, domain logic for processing data mutations encapsulated in the owning service needs to be replicated to consumer services as well.
 
To mitigate the situation, we built a model streaming library on top of SpinalTap, which allowed services to listen to events from a service’s data store, transform them to domain model mutations, and re-inject them in the message bus. This effectively allowed data model mutations to become part of the service’s interface, and segregated the request/response cycle from asynchronous data ingestion and event propagation. It also helped decouple domain dependencies, facilitate event-driven communication, and provide performance & fault tolerance improvements to services by isolating synchronous & asynchronous application workflows.
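
The transformation step can be pictured as a function, owned by the service, that maps a raw row mutation to a domain event. Everything in this sketch is hypothetical: the column names (`status_cd`, `lst_id`, `start_dt`, `n_nights`), the status code, and the `ReservationConfirmed` event are invented to show how storage details stay hidden from consumers.

```python
CONFIRMED = 2  # hypothetical status code stored in the reservations table


def to_domain_event(row_mutation):
    """Map a raw row mutation to a domain model mutation, or None if irrelevant."""
    if row_mutation["table"] != "reservations":
        return None
    before = row_mutation.get("before") or {}
    after = row_mutation.get("after") or {}
    became_confirmed = (before.get("status_cd") != CONFIRMED
                        and after.get("status_cd") == CONFIRMED)
    if not became_confirmed:
        return None
    # Consumers see domain vocabulary only, never column names or status codes.
    return {
        "event": "ReservationConfirmed",
        "reservation_id": after["id"],
        "listing_id": after["lst_id"],
        "check_in": after["start_dt"],
        "nights": after["n_nights"],
    }


raw = {
    "table": "reservations",
    "before": {"id": 7, "lst_id": 42, "status_cd": 1, "start_dt": "2018-09-01", "n_nights": 3},
    "after": {"id": 7, "lst_id": 42, "status_cd": 2, "start_dt": "2018-09-01", "n_nights": 3},
}
event = to_domain_event(raw)
```

With this layer in place, the owning service can rename a column or change how status is encoded without breaking any consumer, as long as the published domain event keeps its shape.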

Use Cases

SpinalTap is employed for numerous use cases within our infrastructure, the most prominent of which are:

Cache Invalidation: A common application for CDC systems is cache invalidation, where changes to the backing data store are detected by a cache invalidator service or process that consequently evicts (or updates) the corresponding cache entries. Preferring an asynchronous approach allowed us to decouple our caching mechanism from the request path, and application code that serves production traffic. This pattern is widely used amongst services to maintain consistency between the source of truth data stores and our distributed cache clusters (e.g. Memcached, Redis).
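
A toy version of this pattern, with a plain dictionary standing in for the distributed cache, might look as follows. `ListingCache`, `on_mutation`, and the mutation fields `table` and `primary_key` are assumed names, not part of SpinalTap's interface.

```python
class ListingCache:
    """Read-through cache whose entries are evicted by change events."""

    def __init__(self, load_from_store):
        self._load = load_from_store
        self._entries = {}

    def get(self, listing_id):
        if listing_id not in self._entries:
            self._entries[listing_id] = self._load(listing_id)
        return self._entries[listing_id]

    def on_mutation(self, mutation):
        # Invoked by the invalidator for every consumed mutation, off the request path.
        if mutation["table"] == "listings":
            self._entries.pop(mutation["primary_key"], None)


store = {42: {"price": 100}}
cache = ListingCache(load_from_store=lambda listing_id: dict(store[listing_id]))

cache.get(42)                        # warms the cache
store[42]["price"] = 120             # a write lands in the source of truth
stale = cache.get(42)["price"]       # still the cached value until the event arrives
cache.on_mutation({"table": "listings", "primary_key": 42})
fresh = cache.get(42)["price"]       # reloaded after eviction
print(stale, fresh)  # 100 120
```

The writer never touches the cache. The only link between the write path and the cache is the mutation event, which is what keeps the caching mechanism out of the request path.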

Search Indexing: There are multiple search products at Airbnb that use real-time indexing (e.g. review search, inbox search, support ticket search). SpinalTap proved to be a good fit for building the indexing pipeline from data stores to the search backends (e.g. ElasticSearch), particularly due to its in-order and at-least-once delivery semantics. Services can easily consume events for the corresponding topics and convert the mutations to index updates, which helps ensure search freshness with low latency.

Offline Processing: SpinalTap is also employed to export the online datastores to our offline big data processing systems (e.g. Hive, Airstream) in a streaming manner, which requires high throughput, low latency, and proper scalability. The system was also used historically for our database snapshot pipeline, to continuously construct backups of our online database and store them in HBase. This dramatically reduced the time to land our daily backups, and allowed for taking snapshots at a finer time granularity (ex: hourly).

Signaling: Another recurrent use case for propagating data changes in a distributed architecture is as a signaling mechanism, where dependent services can subscribe and react to data changes from another service in near real time. For example, the Availability service would block a listing’s dates by subscribing to changes from the Reservation service to be notified when a booking was made. Risk, security, payments, search, and pricing workflows are a few examples of where this pattern is employed within our ecosystem.

Conclusion

SpinalTap has become an integral part of our infrastructure over the last few years, and a system fueling many of our core workflows. It can be particularly useful for platforms looking for a reliable general-purpose framework that can be easily integrated with their infrastructure. At Airbnb, SpinalTap is used to propagate data mutations from MySQL, DynamoDB, and our in-house storage solution. Kafka is currently the event bus of choice, but the system’s extensibility has allowed us to consider other mediums as well (e.g. Kinesis).
 
Lastly, we have open-sourced several of our library components, and are in the process of reviewing the remaining modules for general release as well. Contributions are more than welcome!

Yours Truly,

The SpinalTap Team (Jad Abi-Samra, Litao Deng, Zuofei Wang)


