itertools --- 為高效循環(huán)而創(chuàng)建迭代器的函數(shù)?


本模塊實(shí)現(xiàn)一系列 iterator ,這些迭代器受到APL,Haskell和SML的啟發(fā)。為了適用于Python,它們都被重新寫過。

本模塊標(biāo)準(zhǔn)化了一個(gè)快速、高效利用內(nèi)存的核心工具集,這些工具本身或組合都很有用。它們一起形成了“迭代器代數(shù)”,這使得在純Python中有可能創(chuàng)建簡潔又高效的專用工具。

例如,SML有一個(gè)制表工具: tabulate(f),它可產(chǎn)生一個(gè)序列 f(0), f(1), ...。在Python中可以組合 map()count() 實(shí)現(xiàn): map(f, count())。

這些內(nèi)置工具同時(shí)也能很好地與 operator 模塊中的高效函數(shù)配合使用。例如,我們可以將兩個(gè)向量的點(diǎn)積映射到乘法運(yùn)算符: sum(map(operator.mul, vector1, vector2))

無窮迭代器:

迭代器

實(shí)參

結(jié)果

示例

count()

start, [step]

start, start+step, start+2*step, ...

count(10) --> 10 11 12 13 14 ...

cycle()

p

p0, p1, ... plast, p0, p1, ...

cycle('ABCD') --> A B C D A B C D ...

repeat()

elem [,n]

elem, elem, elem, ... 重復(fù)無限次或n次

repeat(10, 3) --> 10 10 10

根據(jù)最短輸入序列長度停止的迭代器:

迭代器

實(shí)參

結(jié)果

示例

accumulate()

p [,func]

p0, p0+p1, p0+p1+p2, ...

accumulate([1,2,3,4,5]) --> 1 3 6 10 15

chain()

p, q, ...

p0, p1, ... plast, q0, q1, ...

chain('ABC', 'DEF') --> A B C D E F

chain.from_iterable()

iterable -- 可迭代對象

p0, p1, ... plast, q0, q1, ...

chain.from_iterable(['ABC', 'DEF']) --> A B C D E F

compress()

data, selectors

(d[0] if s[0]), (d[1] if s[1]), ...

compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F

dropwhile()

pred, seq

seq[n], seq[n+1], ... 從pred首次真值測試失敗開始

dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1

filterfalse()

pred, seq

seq中pred(x)為假值的元素,x是seq中的元素。

filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8

groupby()

iterable[, key]

根據(jù)key(v)值分組的迭代器

islice()

seq, [start,] stop [, step]

seq[start:stop:step]中的元素

islice('ABCDEFG', 2, None) --> C D E F G

pairwise()

iterable -- 可迭代對象

(p[0], p[1]), (p[1], p[2])

pairwise('ABCDEFG') --> AB BC CD DE EF FG

starmap()

func, seq

func(*seq[0]), func(*seq[1]), ...

starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000

takewhile()

pred, seq

seq[0], seq[1], ..., 直到pred真值測試失敗

takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4

tee()

it, n

it1, it2, ... itn 將一個(gè)迭代器拆分為n個(gè)迭代器

zip_longest()

p, q, ...

(p[0], q[0]), (p[1], q[1]), ...

zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-

排列組合迭代器:

迭代器

實(shí)參

結(jié)果

product()

p, q, ... [repeat=1]

笛卡爾積,相當(dāng)于嵌套的for循環(huán)

permutations()

p[, r]

長度r元組,所有可能的排列,無重復(fù)元素

combinations()

p, r

長度r元組,有序,無重復(fù)元素

combinations_with_replacement()

p, r

長度r元組,有序,元素可重復(fù)

例子

結(jié)果

product('ABCD', repeat=2)

AA AB AC AD BA BB BC BD CA CB CC CD DA DB DC DD

permutations('ABCD', 2)

AB AC AD BA BC BD CA CB CD DA DB DC

combinations('ABCD', 2)

AB AC AD BC BD CD

combinations_with_replacement('ABCD', 2)

AA AB AC AD BB BC BD CC CD DD

Itertool函數(shù)?

下列模塊函數(shù)均創(chuàng)建并返回迭代器。有些迭代器不限制輸出流長度,所以它們只應(yīng)在能截?cái)噍敵隽鞯暮瘮?shù)或循環(huán)中使用。

itertools.accumulate(iterable[, func, *, initial=None])?

創(chuàng)建一個(gè)迭代器,返回累積匯總值或其他雙目運(yùn)算函數(shù)的累積結(jié)果值(通過可選的 func 參數(shù)指定)。

如果提供了 func,它應(yīng)當(dāng)為帶有兩個(gè)參數(shù)的函數(shù)。 輸入 iterable 的元素可以是能被 func 接受為參數(shù)的任意類型。 (例如,對于默認(rèn)的加法運(yùn)算,元素可以是任何可相加的類型包括 DecimalFraction。)

通常,輸出的元素?cái)?shù)量與輸入的可迭代對象是一致的。 但是,如果提供了關(guān)鍵字參數(shù) initial,則累加會以 initial 值開始,這樣輸出就比輸入的可迭代對象多一個(gè)元素。

大致相當(dāng)于:

def accumulate(iterable, func=operator.add, *, initial=None):
    'Return running totals'
    # accumulate([1,2,3,4,5]) --> 1 3 6 10 15
    # accumulate([1,2,3,4,5], initial=100) --> 100 101 103 106 110 115
    # accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120
    it = iter(iterable)
    total = initial
    if initial is None:
        try:
            total = next(it)
        except StopIteration:
            return
    yield total
    for element in it:
        total = func(total, element)
        yield total

func 參數(shù)有幾種用法。它可以被設(shè)為 min() 最終得到一個(gè)最小值,或者設(shè)為 max() 最終得到一個(gè)最大值,或設(shè)為 operator.mul() 最終得到一個(gè)乘積。攤銷表可通過累加利息和支付款項(xiàng)得到。給iterable設(shè)置初始值并只將參數(shù) func 設(shè)為累加總數(shù)可以對一階 遞歸關(guān)系 建模。

>>>
>>> data = [3, 4, 6, 2, 1, 9, 0, 7, 5, 8]
>>> list(accumulate(data, operator.mul))     # running product
[3, 12, 72, 144, 144, 1296, 0, 0, 0, 0]
>>> list(accumulate(data, max))              # running maximum
[3, 4, 6, 6, 6, 9, 9, 9, 9, 9]

# Amortize a 5% loan of 1000 with 4 annual payments of 90
>>> cashflows = [1000, -90, -90, -90, -90]
>>> list(accumulate(cashflows, lambda bal, pmt: bal*1.05 + pmt))
[1000, 960.0, 918.0, 873.9000000000001, 827.5950000000001]

# Chaotic recurrence relation https://en.wikipedia.org/wiki/Logistic_map
>>> logistic_map = lambda x, _:  r * x * (1 - x)
>>> r = 3.8
>>> x0 = 0.4
>>> inputs = repeat(x0, 36)     # only the initial value is used
>>> [format(x, '.2f') for x in accumulate(inputs, logistic_map)]
['0.40', '0.91', '0.30', '0.81', '0.60', '0.92', '0.29', '0.79', '0.63',
 '0.88', '0.39', '0.90', '0.33', '0.84', '0.52', '0.95', '0.18', '0.57',
 '0.93', '0.25', '0.71', '0.79', '0.63', '0.88', '0.39', '0.91', '0.32',
 '0.83', '0.54', '0.95', '0.20', '0.60', '0.91', '0.30', '0.80', '0.60']

參考一個(gè)類似函數(shù) functools.reduce() ,它只返回一個(gè)最終累積值。

3.2 新版功能.

在 3.3 版更改: 增加可選參數(shù) func 。

在 3.8 版更改: 添加了可選的 initial 形參。

itertools.chain(*iterables)?

創(chuàng)建一個(gè)迭代器,它首先返回第一個(gè)可迭代對象中所有元素,接著返回下一個(gè)可迭代對象中所有元素,直到耗盡所有可迭代對象中的元素??蓪⒍鄠€(gè)序列處理為單個(gè)序列。大致相當(dāng)于:

def chain(*iterables):
    # chain('ABC', 'DEF') --> A B C D E F
    for it in iterables:
        for element in it:
            yield element
classmethod chain.from_iterable(iterable)?

構(gòu)建類似 chain() 迭代器的另一個(gè)選擇。從一個(gè)單獨(dú)的可迭代參數(shù)中得到鏈?zhǔn)捷斎耄搮?shù)是延遲計(jì)算的。大致相當(dāng)于:

def from_iterable(iterables):
    # chain.from_iterable(['ABC', 'DEF']) --> A B C D E F
    for it in iterables:
        for element in it:
            yield element
itertools.combinations(iterable, r)?

返回由輸入 iterable 中元素組成長度為 r 的子序列。

組合元組會以字典順序根據(jù)所輸入 iterable 的順序發(fā)出。 因此,如果所輸入 iterable 是已排序的,組合元組也將按已排序的順序生成。

即使元素的值相同,不同位置的元素也被認(rèn)為是不同的。如果元素各自不同,那么每個(gè)組合中沒有重復(fù)元素。

大致相當(dāng)于:

def combinations(iterable, r):
    # combinations('ABCD', 2) --> AB AC AD BC BD CD
    # combinations(range(4), 3) --> 012 013 023 123
    pool = tuple(iterable)
    n = len(pool)
    if r > n:
        return
    indices = list(range(r))
    yield tuple(pool[i] for i in indices)
    while True:
        for i in reversed(range(r)):
            if indices[i] != i + n - r:
                break
        else:
            return
        indices[i] += 1
        for j in range(i+1, r):
            indices[j] = indices[j-1] + 1
        yield tuple(pool[i] for i in indices)

combinations() 的代碼可被改寫為 permutations() 過濾后的子序列,(相對于元素在輸入中的位置)元素不是有序的。

def combinations(iterable, r):
    pool = tuple(iterable)
    n = len(pool)
    for indices in permutations(range(n), r):
        if sorted(indices) == list(indices):
            yield tuple(pool[i] for i in indices)

當(dāng) 0 <= r <= n 時(shí),返回項(xiàng)的個(gè)數(shù)是 n! / r! / (n-r)!;當(dāng) r > n 時(shí),返回項(xiàng)個(gè)數(shù)為0。

itertools.combinations_with_replacement(iterable, r)?

返回由輸入 iterable 中元素組成的長度為 r 的子序列,允許每個(gè)元素可重復(fù)出現(xiàn)。

組合元組會以字典順序根據(jù)所輸入 iterable 的順序發(fā)出。 因此,如果所輸入 iterable 是已排序的,組合元組也將按已排序的順序生成。

不同位置的元素是不同的,即使它們的值相同。因此如果輸入中的元素都是不同的話,返回的組合中元素也都會不同。

大致相當(dāng)于:

def combinations_with_replacement(iterable, r):
    # combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CC
    pool = tuple(iterable)
    n = len(pool)
    if not n and r:
        return
    indices = [0] * r
    yield tuple(pool[i] for i in indices)
    while True:
        for i in reversed(range(r)):
            if indices[i] != n - 1:
                break
        else:
            return
        indices[i:] = [indices[i] + 1] * (r - i)
        yield tuple(pool[i] for i in indices)

combinations_with_replacement() 的代碼可被改寫為 production() 過濾后的子序列,(相對于元素在輸入中的位置)元素不是有序的。

def combinations_with_replacement(iterable, r):
    pool = tuple(iterable)
    n = len(pool)
    for indices in product(range(n), repeat=r):
        if sorted(indices) == list(indices):
            yield tuple(pool[i] for i in indices)

當(dāng) n > 0 時(shí),返回項(xiàng)個(gè)數(shù)為 (n+r-1)! / r! / (n-1)!.

3.1 新版功能.

itertools.compress(data, selectors)?

創(chuàng)建一個(gè)迭代器,它返回 data 中經(jīng) selectors 真值測試為 True 的元素。迭代器在兩者較短的長度處停止。大致相當(dāng)于:

def compress(data, selectors):
    # compress('ABCDEF', [1,0,1,0,1,1]) --> A C E F
    return (d for d, s in zip(data, selectors) if s)

3.1 新版功能.

itertools.count(start=0, step=1)?

創(chuàng)建一個(gè)迭代器,它從 start 值開始,返回均勻間隔的值。常用于 map() 中的實(shí)參來生成連續(xù)的數(shù)據(jù)點(diǎn)。此外,還用于 zip() 來添加序列號。大致相當(dāng)于:

def count(start=0, step=1):
    # count(10) --> 10 11 12 13 14 ...
    # count(2.5, 0.5) -> 2.5 3.0 3.5 ...
    n = start
    while True:
        yield n
        n += step

當(dāng)對浮點(diǎn)數(shù)計(jì)數(shù)時(shí),替換為乘法代碼有時(shí)精度會更好,例如: (start + step * i for i in count()) 。

在 3.1 版更改: 增加參數(shù) step ,允許非整型。

itertools.cycle(iterable)?

創(chuàng)建一個(gè)迭代器,返回 iterable 中所有元素并保存一個(gè)副本。當(dāng)取完 iterable 中所有元素,返回副本中的所有元素。無限重復(fù)。大致相當(dāng)于:

def cycle(iterable):
    # cycle('ABCD') --> A B C D A B C D A B C D ...
    saved = []
    for element in iterable:
        yield element
        saved.append(element)
    while saved:
        for element in saved:
              yield element

注意,該函數(shù)可能需要相當(dāng)大的輔助空間(取決于 iterable 的長度)。

itertools.dropwhile(predicate, iterable)?

創(chuàng)建一個(gè)迭代器,如果 predicate 為true,迭代器丟棄這些元素,然后返回其他元素。注意,迭代器在 predicate 首次為false之前不會產(chǎn)生任何輸出,所以可能需要一定長度的啟動(dòng)時(shí)間。大致相當(dāng)于:

def dropwhile(predicate, iterable):
    # dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1
    iterable = iter(iterable)
    for x in iterable:
        if not predicate(x):
            yield x
            break
    for x in iterable:
        yield x
itertools.filterfalse(predicate, iterable)?

創(chuàng)建一個(gè)迭代器,只返回 iterablepredicateFalse 的元素。如果 predicateNone,返回真值測試為false的元素。大致相當(dāng)于:

def filterfalse(predicate, iterable):
    # filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8
    if predicate is None:
        predicate = bool
    for x in iterable:
        if not predicate(x):
            yield x
itertools.groupby(iterable, key=None)?

創(chuàng)建一個(gè)迭代器,返回 iterable 中連續(xù)的鍵和組。key 是一個(gè)計(jì)算元素鍵值函數(shù)。如果未指定或?yàn)?None,key 缺省為恒等函數(shù)(identity function),返回元素不變。一般來說,iterable 需用同一個(gè)鍵值函數(shù)預(yù)先排序。

groupby() 操作類似于Unix中的 uniq。當(dāng)每次 key 函數(shù)產(chǎn)生的鍵值改變時(shí),迭代器會分組或生成一個(gè)新組(這就是為什么通常需要使用同一個(gè)鍵值函數(shù)先對數(shù)據(jù)進(jìn)行排序)。這種行為與SQL的GROUP BY操作不同,SQL的操作會忽略輸入的順序?qū)⑾嗤I值的元素分在同組中。

返回的組本身也是一個(gè)迭代器,它與 groupby() 共享底層的可迭代對象。因?yàn)樵词枪蚕淼?,?dāng) groupby() 對象向后迭代時(shí),前一個(gè)組將消失。因此如果稍后還需要返回結(jié)果,可保存為列表:

groups = []
uniquekeys = []
data = sorted(data, key=keyfunc)
for k, g in groupby(data, keyfunc):
    groups.append(list(g))      # Store group iterator as a list
    uniquekeys.append(k)

groupby() 大致相當(dāng)于:

class groupby:
    # [k for k, g in groupby('AAAABBBCCDAABBB')] --> A B C D A B
    # [list(g) for k, g in groupby('AAAABBBCCD')] --> AAAA BBB CC D
    def __init__(self, iterable, key=None):
        if key is None:
            key = lambda x: x
        self.keyfunc = key
        self.it = iter(iterable)
        self.tgtkey = self.currkey = self.currvalue = object()
    def __iter__(self):
        return self
    def __next__(self):
        self.id = object()
        while self.currkey == self.tgtkey:
            self.currvalue = next(self.it)    # Exit on StopIteration
            self.currkey = self.keyfunc(self.currvalue)
        self.tgtkey = self.currkey
        return (self.currkey, self._grouper(self.tgtkey, self.id))
    def _grouper(self, tgtkey, id):
        while self.id is id and self.currkey == tgtkey:
            yield self.currvalue
            try:
                self.currvalue = next(self.it)
            except StopIteration:
                return
            self.currkey = self.keyfunc(self.currvalue)
itertools.islice(iterable, stop)?
itertools.islice(iterable, start, stop[, step])

創(chuàng)建一個(gè)迭代器,返回從 iterable 里選中的元素。如果 start 不是0,跳過 iterable 中的元素,直到到達(dá) start 這個(gè)位置。之后迭代器連續(xù)返回元素,除非 step 設(shè)置的值很高導(dǎo)致被跳過。如果 stopNone,迭代器耗光為止;否則,在指定的位置停止。與普通的切片不同,islice() 不支持將 start , stop ,或 step 設(shè)為負(fù)值。可用來從內(nèi)部數(shù)據(jù)結(jié)構(gòu)被壓平的數(shù)據(jù)中提取相關(guān)字段(例如一個(gè)多行報(bào)告,它的名稱字段出現(xiàn)在每三行上)。大致相當(dāng)于:

def islice(iterable, *args):
    # islice('ABCDEFG', 2) --> A B
    # islice('ABCDEFG', 2, 4) --> C D
    # islice('ABCDEFG', 2, None) --> C D E F G
    # islice('ABCDEFG', 0, None, 2) --> A C E G
    s = slice(*args)
    start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1
    it = iter(range(start, stop, step))
    try:
        nexti = next(it)
    except StopIteration:
        # Consume *iterable* up to the *start* position.
        for i, element in zip(range(start), iterable):
            pass
        return
    try:
        for i, element in enumerate(iterable):
            if i == nexti:
                yield element
                nexti = next(it)
    except StopIteration:
        # Consume to *stop*.
        for i, element in zip(range(i + 1, stop), iterable):
            pass

如果 startNone,迭代從0開始。如果 stepNone ,步長缺省為1。

itertools.pairwise(iterable)?

返回從輸入 iterable 中獲取的連續(xù)重疊對。

輸出迭代器中 2 元組的數(shù)量將比輸入的數(shù)量少一個(gè)。 如果輸入可迭代對象中少于兩個(gè)值則它將為空。

大致相當(dāng)于:

def pairwise(iterable):
    # pairwise('ABCDEFG') --> AB BC CD DE EF FG
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

3.10 新版功能.

itertools.permutations(iterable, r=None)?

連續(xù)返回由 iterable 元素生成長度為 r 的排列。

如果 r 未指定或?yàn)?None ,r 默認(rèn)設(shè)置為 iterable 的長度,這種情況下,生成所有全長排列。

排列元組會以字典順序根據(jù)所輸入 iterable 的順序發(fā)出。 因此,如果所輸入 iterable 是已排序的,組合元組也將按已排序的順序生成。

即使元素的值相同,不同位置的元素也被認(rèn)為是不同的。如果元素值都不同,每個(gè)排列中的元素值不會重復(fù)。

大致相當(dāng)于:

def permutations(iterable, r=None):
    # permutations('ABCD', 2) --> AB AC AD BA BC BD CA CB CD DA DB DC
    # permutations(range(3)) --> 012 021 102 120 201 210
    pool = tuple(iterable)
    n = len(pool)
    r = n if r is None else r
    if r > n:
        return
    indices = list(range(n))
    cycles = list(range(n, n-r, -1))
    yield tuple(pool[i] for i in indices[:r])
    while n:
        for i in reversed(range(r)):
            cycles[i] -= 1
            if cycles[i] == 0:
                indices[i:] = indices[i+1:] + indices[i:i+1]
                cycles[i] = n - i
            else:
                j = cycles[i]
                indices[i], indices[-j] = indices[-j], indices[i]
                yield tuple(pool[i] for i in indices[:r])
                break
        else:
            return

permutations() 的代碼也可被改寫為 product() 的子序列,只要將含有重復(fù)元素(來自輸入中同一位置的)的項(xiàng)排除。

def permutations(iterable, r=None):
    pool = tuple(iterable)
    n = len(pool)
    r = n if r is None else r
    for indices in product(range(n), repeat=r):
        if len(set(indices)) == r:
            yield tuple(pool[i] for i in indices)

當(dāng) 0 <= r <= n ,返回項(xiàng)個(gè)數(shù)為 n! / (n-r)! ;當(dāng) r > n ,返回項(xiàng)個(gè)數(shù)為0。

itertools.product(*iterables, repeat=1)?

可迭代對象輸入的笛卡兒積。

大致相當(dāng)于生成器表達(dá)式中的嵌套循環(huán)。例如, product(A, B)((x,y) for x in A for y in B) 返回結(jié)果一樣。

嵌套循環(huán)像里程表那樣循環(huán)變動(dòng),每次迭代時(shí)將最右側(cè)的元素向后迭代。這種模式形成了一種字典序,因此如果輸入的可迭代對象是已排序的,笛卡爾積元組依次序發(fā)出。

要計(jì)算可迭代對象自身的笛卡爾積,將可選參數(shù) repeat 設(shè)定為要重復(fù)的次數(shù)。例如,product(A, repeat=4)product(A, A, A, A) 是一樣的。

該函數(shù)大致相當(dāng)于下面的代碼,只不過實(shí)際實(shí)現(xiàn)方案不會在內(nèi)存中創(chuàng)建中間結(jié)果。

def product(*args, repeat=1):
    # product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
    # product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111
    pools = [tuple(pool) for pool in args] * repeat
    result = [[]]
    for pool in pools:
        result = [x+[y] for x in result for y in pool]
    for prod in result:
        yield tuple(prod)

product() 運(yùn)行之前,它會完全耗盡輸入的可迭代對象,在內(nèi)存中保留值的臨時(shí)池以生成結(jié)果積。 相應(yīng)地,它只適用于有限的輸入。

itertools.repeat(object[, times])?

創(chuàng)建一個(gè)迭代器,不斷重復(fù) object 。除非設(shè)定參數(shù) times ,否則將無限重復(fù)。可用于 map() 函數(shù)中的參數(shù),被調(diào)用函數(shù)可得到一個(gè)不變參數(shù)。也可用于 zip() 的參數(shù)以在元組記錄中創(chuàng)建一個(gè)不變的部分。

大致相當(dāng)于:

def repeat(object, times=None):
    # repeat(10, 3) --> 10 10 10
    if times is None:
        while True:
            yield object
    else:
        for i in range(times):
            yield object

repeat 最常見的用途就是在 mapzip 提供一個(gè)常量流:

>>>
>>> list(map(pow, range(10), repeat(2)))
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
itertools.starmap(function, iterable)?

創(chuàng)建一個(gè)迭代器,使用從可迭代對象中獲取的參數(shù)來計(jì)算該函數(shù)。當(dāng)參數(shù)對應(yīng)的形參已從一個(gè)單獨(dú)可迭代對象組合為元組時(shí)(數(shù)據(jù)已被“預(yù)組對”)可用此函數(shù)代替 map()。map()starmap() 之間的區(qū)別可以類比 function(a,b)function(*c) 的區(qū)別。大致相當(dāng)于:

def starmap(function, iterable):
    # starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000
    for args in iterable:
        yield function(*args)
itertools.takewhile(predicate, iterable)?

創(chuàng)建一個(gè)迭代器,只要 predicate 為真就從可迭代對象中返回元素。大致相當(dāng)于:

def takewhile(predicate, iterable):
    # takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4
    for x in iterable:
        if predicate(x):
            yield x
        else:
            break
itertools.tee(iterable, n=2)?

從一個(gè)可迭代對象中返回 n 個(gè)獨(dú)立的迭代器。

下面的Python代碼能幫助解釋 tee 做了什么(盡管實(shí)際的實(shí)現(xiàn)更復(fù)雜,而且僅使用了一個(gè)底層的 FIFO 隊(duì)列)。

大致相當(dāng)于:

def tee(iterable, n=2):
    it = iter(iterable)
    deques = [collections.deque() for i in range(n)]
    def gen(mydeque):
        while True:
            if not mydeque:             # when the local deque is empty
                try:
                    newval = next(it)   # fetch a new value and
                except StopIteration:
                    return
                for d in deques:        # load it to all the deques
                    d.append(newval)
            yield mydeque.popleft()
    return tuple(gen(d) for d in deques)

一旦 tee() 實(shí)施了一次分裂,原有的 iterable 不應(yīng)再被使用;否則tee對象無法得知 iterable 可能已向后迭代。

tee 迭代器不是線程安全的。當(dāng)同時(shí)使用由同一個(gè) tee() 調(diào)用所返回的迭代器時(shí)可能引發(fā) RuntimeError,即使原本的 iterable 是線程安全的。

該迭代工具可能需要相當(dāng)大的輔助存儲空間(這取決于要保存多少臨時(shí)數(shù)據(jù))。通常,如果一個(gè)迭代器在另一個(gè)迭代器開始之前就要使用大部份或全部數(shù)據(jù),使用 list() 會比 tee() 更快。

itertools.zip_longest(*iterables, fillvalue=None)?

創(chuàng)建一個(gè)迭代器,從每個(gè)可迭代對象中收集元素。如果可迭代對象的長度未對齊,將根據(jù) fillvalue 填充缺失值。迭代持續(xù)到耗光最長的可迭代對象。大致相當(dāng)于:

def zip_longest(*args, fillvalue=None):
    # zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-
    iterators = [iter(it) for it in args]
    num_active = len(iterators)
    if not num_active:
        return
    while True:
        values = []
        for i, it in enumerate(iterators):
            try:
                value = next(it)
            except StopIteration:
                num_active -= 1
                if not num_active:
                    return
                iterators[i] = repeat(fillvalue)
                value = fillvalue
            values.append(value)
        yield tuple(values)

如果其中一個(gè)可迭代對象有無限長度,zip_longest() 函數(shù)應(yīng)封裝在限制調(diào)用次數(shù)的場景中(例如 islice()takewhile())。除非指定, fillvalue 默認(rèn)為 None

itertools 配方?

本節(jié)將展示如何使用現(xiàn)有的 itertools 作為基礎(chǔ)構(gòu)件來創(chuàng)建擴(kuò)展的工具集。

基本上所有這些西方和許許多多其他的配方都可以通過 Python Package Index 上的 more-itertools 項(xiàng)目 來安裝:

python -m pip install more-itertools

擴(kuò)展的工具提供了與底層工具集相同的高性能。保持了超棒的內(nèi)存利用率,因?yàn)橐淮沃惶幚硪粋€(gè)元素,而不是將整個(gè)可迭代對象加載到內(nèi)存。代碼量保持得很小,以函數(shù)式風(fēng)格將這些工具連接在一起,有助于消除臨時(shí)變量。速度依然很快,因?yàn)閮A向于使用“矢量化”構(gòu)件來取代解釋器開銷大的 for 循環(huán)和 generator 。

def take(n, iterable):
    "Return first n items of the iterable as a list"
    return list(islice(iterable, n))

def prepend(value, iterator):
    "Prepend a single value in front of an iterator"
    # prepend(1, [2, 3, 4]) -> 1 2 3 4
    return chain([value], iterator)

def tabulate(function, start=0):
    "Return function(0), function(1), ..."
    return map(function, count(start))

def tail(n, iterable):
    "Return an iterator over the last n items"
    # tail(3, 'ABCDEFG') --> E F G
    return iter(collections.deque(iterable, maxlen=n))

def consume(iterator, n=None):
    "Advance the iterator n-steps ahead. If n is None, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def nth(iterable, n, default=None):
    "Returns the nth item or a default value"
    return next(islice(iterable, n, None), default)

def all_equal(iterable):
    "Returns True if all the elements are equal to each other"
    g = groupby(iterable)
    return next(g, True) and not next(g, False)

def quantify(iterable, pred=bool):
    "Count how many times the predicate is true"
    return sum(map(pred, iterable))

def pad_none(iterable):
    """Returns the sequence elements and then returns None indefinitely.

    Useful for emulating the behavior of the built-in map() function.
    """
    return chain(iterable, repeat(None))

def ncycles(iterable, n):
    "Returns the sequence elements n times"
    return chain.from_iterable(repeat(tuple(iterable), n))

def dotproduct(vec1, vec2):
    return sum(map(operator.mul, vec1, vec2))

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = tuple(kernel)[::-1]
    n = len(kernel)
    window = collections.deque([0], maxlen=n) * n
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

def flatten(list_of_lists):
    "Flatten one level of nesting"
    return chain.from_iterable(list_of_lists)

def repeatfunc(func, times=None, *args):
    """Repeat calls to func with specified arguments.

    Example:  repeatfunc(random.random)
    """
    if times is None:
        return starmap(func, repeat(args))
    return starmap(func, repeat(args, times))

def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == 'fill':
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == 'strict':
        return zip(*args, strict=True)
    if incomplete == 'ignore':
        return zip(*args)
    else:
        raise ValueError('Expected fill, strict, or ignore')

def triplewise(iterable):
    "Return overlapping triplets from an iterable"
    # triplewise('ABCDEFG') -> ABC BCD CDE DEF EFG
    for (a, _), (b, c) in pairwise(pairwise(iterable)):
        yield a, b, c

def sliding_window(iterable, n):
    # sliding_window('ABCDEFG', 4) -> ABCD BCDE CDEF DEFG
    it = iter(iterable)
    window = collections.deque(islice(it, n), maxlen=n)
    if len(window) == n:
        yield tuple(window)
    for x in it:
        window.append(x)
        yield tuple(window)

def roundrobin(*iterables):
    "roundrobin('ABC', 'D', 'EF') --> A D E B F C"
    # Recipe credited to George Sakkis
    num_active = len(iterables)
    nexts = cycle(iter(it).__next__ for it in iterables)
    while num_active:
        try:
            for next in nexts:
                yield next()
        except StopIteration:
            # Remove the iterator we just exhausted from the cycle.
            num_active -= 1
            nexts = cycle(islice(nexts, num_active))

def partition(pred, iterable):
    "Use a predicate to partition entries into false entries and true entries"
    # partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
    t1, t2 = tee(iterable)
    return filterfalse(pred, t1), filter(pred, t2)

def before_and_after(predicate, it):
    """ Variant of takewhile() that allows complete
        access to the remainder of the iterator.

        >>> it = iter('ABCdEfGhI')
        >>> all_upper, remainder = before_and_after(str.isupper, it)
        >>> ''.join(all_upper)
        'ABC'
        >>> ''.join(remainder)     # takewhile() would lose the 'd'
        'dEfGhI'

        Note that the first iterator must be fully
        consumed before the second iterator can
        generate valid results.
    """
    it = iter(it)
    transition = []
    def true_iterator():
        for elem in it:
            if predicate(elem):
                yield elem
            else:
                transition.append(elem)
                return
    def remainder_iterator():
        yield from transition
        yield from it
    return true_iterator(), remainder_iterator()

def subslices(seq):
    "Return all contiguous non-empty subslices of a sequence"
    # subslices('ABCD') --> A AB ABC ABCD B BC BCD C CD D
    slices = starmap(slice, combinations(range(len(seq) + 1), 2))
    return map(operator.getitem, repeat(seq), slices)

def powerset(iterable):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))

def unique_everseen(iterable, key=None):
    "List unique elements, preserving order. Remember all elements ever seen."
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in filterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element

def unique_justseen(iterable, key=None):
    "List unique elements, preserving order. Remember only the element just seen."
    # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
    # unique_justseen('ABBCcAD', str.lower) --> A B C A D
    return map(next, map(operator.itemgetter(1), groupby(iterable, key)))

def iter_except(func, exception, first=None):
    """ Call a function repeatedly until an exception is raised.

    Converts a call-until-exception interface to an iterator interface.
    Like builtins.iter(func, sentinel) but uses an exception instead
    of a sentinel to end the loop.

    Examples:
        iter_except(functools.partial(heappop, h), IndexError)   # priority queue iterator
        iter_except(d.popitem, KeyError)                         # non-blocking dict iterator
        iter_except(d.popleft, IndexError)                       # non-blocking deque iterator
        iter_except(q.get_nowait, Queue.Empty)                   # loop over a producer Queue
        iter_except(s.pop, KeyError)                             # non-blocking set iterator

    """
    try:
        if first is not None:
            yield first()            # For database APIs needing an initial cast to db.first()
        while True:
            yield func()
    except exception:
        pass

def first_true(iterable, default=False, pred=None):
    """Returns the first true value in the iterable.

    If no true value is found, returns *default*

    If *pred* is not None, returns the first item
    for which pred(item) is true.

    """
    # first_true([a,b,c], x) --> a or b or c or x
    # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
    return next(filter(pred, iterable), default)

def random_product(*args, repeat=1):
    "Random selection from itertools.product(*args, **kwds)"
    pools = [tuple(pool) for pool in args] * repeat
    return tuple(map(random.choice, pools))

def random_permutation(iterable, r=None):
    "Random selection from itertools.permutations(iterable, r)"
    pool = tuple(iterable)
    r = len(pool) if r is None else r
    return tuple(random.sample(pool, r))

def random_combination(iterable, r):
    "Random selection from itertools.combinations(iterable, r)"
    pool = tuple(iterable)
    n = len(pool)
    indices = sorted(random.sample(range(n), r))
    return tuple(pool[i] for i in indices)

def random_combination_with_replacement(iterable, r):
    "Random selection from itertools.combinations_with_replacement(iterable, r)"
    pool = tuple(iterable)
    n = len(pool)
    indices = sorted(random.choices(range(n), k=r))
    return tuple(pool[i] for i in indices)

def nth_combination(iterable, r, index):
    "Equivalent to list(combinations(iterable, r))[index]"
    pool = tuple(iterable)
    n = len(pool)
    c = math.comb(n, r)
    if index < 0:
        index += c
    if index < 0 or index >= c:
        raise IndexError
    result = []
    while r:
        c, n, r = c*r//n, n-1, r-1
        while index >= c:
            index -= c
            c, n = c*(n-r)//n, n-1
        result.append(pool[-1-n])
    return tuple(result)