By Joe Francis and Matteo Merli, Yahoo Platforms
Pub-sub messaging is a very common design pattern that is increasingly found in distributed systems powering Internet applications. These applications provide real-time services, and need publish-latencies of 5ms on average and no more than 15ms at the 99th percentile. At Internet scale, these applications require a messaging system with ordering, strong durability, and delivery guarantees. In order to handle the “five 9’s” durability requirements of a production environment, the messages have to be committed on multiple disks or nodes.
At the time we started, we could not find any existing open-source messaging solution that could provide the scale, performance, and features Yahoo required to provide messaging as a hosted service, supporting a million topics. So we set out to build Pulsar as a general messaging solution, that also addresses these specific requirements.
Pulsar is a highly scalable, low latency pub-sub messaging system running on commodity hardware. It provides simple pub-sub messaging semantics over topics, guaranteed at-least-once delivery of messages, automatic cursor management for subscribers, and cross-datacenter replication.
Using Pulsar, one can set up a centrally-managed cluster to provide pub-sub messaging as a service; applications can be onboarded as tenants. Pulsar is horizontally scalable; the number of topics, messages processed, throughput, and storage capacity can be expanded by adding servers to the pool.
Pulsar has a robust set of APIs to manage the service, namely, account management activities like provisioning users, allocating capacity, accounting usage, and monitoring the service. Tenants can administer, manage, and monitor their own domains via APIs. Pulsar also provides security via a pluggable authentication scheme, and access control features that let tenants manage access to their data.
Application development using Pulsar is easy due to the simple messaging model and API. Pulsar includes a client library that encapsulates the messaging protocol; complex functions like service discovery, as well as connection establishment and recovery, are handled internally by the library.
At a high level, a Pulsar instance is composed of multiple clusters, typically residing in different geographical regions. A Pulsar cluster is composed of a set of Brokers and BookKeepers (bookies), plus ZooKeeper ensembles for coordination and configuration management.
A Pulsar broker serves topics. Each topic is assigned to a broker, and a broker serves thousands of topics. The broker accepts messages from writers, commits them to a durable store, and dispatches them to readers. The broker also serves admin requests. It has no durable state. The broker has built-in optimizations; for example, it caches the data in order to avoid additional disk reads when dispatching messages to clients as well as replication clusters. Pulsar brokers also manage the replicators, which asynchronously push messages published in the local cluster to remote clusters.
Apache BookKeeper is the building block for Pulsar’s durable storage. BookKeeper is a distributed write-ahead log system, a top-level Apache project that was originally developed at and open-sourced by Yahoo in 2011. BookKeeper has an active developer community with contributors across the industry. Using the BookKeeper built-in semantics, Pulsar creates multiple independent logs, called ledgers, and uses them for durable message storage. Bookkeeper hosts, called bookies, are designed to handle thousands of ledgers with concurrent reads and writes. BookKeeper is horizontally scalable in capacity and throughput; from an operational perspective we can elastically add more bookies to a Pulsar cluster to increase capacity.
By using separate physical disks (one for journal and another for general storage), bookies are able to isolate the effects of read operations from impacting the latency of ongoing write operations, and vice-versa. Since read and write paths are decoupled, spikes in reads – which commonly occur when readers drain backlog to catch up – do not impact publish latencies in Pulsar. This sets Pulsar apart from other commonly-used messaging systems.
Managed Ledger represents the storage layer for a single topic. It is the abstraction of a stream of messages, with a single writer, and multiple readers, each with its own associated cursor position, the offset of the reader in the message stream. A single managed ledger uses multiple BookKeeper ledgers to store the data. Cursor positions are maintained in per-cursor ledgers.
A Pulsar cluster runs a ZooKeeper (another top-level Apache project open-sourced by Yahoo in 2008) ensemble used for coordinating assignment of topics among brokers, and storing BookKeeper metadata. In addition, Pulsar runs a Global ZooKeeper ensemble to store the provisioning and configuration data. At Yahoo, we have presence in multiple regions and our users create global topics that are replicated between these regions. The Global Zookeeper ensemble keeps provisioning and configuration data consistent globally. We can tolerate higher write latencies on these writes (e.g.: ~150ms latency for configuration writes).
The load balancer is a distributed service that runs on the brokers, to make sure the traffic is equally spread across all available brokers. Since Pulsar brokers have no durable state, topics can be redistributed within seconds.
The Pulsar topic is the core of the system; applications and components communicate by publishing to and consuming from the same topic. Topics are created dynamically as needed when a producer (writer) starts publishing on it; and topics are removed when not in use.
Subscriptions are created automatically when a consumer (reader) subscribes to the topic. A subscription persists until it is deleted, and receives all messages published during its lifetime. Common messaging semantics (like JMS Topic or Queue) are available as subscription modes; an exclusive subscription is equivalent to a “topic,” and a shared subscription is equivalent to a “queue.”
Pulsar is designed for low-publish latencies at scale. Our typical publish latencies on average are well below 5ms. With SSD as the bookie journal device, Pulsar can achieve 99 percentile latencies of 5ms with two guaranteed copies and total ordering.
The latency remains within the acceptable range until the throughput reaches the limit of the disk IO capacity.
Pulsar supports partitioned topics, which can further increase the per-topic throughput.
Pulsar at Yahoo
Pulsar backs major Yahoo applications like Mail, Finance, Sports, Gemini Ads, and Sherpa, Yahoo’s distributed key-value service.
We deployed our first Pulsar instance in Q2 2015. Pulsar use has rapidly grown since then, and as of today, Yahoo runs Pulsar at scale.
- Deployed globally, in 10+ data-centers, with full mesh replication capability
- Greater than 100 billion messages/day published
- More than 1.4 million topics
- Average publish latency across the service of less than 5 ms
As Pulsar use grows at Yahoo, we have been scaling the service horizontally. Most of the challenges we faced were with JVM GC impacting publish latencies, and reducing failover times when the number of topics on a broker went up to tens of 1000s (now 40,000). This led to significant changes to the Pulsar broker and to BookKeeper.
Looking to the Future
We are actively engaged in pushing the scale and reliability boundaries of Pulsar further. Current improvements being worked on include:
- Migrate topic between brokers in under 1 sec, from 10 sec
- Improve 99.9%ile publish latencies to 5ms
- Provide additional language bindings for Pulsar
Pulsar is a highly scalable pub-sub messaging system, production-ready and battled tested at Yahoo. We are glad to make Pulsar available as open source under Apache License Version 2.0. Detailed instructions and documentation are available at Yahoo’s Github repository. Our goal is to make Pulsar widely used and well integrated with other large-scale open source software, and we welcome contributions from the community to make that happen.