An introduction to Reactive Programming with RxJava – Part #2: The framework

In my last blog entry, I gave a short introduction to reactive programming. In this part, I will give a brief introduction to a reactive framework called RxJava. Yes, I know I promised you some practical examples, and there will be some. However, I couldn’t fit everything in, which means there will be a part 3 as well.

RxJava is an implementation of the Reactive Extensions (ReactiveX) library for the JVM (Java Virtual Machine). It builds on the Observer design pattern, which means that your building blocks are either Publishers or Subscribers. A Publisher is an object that can produce events, and a Subscriber consumes the events produced by a Publisher.

The Basics

So, to get started, we first need to create a subscriber. In RxJava, the subscribers of a Flowable implement the interface org.reactivestreams.Subscriber<T>. The Subscriber interface declares four methods:

–    onSubscribe: is called once, before any data, with a Subscription that the subscriber uses to request data or to cancel.
–    onNext: is called when there is new data.
–    onError: a failed terminal state.
–    onComplete: the successful terminal state.

Next, we need to create a Publisher, the producer of our events. RxJava has two main types for this, Observable and Flowable. The difference between the two is that Flowable is designed to handle backpressure, while Observable is not. Backpressure is needed when a publisher creates events faster than its subscribers can handle them, which can otherwise lead to subscribers missing events or to an ever-growing queue of events (which could end in an out-of-memory error).

Flowable implements the org.reactivestreams.Publisher<T> interface, which has one method:

–    subscribe: a request to start streaming data to the Subscriber given as the argument.

The subscribe method declared by Publisher returns nothing, but Flowable also has subscribe overloads that take lambdas instead of a Subscriber. Those return a Disposable object, which can be used to stop receiving events prematurely. In the example below, I have created a Flowable containing three Strings. Next, I have created a Subscriber that just prints the strings to the console.
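A minimal sketch of such an example, assuming RxJava 2 (the io.reactivex package) is on the classpath. The class name BasicExample, the first two strings and the "<complete>" marker are made up for illustration; only “Take Massive Action” comes from the text further down.

```java
import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

class BasicExample {

    // Prints every event and also records it, so the outcome can be inspected.
    static List<String> run() {
        List<String> seen = new ArrayList<>();

        // A Flowable containing three Strings (placeholder values).
        Flowable<String> flowable =
                Flowable.just("Dream Big", "Start Small", "Take Massive Action");

        flowable.subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                // Nothing is emitted until the subscriber requests data.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String value) {
                System.out.println(value);
                seen.add(value);
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done");
                seen.add("<complete>");
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        run();
    }
}
```

For simple cases, a lambda overload such as flowable.subscribe(System.out::println) does the same job in one line and hands back a Disposable.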

Operators

The Observable/Flowable classes have several methods for transforming the event data.

Map

The function passed to map is called for each event that is produced and is used for transforming the data, possibly from one type to another. In the example below, the Strings are mapped to their corresponding hashCode values (a number).
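A minimal sketch of the map operator, assuming RxJava 2 (io.reactivex). The class name MapExample and the first two strings are placeholders; toList and blockingGet are only used here to collect the results for printing.

```java
import io.reactivex.Flowable;

import java.util.List;

class MapExample {

    // Maps each String to its hashCode, turning a Flowable<String>
    // into a Flowable<Integer>.
    static List<Integer> hashCodes() {
        return Flowable.just("Dream Big", "Start Small", "Take Massive Action")
                .map(String::hashCode)
                .toList()        // collect all events into a single List
                .blockingGet();  // wait for that List
    }

    public static void main(String[] args) {
        hashCodes().forEach(System.out::println);
    }
}
```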

FlatMap

The flatMap method can convert one object to several, for example:
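A minimal sketch, assuming RxJava 2 (io.reactivex). The class name FlatMapExample and the choice of an upper-case and a lower-case variant as the two new Strings are assumptions made for illustration.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.Locale;

class FlatMapExample {

    // Turns one String into a new Observable holding two Strings.
    static List<String> variants() {
        return Observable.just("Take Massive Action")
                .flatMap(s -> Observable.just(
                        s.toUpperCase(Locale.ROOT),
                        s.toLowerCase(Locale.ROOT)))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        variants().forEach(System.out::println);
    }
}
```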

The example above takes one string, and then returns a new Observable, containing two new Strings. Of course, the new Observable can be of a completely different type.

Scan

Scan takes two parameters: the first one is an initial state and the second one is a function (RxJava’s BiFunction interface, which mirrors the one in Java 8). It takes the initial state plus the first item and passes them to the function. The result is then used together with the second item when the function is called the second time, and so on. I think an example will make it clearer.
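A minimal sketch, assuming RxJava 2 (io.reactivex). The class name ScanExample and the first two strings are placeholders. Note that scan with an initial value emits that initial value as its first event, followed by each intermediate result.

```java
import io.reactivex.Flowable;

import java.util.List;

class ScanExample {

    // Keeps a running total of the string lengths, starting from 0.
    static List<Integer> runningLengths() {
        return Flowable.just("Dream Big", "Start Small", "Take Massive Action")
                .scan(0, (total, s) -> total + s.length())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Prints 0, 9, 20 and 39 on separate lines.
        runningLengths().forEach(System.out::println);
    }
}
```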

Here, we are first setting the initial value to zero. In the function, we are taking the previous outcome (in the first iteration it is the initial value 0) and adding the length of the string to it. This means that it’s just keeping a running count of the number of characters in our strings (including the whitespace). If we wanted to ignore the whitespace, we could use the map function as well.

Filter

As the name states, this is a method for applying a filter to the events. For example:
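A minimal sketch, assuming RxJava 2 (io.reactivex). The class name FilterExample and the first two strings are placeholders.

```java
import io.reactivex.Flowable;

import java.util.List;

class FilterExample {

    // Lets through only the Strings that start with the letter T.
    static List<String> startingWithT() {
        return Flowable.just("Dream Big", "Start Small", "Take Massive Action")
                .filter(s -> s.startsWith("T"))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        startingWithT().forEach(System.out::println);
    }
}
```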

This will only print “Take Massive Action”, since it’s the only String that starts with the letter T. There are a lot of other useful methods, but these are some of the basic ones.

Conclusion

For anyone familiar with Streams in Java 8, this looks just like that. Or does it? To start with, Streams in Java 8 are pull based. This means that it’s the “subscriber” that pulls the data out of the Stream. RxJava is push based: the data is pushed to the subscriber. Another difference is how you handle exceptions. With Streams in Java 8, you have to wrap code that throws a checked exception in a try-catch inside the lambda. For example, the following code won’t compile:

The method MyPrinter.print declares that it might throw an exception of type Exception. For it to compile, we need to rewrite it as:
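A minimal sketch of both variants, using only the JDK. MyPrinter.print is named in the text, but its signature and body here are assumptions, as are the class name StreamExceptionExample and the strings. The call that does not compile is kept as a comment so that the rest can be run.

```java
import java.util.stream.Stream;

class MyPrinter {
    // Assumed signature: a static method that declares a checked exception.
    static void print(String s) throws Exception {
        System.out.println(s);
    }
}

class StreamExceptionExample {

    // Prints all strings and returns how many were printed.
    static int printAll() {
        // Does not compile, because the Consumer that forEach expects
        // is not allowed to throw a checked exception:
        // Stream.of("Dream Big", "Start Small").forEach(MyPrinter::print);

        int[] printed = {0};
        Stream.of("Dream Big", "Start Small", "Take Massive Action")
                .forEach(s -> {
                    try {
                        MyPrinter.print(s);
                        printed[0]++;
                    } catch (Exception e) {
                        // Rethrow unchecked so the failure is not swallowed.
                        throw new RuntimeException(e);
                    }
                });
        return printed[0];
    }

    public static void main(String[] args) {
        printAll();
    }
}
```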

With RxJava, that is taken care of in the onError method on our subscribers. Furthermore, RxJava has great support for different Schedulers, where you can specify whether the subscribers should run on a different thread than the main thread. This means that you won’t have to block the main thread while the events are processed on another thread. In Java 9 there will be support for reactive programming with the new Flow API, which could be really interesting.
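Both points can be sketched as follows, assuming RxJava 2 (io.reactivex). The class name, the strings and the "boom" failure are made up for illustration. The first method throws a checked exception inside map without any try-catch and receives it in the error callback; the second moves the work to one of the io scheduler's threads and blocks only to fetch the result for the demo.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

class SchedulersAndErrors {

    // RxJava's functional interfaces may throw checked exceptions;
    // the exception ends the stream and arrives in the error callback.
    static String handleError() {
        StringBuilder log = new StringBuilder();
        Flowable.just("a", "b", "c")
                .map(s -> {
                    if (s.equals("b")) {
                        throw new Exception("boom"); // hypothetical failure
                    }
                    return s;
                })
                .subscribe(
                        s -> log.append("next:").append(s).append(';'),
                        e -> log.append("error:").append(e.getMessage()).append(';'));
        return log.toString(); // "next:a;error:boom;"
    }

    // subscribeOn moves the emission, and the map call after it,
    // to a thread owned by the io scheduler.
    static String workerThreadName() {
        return Flowable.just("a", "b", "c")
                .subscribeOn(Schedulers.io())
                .map(s -> Thread.currentThread().getName())
                .blockingFirst(); // block here only to read the result
    }

    public static void main(String[] args) {
        System.out.println(handleError());
        System.out.println(workerThreadName());
    }
}
```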

This was a short introduction to the RxJava framework. In my next entry, I will give a more concrete example of how we can use RxJava in some real code. Stay tuned.


Read pt#1, The Introduction, here and pt #3, Convert a traditional web application to a reactive, here.