One important feature of RxPY is concurrency, i.e. the ability to run tasks in parallel. To achieve this, RxPY provides two operators, subscribe_on() and observe_on(), which work together with a scheduler that decides how the subscribed task is executed.
Here is a working example that shows the need for subscribe_on(), observe_on() and a scheduler.
import random
import time

import rx
from rx import operators as ops

def adding_delay(value):
    time.sleep(random.randint(5, 20) * 0.1)
    return value

# Task 1
rx.of(1, 2, 3, 4, 5).pipe(
    ops.map(lambda a: adding_delay(a))
).subscribe(
    lambda s: print("From Task 1: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 1 complete")
)

# Task 2
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a))
).subscribe(
    lambda s: print("From Task 2: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 2 complete")
)

input("Press any key to exit\n")
In the above example, there are two tasks: Task 1 and Task 2. The tasks execute in sequence; the second task starts only when the first task is done.
E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete
RxPY supports many schedulers; here we are going to use ThreadPoolScheduler. A ThreadPoolScheduler runs subscriptions on a pool of worker threads, typically sized to the CPU threads available.
Building on the earlier example, we will use the multiprocessing module to obtain the CPU count via cpu_count(). That count is passed to the ThreadPoolScheduler, which runs the tasks in parallel on the threads available.
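The idea of sizing a pool to the CPU count can be sketched with Python's standard-library ThreadPoolExecutor, used here only as a stand-in for ThreadPoolScheduler (both wrap a pool of worker threads); this is a minimal illustration, not RxPY code:

```python
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

# Size the pool to the number of CPUs, just as the RxPY example
# sizes ThreadPoolScheduler with multiprocessing.cpu_count().
thread_count = multiprocessing.cpu_count()
pool = ThreadPoolExecutor(max_workers=thread_count)
print("Cpu count is : {0}".format(thread_count))

# Two independent jobs submitted to the pool; with two or more
# workers available they can run at the same time.
f1 = pool.submit(sum, range(1, 6))   # 1+2+3+4+5
f2 = pool.submit(sum, range(1, 5))   # 1+2+3+4
print(f1.result(), f2.result())      # 15 10
pool.shutdown()
```

The same principle applies to the RxPY scheduler below: the pool size puts an upper bound on how many tasks can run concurrently.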
Here, is a working example −
import multiprocessing
import random
import time

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops

# Calculate the cpu count and use it to create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))

def adding_delay(value):
    time.sleep(random.randint(5, 20) * 0.1)
    return value

# Task 1
rx.of(1, 2, 3, 4, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 1: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 1 complete")
)

# Task 2
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 2: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 2 complete")
)

input("Press any key to exit\n")
In the above example, there are 2 tasks and the cpu_count is 4. Since there are only 2 tasks and 4 threads available, both tasks can start in parallel.
E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete
If you look at the output, both tasks have started in parallel.
Now consider a scenario where there are more tasks than the CPU count, i.e. the CPU count is 4 and there are 5 tasks. In this case, a thread that becomes free after its task completes needs to be assigned to the next task waiting in the queue.
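This hand-off can be sketched with the standard library's thread pool, again as a stand-in for ThreadPoolScheduler: with 4 workers and 5 tasks, the fifth task sits in the queue until one of the workers is freed.

```python
import time
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor

def task(name):
    # Record which worker thread ran this task, then simulate work.
    thread = current_thread().name
    time.sleep(0.1)
    return name, thread

# 4 worker threads but 5 tasks: the fifth task waits in the queue
# until one of the first four tasks completes and frees its thread.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(task, "Task {0}".format(i)) for i in range(1, 6)]
    results = [f.result() for f in futures]

threads_used = {thread for _, thread in results}
print("Tasks run: {0}".format(len(results)))           # 5
print("Distinct threads: {0}".format(len(threads_used)))
```

All 5 tasks complete even though at most 4 distinct threads serve them, so at least one thread is reused for the queued task.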
For this purpose, we can use the observe_on() operator, which makes the emissions be observed on the given scheduler so that a queued task is picked up as soon as a thread becomes free. Here is a working example using observe_on() −
import multiprocessing
import random
import time

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops

# Calculate the cpu count and use it to create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))

def adding_delay(value):
    time.sleep(random.randint(5, 20) * 0.1)
    return value

# Task 1
rx.of(1, 2, 3, 4, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 1: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 1 complete")
)

# Task 2
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 2: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 2 complete")
)

# Task 3
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 3: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 3 complete")
)

# Task 4
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.subscribe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 4: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 4 complete")
)

# Task 5
rx.range(1, 5).pipe(
    ops.map(lambda a: adding_delay(a)),
    ops.observe_on(thread_pool_scheduler)
).subscribe(
    lambda s: print("From Task 5: {0}".format(s)),
    lambda e: print(e),
    lambda: print("Task 5 complete")
)

input("Press any key to exit\n")
E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete
If you look at the output, the moment Task 4 completes, its thread is handed to the next task, i.e. Task 5, which then starts executing.