Distributed
Distributed.addprocs ── 関数
addprocs(manager::ClusterManager; kwargs...) -> プロセス ID のリスト
指定したクラスターマネージャを通してワーカープロセスを起動します。
例えば Beowulf クラスターは ClusterManagers.jl が実装する独自のクラスターマネージャを通してサポートされます。
新しく起動したワーカーがマスターからの接続確立を待機する秒数は、ワーカープロセスの環境変数 JULIA_WORKER_TIMEOUT で指定できます。この値は TCP/IP トランスポートを利用するときにだけ意味を持ちます。
REPL をブロックせずにワーカーを起動するには、addprocs を個別のタスクとして実行してください。ワーカーをプログラム的に起動する関数でも同様です。
例
# 忙しいクラスターでは、addprocs を非同期的に呼び出す。
t = @async addprocs(...)
# オンラインになったワーカーを利用する。
if nprocs() > 1 # 少なくとも一つの新しいワーカーが利用可能なことを確認する。
.... # 分散計算を行う
end
# 新しく起動したワーカーの ID またはエラーメッセージを取得する。
if istaskdone(t) # fetch がブロックしないように addprocs が完了したことを確認する。
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines;
tunnel=false,
sshflags=``,
max_parallel=10,
kwargs...) -> プロセス ID のリスト
SSH を通してリモートマシン上にプロセスを追加します。各ノードに julia が必要であり、同じ場所にインストールされているか、共有ファイルシステムで利用可能でなければなりません。
machines はマシン仕様 (machine specification) のベクトルです。ワーカーはマシン仕様ごとに起動されます。
マシン仕様は文字列 machine_spec またはタプル (machine_spec, count) で表されます。
machine_spec は [user@]host[:port] [bind_addr[:port]] という形の文字列です。デフォルトでは user は現在のユーザー、port は標準の SSH ポートとなります。[bind_addr[:port]] が指定されると、このワーカーに対する他のワーカーからの接続は指定された bind_addr と port を使うようになります。
count は指定されたホストで起動されるワーカーの個数です。:auto を指定すると、そのホストにおける CPU スレッドと同じ個数のワーカーが起動します。
キーワード引数
-
tunnel:trueだと、マスタープロセスからワーカーへの接続で SSH トンネリングが使われます。デフォルトはfalseです。 -
multiplex:trueだと、SSH トンネリングで SSH 多重化が使われます。デフォルトはfalseです。 -
sshflags: SSH の追加オプションを指定します。例えばsshflags=`-i /home/foo/bar.pem`などとします。 -
max_parallel: ホストへ同時に接続できるワーカーの個数の上限です。デフォルトは10です。 -
dir: ワーカーのワーキングディレクトリを指定します。デフォルトはホストのカレントディレクトリ (pwd()の返り値) です。 -
enable_threaded_blas:trueだと、追加されたプロセスで BLAS がマルチスレッドで実行されるようになります。デフォルトはfalseです。 -
exename:julia実行形式の名前です。デフォルトは場合に応じて"$(Sys.BINDIR)/julia"または"$(Sys.BINDIR)/julia-debug"です。 -
exeflags: ワーカープロセスに渡される追加のフラグです。 -
topology: ワーカー同士がどのように接続するかを指定します。接続されていないワーカー間でメッセージを送信するとエラーとなります。-
topology=:all_to_all: 全てのプロセスが互いに接続します。デフォルトです。 -
topology=:master_worker:pidが 1 のドライバプロセスだけがワーカーと接続し、ワーカー同士は接続しません。 -
topology=:custom: クラスターマネージャのlaunchメソッドがWorkerConfigのidentとconnect_identsのフィールドを通じて接続トポロジーを指定します。identというクラスターマネージャ ID を持つワーカーはconnect_identsに含まれる全てのワーカーと接続します。
-
-
lazy:topology=:all_to_allのときにだけ意味を持ちます。trueだと、ワーカー同士の接続確立が遅延されます。つまりワーカー間で初めてリモートコールが起こったときに接続がセットアップされます。デフォルトはtrueです。
環境変数
マスタープロセスが新しく起動したワーカーとの接続を 60.0 秒が経過するまでの間に確立できなかった場合、ワーカーはそれを致命的な状況と扱い、終了します。このタイムアウトは環境変数 JULIA_WORKER_TIMEOUT で制御でき、マスタープロセスにおける JULIA_WORKER_TIMEOUT の値が新しいワーカーが接続確立を待機する秒数を指定します。
addprocs(; kwargs...) -> プロセス ID のリスト
addprocs(Sys.CPU_THREADS; kwargs...) と等価です。
ワーカーはスタートアップスクリプト .julia/config/startup.jl を実行しないことに注意してください。またグローバルな状態 (グローバル変数・新しいメソッドの定義・読み込まれたモジュールなど) を実行中の他プロセスと同期することもありません。
addprocs(np::Integer; restrict=true, kwargs...) -> プロセス ID のリスト
組み込みの LocalManager を使ってワーカーを起動します。このマネージャはローカルホストでのみワーカーを起動するので、マルチコアを活用できます。例えば addprocs(4) はローカルマシンに四つのプロセスを追加します。restrict が true だとバインド先が 127.0.0.1 に制限されます。キーワード引数 dir, exename, exeflags, topology, lazy, enable_threaded_blas の効果は addprocs のドキュメントにある通りです。
Distributed.nprocs ── 関数
nprocs()
利用可能なプロセスの個数を返します。
例
julia> nprocs()
3
julia> workers()
5-element Array{Int64,1}:
2
3
Distributed.nworkers ── 関数
Distributed.procs ── メソッド
Distributed.procs ── メソッド
procs(pid::Integer)
同じ物理ノード上にある全てのプロセス ID のリストを返します。正確に言うと、pid と同じ IP アドレスにバインドされた全てのワーカーが返ります。
Distributed.workers ── 関数
workers()
全てのワーカープロセス ID のリストを返します。
例
$ julia -p 5
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs ── 関数
rmprocs(pids...; waitfor=typemax(Int))
指定されたワーカーを削除します。ワーカーを追加/削除できるのはプロセス 1 だけであることに注意してください。
キーワード引数 waitfor はワーカーがシャットダウンするまで待つ秒数を指定します。
waitforを指定しないと、rmprocsは要求されたpidsが全て削除されるまで待ちます。- 要求した
waitfor秒までの間に全てのワーカーが終了できないと、ErrorExceptionが送出されます。 -
waitforが0だとrmprocsはすぐに返り、違うタスクでワーカーの削除がスケジュールされます。スケジュールされたTaskオブジェクトが返ります。ユーザーは他の並列な関数を呼び出す前にrmprocsが返すタスクに対してwaitを行うべきです。
例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt ── 関数
interrupt(pids::Integer...)
指定されたワーカーで現在実行中のタスクを中断します。これはローカルマシンで Ctrl-C を入力するのと等価です。引数が与えられなければ、全てのワーカーが中断されます。
interrupt(pids::AbstractVector=workers())
指定されたワーカーで現在実行中のタスクを中断します。これはローカルマシンで Ctrl-C を入力するのと等価です。引数が与えられなければ、全てのワーカーが中断されます。
Distributed.myid ── 関数
Distributed.pmap ── 関数
pmap(f,
[::AbstractWorkerPool],
c...;
distributed=true,
batch_size=1,
on_error=nothing,
retry_delays=[],
retry_check=nothing) -> collection
コレクション c を変形します。利用可能なワーカーとタスクを使って c の各要素に f が適用されます。
c に複数のコレクションが渡されると、それぞれから要素を一つずつ取ったものが f の引数となります。
f は全てのワーカープロセスで利用可能である必要があることに注意が必要です。詳細はマニュアルのパッケージの読み込みとコードの可視性の章を参照してください。
ワーカープールが指定されないと、利用可能な全てのワーカー (デフォルトのワーカープール) が使われます。
デフォルトで pmap は指定された全てのワーカーに計算を分散します。ローカルプロセスだけを使ってタスクで計算を分散させるには distributed=false を指定してください。このときの振る舞いは asyncmap と等価であり、例えば pmap(f, c; distributed=false) は asyncmap(f, c; ntasks=()->nworkers()) と同じです。
引数 batch_size を使うと、pmap でプロセスとタスクを混ぜて利用できます。batch_size を 1 より大きくすると、コレクションは複数のバッチで処理されます。各バッチは batch_size 以下の長さを持ち、利用可能なワーカーに単一のリクエストとして送られます。これに対してローカルな asyncmap では、各要素はバッチの一部として複数の並行なタスクを使って処理されます。
エラーが一つでも発生すると、pmap はコレクションの残りの部分の処理を止めます。この振る舞いを上書きするには、引数 on_error にエラー処理関数を指定してください。この関数はエラーを唯一の引数として受け取り、エラーを再度送出して処理を止めるか、何らかの値を返して処理を継続させるかを選択できます。後者では返した値が結果として呼び出し側に返ります。
on_error の使用例を二つ示します。一つ目では例外オブジェクトが返り値の配列に組み込まれ、二つ目では例外が発生した場所が 0 となっています:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
失敗した計算をやり直すことでエラーを処理することもできます。pmap のキーワード引数 retry_delays と retry_check が retry にキーワード引数 delays および check として渡されます。バッチを使っていてバッチ全体が失敗したときは、そのバッチに含まれる要素が全てやり直されます。
on_error と retry_delays が両方指定されると、やり直しの前に on_error が呼ばれます。on_error が例外を (再) 送出しなかった場合には、要素に対する処理のやり直しは行われません。
例: 次の呼び出しはエラーが起きた要素に対して f を最大三回やり直します。やり直しの間に遅延は挿入されません。
pmap(f, c; retry_delays = zeros(3))
例: 次の呼び出しは例外が InexactError でないときに限って f をやり直します。やり直しは指数的に長くなる遅延を使って最大三回行われます。InexactError が起こった場所には NaN が挿入されます。
pmap(f, c;
on_error = e->(isa(e, InexactError) ? NaN : rethrow()),
retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException ── 型
RemoteException(captured)
リモート計算で発生した例外をローカルでキャプチャして再度送出するときに使われる例外型です。RemoteException 型の値はワーカーの pid とキャプチャされた例外を包みます。CapturedException 型のフィールド captured は例外が送出された時点におけるリモートのコールスタックをシリアライズ可能な形にしたものと例外オブジェクトを保持します。
Distributed.Future ── 型
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future は終了までの時間と終了状態が未知である単一の計算を表すプレースホルダーです。複数の計算に対しては RemoteChannel を使ってください。AbstractRemoteRef の特定には remoteref_id が利用できます。
Distributed.RemoteChannel ── 型
RemoteChannel(pid::Integer=myid())
プロセス pid に存在する Channel{Any}(1) に対する参照を作成します。デフォルトの pid は現在のプロセスです。
RemoteChannel(f::Function, pid::Integer=myid())
指定されたサイズと型を持つリモートのチャンネルに対する参照を作成します。f は pid を受け取って AbstractChannel の実装を返す関数でなければなりません。
例えば RemoteChannel(()->Channel{Int}(10), pid) は、pid に存在するサイズ 10 で Int 型のチャンネルに対する参照を返します。
Base.fetch ── メソッド
fetch(x::Future)
Future を待機し、その値を取得します。フェッチされた値はローカルでキャッシュされ、一度呼び出した後に同じ参照に対して fetch を呼び出すとキャッシュされた値が返ります。リモートが返した値が例外の場合は、リモートの例外とバックトレースを捕捉した RemoteException を送出します。
Base.fetch ── メソッド
fetch(c::RemoteChannel)
RemoteChannel を待機し、その値を取得します。リモートで発生した例外は Future と同じように処理されます。フェッチした要素を削除しません。
Distributed.remotecall ── メソッド
Distributed.remotecall_wait ── メソッド
remotecall_wait(f, id::Integer, args...; kwargs...)
メッセージを一つだけ用いる高速な wait(remotecall(...)) を行います。関数はワーカー ID が id の Worker で実行されます。キーワード引数があれば f にそのまま渡されます。
wait, remotecall も参照してください。
Distributed.remotecall_fetch ── メソッド
remotecall_fetch(f, id::Integer, args...; kwargs...)
メッセージを一つだけ使って fetch(remotecall(...)) を行います。キーワード引数があれば f にそのまま渡されます。リモートで発生した例外はキャプチャされて RemoteException として送出されます。
参照:fetch, remotecall
例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
Distributed.remote_do ── メソッド
remote_do(f, id::Integer, args...; kwargs...) -> nothing
ワーカー id で f を非同期に実行します。remotecall と異なり、計算の結果は保存されず、終了を待つことはできません。
成功した remote_do の呼び出しは実行の要求がリモートノードで受け付けられたことを意味します。
同じワーカーに対する連続した remotecall は呼び出しの順番でシリアライズされますが、リモートワーカーにおける実行の順序は決まっていません。例えば remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) とすると f1, f2, f3 という順序で呼び出しがシリアライズされますが、ワーカー 2 で f1 が f3 より先に実行されることは保証されません。
f から送出された任意の例外はリモートワーカーの stderr に出力されます。
キーワード引数があれば f にそのまま渡されます。
Base.put! ── メソッド
put!(rr::RemoteChannel, args...)
RemoteChannel に渡された値を格納します。チャンネルが満杯なら、空間が空くまでブロックします。第一引数を返します。
Base.take! ── メソッド
Base.isready ── メソッド
isready(rr::RemoteChannel, args...)
RemoteChannel が値を保持しているかどうかを判定します。この関数に頼って処理を行うと競合状態が起こる可能性があることに注意してください。isready を呼び出したプロセスが返り値を受け取るときには条件が成り立たなくなっている場合があるためです。ただし Future は一度だけ値が設定されるので、Future に対しては安全に使用できます。
Base.isready ── メソッド
Distributed.AbstractWorkerPool ── 型
AbstractWorkerPool
WorkerPool や CachingPool といったワーカープールを表す上位型です。AbstractWorkerPool が実装すべきメソッドは次の通りです:
-
push!: overall プール (利用可能なワーカーおよび実行中のワーカーが入るプール) に新しいワーカーを追加します。 -
put!: ワーカーを available プールに入れます。 -
take!: available プールから (リモートコールのために) ワーカーを取得します。 -
length: overall プールにある利用可能なワーカーの個数を返します。 -
isready: プールに対するtake!がブロックするならfalseを、ブロックしないならtrueを返します。
これらのメソッドの (AbstractWorkerPool に対する) デフォルト実装は次のフィールドを必要とします:
channel::Channel{Int}workers::Set{Int}
channel には利用可能なワーカーのプロセス ID が格納されます。workers はこのプールに関連付いた全てのワーカーのプロセス ID の集合です。
Distributed.WorkerPool ── 型
WorkerPool(workers::Vector{Int})
ワーカー ID のベクトルから WorkerPool を作成します。
例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
Distributed.CachingPool ── 型
CachingPool(workers::Vector{Int})
AbstractWorkerPool の実装です。remote, remotecall_fetch, pmap (および関数をリモートで実行する他のリモートコール) では、CachingPool 型のワーカープールを使ってワーカーノードにある関数 (特に大きなデータをキャプチャする可能性のあるクロージャ) をシリアライズ/デシリアライズした形で保持することで効率を向上できます。
リモートキャッシュは返される CachingPool オブジェクトと同じ寿命を持ちます。それよりも早くキャッシュを削除するには clear!(pool) を使ってください。
クロージャでグローバル変数をキャプチャするとき、キャプチャされるのは束縛だけであり、データはキャプチャされません。グローバル変数のデータもキャプチャするには let ブロックを使ってください。
例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(wp, i -> sum(foo) + i, 1:100);
end
このコードは foo を各ノードに一度だけ送信します。
Distributed.default_worker_pool ── 関数
default_worker_pool()
アイドル状態の worker を保持する WorkerPool です。remote(f) と pmap がデフォルトで利用します。
例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear! ── メソッド
Distributed.remote ── 関数
remote([p::AbstractWorkerPool], f) -> Function
remotecall_fetch を使って f を利用可能なワーカーで実行する無名関数を返します。WorkerPool 型の値 p が与えられれば、f はそこから取ってきたワーカーで実行されます。
Distributed.remotecall ── メソッド
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool を受け取るバージョンの remotecall(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
この例では、ID 1 のプロセスが呼び出したタスクが ID 2 のプロセスで実行されます。
Distributed.remotecall_wait ── メソッド
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool を受け取るバージョンの remotecall_wait(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_wait を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch ── メソッド
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool を受け取るバージョンの remotecall_fetch(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_wait を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do ── メソッド
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool を受け取るバージョンの remote_do(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_do を実行します。
Distributed.@spawnat ── マクロ
@spawnat p expr
式を包むクロージャを作成し、それをプロセス p で非同期に実行します。返り値に対する Future を返します。p がクオートされたリテラルシンボル :any なら、expr を実行するプロセスはシステムによって自動的に選択されます。
例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
引数 :any は Julia 1.3 以降でサポートされます。
Distributed.@fetch ── マクロ
Distributed.@fetchfrom ── マクロ
Distributed.@distributed ── マクロ
@distributed
分散メモリを使った並列 for ループです。次の形式で使います:
@distributed [reducer] for var = range
body
end
指定された区間 range が分割されて各ワーカーに割り当てられ、本体 body の実行はワーカーでローカルに行われます。省略可能な縮約関数 reducer を指定すると、@distributed はローカルな縮約を各ワーカーで実行し、それから呼び出したプロセスで最後の縮約を実行します。
縮約関数を使用しないと、@distributed は非同期に実行されます。つまり利用可能な全てのワーカーで独立したタスクが起動し、終了を待つことなくマクロはすぐに返ります。タスクの終了を待つには、次のように @sync を最初に付けてください:
@sync @distributed for var = range
body
end
Distributed.@everywhere ── マクロ
@everywhere [procs()] expr
procs に含まれる全てのプロセスで式 expr を Main の下で実行します。任意のプロセスで起きたエラーは収集され、単一の CompositeException として送出されます。例えば
@everywhere bar = 1
は Main.bar を現在起動している全てのプロセスで定義します。後から (addprocs() などで) 追加されるプロセスでは Main.bar は定義されません。
@spawnat とは異なり、@everywhere はローカル変数を一切キャプチャしません。その代わり、ローカル変数は補間を使ってブロードキャストできます:
foo = 1
@everywhere bar = $foo
省略可能な引数 procs を使うと、式を実行するプロセスをプロセス全体の部分集合にできます。
remotecall_eval(Main, procs, expr) の呼び出しと等価です。
Distributed.clear! ── メソッド
Distributed.remoteref_id ── 関数
remoteref_id(r::AbstractRemoteRef) -> RRID
Future と RemoteChannel は次のフィールドで識別されます:
-
where: リモートリファレンスが指すオブジェクト/ストレージが実際に存在するノードです。 -
whence: リモートリファレンスが作成されたノードです。これはリモートリファレンスが指すオブジェクトが実際に存在するノードと異なる概念であることに注意してください。例えばマスタープロセスでRemoteChannel(2)とすると、whereは2であるのに対してwhenceは1となります。 -
id: リモートリファレンスの識別子です。識別子whenceを持つワーカーで作成されるリモートリファレンスは全て異なるidを持ちます。
まとめると、whence と id があれば全てのワーカーが持つリモートリファレンスの中から一つをユニークに識別できます。
remoteref_id はリモートリファレンスの whence と id を包んだ RRID オブジェクトを返す低水準 API です。
Distributed.channel_from_id ── 関数
channel_from_id(id) -> c
remoteref_id が返す id を受け取って、対応する AbstractChannel を返す低水準 API です。この関数は対応するチャンネルが存在するノードでのみ呼び出せます。
Distributed.cluster_cookie ── メソッド
Distributed.cluster_cookie ── メソッド
ClusterManager インターフェース
このインターフェースは様々なクラスター環境で Julia ワーカーを起動・管理する仕組みを提供します。Base には二種類のクラスターマネージャがあります: 同じホストで追加のワーカーを起動する LocalManager と、SSH を使ってリモートホストでワーカーを起動する SSHManager です。プロセス間の接続とメッセージのトランスポートには TCP/IP ソケットが使われますが、クラスターマネージャは異なるトランスポートを提供することもできます。
Distributed.ClusterManager ── 型
ClusterManager
クラスターマネージャを表す上位型です。ワーカープロセスをクラスターとして管理します。クラスターマネージャはワーカーの追加・削除・通信の方式を実装します。SSHManager と LocalManager は ClusterManager の部分型です。
Distributed.WorkerConfig ── 型
WorkerConfig
クラスターに追加されるワーカーを制御するために ClusterManager が使う型です。いくつかのフィールドはホストにアクセスするために全てのクラスターマネージャによって利用されます:
io: ワーカーにアクセスするときに使う接続 (IOもしくはNothing)host: ホストのアドレス (AbstractStringもしくはNothing)port: ワーカーに接続するときに使うホストのポート番号 (IntもしくはNothing)
いくつかのフィールドはクラスターマネージャが初期化済みのホストにワーカーを追加するときに利用されます:
count: ホストで起動するワーカーの個数exename: ホストにおける Julia 実行形式のパス (デフォルトは"$(Sys.BINDIR)/julia"もしくは"$(Sys.BINDIR)/julia-debug")exeflags: Julia をリモートで起動するときに使うフラグ
userdate フィールドは外部マネージャが各ワーカーの情報を格納するときに利用されます。
いくつかのフィールドは SSHManager およびそれに似たマネージャによって利用されます:
-
tunnel:true(トンネリングを行う)・false(トンネリングを行わない)・nothing(マネージャのデフォルトを使う) のいずれか multiplex:true(SSH トンネリングで多重化を使う) もしくはfalse(使わない)forward: SSH の-Lオプションで使われるフォワーディングオプションbind_addr: バインドを行うリモートホストのアドレスsshflags: SSH 接続を確立するときに使うフラグmax_parallel: ホストに並列に接続できるワーカーの個数の最大値
いくつかのフィールドは LocalManager と SSHManager の両方によって利用されます:
connect_at: セットアップされる通信がワーカー-ワーカー間とドライバ-ワーカー間のどちらか-
process: 接続されるプロセス (通常はaddprocs中にマネージャが値を代入する) ospid: ホスト OS におけるプロセス ID (ワーカープロセスを停止するときに使われる)environ:LocalManager/SSHManagerが一時的な情報を格納するのに使うプライベートな辞書-
ident:ClusterManagerがワーカーの識別に使う ID connect_idents: 独自のトポロジーを使っているとき、このワーカーが接続しなければならないワーカーの ID のリストenable_threaded_blas: ワーカーでスレッド化された BLAS を使うかどうか (true,false,nothingのいずれか)
Distributed.launch ── 関数
launch(manager::ClusterManager,
params::Dict,
launched::Array,
launch_ntfy::Condition)
クラスターマネージャによって実装されます。クラスターマネージャはこの関数で起動した Julia ワーカーのそれぞれに対して WorkerConfig を launched に追加し、ワーカーが全て起動したら launch_ntfy を notify するべきです。この関数は必ず、マネージャが要求したワーカーの起動が全て完了してから返る必要があります。params は addprocs を呼び出すときに使われたキーワード引数を全て納めた辞書です。
Distributed.manage ── 関数
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
クラスターマネージャによって実装されます。ワーカーでライフタイムに関するイベントが起こるたびにマスタープロセスで呼び出され、そのとき op が何が起こったかを示します:
:register/:deregister: ワーカーが Julia のワーカープールから追加/削除されたことを示します。:interrupt:interrupt(workers)が呼ばれたことを示します。ClusterManagerは適切なワーカーに停止シグナルを送るべきです。:finalize: クリーンアップ用途で使われます。
Sockets.connect ── メソッド
connect(manager::ClusterManager,
pid::Int,
config::WorkerConfig) -> (instrm::IO, outstrm::IO)
独自のトランスポートを使うクラスターマネージャで実装されます。この関数は config が指定する論理接続を ID が pid のワーカーに対して確立し、IO オブジェクトの組を返すべきです。pid から現在のプロセスへのメッセージは instrm から読めるようになり、pid へのメッセージは outstrm に書き込まれます。独自のトランスポート実装はメッセージの送受信が完全な形で順序を保って行われることを保証する必要があります。connect(manager::ClusterManager.....) は TCP/IP ソケット接続をワーカー間にセットアップします。
Distributed.init_worker ── 関数
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
独自のトランスポートを実装するクラスターマネージャによって呼ばれます。この関数は新しく起動されたプロセスをワーカーとして初期化します。コマンドライン引数 --worker[=<cookie>] を指定すると、プロセスはトランスポートに TCP/IP ソケットを使うワーカーとして初期化されます。cookie は cluster_cookie を表します。
Distributed.start_worker ── 関数
start_worker([out::IO=stdout],
cookie::AbstractString=readline(stdin);
close_stdin::Bool=true,
stderr_to_stdout::Bool=true)
start_worker は内部で使われる関数であり、TCP/IP で接続するワーカープロセスのデフォルトのエントリーポイントです。Julia クラスターワーカーとしてプロセスをセットアップします。
host:port の情報がストリーム out (デフォルトは stdout) に書き込まれます。
この関数は必要ならクッキーを stdin から読み、空いているポート (コマンドライン引数 --bind-to が指定されればそのポート) にリッスンし、内向きの TCP 接続とリクエストを処理するタスクをスケジュールします。さらに stdin を閉じ、stderr を stdout にリダイレクトします (どちらも省略可能)。
この関数は返りません。
Distributed.process_messages ── 関数
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
独自のトランスポートを使うクラスターマネージャによって呼ばれます。この関数は独自のトランスポート実装がリモートワーカーからの最初のメッセージを受信したときに呼ばれるべきです。独自のトランスポートはリモートワーカーへの論理接続を管理し、二つの IO オブジェクトを提供しなければなりません。IO オブジェクトの一つは内向きのメッセージのためで、もう一つはリモートワーカーに向かうメッセージのためです。incoming が true だと、リモートピアが接続を開始したことを表します。接続を開始した方が認証ハンドシェイクを実行するためのクラスタークッキーと Julia のバージョン番号を送信します。
cluster_cookie も参照してください。