Ping-Min Lin | Software Engineer, Logging Platform

In our first *Using graph algorithms to optimize Kafka operations* post, we highlighted the leader skew issue, one of the pain points we’ve experienced while maintaining our 3,000+ node Kafka deployment, and we discussed how we modeled the problem using a directed graph. We then described the “chain-swapping” mechanism to move excessive traffic from an overloaded broker to a broker with less load, and we detailed why this method wasn’t as useful in the common case of multiple overloaded brokers in the cluster. In this article, we discuss how we use flow network algorithms to deal with the downsides of the previous solution and solve the problem in general.

**One-shot leader rebalancing for a skewed topic: Maximum Flow Problem**

Flow networks are often used to model optimization problems, especially in logistics. It turns out we can transform the rebalancing problem into a Maximum Flow problem and use existing flow algorithms to solve it.

The graph we established in the previous article treated partitions as an attribute on the edges between brokers. However, this doesn’t accurately reflect their relationship in Kafka; partitions are logical and brokers are physical, and neither of them belongs to the other. We therefore promote partitions to first-class citizens of the graph by making them another type of node. Now, instead of connecting the leader broker to follower brokers for each partition in the topic, we create an edge from the leader broker node to the partition node, and then we create edges with a capacity of 1 pointing from the partition node to the follower broker nodes. Applying this new graph formulation to our previous example results in a *bipartite graph*, with broker nodes on one side and partition nodes on the other.

If we now set Broker 1 as the source and Broker 4 as the sink, we can find the maximum flow using algorithms available in most graph libraries (e.g., *networkx flow module* in Python, *jgrapht’s alg.flow package* in Java). One possible result goes through Broker 1, Partition 1, Broker 2, and Partition 2 before arriving at Broker 4.

This is similar to what we had in the directed graph from the first post: an edge in the flow from one broker to another broker via a partition indicates a leader swap on that partition. If a partition is not in the flow, no leader swap happens on that partition. Each partition has at most one edge flowing in, and the “one leader per partition” constraint will always hold since the capacity of each edge is 1.
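As a rough illustration of the single-swap case, here is a minimal sketch using networkx. The replica assignment, the node names (`b1`, `p1`, ...), and the helper functions are hypothetical and chosen only for this example; the leader-to-partition edges are also given a capacity of 1 here, which is an assumption of the sketch.

```python
import networkx as nx

# Hypothetical replica assignment: partition -> [leader, follower, ...]
assignment = {
    "p1": ["b1", "b2"],
    "p2": ["b2", "b4"],
    "p3": ["b3", "b4"],
}


def build_graph(assignment):
    """Broker/partition graph: leader -> partition -> followers."""
    g = nx.DiGraph()
    for partition, (leader, *followers) in assignment.items():
        g.add_edge(leader, partition, capacity=1)
        for follower in followers:
            g.add_edge(partition, follower, capacity=1)
    return g


def leader_swaps(assignment, flow):
    """Map each partition carrying flow to (old_leader, new_leader)."""
    swaps = {}
    for partition, (leader, *followers) in assignment.items():
        for follower in followers:
            if flow[partition][follower] > 0:
                swaps[partition] = (leader, follower)
    return swaps


g = build_graph(assignment)
# Broker 1 is the source and Broker 4 is the sink.
flow_value, flow = nx.maximum_flow(g, "b1", "b4")
swaps = leader_swaps(assignment, flow)
print(flow_value, swaps)
```

In this toy topic the only route from `b1` to `b4` runs through `p1`, `b2`, and `p2`, so the flow moves leadership of `p1` to `b2` and of `p2` to `b4`, while `p3` is left untouched.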

Knowing how a single leader swap works, let’s extend our graph to offload multiple leaders. We first compute the average number of leaders per broker, *L_avg*. Overloaded brokers have more leaders than *L_avg*, while underloaded brokers have fewer leaders than *L_avg*. We connect all the overloaded brokers to a dummy *source* node and all underloaded brokers to a dummy *sink* node. These two dummy nodes will be the source and sink of our maximum flow when we apply the flow algorithms. Assigning the capacity of the edges from the *source* to the brokers and from the brokers to the *sink* is a bit trickier. From the *source* to the overloaded brokers, we assign the capacity as the floored absolute difference between the number of leaders on that broker and *L_avg*. For example, if *L_avg* = 1.75 and Broker 2 has 4 leaders, then the capacity on the edge between the *source* and Broker 2 is *floor*(*abs*(4 − 1.75)) = 2. The *floor* operation is applied for two reasons: our Kafka operations are inherently discrete (i.e., there can’t be a 0.75 leader swap), and if the algorithm swaps away too many leaders from an overloaded broker, some underloaded broker has to absorb them and may itself become overloaded. For the edges from underloaded brokers to the *sink*, the capacity is assigned the same way.
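The capacity rule can be sketched in a few lines of Python. The leader counts below are hypothetical, picked so that *L_avg* = 1.75 and Broker 2 has 4 leaders as in the example; the function name is likewise an assumption of this sketch.

```python
import math

# Hypothetical leader counts per broker (7 leaders over 4 brokers).
leader_counts = {"b1": 2, "b2": 4, "b3": 1, "b4": 0}


def terminal_capacities(leader_counts):
    """Return (L_avg, source->broker capacities, broker->sink capacities)."""
    l_avg = sum(leader_counts.values()) / len(leader_counts)
    source_caps, sink_caps = {}, {}
    for broker, count in leader_counts.items():
        capacity = math.floor(abs(count - l_avg))
        if count > l_avg:
            source_caps[broker] = capacity  # overloaded: fed by the source
        elif count < l_avg:
            sink_caps[broker] = capacity    # underloaded: drains to the sink
    return l_avg, source_caps, sink_caps


l_avg, source_caps, sink_caps = terminal_capacities(leader_counts)
print(l_avg, source_caps, sink_caps)
```

With these counts, `b2` gets a capacity of 2 from the *source*, while the brokers that sit within one leader of the average (`b1` and `b3`) end up with a capacity of 0.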

Applying the maximum flow algorithm directly to this graph would produce a result that helps the skew a bit, but it wouldn’t be very effective. While we try not to overshoot the leader distribution by *floor*ing the difference, we also lose the flexibility to adjust brokers whose leader count is close to *L_avg*. In the case above, since Broker 3 is allocated 0 capacity on its edge to the *sink*, it will not be able to increase its leader count and “absorb” a leader from the overloaded brokers, even if such a swap would reduce the overall skew across the partitions.

180 partitions spread across 122 brokers. Leaders aren’t fully balanced after applying the above method.

To mitigate this, we allocate two additional dummy nodes: the *source_residue* node and the *sink_residue* node. The *source_residue* has an edge from the *source* and an edge to each overloaded broker. The *sink_residue* is the direct opposite: each underloaded broker points to it, and it, in turn, points to the *sink*. The capacity between these two residue nodes and the brokers is always 1, but the edges between the terminals and residue nodes are a bit more complicated. As the names suggest, these two nodes exist to “collect” the residue that we lost when *floor*ing the difference on the edges from the *source* to the brokers and from the brokers to the *sink*. The capacity from the *source* to the *source_residue* is the sum of the leftovers of the overloaded brokers (the fractional parts dropped by the *floor*), while the sum of the underloaded brokers’ leftovers becomes the capacity of the edge from the *sink_residue* to the *sink*. How does this help? It gives the brokers a second chance by providing the flexibility to nudge the leader count on brokers whose leader count is already near the average.
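The sketch below builds the full flow network for a small hypothetical topic, with and without the residue nodes, and compares the resulting maximum flow. The assignment and the function name are illustrative only. One detail is an assumption of this sketch rather than something stated above: the summed leftovers can be fractional, and here they are rounded up with `math.ceil` to obtain an integer capacity.

```python
import math
from fractions import Fraction

import networkx as nx

# Hypothetical topic: partition -> [leader, follower]
assignment = {
    "p1": ["b1", "b3"], "p2": ["b1", "b4"], "p3": ["b2", "b3"],
    "p4": ["b2", "b4"], "p5": ["b2", "b1"], "p6": ["b2", "b3"],
    "p7": ["b3", "b4"],
}
brokers = ["b1", "b2", "b3", "b4"]


def build_flow_network(assignment, brokers, with_residue=True):
    g = nx.DiGraph()
    leaders = {b: 0 for b in brokers}
    for partition, (leader, *followers) in assignment.items():
        leaders[leader] += 1
        g.add_edge(leader, partition, capacity=1)
        for follower in followers:
            g.add_edge(partition, follower, capacity=1)

    l_avg = Fraction(len(assignment), len(brokers))  # exact average
    source_leftover = sink_leftover = Fraction(0)
    for broker, count in leaders.items():
        diff = abs(count - l_avg)
        safe = math.floor(diff)  # capacity that cannot overshoot
        if count > l_avg:
            g.add_edge("source", broker, capacity=safe)
            source_leftover += diff - safe
            if with_residue:
                g.add_edge("source_residue", broker, capacity=1)
        elif count < l_avg:
            g.add_edge(broker, "sink", capacity=safe)
            sink_leftover += diff - safe
            if with_residue:
                g.add_edge(broker, "sink_residue", capacity=1)

    if with_residue:
        # Assumption: round the summed leftovers up to an integer.
        g.add_edge("source", "source_residue",
                   capacity=math.ceil(source_leftover))
        g.add_edge("sink_residue", "sink",
                   capacity=math.ceil(sink_leftover))
    return g


plain = build_flow_network(assignment, brokers, with_residue=False)
shared = build_flow_network(assignment, brokers, with_residue=True)
plain_value = nx.maximum_flow_value(plain, "source", "sink")
shared_value = nx.maximum_flow_value(shared, "source", "sink")
print(plain_value, shared_value)
```

In this toy case the plain network can move only one leader, because `b3` has no capacity to the *sink*. With the shared residue, three leaders can move, which is enough to bring every broker to one or two leaders.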

We can almost always achieve perfect leader balance of a topic using the maximum flow algorithms after adding the shared residue mechanism to the graph (the exception is cases of extremely skewed topics that have low or no connectivity between overloaded and underloaded brokers):

Leaders are balanced, but the number of reassignments increased a lot.

**Further Improvements**

Although the topic in the above example is now fully balanced, the number of reassignments is extremely high compared to the theoretical minimum. This is because a lot of the reassignments don’t actually improve the skew since the algorithm doesn’t know what the distribution looks like and only tries to push as much from the source to the sink as possible. This potentially causes a lot of “*see-saw*” reassignments resulting in zero net skew change (i.e., moving a leader from a slightly overloaded broker to a slightly underloaded broker). We implemented two optimizations to help mitigate the issue.

The first one is to introduce a separate cost attribute to some edges, which will be incurred for every unit flow going through the edge. After applying the max-flow algorithm to the graph, we can apply minimum-cost-flow algorithms to solve for flows that minimize the cost. By setting the cost from the broker nodes to the partition nodes to 1, we penalize flows that are too long, favoring shorter reassignment chains. Placing a cost of 1 on edges from the *source* to the *source_residue* and from the *sink_residue* to the *sink* prioritizes the flow to use up the safe capacity that is allocated to the brokers before using the shared residue capacity, thus reducing see-saw reassignments.
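To illustrate the cost idea, the following sketch uses networkx's `max_flow_min_cost`, which computes a maximum flow of minimum total cost. The tiny graph and the `add_costs` helper are hypothetical: the graph offers a short chain and a long chain between the same two brokers, so the effect of the per-edge cost is easy to see.

```python
import networkx as nx

RESIDUE_EDGES = {("source", "source_residue"), ("sink_residue", "sink")}


def add_costs(g, partitions):
    """Cost 1 on broker->partition edges and on residue<->terminal edges."""
    for u, v in g.edges:
        costly = v in partitions or (u, v) in RESIDUE_EDGES
        g[u][v]["weight"] = 1 if costly else 0


g = nx.DiGraph()
g.add_edge("source", "b2", capacity=1)
g.add_edge("b4", "sink", capacity=1)
# Short chain: b2 hands p1 directly to b4.
g.add_edge("b2", "p1", capacity=1)
g.add_edge("p1", "b4", capacity=1)
# Long chain: b2 -> p2 -> b3 -> p3 -> b4 (two reassignments).
g.add_edge("b2", "p2", capacity=1)
g.add_edge("p2", "b3", capacity=1)
g.add_edge("b3", "p3", capacity=1)
g.add_edge("p3", "b4", capacity=1)

add_costs(g, partitions={"p1", "p2", "p3"})
flow = nx.max_flow_min_cost(g, "source", "sink")
cost = nx.cost_of_flow(g, flow)
print(cost, flow["b2"])
```

Both chains carry the same amount of flow, but the long one crosses two broker-to-partition edges and therefore costs 2, so the solver prefers the single reassignment of `p1`.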

The second optimization is pruning. If we remove the dummy nodes from the result flow, what’s left is the broker nodes and partition nodes. We can decompose the remaining graph into several weakly connected components. Each of these components represents a series of reassignments for which only the brokers in the given connected component are involved, and those brokers will not be involved in any other reassignments outside of that set of operations. If we calculate the “contribution” of each connected component to the result maximum flow, we can pick ones that actually reduce the skew and prune those that aren’t actually helpful, thereby reducing unnecessary reassignments.
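A possible shape for the pruning step is sketched below. The “contribution” metric here is an assumption of this sketch, not necessarily the one we use in production: it is the reduction in the summed absolute deviation from *L_avg* over the brokers in a component. The flow dictionary and leader counts are hypothetical, and each used edge is assumed to carry one unit of flow.

```python
import networkx as nx

DUMMIES = {"source", "sink", "source_residue", "sink_residue"}


def useful_components(flow, leader_counts):
    """Keep only the components whose reassignments reduce the skew."""
    l_avg = sum(leader_counts.values()) / len(leader_counts)

    # Graph of edges that carry flow, with the dummy nodes removed.
    used = nx.DiGraph()
    for u, targets in flow.items():
        for v, units in targets.items():
            if units > 0 and u not in DUMMIES and v not in DUMMIES:
                used.add_edge(u, v)

    kept = []
    for nodes in nx.weakly_connected_components(used):
        gain = 0.0
        for node in nodes:
            if node not in leader_counts:
                continue  # partition node
            before = leader_counts[node]
            # Out-edges give leaders away, in-edges receive them.
            after = before - used.out_degree(node) + used.in_degree(node)
            gain += abs(before - l_avg) - abs(after - l_avg)
        if gain > 0:  # assumed "contribution" test
            kept.append(nodes)
    return kept


# Hypothetical result flow: b2 -> p1 -> b4 and a see-saw b1 -> p2 -> b3.
leader_counts = {"b1": 2, "b2": 4, "b3": 1, "b4": 0}
flow = {
    "source": {"b2": 1, "source_residue": 1},
    "source_residue": {"b1": 1},
    "b2": {"p1": 1}, "p1": {"b4": 1},
    "b1": {"p2": 1}, "p2": {"b3": 1},
    "b4": {"sink": 1},
    "b3": {"sink_residue": 1},
    "sink_residue": {"sink": 1},
}
kept = useful_components(flow, leader_counts)
print(kept)
```

Here the `b1 -> p2 -> b3` component only swaps which broker is slightly above and which is slightly below the average, so its gain is zero and it is pruned, while the `b2 -> p1 -> b4` component is kept.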

After all these optimizations, the reassignment solution in the previous section can be improved to use only 15 reassignments. We think it is a reasonable result that uses only 4 more reassignments than the lower bound.

After the 2 optimizations. Leaders are balanced and operations are close to the lower bound.

Another example with a 120-partition topic covered by 135 brokers.

**Conclusion**

This started as a side project to automate the tedious manual rebalancing of troublesome imbalanced topics. However, it turned out to be a very useful tool for our Kafka operations. Our journey to explore and apply other interesting concepts in our Logging Platform team won’t stop here. At Pinterest Engineering, we are always looking for opportunities to combine different skills and backgrounds to create more value. If you are passionate about innovative solutions that solve interesting challenges, Pinterest Engineering would be a great playground for you to unleash your creativity!

**Acknowledgments**

*Many thanks to Vahid Hashemian, Ambud Sharma, Henry Cai, Yu Yang, Heng Zhang and Eric Lopez on the Logging Platform team for providing support and feedback on this project.*