読者です 読者をやめる 読者になる 読者になる

unicorn の hook が呼ばれるタイミングを調べてみた

unicorn には特定のタイミングで発火する hook を仕掛ける仕組みがあるが、それらの hook が呼ばれるタイミングを正確にわかっていなかったのでメモ。 いちおう公式のドキュメントはこちらにある。ここで言及する unicorn のバージョンは v5.3.0

それぞれの hook の説明

after_fork, after_worker_exit, after_worker_ready, before_exec, before_fork の 5 つある。 以下はそれぞれの説明&コードの場所。

after_fork

この hook は fork した後にワーカープロセスから呼ばれる ( init_worker_process )。 init_worker_process は fork したワーカープロセスの中で呼ばれる worker_loop で呼ばれるメソッド。

after_worker_exit

この hook はワーカープロセスが exit する際にマスタープロセスから呼ばれる。 ワーカーが死んだ後、ワーカーの socket などを閉じた後に呼ばれる ( reap_all_worker )。

after_worker_ready

この hook はワーカープロセスがレスポンスを受けられる状態になったところでワーカープロセスから呼ばれる。 fork したワーカープロセスの中で呼ばれる ( worker_loop )。

before_exec

この hook は新しい unicorn のコマンドを exec する直前にマスタプロセスによって呼ばれる。 unicorn は graceful restart などをする際に、呼び出されたときと同じコマンド (例えば unicorn -c unicorn.ru など) を Kernel#exec する。 その直前に呼ばれる ( reexec )。

before_fork

それぞれのワーカプロセスを fork する前にマスタープロセスから呼ばれる。 Unicorn::Worker オブジェクトを作成したらすぐ呼ばれている ( spawn_missing_workers )。

呼び出しタイミングの確認

ワーカーが 2 つの設定で幾つか実験を行って呼び出しタイミングを見てみた。今回確認する際に使用したソースコードは以下。 もちろんオプションによっては呼び出し順序が変わるので注意。

github.com

起動時

  1. before_fork
  2. after_fork
  3. after_worker_ready

マスタープロセスが起動してから 2 つのワーカーに対してそれぞれ before_fork, after_fork, affter_worker_ready が呼ばれている。 マスタープロセスの pid が 46056 でワーカープロセスがそれぞれ、46090 と 46089 になる。 before_fork はマスタープロセスからよばれるので pid は 46056 になる。それ以外はワーカープロセスで呼ばれるのでそれぞれの pid になる。

$ be unicorn -c unicorn.ru
I, [2017-05-21T16:26:53.009330 #46056]  INFO -- : listening on addr=0.0.0.0:8080 fd=9
I, [2017-05-21T16:26:53.009448 #46056]  INFO -- : [before_fork] worker=0 pid=46056
I, [2017-05-21T16:26:53.010518 #46056]  INFO -- : [before_fork] worker=1 pid=46056
I, [2017-05-21T16:26:53.012068 #46056]  INFO -- : master process ready
I, [2017-05-21T16:26:53.012010 #46089]  INFO -- : [after_fork] worker=0 pid=46089
I, [2017-05-21T16:26:53.012433 #46089]  INFO -- : Refreshing Gem list
I, [2017-05-21T16:26:53.013825 #46090]  INFO -- : [after_fork] worker=1 pid=46090
I, [2017-05-21T16:26:53.014232 #46090]  INFO -- : Refreshing Gem list
I, [2017-05-21T16:26:53.121563 #46090]  INFO -- : [after_worker_ready] worker=1 pid=46090
I, [2017-05-21T16:26:53.125676 #46089]  INFO -- : [after_worker_ready] worker=0 pid=46089

マスタープロセスに SIGINT

  1. after_worker_exit

すぐにワーカープロセスを殺して after_worker_exit が呼ばれ、その後マスタープロセスも死ぬ。 after_worker_exit はマスタープロセスで呼ばれるので pid は 46056 となる。

I, [2017-05-21T16:29:04.026190 #46056]  INFO -- : [after_worker_exit] worker=1 pid=46056 status=pid 46090 exit 0
I, [2017-05-21T16:29:04.026515 #46056]  INFO -- : [after_worker_exit] worker=0 pid=46056 status=pid 46089 exit 0
I, [2017-05-21T16:29:04.026615 #46056]  INFO -- : master complete

ワーカープロセスに SIGINT

  1. after_worker_exit
  2. before_fork
  3. after_fork
  4. after_worker_ready

kill -s INT 46651 を実行した後のログ。 マスタープロセスの pid は 46622 なので、after_worker_exit はマスタープロセス内で呼ばれていることがわかる。 そのあと、新しいワーカープロセスを作成するので起動時と同じログになる(ただしワーカーは 1 つ)。

I, [2017-05-21T16:35:10.707660 #46622]  INFO -- : [after_worker_exit] worker=1 pid=46622 status=pid 46651 exit 0
I, [2017-05-21T16:35:10.707795 #46622]  INFO -- : [before_fork] worker=1 pid=46622
I, [2017-05-21T16:35:10.709798 #46653]  INFO -- : [after_fork] worker=1 pid=46653
I, [2017-05-21T16:35:10.710304 #46653]  INFO -- : Refreshing Gem list
I, [2017-05-21T16:35:10.828349 #46653]  INFO -- : [after_worker_ready] worker=1 pid=46653

マスターに SIGUSER2 (graceful restart)

  1. before_exec
  2. before_fork
  3. after_fork
  4. after_worker_ready

ログとしては Kernel#exec した (1行目の executing) 後に before_exec がきているが、これはコードでなぜかそういう順番になっているだけで本当の順番は逆 ( https://github.com/defunkt/unicorn/blob/v5.3.0/lib/unicorn/http_server.rb#L455 )。 あとは起動時のときと同じログになる。

I, [2017-05-21T15:45:51.255441 #43971]  INFO -- : executing ["/Users/yuta-iwama/src/github.com/ganmacs/playground/ruby/unicorn-hook/vendor/bundle/ruby/2.4.0/bin/unicorn", "-c", "unicorn.ru", {9=>#<Unicorn::TCPSrv:fd 9>}] (in /Users/yuta-iwama/src/github.com/ganmacs/playground/ruby/unicorn-hook)
I, [2017-05-21T15:45:51.255758 #43971]  INFO -- : [before_exec]
I, [2017-05-21T15:45:51.497657 #43971]  INFO -- : inherited addr=0.0.0.0:8080 fd=9
I, [2017-05-21T15:45:51.497814 #43971]  INFO -- : [before_fork] worker=#<Unicorn::Worker:0x007ffc888f3018>
I, [2017-05-21T15:45:51.498835 #43971]  INFO -- : [before_fork] worker=#<Unicorn::Worker:0x007ffc888f2ca8>
I, [2017-05-21T15:45:51.499294 #43971]  INFO -- : master process ready
I, [2017-05-21T15:45:51.500538 #43982]  INFO -- : [after_fork] worker=#<Unicorn::Worker:0x007ffc888f3018>
I, [2017-05-21T15:45:51.500818 #43983]  INFO -- : [after_fork] worker=#<Unicorn::Worker:0x007ffc888f2ca8>
I, [2017-05-21T15:45:51.501025 #43982]  INFO -- : Refreshing Gem list
I, [2017-05-21T15:45:51.501152 #43983]  INFO -- : Refreshing Gem list
I, [2017-05-21T15:45:51.602740 #43983]  INFO -- : [after_worker_ready] worker=#<Unicorn::Worker:0x007ffc888f2ca8>
I, [2017-05-21T15:45:51.604692 #43982]  INFO -- : [after_worker_ready] worker=#<Unicorn::Worker:0x007ffc888f3018>

その他

自分は before_exec と before_fork の呼ばれるタイミングの違いがよくわかっていなかった。 before_exec は restart などをする時に unicorn コマンドを内部で叩く前、before_fork は ワーカープロセスを fork する直前呼ばれるみたい。

簡単な UDP client / server in golang

UDP server と client の覚書。Ping を送って Pong と帰ってくるだけの雑なやつ。 特に難しいところもなくこの辺 みながらやると簡単にできる。

server 側

サーバ側では 127.0.0.1:8080 をlisten するようにしている。 何か送られてきたらそれを読んで、送ってきた相手に Pong と返すサーバ。

package main

import (
    "log"
    "net"
    "os"
)

func main() {
    udpAddr := &net.UDPAddr{
        IP:   net.ParseIP("127.0.0.1"),
        Port: 8080,
    }
    updLn, err := net.ListenUDP("udp", udpAddr)

    if err != nil {
        log.Fatalln(err)
        os.Exit(1)
    }

    buf := make([]byte, 1024)
    log.Println("Starting udp server...")

    for {
        n, addr, err := updLn.ReadFromUDP(buf)
        if err != nil {
            log.Fatalln(err)
            os.Exit(1)
        }

        go func() {
            log.Printf("Reciving data: %s from %s", string(buf[:n]), addr.String())

            log.Printf("Sending data..")
            updLn.WriteTo([]byte("Pong"), addr)
            log.Printf("Complete Sending data..")
        }()
    }
}

client 側

サーバに対して Pong と送信して、その返信をlogに出力する。

package main

import (
    "log"
    "net"
    "os"
)

func main() {
    conn, err := net.Dial("udp", "127.0.0.1:8080")
    if err != nil {
        log.Fatalln(err)
        os.Exit(1)
    }
    defer conn.Close()

    n, err := conn.Write([]byte("Ping"))
    if err != nil {
        log.Fatalln(err)
        os.Exit(1)
    }

    if len(buf) != n {
        log.Printf("data size is %d, but sent data size is %d", len(buf), n)
    }

    recvBuf := make([]byte, 1024)

    n, err = conn.Read(recvBuf)
    if err != nil {
        log.Fatalln(err)
        os.Exit(1)
    }

    log.Printf("Received data: %s", string(recvBuf[:n]))
}

参考

net - The Go Programming Language

Software Transactional Memory の雰囲気メモ

wikipedia の ( Software Transactional Memoryのページ ) を眺めたまとめのメモと、toy 実装をしてみた話。

github.com

stm の rust 実装 見ながら書いてたんだけど、そのまま rust で書くのもあれなのでなんとなく go で書いてみた。 ちなみに int にしか対応していない完全なトイ実装。

STM

並列プログラミングをする際に、共有メモリへのアクセスを制御するための機構のことで、ロックを使用した排他制御機構の代替として使用できる。 例えば、ロックを使用してスレッドセーフなプログラムを実現するには以下のようになる。

v := 1
m := new(sync.Mutex)

go func() {
    m.Lock()
    v += 1
    m.Unlock()
}()

go func() {
    m.Lock()
    v += 1
    m.Unlock()
}()


...

これを STM で実現すると以下のようになる(自分で実装した STM だけど大体ほかの実装もこんなシンタックスになってるはず)。

tvar := istm.NewTVar(1)

go func() {
    tv, _ := istm.Atomically(func(t *istm.Transaction) (int, error) {
        v := tvar.Read(t)
        tvar.Write(t, v+1)
        return v, nil
    })

    fmt.Printf("t1: %v\n", tv)
}()

go func() {
    tv, _ := istm.Atomically(func(t *istm.Transaction) (int, error) {
        v := tvar.Read(t)
        tvar.Write(t, v+1)
        return v, nil
    })

    fmt.Printf("t2: %v\n", tv)
}()

...

Atomically のブロック内は必ずアトミックであることが保証されているので、STM の方ではロックを(シンタックス上は)使用せずにスレッドセーフなプログラムを書くことができる。 ロックを使用する問題点として、デッドロックやライブロックが発生するコードが書けてしまうこと、ロックし忘れてレースコンディションが発生する恐れがあること、 優先順位の逆転 が発生することなどがある。 STM はロックに比べて高いレベルの抽象化を提供しているので、そのあたり問題をいい感じに回避している。 また、STM の利点としてスケールしやすいという点もある。 ロックを用いた場合、大雑把にロックを取るとそこがボトルネックとなりスケールしないプログラムになってしまう(逆に細粒度にロックを取ろうとするとバグを生み出しやすいという問題もある)。 しかし、STM は変更をコミットする瞬間だけロックを取ることで大きなロックを取る必要がなくなりスケールするプログラムを意識しなくても書くことができる。 問題点もあって、コミットにする際に一貫性が取れていないとすべての処理をアボートして、再度処理を初めからすべて実行し直すためオーバヘッドが大きいというのがある。

実装

github.com

DB で言うところの楽観的ロックみたいな実装をしている。 アトミックなブロック中 (istm.Atomicallyに渡されたクロージャ) での変更はトランザクションでログとして持っておき、ブロックが終了するときにまとめて変更をコミットする(このへん)。 コミット処理の際に一貫性を検査し、他のトランザクションと衝突していたら自分側の変更をすべて破棄する。これを成功するまで何度も再実行する。

参考

バリア同期の実装

Java Concurrency in Practice の 5.5.4 Barrier の話

Java Concurrency in Practice

Java Concurrency in Practice

  • 作者: Brian Goetz,Tim Peierls,Joshua Bloch,Joseph Bowbeer,David Holmes,Doug Lea
  • 出版社/メーカー: Addison-Wesley Professional
  • 発売日: 2006/05/09
  • メディア: ペーパーバック
  • 購入: 7人 クリック: 14回
  • この商品を含むブログ (22件) を見る

バリア同期とは

バリア同期とは、各スレッドの実行の進行具合を合わせるために使用する同期機構のこと。 複数スレッドにまたがった依存関係のある計算(処理)の完了を待って、処理を実行する際に使用する。 たとえば、時間のかかる計算 A と B の処理を C で使用する時、C を実行する前に A と B の終了をバリア同期で待ってから実行するというような使用方法がある。

実装

実際に Ruby で表現すると以下のようになる。 イニシャライズ時に渡された数(n)になるまでそのスレッドを待ち状態にしておき、待ち状態数が n になったら @cv.broadcast をして待ち状態のスレッドを起こすということをしている。

class CyclicBarrier
  def initialize(n)
    @n = n
    @wait_count = 0
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def wait
    @mutex.synchronize do
      @wait_count += 1

      if @wait_count == @n
        @cv.broadcast
      else
        @cv.wait(@mutex)
      end
    end
  end
end

使用部分は以下のようになる。 Runner#reduce_process が別スレッドで走らせた Runner#process の計算結果を使用するという例。 実際は sleep(i) の部分に重たい処理が来ることになる。 実行結果を見るとわかるんだけど、 Runner#reduce_processRunner#process の実行の終了を待ってから実行されていることがわかる。

class Runner
  def initialize(n)
    @n = n
    @cb = CyclicBarrier.new(n)
    @queue = Queue.new
  end

  def run
    (@n - 1).times.map do |i|
      Thread.start(i + 1) { |j| process(j) }
    end

    reduce_process
  end

  private

  def reduce_process
    puts "[Wait] reduce process"
    @cb.wait
    puts "[Start] reduce process"

    result = 0
    until @queue.empty?
      result += @queue.pop
    end

    puts "[Done] result is #{ans}"
  end

  def process(i)
    puts "[Start] process #{i}"

    sleep(i)
    @queue << i

    puts "[Wait] process #{i}"
    @cb.wait
  end
end

Runner.new(4).run

実行結果

$ ruby runner.rb
[Start] process 2
[Start] process 3
[Wait] reduce process
[Start] process 1
[Wait] process 1
[Wait] process 2
[Wait] process 3
[Start] reduce process
[Done] result is 6

コードは全体はこちら↓↓↓

github.com

Lock Striping についてとその実装

Java Concurrency in Practice の 5.2.1. ConcurrentHashMap にて Lock Striping というロックの方法が出てくる。 Lock Striping の説明と実装、および簡単にパフォーマンスを測ったのでそのメモ。

Java Concurrency in Practice

Java Concurrency in Practice

  • 作者: Brian Goetz,Tim Peierls,Joshua Bloch,Joseph Bowbeer,David Holmes,Doug Lea
  • 出版社/メーカー: Addison-Wesley Professional
  • 発売日: 2006/05/09
  • メディア: ペーパーバック
  • 購入: 7人 クリック: 14回
  • この商品を含むブログ (22件) を見る

Lock Striping とは

Lock Striping とは Array や HashMap などのオブジェクトに対して、複数スレッドから書き/読み込む場合に、そのオブジェクト全体のロックを取るのではなく部分的にロックを取ること。 例えば、Array オブジェクトの 0 から 9 は Mutex1、10 から 19 は Mutex2 が管理するようにすること。 このようにすることで、依存関係のない操作を複数スレッドで実行する際 (上の例なら Arrayオブジェクトの index 1 と 11 への書き込みなど) に互いの操作を待たなくて良くなり並列度が上がり、パフォーマンスの向上が見込める。

1 つの Mutex オブジェクトでロックする際の問題

Array や Map などのオブジェクトに対して、複数スレッドから書き込み/読み込みがある時にロックが必要になる。 同じキーに対して複数のスレッドから書き込んでしまった場合にレースコンディションが発生するからである。 1 つのオブジェクトに対して 1 つの Mutex オブジェクトを使用すると、ロックの粒度が適切でない時がある。 たとえば、以下のように Map オブジェクト dict に Thread 1 (go の場合は goroutine) では "foo" というキーに 値 1 をセットし、Thread 2 では "bar" というキーに 2 をセットするとする。 この時 Thread 1 と Thread 2 の操作は互いに独立しており、並列に実行してもレースコンディションが発生しないためそれぞれの Thread を並列に実行できた方がパフォーマンスがよくなる。 しかし、dict に対して 1 つの Mutex オブジェクトを使用すると、互いに独立している操作であっても dict への操作は並列に実行することができなくなる。

dict := new(map[string]map)

// In Thread 1
dict["foo"] = 1


// In Thread 2
dict["bar"] = 2

実装

Lock Striping な辞書の実装。 キーは文字列、値は数字のみにしている。 hash 関数に FNV prime を使っている。 Hash値(とbucket sizeの剰余)ごとに bucket をもって、それごとにロックを管理している。 その為、ちがう bucket であれば並列に実行可能になる。

github.com

パフォーマンス

スペック

計測したコード

github.com

結果

synchronizedDict~ が dict に対して 1 つのロックを取っている方で、 stripedDict~ が Striping Lock をしている方。 最後の数 (たとえば、synchronizedDictWriting11) は Write する Worker の数を表している (1,3,5 でためした)。 だいたい予想通りの結果になっているようにみえる。 write する worker が 1 つのとき (並列に実行していない時) は hash 値の計算などのオーバヘッドがあるから若干 stripedDict のほうが遅くなっている。 それ以外はロックが分散されて、並列度が上がったためパフォーマンスがよくなっている。

Elapsed Time in synchronizedDictWriting1: 43.632465ms
Elapsed Time in stripedDictWriting1: 35.341727ms

Elapsed Time in synchronizedDictWriting2: 62.054392ms
Elapsed Time in stripedDictWriting2: 48.307852ms

Elapsed Time in synchronizedDictWriting5: 172.436661ms
Elapsed Time in stripedDictWriting5: 89.864355ms

リポジトリごとに user.name と user.email を変更する

会社で Github Enterprise を使っているので、github.com の方の user.nameuser.email でコミットしてしまわないようにリポジトリごとに設定を変更できるようにした。

gitconfig にあった user.nameuser.email をけして、useConfigOnlytrue にした。 こうすることで、リポジトリごとに user.nameuser.email を設定しないと *** Please tell me who you are. などといわれてコミットに失敗するようになり、誤ったユーザでのコミットを防げる。 変更前

[user]
    name = ganmacs
    email = ganmacs@gmail.com

変更後

[user]
    useConfigOnly = true

あとは、シェルに以下のような関数を定義しておけば良さそう。

function company() {
    git config user.name "namae"
    echo "Set user.name as $(git config --get user.name)"
    git config user.email "foo@bar.com"
    echo "Set user.email as $(git config --get user.email)"
}

function private() {
    git config user.name "ganmacs"
    echo "Set user.name as $(git config --get user.name)"
    git config user.email "ganmacs@gmail.com"
    echo "Set user.email as $(git config --get user.email)"
}

その他

pre-commit hook などでディレクトリごとに自動で設定する方法も考えたが、大げさだなと思ってこの方法に落ち着けた。 もっといい方法があれば教えて欲しい。

Ruby の Monitor

Ruby には Monitor というクラスがあるんだけど、よく見るわりに Mutex との違いをよくわかっていなかった。 たまたま Java Concurrency in Practice を読んでいたら 2.3.2 Reentrancy で同じような概念の話が出てきて、少しわかった気がするのでそのメモ。

Java Concurrency in Practice

Java Concurrency in Practice

  • 作者: Brian Goetz,Tim Peierls,Joshua Bloch,Joseph Bowbeer,David Holmes,Doug Lea
  • 出版社/メーカー: Addison-Wesley Professional
  • 発売日: 2006/05/09
  • メディア: ペーパーバック
  • 購入: 7人 クリック: 14回
  • この商品を含むブログ (22件) を見る

Monitor とは

Ruby の Monitor は何度も lock 出来る Mutex とドキュメントに書いてある。 特になんの意味もない例だけど以下のようにロック中にさらにロックできる(もちろん同じスレッド内で)。 もしこのコードを Mutex を使用して m = Mutex.new とした場合、2回めの m のロックする部分(m.synchronize)でデッドロックが発生するが Monitor を使用すると発生しない。

m = Monitor.new
m.synchronize do
  m.synchronize do
    # your code
  end
end

使い所

たぶん使い所は、すでにあるコードを組みわせてアトミックな処理をするところ。 下のようなコードのような感じ。もともと Nanika クラスに sugoi というスレッドセーフなメソッドがあったとする。 この時、メソッド内でアトミックに sugoi を2回呼び出す ultra_sugoi というメソッドを新たに追加しようとしたときに Monitor が使えそう。 Monitor ではなく Mutex を使用すると、ultra_sugoi 内でロックを取ってから sugoi のメッソド呼び出しが発生して、 sugoi 内で またロックを取りに行きデッドロックが発生する。 しかし、Montior を使用すると何度でもロックが成功するので、 sugoi をアトミックに2回呼び出せる。

class Nanika
  def initialize
    @monitor = Monitor.new
    @sugoi_value = 0
  end

  def sugoi
    @monitor.synchronize do
      @sugoi_value += 1
    end
  end

  def ultra_sugoi
    @monitor.synchronize do
      sugoi
      sugoi
    end
  end
end

nanika = Nanika.new

[*1..2].map {
  Thread.new do
    nanika.ultra_sugoi
  end
}.each(&:join)

その他

javaの本では reentrant という言葉で説明されていた。

reentrancy とは

Reentrancy means that locks are acquired on a per-thread rather than per-invocation basis.

リエントラント性とは、呼び出し単位ではなくスレッド単位でロックが取得される。

Lock with reentrant

同じスレッドであれば、何度でもロックを取得できる ロックしたスレッドとそのスレッド内でのロック保持数を記録しておく。Ruby の Monitor。

Lock without reentrant

呼び出し(処理)単位でロックを取っているので、同じスレッドであっても再度ロックを取れない。 単純にロックされているかどうかだけを見る。Ruby の Mutex。

RubyはGILが

そうね