Using graph algorithms to optimize Kafka operations, Part 1

Ping-Min Lin | Software Engineer, Logging Platform

Pinterest uses Kafka across the board for data movement. The Logging Platform team alone manages over 3,000 Kafka brokers transporting trillions of messages per day to help bring more than 320 million monthly active Pinners the inspiration to create a life they love.

The Logging Platform team has developed several tools to keep operations smooth in a deployment of this scale. In this post, we’ll share how we leverage graph algorithms to efficiently solve problems in our daily operations. This two-part write-up involves some graph theory and flow network concepts, which you can read more about here and here.

Quick recap of some Kafka concepts

  • A reassignment changes the members and the ordering of the replica brokers for one or more partitions, possibly across multiple topics. A reassignment that only changes the ordering of in-sync replicas won’t cause extra replication (i.e., data movement), but a reassignment that changes the members requires full replication of the existing data before the reassignment can finish.
  • A preferred replica leader election (PLE) operation sets the leader of each partition to the first broker in its replica list. If the partitions involved are fully replicated and in sync, this will not cause any additional data movement.
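
To make the two concepts concrete, here is a minimal Python sketch. It assumes a replica list is represented as a plain list of broker IDs and that all current replicas are in sync; the function names are hypothetical and are not part of any Kafka API.

```python
def adds_new_replica(current, proposed):
    """Return True if `proposed` contains a broker that is not in `current`.

    Under the in-sync assumption, only a newly added broker has to
    replicate the partition's existing data.
    """
    return bool(set(proposed) - set(current))


def preferred_leader(replicas):
    """The preferred leader is the first broker in the replica list."""
    return replicas[0]


current = [1, 2, 3]    # hypothetical replica list for one partition
reordered = [2, 1, 3]  # same brokers, different order
replaced = [1, 2, 4]   # broker 3 replaced by broker 4

print(adds_new_replica(current, reordered))  # False: ordering change only
print(adds_new_replica(current, replaced))   # True: broker 4 must copy the data
print(preferred_leader(reordered))           # 2: a PLE would elect broker 2
```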

Check out the official introduction to Apache Kafka for more details.

The imbalanced leader problem

For each topic in a Kafka cluster, the number of leaders per broker can change over time for various reasons. For example, when the partition count of a topic is increased, Kafka may assign some brokers as leaders of multiple partitions while other brokers have no leader replicas at all. If a broker dies, the on-call engineer might move replicas to other brokers to restore the replication factor, which may also skew the leader distribution. This skew can cause new issues if the overloaded leaders hit network or CPU limits because they serve much more data than the other brokers.

An example of skewed leaders among brokers after increasing partitions of a topic.

To tackle this problem, we came up with two different graph-theoretic approaches given the following assumptions:

  • The load of each partition within a topic is uniform (i.e., there is no skew across partitions). This is usually true in our use cases, but it’s occasionally violated when keyed partitions are used and the topic has hot keys.
  • We want to avoid replication and use leader swaps only, since replication is an expensive operation that introduces extra load on an already overloaded broker.
  • Replicas within each partition are already spread across different racks, so we don’t need to take care of rack distribution.

Offloading leaders from an overloaded broker to an underloaded broker

A simple way to solve the problem of overloaded brokers is to (i) identify brokers that lead more partitions than average and (ii) move the leadership of some of their partitions to follower brokers with lower-than-average leadership load. Here’s a simple example:

This works only when an overloaded broker leads a partition that has an underloaded broker among its followers. That is usually not the case, and we wouldn’t want to move leadership from an overloaded broker to an average-loaded broker or to another overloaded broker.
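
The direct approach can be sketched in a few lines of Python. The assignment below is hypothetical (four brokers, four partitions, replication factor 3, leader listed first), and the helper names are illustrative only. In this assignment the only overloaded broker has no underloaded follower, so no direct swap exists.

```python
def classify_brokers(assignment, brokers):
    """Count leaders per broker and split brokers by the average leader count."""
    counts = {b: 0 for b in brokers}
    for replicas in assignment.values():
        counts[replicas[0]] += 1  # first replica is the (preferred) leader
    average = len(assignment) / len(brokers)
    overloaded = [b for b in brokers if counts[b] > average]
    underloaded = [b for b in brokers if counts[b] < average]
    return counts, overloaded, underloaded


def direct_swaps(assignment, overloaded, underloaded):
    """List (partition, leader, follower) triples where a single swap helps."""
    return [
        (partition, replicas[0], follower)
        for partition, replicas in assignment.items()
        if replicas[0] in overloaded
        for follower in replicas[1:]
        if follower in underloaded
    ]


# Hypothetical assignment: partition -> replica list, leader first.
assignment = {1: [1, 2, 3], 2: [2, 4, 3], 3: [3, 1, 2], 4: [1, 2, 3]}
brokers = [1, 2, 3, 4]

counts, overloaded, underloaded = classify_brokers(assignment, brokers)
print(counts)                                            # {1: 2, 2: 1, 3: 1, 4: 0}
print(overloaded, underloaded)                           # [1] [4]
print(direct_swaps(assignment, overloaded, underloaded)) # []
```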

All partitions of a topic are assumed to have the same load, so as long as the net leader count doesn’t change on average-loaded brokers, we can move leaders around by applying a series of leader swap operations: move a leader from the overloaded broker to an intermediate broker, have that intermediate broker hand one of its own leaders to another intermediate broker, and so on until the chain reaches the underloaded broker. Although this process looks like a sequential chain of operations, it can be done with one reassignment and one PLE in Kafka, which greatly reduces the operational overhead.

So how do we apply this “chain-swapping” concept? This is where graph theory can help us. We create a directed graph in which nodes represent the Kafka topic’s brokers and edges point from each partition’s leader broker to the partition’s follower brokers. For the above example, we have a directed graph like this:
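
As a rough sketch of this construction, assume a hypothetical four-partition assignment (not necessarily the one in the example above) in which each replica list has the leader first. Each edge is keyed by a (leader, follower) pair and carries the partitions that produce it; the function name is illustrative.

```python
from collections import defaultdict


def build_leader_graph(assignment):
    """Map each (leader, follower) edge to the partitions that create it."""
    edges = defaultdict(list)
    for partition, replicas in sorted(assignment.items()):
        leader, followers = replicas[0], replicas[1:]
        for follower in followers:
            edges[(leader, follower)].append(partition)
    return dict(edges)


# Hypothetical assignment: partition -> replica list, leader first.
assignment = {1: [1, 2, 3], 2: [2, 4, 3], 3: [3, 1, 2], 4: [1, 2, 3]}

graph = build_leader_graph(assignment)
for (leader, follower), partitions in sorted(graph.items()):
    print(f"Broker {leader} -> Broker {follower} via partitions {partitions}")
```

Keeping the partition list on each edge is what later lets us translate a path in the graph back into concrete leader swaps.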

Given that Broker 1 has two leaders and Broker 4 has none, we want to find a path from Broker 1 to Broker 4 in the directed graph; such a path represents a series of swap operations. Leader swaps can still cause spikes in the cluster, so we also want to minimize the number of swaps, which means we’re looking for the shortest path. By applying breadth-first search (BFS) from Broker 1’s node, we can find the shortest path to Broker 4.
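
A minimal BFS sketch over such a graph is shown below. It assumes a hypothetical assignment in which Broker 1 leads two partitions and Broker 4 leads none; the function names are illustrative, and neighbors are visited in sorted order only to keep the result deterministic.

```python
from collections import deque


def build_adjacency(assignment):
    """Leader -> set of follower brokers, derived from replica lists."""
    adjacency = {}
    for replicas in assignment.values():
        leader, followers = replicas[0], replicas[1:]
        adjacency.setdefault(leader, set()).update(followers)
    return adjacency


def shortest_swap_path(adjacency, source, target):
    """Breadth-first search; returns the broker path or None if unreachable."""
    parents = {source: None}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        if node == target:
            path = []
            while node is not None:  # walk parent links back to the source
                path.append(node)
                node = parents[node]
            return path[::-1]
        for neighbor in sorted(adjacency.get(node, ())):
            if neighbor not in parents:
                parents[neighbor] = node
                queue.append(neighbor)
    return None


# Hypothetical assignment: partition -> replica list, leader first.
assignment = {1: [1, 2, 3], 2: [2, 4, 3], 3: [3, 1, 2], 4: [1, 2, 3]}
adjacency = build_adjacency(assignment)

print(shortest_swap_path(adjacency, source=1, target=4))  # [1, 2, 4]
```

Because BFS explores brokers in order of distance from the source, the first time it reaches the target it has used the fewest possible swaps.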

We need two leader swaps: (i) from Broker 1 to Broker 2 and (ii) from Broker 2 to Broker 4. In which partitions should we execute the swaps? When creating the graph, we can store the set of partitions as an attribute of each edge (or use multi-edges). We then pick one partition for each edge in the path and swap the leaders in that partition from the source to the destination of that edge. In this case, for (i) we can choose Partition 1 or Partition 4 to swap, and for (ii) we swap Partition 2’s leader.
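
The step from a path to a concrete reassignment could look roughly like the following sketch. It assumes a hypothetical assignment consistent with the description above, takes the path as given, and simply picks the lowest-numbered eligible partition for each edge. The resulting replica lists contain the same brokers in a new order, so they could be submitted as a single reassignment followed by a PLE; the helper names are illustrative.

```python
def apply_swap_path(assignment, path):
    """Return new replica lists with one leader swap per edge of `path`."""
    new_assignment = {p: list(r) for p, r in assignment.items()}
    for source, target in zip(path, path[1:]):
        # Pick a partition led by `source` that has `target` as a follower.
        partition = next(
            p for p, r in sorted(assignment.items())
            if r[0] == source and target in r[1:]
        )
        replicas = new_assignment[partition]
        i = replicas.index(target)
        replicas[0], replicas[i] = replicas[i], replicas[0]  # reorder only
    return new_assignment


def leader_counts(assignment, brokers):
    """Number of partitions for which each broker is first in the list."""
    counts = {b: 0 for b in brokers}
    for replicas in assignment.values():
        counts[replicas[0]] += 1
    return counts


# Hypothetical assignment: partition -> replica list, leader first.
assignment = {1: [1, 2, 3], 2: [2, 4, 3], 3: [3, 1, 2], 4: [1, 2, 3]}
brokers = [1, 2, 3, 4]

balanced = apply_swap_path(assignment, path=[1, 2, 4])
print(balanced)                          # {1: [2, 1, 3], 2: [4, 2, 3], 3: [3, 1, 2], 4: [1, 2, 3]}
print(leader_counts(balanced, brokers))  # {1: 1, 2: 1, 3: 1, 4: 1}
```

Broker 2 gains Partition 1 and gives up Partition 2, so its net leader count is unchanged, while Broker 1 loses one leader and Broker 4 gains one.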

This chain-swapping method lets us ease the skew within a topic without moving data around, which would have been inevitable had we taken the standard reassignment route (and which could make matters worse if the leader broker was already throttled).

However, a few issues emerged in practice. Since the graph changes whenever we perform an operation, we need to start over each time leaders are swapped. We can also move only one leader at a time: when several brokers are imbalanced, we must work out which brokers are overloaded and which are underloaded, then iterate through each pair of sources and destinations. This trial-and-error process still imposes overhead on the on-call engineer during an incident in which rebalancing is urgent, and the fundamental cause of that overhead is the one-leader-at-a-time limitation. With BFS, we simply can’t coordinate multiple leader swaps simultaneously on the same graph. We still needed a more practical rebalancing approach.

In this post, we’ve described the Kafka leader balancing problem we face in our operations work and derived a graph-based approach that solves it under certain constraints. In the next post, we’ll describe how we transformed the problem into a different type of graph, a flow network, which let us better solve the leader balancing challenge across multiple brokers by using powerful optimization algorithms that have been developed over decades.

Keep an eye out for Part 2 next week!

Using graph algorithms to optimize Kafka operations, Part 1 was originally published in Pinterest Engineering Blog on Medium.
