Uber has a complex marketplace consisting of riders, drivers, eaters, restaurants and so on. Operating that marketplace at a global scale requires real-time intelligence and decision making. For instance, identifying delayed Uber Eats orders or abandoned carts helps to enable our community operations team to take corrective action. Having a real-time dashboard of different events such as consumer demand, driver availability, or trips happening in a city is crucial for day-to-day operation, incident triaging, and financial intelligence.
Over the last few years, we’ve built a self-service platform to power such use cases, and many others, across different parts of Uber. The core building block of this platform is Apache Pinot – a distributed, OnLine Analytical Processing (OLAP) system designed for performing low latency analytical queries on terabytes-scale data. In this article, we present details of this platform and how it fits in Uber’s ecosystem. We focus on the evolution of Pinot within Uber and how we scaled from a few use cases to a multi-cluster, all active deployment powering hundreds of use cases for querying terabyte-scale data with millisecond latencies.
Use case overview
The above diagram depicts the typical requirements for real-time analytics use cases. The different use cases within Uber can be bucketed into following broad categories:
- Analytical applications
- Near-real-time exploration
A lot of engineering teams within Uber use Pinot for building customized dashboards for their respective products. One of the examples of this is the Uber Eats Restaurant Manager:This dashboard enables the owner of a restaurant to get insights from Uber Eats orders regarding customer satisfaction, popular menu items, sales, and service quality analysis. Pinot enables slicing and dicing the raw data in different ways and supports low latency queries to build a rich experience for the restaurant owner.
Similarly, our city operational teams have built customized dashboards that leverage Pinot’s power of combining real-time and historical data to obtain metrics around demand-supply, anomalous events (for example, delayed orders in the last five mins), live orders, and so on. This is a crucial tool for our day to day operations and helps in early identification of issues.
Another category of use cases stem from the need to execute analytical queries as part of many backend services. The primary distinguishing requirement for such use cases is data freshness and query latency which needs to be real-time in nature. For instance, identifying geo-hotspots for Uber rider grouping in real-time is crucial to a good user experience. Similarly, identifying rider cancellation or abandoned Uber Eats carts instantly, enables quick corrective action (in the form of messaging / incentives).
Data exploration is typically done on traditional batch and warehousing systems like Hadoop. However, there are many instances where users need to be able to execute complex SQL queries on real-time data. For example, engineers often need to triage incidents by joining various events logged by microservices. In other cases, real-time events may need to be joined with batch data sets sitting in Hive. Within Uber, we provide a rich (Presto) SQL interface on top of Apache Pinot to unlock exploration on the underlying real-time data sets. In addition, this interface seamlessly works with all of our internal business intelligence tools (like Dashbuilder) which is tremendously useful for all our customers. For instance, here’s a simple Sunburst chart showing the breakdown of Uber Eats orders in a five minute period, for a given region, grouped by the job state. This was built using Dashbuilder by running a Presto query running on top of Pinot in seconds.
Today, hundreds of business critical use cases are being powered by Apache Pinot in production. Over the last few years, we’ve gone from operating a small ten node cluster to hundreds of nodes per region. The total data footprint managed by Pinot has increased from tens of GBs in the early days to tens of TBs today. Similarly the per region QPS has increased by a factor of 30 (thousands of QPS in production today).
In the following sections we describe the details of our platform, discuss the unique contributions Uber has made to Apache Pinot and elaborate on the lessons learned while operating this platform at scale.
Pinot Platform at Uber
In order to serve such use cases, we’ve built a self service platform around Apache Pinot as depicted in the diagram below
The different components of this architecture can be categorized into 3 phases:
This is also known as the data preparation phase and is responsible for making the data available for Pinot to consume. In general, Pinot can ingest data from both streaming sources such as Apache Kafka as well as batch/offline data sources such as Apache Hadoop (see here for Pinot documentation). Within Uber, we’ve added more features as described below:
- Realtime sources: Thousands of microservices are continuously writing their logs and events to topics in Apache Kafka (a pub-sub system). Many of these topics can directly be consumed by Pinot to make the data available for querying in a real-time manner. For instance, user demand metrics across different dimensions (like time, location, or product line) can be easily computed from a Pinot table ingesting from the user eyeball Kafka stream.
In some cases, we need to do some additional processing on the input Kafka topic before Pinot can consume the data. For instance: join the input topic with another topic/table, or pre-aggregate some column values.
For such cases, we rely on a stream processing platform of FlinkSQL (previously known as AthenaX from Uber, and later contributed back to Apache Flink community ). It provides a SQL interface used to express rich processing on an input stream (Kafka topic), which is compiled into an Apache Flink job and executed on our YARN cluster. The output of such a FlinkSQL job is another Kafka topic which becomes the data source for Pinot. Here’s an example of a simple FlinkSQL job that filters the incoming records by device OS and a particular city ID.
2. Offline sources: In Uber’s Data ecosystem, most of the important Kafka topics are also ingested into HDFS. This becomes an important source of historical data for Pinot. Pinot provides a powerful feature for combining real-time and offline data sets into the same table, providing one logical view to the user (see more details here). This is useful for various reasons:
- Data correction: In many instances, the real-time data coming from Kafka can be missing or incorrect. The owner of the topic often corrects this at a later point in time, which eventually gets reflected in the offline data set. Pinot can then consume curated offline data sets and overwrite the inconsistent real-time data – improving the overall accuracy of our analytics.
- Bulk data load: In some cases, we need to bootstrap a Pinot table with at least 3 months of data. Doing this directly from Kafka will take too long. Having a way to directly ingest such data sets from offline sources is very convenient.
- Low latency serving of offline data sets: In many cases, engineers and data scientists need to perform real-time analytical queries on the output of their ETL jobs (sitting in some temporary Hive table). Traditional query engines such as Hive and Presto are quite powerful, but cannot typically achieve low latency (sub-second granularity). This has people turning to Pinot to import these data sets.
- Complex processing: In a lot of machine learning related use cases, we need to run complex algorithms for computing models which is difficult to express in FlinkSQL. In such cases, engineers and data scientists can write a Spark job for computing these models, and then have this data ingested in Pinot for online serving.
Similar to the real-time source, an offline data source can be ingested as-is, or pre-processed before being ingested into Pinot. Within Uber, we rely on another platform called Piper (a workflow scheduling system) for ingesting offline data sets. Like FlinkSQL, a Piper job allows the user to specify a SQL query (in this case, a Hive query) for specifying the processing needed on the raw data. Internally, it runs a Spark job to run this query, create Pinot segments from the output data and import it into Pinot. This is depicted in the diagram below:
Piper allows the user to schedule this job at a given frequency (for example, hourly or daily) which in turn defines the frequency with which offline data sets are imported into Pinot.
Here’s a zoomed-in view of the core Apache Pinot storage engine:
Each Pinot cluster consists of a controller (the brains of the cluster), brokers (query processing nodes) and servers (data nodes). Pinot is designed to be multi-tenant from the ground up, and it enables us to group together a specific combination of brokers and servers into one tenant – a unit of isolation owned by a specific use case. For instance, the figure shows a Pinot cluster with two tenants: Eats and Maps. In this case, Maps tenant has two brokers and two servers. Maps data will be distributed evenly across these 2 servers and query processing will be restricted to the specified brokers, thus isolating it from any of the Eats traffic.
Within Uber, a Pinot table can be configured as:
- Region local: In this case, the Pinot table ingests data from the local Kafka and HDFS instances. In other words, all the incoming data is generated by services belonging to this particular region. This is useful for analytical use cases that have affinity to the region-local data. For instance, metrics regarding live Uber Eats orders or delayed orders only make sense for a particular region. Typically, this configuration is used when data freshness is extremely important.
- Global: This is useful for analytical use cases interested in a global view of the data. In this case, the data is aggregated by Kafka and HDFS respectively. In other words, each aggregate Kafka topic in every region will have all the data from the individual region-local Kafka topics (which in turn is ingested into Pinot). This configuration is used when we want to get insights across all trips globally (for example, from revenue, sales, or incidents). Although this is more complex to set up in terms of aggregate Kafka and HDFS data sets, it automatically makes Pinot active-active. Thus, any one region failure does not affect Pinot’s availability.
In addition to the core Pinot Storage, we also leverage 2 other components.
This is a centralized repository of all the schemas used by Uber. Within Uber, Pinot heavily uses this as the source of truth for all Kafka schemas. We’ve added a custom Pinot decoder for fetching the required Kafka schema during ingestion and generating a corresponding Pinot GenericRow object, which in turn is used for segment generation. Going forward, we plan to use Schema Service for managing Pinot schemas as well.
Pinot has a concept of a segment store for archiving its immutable data segments. For any given real-time or offline Pinot table, once a data segment is sealed (based on certain criteria) it becomes immutable. This segment is then archived in the Segment Store for the purpose of recovery during node or replication failures. The original Pinot architecture relied on the use of a POSIX compliant filesystem (like NFS) mounted onto the Pinot nodes for this purpose. We’ve extended this by adding the ability to use any general purpose storage system (for example, HDFS, Ceph, or S3) as the Segment Store. Please see the section below for more details.
Currently, there are 2 ways to query the Pinot data within Uber.
Pinot Rest Proxy
Apache Pinot has a component known as a broker for issuing RESTful queries. We’ve added a lightweight layer on top of the broker called Pinot Rest Proxy. This is a simple Restlet service that provides a convenient way for applications to query any Pinot table.
As mentioned before, each Pinot table is associated with a tenant which has a unique set of brokers. Any client application must query one of these brokers in order to reach the specified table. This adds some complexity since the client applications need to be aware of the different tenants and brokers thereof. Using this new Restlet service, the client application can reach any one of the rest proxy nodes via some load balancer (in our case, haproxy). Each Pinot rest proxy instance locally caches the Pinot routing information (obtained via Apache Helix). It uses this information to identify the tenant, identify the broker set, and route the client request to one of them in an asynchronous manner.
The locally cached metadata within each Pinot rest proxy instance is useful in a variety of scenarios. A Piper (Spark) job can query the rest proxy to fetch the table and schema information instead of the Pinot controller. This reduces the load on the controller and also isolates it from request spikes.
Pinot rest proxy is currently being heavily used by dashboarding and analytical application use cases.
Recently, we’ve done a lot of work to integrate Presto with Pinot, which allows our users to use standard PrestoSQL to query Pinot. We had initially focused on real-time exploration use cases as well as by some analytical applications. However, after multiple optimizations and production experience over multiple quarters, we are currently onboarding real-time dashboarding and application use-cases as well. Our long-term plan is to replace the Pinot rest proxy with Presto. Please see the section below on Full SQL support for more details.
There are four major contributions by Uber’s Pinot team to increase the overall reliability and query flexibility.
In the initial days of the Pinot platform, onboarding new use cases was a very manual process. One of our Pinot engineers had to sit down with the customer to understand the requirements, come up with a schema, table config, and estimate the capacity required to serve that use case. Naturally, the turnaround time for the customer to start playing around with Pinot could be anywhere from three days to a week. In order to make the platform self serve, we invested in the following areas.
We added the ability to automatically derive a Pinot schema from an input Kafka topic or a Parquet data file created using an Avro schema. At a high level, this utility translates the Avro fields into Pinot column types and automatically selects one of the fields as the timestamp column. In certain cases, it also flattens an Avro record into individual Pinot column types. This automatic conversion applies to more than 80% of our input Kafka or Parquet data sets, saving a lot of manual effort.
We integrated closely with FlinkSQL, enabling customers to treat Pinot as a “data sink.” Customers create a new FlinkSQL job, define a SQL transformation query, define the input and output Kafka topics, and then “push” to Pinot.
In this case, the Pinot schema is inferred from the output Kafka topics. This will automatically create a table in the Pinot staging environment as soon as the FlinkSQL job starts executing.
Similarly, we added the ability to derive the Pinot schema from the output of the “Hive to Avro Converter” Spark job and automatically create the table and schema in the staging Pinot cluster before pushing the data.
Each such table is created in a staging environment with a minimum configuration. This allows users to start issuing queries to Pinot or build a customized BI dashboard within minutes. In the staging phase, the table goes through several rounds of iteration such as schema evolution, adding specialized indices (for example, star-tree, sorted, or inverted) to corresponding columns and validation of the user queries. The memory and disk usage from the staging environment gives a good indication of the production requirements. We mandate each use case to be vetted in staging for at least 24 hours before being promoted to production.
As mentioned before, we’ve integrated Pinot with Presto for enabling standard PrestoSQL queries on these data sets. This combination works great since we combine Pinot’s seconds-level data freshness with Presto’s flexibility in doing complicated queries. In addition, predicate pushdowns and aggregation function pushdowns enable us to achieve sub-second query latencies for such PrestoSQL queries – which is not possible to do on standard backends such as HDFS/Hive. Please find details of this work in our previous article: https://eng.uber.com/engineering-sql-support-on-apache-pinot/
HDFS deep store for Pinot segments
Pinot’s original design for real-time stream ingestion (a.k.a. LLC) requires a locally mounted file system to the Pinot controller for storage of Pinot segments. In cases where a Pinot server has fallen behind its replicas or is being rebuilt after a node failure, it allows the server to download the segments from the central store. The original design solves this capacity issue by mounting a network file system (NFS) to the controllers.
Like many other users of Pinot, Uber does not operate a NFS and thus can not use the original LLC design. To solve this problem, together with LinkedIn engineers we enhanced the split segment completion phase of the LLC protocol so that it can work with a deep storage or external storage service like HDFS or Amazon S3. Today Uber’s Pinot realtime ingestion pipeline operates using HDFS as its deep store with segments from hundreds of Pinot tables.
Our team identified an important issue with schema evolution in Pinot. In case of realtime Pinot tables (specifically, ingestion from a streaming data source), adding a new column to the existing schema was not fully supported. Although the older data segments reflected this correctly, the new column was not visible in the most active data segment, causing queries to fail. Our fix for this critical issue can be found here: https://github.com/apache/incubator-pinot/issues/4225
We have learned a lot while scaling our Pinot use cases inside Uber. Many of the lessons in this section are from areas addressing the growing pains found in operation, deployment, memory management, and monitoring.
Ease of operation is essential to scaling Pinot usage. When more use cases were onboarded to Pinot clusters, we wanted to keep the overhead of the cluster management to the minimum. Thankfully, there are several out-of-the-box features from Pinot that make operation and management easier.
Multi-tenant Pinot cluster
One of the core requirements of running an analytical system at scale is multi-tenancy. Pinot provides native multi-tenant support, and it has shown tremendous value in operations, especially in outage mitigation. With first-class tenant support, the tables can be logically grouped together under a single tenant name and assigned to the hosts of that tenant. This provides strong isolation and avoids noisy-neighbor problems. When issues arise such as a bad query pressing the server, we could limit the impact to its tenant and not breach the SLA of other tenants.
Ease of cluster expansion
Pinot comes with segment assignment strategy, so that segments can be distributed evenly among hosts, including the newly added ones. This greatly simplifies the cluster/tenant expansion efforts. All we need to do is provision a new Pinot server host and add it to the required tenant. It will automatically start getting new segments.
One area Uber improves upon the open source Pinot is to add JMX metrics associated with tenants. At Uber’s scale, our Pinot cluster has around 1,000 tables and hundreds of Pinot servers and brokers. It makes monitoring difficult for Pinot administrators; a typical scenario is that one table has query performance issues but it could be caused by some other tables on the same tenant. With Pinot tenant metrics, one can group the servers into a dozen tenants to check each tenant’s resource usage and query performance. It makes management of Pinot servers/broker and triage of issues much easier.
Another valuable feature provided by Pinot is the segment backup in the segment store (mentioned here), such as remote HDFS cluster or cloud storage. This feature greatly reduces the operational work required to replace a server node, helping to deal with the hardware failures as they occur in a large pool of machines. With the deep store backup, a newly added host is able to download and recover the data without human intervention and automatically serve the traffic once the segments are fully downloaded.
Oftentimes, a given Pinot controller or server can experience garbage collection (GC) pauses. Depending on the severity of the GC pause, this could result in a performance degradation or an outage. For instance, if the leader controller keeps experiencing full GC pauses, it fails to create new segments and hence stops ingestion for all the tables in that cluster. In case of a Pinot server experiencing full GC, it potentially causes query latency spikes and inconsistent query results for all tables belonging to that node. Here are typical reasons behind such issues.
After enabling Presto queries on top of Pinot, we noticed bad queries would often overwhelm the Pinot servers. For instance, if the user tries to do a select * query for a huge time range (or without any time range predicates), it will cause the Pinot servers to use up a lot of memory (our setup uses in-memory indexes instead of mmap) and eventually leads to a full GC pause. There are a few ways to minimize the impact of such large scans:
- Use off heap indices which could potentially impact overall query latency.
- Limit the query parallelism in the Pinot query scheduler configuration. Reducing this limit also reduces the memory pressure caused by the corresponding queries. Naturally, this can also affect overall Query throughput.
In general, to avoid such issues – we isolate such ad hoc querying use cases to a separate tenant.
Apache Helix metrics bug
Pinot controller is the control plane of the entire Pinot cluster. By design, it requires limited heap space because it does not host or serve data. When our Pinot cluster got larger with more tables and tables with more segments, we found the heap usage of Pinot controllers grew much bigger than the default value of 4GB. Major Java GC events followed and that caused disruption to the whole cluster. Upon heap analysis of Pinot controllers, we discovered that the issue is related to a histogram metric kept by Helix lib used in Pinot controllers. The histogram metric uses a default 1 hour sliding window which means too many event data points will be kept in memory for a busy production controller. We contributed a fix to Apache Helix so that one can configure the sliding window length to reduce memory footprint.
With a great variety of Pinot index and query patterns used by various Uber teams, we have also seen other cases of memory overhead:
- Queries involving Hyperloglog objects (used for approximate distinct counts) are sensitive to input size changes; even though Pinot servers can ingest these objects, the query time performance degrades a lot with 2-5x more data volume due to GC activities
- Queries with large numbers (many thousands) of values in the IN clause can also cause GC because these queries run over multiple table segments of a server – increasing the memory usage.
- Surge in query volume with 2-3 times or more normal traffic (often caused by region failover) can cause severe GCs
There are several challenges that we’ve encountered so far with respect to segment management, due to the scale of our Pinot clusters and the overall data volume.
Too many segments per server
As the scale of data grew, we also experienced several issues caused by too many segments. Pinot leverages Apache Helix over Apache Zookeeper for cluster management. For example, when a server transitioned from offline to online, Pinot will propagate state transition messages via Helix to notify other instances. The number of such state transition messages are proportional to the number of the segments on the server. When a server hosts too many segments, there could be a spike of state transition messages on Helix, resulting in lots of zookeeper nodes. If the number of zookeeper nodes is beyond the buffer threshold, the Pinot server and controller will crash. To solve this issue, we added message throttling to Pinot controllers to flatten the state transition surge.
Another challenge we faced is the potential for hotspots due to the segment assignment strategy. By default Pinot balances the segments among servers, by assigning a new segment to the least assigned host. As a result, the recent segments could all be created in the newly added servers in case of cluster expansion. To mitigate this issue, we run the table rebalance after cluster expansion.
Decoupling Pinot from segment store
During our operation of segment deep store, we identified two major issues with the current LLC protocol at the time:
- Deep store is a single point of failure for the real-time ingestion flow
- All segments upload and download go through Pinot controllers
The first problem is particularly a serious issue because many of our users want high SLA on data freshness (less than 5 mins for the 99th percentile). Within Uber, we’ve seen a few situations when HDFS was unavailable for one hour or so due to maintenance or outages. This violates SLA for all our important real-time tables. In practice, HDFS has its own SLA and can fail independently of Pinot. To solve this strict dependency issue, we proposed a major improvement over LLC so that it can continue real-time ingestion even if the deep store is down for a few hours. During the deep store downtime, the proposal leverages peer server storage to download segments instead. The proposal is approved by the community, code completed and now under testing.
Overall, our experience with Apache Pinot has been great. Within Uber, it has emerged as a key technology for solving real-time analytics use cases at scale. The memory efficient indices and columnar compression help in reducing storage cost. The built-in multi-tenancy features and ease of node and tenant maintenance low operational cost. In addition, the Apache community around Pinot is very welcoming and highly engaged. We continue to invest in Pinot and plan to work with the community for future projects such as Pinot Upserts, federated segment storage & query, smart indexing and so on.