RxJava is a Java based extension of ReactiveX. It provides implementation or ReactiveX project in Java. Following are the key characteristics of RxJava.
Extends the observer pattern.
Support sequences of data/events.
Provides operators to compose sequences together declaratively.
Handles threading, synchronization, thread-safety and concurrent data structures internally.
ReactiveX is a project which aims to provide reactive programming concept to various programming languages. Reactive Programming refers to the scenario where program reacts as and when data appears. It is a event based programming concept and events can propagate to registers observers.
As per the Reactive, they have combined the best of Observer pattern, Iterator pattern and functional pattern.
The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.
Functional programming revolves around building the software using pure functions. A pure function do not depends upon previous state and always returns the same result for the same parameters passed. Pure functions helps avoiding problems associated with shared objects, mutable data and side effects often prevalent in multi-threading environments.
Reactive programming refers to event driven programming where data streams comes in asynchronous fashion and get processed when they are arrived.
RxJava implements both the concepts together, where data of streams changes over time and consumer function reacts accordingly.
Reactive Manifesto is an on-line document stating the high standard of application software systems. As per the manifesto, following are the key attributes of a reactive software −
Responsive − Should always respond in a timely fashion.
Message Driven − Should use asynchronous message-passing between components so that they maintain loose coupling.
Elastic − Should stay responsive even under high load.
Resilient − Should stay responsive even if any component(s) fail.
RxJava have two key components: Observables and Observer.
Observable − It represents an object similar to Stream which can emit zero or more data, can send error message, whose speed can be controlled while emitting a set of data, can send finite as well as infinite data.
Observer − It subscribes to Observable's data of sequence and reacts per item of the observables. Observers are notified whenever Observable emits a data. An Observer handles data one by one.
An observer is never notified if items are not present or a callback is not returned for a previous item.
RxJava is a library for Java, so the very first requirement is to have JDK installed in your machine.
JDK | 1.5 or above. |
---|---|
Memory | No minimum requirement. |
Disk Space | No minimum requirement. |
Operating System | No minimum requirement. |
First of all, open the console and execute a java command based on the operating system you are working on.
OS | Task | Command |
---|---|---|
Windows | Open Command Console | c:\> java -version |
Linux | Open Command Terminal | $ java -version |
Mac | Open Terminal | machine:< joseph$ java -version |
Let's verify the output for all the operating systems −
OS | Output |
---|---|
Windows | java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101) |
Linux | java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101) |
Mac | java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101) |
If you do not have Java installed on your system, then download the Java Software Development Kit (SDK) from the following link https://www.oracle.com. We are assuming Java 1.8.0_101 as the installed version for this tutorial.
Set the JAVA_HOME environment variable to point to the base directory location where Java is installed on your machine. For example.
OS | Output |
---|---|
Windows | Set the environment variable JAVA_HOME to C:\Program Files\Java\jdk1.8.0_101 |
Linux | export JAVA_HOME = /usr/local/java-current |
Mac | export JAVA_HOME = /Library/Java/Home |
Append Java compiler location to the System Path.
OS | Output |
---|---|
Windows | Append the string C:\Program Files\Java\jdk1.8.0_101\bin at the end of the system variable, Path. |
Linux | export PATH = $PATH:$JAVA_HOME/bin/ |
Mac | not required |
Verify Java installation using the command java -version as explained above.
Download the latest version of RxJava jar file from RxJava @ MVNRepository and its dependency Reactive Streams @ MVNRepository . At the time of writing this tutorial, we have downloaded rxjava-2.2.4.jar, reactive-streams-1.0.2.jar and copied it into C:\>RxJava folder.
OS | Archive name |
---|---|
Windows | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Linux | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Mac | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Set the RX_JAVA environment variable to point to the base directory location where RxJava jar is stored on your machine. Let’s assuming we've stored rxjava-2.2.4.jar and reactive-streams-1.0.2.jar in the RxJava folder.
Sr.No | OS & Description |
---|---|
1 | Windows Set the environment variable RX_JAVA to C:\RxJava |
2 | Linux export RX_JAVA = /usr/local/RxJava |
3 | Mac export RX_JAVA = /Library/RxJava |
Set the CLASSPATH environment variable to point to the RxJava jar location.
Sr.No | OS & Description |
---|---|
1 | Windows Set the environment variable CLASSPATH to %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.; |
2 | Linux export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. |
3 | Mac export CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:. |
Create a class TestRx.java as shown below −
import io.reactivex.Flowable; public class TestRx { public static void main(String[] args) { Flowable.just("Hello World!") .subscribe(System.out::println); } }
Compile the classes using javac compiler as follows −
C:\RxJava>javac Tester.java
Verify the output.
Hello World!
Observables represents the sources of data where as Observers (Subscribers) listen to them. In nutshell, an Observable emits items and a Subscriber then consumes these items.
Observable provides data once subscriber starts listening.
Observable can emit any number of items.
Observable can emit only signal of completion as well with no item.
Observable can terminate successfully.
Observable may never terminate. e.g. a button can be clicked any number of times.
Observable may throw error at any point of time.
Observable can have multiple subscribers.
When an Observable emits an item, each subscriber onNext() method gets invoked.
When an Observable finished emitting items, each subscriber onComplete() method gets invoked.
If an Observable emits error, each subscriber onError() method gets invoked.
Following are the base classes to create observables.
Flowable − 0..N flows, Emits 0 or n items. Supports Reactive-Streams and back-pressure.
Observable − 0..N flows ,but no back-pressure.
Single − 1 item or error. Can be treated as a reactive version of method call.
Completable − No item emitted. Used as a signal for completion or error. Can be treated as a reactive version of Runnable.
MayBe − Either No item or 1 item emitted. Can be treated as a reactive version of Optional.
Following are the convenient methods to create observables in Observable class.
just(T item) − Returns an Observable that signals the given (constant reference) item and then completes.
fromIterable(Iterable source) − Converts an Iterable sequence into an ObservableSource that emits the items in the sequence.
fromArray(T... items) − Converts an Array into an ObservableSource that emits the items in the Array.
fromCallable(Callable supplier) − Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.
fromFuture(Future future) − Converts a Future into an ObservableSource.
interval(long initialDelay, long period, TimeUnit unit) − Returns an Observable that emits a 0L after the initialDelay and ever increasing numbers after each period of time thereafter.
The Single class represents the single value response. Single observable can only emit either a single successful value or an error. It does not emit onComplete event.
Following is the declaration for io.reactivex.Single<T> class −
public abstract class Single<T> extends Object implements SingleSource<T>
Following is the sequential protocol that Single Observable operates −
onSubscribe (onSuccess | onError)?
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.concurrent.TimeUnit; import io.reactivex.Single; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create the observable Single<String> testSingle = Single.just("Hello World"); //Create an observer Disposable disposable = testSingle .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Hello World
The MayBe class represents deferred response. MayBe observable can emit either a single successful value or no value.
Following is the declaration for io.reactivex.Single<T> class −
public abstract class Maybe<T> extends Object implements MaybeSource<T>
Following is the sequential protocol that MayBe Observable operates −
onSubscribe (onSuccess | onError | OnComplete)?
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.concurrent.TimeUnit; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Maybe.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Hello World
The Completable class represents deferred response. Completable observable can either indicate a successful completion or error.
Following is the declaration for io.reactivex.Completable class −
public abstract class Completable extends Object implements CompletableSource
Following is the sequential protocol that Completable Observable operates −
onSubscribe (onError | onComplete)?
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.concurrent.TimeUnit; import io.reactivex.Completable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableCompletableObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Completable.complete() .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onStart() { System.out.println("Started!"); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Started! Done!
The CompositeDisposable class represents a container which can hold multiple disposable and offers O(1) complexity of adding and removing disposables.
Following is the declaration for io.reactivex.disposables.CompositeDisposable class −
public final class CompositeDisposable extends Object implements Disposable, io.reactivex.internal.disposables.DisposableContainer
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Maybe; import io.reactivex.Single; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { CompositeDisposable compositeDisposable = new CompositeDisposable(); //Create an Single observer Disposable disposableSingle = Single.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); //Create an observer Disposable disposableMayBe = Maybe.just("Hi") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); compositeDisposable.add(disposableSingle); compositeDisposable.add(disposableMayBe); //start observing compositeDisposable.dispose(); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Hello World Hi
Following are the operators which are used to create an Observable.
Sr.No. | Operator & Description |
---|---|
1 | Create Creates an Observable from scratch and allows observer method to call programmatically. |
2 | Defer Do not create an Observable until an observer subscribes. Creates a fresh observable for each observer. |
3 | Empty/Never/Throw Creates an Observable with limited behavior. |
4 | From Converts an object/data structure into an Observable. |
5 | Interval Creates an Observable emitting integers in sequence with a gap of specified time interval. |
6 | Just Converts an object/data structure into an Observable to emit the same or same type of objects. |
7 | Range Creates an Observable emitting integers in sequence of given range. |
8 | Repeat Creates an Observable emitting integers in sequence repeatedly. |
9 | Start Creates an Observable to emit the return value of a function. |
10 | Timer Creates an Observable to emit a single item after given delay. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using fromArray operator to create an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
ABCDEFG
Following are the operators which are used to transform an item emitted from an Observable.
Sr.No. | Operator & Description |
---|---|
1 | Buffer Gathers items from Observable into bundles periodically and then emit the bundles rather than items. |
2 | FlatMap Used in nested observables. Transforms items into Observables. Then flatten the items into single Observable. |
3 | GroupBy Divide an Observable into set of Observables organized by key to emit different group of items. |
4 | Map Apply a function to each emitted item to transform it. |
5 | Scan Apply a function to each emitted item, sequentially and then emit the successive value. |
6 | Window Gathers items from Observable into Observable windows periodically and then emit the windows rather than items. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using map operator to transform an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
ABCDEFG
Following are the operators which are used to selectively emit item(s) from an Observable.
Sr.No. | Operator & Description |
---|---|
1 | Debounce Emits items only when timeout occurs without emiting another item. |
2 | Distinct Emits only unique items. |
3 | ElementAt emit only item at n index emitted by an Observable. |
4 | Filter Emits only those items which pass the given predicate function. |
5 | First Emits the first item or first item which passed the given criteria. |
6 | IgnoreElements Do not emits any items from Observable but marks completion. |
7 | Last Emits the last element from Observable. |
8 | Sample Emits the most recent item with given time interval. |
9 | Skip Skips the first n items from an Observable. |
10 | SkipLast Skips the last n items from an Observable. |
11 | Take takes the first n items from an Observable. |
12 | TakeLast takes the last n items from an Observable. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using take operator to filter an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .take(2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
ab
Following are the operators which are used to create a single Observable from multiple Observables.
Sr.No. | Operator & Description |
---|---|
1 |
And/Then/When
Combine item sets using Pattern and Plan intermediaries. |
2 |
CombineLatest
Combine the latest item emitted by each Observable via a specified function and emit resulted item. |
3 |
Join
Combine items emitted by two Observables if emitted during time-frame of second Observable emitted item. |
4 |
Merge
Combines the items emitted of Observables. |
5 |
StartWith
Emit a specified sequence of items before starting to emit the items from the source Observable |
6 |
Switch
Emits the most recent items emitted by Observables. |
7 |
Zip
Combines items of Observables based on function and emits the resulted items. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using combineLatest operator to combine Observables public class ObservableTester { public static void main(String[] args) { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.combineLatest(observable1, observable2, (a,b) -> a + b) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
g1g2g3g4g5g6
Following are the operators which are often useful with Observables.
Sr.No. | Operator & Description |
---|---|
1 | Delay Register action to handle Observable life-cycle events. |
2 | Materialize/Dematerialize Represents item emitted and notification sent. |
3 | ObserveOn Specify the scheduler to be observed. |
4 | Serialize Force Observable to make serialized calls. |
5 | Subscribe Operate upon the emissions of items and notifications like complete from an Observable |
6 | SubscribeOn Specify the scheduler to be used by an Observable when it is subscribed to. |
7 | TimeInterval Convert an Observable to emit indications of the amount of time elapsed between emissions. |
8 | Timeout Issues error notification if specified time occurs without emitting any item. |
9 | Timestamp Attach timestamp to each item emitted. |
9 |
Using Creates a disposable resource or same lifespan as that of Observable. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using subscribe operator to subscribe to an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable.subscribe( letter -> result.append(letter)); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
abcdefg
Following are the operators which evaluates one or multiple Observables or items emitted.
Sr.No. | Operator & Description |
---|---|
1 |
All Evaluates all items emitted to meet given criteria. |
2 |
Amb Emits all items from the first Observable only given multiple Observables. |
3 |
Contains Checks if an Observable emits a particular item or not. |
4 |
DefaultIfEmpty Emits default item if Observable do not emit anything. |
5 |
SequenceEqual Checks if two Observables emit the same sequence of items. |
6 |
SkipUntil Discards items emitted by first Observable until a second Observable emits an item. |
7 |
SkipWhile Discard items emitted by an Observable until a given condition becomes false. |
8 |
TakeUntil Discards items emitted by an Observable after a second Observable emits an item or terminates. |
9 |
TakeWhile Discard items emitted by an Observable after a specified condition becomes false. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using defaultIfEmpty operator to operate on an Observable public class ObservableTester { public static void main(String[] args) { final StringBuilder result = new StringBuilder(); Observable.empty() .defaultIfEmpty("No Data") .subscribe(s -> result.append(s)); System.out.println(result); String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result1 = new StringBuilder(); Observable.fromArray(letters) .firstElement() .defaultIfEmpty("No data") .subscribe(s -> result1.append(s)); System.out.println(result1); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
No Data a
Following are the operators which operates on entire items emitted by an Observable.
Sr.No. | Operator & Description |
---|---|
1 | Average Evaluates averages of all items and emit the result. |
2 | Concat Emits all items from multiple Observable without interleaving. |
3 | Count Counts all items and emit the result. |
4 | Max Evaluates max valued item of all items and emit the result. |
5 | Min Evaluates min valued item of all items and emit the result. |
6 | Reduce Apply a function on each item and return the result. |
7 | Sum Evaluates sum of all items and emit the result. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; //Using concat operator to operate on multiple Observables public class ObservableTester { public static void main(String[] args) throws InterruptedException { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.concat(observable1, observable2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
abcdefg123456
Following are the operators which has more precisely control over subscription.
Sr.No. | Operator & Description |
---|---|
1 | Connect Instruct a connectable Observable to emit items to its subscribers. |
2 | Publish Converts an Observable to connectable Observable. |
3 | RefCount Converts a connectable Observable to ordinary Observable. |
4 | Replay Ensure same sequence of emitted items to be seen by each subscriber, even after the Observable has begun emitting items and subscribers subscribe later. |
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; import io.reactivex.observables.ConnectableObservable; //Using connect operator on a ConnectableObservable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); ConnectableObservable<String> connectable = Observable.fromArray(letters).publish(); connectable.subscribe(letter -> result.append(letter)); System.out.println(result.length()); connectable.connect(); System.out.println(result.length()); System.out.println(result); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
0 7 abcdefg
As per the Reactive, a Subject can act as both Observable as well as Observer.
A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
There are four types of Subjects −
Sr.No. | Subject & Description |
---|---|
1 | Publish Subject Emits only those items which are emitted after time of subscription. |
2 |
Replay Subject
Emits all the items emitted by source Observable regardless of when it has subscribed the Observable. |
3 | Behavior Subject Upon subscription, emits the most recent item then continue to emit item emitted by the source Observable. |
4 | Async Subject Emits the last item emitted by the source Observable after it's completes emission. |
PublishSubject emits items to currently subscribed Observers and terminal events to current or late Observers.
Following is the declaration for io.reactivex.subjects.PublishSubject<T> class −
public final class PublishSubject<T> extends Subject<T>
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.subjects.PublishSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); PublishSubject<String> subject = PublishSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be d only //as subscribed after c item emitted. System.out.println(result2); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
abcd d
BehaviorSubject emits the most recent item it has observed and then all subsequent observed items to each subscribed Observer.
Following is the declaration for io.reactivex.subjects.BehaviorSubject<T> class −
public final class BehaviorSubject<T> extends Subject<T>
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.subjects.BehaviorSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); BehaviorSubject<String> subject = BehaviorSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be cd being BehaviorSubject //(c is last item emitted before subscribe) System.out.println(result2); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
abcd cd
ReplaySubject replays events/items to current and late Observers.
Following is the declaration for io.reactivex.subjects.ReplaySubject<T> class −
public final class ReplaySubject<T> extends Subject<T>
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.subjects.ReplaySubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); ReplaySubject<String> subject = ReplaySubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be abcd being ReplaySubject //as ReplaySubject emits all the items System.out.println(result2); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
abcd abcd
AsyncSubject emits the only last value followed by a completion event or the received error to Observers.
Following is the declaration for io.reactivex.subjects.AsyncSubject<T> class −
public final class AsyncSubject<T> extends Subject<T>
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.subjects. AsyncSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be d being the last item emitted System.out.println(result1); //Output will be d being the last item emitted System.out.println(result2); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
d d
Schedulers are used in multi-threading environment to work with Observable operators.
As per the Reactive,Scheduler are used to schedule how chain of operators will apply to different threads.
By default, an Observable and the chain of operators that you apply to it will do its work, and will notify its observers, on the same thread on which its Subscribe method is called. The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
There are following types of Schedulers available in RxJava −
Sr.No. | Scheduler & Description |
---|---|
1 | Schedulers.computation() Creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations. |
2 | Schedulers.io() Creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed. |
3 | Schedulers.newThread() Creates and returns a Scheduler that creates a new Thread for each unit of work. |
4 | Schedulers.trampoline() Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes. |
4 | Schedulers.from(java.util.concurrent.Executor executor) Converts an Executor into a new Scheduler instance. |
Schedulers.trampoline() method creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.trampoline())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Processing Thread main Receiver Thread main, Item length 1 Processing Thread main Receiver Thread main, Item length 2 Processing Thread main Receiver Thread main, Item length 3
Schedulers.newThread() method creates and returns a Scheduler that creates a new Thread for each unit of work.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.newThread())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Processing Thread RxNewThreadScheduler-1 Receiver Thread RxNewThreadScheduler-1, Item length 1 Processing Thread RxNewThreadScheduler-2 Receiver Thread RxNewThreadScheduler-2, Item length 2 Processing Thread RxNewThreadScheduler-3 Receiver Thread RxNewThreadScheduler-3, Item length 3
Schedulers.computation() method creates and returns a Scheduler intended for computational work. Count of threads to be scheduled depends upon the CPUs present in the system. One thread is allowed per CPU. Best for event-loops or callback operations.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.computation())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Processing Thread RxComputationThreadPool-1 Receiver Thread RxComputationThreadPool-1, Item length 1 Processing Thread RxComputationThreadPool-2 Receiver Thread RxComputationThreadPool-2, Item length 2 Processing Thread RxComputationThreadPool-3 Receiver Thread RxComputationThreadPool-3, Item length 3
Schedulers.io() method creates and returns a Scheduler intended for IO-bound work. Thread pool may extend as needed. Best for I/O intensive operations.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.io())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 1 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 2 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 3
Schedulers.from(Executor) method converts an Executor into a new Scheduler instance.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import java.util.Random; import java.util.concurrent.Executors; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)))) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Processing Thread pool-1-thread-1 Processing Thread pool-3-thread-1 Receiver Thread pool-1-thread-1, Item length 1 Processing Thread pool-4-thread-1 Receiver Thread pool-4-thread-1, Item length 3 Receiver Thread pool-3-thread-1, Item length 2
Buffering operator allows to gather items emitted by an Observable into a list or bundles and emit those bundles instead of items. In the example below, we've created an Observable to emit 9 items and using buffering, 3 items will be emitted together.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import java.util.List; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9); observable.subscribeOn(Schedulers.io()) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .buffer(3) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(List<Integer> integers) { System.out.println("onNext: "); for (Integer value : integers) { System.out.println(value); } } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Done! "); } }); Thread.sleep(3000); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Subscribed onNext: 1 2 3 onNext: 4 5 6 onNext: 7 8 9 Done!
Windowing operator works similar to buffer operator but it allows to gather items emitted by an Observable into another observable instead of collection and emit those Observables instead of collections. In the example below, we've created an Observable to emit 9 items and using window operator, 3 Observable will be emitted together.
Create the following Java program using any editor of your choice in, say, C:\> RxJava.
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9); observable.subscribeOn(Schedulers.io()) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .window(3) .subscribe(new Observer<Observable<Integer>>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(Observable<Integer> integers) { System.out.println("onNext: "); integers.subscribe(value -> System.out.println(value)); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Done! "); } }); Thread.sleep(3000); } }
Compile the class using javac compiler as follows −
C:\RxJava>javac ObservableTester.java
Now run the ObservableTester as follows −
C:\RxJava>java ObservableTester
It should produce the following output −
Subscribed onNext: 1 2 3 onNext: 4 5 6 onNext: 7 8 9 Done!