Threads Intercommunication


In real life, when a team of people works on a common task, they must communicate with each other to finish the task properly. The same analogy applies to threads. In programming, to reduce the idle time of the processor, we create multiple threads and assign a different subtask to each thread. Hence, there must be a communication facility, and the threads should interact with each other to finish the job in a synchronized manner.

Consider the following important points related to thread intercommunication −

  • No performance gain − Without proper communication between threads and processes, the performance gains from concurrency and parallelism are of no use.

  • Accomplish task properly − Without a proper intercommunication mechanism between threads, the assigned task cannot be completed properly.

  • More efficient than inter-process communication − Inter-thread communication is more efficient and easier to use than inter-process communication because all threads within a process share the same address space, so they need no separate shared-memory mechanism.
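
The shared address space mentioned in the last point can be illustrated with a minimal sketch. The names shared and worker are made up for this illustration; each thread writes under its own key, so no locking is needed here.

```python
import threading

# A plain dict living in the process's single address space
# (the name "shared" is only for this illustration).
shared = {}

def worker(name):
    # Each thread writes under its own key, so the writes do not conflict.
    shared[name] = "hello from " + name

threads = [threading.Thread(target=worker, args=("t%d" % i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread sees everything the workers wrote, with no copying
# or serialization between them.
print(sorted(shared.items()))
```

With separate processes, the same exchange would need an explicit channel such as a pipe or a shared-memory segment.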

Python data structures for thread-safe communication

Multithreaded code raises the problem of passing information from one thread to another. The standard communication primitives do not solve this issue on their own. Hence, we need to implement our own composite object to share objects between threads in a thread-safe way. Following are a few data structures that provide thread-safe communication after some changes are made to them −

Sets

To use the set data structure in a thread-safe manner, we need to extend the set class and implement our own locking mechanism.

Example

Here is a Python example of extending the class −

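from threading import Lock

class extend_class(set):
   def __init__(self, *args, **kwargs):
      # One lock per instance guards every mutating call.
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
      try:
         super(extend_class, self).add(elem)
      finally:
         # Always release the lock, even if add() raises.
         self._lock.release()

   def delete(self, elem):
      self._lock.acquire()
      try:
         # set has no delete() method; remove() is the built-in equivalent.
         super(extend_class, self).remove(elem)
      finally:
         self._lock.release()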

In the above example, a class named extend_class is defined, which inherits from the Python set class. A lock object is created within the constructor of this class. The class defines two methods, add() and delete(), which are thread-safe. Both rely on the superclass functionality with one key exception: each call acquires the lock before the operation and releases it afterwards.
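
As a rough usage sketch, several threads can fill one lock-protected set concurrently. The class name LockedSet and the helper fill are hypothetical, and the `with` statement is used here as a shorter equivalent of the acquire/try/finally pattern.

```python
import threading

class LockedSet(set):
    """Hypothetical name: a set whose add/discard hold a lock."""

    def __init__(self, *args, **kwargs):
        self._lock = threading.Lock()
        super().__init__(*args, **kwargs)

    def add(self, elem):
        # "with" acquires the lock and releases it even on an exception.
        with self._lock:
            super().add(elem)

    def discard(self, elem):
        with self._lock:
            super().discard(elem)

def fill(target, start, stop):
    # Helper for this sketch: add a range of integers to the set.
    for n in range(start, stop):
        target.add(n)

numbers = LockedSet()
threads = [
    threading.Thread(target=fill, args=(numbers, i * 100, (i + 1) * 100))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(numbers))  # 400
```

Only the methods that are overridden are protected; any other mutating method of set, such as update(), would need the same treatment.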

Decorator

Another key method for thread-safe communication is the use of decorators.

Example

Consider a Python example that shows how to use decorators −

from threading import Lock

def lock_decorator(method):
   # Wrap a method so that it runs while holding the instance's lock.
   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
   return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, elem):
      return super(Decorator_class, self).add(elem)

   @lock_decorator
   def delete(self, elem):
      # set has no delete() method; remove() is the built-in equivalent.
      return super(Decorator_class, self).remove(elem)

In the above example, a decorator function named lock_decorator is defined; it wraps a method so that the method body runs while the instance's lock is held. A lock object is created within the constructor of Decorator_class, which inherits from the Python set class. The class defines two methods, add() and delete(), which are thread-safe. Both rely on the superclass functionality with one key exception: the decorator acquires the lock around each call.
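
The same decorator idea is not tied to sets. The following is a minimal sketch, assuming only that the instance stores its lock in an attribute called _lock; the names synchronized and Counter are hypothetical. functools.wraps is added so that the wrapped method keeps its original name.

```python
import functools
import threading

def synchronized(method):
    """Hypothetical helper: run `method` while holding self._lock."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return wrapper

class Counter:
    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    @synchronized
    def increment(self):
        # Read-modify-write: not atomic without the lock.
        self.value += 1

counter = Counter()

def work():
    for _ in range(10000):
        counter.increment()

threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)               # 40000
print(Counter.increment.__name__)  # increment
```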

Lists

The list data structure is a thread-safe, quick, and easy structure for temporary, in-memory storage. In CPython, the GIL protects lists against concurrent access. Although lists themselves are thread-safe, the data lying in them is not protected. For example, L.append(x) is atomic, but if another thread is appending to or modifying the same list at the same time, the resulting order and contents are not guaranteed to be what either thread expects. Likewise, a sequence such as reading L[i], changing the value, and writing it back consists of several operations, and another thread can run between them. Hence, we can see the side effects of race conditions in the output.

To resolve this kind of issue and safely modify the data, we must implement a proper locking mechanism, which ensures that multiple threads cannot run into race conditions. To implement such a locking mechanism, we can extend the class as we did in the previous examples.
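
A lock can also be used directly around a compound list operation, without subclassing. The sketch below guards a "check, then pop" sequence; the names items, taken, and take_all are made up for this illustration.

```python
import threading

items = list(range(1000))   # shared work list
taken = []                  # everything the workers removed
lock = threading.Lock()

def take_all():
    while True:
        with lock:
            # The emptiness check and the pop happen under one lock, so
            # no other thread can empty the list between the two steps.
            if not items:
                return
            value = items.pop()
        # A single append() is atomic in CPython, so it is left unguarded.
        taken.append(value)

threads = [threading.Thread(target=take_all) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(taken), len(items))  # 1000 0
```

Without the lock, one thread could see a non-empty list, lose the last item to another thread, and then raise IndexError on pop().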

Some atomic operations on built-in types such as lists and dicts in CPython are as follows −

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

Here −

  • L, L1, L2 are lists
  • D, D1, D2 are dicts
  • x, y are objects
  • i, j are ints

Queues

If the list’s data is not protected, we might have to face the consequences: because of race conditions, we may get or delete the wrong data item. That is why it is recommended to use the queue data structure. A real-world example of a queue is a single-lane one-way road, where the vehicle that enters first exits first. More real-world examples are the queues at ticket windows and bus stops.

Queues are thread-safe by default, so we need not worry about implementing a complex locking mechanism. Python provides the queue module for using different types of queues in our application.
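
As a minimal sketch of what this looks like in practice, one thread can hand items to another through a queue.Queue without any explicit lock. The names producer, consumer, and results are made up for this illustration, and using None as a stop signal is just one common convention.

```python
import queue
import threading

q = queue.Queue()
results = []
STOP = None  # sentinel chosen for this sketch to signal "no more items"

def producer():
    for n in range(5):
        q.put(n)
    q.put(STOP)

def consumer():
    while True:
        item = q.get()        # blocks until an item is available
        if item is STOP:
            break
        results.append(item * item)

t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_cons.start()
t_prod.start()
t_prod.join()
t_cons.join()

print(results)  # [0, 1, 4, 9, 16]
```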

Types of Queues

In this section, we will learn about the different types of queues. Python provides three kinds of queues in the queue module −

  • Normal Queues (FIFO, First in First out)
  • LIFO, Last in First Out
  • Priority

We will learn about the different queues in the subsequent sections.
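
Before looking at each type in detail, the following small sketch feeds the same three values into each queue class and records the order in which they come back out. The helper drain and the dict orders are hypothetical names used only here.

```python
import queue

def drain(q):
    # Remove every item and return them in retrieval order.
    out = []
    while not q.empty():
        out.append(q.get())
    return out

orders = {}
for cls in (queue.Queue, queue.LifoQueue, queue.PriorityQueue):
    q = cls()
    for n in (3, 1, 2):
        q.put(n)
    orders[cls.__name__] = drain(q)
    print(cls.__name__, orders[cls.__name__])

# Queue [3, 1, 2]
# LifoQueue [2, 1, 3]
# PriorityQueue [1, 2, 3]
```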

Normal Queues (FIFO, First in First out)

This is the most commonly used queue implementation offered by Python. In this queuing mechanism, whoever comes first gets served first. FIFO queues are also called normal queues. FIFO queues can be represented as follows −

[Figure: FIFO queue]

Python Implementation of FIFO Queue

In Python, a FIFO queue can be implemented with a single thread as well as with multiple threads.

FIFO queue with single thread

With a single thread, the Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put() and removed from the other end using get().

Example

Following is a Python program for implementation of FIFO queue with single thread −

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

Output

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

The output shows that the above program uses a single thread to illustrate that the elements are removed from the queue in the same order in which they were inserted.

FIFO queue with multiple threads

To implement a FIFO queue with multiple threads, we define a worker function, myqueue(), that takes the queue object as its argument. The get() and put() methods work the same way as in the single-threaded example. To make the program multithreaded, we create and start several threads, each running myqueue(). Together, these threads consume the queue in FIFO order.

Example

Following is a Python program for implementation of FIFO queue with multiple threads −

import threading
import queue
import time

def myqueue(q):
   # Each worker drains the shared queue until it is empty.
   while True:
      try:
         # A non-blocking get() cannot hang if another thread
         # takes the last item first.
         item = q.get(block=False)
      except queue.Empty:
         break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      q.task_done()
      time.sleep(2)

q = queue.Queue()
for i in range(5):
   q.put(i)

threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Output

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue
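
The task_done() calls in the worker matter most when they are paired with Queue.join(), which blocks until every item that was put has been marked as done. The following is a minimal sketch of that pairing, assuming daemon worker threads that simply block on get() once the work runs out; the names worker and processed are made up for this illustration.

```python
import queue
import threading

q = queue.Queue()
processed = []
lock = threading.Lock()

def worker():
    while True:
        item = q.get()          # blocks until work arrives
        with lock:
            processed.append(item)
        q.task_done()           # exactly one task_done() per get()

# Daemon threads do not keep the program alive once the main thread exits.
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for n in range(5):
    q.put(n)

q.join()   # returns once task_done() has been called for every item
print(sorted(processed))  # [0, 1, 2, 3, 4]
```

Here the main thread waits on the queue rather than on the threads, so the workers do not need to know when the input ends.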

LIFO, Last in First Out queue

This queue works in exactly the opposite way to a FIFO (First In, First Out) queue. In this queuing mechanism, the one who comes last gets served first. This is similar to the stack data structure. LIFO queues prove useful when implementing algorithms such as depth-first search, which is common in artificial intelligence.
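
To make the depth-first search remark concrete, here is a minimal sketch that uses queue.LifoQueue as the stack. The graph and the function name dfs are invented for this illustration.

```python
import queue

# A small directed graph as an adjacency list (made up for this sketch).
graph = {
    "A": ["B", "C"],
    "B": ["D"],
    "C": ["D"],
    "D": [],
}

def dfs(start):
    stack = queue.LifoQueue()
    stack.put(start)
    visited = []
    while not stack.empty():
        node = stack.get()          # most recently added node comes out first
        if node in visited:
            continue
        visited.append(node)
        # Push neighbours in reverse so the first-listed one is explored first.
        for neighbour in reversed(graph[node]):
            stack.put(neighbour)
    return visited

print(dfs("A"))  # ['A', 'B', 'D', 'C']
```

The search follows A to B to D before backtracking to C, which is the depth-first order.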

Python implementation of LIFO queue

In Python, a LIFO queue can be implemented with a single thread as well as with multiple threads.

LIFO queue with single thread

With a single thread, the queue.LifoQueue class implements a basic last-in, first-out container. On calling put(), elements are added at the head of the container, and on calling get(), they are removed from the head as well.

Example

Following is a Python program for implementation of the LIFO queue with single thread −

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")

Output

item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

The output shows that the above program uses a single thread to illustrate that elements are removed from the queue in the reverse of the order in which they were inserted.

LIFO queue with multiple threads

The implementation is similar to that of the FIFO queue with multiple threads. The only difference is that we use the queue.LifoQueue class, which implements a basic last-in, first-out container.

Example

Following is a Python program for implementation of LIFO queue with multiple threads −

import threading
import queue
import time

def myqueue(q):
   # Each worker drains the shared queue until it is empty.
   while True:
      try:
         # A non-blocking get() cannot hang if another thread
         # takes the last item first.
         item = q.get(block=False)
      except queue.Empty:
         break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      q.task_done()
      time.sleep(2)

q = queue.LifoQueue()
for i in range(5):
   q.put(i)

threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Output

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

Priority queue

In FIFO and LIFO queues, the order of items is determined by the order of insertion. However, there are many cases where priority is more important than the order of insertion. Let us consider a real-world example. Suppose security at an airport is checking people of different categories. VVIPs, airline staff, and customs officers may be checked on priority instead of in order of arrival, as is the case for ordinary passengers.

Another important use of the priority queue is in developing a task scheduler. One common design is to serve the most urgent task in the queue first. This data structure can be used to pick up items from the queue based on their priority value.
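
A task scheduler of this kind might be sketched as follows. The task names and the schedule helper are invented for this illustration. The running counter is a common tie-breaking technique so that tasks with equal priority come out in insertion order.

```python
import itertools
import queue

tasks = queue.PriorityQueue()
counter = itertools.count()   # tie-breaker: preserves insertion order

def schedule(priority, name):
    # Entries compare by priority first, then by insertion number.
    tasks.put((priority, next(counter), name))

schedule(2, "send report")
schedule(1, "restart server")
schedule(2, "archive logs")
schedule(0, "page on-call")

order = []
while not tasks.empty():
    priority, _, name = tasks.get()
    order.append(name)

print(order)
# ['page on-call', 'restart server', 'send report', 'archive logs']
```

The counter also means the task payloads themselves are never compared, which matters when they are objects that do not support ordering.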

Python Implementation of Priority Queue

In Python, a priority queue can be implemented with a single thread as well as with multiple threads.

Priority queue with single thread

With a single thread, the queue.PriorityQueue class implements a priority-ordered container. On calling put(), elements are added along with a value; the lowest value has the highest priority and is hence retrieved first by get().

Example

Consider the following Python program for implementation of Priority queue with single thread −

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
p_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

Output

1 - Most Urgent
2 - Urgent
5 - Important
10 - Nothing important

In the above output, we can see that the queue has returned the items based on priority: the lower the value, the higher the priority.

Priority queue with multi threads

The implementation is similar to that of the FIFO and LIFO queues with multiple threads. The only difference is that we use the queue.PriorityQueue class to create the queue. Another difference is in the way the queue is populated. In the example given below, it is filled with two identical data sets.

Example

The following Python program implements a priority queue with multiple threads −

import threading
import queue
import time

def myqueue(q):
   # Each worker drains the shared queue until it is empty.
   while True:
      try:
         # A non-blocking get() cannot hang if another thread
         # takes the last item first.
         item = q.get(block=False)
      except queue.Empty:
         break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      q.task_done()
      time.sleep(1)

q = queue.PriorityQueue()
# Fill the queue with two identical data sets; each integer
# serves as its own priority.
for i in range(5):
   q.put(i)

for i in range(5):
   q.put(i)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Output

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue