バリア同期の実装

Java Concurrency in Practice の 5.5.4 Barrier の話

amzn.to

バリア同期とは

バリア同期とは、各スレッドの実行の進行具合を合わせるために使用する同期機構のこと。 複数スレッドにまたがった依存関係のある計算(処理)の完了を待って、処理を実行する際に使用する。 たとえば、時間のかかる計算 A と B の処理を C で使用する時、C を実行する前に A と B の終了をバリア同期で待ってから実行するというような使用方法がある。

実装

実際に Ruby で表現すると以下のようになる。 イニシャライズ時に渡された数(n)になるまでそのスレッドを待ち状態にしておき、待ち状態数が n になったら @cv.broadcast をして待ち状態のスレッドを起こすということをしている。

class CyclicBarrier
  def initialize(n)
    @n = n
    @wait_count = 0
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def wait
    @mutex.synchronize do
      @wait_count += 1

      if @wait_count == @n
        @cv.broadcast
      else
        @cv.wait(@mutex)
      end
    end
  end
end

使用部分は以下のようになる。 Runner#reduce_process が別スレッドで走らせた Runner#process の計算結果を使用するという例。 実際は sleep(i) の部分に重たい処理が来ることになる。 実行結果を見るとわかるんだけど、 Runner#reduce_processRunner#process の実行の終了を待ってから実行されていることがわかる。

class Runner
  def initialize(n)
    @n = n
    @cb = CyclicBarrier.new(n)
    @queue = Queue.new
  end

  def run
    (@n - 1).times.map do |i|
      Thread.start(i + 1) { |j| process(j) }
    end

    reduce_process
  end

  private

  def reduce_process
    puts "[Wait] reduce process"
    @cb.wait
    puts "[Start] reduce process"

    result = 0
    until @queue.empty?
      result += @queue.pop
    end

    puts "[Done] result is #{ans}"
  end

  def process(i)
    puts "[Start] process #{i}"

    sleep(i)
    @queue << i

    puts "[Wait] process #{i}"
    @cb.wait
  end
end

Runner.new(4).run

実行結果

$ ruby runner.rb
[Start] process 2
[Start] process 3
[Wait] reduce process
[Start] process 1
[Wait] process 1
[Wait] process 2
[Wait] process 3
[Start] reduce process
[Done] result is 6

コードは全体はこちら↓↓↓

github.com