queue --- 一個(gè)同步的隊(duì)列類?

源代碼: Lib/queue.py


queue 模塊實(shí)現(xiàn)了多生產(chǎn)者、多消費(fèi)者隊(duì)列。這特別適用于消息必須安全地在多線程間交換的線程編程。模塊中的 Queue 類實(shí)現(xiàn)了所有所需的鎖定語義。

模塊實(shí)現(xiàn)了三種類型的隊(duì)列,它們的區(qū)別僅僅是條目取回的順序。在 FIFO 隊(duì)列中,先添加的任務(wù)先取回。在 LIFO 隊(duì)列中,最近被添加的條目先取回(操作類似一個(gè)堆棧)。優(yōu)先級(jí)隊(duì)列中,條目將保持排序( 使用 heapq 模塊 ) 并且最小值的條目第一個(gè)返回。

在內(nèi)部,這三個(gè)類型的隊(duì)列使用鎖來臨時(shí)阻塞競爭線程;然而,它們并未被設(shè)計(jì)用于線程的重入性處理。

此外,模塊實(shí)現(xiàn)了一個(gè) "簡單的" FIFO 隊(duì)列類型, SimpleQueue ,這個(gè)特殊實(shí)現(xiàn)為小功能在交換中提供額外的保障。

queue 模塊定義了下列類和異常:

class queue.Queue(maxsize=0)?

Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.LifoQueue(maxsize=0)?

LIFO 隊(duì)列構(gòu)造函數(shù)。 maxsize 是個(gè)整數(shù),用于設(shè)置可以放入隊(duì)列中的項(xiàng)目數(shù)的上限。當(dāng)達(dá)到這個(gè)大小的時(shí)候,插入操作將阻塞至隊(duì)列中的項(xiàng)目被消費(fèi)掉。如果 maxsize 小于等于零,隊(duì)列尺寸為無限大。

class queue.PriorityQueue(maxsize=0)?

優(yōu)先級(jí)隊(duì)列構(gòu)造函數(shù)。 maxsize 是個(gè)整數(shù),用于設(shè)置可以放入隊(duì)列中的項(xiàng)目數(shù)的上限。當(dāng)達(dá)到這個(gè)大小的時(shí)候,插入操作將阻塞至隊(duì)列中的項(xiàng)目被消費(fèi)掉。如果 maxsize 小于等于零,隊(duì)列尺寸為無限大。

最小值先被取出( 最小值條目是由 sorted(list(entries))[0] 返回的條目)。條目的典型模式是一個(gè)以下形式的元組: (priority_number, data) 。

如果 data 元素沒有可比性,數(shù)據(jù)將被包裝在一個(gè)類中,忽略數(shù)據(jù)值,僅僅比較優(yōu)先級(jí)數(shù)字 :

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue?

無界的 FIFO 隊(duì)列構(gòu)造函數(shù)。簡單的隊(duì)列,缺少任務(wù)跟蹤等高級(jí)功能。

3.7 新版功能.

exception queue.Empty?

對(duì)空的 Queue 對(duì)象,調(diào)用非阻塞的 get() (or get_nowait()) 時(shí),引發(fā)的異常。

exception queue.Full?

對(duì)滿的 Queue 對(duì)象,調(diào)用非阻塞的 put() (or put_nowait()) 時(shí),引發(fā)的異常。

Queue對(duì)象?

隊(duì)列對(duì)象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。

Queue.qsize()?

返回隊(duì)列的大致大小。注意,qsize() > 0 不保證后續(xù)的 get() 不被阻塞,qsize() < maxsize 也不保證 put() 不被阻塞。

Queue.empty()?

如果隊(duì)列為空,返回 True ,否則返回 False 。如果 empty() 返回 True ,不保證后續(xù)調(diào)用的 put() 不被阻塞。類似的,如果 empty() 返回 False ,也不保證后續(xù)調(diào)用的 get() 不被阻塞。

Queue.full()?

如果隊(duì)列是滿的返回 True ,否則返回 False 。如果 full() 返回 True 不保證后續(xù)調(diào)用的 get() 不被阻塞。類似的,如果 full() 返回 False 也不保證后續(xù)調(diào)用的 put() 不被阻塞。

Queue.put(item, block=True, timeout=None)?

item 放入隊(duì)列。如果可選參數(shù) block 是 true 并且 timeoutNone (默認(rèn)),則在必要時(shí)阻塞至有空閑插槽可用。如果 timeout 是個(gè)正數(shù),將最多阻塞 timeout 秒,如果在這段時(shí)間沒有可用的空閑插槽,將引發(fā) Full 異常。反之 (block 是 false),如果空閑插槽立即可用,則把 item 放入隊(duì)列,否則引發(fā) Full 異常 ( 在這種情況下,timeout 將被忽略)。

Queue.put_nowait(item)?

Equivalent to put(item, block=False).

Queue.get(block=True, timeout=None)?

從隊(duì)列中移除并返回一個(gè)項(xiàng)目。如果可選參數(shù) block 是 true 并且 timeoutNone (默認(rèn)值),則在必要時(shí)阻塞至項(xiàng)目可得到。如果 timeout 是個(gè)正數(shù),將最多阻塞 timeout 秒,如果在這段時(shí)間內(nèi)項(xiàng)目不能得到,將引發(fā) Empty 異常。反之 (block 是 false) , 如果一個(gè)項(xiàng)目立即可得到,則返回一個(gè)項(xiàng)目,否則引發(fā) Empty 異常 (這種情況下,timeout 將被忽略)。

POSIX系統(tǒng)3.0之前,以及所有版本的Windows系統(tǒng)中,如果 block 是 true 并且 timeoutNone , 這個(gè)操作將進(jìn)入基礎(chǔ)鎖的不間斷等待。這意味著,沒有異常能發(fā)生,尤其是 SIGINT 將不會(huì)觸發(fā) KeyboardInterrupt 異常。

Queue.get_nowait()?

相當(dāng)于 get(False)

提供了兩個(gè)方法,用于支持跟蹤 排隊(duì)的任務(wù) 是否 被守護(hù)的消費(fèi)者線程 完整的處理。

Queue.task_done()?

表示前面排隊(duì)的任務(wù)已經(jīng)被完成。被隊(duì)列的消費(fèi)者線程使用。每個(gè) get() 被用于獲取一個(gè)任務(wù), 后續(xù)調(diào)用 task_done() 告訴隊(duì)列,該任務(wù)的處理已經(jīng)完成。

如果 join() 當(dāng)前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個(gè) put() 進(jìn)隊(duì)列的條目的 task_done() 都被收到)。

如果被調(diào)用的次數(shù)多于放入隊(duì)列中的項(xiàng)目數(shù)量,將引發(fā) ValueError 異常 。

Queue.join()?

阻塞至隊(duì)列中所有的元素都被接收和處理完畢。

當(dāng)條目添加到隊(duì)列的時(shí)候,未完成任務(wù)的計(jì)數(shù)就會(huì)增加。每當(dāng)消費(fèi)者線程調(diào)用 task_done() 表示這個(gè)條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計(jì)數(shù)就會(huì)減少。當(dāng)未完成計(jì)數(shù)降到零的時(shí)候, join() 阻塞被解除。

如何等待排隊(duì)的任務(wù)被完成的示例:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

SimpleQueue 對(duì)象?

SimpleQueue 對(duì)象提供下列描述的公共方法。

SimpleQueue.qsize()?

返回隊(duì)列的大致大小。注意,qsize() > 0 不保證后續(xù)的 get() 不被阻塞。

SimpleQueue.empty()?

如果隊(duì)列為空,返回 True ,否則返回 False 。如果 empty() 返回 False ,不保證后續(xù)調(diào)用的 get() 不被阻塞。

SimpleQueue.put(item, block=True, timeout=None)?

item 放入隊(duì)列。此方法永不阻塞,始終成功(除了潛在的低級(jí)錯(cuò)誤,例如內(nèi)存分配失敗)。可選參數(shù) blocktimeout 僅僅是為了保持 Queue.put() 的兼容性而提供,其值被忽略。

CPython implementation detail: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue.put_nowait(item)?

Equivalent to put(item, block=False), provided for compatibility with Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)?

從隊(duì)列中移除并返回一個(gè)項(xiàng)目。如果可選參數(shù) block 是 true 并且 timeoutNone (默認(rèn)值),則在必要時(shí)阻塞至項(xiàng)目可得到。如果 timeout 是個(gè)正數(shù),將最多阻塞 timeout 秒,如果在這段時(shí)間內(nèi)項(xiàng)目不能得到,將引發(fā) Empty 異常。反之 (block 是 false) , 如果一個(gè)項(xiàng)目立即可得到,則返回一個(gè)項(xiàng)目,否則引發(fā) Empty 異常 (這種情況下,timeout 將被忽略)。

SimpleQueue.get_nowait()?

相當(dāng)于 get(False) 。

參見

multiprocessing.Queue

一個(gè)用于多進(jìn)程上下文的隊(duì)列類(而不是多線程)。

collections.deque 是無界隊(duì)列的一個(gè)替代實(shí)現(xiàn),具有快速的不需要鎖并且支持索引的原子化 append()popleft() 操作。