Multiprocessing, Multithreading, and Coroutines in Python

Multiprocessing

Linux

On Linux you can clone the current process with os.fork(). The call returns 0 in the child and the child's PID in the parent. Use os.getpid() to inspect the current PID and os.getppid() to get the parent's.

import os

print('Process (%s) start...' % os.getpid())
pid = os.fork()  # returns 0 in the child, the child's PID in the parent
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

multiprocessing

Windows lacks fork(), so we use the cross-platform multiprocessing module:

from multiprocessing import Process
import os

# code executed by the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Process will start.')
    p.start()
    p.join()
    print('Process end.')

Process pools

Pool keeps a fixed number of worker processes:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, end - start))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool()  # defaults to os.cpu_count() workers
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()   # no more tasks may be submitted
    p.join()    # wait for all workers to finish
    print('All subprocesses done.')

Key APIs:

p = Pool(size)                   # create a pool of `size` worker processes
p.map(func, iterable)            # blocking map; results come back in order
p.map_async(func, iterable)      # non-blocking map; returns an AsyncResult
p.apply(func, args=(...))        # run one call and block until it returns
p.apply_async(func, args=(...))  # submit one call without blocking
p.close()                        # stop accepting new tasks
p.join()                         # wait for workers to exit (after close())
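
For example, a minimal sketch of map, which blocks until every result is back (square is just a stand-in task):

from multiprocessing import Pool

def square(x):
    # trivial stand-in for real CPU-bound work
    return x * x

if __name__ == '__main__':
    with Pool(4) as p:
        results = p.map(square, range(10))  # blocks; results come back in order
    print(results)  # => [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]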

Interprocess communication

Pipe connects two processes; Queue can fan out to many.

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])  # send any picklable object through the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # => [42, None, 'hello']
    p.join()

A multiprocessing Queue behaves like a thread-safe queue, but is shared across processes:

from multiprocessing import Process, Queue
import time, random

def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    while True:
        value = q.get(True)  # block until an item is available
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()        # wait for the writer to finish
    pr.terminate()   # the reader loops forever, so kill it

Managers expose shared objects (namespace, list, dict, etc.) across processes:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d[2] = '2'
    l.append('hello')

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        print(d)  # => {1: '1', 2: '2'}
        print(l)  # => ['hello']

subprocess

Use subprocess to spawn non-Python programs. subprocess.call() runs a command and waits; Popen gives more control over stdin/stdout/stderr.

import subprocess
subprocess.call(['ls', '-l'])

p = subprocess.Popen(['grep', 'python'], stdin=subprocess.PIPE, text=True)
p.communicate('python subprocess module\n')  # feed stdin, then wait for exit
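
On Python 3.7+, subprocess.run() is the usual one-shot interface; a minimal sketch that captures output (check=True raises on a non-zero exit code):

import subprocess

result = subprocess.run(['ls', '-l'], capture_output=True, text=True, check=True)
print(result.stdout)  # captured standard output as str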

Multithreading

Creating threads

import time, threading

def loop():
    print('thread %s is running...' % threading.current_thread().name)
    for n in range(5):
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)

thread = threading.Thread(target=loop, name='LoopThread')
thread.start()
thread.join()
print('thread %s ended.' % threading.current_thread().name)

Locks

Protect shared state with Lock:

import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        lock.acquire()
        try:
            change_it(n)
        finally:
            lock.release()

RLock (a reentrant lock) lets the same thread acquire it multiple times without deadlocking. Semaphore caps concurrency, for example limiting the number of simultaneous downloads, as in the sketch below.
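
A minimal sketch of capping concurrency with a Semaphore; the download function and the limit of 3 are illustrative assumptions:

import threading, time

sem = threading.Semaphore(3)  # at most 3 threads inside the guarded block

def download(i):
    with sem:
        print('download %d running' % i)
        time.sleep(1)  # stand-in for real I/O

threads = [threading.Thread(target=download, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()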

Producer-consumer with Queue

Threads can pass messages via Queue:

import threading, queue, time

q = queue.Queue()

class Producer(threading.Thread):
    def run(self):
        for i in range(5):
            q.put(i)
            print('Produced %s' % i)
            time.sleep(1)

class Consumer(threading.Thread):
    def run(self):
        while True:
            item = q.get()       # blocks until an item is available
            print('Consumed %s' % item)
            q.task_done()

p = Producer()
p.start()
Consumer(daemon=True).start()    # daemon: the consumer loops forever
p.join()                         # wait for the producer to finish
q.join()                         # then wait until every item is consumed

threading.local

Thread-local storage keeps per-thread data:

import threading

local_school = threading.local()

def process_student():
    std = local_school.student  # each thread sees only its own value
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    local_school.student = name  # bind the value for this thread only
    process_student()

threading.Thread(target=process_thread, args=('Alice',), name='Thread-A').start()
threading.Thread(target=process_thread, args=('Bob',), name='Thread-B').start()

Coroutines

Generators

Generators can act as simple coroutines: yield hands a value back to the caller, and send() resumes the generator with a new value:

import time

def consumer():
    r = ''
    while True:
        n = yield r              # hand r to the caller, wait for send()
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'

def produce(c):
    next(c)                      # prime the generator up to its first yield
    n = 0
    while n < 5:
        n += 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)            # resume consumer with n, collect its reply
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

if __name__ == '__main__':
    c = consumer()
    produce(c)

gevent

gevent provides full-featured, cooperatively scheduled coroutines (greenlets). monkey.patch_all() patches blocking standard-library calls so they yield control to other greenlets instead of blocking.

from gevent import monkey
monkey.patch_all()  # patch blocking stdlib calls before anything else imports them
import gevent

def task(n):
    print('Task %s starting' % n)
    gevent.sleep(1)  # cooperative sleep: control passes to the other greenlet
    print('Task %s done' % n)

g1 = gevent.spawn(task, 1)
g2 = gevent.spawn(task, 2)
gevent.joinall([g1, g2])

gevent.pool.Pool limits how many greenlets run at once; fetch below is a stand-in worker:

from gevent.pool import Pool
import gevent

def fetch(url):  # stand-in worker; real code would do network I/O
    print('fetching %s' % url)
    gevent.sleep(1)

urls = ['url-%d' % i for i in range(10)]
pool = Pool(5)  # at most 5 greenlets run at a time
for url in urls:
    pool.spawn(fetch, url)
pool.join()

Yield control explicitly with gevent.sleep(seconds).
