RxCentral: Uber’s Open Source Library for Seamless Bluetooth Integrations


At Uber, we innovate through products that connect users to both the digital and physical worlds, making services such as transportation and food delivery as easy as possible. An increasing number of these products share a common need: to discover, connect, and communicate using Bluetooth. To enable a new generation of innovations, we need to make Bluetooth easier for engineers to implement and enable cross-platform designs that can be repeated across applications.

Enter RxCentral: Uber-developed libraries for Android (now open source) and iOS (soon to be open source) that communicate with Bluetooth LE peripherals in a reliable, repeatable fashion via a platform-agnostic, reactive design. RxCentral’s reactive functions can be used individually or in combination to orchestrate elegant, reactive user experiences with Bluetooth LE peripherals.

Our initial integration connects our driver-partners’ phones with our next-generation Uber beacon. Beacon combines color-matching technology with an advanced sensor suite to enhance the experience of both driver-partners and riders. To enable its complex features, beacon requires throughput and reliability that push the limits of Bluetooth. RxCentral delivers this functionality and, in the process, improves transportation experiences on our platform for riders and driver-partners alike, who use Bluetooth in-app only when leveraging beacon.

RxCentral at a glance

RxCentral allows us to rapidly implement four primary functions: detect state, scan, connect, and communicate. Each function is reactive: subscribing enables the functionality and disposing stops it, which lets engineers design services that integrate cleanly into a modern, reactive application.

Detect Bluetooth

The BluetoothDetector detects an Android device’s Bluetooth state, determining if Bluetooth is live and available to connect:

BluetoothDetector bluetoothDetector;
Disposable detection;

// Use the detector to detect Bluetooth state.
detection = bluetoothDetector
   .enabled()
   .subscribe(
       enabled -> {
         // Tell the user to turn on Bluetooth if not enabled
       }
   );

In our reactive paradigm, we can dispose our detection subscription to stop detection:

// Stop Bluetooth detection.
detection.dispose();
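
The subscribe-to-start, dispose-to-stop contract can be illustrated without any Bluetooth or RxJava dependency. The following is a minimal sketch in plain Java, not RxCentral's implementation: `StateSource`, `Subscription`, `emit`, and `SubscribeDisposeDemo` are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a reactive state source; not part of RxCentral.
final class StateSource {
    // Hypothetical handle returned by subscribe(); disposing it stops delivery.
    interface Subscription {
        void dispose();
        boolean isDisposed();
    }

    private final List<Consumer<Boolean>> listeners = new CopyOnWriteArrayList<>();

    // Subscribing is what turns delivery on for this listener.
    Subscription subscribe(Consumer<Boolean> onState) {
        listeners.add(onState);
        return new Subscription() {
            private volatile boolean disposed;

            @Override
            public void dispose() {
                disposed = true;
                listeners.remove(onState); // release the listener
            }

            @Override
            public boolean isDisposed() {
                return disposed;
            }
        };
    }

    // Simulates the platform reporting a Bluetooth state change.
    void emit(boolean enabled) {
        for (Consumer<Boolean> listener : listeners) {
            listener.accept(enabled);
        }
    }
}

final class SubscribeDisposeDemo {
    static List<Boolean> run() {
        StateSource source = new StateSource();
        List<Boolean> seen = new ArrayList<>();

        StateSource.Subscription detection = source.subscribe(seen::add);
        source.emit(false);  // delivered
        source.emit(true);   // delivered
        detection.dispose();
        source.emit(false);  // not delivered: the subscription was disposed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [false, true]
    }
}
```

The point of the sketch is that the caller never manages listener registration directly: holding or disposing the returned handle is the only lifecycle decision.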

Scan

The Scanner tells the device to scan for Bluetooth LE peripherals advertising their availability:

Scanner scanner;
Disposable scanning;

// Use the scanner to scan for advertising peripherals.
scanning = scanner
   .scan()
   .subscribe(
       scanData -> {
         // We found the peripheral we are looking for
       }
   );

Disposing the scanning subscription stops the scan and releases all scanning resources, so we automatically clean up after ourselves:

// Stop scanning.
scanning.dispose();

Connect

Establishing a connection with RxCentral is as easy as using the BluetoothDetector and Scanner. Building on the above example, we can connect to a peripheral discovered by a scan with a single ConnectionManager function:

ScanData scanData;
ConnectionManager connectionManager;
PeripheralManager peripheralManager;
Disposable connection;

// Connect to a device.
connection = connectionManager
   .connect(scanData)
   .subscribe(
       peripheral -> {
         // Inject our newly connected peripheral into the PeripheralManager.
         peripheralManager.setPeripheral(peripheral);
       }
   );

By now a pattern should be apparent: we simply dispose our connection subscription to disconnect and clean up all resources:

// Disconnect from the device.
connection.dispose();

Communicate

Once the above functions have created a Bluetooth connection to a peripheral and given it to the PeripheralManager, we can queue operations, like Generic Attribute Profile (GATT) reads and writes, to perform on the peripheral:

PeripheralManager peripheralManager;
Read read;
Disposable operation;

// Perform a read operation.
operation = peripheralManager
   .queueOperation(read)
   .subscribe(
       bytes -> {
         // The bytes read from the peripheral.
       }
   );

We retain the subscription to the operation as well, letting us cancel an operation if it has not yet run:

// Cancel the read.
operation.dispose();
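
To make "cancel an operation if it has not yet run" concrete, here is a minimal sketch using only the JDK. It assumes a single-threaded executor as a stand-in for the operation queue and a `Future` as a stand-in for the subscription; this is an analogy, not how RxCentral is implemented, and `CancelQueuedOperationDemo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelQueuedOperationDemo {
    // Returns true if the second operation was cancelled before it ever ran.
    static boolean run() {
        // One worker thread: operations execute one at a time, in order.
        ExecutorService queue = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch releaseFirst = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean(false);
        try {
            // The first operation occupies the worker until we release it.
            Future<?> first = queue.submit(() -> {
                firstStarted.countDown();
                releaseFirst.await();
                return null;
            });
            firstStarted.await();

            // The second operation is queued behind the first and has not started.
            Future<?> second = queue.submit(() -> secondRan.set(true));

            // Analogous to operation.dispose(): remove interest before it runs.
            boolean cancelled = second.cancel(false);

            releaseFirst.countDown();
            first.get(5, TimeUnit.SECONDS);
            queue.shutdown();
            queue.awaitTermination(5, TimeUnit.SECONDS);
            return cancelled && !secondRan.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            queue.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the cancelled operation was still waiting in the queue, it is skipped entirely when the worker reaches it, so nothing is sent to the peripheral on its behalf.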

Enhanced, reactive Bluetooth LE

RxCentral makes it easy to orchestrate elaborate sequences of actions in a reliable, repeatable fashion. Since each function adheres to the Observable contract, we can combine them into reactive streams that translate into seamless user experiences with beacon or any other Bluetooth-enabled hardware.

Detect, scan, and connect

We can detect state, scan for peripherals, and connect to a peripheral on a single reactive stream:

BluetoothDetector bluetoothDetector;
Scanner scanner;
ScanMatcher scanMatcher;
ConnectionManager connectionManager;
PeripheralManager peripheralManager;
Disposable connection;

// When Bluetooth is enabled, scan for and connect to a peripheral.
connection = bluetoothDetector
   .enabled()
   .filter(enabled -> enabled)
   .switchMap(enabled -> scanner.scan())
   .compose(scanMatcher.match())
   .switchMap(match -> connectionManager.connect(match))
   .subscribe(
       peripheral -> {
         // Inject our newly connected peripheral into the PeripheralManager.
         peripheralManager.setPeripheral(peripheral);
       }
   );

Because connecting to a peripheral is such a common use case, RxCentral lets us scan and connect with a single function call by passing a ScanMatcher to the ConnectionManager. In this typical example, we connect to a peripheral and configure it before handing it off to the PeripheralManager for communication. We also implement retry logic to automatically reconnect if a connection error occurs:

UUID service;
ScanMatcher scanMatcher;
ConnectionManager connectionManager;
PeripheralManager peripheralManager;
Disposable connection;

// Connect, configure, and retry on connection errors.
connection = connectionManager
   .connect(scanMatcher)
   .switchMap(peripheral ->
      peripheral.registerNotification(service).flatMap(irr -> Single.just(peripheral)))
   .switchMap(peripheral ->
      peripheral.requestMtu(512).flatMap(mtu -> Single.just(peripheral)))
   .retryWhen(errors ->
      errors.switchMap(error -> {
         if (error instanceof ConnectionError) {
           // Resubscribe, which restarts the connection attempt.
           return Observable.just(true);
         } else {
           // Any other error is passed downstream.
           return Observable.error(error);
         }
      }))
   .subscribe(
       peripheral -> {
         peripheralManager.setPeripheral(peripheral);
       }
   );

The ability to retry any operation that fails is a key feature in RxCentral, which handles retries without the need to manage or reset state.
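
The retry rule used above (try again on a connection error, give up on anything else) can be sketched in plain Java. This is an illustrative analogy, not RxCentral code: `withRetry`, `maxAttempts`, and the nested `ConnectionError` class are hypothetical, and unlike the `retryWhen` example, which retries for as long as connection errors occur, this sketch bounds the number of attempts.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetryOnConnectionError {
    // Hypothetical error type standing in for a dropped or failed connection.
    static final class ConnectionError extends RuntimeException {
        ConnectionError(String message) {
            super(message);
        }
    }

    // Runs the attempt from scratch each time. Only ConnectionError triggers
    // another attempt; any other exception propagates immediately.
    static <T> T withRetry(Supplier<T> attempt, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        ConnectionError last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (ConnectionError e) {
                last = e; // nothing to reset: the next attempt starts fresh
            }
        }
        throw last;
    }

    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        String result = withRetry(() -> {
            // Simulate two dropped connections followed by a success.
            if (calls.incrementAndGet() < 3) {
                throw new ConnectionError("link dropped");
            }
            return "connected";
        }, 5);
        return result + " after " + calls.get() + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "connected after 3 attempts"
    }
}
```

Each attempt is a fresh invocation of the supplier, which mirrors how resubscribing to a reactive stream restarts the work without the caller tracking or resetting intermediate state.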

Connected operations

Oftentimes peripherals implement a command/response protocol where commands are written to a characteristic and a response is sent back to the mobile app via a notification:

UUID characteristic;

AbstractWrite<Boolean> writeCommand = new AbstractWrite<Boolean>(…) {
   @Override
   protected SingleTransformer<Peripheral, Boolean> postWrite() {
      // After the write, listen for a notification that carries a valid response.
      return peripheralSingle -> peripheralSingle
         .flatMap(peripheral -> peripheral
            .notification(characteristic)
            .filter(bytes -> checkResponse(bytes))
            .map(bytes -> true));
   }
};
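
The example above leaves `checkResponse` undefined. As a rough sketch of what such a check might look like, the code below assumes a purely hypothetical two-byte response frame (an echoed command opcode followed by a status byte). Neither the frame layout, the opcode values, nor the extra `commandOpcode` parameter come from RxCentral or from beacon's actual protocol.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical response check; the frame layout below is an assumption made
// for illustration and is not defined by RxCentral or the example above.
final class CommandResponse {
    static final byte STATUS_OK = 0x00;

    // Assumed frame: byte 0 echoes the command opcode, byte 1 is a status code.
    static boolean checkResponse(byte[] bytes, byte commandOpcode) {
        return bytes != null
            && bytes.length >= 2
            && bytes[0] == commandOpcode
            && bytes[1] == STATUS_OK;
    }

    // Mirrors filter(...) followed by taking the first match: unrelated
    // notifications are skipped until one answers our command successfully.
    static int indexOfFirstResponse(List<byte[]> notifications, byte commandOpcode) {
        for (int i = 0; i < notifications.size(); i++) {
            if (checkResponse(notifications.get(i), commandOpcode)) {
                return i;
            }
        }
        return -1;
    }

    static int demo() {
        byte command = 0x21; // hypothetical opcode
        List<byte[]> notifications = Arrays.asList(
            new byte[] {0x10, 0x00},  // response to some other command
            new byte[] {0x21, 0x01},  // our opcode, but a non-OK status
            new byte[] {0x21, 0x00}); // the response we are waiting for
        return indexOfFirstResponse(notifications, command);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```

Note that a filter of this kind silently skips failure responses; a real protocol would likely also need a timeout or an explicit error path so that a rejected command does not wait forever.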

Below, we perform a series of operations each time we connect to our peripheral. If any operation in the series fails, we can retry the entire stream or individual operations:

PeripheralManager peripheralManager;
Read read1;
Read read2;
Disposable operations;

// Perform a stream of operations each time we are connected to a Peripheral
operations = peripheralManager
   .connected()
   .filter(connected -> connected) // Begin operations when connected.
   .switchMap(connected ->
      peripheralManager
         .queueOperation(read1))
   .switchMap(readResult1 ->
      peripheralManager
         .queueOperation(writeCommand)
         .retry(2)) // Retry the write command up to two times.
   .filter(writeResult -> writeResult)
   .switchMap(writeSuccessful ->
      peripheralManager
         .queueOperation(read2))
   .retry(3) // Retry the entire stream up to three times.
   .subscribe(
       readResult2 -> {
         // Finally, we receive the second read result.
       }
   );

When using RxCentral, an application can run multiple streams like the one above on different threads, because the PeripheralManager provides a thread-safe, first-in, first-out (FIFO) operation queue.
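
To illustrate what a thread-safe FIFO operation queue guarantees, here is a small JDK-only sketch. It is an assumption-laden analogy rather than the PeripheralManager's actual implementation: `FifoOperationQueue`, `drainAndCheckFifo`, and `FifoQueueDemo` are hypothetical names, and a single-threaded executor stands in for the Bluetooth link that can service only one operation at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a thread-safe FIFO operation queue; not RxCentral code.
final class FifoOperationQueue {
    // A single worker means operations never overlap and run in queue order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<String> enqueueOrder = new ArrayList<>();
    private final List<String> executionOrder =
        Collections.synchronizedList(new ArrayList<>());

    // Safe to call from any thread. The lock makes "record + submit" atomic,
    // so the recorded order matches the order the worker sees.
    synchronized <T> Future<T> queueOperation(String name, Callable<T> operation) {
        enqueueOrder.add(name);
        return worker.submit(() -> {
            executionOrder.add(name);
            return operation.call();
        });
    }

    // Waits for queued operations to finish, then reports whether they ran
    // in exactly the order they were queued.
    boolean drainAndCheckFifo() {
        worker.shutdown();
        try {
            if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        synchronized (this) {
            return enqueueOrder.equals(executionOrder);
        }
    }

    synchronized int size() {
        return enqueueOrder.size();
    }
}

final class FifoQueueDemo {
    static boolean run() {
        FifoOperationQueue queue = new FifoOperationQueue();
        List<Thread> streams = new ArrayList<>();
        // Four independent "streams" queue operations concurrently.
        for (int t = 0; t < 4; t++) {
            final int id = t;
            Thread stream = new Thread(() -> {
                for (int i = 0; i < 25; i++) {
                    final int n = i;
                    queue.queueOperation("stream" + id + "-op" + n, () -> n);
                }
            });
            streams.add(stream);
            stream.start();
        }
        for (Thread stream : streams) {
            try {
                stream.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return queue.drainAndCheckFifo() && queue.size() == 100;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The interleaving between streams varies from run to run, but within the queue every operation executes alone and in the order it was accepted, which is the property that lets independent streams share one peripheral safely.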

Enabling innovation

At Uber, we’re using RxCentral across several products in production and under development to help users have more seamless transportation experiences on our platform. The flagship RxCentral integration is for beacon.

RxCentral’s design makes it easy to extend and compose functionality on top of the core implementations of ConnectionManager and PeripheralManager. The design shown in Figure 1, below, captures how we play animations on beacon: 

Diagram of Bluetooth stack between Uber beacon and app
Figure 1: This design shows how we can send commands to Uber beacon over Bluetooth, in this case making it play animations so a rider can more easily find their car.

 

Each logical manager (e.g., AnimationManager and FileTransferManager) composes reactive streams that extend upstream Observables rooted in the reactive bridge RxCentral provides to platform-level Bluetooth APIs.

To play an animation on beacon’s rear-facing screen, the consuming application simply subscribes to a stream exposed by the AnimationManager. This pattern allows engineers to easily integrate beacon into their application without concern for the intricacies of Bluetooth. 

What’s next

We designed RxCentral to make Bluetooth interaction simple. It eliminates the need to struggle with platform-level Bluetooth APIs and varying system designs between Android and iOS, letting engineers fully exploit the capabilities of Bluetooth peripherals.

We’ve chosen to open source RxCentral to help move the community forward and maintain a library that works with all peripherals, not just those used by Uber. With each Bluetooth peripheral comes a new set of challenges, and collecting feedback is key to improving RxCentral. Engineers can raise issues or contribute new features at the RxCentralBle GitHub page for our Android library.

We hope to have the iOS library for RxCentral available to the community soon.


