マルチプロセッシングと分散計算
分散メモリ並列計算は Distributed
モジュールとして実装され、Julia の標準ライブラリに付属します。
多くの現代的なコンピューターは複数の CPU を使って処理を行い、複数のコンピューターが組み合わさってクラスターとなることもあります。複数の CPU のパワーを上手く活用すれば、多くの計算を素早く終わらわせることができます。計算性能に影響を及ぼす主要な因子は二つあります: CPU 自身の計算速度と、CPU からメモリへのアクセス速度です。コンピューターのクラスターでは同じコンピューター (ノード) の RAM に対するアクセスが最も高速なのは自然に理解できると思います。しかし意外にも、典型的なマルチコアのノートパソコンでも同じ問題が起こります: メインメモリとキャッシュではアクセス速度が異なるためです。そのため優れたマルチプロセッシング環境では、特定のメモリ領域をどの CPU が保持するか (メモリのオーナーシップ) を制御できる必要があります。Julia はメッセージパッシングを使ったマルチプロセッシング環境を提供するので、異なるメモリドメインを持つ複数のプロセスでプログラムを同時に実行することが可能です。
Julia のメッセージパッシングの実装は MPI1 といった他の環境とは異なります。Julia における通信は一般に片方向 (one-sided) です。これは二つのプロセスが絡む通信操作であってもプログラマーが管理しなければならないプロセスは一つだけであることを意味します。さらに、通信に関する操作は通常「メッセージの送信」や「メッセージの受信」という形をしておらず、「ユーザー関数の呼び出し」のようなより高水準な操作の形をしています。
Julia における分散プログラミングの基礎となるのはリモートリファレンス (remote reference) とリモートコール (remote call) という二つのプリミティブです。リモートリファレンスは特定のプロセスが持つオブジェクトを指すオブジェクトであり、どのプロセスからでも利用できます。リモートコールは一つのプロセスが行う関数呼び出しのリクエストであり、関数と (同じプロセスにあるとは限らない) 引数を指定して行います。
リモートリファレンスには二つの種類があります: Future
と RemoteChannel
です。
リモートコールは Future
を返り値として返します。リモートコールはすぐに値を返すので、呼び出したプロセスはリモートコールがどこか別の場所で行われている間に次の処理へ進むことができます。リモートコールの終了まで待機するには、呼び出しが返す Future
に対して wait
を行い、その後 fetch
をすれば返り値を受け取れます。
これに対して RemoteChannel
には何度でも書き込みが可能です。例えば複数のプロセスが一つの RemoteChannel
を参照しながら協調して処理を行うといったことができます。
各プロセスには識別子 (ID) が関連付きます。Julia の対話プロンプトを提供するプロセスの ID は必ず 1 です。並列操作を行ったときにデフォルトで使われるプロセスを、まとめてワーカー (worker) と呼びます。プロセスが一つしかないときは ID が 1 のプロセスがワーカーとみなされ、そうでない場合は ID が 1 でない全てのプロセスがワーカーとみなされます。そのため、pmap
といった並列処理メソッドの恩恵を得るためには少なくとも二つのプロセスが必要です。長い計算をワーカーで行っている間にメインプロセスで他のことを行いたいだけなら、プロセスを一つ追加するだけでも十分です。
実際に試してみましょう。Julia を julia -p n
で起動すると、n
個のワーカープロセスがローカルのマシンに起動します。通常は n
をマシンの CPU スレッド (論理コア) 数と一致させるのが理にかなっています。コマンドライン引数で -p
を指定すると Distributed
が自動的にインポートされることに注意してください。
$ ./julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
remotecall
の第一引数は呼び出す関数です。Julia における並列プログラミングの大部分は実行するプロセスの ID や個数を指定しませんが、remotecall
はそういった数値を指定できる低水準なインターフェースとなっています。remotecall
の第二引数は処理を行うプロセスの ID で、それ以降の引数は呼ばれる関数へ渡される引数です。
コードを見ればわかると思いますが、一行目で乱数が入った 2×2 行列の構築する命令をプロセス 2 に出し、二行目でその行列の各要素に 1 を加えています。この二つの計算結果として r
, s
という二つの Future
が手に入ります。@spawnat
マクロは第二引数の式を第一引数が指定するプロセスで評価します。
リモートノードで計算した値をすぐに利用したい場合もあるでしょう。典型的なのがローカルで次に行う計算に必要なデータを取得するためにリモートのオブジェクトを読むときです。このための関数が remotecall_fetch
です。この関数が行う処理は fetch(remotecall(...))
と等価ですが、二つの関数を別々に呼ぶより効率的に行えます:
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085
以前に説明したように、getindex(r,1,1)
は r[1,1]
と等価です。そのためこの呼び出しは r
の最初の要素を返します。
呼び出しを簡単にするために、@spawnat
にはシンボル :any
を渡せるようになっています。:any
を渡すと処理を行うプロセスが自動的に選ばれます:
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
1 .+ r
ではなく 1 .+ fetch(r)
としていることに注目してください。この例では二つのコードがそれぞれどこで実行されるか分からないので、加算を行うプロセスへ r
を移動させなければならない可能性があります。ただこの場合には二つ目の @spawnat
が気を利かせて r
があるプロセスで計算を始めてくれるので、1 .+ fetch(r)
に含まれる fetch
は noop (何もしない) となります。
(@spawnat
が組み込みの構文ではなく Julia で定義されるマクロであることは注目に値します。こういった構文を自分で定義することもできます。)
一度 fetch
された Future
の値がローカルにキャッシュされることは覚えておくべき重要な事実です。以降の fetch
はネットワークホップを意味しません。全ての Future
の参照がフェッチされると、リモートで保存される値は削除されます。
パッケージの読み込みとコードの可視性
コードを他のプロセスで実行するときは、実行を行う全てのプロセスでそのコードが利用可能である必要があります。例えば、次のコードを Julia プロンプトに入力してみてください:
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]
このコードでプロセス 1 は rand2
関数を知っていますが、プロセス 2 は知っていません。
コードはファイルやパッケージから読み込むことがほとんどですが、コードを読み込むプロセスの選択では非常に柔軟な制御が可能です。DummyModule.jl
という名前のファイルに次のコードが含まれているとします:
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
全てのプロセスから MyType
を参照できるようにするには、DummyModule.jl
を全てのプロセスで読み込む必要があります。include("DummyModule.jl")
を呼び出しても一つのプロセスで読み込まれるだけなので、@everywhere
マクロが必要です。次の例では julia -p 2
として起動したプロンプトを使っています:
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
通常通り、これだけでは DummyModule
はプロセスのスコープに入らず、using
か import
が必要です。また一つのプロセスのスコープに DummyModule
が持ち込まれたとしても、他のプロセスのスコープに持ち込まれることはありません:
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
ただし、次に示すように、DummyModule
を読み込んだプロセスに対してスコープに入っていない MyType
型の値を送るといったことは可能です:
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
Julia を開始するときに複数のプロセスでファイルを事前ロードするには、コマンドライン引数 -L
を使います。さらに全体の計算の制御をドライバスクリプト (driver.jl
) にまとめれば、次のコマンドとなります:
$ julia -p <n> -L file1.jl -L file2.jl driver.jl
この例でドライバスクリプトを実行するプロセスの ID は 1 であり、対話プロンプトを提供するプロセスと同じです。
最後に、もし DummyModule.jl
が単一のファイルではなくパッケージなら、using DummyModule
は DummyModule.jl
を全てのプロセスで読み込みます。ただしスコープに入るのは using
を呼んだプロセスでだけです。
ワーカープロセスの開始と管理
通常の方法でインストールされた Julia は二種類のクラスターに対する組み込みのサポートを持ちます:
- コマンドライン引数
-p
で指定されるローカルクラスター: 上記のコードで使ったもの。 - コマンドライン引数
--machine-file
で起動される複数のマシンからなるクラスター: パスワードを使用しないssh
ログインを使って Julia ワーカープロセスが起動される。Julia は指定されたマシン上で現在のホストと同じパスで開始される。
addprocs
, rmprocs
, workers
といった関数を使うとプログラムからプロセスの追加・削除・問い合わせが可能です:
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
addprocs
を呼ぶためには Distributed
モジュールを明示的にマスタープロセスを読み込む必要があります。ワーカープロセスでは Distributed
は最初から利用可能です。
ワーカーはスタートアップスクリプト ~/.julia/config/startup.jl
を実行しないこと、そしてワーカーはグローバルな状態 (グローバル変数・新しいメソッドの定義・読み込まれたモジュール) を実行中の他のプロセスと同期しないことに注意してください。ワーカーを特定の環境で初期化するには addprocs(exeflags="--project")
を使った上で @everywhere using <modulename>
または @everywhere include("file.jl")
とします。
独自の ClusterManager
を書けば他の種類のクラスターをサポートできます。ClusterManager
の節でこの話題を詳しく説明します。
データの移動
メッセージの送信とデータの移動は分散プログラムにおける主要なオーバーヘッドです。そのため送信されるメッセージの数とデータの量を削減することが高い性能とスケーラビリティを達成する上で非常に重要となります。このためには、Julia の様々な分散プログラミング構文が行うデータの移動に関して理解しておかなければなりません。
fetch
関数は明示的なデータ移動を行う操作とみなせます。この関数はオブジェクトをローカルマシンに移動させることを直接指示するからです。@spawnat
(およびこれに似た構文) もデータを移動しますが、何が移動するかは fetch
の場合ほど明快ではありません。@spawnat
が暗黙のデータ移動操作を意味する場合があるからです。例えば、乱数行列を作成して二乗する二つのアプローチを考えます:
-
一つ目の方法:
julia> A = rand(1000,1000); julia> Bref = @spawnat :any A^2; [...] julia> fetch(Bref);
-
二つ目の方法:
julia> Bref = @spawnat :any rand(1000,1000)^2; [...] julia> fetch(Bref);
二つのコードに大きな違いはないと思うかもしれませんが、@spawnat
の振る舞いにより大きな違いが存在します。一つ目の方法では乱数行列がローカルに構築され、それから二乗を実行するプロセスに送信されます。これに対して二つ目の方法では乱数行列の構築と二乗が同じプロセスで行われます。そのため二つ目の方法はデータの通信量が一つ目の方法よりずっと少なく済みます。
この簡単な例では二つの方法の違いを見つけて正しい方を選ぶのは難しくありません。しかし現実の問題では、正しいデータ移動を設計するのに様々なことを考えなければならず、いくらかの計測も必要になることでしょう。例えば一つ目のプロセスが行列 A
を必要とするなら一つ目の方法が優れているかもしれませんし、A
の計算に時間がかかるなら別のプロセスへの移動は避けられません。あるいは並列性はそもそも必要とならず、@spawnat
や fetch(Bref)
を使わない方が性能が高くなる可能性さえあります。また rand(1000,1000)
の部分がもっと時間のかかる処理であれば、その一つのステップに対して @spawnat
を行うのが理にかなっているでしょう。
グローバル変数
@spawnat
を通じてリモートで実行される式、および remotecall
を通じてリモートで実行されるクロージャではグローバル変数を参照できます。そのとき Main
モジュールに含まれるグローバル束縛は他のモジュールに含まれるグローバル束縛と少し異なった扱いを受けます。例えばコードスニペット
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
において、 sum
はリモートプロセスで定義されている必要があります。A
はローカルのワークスペースで定義されるグローバル変数であることに注目してください。ワーカー 2 は Main
内に A
という名前の変数を持ちません。()->sum(A)
というクロージャをワーカー 2 に渡すと Main.A
がそこで定義され、Main.A
は remotecall_fetch
が終わった後もワーカー 2 に存在し続けます。Main
モジュールのグローバルな参照を含むリモートコールは次のように扱われます:
-
リモートコールがそれを実行するワーカーで定義されていないグローバル束縛を参照しているなら、ワーカーはその束縛を作成します。
-
グローバル定数はリモートノードでも定数として宣言されます。
-
グローバルな値は変更された後にリモートコールが起こったときにだけワーカーに再送信されます。またクラスターはノード間でグローバル束縛を同期しません。例えば
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # ワーカー 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # ワーカー 3 A = nothing
を実行すると、ワーカー 2 とワーカー 3 が持つ
Main.A
は異なる値となり、ノード 1 のMain.A
はnothing
となります。
気が付いたかもしれませんが、マスターではグローバル変数に関連付くメモリの回収・再利用が可能であるのに対して、ワーカーではグローバル束縛が有効であり続けるために不必要になったメモリの回収・再利用が不可能です。clear!
を使うと、使わなくなったリモートノード上のグローバル変数に nothing
を代入できます。こうすれば通常のガベージコレクションサイクルでそのグローバル変数が持っていたメモリが解放されます。
そのため、リモートコールがグローバル変数を参照するときは注意が必要です。可能ならグローバル変数を一切使わない方がよいでしょう。グローバル変数を参照しなければならないときは、let
ブロックを使ってグローバル変数をローカル変数として使用できないか考えてみてください。
let
を使ってグローバル変数を "ローカルに" 使う例を示します:
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
最後の出力から分かるように、グローバル変数 A
はワーカー 2 で定義されていますが、ローカル変数としてキャプチャされて使われた B
はワーカー 2 に存在しません。
並列 for ループと並列マップ
幸い、データの移動を必要としない並列計算にも実用的なものが存在します。例えばよく使われるモンテカルロシミュレーションでは、複数のプロセスが独立したシミュレーションの試行を同時に行えます。@spawnat
を使って二つのプロセスでコインを投げるシミュレーションを書いてみましょう。まず count_heads.jl
で次の関数を定義します:
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
count_heads
関数は n
個のランダムなビットを足しているだけです。この関数を使って二つのマシンで試行を行い、その結果を足すには次のようにします:
julia> @everywhere include_string(Main,
$(read("count_heads.jl", String)),
"count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
この例は頻繁に使われる強力なプログラミングパターンを示しています: 多くの反復をいくつかのプロセスを使って独立に実行し、最後に結果を何らかの関数で組み合わせるというパターンです。各プロセスからの結果を組み合わせる最後の処理を縮約 (reduction) と呼びます。こう呼ばれているのは、この操作が一般にテンソルのランクを落とす (reduce する) ためです。例えば数値のベクトルは単一の数値となり、行列は単一の行や列となります。リダクションはループの中で x = f(x,v[i])
という形をしていることが多く、ここで x
が最終的な縮約結果の値、f
が縮約を行う関数、そして v[i]
が縮約される要素を表します。縮約の順序が結果に影響しないように、f
が結合律を満たすことが望ましいとされます。
count_heads
が持つこのパターンは一般化できます。上記の例では @spawnat
文を使っているので並列性が文の数に制限されますが、並列 for ループを使えば分散メモリ上で任意の個数のプロセスを使って処理を実行できます。Julia では並列 for ループを @distributed
マクロで次のように書きます:
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
この構文が実装するのは、反復を分割して複数のプロセスに割り当て、反復の結果を指定された縮約演算 (この例では +
関数) で組み合わせるというパターンです。ループの最後の式の値が各反復の結果となり、縮約の最終的な結果が並列 for ループ全体の式の評価結果となります。
並列 for ループは通常の逐次 for ループのような見た目をしていますが、その振る舞いは大きく異なることに注意してください。特に気を付けなければならないのが、反復が指定した順序で起こるとは限らないこと、そして変数や配列への書き込みが (別のプロセスで起こるために) グローバルな影響を持たないことです。並列 for ループの内部で使われる変数は全てコピーされ、実行を行う各プロセスにブロードキャストされます。
例えば、次のコードは意図通りに動作しません:
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
全てのプロセスが個別に a
のコピーを持つことになるので、このコードでは a
を初期化できません。こういった並列 for ループは避けるべきです。ですが幸いにも、SharedArrays
モジュールが提供する共有配列を使えばこの制限を回避できます:
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
並列 for ループで外部変数を使ったとしても、その変数が読み込み専用であれば何の問題もありません:
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
このコードを実行するとベクトル a
が全てのプロセスに共有され、各プロセスがランダムに選んだ a
の要素に対して f
を計算します。
なお縮約演算は必要なければ省略でき、そのとき並列 for ループは非同期に実行されます。言い換えると、縮約演算を省略した並列 for ループは利用可能なワーカーで独立したタスクを起動し、その終了を待つことなく Future
の配列をすぐに返します。呼び出し側は Future
に対して fetch
を呼べば処理の終了を待機できます。あるいは @sync
をループの最初に付けて @sync @distributed for
とするやり方もあります。
並列計算において縮約演算が必要とならず、何らかの区間に属する全ての整数 (より一般には何らかのコレクションに属する全ての要素) に対して関数を適用したいだけである場合があります。これは縮約とは異なる並列計算パターンであり、並列マップ (parallel map) と呼ばれます。Julia は並列マップを pmap
関数で実装します。例えば 10 個の大規模な乱数行列の特異値を並列に計算するには次のようにします:
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Julia の pmap
はそれぞれの関数呼び出しが大規模な処理をするものとして設計されています。これに対して @distributed for
は各反復が小さな処理 (例えば二つの数字の加算) を行う状況を扱います。pmap
と @distributed for
はどちらもワーカープロセスだけを使って並列計算を行い、@distributed for
では最後の縮約で呼び出したプロセスが使われます。
リモートリファレンスと AbstractChannel
リモートリファレンスは必ず AbstractChannel
の実装を指します。
AbstractChannel
を実装する具象型 (例えば Channel
) には put!
, take!
, fetch
, isready
, wait
の実装が必要です。Future
が指すリモートオブジェクトは Channel{Any}(1)
(Any
型のオブジェクトを保持するサイズ 1 の Channel
) に保持されます。
RemoteChannel
は再書き込み可能であり、任意の型とサイズを持つチャンネルあるいは他の AbstractChannel
の実装を指すことができます。
コンストラクタ RemoteChannel(f::Function, pid)
を使うと、特定の型を持つ複数の値を保持するチャンネルへのリモートリファレンスを構築できます。f
はプロセス pid
上で実行される関数であり、AbstractChannel
のインスタンスを返す必要があります。
例えば RemoteChannel(()->Channel{Int}(10), pid)
はサイズ 10 で Int
型のチャンネルへのリモートリファレンスを返します。このチャンネルは ID が pid
のワーカーに存在します。
RemoteChannel
に対するメソッド put!
, take!
, fetch
, isready
, wait
はリモートプロセス上のバッキングストアへ転送されます。
このため RemoteChannel
はユーザーが実装した AbstractChannel
オブジェクトを参照するために利用できます。この機能の簡単な例として、Julia のサンプルコードレポジトリの dictchannel.jl
に辞書をリモートの保存場所に使う AbstractChannel
があります。
Channel
と RemoteChannel
の違い
-
Channel
はプロセス内でローカルです。例えばワーカー 3 にあるChannel
をワーカー 2 が直接参照することはできません。これに対してRemoteChannel
はワーカーをまたいで値をやり取りできます。 -
RemoteChannel
はChannel
に対するハンドルと考えることができます。 -
RemoteChannel
にはバッキングストア (例えばChannel
) を持つプロセスの ID が関連付きます。 - 任意のプロセスは自身が参照できる
RemoteChannel
にデータを入れたり、そこからデータを出したりできます。RemoteChannel
は自身が関連付いたプロセスとのデータの送受信を自動的に行います。 -
Channel
をシリアライズするとチャンネルが持つデータもシリアライズされます。その結果をデシリアライズすると、オリジナルのオブジェクトと事実上同一のコピーが手に入ります。 - これに対して
RemoteChannel
のシリアライズでは、RemoteChannel
が参照するChannel
インスタンスの場所を特定する識別子だけがシリアライズされます。そのためシリアライズ結果を (任意のワーカーで) デシリアライズすると、最初と同じバッキングストアを指すRemoteChannel
オブジェクトが得られます。
前にスレッドを使って作成したシミュレーションをプロセス間通信を使って書き換えると次のようになります。
このコードは四つのワーカーが一つのリモートチャンネル jobs
を処理します。最初 job_id
という ID で識別されるジョブを jobs
チャンネルへ書き込み、その後リモートで実行されるタスクが同時に jobs
チャンネルから読み、ランダムな時間だけ待機し、そして job_id
・経過時間・pid
からなるタプルを results
チャンネルに書き込みます。最後にマスタープロセスが全ての results
を出力します:
julia> addprocs(4); # ワーカープロセスを追加する
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> # 仕事を行う関数を全てのプロセスで定義する。
@everywhere function do_work(jobs, results)
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # 実際の仕事をする処理時間をシミュレートする。
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @async make_jobs(n); # n 個のジョブを jobs チャンネルに入れる。
julia> for p in workers() # ワーカー上でタスクを開始して、リクエストを並列に処理する。
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # 結果を出力する
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
リモートリファレンスと分散ガベージコレクション
リモートリファレンスによって参照されるオブジェクト (AbstractChannel
型のインスタンス) は、そのオブジェクトを指すクラスター内の参照が全て削除されてはじめて削除できます。
オブジェクトを参照するノードの追跡はそのオブジェクトを保存するノードによって行われます。RemoteChannel
や未フェッチの Future
がワーカーへシリアライズ (転送) されるたびに、その参照が指す (AbstractChannel
型の) オブジェクトを持つノードは通知を受けます。そして RemoteChannel
や未フェッチの Future
がローカルでガベージコレクトされるたびに、その参照が指すオブジェクトを持つノードは再度通知を受けます。この仕組みはクラスターと適切なやり取りを行う Julia 内部のシリアライザに実装されています。リモートリファレンスはクラスターで実行されるときにだけ意味を持ちます。また通常の IO
オブジェクトへの参照のシリアライズとデシリアライズはサポートされていません。
コレクト対象のオブジェクトが受け取る通知は "追跡" メッセージとして送信されます。例えば参照が他のプロセスに向けてシリアライズされるときは「参照の追加」のメッセージが、ローカルでガベージコレクトされたときは「参照の削除」のメッセージが送信されます。
Future
は一度だけ書き込み可能であり計算された値はローカルにキャッシュされるので、Future
を fetch
すると参照の追跡情報も更新されます。
オブジェクトを指す全ての参照が削除されると、そのオブジェクトを持つノードが削除を行います。
フェッチ済みの Future
を異なるノードへシリアライズすると、Future
の値も同じノードから送信されます。フェッチされてからシリアライズされるまでの間に、値を計算したノードにあるオリジナルの値がコレクトされる可能性があるためです。
重要な事実として、オブジェクトがローカルのガベージコレクタによってコレクトされるタイミングはオブジェクトのサイズやシステムのメモリ使用量に依存します。
リモートリファレンスの場合には、ローカルに保存される参照オブジェクトは非常に小さくなる一方でリモートノードに保存される値は非常に大きくなる可能性があります。リモートのローカルオブジェクトがすぐにコレクトされない可能性があるので、RemoteChannel
や未フェッチの Future
のローカルなインスタンスに対しては必要なくなった段階で finalize
を明示的に呼び出すのが良い習慣とされます。Future
をフェッチするときはリモートストアからの参照が削除されるので、フェッチした Future
に対しては finalize
は必要ありません。finalize
を明示的に呼び出すとすぐにリモートノードへメッセージが送信され、その値への参照を削除すべきであることが伝わります。
ファイナライザを起動した参照は無効となり、それ以降は利用できなくなります。
ローカルノードに対するリモートコール
リモートコールの実行に必要なデータはリモートノードへコピーされます。これは異なるノードにある RemoteChannel
や Future
にデータが保存されるときでも同様です。期待される通り、このときオブジェクトはシリアライズされてリモートノードにコピーされます。ただし目的ノードがローカルノードのとき、つまり呼び出し側のプロセス ID がリモートノードの ID と同じときは、リモートコールがローカルな呼び出しとして実行されます。このとき通常 (必ずではありません) 呼び出しは異なるタスクとして実行されますが、データのシリアライズとデシリアライズが行われません。つまりコピーは行われず、呼び出しは渡されたオブジェクトと同じオブジェクトを操作することになります。この振る舞いを示す例を示します:
julia> using Distributed;
julia> # ローカルノードに RemoteChannel が作成される。
rc = RemoteChannel(()->Channel(3));
julia> v = [0];
julia> for i in 1:3
v[1] = i # v を再利用する。
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> # リモートノードに RemoteChannel を作成する。
rc = RemoteChannel(()->Channel(3), workers()[1]);
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
この例から分かるように、ローカルな RemoteChannel
に対して同じオブジェクト v
を put!
すると、その値が異なっていたとしても同じ参照が保存されます。これに対して異なるノードにある rc
に put!
を行うと v
のコピーが作成されます。
これは通常問題にならないことを指摘しておきます。これはオブジェクトがローカルに保存され、かつ呼び出しの後に変更されるときに限って考慮すべき事項です。そういった場合にはオブジェクトの deepcopy
を行うのが適しているでしょう。
ローカルノードに対するリモートコールでも同じことが起こります:
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # ローカルノードで実行
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # リモートノードで実行
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
ここでもローカルノードに対するリモートコールは通常の関数呼び出しのように振る舞い、引数に渡されるローカルのオブジェクトを改変します。これに対してリモートの呼び出しは引数をコピーします。
繰り返しになりますが、この振る舞いは通常問題になりません。これを考慮すべきなのはローカルノードが計算ノードとして使われていて引数が関数の後に使われるときだけであり、必要ならローカルノードの呼び出しで深いコピーを渡すことで対処できます。リモートノードに対する呼び出しは必ず引数のコピーに対する処理となります。
共有配列
共有配列はシステムの共有メモリを使って配列を複数のプロセスにマップします。SharedArray
と DArray
は一部似ている部分もありますが、その振る舞いは大きく異なります。DArray
では各プロセスがデータの一部分だけを持ち、同じデータが二つ以上のプロセスで共有されることはありません。一方 SharedArray
では配列への処理に "参加" する全てのプロセスが配列全体へのアクセス権を持ちます。同じマシンの二つ以上のプロセスが大きなデータに連帯してアクセスする必要があるなら、SharedArray
が良い選択肢です。
共有配列のサポートは SharedArrays
モジュールを通して提供されるので、参加する全てのワーカーで明示的に読み込んでおく必要があります。
SharedArray
の添え字アクセス (および代入) は通常の配列と同じように動作し、さらに内部のメモリがローカルのプロセスから利用可能なので効率も落ちません。そのため単一プロセスで動作する配列を前提として書かれたアルゴリズムの多くは SharedArray
に対して自然に動作します。アルゴリズムが Array
の入力を要求するときは、sdata
を使えば SharedArray
が持つ内部の配列を取得できます。他の AbstractArray
型に対して sdata
は引数をそのまま返すので、sdata
は任意の Array
型のオブジェクトに対して呼び出して構いません。
共有配列のコンストラクタは次の形をしています:
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
これは要素型が T
でサイズが dims
の N
次元共有配列を作成し、それを pids
で指定されるプロセスで共有します。分散配列とは異なり、共有配列は名前付き引数 pids
で指定される "参加する" ワーカーからのみアクセスできます (共有配列を作成するプロセスが同じホスト上にあるなら、そこからもアクセスできます)。また SharedArray
が要素型としてサポートするのは isbitstype
が true
を返す型だけです。
initfn(S::SharedArray)
というシグネチャを持つ関数を init
に指定すると、それが参加するワーカーの全てで呼ばれます。各ワーカーの init
関数で配列の異なる部分を初期化させれば、初期化の並列化が可能です。
簡単な例を示します:
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
はプロセスごとに異なる重ならない添え字の区間を提供するので、プロセス間でタスクを分けるときに便利です。もちろん、初期化処理の切り分け方は自由です:
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
全てのプロセスが内部データへのアクセスを持つので、衝突を起こさないように注意が必要です。例えば
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
は未定義動作となります。各プロセスが配列全体に pid
を書き込むためです。S
の各要素について、その要素への代入を最後に実行したプロセスの pid
が代入されることになります。
さらに込み入った複雑な例として、次の "カーネル" を並列に実行する処理を考えます:
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
このカーネルは一次元の添え字を使って処理を分けているので、高い確率で問題が発生します: あるワーカーに割り当てられたブロックの後ろの部分に q[i,j,t]
があり、別のワーカーに割り当てられたブロックの前の部分に q[i,j,t+1]
があると、q[i,j,t+1]
の計算で q[i,j,t]
が必要になった段階では q[i,j,t]
の準備ができていない可能性が非常に高くなります。こういった場合には、配列を手動で切り分けた方がよいでしょう。ここでは二つ目の次元に関する分割を試してみます。まずワーカーに割り当てる区間 (irange, jrange)
を返す関数を作ります:
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # このワーカーには何も割り当てない
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
そしてカーネルを定義します:
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # 何が起きているかが分かるよう出力する。
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
SharedArray
から簡単に使えるようにラッパーも定義します:
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
では三つのバージョンを比べてみましょう。一つ目はシングルプロセスで実行する方法です:
julia> advection_serial!(q, u) =
advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
二つ目は @distributed
を使う方法です:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
三つ目は範囲を切り分けてワーカーに任せる方法です:
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
SharedArray
を作成して関数の性能を計測します。ここでは julia -p 4
として Julia を起動し、ワーカーを三つとしました:
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
JIT コンパイルのために全ての関数を一度実行し、二度目の実行で @time
マクロを使います。次の結果となりました:
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
advection_shared!
の一番の利点はワーカーの間の転送量が最小化され、割り当てられた部分を計算する時間を多く取れることです。
共有配列と分散ガベージコレクション
リモートリファレンスと同様、共有配列に参加している全てのワーカーからの参照を解放するタイミングは作成したノードにおけるガベージコレクションに左右されます。短命の共有配列オブジェクトをいくつも作るコードでは、ファイナライザを可能になった段階で明示的に呼ぶことで性能が向上するでしょう。共有されるセグメントをマップするファイルハンドルとそのメモリ領域が早く解放されるためです。
ClusterManager
論理クラスターにおける Julia プロセスの起動・管理・通信はクラスターマネージャを通して行われます。ClusterManager
は次の処理を担当します:
- クラスター環境でワーカープロセスを起動する。
- 各ワーカーが生きている間、イベントを管理する。
- データ転送を提供する (省略可能)。
Julia のクラスターは次の特徴を持ちます:
- 最初の Julia プロセスは
master
と呼ばれ、プロセス ID 1 と特殊な役割を持つ。 master
プロセスだけがワーカープロセスの追加と削除を行える。- 全てのプロセスは互いに直接通信できる。
ワーカー間の (組み込みの TCP/IP トランスポートを使った) 通信は次の手順で行われます:
-
master
プロセスがClusterManager
オブジェクトを引数としてaddprocs
を呼ぶ。 -
addprocs
が適切なlaunch
メソッドを呼び出し、このメソッドが必要な個数のワーカープロセスを適切なマシンで起動する。 - 各ワーカーが使われていないポートへのリッスンを開始し、ホストとポートの情報を
stdout
に書き出す。 - クラスターマネージャが各ワーカーの
stdout
をキャプチャし、その内容をmaster
プロセスで利用可能にする。 master
プロセスがこの情報をパースし、各ワーカーへの TCP/IP 接続を確立する。- 全てのワーカーがクラスターに含まれる他のワーカーの通知を受ける。
- 各ワーカーは自身の
id
よりも小さいid
を持つ全てのワーカーに接続する。 - これで全てのワーカーが他の全てのワーカーと接続するメッシュネットワークが確立される。
デフォルトのトランスポート層はプレーンな TCPSocket
を使いますが、Julia クラスターが独自のトランスポート層を提供することも可能です。
Julia は組み込みで二種類のクラスターマネージャを提供します:
-
LocalManager
:addprocs()
とaddprocs(np::Integer)
で使われる。 -
SSHManager
: 引数にホスト名のリストを受け取るaddprocs(hostnames::Array)
で使われる。
LocalManager
は同じホスト上に追加のワーカーを起動し、マルチコア/マルチプロセッサのハードウェアを最大限活用するために使います。
まとめると、最低限のクラスターマネージャに必要な要件は次の通りです:
- 抽象型
ClusterManager
の部分型であること。 - 新しいワーカーを起動を担当する
launch
メソッドを実装すること。 - 生きているワーカーに対する様々イベント (停止シグナルの送信など) が起きたときに呼ばれる
manage
メソッドを実装すること。
addprocs(manager::FooManager)
を行うためには、FooManager
に対する次のメソッドの実装が必要です:
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
例として、同じホストでワーカーを起動する LocalManager
がどう実装されているかを見てみましょう:
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
の引数の意味は次の通りです:
-
manager::ClusterManager
:addprocs
に渡されたクラスターマネージャ -
params::Dict
:addprocs
に渡されたキーワード引数 launched::Array
: 一つ以上のWorkerConfig
を追加すべき配列c::Condition
: 起動したワーカーに通知すべき条件変数
launch
メソッドは個別のタスクとして非同期的に呼ばれます。このタスクが終了すると要求されたワーカーが全て起動したというシグナルが出るので、launch
関数は要求されたワーカーが起動したらすぐに終了しなければなりません。
新しく起動されたワーカーは他のワーカーとマスタープロセスに all-to-all で接続します。Julia を起動するときにコマンドライン引数 --worker[=<cookie>]
を指定するとプロセスが最初からワーカーとして初期化され、TCP/IP ソケットを使った接続が設定されます。
クラスター内の全てのワーカーは同じクッキーを共有します。--worker
というオプションでクッキーが指定されずに Julia が起動されると、ワーカーは標準入力からクッキーを読み込みます。LocalManager
と SSHManager
はどちらも新しく起動したワーカーに標準入力を通してクッキーを渡します。
デフォルトでワーカーは getipaddr()
が返すアドレスの使われていないポートにリッスンします。コマンドライン引数 --bind-to bind_addr[:port]
を指定すれば特定のアドレスにリッスンさせることが可能です。この機能はマルチホームのホストで有用です。
TCP/IP でないトランスポートの一例として、クラスターマネージャの実装は MPI を使って通信を行うこともできます。その場合 --worker
を指定してはならず、新しく起動されたワーカーは並列計算の機能を使う前に init_worker(cookie)
を呼び出すべきです。
launch
メソッドはワーカーを起動するごとに WorkerConfig
オブジェクトを (フィールドを適切に初期化した上で) launched
配列に追加する必要があります。WorkerConfig
の定義を示します:
mutable struct WorkerConfig
# 全てのクラスターマネージャに関連する共通フィールド
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# ホストで追加のワーカーを起動するときに使われるフィールド
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# 外部のクラスターマネージャはこのフィールドにワーカー単位の情報を保存する。
# 複数のフィールドが必要なら辞書にもできる。
userdata::Any
# SSHManager でワーカーに接続する SSH トンネル接続が利用する。
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# LocalManager/SSHManager の両方が利用する。
connect_at::Any
[...]
end
WorkerConfig
の大半のフィールドは組み込みのマネージャによって使われます。独自のクラスターマネージャが指定するのは io
または host
/port
だけであるはずです。
-
io
が指定されると、ホストとポートの情報はio
から読み込まれます (Julia ワーカーは開始時にバインドアドレスとポートをio
に出力します)。このため Julia ワーカーは利用可能な任意のポートにリッスンでき、ポートを手動で設定する必要はありません。 -
io
が指定されないと、ワーカーはhost
とport
を使って接続を行います。 -
count
,exename
,exeflags
はワーカーから追加のワーカーを起動するときに使われます。例えば、クラスターマネージャはノードごとに一つのワーカーを起動し、そのワーカーに他のワーカーの起動を任せるかもしれません。count
が整数n
だと、全部でn
個のワーカーが起動されます。count
が:auto
だと、マシンの CPU スレッド (論理コア) と同じ個数のワーカーが起動されます。exename
はjulia
実行形式のフルパスです。exeflags
は新しいワーカーを起動するときのコマンドライン引数です。
-
tunnel
,bind_addr
,sshflags
,max_parallel
はマスタープロセスからワーカーへの接続に SSH トンネルが必要になったときに利用されます。 -
userdata
は独自のクラスターマネージャがワーカー特有の情報を保存できるように提供されます。
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
はワーカーが生きている間にイベントが起こると呼ばれます。そのとき op
は次に示す値のどれかに設定されます:
:register
/:deregister
: ワーカーが Julia のワーカープールから追加/削除された。:interrupt
:interrupt(workers)
が呼ばれた。ClusterManager
は指定されたワーカーに停止シグナルを送る。:finalize
: クリーンアップ時に使われる。
独自トランスポートを使ったクラスターマネージャ
デフォルトで使われる all-to-all の TCP/IP ソケット接続を独自のトランスポート層に置き換えるには、もう少し込み入った設定が必要です。各 Julia プロセスは接続しているワーカーと同じ数の通信タスクを持ちます。例えば 32 プロセスの Julia クラスターが all-to-all のメッシュネットワークで接続しているとします。このとき:
- それぞれの Julia プロセスは 31 個の通信タスクを持ちます。
- それぞれの通信タスクは一つのメッセージ処理ループで一つのリモートワーカーからのメッセージを全て処理します。
- メッセージ処理ループは
IO
オブジェクト (例えばデフォルト実装ではTCPSocket
) を待機し、メッセージ全体を読み、処理を行い、次のメッセージを待機します。 - プロセスへのメッセージ送信は任意の Julia のタスクから直接行われます ──通信タスクだけが行えるわけではありません。ここでも、適切な
IO
オブジェクトが使われます。
デフォルトのトランスポート層を置き換えるのに必要なのは、リモートワーカーへの接続を確立する処理と、メッセージ処理ループが待機できる適切な IO
オブジェクトです。独自のマネージャが実装すべき特別なコールバックは次の二つです:
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
がデフォルト実装です (TCP/IP ソケットが使われます)。
connect
は IO
オブジェクトの組を返すべきです。一つはワーカー pid
から送信されるデータを読み込むためのオブジェクトで、もう一つはワーカー pid
へ送信するデータを書き込むためのオブジェクトです。独自のトランスポート層と Julia 組み込みの並列インフラストラクチャの間でデータを転送するときは、インメモリの BufferStream
をパイプとして利用できます (トランスポートは IO
でなくても構いません)。
BufferStream
はインメモリの IOBuffer
であり、IO
のように振る舞います ──非同期的に扱えるストリームです。
サンプルコードレポジトリの clustermanager/0mq
フォルダには ZeroMQ を使って Julia ワーカーをスター型トポロジーに接続する例があります (中間に ZeroMQ ブローカーがあります)。注意: この場合でも論理的には全ての Julia プロセスが互いに接続しています ──任意のワーカーは任意のワーカーに直接メッセージを送信でき、そのときトランスポート層に ZeroMQ が使われていることを意識する必要はありません。
独自のトランスポート層を使うときは:
- コマンドライン引数
--worker
を付けて Julia ワーカーを起動してはいけません。--worker
を付けて起動されたワーカーはデフォルトの TCP/IP ソケットトランスポート実装を利用します。 - 論理的接続を要求したワーカーそれぞれに対して
Base.process_messages(rd::IO, wr::IO)
が呼び出される必要があります。この関数はIO
オブジェクトが表すワーカーに対する送信または受信メッセージを処理する新しいタスクを起動します。 - ワーカーの初期化処理で
init_worker(cookie, manager::FooManager)
を呼び出す必要があります。 -
WorkerConfig
のconnect_at::Any
フィールドはlaunch
が呼ばれたときにクラスターマネージャから設定できます。このフィールドの値は全てのconnect
コールバックでワーカーに渡されます。例えば TCP/IP ソケットはワーカーに接続するための(host, port)
のタプルをこのフィールドに保存します。
kill(manager, pid, config)
はクラスターからワーカーを削除するときに呼ばれます。これを受けてマスタープロセスでは、クラスターマネージャの実装が対応する IO
オブジェクトを閉じて適切なクリーナップを行う必要があります。デフォルトの実装は単純に指定されたリモートワーカーで exit()
を実行させるだけです。
サンプルコードレポジトリ の clustermanager/simple
フォルダにはクラスターの設定に UNIX ドメインソケットを使った簡単な実装があります。
LocalManager
と SSHManager
に対するネットワーク要件
Julia クラスターはローカル PC や機関が保有するクラスター、あるいはクラウドといったインフラストラクチャ上に構築された既にセキュアであることが保証されている環境で実行されるものとして設計されています。この節では組み込みの LocalManager
と SSHManager
が持つネットワークセキュリティ要件を説明します。
-
マスタープロセスはどんなポートにもリッスンしません。ワーカーへの接続だけを行います。
-
それぞれのワーカーはローカルのインターフェースを一つだけにバインドし、OS が割り当てるエフェメラルポート番号だけにリッスンします。
-
addprocs(N)
で使われるLocalManager
は、デフォルトでループバックインターフェースだけをバインドします。これは後からリモートホストで起動された (悪意を持った) ワーカーがクラスターに接続できないことを意味します。例えばaddprocs(4)
の後にaddprocs(["remote_host"])
をすると失敗します。これを行うには、addprocs(4; restrict=false)
のようにキーワード引数restrict
を使って外部ネットワークインターフェスのバインドをLocalManager
へ明示的にリクエストしてください。 -
addprocs(list_of_remote_hosts)
で使われるSSHManager
は SSH を使ってリモートホスト上にワーカーを起動します。デフォルトで SSH は Julia ワーカーを起動するためにだけ利用され、その後のマスターとワーカーの間の通信にはプレーンな非暗号化 TCP/IP ソケットが利用されます。リモートホストはパスワードレスログインを有効にしておく必要があります。追加の SSH フラグや認証情報はキーワード引数sshflags
で指定できます。 -
マスターとワーカーの間の通信でも SSH を使いたいなら
addprocs(list_of_remote_hosts; tunnel=true, sshflags=<SSH キーと他のフラグ>)
としてください。ローカルの PC で Julia REPL (マスター) を起動して、クラスターの残りは Amazon EC2 のようなクラウドにするという使い方が典型的です。公開鍵基盤 (PKI) で認証された SSH クライアントと結び付いたリモートクラスターでは、空けておくべきポートは 22 だけです。認証情報はsshflags
を通してsshflags=`-i <keyfile>`
のように指定します。all-to-all のトポロジー (デフォルト) では、全てのワーカーが互いにプレーンの TCP ソケットで接続を行います。そのためクラスターノードのセキュリティポリシーに (OS によって異なる) エフェメラルポートの区間における自由な接続の許可が必要です。
ワーカー間の通信の (SSH を通じた) セキュア化および暗号化、あるいは個別のメッセージの暗号化は独自の
ClusterManager
を通して行えます。 -
addprocs
のオプションにmultiplex=true
を指定すると、マスターとワーカーの間のトンネルを作成するときに SSH の多重化が使われます。Julia とは別に SSH の多重化が設定され接続が確立されているなら、multiplex
オプションが無くとも多重化が使われます。多重化が有効なときは既存の接続を使ってフォワーディングが設定されます (SSH の-O forward
オプションです)。これはサーバーがパスワード認証を必要とする場合に便利です: Julia でaddprocs
を実行する前にサーバーへログインしておけば、Julia で認証を避けることができます。既存の多重化を使うのでなければ、セッション中の制御ソケットは
~/.ssh/julia-%r@%h:%p
に配置されます。多重化が有効なときに一つのノードで複数のプロセスを起動すると、単一の TCP 接続をそれらのプロセスで共有するために帯域が制限される場合があることに注意してください。
クラスターのクッキー
クラスターに含まれる全てのプロセスは同じクッキーを共有します。デフォルトではマスタープロセスで生成されたランダム文字列が使われます。
- クッキーは
cluster_cookie()
で確認でき、cluster_cookie(cookie)
とするとクッキーをcookie
に設定できます。 - 全ての接続は両端で認証され、マスターによって起動されたワーカーだけがマスターとの通信を許されます。
- コマンドライン引数
--worker=<cookie>
を使うとスタートアップ時にワーカーへクッキーを渡すことができます。もしクッキーを指定せずに--worker
とすると、ワーカーはクッキーを標準入力 (stdin
) から読み込みます。クッキーを取得するとstdin
はすぐに閉じられます。 -
ClusterManager
はマスターのクッキーをcluster_cookie()
で取得できます。デフォルトの TCP/IP トランスポートを使わない (--worker
を付けずに起動された) クラスターマネージャはinit_worker(cookie, manager)
をマスターと同じクッキーを使って呼び出す必要があります。
より高度なセキュリティは独自の ClusterManager
を実装することで実現できます。例えばクッキーを事前に共有してスタートアップ時の引数としては指定しない方法などが考えられます。
ネットワークトポロジーの指定 (実験的)
addprocs
にキーワード引数 topology
を渡すと、ワーカーが互いにどのように接続するかを指定できます:
:all_to_all
: 全てのワーカーが互いに接続する (デフォルト)。:master_worker
: ドライバプロセス (pid
が 1 のプロセス) だけがワーカーと接続する。:custom
: クラスターマネージャのlaunch
メソッドがWorkerConfig
のident
フィールドとconnect_idents
フィールドを通して接続トポロジーを指定する。クラスターマネージャが提供する識別子がident
であるワーカーはconnect_idents
が指定するワーカーと接続する。
キーワード引数 lazy=true|false
は topology
オプションが :all_to_all
のときにだけ効果を持ちます。lazy
が true
だとクラスターはマスターが全てのワーカーと接続された状態で開始され、ワーカー間の接続はその間でリモートコールがあったときに始めて確立されます。こうするとクラスター内通信のために確保される初期リソースが節約できます。接続は並列プログラムの実行時における要件によって設定されるということです。lazy
のデフォルト値は true
です。
現在のバージョンでは、接続されていないワーカー間でメッセージを送るとエラーが生じます。この振る舞い (機能とインターフェース) は実験的とみなされるべきであり、将来のリリースで変更される可能性があります。
注目に値する外部パッケージ
Julia に組み込みの並列計算機構の他にも、言及しておくべき外部パッケージがたくさんあります。例えば MPI.jl は MPI プロトコルに対する Julia ラッパーであり、共有配列の説明では DistributedArrays.jl に触れました。
さらに Julia の GPU プログラミングエコシステムにも言及しておかなければなりません:
-
低水準な (C カーネルを使った) 操作には OpenCL.jl と CUDAdrv.jl が利用できます。それぞれ OpenCL インターフェースと CUDA ラッパーです。
-
CUDAnative.jl のような低水準インターフェース (Julia カーネル) もあります。これは Julia ネイティブな CUDA 実装です。
-
CuArrays.jl と CLArrays.jl は特定ベンダーに対する高水準な抽象化を提供します。
-
ArrayFire.jl と GPUArrays.jl は GPU プログラミング用の高水準ライブラリです。
[訳注: 現在 CUDAdrv.jl, CUDAnative.jl, CuArrays.jl は CUDA.jl に統合され、個別のパッケージは非推奨となっています。以下の説明は非推奨の個別パッケージに対するものです。]
DistributedArrays.jl と CuArrays.jl を使って複数のプロセスに配列を分散させる例を次に示します。最初に distribute()
を使い、その後に CuArray()
を使っています。
DistributedArrays.jl をインポートするときは、@everywhere
マクロを使って全てのプロセスにインポートすることを忘れないでください:
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CuArrays
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
Julia の一部の機能は CUDAnative.jl によってサポートされないことに注意してください。特に sin
などの関数は CUDAnative.sin
とする必要があります。
DistributedArrays.jl と CuArrays.jl の両方を使って配列を複数プロセスに分散し、総称関数をそれに対して呼び出す例をこれから示します。
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # または (M*v) ./ v
end
power_method
は何度も新しいベクトルを作成し、最後にベクトルを正規化します。関数の宣言には型シグネチャを指定していませんが、これを上述のパッケージの型に適用するとどうなるかを見ましょう:
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
外部パッケージの簡単な利用例な紹介の締めくくりとして、MPI プロトコルの Julia ラッパー MPI.jl を考えます。全ての内部関数を見ていくのは時間がかかり過ぎるので、プロトコルの実装で使われているアプローチを簡単に紹介します。
次に簡単なスクリプトを示します。このスクリプトは各サブプロセスを呼び出し、ランクをインスタンス化し、マスタープロセスでランクの和を計算します:
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
このスクリプトは次のコマンドで実行します:
$ mpirun -np 4 ./julia example.jl
-
ここで言う MPI とは MPI-1 規格のことです。MPI 標準化委員会は MPI-2 で Remote Memory Access (RMA) と呼ばれる新しい種類の通信機構を導入しました。RMA が MPI 規格に追加されたのは、片方向通信を行うパターンを簡単にするためです。最新の MPI 規格についてさらに詳しくは https://mpi-forum.org/docs を参照してください。[return]