In this chapter, we will discuss the following topics in detail: a basic example, the difference between an observable and a subject, and cold and hot observables.
The basic example below shows an observable, operators, and a subscribing observer working together.
test.py
import requests
import rx
import json
from rx import operators as ops

def filternames(x):
    if (x["name"].startswith("C")):
        return x["name"]
    else:
        return ""

content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
    ops.filter(lambda c: filternames(c)),
    ops.map(lambda a: a["name"])
)
case1.subscribe(
    on_next = lambda i: print("Got - {0}".format(i)),
    on_error = lambda e: print("Error : {0}".format(e)),
    on_completed = lambda: print("Job Done!"),
)
This is a very simple example that fetches user data from the URL https://jsonplaceholder.typicode.com/users, filters the data to keep the users whose names start with "C", and then uses map to return only the names. Here is the output:
E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!
In this example, we will see the difference between an observable and a subject.
from rx import of, operators as op
import random

test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
    op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821
In the above example, each subscription runs the observable again, so every subscriber gets its own freshly computed values.
from rx import of, operators as op
import random
from rx.subject import Subject

subject_test = Subject()
subject_test.subscribe(
    lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
    lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
    op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065
As the output shows, the subject shares the same values between both subscribers.
An observable is classified as either cold or hot.
The difference in observables will be noticed when multiple subscribers are subscribing.
A cold observable is executed each time it is subscribed to: every subscription runs the observable again and delivers fresh values to that subscriber.
The following example illustrates a cold observable.
from rx import of, operators as op
import random

test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
    op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821
In the above example, every time you subscribe to the observable, it will execute the observable and emit values. The values can also differ from subscriber to subscriber as shown in the example above.
A hot observable emits values when it is ready and does not necessarily wait for a subscription. When values are emitted, all current subscribers get the same value.
Use a hot observable when you want values to be emitted as soon as the observable is ready, or when you want to share the same values with all your subscribers.
Examples of hot observables are a Subject and the observables produced by connectable operators.
from rx import of, operators as op
import random
from rx.subject import Subject

subject_test = Subject()
subject_test.subscribe(
    lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
    lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
    op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065
As you can see, the same value is shared between the subscribers. You can achieve the same result using the publish() connectable observable operator.