タスク
Base.@task ── マクロ
Base.@async ── マクロ
Base.asyncmap ── 関数
asyncmap(f, c...; ntasks=0, batch_size=nothing)
並列な複数のタスクを使って f をコレクションの各要素に適用します。c に複数のコレクションが渡されると、それぞれから要素を一つずつ取ったものが f の引数となります。
ntasks には並行に実行されるタスクの数を指定します。ntasks を指定しなければ、コレクションの長さに応じて最大で 100 個のタスクが使われます。
ntasks にはゼロ引数の関数を指定することもできます。そうすると各要素の処理を開始する前に ntasks を通して並行に実行するタスクの個数の上限が確認され、ntasks が返した値が現在の並列タスクの個数より大きいときに限って新しいタスクが開始されるようになります1。
batch_size が指定されると、コレクションはバッチモードで処理されます。このとき f はコレクションの要素を並べたタプルからなる Vector を引数に受け取り、その処理結果からなる Vector を返す必要があります。入力のベクトルは batch_size と同じかそれ以下の長さを持ちます。
次に示すいくつかの例では、タスクの objectid を返す関数を使うことで asyncmap に渡された関数がどのタスクに割り当てられるかを確認しています。
まず ntasks が指定されないとき、全ての要素は異なるタスクで処理されます:
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
ntasks=2 とすると、全ての要素が二つのタスクで処理されます:
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
batch_size を指定するとき、asyncmap に渡す関数はコレクションの要素を並べたタプルからなる配列を受け取り、結果の配列を返す必要があります。次の例では map を使って配列に対応しています:
julia> batch_func(input) = map(x->string("args_tuple: ", x,
", element_val: ", x[1],
", task: ", tskoid()),
input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
現在、Julia における全てのタスクは単一の OS スレッドで協調して実行されます。asyncmap を使う意味があるのは、渡される関数が IO (ディスク・ネットワーク・リモートワーカーの起動など) を行うときだけです。
Base.asyncmap! ── 関数
Base.current_task ── 関数
Base.istaskdone ── 関数
istaskdone(t::Task) -> Bool
タスクが終了したかどうかを判定します。
例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted ── 関数
istaskstarted(t::Task) -> Bool
タスクが実行を開始したかどうかを判定します。
例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed ── 関数
istaskfailed(t::Task) -> Bool
タスクが例外の送出により終了したかどうかを判定します。
例
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Base.task_local_storage ── メソッド
Base.task_local_storage ── メソッド
Base.task_local_storage ── メソッド
task_local_storage(body, key, value)
タスクローカルストレージの key に value を対応させた状態で body を呼び出し、その後 key に対応する値 (もしくは key に値が割り当てられていない状態) を復元します。動的スコープのエミュレートに有用です。
スケジューリング
Base.yield ── 関数
yield()
スケジューラに実行を切り替えて、スケジュールされた他のタスクを実行できるようにします。この関数を呼び出したタスクは依然 runnable であり、他に runnable なタスクが存在しなければすぐに再開されます。
yield(t::Task, arg = nothing)
schedule(t, arg); yield() と等価です。このメソッドの方が二つの関数を別々に呼び出すより高速ですが、スケジューラを呼び出すことなく t に直接 yield するので、使うとスケジュールが不平等になります。
Base.yieldto ── 関数
yieldto(t::Task, arg = nothing)
指定されたタスクに実行を切り替えます。あるタスクに対して実行が最初に切り替わったとき、タスクの関数は引数無しに呼ばれます。以降の切り替わりでは、タスクが最後に呼んだ yieldto の返り値が arg となります。これはタスクの切り替えのみを行う低水準の関数であり、タスクの状態やスケジュールを一切関知しません。この関数の使用は推奨されません。
Base.sleep ── 関数
Base.schedule ── 関数
schedule(t::Task, [val]; error=false)
スケジューラのキューにタスク t を追加します。引数のタスク t が wait のようなブロックする処理を実行せず、さらにシステムが他に実行するタスクを持たないなら、タスク t の実行が最後まで継続されます。
第二引数 val が与えられると、それはタスク t が実行を再開するときに (yieldto の返り値として) 渡されます。error が true なら、再開されたタスクで val を値とする例外が送出されます。
例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
同期
Base.@sync ── マクロ
@sync
字句的に囲まれた範囲で使われる @async, @spawn, spawnat, @distributed が全て完了するのを待ちます。内部の非同期処理で送出された全ての例外はまとめられ、一つの CompositeException として送出されます。
Base.wait ── 関数
Threads.Condition に対する wait に関する特別な注意点:
呼び出し側は、wait を呼び出す前に対象の Condition に対する lock を取得する必要があります。wait を呼び出したタスクは他のタスクによって (通常は同じ Condition に対する notify で) 起動されるまでブロックします。ブロックしている間 (再帰的にロックされていたとしても) ロックは不可分に解放され、返るときに再取得されます。
wait([x])
何らかのイベントが起こるまで現在のタスクをブロックします。待つイベントは引数の型で決まります:
-
Channel: チャンネルに値が追加されるまで待つ。 -
Condition: 条件に対するnotifyを待つ。 Process: プロセスあるいはプロセスチェーンが終了するのを待つ。プロセスの成功/失敗はプロセスのexitcodeフィールドで判定できる。-
Task: タスクが終了するのを待つ。タスクが例外を送出して失敗した場合には、失敗したタスクをラップするTaskFailedExceptionが送出される。 -
RawFD: ファイル記述子の変更を待つ (FileWatchingパッケージを参照)。
引数が渡されないと、ブロック時間は未定義となります。こうなったタスクは schedule または yieldto を明示的に呼び出すことで再開できます。
wait は while ループの中で処理を進めるのに必要な条件が成り立っていることを確認するためよく使われます。
Base.fetch ── メソッド
fetch(t::Task)
タスクが終わるのを待ち、その結果を返します。タスクが例外を送出して失敗すると、そのタスクをラップした TaskFailedException が送出されます。
Base.timedwait ── 関数
timedwait(testcb::Function, timeout::Real; pollint::Real=0.1)
testcb が true を返すか、timeout 秒が経過するまで待ちます。どちらかが起こればこの関数は返ります。testcb は pollint 秒ごとに状態を確認されます。timeout と pollint に指定できるのは 1 ミリ秒 (0.001) 以上の値です。
:ok または :timed_out を返します。
Base.Condition ── 型
Condition()
タスクが wait できるエッジトリガのイベントソースを作成します。Condition に対して wait を行ったタスクは停止され、キューに入れられます。その後キューに入れられたタスクは同じ Condition に対して notify が呼ばれたときに起動されます。「エッジトリガ」とは、notify が呼ばれたときに待っているタスクだけが起動することを意味します。レベルトリガの通知を行うには、通知が起こったかを記録する状態を別に用意する必要があります。Channel 型と Threads.Event 型はこれを行うので、レベルトリガのイベントが必要なときはこれらを利用できます。
このオブジェクトはスレッドセーフではありません。スレッドセーフなバージョンについては Threads.Condition を参照してください。
Base.notify ── 関数
notify(condition, val=nothing; all=true, error=false)
条件に対して待っているタスクを起動し、値 val を渡します。all が true (デフォルト値) なら待っているタスクが全て起動され、all が false なら一つのタスクだけが起動されます。error が true なら、起動されるタスクで渡された値 val が例外として送出されます。
起動したタスクの個数を返します。condition に待っているタスクが無ければ 0 が返ります。
Base.Semaphore ── 型
Base.acquire ── 関数
acquire(s::Semaphore)
セマフォ s のパーミットが取得可能になったら取得します。s が持つ sem_size 個のパーミットのいずれかが取得できるようになるまでブロックします。
Base.release ── 関数
Base.lock ── 関数
lock(lock)
lock が取得可能になったら取得します。lock が他のタスク/スレッドによってロックされている場合には、利用可能になるまで待ちます。
lock には必ず unlock が対応する必要があります。
lock(f::Function, lock)
lock を取得し、lock を保持した状態で f を実行し、f が返ったら lock を解放します。もし lock が他のタスク/スレッドによってロックされている場合には、利用可能になるまで待ちます。
この関数が返るとき lock は解放されているので、呼び出し側から unlock する必要はありません。
Base.unlock ── 関数
Base.trylock ── 関数
Base.islocked ── 関数
Base.ReentrantLock ── 型
ReentrantLock()
Task 間の同期に利用する再入可能 (re-entrant) なロックを作成します。一つのタスクは ReentrantLock を必要なだけ何度でも取得できます。ReentrantLock の lock には必ず unlock が対応する必要があります。
lock を呼び出すと、対応する unlock を呼び出すまでそのスレッドでファイナライザの実行が行われなくなります。次に示す標準のロックパターンは自然にサポートされますが、try と lock の順番を間違えたり、try ブロックの実行が飛ばされる (例えばロックを保持したまま関数から返る) ことのないよう注意してください:
lock(l)
try
<アトミックな処理>
finally
unlock(l)
end
チャンネル
Base.Channel ── 型
Channel{T=Any}(size::Int=0)
T 型のオブジェクトを最大で size 個だけ保持できる内部バッファを持った Channel を構築します。満杯のチャンネルに対して put! を呼び出すと、他のタスクが take! を呼び出しチャンネルからオブジェクトを削除するまでブロックします。
Channel(0) はバッファを持たないチャンネルを構築します。このチャンネルでは任意の put! が take! されるまでブロックし、逆も同様となります。
他のコンストラクタは次の通りです:
Channel(): デフォルトコンストラクタであり、Channel{Any}(0)と等価です。Channel(Inf):Channel{Any}(typemax(Int))と等価です。Channel(sz):Channel{Any}(sz)と等価です。
デフォルトコンストラクタ Channel() とデフォルト値 size=0 は Julia 1.3 で追加されました。
Base.Channel ── メソッド
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
func から新しいタスクを作成し、T 型でサイズ size の新しいチャンネルにタスクをバインドし、タスクをスケジュールするという処理を一度の呼び出しで行います。
func は唯一の引数としてバインドされるチャンネルを受け取らなければなりません。
作成されるタスクを参照する必要があるなら、キーワード引数 taskref に Ref{Task} オブジェクトを渡してください。
spawn = true だと、func を実行する新しいタスクを他のスレッドで並列にスケジュールすることを許可できます。つまり Threads.@spawn でタスクを作成するのと等価になります。
Channel 型の値を返します。
例
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
作成されたタスクを参照する例を示します:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
キーワード引数 spawn は Julia 1.3 で追加されました。1.3 より前の Julia では、Channel のコンストラクタで size と T を指定するのにキーワード引数が使われていました。現在このコンストラクタは非推奨です。
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(sz_max:1,sz_curr:1)
julia> String(collect(chnl))
"hello world"
Base.take! ── メソッド
Base.isready ── メソッド
Base.fetch ── メソッド
fetch(c::Channel)
チャンネルから値が利用可能になるのを待ち、利用可能になったら取得して返します。バッファを持たない (サイズが 0 の) チャンネルで fetch はサポートされません。
Base.close ── メソッド
Base.bind ── メソッド
bind(chnl::Channel, task::Task)
チャンネル chnl の寿命をタスク task に関連付けます。chnl はタスクが終了するときに自動的に閉じられるようになります。また task で発生した未捕捉の例外は chnl を待つ全てのタスクに伝播します。
chnl オブジェクトはタスクの終了と関係ないタイミングで閉じることもできます。Channel オブジェクトが閉じているならタスクが終了しても何も起きません。
一つのチャンネルが複数のタスクにバインドされると、タスクが一つでも終了した段階でチャンネルが閉じられます。複数のチャンネルが一つのタスクにバインドされると、タスクが終了するときにバインドされたチャンネルが全て閉じられます。
例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException:
foo
Stacktrace:
[...]
-
訳注: 英語版には逆のことが書かれていたので、記述を修正した。[return]