[Python] Pipes vs. Queues for Flawless Data Communication in Multiprocessing

Understanding Process

Key characteristics of a process:

  1. A process must contain at least one thread, the main thread.

  2. Code, data, stack and heap are independent for each process.

    • This means sharing memory between multiple processes is neither easy nor cheap.
  3. Each process has its own CPU context and memory space.

  4. Communication between processes is done through pipes, files and sockets.

    • Switching the CPU between processes requires a context switch.
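
To make the "independent memory" point concrete, here is a minimal sketch. The names counter, increment_and_report and demo_isolation are made up for this illustration. The child changes its own copy of a module-level variable and reports it back, while the parent's copy stays untouched.

```python
from multiprocessing import Process, Queue

counter = 0  # module-level variable; every process works on its own copy


def increment_and_report(q):
    """Runs in the child: change the child's copy and send it back."""
    global counter
    counter += 100
    q.put(counter)


def demo_isolation():
    """Return (value seen by the child, value still seen by the parent)."""
    q = Queue()
    p = Process(target=increment_and_report, args=(q,))
    p.start()
    child_value = q.get()  # read the result before joining
    p.join()
    return child_value, counter


if __name__ == '__main__':
    child_value, parent_value = demo_isolation()
    print('Child saw:', child_value)
    print('Parent still sees:', parent_value)
```

The parent's counter does not change, which is why the sharing tools in the rest of this article are needed.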

Sharing State

from multiprocessing import Process, current_process, Value, Array
import os

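def generate_update_number(v):
    # v is a multiprocessing.Value wrapper, not a plain int
    for _ in range(50):
        # += is a read-modify-write, so hold the lock to keep it atomic
        with v.get_lock():
            v.value += 1

    print(current_process().name, 'data', v)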

def main():
    parent_process_id = os.getpid()
    print(f"Parent Process ID: {parent_process_id}")

    processes = list()
    share_value = Value('i', 0)

    for _ in range(1, 10):
        p = Process(target=generate_update_number, args=(share_value,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print('Final Data in parent process:', share_value)

if __name__ == '__main__':
    main()

Output:

Parent Process ID: 490
Process-1 data <Synchronized wrapper for c_int(50)>
Process-2 data <Synchronized wrapper for c_int(100)>
Process-3 data <Synchronized wrapper for c_int(150)>
Process-4 data <Synchronized wrapper for c_int(200)>
Process-5 data <Synchronized wrapper for c_int(250)>
Process-6 data <Synchronized wrapper for c_int(300)>
Process-7 data <Synchronized wrapper for c_int(350)>
Process-8 data <Synchronized wrapper for c_int(400)>
Process-9 data <Synchronized wrapper for c_int(450)>
Final Data in parent process: <Synchronized wrapper for c_int(450)>

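Value and Array from the multiprocessing package are one way to share data between multiple processes. In the output each process raises the value by 50, so all 9 processes are updating the same variable and the parent sees 450 at the end. Keep in mind that v.value += 1 is a read followed by a write, not an atomic operation, so increments should be wrapped in with v.get_lock(): to guarantee the final count.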

share_value = Value('i', 0)

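'i' is a type code meaning that the data is a C int. For a double it would be Value('d', 0.0), and for a single char it would be Value('c', b'a'), where the value is a one-byte bytes object. There is no type code for str; to share a byte string, use a char array such as Array('c', b'some str').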
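
A short sketch of these type codes in use. The variable names are only for illustration, and everything runs in a single process just to show what each wrapper holds.

```python
from multiprocessing import Value, Array

count = Value('i', 0)            # C int
ratio = Value('d', 0.5)          # C double
letter = Value('c', b'a')        # single C char, given as a one-byte bytes object
text = Array('c', b'some str')   # array of C chars holding a byte string

# .value unwraps the stored data; char arrays expose .value as bytes
print(count.value, ratio.value, letter.value, text.value)
```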

Using Array

If your data is a sequence of values rather than a single one, you can use from multiprocessing import Array. The second argument is either an initializer sequence or an integer size.

share_numbers = Array('i', range(50))
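
As a minimal sketch of an Array crossing a process boundary (double_in_place and run_array_demo are hypothetical names for this example), a child process can modify the elements in place and the parent will see the change:

```python
from multiprocessing import Process, Array


def double_in_place(shared):
    """Runs in the child: double every element of the shared array."""
    with shared.get_lock():  # hold the lock for the whole update
        for i in range(len(shared)):
            shared[i] *= 2


def run_array_demo():
    shared = Array('i', range(5))  # starts as 0, 1, 2, 3, 4
    p = Process(target=double_in_place, args=(shared,))
    p.start()
    p.join()
    return shared[:]  # slicing copies the contents into a plain list


if __name__ == '__main__':
    print(run_array_demo())
```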

More Options

  • Since the Python 3.8 release, you can use from multiprocessing import shared_memory.

  • from multiprocessing import Manager.
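
Both options can be sketched roughly as follows. The function names are invented for this example, and it is only a small illustration under simple assumptions, not a complete guide to either API. A Manager hands out proxy objects such as a shared dict, while shared_memory exposes a raw block of bytes that another process attaches to by name.

```python
from multiprocessing import Manager, Process, shared_memory


def record_square(results, key):
    """Runs in a child: write into the manager-backed dict."""
    results[key] = key * key


def run_manager_demo():
    with Manager() as manager:
        results = manager.dict()  # proxy to a dict living in the manager process
        procs = [Process(target=record_square, args=(results, k)) for k in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(results)  # copy out before the manager shuts down


def write_first_byte(name):
    """Runs in a child: attach to an existing block by name and write to it."""
    shm = shared_memory.SharedMemory(name=name)
    shm.buf[0] = 42
    shm.close()


def run_shared_memory_demo():
    shm = shared_memory.SharedMemory(create=True, size=16)
    try:
        p = Process(target=write_first_byte, args=(shm.name,))
        p.start()
        p.join()
        return shm.buf[0]
    finally:
        shm.close()
        shm.unlink()  # the creator is responsible for freeing the block


if __name__ == '__main__':
    print(run_manager_demo())
    print(run_shared_memory_demo())
```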


Pipe

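In multiprocessing, Pipe() returns a pair of connection objects, here named parent_conn and child_conn, which are the two ends of the pipe. By default the pipe is duplex, so each end can both send and receive. That makes a pipe a good fit for two-way, one-to-one communication.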

Let's say Player A and Player B are playing tennis. When Player A sends the ball, Player B must receive it for the game to go on. That's the rule, and it's the same for a pipe: every send() on one end needs a matching recv() on the other.

Code:

from multiprocessing import Process, Pipe, current_process
import time
import os

def worker(id, baseNum, conn):
    process_id = os.getpid()
    process_name = current_process().name

    sub_total = 0

    for i in range(baseNum):
        sub_total += 1

    conn.send(sub_total)
    conn.close()

    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result: {sub_total}')

def main():
    parent_process_id = os.getpid()
    print(f'Parent process id: {parent_process_id}')

    start_time = time.time()
    parent_conn, child_conn = Pipe()

    t = Process(name=str(1), target=worker, args=(1, 1000000, child_conn))
    t.start()
    # Receive before joining so the child never blocks on a full pipe
    total = parent_conn.recv()
    t.join()

    print("--- %s seconds ---" % (time.time() - start_time))
    print('Main-processing total count: {}'.format(total))
    print('Main-processing done.')

if __name__ == '__main__':
    main()

Output:

Parent process id: 2340
Process ID: 2341, Process Name: 1
Result: 1000000
--- 0.16553831100463867 seconds ---
Main-processing total count: 1000000
Main-processing done.
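
The example above only sends data in one direction. To illustrate the two-way tennis rally, here is a minimal sketch in which player_b and rally are hypothetical names. The parent serves a number, the child returns it plus one, and None is used as an agreed signal that the game is over.

```python
from multiprocessing import Process, Pipe


def player_b(conn):
    """Runs in the child: return every ball until None arrives."""
    while True:
        ball = conn.recv()
        if ball is None:  # None means the rally is over
            break
        conn.send(ball + 1)
    conn.close()


def rally(shots=3):
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=player_b, args=(child_conn,))
    p.start()

    returns = []
    for shot in range(shots):
        parent_conn.send(shot)               # Player A sends
        returns.append(parent_conn.recv())   # and waits for the return

    parent_conn.send(None)
    p.join()
    return returns


if __name__ == '__main__':
    print(rally())
```

Each send() is paired with exactly one recv() on the other end, which keeps the two processes in step.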

Queue

If you have basic computer science knowledge, you may have heard of the queue data structure. One of its key characteristics is FIFO (first in, first out). The put() method inserts an item into the Queue, and get() removes and returns the oldest one. Unlike a pipe, a Queue can be shared by many producer and consumer processes at once.
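
Before the multi-process version, a tiny sketch of the FIFO behaviour on its own may help. fifo_order is a made-up helper name, and everything happens in a single process.

```python
from multiprocessing import Queue


def fifo_order(items):
    """Put every item on a queue, then take them all off again."""
    q = Queue()
    for item in items:
        q.put(item)
    # get() hands the items back in the order they were put
    return [q.get() for _ in items]


if __name__ == '__main__':
    print(fifo_order(['first', 'second', 'third']))
```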

Code:

from multiprocessing import Process, Queue, current_process
import time
import os

def worker(id, baseNum, q):
    process_id = os.getpid()
    process_name = current_process().name

    sub_total = 0

    for i in range(baseNum):
        sub_total += 1

    q.put(sub_total)
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result: {sub_total}')

def main():
    parent_process_id = os.getpid()
    print(f'Parent process id: {parent_process_id}')

    processes = list()
    start_time = time.time()
    q = Queue()

    for i in range(5):
        t = Process(name=str(i), target=worker, args=(i, 1000000, q))
        processes.append(t)
        t.start()

    # Collect one result per worker before joining: a child that has put
    # data on a queue may not exit until that data has been consumed.
    total = 0
    for _ in processes:
        total += q.get()

    for process in processes:
        process.join()

    print("--- %s seconds ---" % (time.time() - start_time))

    print()
    print('Main-processing total count: {}'.format(total))
    print('Main-processing done.')

if __name__ == '__main__':
    main()

Output:

Parent process id: 387
Process ID: 391, Process Name: 3
Process ID: 390, Process Name: 2Result: 1000000

Result: 1000000
Process ID: 392, Process Name: 4
Result: 1000000
Process ID: 389, Process Name: 1
Result: 1000000
Process ID: 388, Process Name: 0
Result: 1000000
--- 1.0864899158477783 seconds ---

Main-processing total count: 5000000
Main-processing done.

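You can see that the total count is 5000000: each of the five workers put 1000000 on the queue and the parent added them up. The interleaved lines in the output appear because the workers print at the same time.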
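
A queue can also carry work into the processes, not just results out of them. The following is a rough sketch with invented names (square_worker, run_task_queue). It uses one queue for tasks and one for results, with None as a sentinel that tells each worker there is nothing left to do.

```python
from multiprocessing import Process, Queue


def square_worker(tasks, results):
    """Runs in a child: square numbers until the None sentinel arrives."""
    while True:
        n = tasks.get()
        if n is None:
            break
        results.put(n * n)


def run_task_queue(numbers, workers=2):
    tasks, results = Queue(), Queue()
    procs = [Process(target=square_worker, args=(tasks, results))
             for _ in range(workers)]
    for p in procs:
        p.start()

    for n in numbers:
        tasks.put(n)
    for _ in procs:
        tasks.put(None)  # one sentinel per worker

    # Results arrive in whatever order the workers finish, so sort them
    squares = sorted(results.get() for _ in numbers)

    for p in procs:
        p.join()
    return squares


if __name__ == '__main__':
    print(run_task_queue([1, 2, 3, 4]))
```

Several workers can read from the same task queue, which is something a single pipe cannot offer.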
