Python and the Multiprocessing library
Introduction
Sometimes we need to run Python code that is easy to parallelize. In this case, Python’s multiprocessing library lets us create processes that run in parallel and share data between them. This is a simple way to take advantage of Multivac resources on a single machine to run parallel code.
Example 1 Pools
A Pool is a collection of worker processes that can execute functions in parallel. In this example, we create a Pool with 5 processes and use its map method to apply a function to a sequence of numbers.
The map method applies a function to every item in an iterable and returns a list with the results, in the same order as the inputs. In this case, it computes the factorial of every integer from 0 to 11999.
For comparison, the script first runs the same computation serially, prints both elapsed times, and checks that the two result lists are identical.
multiprocessing.Pool

    import time
    import math
    from multiprocessing import Pool

    if __name__ == '__main__':
        # Serial baseline
        start = time.perf_counter()
        results1 = [math.factorial(x) for x in range(12000)]
        end = time.perf_counter()
        print(end - start)

        # Same work distributed over 5 worker processes
        start = time.perf_counter()
        with Pool(5) as p:
            results2 = p.map(math.factorial, list(range(12000)))
        end = time.perf_counter()
        print(end - start)

        # Both approaches must produce the same results
        print(all(x == y for x, y in zip(results1, results2)))
The advantage of using a Pool is that processes are created and managed automatically, which makes it easier to run code in parallel without manually handling process creation and lifecycle.
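As a complement, here is a minimal sketch of map with a user-defined function. The names square and parallel_squares are hypothetical and chosen only for illustration. The function passed to map must be defined at module level so that the worker processes can locate it, and the optional chunksize argument controls how many items are sent to a worker at a time.

```python
from multiprocessing import Pool


def square(x):
    """Return x squared. Defined at module level so workers can import it."""
    return x * x


def parallel_squares(numbers, processes=4, chunksize=1):
    """Square every number using a pool of worker processes (illustrative helper)."""
    with Pool(processes) as pool:
        # map() blocks until all results are ready and preserves input order
        return pool.map(square, numbers, chunksize=chunksize)


if __name__ == "__main__":
    print(parallel_squares(range(10)))
    # A larger chunksize sends work to the workers in bigger batches,
    # which usually reduces communication overhead for cheap functions
    print(sum(parallel_squares(range(100_000), chunksize=5_000)))
```

For very cheap functions such as this one, the cost of sending data between processes can outweigh the gain from parallelism, so it is worth timing both versions as in the example above.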
Example 2 Process
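A Process is an individual process that runs in parallel with the process that created it. In this example, a daemon process prints a heartbeat every 5 seconds while the main process computes factorials. Because it is a daemon, it is terminated automatically when the main process exits.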
multiprocessing.Process

    import time
    import math
    from multiprocessing import Process

    def watchdog(interval):
        # Print a timestamped heartbeat forever, every `interval` seconds
        while True:
            print(f"[{time.strftime('%X')}] heartbeat")
            time.sleep(interval)

    if __name__ == "__main__":
        # daemon=True: the child is terminated when the main process exits
        p = Process(target=watchdog, args=(5,), daemon=True)
        p.start()
        result = [math.factorial(x) for x in range(12000)]
        print("Main work done, exiting.")
The advantage of using processes is that each process runs in its own memory space, which helps avoid concurrency conflicts. However, process management can be more complex than thread management, because each process has its own memory space and does not directly share data with others. To share data between processes, you can use queues, pipes, or other inter-process communication mechanisms.
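The separate memory spaces can be observed directly. The following sketch is only an illustration, and the names state and increment are hypothetical: a child process modifies an ordinary module-level dictionary, and the parent then checks its own copy.

```python
from multiprocessing import Process

# Ordinary module-level data: every process gets its own copy of it
state = {"counter": 0}


def increment():
    """Increment the counter in the memory of whichever process calls this."""
    state["counter"] += 1
    print(f"inside increment: counter = {state['counter']}")


if __name__ == "__main__":
    p = Process(target=increment)
    p.start()
    p.join()
    # The child changed only its own copy, so the parent still sees 0
    print(f"parent after join: counter = {state['counter']}")
    print(f"child exit code: {p.exitcode}")
```

Getting the child's result back to the parent requires one of the communication mechanisms described in the following examples.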
Example 3 Queue
A Queue is a data structure that allows processes to exchange data safely. In this example, we create a queue shared by two processes. A producer process puts the integers 0 to 9 on the queue, and a consumer process takes them off and prints them. When the producer is done, it sends None as a sentinel so that the consumer knows when to stop.
A useful analogy is a supermarket checkout line, where items leave in the same order in which they arrived. In this case, the producer places items at the back of the line and the consumer takes them from the front.
multiprocessing.Queue

    import time
    from multiprocessing import Process, Queue

    def producer(q):
        for i in range(10):
            q.put(i)
            print(f'Produced {i}')
            time.sleep(0.2)
        q.put(None)  # sentinel: tells the consumer to stop

    def consumer(q):
        while True:
            item = q.get()  # blocks until an item is available
            if item is None:
                break
            print(f"Consumed {item}")
            time.sleep(1)

    if __name__ == "__main__":
        q = Queue()
        p = Process(target=producer, args=(q,))
        c = Process(target=consumer, args=(q,))
        p.start()
        c.start()
        p.join()
        c.join()
The advantage of queues is that they provide safe and simple data sharing between processes, because synchronization and access are handled automatically. However, queues can be slower than other inter-process communication mechanisms, such as pipes, because they add synchronization overhead.
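Because a queue can be shared by more than two processes, the same sentinel pattern extends to several consumers. The sketch below is an assumption-laden illustration, not part of the original example: the names SENTINEL, tasks, and results are hypothetical. The key point is that each consumer needs its own sentinel.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # marker that tells a consumer to stop


def consumer(tasks, results):
    """Square items taken from `tasks` until a sentinel arrives."""
    while True:
        item = tasks.get()
        if item is SENTINEL:
            break
        results.put(item * item)


if __name__ == "__main__":
    n_consumers = 3
    tasks, results = Queue(), Queue()
    workers = [Process(target=consumer, args=(tasks, results))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for i in range(10):
        tasks.put(i)
    # One sentinel per consumer, otherwise some would block forever on get()
    for _ in workers:
        tasks.put(SENTINEL)
    # Read all results before join() so no worker is stuck flushing its buffer
    squares = sorted(results.get() for _ in range(10))
    for w in workers:
        w.join()
    print(squares)
```

The results are sorted because the order in which the consumers finish their items is not deterministic.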
Example 4 Pipes
A Pipe is a communication channel that allows safe data exchange between two processes. In this example, we create a pipe that connects a parent process and a child process. The child sends the squares of the numbers 0 to 4 through the pipe, and the parent receives and prints them.
multiprocessing.Pipes

    from multiprocessing import Process, Pipe

    def worker(conn):
        # Send the squares of 0..4 to the other end of the pipe
        for i in range(5):
            conn.send(i * i)
        conn.close()

    if __name__ == "__main__":
        # Pipe() is two-way (duplex=True) by default; use Pipe(duplex=False) for one-way communication
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn,))
        p.start()
        for _ in range(5):
            print("Received:", parent_conn.recv())
        p.join()
The advantage of pipes is that they provide safe and simple communication between processes. They can also be faster than queues because they usually involve less synchronization overhead. However, pipes only support communication between two processes, while queues can be used across multiple processes.
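Since both ends of a default pipe can send and receive, a pipe can also carry a request and reply exchange. The following is a minimal sketch under that assumption. The function name square_server and the use of None as a stop signal are illustrative choices.

```python
from multiprocessing import Process, Pipe


def square_server(conn):
    """Reply with the square of every number received; stop on None."""
    while True:
        n = conn.recv()
        if n is None:
            break
        conn.send(n * n)
    conn.close()


if __name__ == "__main__":
    # Duplex by default: both ends can send and receive
    parent_conn, child_conn = Pipe()
    p = Process(target=square_server, args=(child_conn,))
    p.start()
    for n in (2, 3, 4):
        parent_conn.send(n)  # request
        print(f"{n} squared is {parent_conn.recv()}")  # reply
    parent_conn.send(None)  # ask the child to stop
    p.join()
```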
Example 5 Locks
A Lock is a synchronization mechanism that controls access to a shared resource between processes. In this example, a lock protects a shared counter that four processes each increment 100,000 times. Only one process can update the counter at a time thanks to the lock, so the final value is always 400000.
multiprocessing.Locks

    from multiprocessing import Process, Lock, Value

    def worker(lock, counter):
        for _ in range(100000):
            with lock:  # try removing the lock to see updates get lost
                counter.value += 1

    if __name__ == "__main__":
        lock = Lock()
        # Shared integer; += is a read followed by a write, so it still needs the lock
        counter = Value('i', 0)
        procs = [Process(target=worker, args=(lock, counter)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(counter.value)
The advantage of locks is that they safely control access to shared resources and help avoid concurrency issues. However, locks can introduce performance costs and blocking problems if they are not used correctly, because processes may wait indefinitely for a lock that is never released. To avoid this, it is important to use locks carefully and ensure they are always released properly.
We also want to avoid a race condition, where a program’s result depends on process execution order, potentially causing unexpected results or errors. For example, two processes may try to increment the same shared variable at the same time. If both read the same value simultaneously and then increment it, the final result may be wrong. Using locks helps prevent this kind of issue by allowing only one process to access the shared variable at a time.
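As a variation on the example above, a Value created with the default settings carries its own lock, which can be obtained with get_lock(). The sketch below uses that lock instead of a separate Lock object. The function name add_many is hypothetical.

```python
from multiprocessing import Process, Value


def add_many(counter, n):
    """Increment the shared counter n times, one guarded update at a time."""
    for _ in range(n):
        # The read and the write of `+=` must happen under the same lock
        with counter.get_lock():
            counter.value += 1


if __name__ == "__main__":
    counter = Value("i", 0)  # created with an associated lock by default
    procs = [Process(target=add_many, args=(counter, 50_000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4 * 50_000 = 200000
```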
Important
Be careful to avoid a deadlock, which is a situation where two or more processes are blocked while each waits for another to release a resource it needs to continue. To prevent this, acquire locks in the same order in every process and ensure they are always released after use.
Visual lock comparison: deadlock case and non-deadlock case.
%%{init: {'themeVariables': {'fontSize': '11px'}, 'flowchart': {'nodeSpacing': 10, 'rankSpacing': 10, 'diagramPadding': 2}} }%%
flowchart TB
subgraph DL[With deadlock]
direction TB
A[P1: L1] --> B[waits for L2]
C[P2: L2] --> D[waits for L1]
B -. deadlock .-> C
D -. deadlock .-> A
end
classDef hold fill:#d8f3dc,stroke:#2d6a4f,stroke-width:1px,color:#1b4332;
classDef wait fill:#fff3bf,stroke:#b08900,stroke-width:1px,color:#5f3f00;
class A,C hold;
class B,D wait;
linkStyle 2,3 stroke:#c92a2a,stroke-width:1.5px,stroke-dasharray:4 3;
%%{init: {'themeVariables': {'fontSize': '11px'}, 'flowchart': {'nodeSpacing': 10, 'rankSpacing': 10, 'diagramPadding': 2}} }%%
flowchart TB
subgraph OK[Without deadlock]
direction TB
A1[P1: L1] --> A2[P1: L2]
A2 --> A3[releases]
A3 --> B2[P2: L1]
B1[waits for L1] --> B2
end
classDef ok fill:#d3f9d8,stroke:#2b8a3e,stroke-width:1px,color:#1b5e20;
classDef wait fill:#fff3bf,stroke:#b08900,stroke-width:1px,color:#5f3f00;
class A1,A2,A3,B2 ok;
class B1 wait;
linkStyle 2 stroke:#2b8a3e,stroke-width:1.5px;
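The non-deadlock case can be sketched in code as follows. This is an illustration under stated assumptions rather than a definitive recipe. The helper names use_both and try_both are hypothetical. The first helper relies on every process taking the locks in the same order, and the second uses the timeout argument of acquire so that a process gives up instead of waiting forever.

```python
from multiprocessing import Process, Lock


def use_both(l1, l2, name):
    """Take both locks, always l1 first and then l2."""
    with l1:
        with l2:
            print(f"{name} holds both locks")


def try_both(first, second, timeout=1.0):
    """Return True if both locks could be acquired, each within `timeout` seconds."""
    if not first.acquire(timeout=timeout):
        return False
    try:
        if not second.acquire(timeout=timeout):
            return False
        second.release()
        return True
    finally:
        first.release()  # always released, even when the second lock was busy


if __name__ == "__main__":
    l1, l2 = Lock(), Lock()
    # Both processes request the locks in the same order, so neither can
    # end up holding one lock while waiting for the other
    procs = [Process(target=use_both, args=(l1, l2, f"P{i}")) for i in (1, 2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("both locks free:", try_both(l1, l2))
```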
Example 6 Semaphores
A Semaphore is a synchronization mechanism that controls access to a shared resource between processes. Unlike locks, semaphores allow multiple processes to access the same resource at the same time up to a defined limit. In this example, a semaphore with a limit of 2 is created, meaning only two processes can access the shared resource simultaneously. Each process tries to access the resource, and if the semaphore limit has been reached, the process blocks until another process releases the semaphore.
multiprocessing.Semaphores

    import time
    import random
    from multiprocessing import Process, Semaphore

    def worker(sem, idx):
        print(f"Worker {idx} waiting for semaphore…")
        with sem:
            print(f"→ Worker {idx} ENTERED critical section ({sem})")
            # simulate some work
            time.sleep(random.uniform(1, 3))
            print(f"← Worker {idx} LEAVING critical section ({sem})")
        # the semaphore has been released at this point
        print(f"← Worker {idx} LEFT critical section ({sem})")

    if __name__ == "__main__":
        # only 2 workers may hold the semaphore at once
        sem = Semaphore(2)
        procs = [Process(target=worker, args=(sem, i)) for i in range(6)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
The advantage of semaphores is that they provide more flexible access control than locks for shared resources, because they allow multiple concurrent holders up to a defined limit. However, semaphores can still introduce performance issues and blocking problems if used incorrectly, since processes may remain blocked waiting for a semaphore that is not released. To avoid this, semaphores should be used carefully and always released correctly after use.
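One way to check that the limit is respected is to count how many processes are inside the protected section at the same time. The sketch below is an illustration only. The shared counters active and peak are hypothetical additions, not part of the original example.

```python
import time
from multiprocessing import Process, Semaphore, Value


def worker(sem, active, peak):
    """Enter the limited section and record how many processes are inside."""
    with sem:
        with active.get_lock():
            active.value += 1
            # Locks are always taken in the order active, then peak
            with peak.get_lock():
                peak.value = max(peak.value, active.value)
        time.sleep(0.1)  # simulate work inside the limited section
        with active.get_lock():
            active.value -= 1


if __name__ == "__main__":
    sem = Semaphore(2)  # at most 2 processes inside at once
    active = Value("i", 0)  # processes currently inside
    peak = Value("i", 0)  # highest value of `active` seen so far
    procs = [Process(target=worker, args=(sem, active, peak)) for _ in range(6)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"peak concurrency: {peak.value}")  # never more than 2
```

Like locks, semaphores accept a timeout in acquire, for example sem.acquire(timeout=1.0), which returns False instead of blocking indefinitely when no slot becomes free.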