17.10 メールボックス
メールボックスはプロセス同期と通信のための、高度で柔軟な構造物です。メッセージの送受信が可能です。ここで メッセージ とは任意のオブジェクトを指します。シグナルのタイムアウトに使う TIMEOUT という特別のメッセージがあります。
case object TIMEOUT
メールボックスは次のシグネチャを実装します。
class MailBox {
def send(msg: Any)
def receive[A](f: PartialFunction[Any, A]): A
def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A
}
メールボックスの状態はメッセージのマルチセットから成ります。メッセージは send メソッドで メールボックスへ追加されます。メッセージは receive メソッドで メールボックスから取り除かれ、メッセージプロセッサ f の引数に渡されます。 f はメッセージから何らかの結果型への部分関数です。通常、この関数はパターンマッチ式で実装されます。 receive メソッドは、そのメッセージプロセッサが定義されたメールボックスにメッセージが届くまで、ブロックされます。マッチしたメッセージはメールボックスから取り除かれ、ブロックされていたスレッドは再スタートしてメッセージプロセッサをそのメッセージに適用します。送られたメッセージとレシーバの双方とも時間順に並べられます。レシーバ r は、マッチしたメッセージ m へ適用されますが、それは、各コンポーネントを時間順に並べた個別の順序中で、m, r に先立つ {メッセージ,レシーバ}ペアが他にないときに限ります。
メールボックスの使い方の簡単な例として one-place バッファを考えてみましょう。
class OnePlaceBuffer {
private val m = new MailBox // An internal mailbox
private case class Empty, Full(x: Int) // Types of messages we deal with
m send Empty // Initialization
def write(x: Int)
{ m receive { case Empty => m send Full(x) } }
def read: Int =
m receive { case Full(x) => m send Empty; x }
}
メールボックスクラスは、次のようにも実装できます。
class MailBox {
private abstract class Receiver extends Signal {
def isDefined(msg: Any): Boolean
var msg = null
}
テストメソッド isDefined を備えたレシーバ用の内部クラスを定義し、与えられたメッセージに対してレシーバが定義されているかどうかを示すようにします。レシーバは Signal クラスから、レシーバスレッドを起動するのに使われる nofify メソッドを継承します。レシーバスレッドが起動されると、適用すべきメッセージは Reciever の msg 変数に保存されます。
private val sent = new LinkedList[Any] private var lastSent = sent private val receivers = new LinkedList[Receiver] private var lastReceiver = receivers
メールボックスクラスは2つの連結リストを保持していて、一つは、送信されたけれど取り出されていないメッセージ用で、もう一つは、ウェイトしているレシーバ用のものです。
def send(msg: Any) = synchronized {
var r = receivers, r1 = r.next
while (r1 != null && !r1.elem.isDefined(msg)) {
r = r1; r1 = r1.next
}
if (r1 != null) {
r.next = r1.next; r1.elem.msg = msg; r1.elem.notify
} else {
lastSent = insert(lastSent, msg)
}
}
send メソッドは最初に、ウェイトしているレシーバがその送信されたメッセージに適用可能かどうかチェックします。もしそうなら、レシーバに通知されます。そうでなければ、メッセージは送信されたメッセージの連結リストに追加されます。
def receive[A](f: PartialFunction[Any, A]): A = {
val msg: Any = synchronized {
var s = sent, s1 = s.next
while (s1 != null && !f.isDefinedAt(s1.elem)) {
s = s1; s1 = s1.next
}
if (s1 != null) {
s.next = s1.next; s1.elem
} else {
val r = insert(lastReceiver, new Receiver {
def isDefined(msg: Any) = f.isDefinedAt(msg)
})
lastReceiver = r
r.elem.wait()
r.elem.msg
}
}
f(msg)
}
recieve メソッドは最初に、メッセージプロセッサ関数 f が、既に送信されたけれどもまだ取り出されていないメッセージに適用可能かどうかチェックします。もしそうなら、スレッドは続けてすぐに f をそのメッセージに適用します。そうでなければ、新しいレシーバが作られてレシーバリストへリンクされ、スレッドはそのレシーバ上の通知を待ちます。スレッドは再び起動されると、f をそのレシーバに保存されたメッセージに適用します。 連結リストの insert メソッドは次のように定義されています。
def insert(l: LinkedList[A], x: A): LinkedList[A] = {
l.next = new LinkedList[A]
l.next.elem = x
l.next.next = l.next
l
}
メールボックスクラスは、指定された最大時間だけブロックする receiveWithin メソッドも提供しています。メッセージを指定された時間(ミリ秒で与えられる)以内に受信しなければ、メッセージプロセッサ引数 f は TIMEOUT という特別のメッセージでアンブロックされます。 recieveWithin の実装は receive とほとんど同じです。
def receiveWithin[A](msec: Long)(f: PartialFunction[Any, A]): A = {
val msg: Any = synchronized {
var s = sent, s1 = s.next
while (s1 != null && !f.isDefinedAt(s1.elem)) {
s = s1; s1 = s1.next
}
if (s1 != null) {
s.next = s1.next; s1.elem
} else {
val r = insert(lastReceiver, new Receiver {
def isDefined(msg: Any) = f.isDefinedAt(msg)
})
lastReceiver = r
r.elem.wait(msec)
if (r.elem.msg == null) r.elem.msg = TIMEOUT
r.elem.msg
}
}
f(msg)
}
} // end MailBox
違いは、制限時間つきの wait コールと、その後の文だけです。
このwikiの更新情報RSS