前言
这里不讲理论知识,直接研究多进程代码应该怎么写。
进程池
一般而言会将耗时任务放入子进程中执行,一个任务对应一个子进程。
但是,当任务数量很多时(远大于计算机的CPU核数),上述方式变得不再可行。
我们需要一个管理子进程的东西,或者说是控制子进程数量的东西,那么进程池就应运而生了。
下面是一个进程池的示例
import time
from multiprocessing import Pool
from queue import Queue
def create_queue():
# 创建一个包含100个任务的队列
queue = Queue()
[queue.put(item) for item in range(1, 11)]
return queue
def worker(data):
time.sleep(3)
print(f"处理完成,{data}")
if __name__ == '__main__':
queue = create_queue()
pool = Pool(processes=4)
start_time = time.time()
while queue.qsize():
element = queue.get()
# pool.apply(worker, args=(element,)) # 提交任务后会阻塞主进程,直到任务执行完毕
pool.apply_async(worker, args=(element,)) # 提交任务后不会阻塞,继续提交其他任务,直到进程池中的进程被占满才会阻塞,待某一进程执行完毕后,进程变为空闲状态,再继续提交任务。
pool.close()
pool.join()
print(f"所有任务执行完毕,耗时: {time.time() - start_time}")
简单解释一下代码:先创建一个包含100个元素的队列(queue),接下来创建一个进程容量为4的进程池。
将队列中所有的任务都放入进程池中执行。
多进程间通信
由于每个进程之间都有相互独立的内存,a进程无法访问b进程的内存中的变量,反之亦然。
所以就会在主进程中创建一个队列,用来存放数据,达到进程间通信的目标。
import os
import time
from multiprocessing import Process, Queue
def write(queue):
"""
向队列写入数据
Args:
queue: 队列
Returns:
None
"""
print(f"写进程 - {os.getpid()}")
for value in ['a', 'b', 'c']:
print(f"放入队列, value={value}")
queue.put(value)
time.sleep(3)
def read(queue):
"""
从队列中读取数据
Args:
queue: 队列
Returns:
None
"""
print(f"读进程 - {os.getpid()}")
while True:
value = queue.get()
print(f"读取数据,value={value}")
if __name__ == '__main__':
queue = Queue()
process_write = Process(target=write, args=(queue,))
process_read = Process(target=read, args=(queue,))
process_write.start()
process_read.start()
process_write.join()
process_read.terminate()
简单解释一下程序:在主进程中创建一个队列,此队列用于存放进程间通信的数据。然后另起两个进程,一个用于向队列中写数据,另一个从队列中读数据。在启动这两个进程后,阻塞写进程(等待写入完毕),然后终止读进程(读进程为死循环,无法自动退出)。
网友评论