Commonly used Python 3 multiprocessing commands


1. Preface

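Multiprocessing is, for the most part, used in the same way as multithreading.

So some explanations of multiprocessing are omitted below; if something is unclear, read the multithreading part first and then come back to multiprocessing.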
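This is a rough illustration of how closely the two APIs mirror each other. The minimal sketch below runs one function in a threading.Thread and again in a multiprocessing.Process. The function names greet and run_both are hypothetical. The point is that both objects take target/args and are driven by the same start() and join() calls.

```python
import multiprocessing
import threading


def greet(name):
    # The same function can serve as the target of a thread or a process
    print(f'Hello from {name}')


def run_both():
    t = threading.Thread(target=greet, args=('a thread',))
    p = multiprocessing.Process(target=greet, args=('a process',))

    # Identical lifecycle calls: start() launches, join() waits for completion
    for worker in (t, p):
        worker.start()
    for worker in (t, p):
        worker.join()

    # Only processes report an exit code (0 means the target returned normally)
    return p.exitcode


if __name__ == '__main__':
    print('process exit code:', run_both())
```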

2. multiprocessing

2.1. Creating processes

2.1.1. Function-based (passing a target)

import multiprocessing
import time
import random


def worker(n):
    for i in range(3):
        print(f'The worker is {n}, the range is {i}')
        time.sleep(random.random())

    return n

if __name__ == '__main__':
    processes = []
    for n in range(3):
        processes.append(multiprocessing.Process(target=worker, args=(n,)))

2.1.2. Subclass-based (inheriting from Process)

class myProcess(multiprocessing.Process):
    def __init__(self, n):
        super(myProcess, self).__init__()
        self.n = n

    def run(self):
        for i in range(3):
            print(f'The worker is {self.n}, the range is {i}')
            time.sleep(random.random())


if __name__ == '__main__':
    processes = []
    for n in range(3):
        processes.append(myProcess(n))
        
"""
Start processing
End processing
The worker is 1, the range is 0
The worker is 0, the range is 0
The worker is 2, the range is 0
The worker is 0, the range is 1
The worker is 1, the range is 1
The worker is 2, the range is 1
The worker is 2, the range is 2
The worker is 1, the range is 2
The worker is 0, the range is 2
"""

2.2. Starting processes

if __name__ == '__main__':
    ...
    print('Start processing')

    for p in processes:
        p.start()

    print('End processing')
    
"""
Start processing
End processing
The worker is 1, the range is 0
The worker is 0, the range is 0
The worker is 2, the range is 0
The worker is 2, the range is 1
The worker is 2, the range is 2
The worker is 0, the range is 1
The worker is 1, the range is 1
The worker is 1, the range is 2
The worker is 0, the range is 2
"""

Note:

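The start() calls must be placed inside if __name__ == '__main__'. Otherwise an error is raised under the spawn start method, which is the default on Windows and, since Python 3.8, on macOS. The statements that merely create the Process objects do not have to be inside the main guard.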

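This is because, under spawn, each child process starts by importing (re-running) the script again. Try the following experiment: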

print(__name__)

if __name__ == '__main__':
    ...
    print('Start processing')

    for p in processes:
        p.start()

    print('End processing')

"""
__main__
Start processing
End processing
__mp_main__
The worker is 1, the range is 0
__mp_main__
The worker is 0, the range is 0
__mp_main__
The worker is 2, the range is 0
The worker is 1, the range is 1
The worker is 2, the range is 1
The worker is 1, the range is 2
The worker is 0, the range is 1
The worker is 2, the range is 2
The worker is 0, the range is 2
"""

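Analysis of the log:

  1. The main process runs first. It prints the module name __main__, passes the entry check and starts the child processes.
  2. Each child process re-imports the script and prints the module name __mp_main__. It fails the entry check, so it cannot start further child processes.

So without the main guard, every child would try to start children of its own while re-importing the script. Python detects this during the child's bootstrapping phase and raises a RuntimeError rather than letting processes multiply without bound. Either way, the program does not work.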
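The sketch below checks which start method your platform uses and what a child process sees. It assumes the code is saved and run as a script, and the helper names are hypothetical. The child reports its module name back through a queue. Under spawn the child re-imports the script and reports __mp_main__, as in the log above. Under fork, traditionally the default on Linux, the child is a copy of the parent and still reports __main__.

```python
import multiprocessing


def child_module_name(q):
    # Runs in the child: report the name of the module this function lives in
    q.put(__name__)


def check_start_method():
    method = multiprocessing.get_start_method()
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child_module_name, args=(q,))
    p.start()
    name = q.get()  # blocks until the child has put its answer
    p.join()
    return method, name


if __name__ == '__main__':
    method, name = check_start_method()
    # Expect '__mp_main__' under spawn (script re-imported) and
    # '__main__' under fork (child is a copy of the parent)
    print(f'start method: {method}, module name in child: {name}')
```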

2.3. Daemon processes

if __name__ == '__main__':
    ...
    print('Start processing')

    for p in processes:
        p.daemon = True
        p.start()

    print('End processing')
    
"""
Start processing
End processing
"""

2.4. Getting return values

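Function-based: multiprocessing.Process has no get() method, and the value returned by target is simply discarded. To collect a function's return values, run it through a process pool instead (see 2.8):

with multiprocessing.Pool(3) as pool:
    results = pool.map(worker, range(3))   # [0, 1, 2], in input order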

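Subclass-based: you can add extra methods, such as get_result() below. Such a method runs in the parent process, so it only sees attributes set in the parent (for example in __init__). Anything run() changes happens in the child's memory and is not visible to the parent. Values computed in run() have to be sent back through a queue or pipe (see 2.7).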

class myProcess(multiprocessing.Process):
    def __init__(self, n):
        super(myProcess, self).__init__()
        self.n = n

    def run(self):
        for i in range(3):
            print(f'The worker is {self.n}, the range is {i}')
            time.sleep(random.random())

    def get_result(self):
        return self.n
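Process discards the target's return value, so a common workaround is to pass a multiprocessing.Queue to each child and have it put its result there. Below is a minimal sketch with hypothetical names square_worker and collect. Each child sends back a (input, result) pair so the parent can match results to inputs regardless of completion order.

```python
import multiprocessing


def square_worker(n, results):
    # Hypothetical computation; the result travels back through the queue
    results.put((n, n * n))


def collect(count=3):
    results = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=square_worker, args=(n, results))
                 for n in range(count)]
    for p in processes:
        p.start()
    # Drain the queue before join(): a child that has put data may not exit
    # until that data has been flushed to the underlying pipe
    output = dict(results.get() for _ in processes)
    for p in processes:
        p.join()
    return output


if __name__ == '__main__':
    print(collect())
```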

2.5. Blocking (join)

if __name__ == '__main__':
    ...
    print('Start processing')

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print('End processing')
    
"""
Start processing
The worker is 1, the range is 0
The worker is 0, the range is 0
The worker is 2, the range is 0
The worker is 0, the range is 1
The worker is 2, the range is 1
The worker is 1, the range is 1
The worker is 0, the range is 2
The worker is 1, the range is 2
The worker is 2, the range is 2
End processing
"""

2.6. Locks

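Locks, together with the suspension mechanisms described below (queues and pipes), are mainly used to coordinate and communicate between processes.

Communication matters especially for multiprocessing. Processes do not share memory, so unlike threads they cannot pass data through global variables. Shared objects such as locks and queues have to be handed to the child processes as arguments instead.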
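The claim that globals are not shared can be checked directly. In this sketch, with hypothetical names, the child increments a module-level counter. The parent's copy stays at 0 because each process works on its own memory.

```python
import multiprocessing

counter = 0  # module-level global


def increment():
    global counter
    counter += 1  # changes only the child's own copy
    print('child sees', counter)


def run():
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    return counter  # the parent's copy is unchanged


if __name__ == '__main__':
    print('parent sees', run())
```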

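def worker(n, lock):
    # Method 1: acquire and release explicitly
    # (try/finally makes sure the lock is released even if the body raises)
    lock.acquire()
    try:
        for i in range(3):
            print(f'The worker is {n}, the range is {i}')
            time.sleep(random.random())
    finally:
        lock.release()

    # Method 2: use the lock as a context manager (equivalent and shorter)
    with lock:
        for i in range(3):
            print(f'The worker is {n}, the range is {i}')
            time.sleep(random.random())

    return n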

if __name__ == '__main__':
    lock = multiprocessing.Lock()   # create inside the main guard, then pass it to each child as an argument
    processes = []
    for n in range(3):
        processes.append(multiprocessing.Process(target=worker, args=(n, lock)))

    print('Start processing')

    for p in processes:
        p.start()

    print('End processing')
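A typical use of a lock is protecting a shared value that several processes update. This sketch uses multiprocessing.Value as the shared integer. The value is created with lock=False so that the explicit Lock is its only protection. The function names add and count are hypothetical.

```python
import multiprocessing


def add(total, lock, times):
    for _ in range(times):
        # "+=" is a read followed by a write; without the lock two processes
        # can read the same old value and one of the updates is lost
        with lock:
            total.value += 1


def count(workers=4, times=1000):
    lock = multiprocessing.Lock()
    total = multiprocessing.Value('i', 0, lock=False)  # raw shared int, guarded by our lock
    processes = [multiprocessing.Process(target=add, args=(total, lock, times))
                 for _ in range(workers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return total.value


if __name__ == '__main__':
    print(count())
```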

2.7. Suspension (queues and pipes)

2.7.1. Queues

A queue is full-duplex: any process that holds it can both put() items in and get() items out.

def consumer(q):
    while True:
        if not q.empty():
            i = q.get()
            print(f'Consume {i}')
            time.sleep(random.random())
        else:
            # empty() is only approximate across processes; a blocking q.get() is more reliable
            print('Queue is empty, wait....')
            time.sleep(random.random())


def producer(q):
    while True:
        if not q.full():
            i = random.random()
            print(f'Product {i}')
            q.put(i)
            time.sleep(random.random())
        else:
            print('Queue is full, wait....')
            time.sleep(random.random())


if __name__ == '__main__':
    q = multiprocessing.Queue(3)    # create inside the main guard, then pass it to the children as an argument

    p1 = multiprocessing.Process(target=producer, args=(q, ))
    p2 = multiprocessing.Process(target=consumer, args=(q, ))

    p1.start()
    p2.start()

"""
Product 0.3212293933578372
Consume 0.3212293933578372
Queue is empty, wait....
Product 0.27350559121923923
Consume 0.27350559121923923
Queue is empty, wait....
Queue is empty, wait....
Queue is empty, wait....
...
"""

2.7.2. Pipes

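A pipe is the most primitive form of inter-process communication. It connects exactly two endpoints and offers few features, so a queue is usually used instead. Note that multiprocessing.Pipe() is bidirectional by default (duplex=True). Only Pipe(duplex=False) gives a one-way pipe.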

def consumer(pipe):
    while True:
        i = pipe.recv()
        print(f'Consume {i}')
        time.sleep(random.random())


def producer(pipe):
    while True:
        i = random.random()
        print(f'Product {i}')
        pipe.send(i)
        time.sleep(random.random())


if __name__ == '__main__':
    send, recv = multiprocessing.Pipe()  # create inside the main guard, then pass each end to a child as an argument

    p1 = multiprocessing.Process(target=producer, args=(send, ))
    p2 = multiprocessing.Process(target=consumer, args=(recv, ))

    p1.start()
    p2.start()
    
"""
Product 0.6589189860316914
Consume 0.6589189860316914
Product 0.5579720122002056
Consume 0.5579720122002056
Product 0.27536809480824054
Consume 0.27536809480824054
...
"""

2.8. Process pools

2.8.1. Blocking

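if __name__ == '__main__':
    ...
    pool = multiprocessing.Pool(3)

    print('Start processing')
    # Method 1: map() spreads the tasks over the pool and waits for all of them
    pool.map(worker, range(3))

    # Method 2 (alternative, left commented out)
    # Note: methods 1 and 2 are NOT equivalent. apply() blocks until each call
    # returns, so this loop runs the tasks one at a time, effectively "single-process"
    # for i in range(3):
    #     pool.apply(worker, args=(i, ))

    pool.close()
    pool.join()
    print('End processing')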
    
"""
Start processing
The worker is 0, the range is 0
The worker is 1, the range is 0
The worker is 2, the range is 0
The worker is 1, the range is 1
The worker is 2, the range is 1
The worker is 0, the range is 1
The worker is 1, the range is 2
The worker is 0, the range is 2
The worker is 2, the range is 2
End processing
"""

2.8.2. Non-blocking

if __name__ == '__main__':
    ...
    pool = multiprocessing.Pool(3)

    print('Start processing')
    for i in range(3):
        pool.apply_async(worker, args=(i, ))
        
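    # To make this blocking, uncomment the two lines below; this is roughly
    # equivalent to method 1 of the blocking version above. Without them the
    # main process exits right away and the pool's workers are terminated,
    # so worker output may never appear
    # pool.close()
    # pool.join()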
    
    print('End processing')
    
"""
Start processing
End processing
"""
