Understanding Process
Key characteristics of a process:
- A process must contain at least one (main) thread.
- Each process has its own code, data, stack, and heap segments.
  - This means sharing memory between multiple processes is neither easy nor cheap (see the sketch after this list).
- CPU state and memory space are independent per process.
- Communication between processes is done using pipes, files, and sockets.
- Context switching between processes is relatively expensive.
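To make this independence concrete, here is a small sketch: a global variable modified in a child process does not change in the parent (the names are just for illustration).
from multiprocessing import Process

counter = 0  # each process works on its own copy of this global

def bump():
    global counter
    counter += 1
    print('child counter:', counter)   # prints 1

def main():
    p = Process(target=bump)
    p.start()
    p.join()
    print('parent counter:', counter)  # still 0, memory is not shared

if __name__ == '__main__':
    main()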
Sharing State
from multiprocessing import Process, current_process, Value, Array
import os

def generate_update_number(v):
    # v is a shared multiprocessing.Value('i', ...)
    for _ in range(50):
        v.value += 1
    print(current_process().name, 'data', v)

def main():
    parent_process_id = os.getpid()
    print(f"Parent Process ID: {parent_process_id}")
    processes = list()
    share_value = Value('i', 0)
    for _ in range(1, 10):  # spawns 9 child processes
        p = Process(target=generate_update_number, args=(share_value,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print('Final Data in parent process:', share_value)

if __name__ == '__main__':
    main()
Output:
Parent Process ID: 490
Process-1 data <Synchronized wrapper for c_int(50)>
Process-2 data <Synchronized wrapper for c_int(100)>
Process-3 data <Synchronized wrapper for c_int(150)>
Process-4 data <Synchronized wrapper for c_int(200)>
Process-5 data <Synchronized wrapper for c_int(250)>
Process-6 data <Synchronized wrapper for c_int(300)>
Process-7 data <Synchronized wrapper for c_int(350)>
Process-8 data <Synchronized wrapper for c_int(400)>
Process-9 data <Synchronized wrapper for c_int(450)>
Final Data in parent process: <Synchronized wrapper for c_int(450)>
Using Value and Array from the multiprocessing package is one way to share data across multiple processes. You can see that each process increments the value 50 times, so the nine processes working on the same variable raise it to 450.
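One caveat: even though a Value is created with an internal lock by default, the statement v.value += 1 is a read-modify-write operation that is not atomic, so increments can occasionally be lost under heavy contention. A sketch of a safer variant of the loop, using the Value's own lock:
def generate_update_number(v):
    for _ in range(50):
        with v.get_lock():   # hold the Value's lock across the read-modify-write
            v.value += 1
    print(current_process().name, 'data', v)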
share_value = Value('i', 0)
‘i’ means that the data type is an int. For a single character you can use Value('c', b'a'), and for a string you would typically use a character array instead, e.g. Array('c', b'some str').
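Here is a short sketch of a few typecodes (the values are just for illustration):
from multiprocessing import Value, Array

int_value = Value('i', 0)             # signed int
double_value = Value('d', 3.14)       # double-precision float
char_value = Value('c', b'a')         # single char (note the bytes literal)
text_value = Array('c', b'some str')  # byte string stored as a char array

print(int_value.value, double_value.value, char_value.value, text_value.value)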
Using Array
If your data is not just a single int or char but a sequence, you can use Array from the multiprocessing package (from multiprocessing import Array).
share_numbers = Array('i', range(50))
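A minimal sketch of passing an Array to a child process; the square_worker name is just illustrative, and the parent sees the child's in-place modifications:
from multiprocessing import Process, Array

def square_worker(numbers):
    # modify the shared array in place
    for i in range(len(numbers)):
        numbers[i] = numbers[i] * numbers[i]

def main():
    share_numbers = Array('i', range(10))
    p = Process(target=square_worker, args=(share_numbers,))
    p.start()
    p.join()
    print(list(share_numbers))   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

if __name__ == '__main__':
    main()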
More Options
Since the Python 3.8 release, you can also use from multiprocessing import shared_memory. Another option is from multiprocessing import Manager.
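For example, a Manager lets processes share higher-level objects such as dicts and lists through a proxy. A minimal sketch, assuming a simple worker that writes squares into a managed dict:
from multiprocessing import Process, Manager

def worker(shared_dict, key):
    # each child writes its own entry into the managed dict
    shared_dict[key] = key * key

def main():
    with Manager() as manager:
        shared_dict = manager.dict()   # proxy object shared across processes
        processes = [Process(target=worker, args=(shared_dict, i)) for i in range(5)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(dict(shared_dict))       # {0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

if __name__ == '__main__':
    main()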
Pipe
In multiprocessing, Pipe() returns a pair of connection objects, conventionally called parent_conn and child_conn, that are connected to each other. A pipe is for two-way communication between exactly two endpoints (1 to 1).
Let’s say players A and B are playing tennis. If player A serves, player B must receive for the game to go smoothly. That’s the rule, and it’s the same for a pipe: one end sends, the other receives.
Code:
from multiprocessing import Process, Pipe, current_process
import time
import os

def worker(id, baseNum, conn):
    process_id = os.getpid()
    process_name = current_process().name
    sub_total = 0
    for i in range(baseNum):
        sub_total += 1
    conn.send(sub_total)   # send the result to the parent through the pipe
    conn.close()
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result: {sub_total}')

def main():
    parent_process_id = os.getpid()
    print(f'Parent process id: {parent_process_id}')
    start_time = time.time()
    parent_conn, child_conn = Pipe()   # one end for the parent, one for the child
    t = Process(name=str(1), target=worker, args=(1, 1000000, child_conn))
    t.start()
    t.join()
    print("--- %s seconds ---" % (time.time() - start_time))
    print('Main-processing total count: {}'.format(parent_conn.recv()))
    print('Main-processing done.')

if __name__ == '__main__':
    main()
Output:
Parent process id: 2340
Process ID: 2341, Process Name: 1
Result: 1000000
--- 0.16553831100463867 seconds ---
Main-processing total count: 1000000
Main-processing done.
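Because the pipe is two-way, the parent can also send data to the child through its end of the connection. A minimal sketch (the function and message names are illustrative):
from multiprocessing import Process, Pipe

def child(conn):
    msg = conn.recv()          # wait for the parent's message
    conn.send(msg.upper())     # reply through the same pipe
    conn.close()

def main():
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send('ping')   # parent -> child
    print(parent_conn.recv())  # child -> parent, prints 'PING'
    p.join()

if __name__ == '__main__':
    main()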
Queue
If you have basic computer science knowledge, you may have heard of the queue data structure. One of its key characteristics is FIFO (first in, first out). Using the put() method, we can insert data into the queue, and using get(), we can retrieve items from it.
Code:
from multiprocessing import Process, Queue, current_process
import time
import os

def worker(id, baseNum, q):
    process_id = os.getpid()
    process_name = current_process().name
    sub_total = 0
    for i in range(baseNum):
        sub_total += 1
    q.put(sub_total)   # each worker puts its sub-total on the shared queue
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result: {sub_total}')

def main():
    parent_process_id = os.getpid()
    print(f'Parent process id: {parent_process_id}')
    processes = list()
    start_time = time.time()
    q = Queue()
    for i in range(5):
        t = Process(name=str(i), target=worker, args=(i, 1000000, q))
        processes.append(t)
        t.start()
    for process in processes:
        process.join()
    print("--- %s seconds ---" % (time.time() - start_time))
    q.put('exit')   # sentinel value marking the end of the queue
    total = 0
    while True:
        tmp = q.get()
        if tmp == 'exit':
            break
        else:
            total += tmp
    print()
    print('Main-processing total count: {}'.format(total))
    print('Main-processing done.')

if __name__ == '__main__':
    main()
Output:
Parent process id: 387
Process ID: 391, Process Name: 3
Process ID: 390, Process Name: 2Result: 1000000
Result: 1000000
Process ID: 392, Process Name: 4
Result: 1000000
Process ID: 389, Process Name: 1
Result: 1000000
Process ID: 388, Process Name: 0
Result: 1000000
--- 1.0864899158477783 seconds ---
Main-processing total count: 5000000
Main-processing done.
You can see that the total count is 5000000, since each of the five processes put 1000000 into the queue.
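As an alternative to the 'exit' sentinel, you can drain the queue with a timeout and stop when queue.Empty is raised (a sketch, assuming all workers have already been joined as above):
import queue

total = 0
while True:
    try:
        total += q.get(timeout=1)   # raises queue.Empty once nothing is left
    except queue.Empty:
        break
print('Main-processing total count: {}'.format(total))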