队列 Queue
Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。 get不到数据,不会阻塞,会直接报错
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.put_nowait( )
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)
队列中的数据是进程安全的,同一时间,只有一个进程能够对队列中的数据进行修改。
- 一般队列 (先进先出)
q = queue.Queue() - 先进后出队列(栈)
q = queue.LifoQueue() - 优先级队列
q = queue.PriorityQueue()
put一个元组,(优先级,数据), 优先级是一个数字,数越小,优先级越高。如果优先级一样,则按照ascii码的大小顺序来输出
import queue
q = queue.PriorityQueue()
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))
print(q.get()) # 输出(10, 'b')
生产者消费者模型
通过JoinableQueue模块来实现
- JoinableQueue 比 Queue 多了一个 q.task_done()的计数功能,当put进一个数据,队列中默认count+1,当get取出一个数据,并处理完后,q.task_done()会将默认的count-1,直到count=0,就将q.join()的阻塞状态改为非阻塞。
- 在生产者模型中加入q.join() 已延长生产者进程的生命周期
- 生产者进程结束,通过代码最后的p1.join() 、p2.join() 来结束主进程的生命周期
- 设置消费者进程为守护进程,主进程的代码执行完毕,自动结束其生命周期,不会形成阻塞。
- 通过q.task_done()来给生产者反馈,当所有数据都被消费者处理完后,生产者中由q.join()形成的阻塞消失-->生产者进程结束-->主进程代码全部执行完毕-->守护进程(消费者进程)也结束。
from multiprocessing import JoinableQueue, Process
import time, random
def consumer(name, q):
while True:
food = q.get()
time.sleep(random.random())
print('\033[31m%s购买了%s\033[0m' % (name, food))
q.task_done()
def producer(name, food, q):
for i in range(10):
time.sleep(random.random())
print('%s生产了第%s个%s'%(name, i, food))
q.put(food)
# 当生产者生产完后,进程并没有马上结束。通过q.join()来阻塞,已延长生命周期
q.join() # 阻塞。直到一个队列中全部的数据 都被处理完毕。
q = JoinableQueue()
p1 = Process(target=producer, args=('Jobs', 'hamberger', q))
p1.start()
p2 = Process(target=producer, args=('Steve', 'cola', q))
p2.start()
c1 = Process(target=consumer, args=('kindle', q))
c2 = Process(target=consumer, args=('caps', q))
# 将消费者进程设置为守护进程。主进程代码执行完毕,就结束
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join() # 主进程代码在此阻塞,等待生产者进程生命周期的结束(生产者进程被阻塞,需要等待消费者进程中q.task_done()来反馈已经全部处理完队列中的数据。)













网友评论