Geventの良さ2

これはメモです

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        import time
        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises TimeoutError

正直、asyncとかいう単語だけでも興奮しますね。
このソースからでもmapとimapの違いわかりますね。

map_unordered()の説明がいまいち理解出来ませんでしたが、


class threading.Lock

プリミティブロック (primitive lock) オブジェクトを実装しているクラスです。スレッドが一度ロックを獲得すると、それ以後のロック獲得の試みはロックが解放されるまでブロックします。どのスレッドでもロックを解放できます。

バージョン 3.3 で変更: ファクトリ関数からクラスに変更されました。

acquire(blocking=True, timeout=-1)

ブロックあり、またはブロックなしでロックを獲得します。

引数 blocking を True (デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。そしてそれをロック状態にしてから True を返します。

引数 blocking の値を False にして呼び出すとブロックしません。 blocking を True にして呼び出した場合にブロックするような状況では、直ちに False を返します。それ以外の場合には、ロックをロック状態にして True を返します。

正の値に設定された浮動小数点の timeout 引数とともに起動された場合、ロックを得られなければ最大で timeout によって指定された秒数だけブロックします。timeout 引数の -1 は無制限の待機を指定します。blocking が false の場合に timeout を指定することは禁止されています。

ロックを獲得すると True を、ロックを獲得できなかったとき (例えば timeout が過ぎた場合) には False を返します。

バージョン 3.2 で変更: 新しい timeout 引数

バージョン 3.2 で変更: POSIX ではロックの取得がシグナルに割り込まれるようになりました。

release()

ロックを解放します。これはロックを獲得したスレッドだけでなく、任意のスレッドから呼ぶことができます。

ロックの状態がロックのとき、状態をアンロックにリセットして処理を戻します。他のスレッドがロックがアンロック状態になるのを待ってブロックしている場合、ただ一つのスレッドだけが処理を継続できるようにします。

アンロック状態のロックに対して起動された場合、 RuntimeError が送出されます。

戻り値はありません。




geventに

from gevent.event import AsyncResult

a = AsyncResult()
...
a.set()
...
a.get()

と書いてて、なんだろうと調べてみました。

すると、threading.Eventに書いてました。
うん、セマフォと何が違うねん。っていうのが第一印象です。
ちなみに、推測ですが、 getがclearと同じ挙動してると思います。

Event オブジェクトの拡張の AsyncResult を使うと、モーニングコール (wakeup call) 付きの値を送ることができます。 これは将来どこかのタイミングで設定される値に対する参照を持っているので、 future とか deferred と呼ばれることもあります。

って書いてありますが、意味がわかりません。

import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    
	print("1")

	gevent.sleep(3)
    
	print("2")
    
	a.set()
    
	print("3")

def waiter():
    """
    After 3 seconds the get call will unblock.
    """
    print("4")
    a.get() # blocking
    print("5")
    print 'I live!'

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

実行結果

1
4
2
3
5
I live!

テストみました.

すると,
1
4
ここで、3秒間待ちます。
2
3
5となります.
要は、get()が呼ばれてから3秒待つという感じですね。

そのあとに
2
3
が出力されるのは分かるんですが、
なんで 5 も出力されるんだろう?

a.setされてるととかんがえるより、
a.getでブロックが起こってるって考えるほうが無難なイメージです。

import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    
	print("1")

	gevent.sleep(1)
    
	print("2")
    
    
	print("3")

def waiter():
	print "4"
	print "5"
	a.get()
	print "6"
	print "I live"

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])


テスト用に
書いてみたら、

1
4
5
2
3
Traceback (most recent call last):
  File "07-2.py", line 26, in <module>
    gevent.spawn(waiter),
  File "/usr/local/lib/python2.7/dist-packages/gevent-1.0-py2.7-linux-i686.egg/gevent/greenlet.py", line 400, in joinall
    wait(greenlets, timeout=timeout)
  File "/usr/local/lib/python2.7/dist-packages/gevent-1.0-py2.7-linux-i686.egg/gevent/hub.py", line 645, in wait
    return list(iwait(objects, timeout))
  File "/usr/local/lib/python2.7/dist-packages/gevent-1.0-py2.7-linux-i686.egg/gevent/hub.py", line 598, in iwait
    item = waiter.get()
  File "/usr/local/lib/python2.7/dist-packages/gevent-1.0-py2.7-linux-i686.egg/gevent/hub.py", line 568, in get
    return self.hub.switch()
  File "/usr/local/lib/python2.7/dist-packages/gevent-1.0-py2.7-linux-i686.egg/gevent/hub
.py", line 331, in switch
    return greenlet.switch(self)
gevent.hub.LoopExit: This operation would block forever

案の定エラーが起きました。

並列っていうなら、gevent.sleep(5)とかしてる最中に違うプロセスでひたすらprintとか出来るんだろうか?

Eventフラグの初期値って Falseですよね?
だったら、その地点でブロッキングされてるんじゃないんですか?

あとで、師匠に聞こう…

今回は、Geventとmultiprocessingの似たような、似てないような
クラスの困惑に詰みました。
昨日も言いましたが、そろそろScapyに入りたい。


##追記##

師匠の返事

1.

> multiprocessingは動かしてるコアの数を言っているんですかね?
> で、geventの方は非同期のプロセスワーカの数を言ってるんですかね?

これはイメージ的にはだいたいOKですが、厳密にはmultiprocessingの方が少し違います。
マルチコアというのはHWです、CPUがコアを複数持っているってことですね。だから厳密にいうとmultiprocessingは動かす「プロセス」の数、geventの方はワーカ(greeenlet, コルーチン)ですね。
プロセスとコアの関係は、以下のような感じです。
OSがコア(CPU)にプロセスを割り当てる(プロセスにCPUを使わせる)んです。これがOSのスケジューリングですね。

参考サイト

2.

>map() 組み込み関数の並列版です (iterable な引数を1つだけサポートするという違いはありますが)。結果が出るまでブロックします。
>って書いてあるじゃないですか?
>並列なのにブロックしてたら遅いというか、個人的には矛盾を感じるんですけど。
Pool()

ここでブロックするのは親プロセスですね、複数の子プロセスを作って子どもの仕事が終わるのを待つ(ブロックする)ってことです。
普通のmapは一つのプロセスだけが仕事をしますが、multiprocessingのmapは複数の子プロセスを作って並列で仕事をさせるので高速になるという仕組みです。
残りはまたちょっと後でー

3.

> これの不思議なのが、
> multiprocessingがなんで、
> [1,2,3,4,5...]とならないかっていうことなんですが、
> そもそもなんでなんですか?

ここのポイントはさっき言ったようにプロセスのスケジューリングをするのはOSだというところです。multiprocessingはプロセスを10こ作ってそれぞれにecho()を実行させますが、どのプロセスが実行されるか(CPUが割り当てられるか)はOSが決めます。
その順番はいつも決まった順にはならないので[1,2,3,4,5,...]と順番にはならないということです。

4.

> p = Pool(10)で
> コアを増やして10個分にタスクを分けるんですよね?
>けど、greenletはコアは増えない
> っていうその違いだけで、なんでこんなことがおこるんですか?

greenletは1プロセスだからいつも同じ順ということですが、もう少し言うとgreenletがいつも同じ順になるように作られている(決定論的に動作する)からです。
チュートリアルのとこに書いてある以下です。

    • -

先に触れたように、 greenlet は決定論的に動作します。 同じように設定した greenlets があって、同じ入力が与えられた場合、 必ず同じ結果になります。例として、タスクを multiprocessing の pool に 与えた場合と gevent pool に与えた場合を比較します。

例えばmultiprocessingでもEventとかの同期プリミティブを使うと[1,2,3,4,5...]となるようにもできます。
↓こんな感じに使います。
参考サイト
これだとプロセス間で同期を取っているので必ず[1,2,3,4,5...]となります。

5.

> それからそれををロック(acquire)すると、
> Pool(10)で、10個ぐらい動かしていても、すべて止まってしまうんですか?

みんなでロックを獲得しようとすると、一人以外はみんな待たされますね。
lockは、複数人で同時に実行されると困る処理があったときに、必ず一人ずつに実行させたいときに使います。

> ロックって同期的な処理ですよね?

そうですね。
でも同期的な処理というよりは、(複数のワーカー/プロセスで)同期を取るための手段ですね。


> multiprocessingとかgeventのPoolは非同期ですよね?

はい、そうですね。

以上です