Demystifying RxJava Backpressure on Android at Uber Engineering


The quick and efficient transmission of information about app activity—e.g., requesting rides, placing orders, or approaching a pickup location—is crucial to a seamless user experience across Uber’s products.

At Uber, we use RxJava heavily in our Android apps to clearly communicate events between observers and express complicated asynchronous transformations. Uber’s rider and driver apps have many asynchronous states using RxJava Observables, such as communicating new dispatches to a driver or a new uberPOOL match to a rider currently on trip. Although RxJava has been incredibly useful to us, RxJava backpressure forced us to think creatively about how we use this library.

In this article, we offer an example of RxJava backpressure and share our best practices for negating it through RxJava backpressure operators, more forgiving RxJava 1.x configurations, and RxJava 2.x.

 

Under Pressure: A Case of Backpressure with RxJava 1.x

RxJava backpressure occurs when an Observable is discharging events faster than an operator is able to ingest them; this causes lags, crashes, and other issues. There are many scenarios where RxJava backpressure is common, particularly when dealing with multiple threads or asynchronous events. With RxJava 1.x, backpressure is problematic regardless of whether or not it needs to be supported; thus, MissingBackpressureException errors and corresponding crashes often occur with little context or insight into which events are causing them.

In 2015, we began migrating away from Otto (an enhanced event bus with Android support) and implementing more and more RxJava into our rider and driver apps. This caused an uptick in MissingBackpressureException crashes. Since the root cause of the crash is difficult to identify, our Mobile Platform team needed to gain a deeper understanding of how backpressure works in RxJava environments. We also needed to develop solutions for preventing it from slowing down our apps.

In practice, the event sequence leading to a MissingBackpressureException is complicated. To keep things simple, we will use a small RxJava 1.x snippet to demonstrate how this inability to consume events causes a MissingBackpressureException:

As demonstrated above, RxJava 1.x’s  rx.ring-buffer.size property stores the size of any in-memory ring buffers that RxJava uses when an Observable cannot keep up with rate of event emissions. The ring buffer size is explicitly set to a low number (8) because different platforms have different defaults for this value. While the Java Virtual Machine (JVM) default is 128 items per ring buffer, Android has a much lower limit of 16.

In our example, we create a subject and emit to it on the main thread, but try to consume the events on a new thread scheduler. To keep things deterministic and reproducible, we use a single thread scheduler, starting the thread off with a sleep call to ensure it cannot consume events while emitting them from the main thread.

Next, we create a PublishSubject with one observer on our single thread scheduler to print the values it receives. Finally, we emit items 0 to 11 to the subject by iterating through the values in the main thread and calling PublishSubject#onNext().

If you run our example snippet, it will crash after throwing this exception:

Since the code has emitted 12 items on the main thread (4 over our 8 item limit), our observer on the background scheduler is unable to process them because the thread is currently sleeping. Emitting more than our 8 item limit also causes a MissingBackpressureException, forcing our code to crash.

In the next section, we discuss how to alleviate this backpressureno aspirin necessary.

 

Relieving the Pressure: Buffer Strategies

We can implement a few buffer strategies to address the MissingBackpressureException example above and others like it. The simplest approach is to use the onBackpressureBuffer() operator from the ReactiveX library, which is effectively what RxJava 2.x does by default now:

subject

       .onBackpressureBuffer()

       .observeOn(scheduler)

       .map((Integer integer) -> String.format(“%d “, integer))

       .subscribe(System.out::print);

The operator seamlessly buffers all values until the downstream consumer ingests them. The command output is all of its corresponding values, demonstrated below:

Another strategy is to drop events using the onBackpressureDrop() operator. This ignores new events when the queue is full, demonstrated below:  

The last option is to use the onBackpressureLatest() operator. This ensures that the latest value is not dropped and will discard other values once the queue is full, demonstrated below:

In addition to the above examples, these strategies have parameterized versions to customize behavior. For example, if you want to use onBackpressureBuffer() but only buffer a fixed number of items, you can apply the onBackpressureBuffer() operator, which takes a capacity parameter.

Within Uber Engineering, we employ a mixture of these solutions to handle backpressure depending on the use case. Nowadays, most of the Uber Android codebase uses RxJava 2.x (the next iteration of the library), which allows us to opt-out of backpressure. For our remaining RxJava 1.x code, we bumped our ring buffer size to achieve similar behavior as RxJava 2.x. (We will cover this in greater detail later.)

 

Minding Backpressure Operator Side Effects

Without a strategy to relieve backpressure, an app under high memory pressure can easily fill the internal ring buffers and crash with a MissingBackpressureException. One common scenario causing a MissingBackpressureException is observing a value on the main thread when the application is not responsive. If the queue fills up before these events can be consumed, the app will crash with a MissingBackpressureException. Since the default queue size on Android is only 16 items (versus 128 on the JVM), this can become a major issue.

While using a buffer strategy seems like an obvious decision, these backpressure antidotes still have side effects. For instance, you must be confident that buffering in-memory will not cause your application to run out of memory. With Android development in particular, it is very important to keep a low memory profile; buffering plain old Java objects (POJOs) is unlikely to cause a crash. However, engineers should think twice before buffering items with a large file size like bitmaps or other memory intensive resources.

In fact, for a strategy that drops objects like onBackpressureDrop() or onBackpressureLatest() , you need to assess whether dropping events will cause adverse reactions. If the events are stateful (for example, an event sent to start and stop a certain app functionality), dropping an event could result in hard to find issues at runtime.

Once you have a basic understanding of backpressure, the RxJava documentation becomes much more useful. Every operator that consumes events has its own backpressure section that describes in detail how it will behave.

Looking through the available operators, we could have completely avoided the crash in the original example by using the range operator, which heeds to downstream backpressure and transmits values as requested.

 

Improvements with RxJava 2.x

Now that we have thoroughly dissected causes and solutions to RxJava 1.x backpressure, it is worth noting that RxJava 2.x incorporates new types that enable the library to better handle backpressure.

The Observable type in RxJava 2.x has no concept of backpressure. Implementing Observable is effectively the same as using onBackpressureBuffer() by default. UI events, one-off network requests, and state changes should all work with this approach. The Completable, Maybe, and Single types can also dictate this behavior.

If you need to support backpressure, RxJava 2.x’s new class, Flowable,  is backpressure-aware like Observable was in RxJava 1.x. However, the updated library now requires an explicit choice of a backpressure strategy to prevent surprise MissingBackpressureExceptions.

 

Achieving RxJava 2.x Behavior in RxJava 1.x

We currently use both legacy RxJava 1.x code and RxJava 2.x code for the Uber Android apps, with all new code elicited from the latter model.  After migrating most of the code to RxJava 2.x for our rider app, we still had a long tail of missing backpressure crashes from older RxJava 1.x code. After bumping the ring buffer property to 128 (matching the default on the JVM), backpressure crashes were completely eradicated without any memory strain. Hooray!

However, setting this property should not be taken lightly. It will be applied globally to every RxJava 1.x stream in your app, including third party libraries that use RxJava. Additionally, engineers should be conscious if any RxJava 1.x observables are handling backpressure. This scenario can cause crashes and other negative implications despite these adjustments.

With a clearer understanding of how backpressure works, Uber Engineering was able to re-architect our rider and driver apps with enhanced stability, making for a more seamless user experience. We hope this knowledge is useful for your RxJava endeavors.

Tony Cosentini is a software engineer on Uber’s Mobile Platform team.

Photo Header Credit: “Hydrodynamic flow across manta ray back fin” by Conor Myhrvold, Raja Ampat, Indonesia.



Source link

Write a comment