UDPでメッセージを送り合う時に一つにパケットに他のパケットを相乗りさせる

SWIM ではそれぞれのノード間の変化(メンバーの追加または離脱)を障害検知のために送り合っている ping / ack メッセージに相乗り (piggyback) して送ることでより強固に効率よく他のノードに変化を知らせている。 SWIM について詳しくは論文1を読むか日本語の記事2があるのでそっちをアレしてください。

SWIM では ping メッセージをランダムに選んだノードに一定間隔で送っている。 この ping メッセージに対して、join メッセージという「新たなノードを追加した時に、他のノードに送るメッセージ」を相乗りさせて送る部分の実装の話 (実際にはどんなメッセージでもいい)。 SWIM に若干の変更を加えた実装した memberlist という surf の内部で使用されているライブラリのコードみて書いている。 ただ、memberlist 自体は SWIM に若干の手を加えているし、複雑なので自分で相乗りする部分だけを書いたのが以下のリポジトリにあり、それを元に説明は書いている。

github.com

まずは ping と join それぞれのバイト単位での表現の仕方。 ping のような一つのメッセージを送る場合は、1 バイト目にどのメッセージであるかを、それ以降にそのデータの構造体を msgpack でエンコードしたデータが入っている。

例えば、以下が ping メッセージの中身。 1 バイト目に ping メッセージであることを表す値 (ここでは、0x01) を、それ以降に ping の構造体をエンコードしたものが入る。

+--------+========+
|  0x01  |  ping  |
+--------+========+

次が join メッセージの中身 (0x02 が join メッセージを表す)。

+--------+========+
|  0x02  |  join  |
+--------+========+

ping に相乗りさせるには、新たに compound メッセージというメッセージのタイプを作る。 このメッセージの中身に、含まれているメッセージの数、それぞれのメッセージのサイズの、実際のメッセージを載せることで複数のメッセージを同時に送れる。 以下が実際のメッセージの中身。 1 バイト目に coumpund メッセージであることを表す値 (0x03)、2 バイト目にはいくつのメッセージが詰まっているかを表す値 (0x02、つまり 2 つのメッセージが入っている)、 3 から 4 (5 から 6 も同様)の2バイトを使って 1 つめのメッセージの大きさ、それ以降には実際のメッセージが乗るという感じになっている。

+--------+--------+--------+--------+--------+--------+========+========+
|  0x03  |  0x02  |XXXXXXXX|XXXXXXXX|YYYYYYYY|YYYYYYYY|  ping  |  join  |
+--------+--------+--------+--------+--------+--------+========+========+

go で書くと以下のような関数で実現することができる。 msgs にエンコード済みのメッセージを渡している。

func ComposeCompoundMessage(msgs [][]byte) *bytes.Buffer {
    log.Println("Composing compound message")
    buf := bytes.NewBuffer(nil)

    // message type is 1 byte
    buf.WriteByte(uint8(CompoundMsg))

    // message count is 1 byte
    buf.WriteByte(uint8(len(msgs)))

    for _, msg := range msgs {
        // message size is 2bytes
        binary.Write(buf, binary.BigEndian, uint16(len(msg)))
    }

    for _, msg := range msgs {
        buf.Write(msg)
    }

    return buf
}

まとめ

何が言いたかったのかよくわからなくなったけど、とりあえず書いた。