A subject is an observable sequence, as well as, an observer that can multicast, i.e. talk to many observers that have subscribed.
We are going to discuss the following topics on subject −
To work with a subject, we need to import Subject as shown below −
from rx.subject import Subject
You can create a subject-object as follows −
subject_test = Subject()
The object is an observer that has three methods −
You can create multiple subscription on the subject as shown below −
subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.subscribe( lambda x: print("The value is {0}".format(x)) )
You can pass data to the subject created using the on_next(value) method as shown below −
subject_test.on_next("A") subject_test.on_next("B")
The data will be passed to all the subscription, added on the subject.
Here, is a working example of the subject.
from rx.subject import Subject subject_test = Subject() subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.on_next("A") subject_test.on_next("B")
The subject_test object is created by calling a Subject(). The subject_test object has reference to on_next(value), on_error(error) and on_completed() methods. The output of the above example is shown below −
E:\pyrx>python testrx.py The value is A The value is A The value is B The value is B
We can use the on_completed() method, to stop the subject execution as shown below.
from rx.subject import Subject subject_test = Subject() subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.on_next("A") subject_test.on_completed() subject_test.on_next("B")
Once we call complete, the next method called later is not invoked.
E:\pyrx>python testrx.py The value is A The value is A
Let us now see, how to call on_error(error) method.
from rx.subject import Subject subject_test = Subject() subject_test.subscribe( on_error = lambda e: print("Error : {0}".format(e)) ) subject_test.subscribe( on_error = lambda e: print("Error : {0}".format(e)) ) subject_test.on_error(Exception('There is an Error!'))
E:\pyrx>python testrx.py Error: There is an Error! Error: There is an Error!
BehaviorSubject will give you the latest value when called. You can create behavior subject as shown below −
from rx.subject import BehaviorSubject behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
Here, is a working example to use Behaviour Subject
from rx.subject import BehaviorSubject behavior_subject = BehaviorSubject("Testing Behaviour Subject"); behavior_subject.subscribe( lambda x: print("Observer A : {0}".format(x)) ) behavior_subject.on_next("Hello") behavior_subject.subscribe( lambda x: print("Observer B : {0}".format(x)) ) behavior_subject.on_next("Last call to Behaviour Subject")
E:\pyrx>python testrx.py Observer A : Testing Behaviour Subject Observer A : Hello Observer B : Hello Observer A : Last call to Behaviour Subject Observer B : Last call to Behaviour Subject
A replaysubject is similar to behavior subject, wherein, it can buffer the values and replay the same to the new subscribers. Here, is a working example of replay subject.
from rx.subject import ReplaySubject replay_subject = ReplaySubject(2) replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x))) replay_subject.on_next(1) replay_subject.on_next(2) replay_subject.on_next(3) replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x))); replay_subject.on_next(5)
The buffer value used is 2 on the replay subject. So, the last two values will be buffered and used for the new subscribers called.
E:\pyrx>python testrx.py Testing Replay Subject A: 1 Testing Replay Subject A: 2 Testing Replay Subject A: 3 Testing Replay Subject B: 2 Testing Replay Subject B: 3 Testing Replay Subject A: 5 Testing Replay Subject B: 5
In the case of AsyncSubject, the last value called is passed to the subscriber, and it will be done only after the complete() method is called.
from rx.subject import AsyncSubject async_subject = AsyncSubject() async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x))) async_subject.on_next(1) async_subject.on_next(2) async_subject.on_completed() async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x))) Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
E:\pyrx>python testrx.py Testing Async Subject A: 2 Testing Async Subject B: 2