RxPY - Working with Subject


Advertisements

A subject is an observable sequence, as well as, an observer that can multicast, i.e. talk to many observers that have subscribed.

We are going to discuss the following topics on subject −

  • Create a subject
  • Subscribe to a subject
  • Passing data to subject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Create a subject

To work with a subject, we need to import Subject as shown below −

from rx.subject import Subject

You can create a subject-object as follows −

subject_test = Subject()

The object is an observer that has three methods −

  • on_next(value)
  • on_error(error) and
  • on_completed()

Subscribe to a Subject

You can create multiple subscription on the subject as shown below −

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Passing Data to Subject

You can pass data to the subject created using the on_next(value) method as shown below −

subject_test.on_next("A")
subject_test.on_next("B")

The data will be passed to all the subscription, added on the subject.

Here, is a working example of the subject.

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

The subject_test object is created by calling a Subject(). The subject_test object has reference to on_next(value), on_error(error) and on_completed() methods. The output of the above example is shown below −

Output

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

We can use the on_completed() method, to stop the subject execution as shown below.

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Once we call complete, the next method called later is not invoked.

Output

E:\pyrx>python testrx.py
The value is A
The value is A

Let us now see, how to call on_error(error) method.

Example

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Output

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject will give you the latest value when called. You can create behavior subject as shown below −

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Here, is a working example to use Behaviour Subject

Example

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Output

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay Subject

A replaysubject is similar to behavior subject, wherein, it can buffer the values and replay the same to the new subscribers. Here, is a working example of replay subject.

Example

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

The buffer value used is 2 on the replay subject. So, the last two values will be buffered and used for the new subscribers called.

Output

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

In the case of AsyncSubject, the last value called is passed to the subscriber, and it will be done only after the complete() method is called.

Example

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Output

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2
Advertisements