マルチスレッディング
Base.Threads.@threads
── マクロ
Threads.@threads [schedule] for ... end
for
ループを並列化して複数のスレッドで実行するためのマクロです。反復空間を分割して複数のタスクに割り当て、それらのタスクをスケジュールポリシーに従って複数のスレッドで実行します。ループの終端にバリアが敷かれ、全てのタスクが実行を終えるのを待機します。
引数 schedule
は特定のスケジュールポリシーを要請するために利用します。現在サポートされる値は :static
だけであり、こうするとスレッドごとに一つのタスクが作成され、各タスクに反復が平等に割り当てられます。他の @threads
の内部あるいは ID が 1 でないスレッドで :static
を指定するとエラーになります。
デフォルトのスケジュール (schedule
引数を与えないときに使われるスケジュール) は将来変更される可能性があります。
引数 schedule
は Julia 1.5 以降でサポートされます。
Base.Threads.@spawn
── マクロ
Threads.@spawn expr
利用できる適当なスレッドで Task
を作成・実行します。タスクの終了を待つには、このマクロの返り値に対して wait
を呼び出してください。fetch
を呼び出せばタスクの終了を待った上で返り値を取得できます。
@spawn
に渡す式では $
を使って値を補間でき、補間された値は内部のクロージャに直接コピーされます。この機能を使って変数 value
の値を非同期なコードに埋め込めば、現在のタスクで変数の値を変更しても非同期にコードを実行する新しいタスクに影響しないようにできます。
重要な注意点についてはマニュアルのスレッディングの章を参照してください。
このマクロは Julia 1.3 以降でサポートされます。
$
を使った値の補間は Julia 1.4 以降でサポートされます。
Base.Threads.threadid
── 関数
Base.Threads.nthreads
── 関数
同期
Base.Threads.Condition
── 型
Threads.Condition([lock])
スレッドセーフなバージョンの Base.Condition
です。
Threads.Condition
に対して wait
および notify
を呼び出すときは、まず条件を lock
する必要があります。このロックは wait
が呼び出されブロックしている間は不可分に解放され、wait
が返るとき再取得されます。そのため Threads.Condition
型の値 c
を使うコードは通常次のような形をしているはずです:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
この機能は Julia 1.2 以降でサポートされます。
Base.Event
── 型
不可分操作
不可分操作の API はまだ完全に決まっておらず、将来おそらく変更されます。
Base.Threads.Atomic
── 型
Threads.Atomic{T}
この型の値は T
型のオブジェクトへの参照を保持し、そのオブジェクトに対するアクセスを不可分な (つまり、スレッドセーフな) アクセスだけに制限します。
不可分に扱えるのは一部の "簡単な" 真偽値・整数・浮動小数点数だけです。具体的には次の型の値です:
Bool
Int8
,Int16
,Int32
,Int64
,Int128
Int256
UInt8
,UInt16
,UInt32
,UInt64
,UInt128
UInt256
Float16
,Float32
,Float64
新しい Atomic
型のオブジェクトは不可分でない値から作成できます。値を指定しなければ、新しい不可分オブジェクトはゼロで初期化されます。
例
不可分オブジェクトへのアクセスは []
記法で行います:
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1
atomic_add!
や atomic_xchg!
のように、不可分操作には名前の最初に atomic_
が付いています。
Base.Threads.atomic_cas!
── 関数
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T
x
に対して不可分に compare-and-set を行います。
「x
が保持する値と cmp
を比較し、もし等しいなら newval
を x
に書き込む」という処理が不可分に行われます。それ以外のとき x
は変更されず、x
の古い値がそのまま返ります。返り値と cmp
を ===
で比較することで、x
が更新されたかどうかを判定できます。更新されていれば x
は newval
を保持しています。
詳細は LLVM の cmpxchg
命令のドキュメントを参照してください。
この関数はトランザクションセマンティクスの実装に使われます。トランザクションを行う前に x
の値を記録しておき、トランザクションを行っている間に x
の値が変わっていないときに限って新しい値を保存するという使い方です。
例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)
Base.Threads.atomic_xchg!
── 関数
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T
x
が保持する値を不可分に交換します。
「x
が保持する値を newval
に置き換えて、古い値を返す」という処理が不可分に行われます。
詳細は LLVM の atomicrmw xchg
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_add!
── 関数
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
x
が保持する値に val
を不可分に足します。
x[] += val
を不可分に実行し、x
の古い値を返します。Atomic{Bool}
に対しては定義されません。
詳細は LLVM の atomicrmw add
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5
Base.Threads.atomic_sub!
── 関数
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
x
が保持する値から val
を不可分に引きます。
x[] -= val
を不可分に実行し、x
の古い値を返します。Atomic{Bool}
に対しては定義されません。
詳細は LLVM の atomicrmw sub
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1
Base.Threads.atomic_and!
── 関数
Threads.atomic_and!(x::Atomic{T}, val::T) where T
x
が保持する値を val
とのビット単位の論理積で更新します。
x[] &= val
を不可分に実行し、x
の古い値を返します。
詳細は LLVM の atomicrmw and
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_nand!
── 関数
Threads.atomic_nand!(x::Atomic{T}, val::T) where T
x
が保持する値を val
との否定論理積で更新します。
x[] = ~(x[] & val)
を不可分に実行し、x
の古い値を返します。
詳細は LLVM の atomicrmw nand
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3
Base.Threads.atomic_or!
── 関数
Threads.atomic_or!(x::Atomic{T}, val::T) where T
x
が保持する値を val
との論理和で更新します。
x[] |= val
を不可分に実行し、x
の古い値を返します。
詳細は LLVM の atomicrmw or
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_xor!
── 関数
Threads.atomic_xor!(x::Atomic{T}, val::T) where T
x
が保持する値を val
との排他論理和で更新します。
x[] $= val
を不可分に実行し、x
の古い値を返します。
詳細は LLVM の atomicrmw xor
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2
Base.Threads.atomic_max!
── 関数
Threads.atomic_max!(x::Atomic{T}, val::T) where T
x
が保持する値と val
の大きい方を x
に保存します。
x[] = max(x[], val)
を不可分に実行し、x
の古い値を返します。
詳細は LLVM の atomicrmw max
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_min!
── 関数
Threads.atomic_min!(x::Atomic{T}, val::T) where T
x
が保持する値と val
の小さい方を x
に保存します。
x[] = min(x[], val)
を不可分に実行し、x
の古い値を返します。
詳細は LLVM の atomicrmw min
命令のドキュメントを見てください。
例
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5
Base.Threads.atomic_fence
── 関数
Threads.atomic_fence()
逐次一貫性に関するメモリフェンスを敷きます。
処理がプログラムにある順序で逐次的に実行されることを保証するメモリフェンスを敷きます。この機能が必要なアルゴリズム、つまり acquire/release フェンスでは不十分なアルゴリズムが存在します。
これは多くの場合で非常に低速な操作です。Julia の不可分操作は全て acquire/release の意味論を持つので、明示的なフェンスはまず必要にならないはずです。
詳細は LLVM の fence
命令のドキュメントを見てください。
スレッドプールを使った ccall (実験的)
Base.@threadcall
── マクロ
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
@threadcall
マクロの呼び出し方は ccall
と同じですが、処理を他のスレッドで行います。ブロックする C 関数を呼ぶときに @threadcall
を使うとメインの julia
スレッドをブロックせずに済むので有用です。並行性は libuv のスレッドプールのサイズで制限されます。このサイズのデフォルト値は 4 ですが、環境変数 UV_THREADPOOL_SIZE
を変更して julia
プロセスを再起動すれば変更できます。
@threadcall
で呼び出された関数から Julia を呼び返してはいけないことに注意してください。
低水準同期プリミティブ
これらは通常の同期オブジェクトを作るのに使われる基礎単位です。
Base.Threads.SpinLock
── 型
SpinLock()
再入可能でない、test-and-test-and-set を使ったスピンロックです。再帰的なロックを試みるとデッドロックします。この種のロックはブロックが起こらずほとんど時間のかからない処理 (IO の実行など) でのみ使われるべきです。基本的には ReentrantLock
を使ってください。
lock
には必ず unlock
が対応する必要があります。
test-and-test-and-set で実装されるスピンロックは競合しているスレッドが 30 程度以下である場合に最速となります。もしそれ以上のスレッドがあるなら、他のアプローチによる同期を考えてみるべきです。