バリア同期の実装
Java Concurrency in Practice の 5.5.4 Barrier の話
バリア同期とは
バリア同期とは、各スレッドの実行の進行具合を合わせるために使用する同期機構のこと。 複数スレッドにまたがった依存関係のある計算(処理)の完了を待って、処理を実行する際に使用する。 たとえば、時間のかかる計算 A と B の処理を C で使用する時、C を実行する前に A と B の終了をバリア同期で待ってから実行するというような使用方法がある。
実装
実際に Ruby で表現すると以下のようになる。
イニシャライズ時に渡された数(n
)になるまでそのスレッドを待ち状態にしておき、待ち状態数が n
になったら @cv.broadcast
をして待ち状態のスレッドを起こすということをしている。
class CyclicBarrier def initialize(n) @n = n @wait_count = 0 @mutex = Mutex.new @cv = ConditionVariable.new end def wait @mutex.synchronize do @wait_count += 1 if @wait_count == @n @cv.broadcast else @cv.wait(@mutex) end end end end
使用部分は以下のようになる。
Runner#reduce_process
が別スレッドで走らせた Runner#process
の計算結果を使用するという例。
実際は sleep(i)
の部分に重たい処理が来ることになる。
実行結果を見るとわかるんだけど、 Runner#reduce_process
は Runner#process
の実行の終了を待ってから実行されていることがわかる。
class Runner def initialize(n) @n = n @cb = CyclicBarrier.new(n) @queue = Queue.new end def run (@n - 1).times.map do |i| Thread.start(i + 1) { |j| process(j) } end reduce_process end private def reduce_process puts "[Wait] reduce process" @cb.wait puts "[Start] reduce process" result = 0 until @queue.empty? result += @queue.pop end puts "[Done] result is #{ans}" end def process(i) puts "[Start] process #{i}" sleep(i) @queue << i puts "[Wait] process #{i}" @cb.wait end end Runner.new(4).run
実行結果
$ ruby runner.rb [Start] process 2 [Start] process 3 [Wait] reduce process [Start] process 1 [Wait] process 1 [Wait] process 2 [Wait] process 3 [Start] reduce process [Done] result is 6
コードは全体はこちら↓↓↓