Scala アクター:簡易チュートリアル
(Scala Actors: A Short Tutorial)
By phaller
Created 2007-05-24, 08:26
はじめに (Introduction)
マルチコアプロセッサの到来で並行プログラミングは不可欠になりました。並行性に関する Scala の主要な構文はアクターです。アクターは基本的に、メッセージ交換で通信する並行プロセスです。アクターは、メッセージ送信に関連するメソッドを起動する、アクティブなオブジェクトの形態と見ることもできます。
Scala アクターライブラリは、同期、非同期メッセージ送信の両方を提供します(前者は、複数の非同期メッセージ交換によって実装されます)。さらに、アクターは、リクエストを非同期に処理するフューチャ(*1)を使って通信しますが、その応答を待つことを可能にする表現(フューチャ)を返します。
このチュートリアルの主な目的は、すぐにコンパイルできて Scala 2.4 以降で実行できる、いくつかの完全なサンプルプログラムをウォークスルーすることです。
最初の例 (First Example)
最初の例は、一連のメッセージを交換して終了する 2 つのアクターから成ります。最初のアクターは "ping" メッセージを 2 番目のアクターに送り、するとそれは "pong" メッセージを返送します。( 各受信 "ping"メッセージ に 1 つの "pong" メッセージ)。
はじめにアクターによって送受信されるメッセージを定義します。この場合、シングルトンオブジェクトが使えます(より先進的なプログラムでは、メッセージをパラメータ化します)。パターンマッチィングを使いたいので、各メッセージは case object (ケースオブジェクト)です:
case object Ping
case object Pong
case object Stop
ping アクターは、pong アクターに Ping メッセージを送ることで交換処理を開始します。Pong メッセージは pong アクターからの応答です。ping アクターは、特定の数の Ping メッセージを送り終えると、pong アクターに Stop メッセージを送ります。
Scala アクターライブラリのすべてのクラス、オブジェクトとトレイトは、scala.actors パッケージにあります。このパッケージから Actor クラスをインポートし、拡張して独自のカスタムアクターを定義するのに使います。また、それは多くの役に立つアクター操作を含むので、Actor オブジェクトのすべてのメンバーをインポートします:
import scala.actors.Actor
import scala.actors.Actor._
アクターは、Actor クラスのサブクラスをインスタンス化して生成できる、普通のオブジェクトです。Actor をサブクラス化し、その抽象 act メソッドを実装することで、ping アクターの振る舞いを定義します:
class Ping(count: int, pong: Actor) extends Actor {
def act() {
var pingsLeft = count - 1
pong ! Ping
while (true) {
receive {
case Pong =>
if (pingsLeft % 1000 == 0)
Console.println("Ping: pong")
if (pingsLeft > 0) {
pong ! Ping
pingsLeft -= 1
} else {
Console.println("Ping: stop")
pong ! Stop
exit()
}
}
}
}
}
送るべき Ping メッセージの数と pong アクターは、コンストラクタの引数として渡されます。無限ループ中の receive メソッドの呼び出しは、Pong メッセージがアクターに送られるまで、アクターをサスペンドします。この場合、メッセージはアクターのメールボックスから取り除かれ、対応する、矢印の右側のアクションが実行されます。
pingsLeft がゼロより大きい場合は、! 送信演算子を使って Ping メッセージを pong アクターへ送り、pingsLeft カウンタを減じます。pingsLeft カウンタがゼロになったら、pong アクターへ Stop メッセージを送り、exit()を呼び出して現在のアクター実行を終了させます。
pong アクター用のクラスも同じように定義できます:
class Pong extends Actor {
def act() {
var pongCount = 0
while (true) {
receive {
case Ping =>
if (pongCount % 1000 == 0)
Console.println("Pong: ping "+pongCount)
sender ! Pong
pongCount = pongCount + 1
case Stop =>
Console.println("Pong: stop")
exit()
}
}
}
}
ここで、注目すべき点が 1 つあります。Ping メッセージを受信して、Pong メッセージを sender アクターに送りますが、それは我々のクラスのどこにも定義されていません! 実は、それは Actor クラスのメソッドです。sender を使えば、現在のアクターが最後に受け取ったメッセージを送ったアクターを参照できます。これにより、メッセージの引数として明示的に送信者を渡さなくても済みます。
あとはアクタークラスを定義すれば、それらを使う Scala アプリケーションの生成準備完了です。:
object pingpong extends Application {
val pong = new Pong
val ping = new Ping(100000, pong)
ping.start
pong.start
}
Java スレッドと同じように、アクターは start メソッドの呼び出しで始めます。
動かしましょう! (Let's run it!)
完全な例は、doc/scala-devel/scala/examples/actors/pingpong.scala 下の Scala ディストリビューションに含まれています。 次は、これをコンパイル、実行する方法です:
$ scalac pingpong.scala
$ scala -cp . examples.actors.pingpong
Pong: ping 0
Ping: pong
Pong: ping 1000
Ping: pong
Pong: ping 2000
...
Ping: stop
Pong: stop
スレッド レスに! (Make it Thread-less!)
アクターはスレッドプール上で実行されます。最初は、4 つのワーカースレッドがあります。もしすべてのワーカースレッドがブロックされて、まだ処理すべきタスクがあれば、スレッドプールは増大します。理想的には、スレッドプールの大きさは計算機の処理系コアの数に一致します。
アクターが、receive のような(あるいは同等の wait)スレッドをブロックする操作を呼び出すと、現在のアクター(self)を実行しているワーカースレッドはブロックされます。これは基本的に、アクターがブロックされるスレッドであることを意味します。たいていの JVM は標準的なハードウェア上では数千スレッド以上は処理できないので、使いたいアクター数への依存は避けたいことでしょう。
スレッド-ブロッキング操作は、新しいメッセージを待つ react (イベントベースの receive の仲間)を使って避けることができます。しかし、支払うべき(通常は少しの)代償があります : react は決して戻ってきません。現実には、メッセージへの応答の終わりに、アクターの計算の残りを含む何らかの関数を呼び出す必要があるということです。while ループ内で react を使ってもうまく機能しないことに注意してください! しかし、ループは一般的ですから、このための特別な、loop 関数形のライブラリサポートがあります。次のように使えます。:
loop {
react {
case A => ...
case B => ...
}
}
react の呼び出しをネストできることに注意してください。それにより、次のようにシーケンスの複数のメッセージを受信できます。:
react {
case A => ...
case B => react { // if we get a B we also want a C
case C => ...
}
}
ping と pong アクターをスレッドレスにするには、たんに while(true)を loop で、receive を react で置き換えれば十分です。例えば、次は pong アクターの修正版 act メソッドです:
def act() {
var pongCount = 0
loop {
react {
case Ping =>
if (pongCount % 1000 == 0)
Console.println("Pong: ping "+pongCount)
sender ! Pong
pongCount = pongCount + 1
case Stop =>
Console.println("Pong: stop")
exit()
}
}
}
2 つめの例 (Second Example)
2 つめ例として、作り出された値のシーケンスを読み出す producers の抽象化を書くつもりです。それは標準的なイテレータインターフェースを提供します。
特定の producers は、抽象 produceValues メソッドを実装することで、定義します。個々の値は、produce メソッドを使って作り出します。クラス producer は両方のメソッドを継承します。例えば、行きがけ順(pre-order)のツリーに含まれる値を作り出す producer は次のように定義できます:
class PreOrder(n: Tree) extends Producer[int] {
def produceValues = traverse(n)
def traverse(n: Tree) {
if (n != null) {
produce(n.elem)
traverse(n.left)
traverse(n.right)
}
}
Producer は、2 つのアクター producer アクターと coordinator アクターを用いて実装されています。次は producer アクターの実装方法を示します。:
abstract class Producer[T] {
protected def produceValues: unit
protected def produce(x: T) {
coordinator ! Some(x)
receive { case Next => }
}
private val producer: Actor = actor {
receive {
case Next =>
produceValues
coordinator ! None
}
}
...
}
producer アクターの定義方法に注目してください。今回は、わざわざ Actor の特別のサブクラスを生成して、その act メソッドを実装しようとはしませんでした。その代わり、たんに actor 関数を使ってインラインでアクターの振る舞いを定義します。多分、これはずっと簡潔です! さらに、actor を使って定義したアクターは自動的に始まるので、start メソッドを起動する必要がありません!
それで、producer はどのように動作するのでしょうか? Next メッセージを受け取ると、(抽象) produceVaules メソッドを実行し、それは今度は、produce メソッドを呼び出します。この結果、値のシーケンスが Some メッセージにラップされて coordinator へ送られます。シーケンスのお終いは None メッセージです。Some と None は Scala の標準 Option クラスの 2 つのケース(訳注:ケースクラス/ケースオブジェクト)です。
coordinatorはクライアントからの要求と producer から来る値を同期させます。次のように実装できます:
private val coordinator: Actor = actor {
loop {
react {
case Next =>
producer ! Next
reply {
receive { case x: Option[_] => x }
}
case Stop => exit('stop)
}
}
}
Next メッセージ用のハンドラ中で、受信 Option 値をある要求側 actor へ返すために、reply を使っていることに注意してください。このことは、次のセクションで説明するつもりです。話を元に戻して...
イテレータインターフェース (The Iterator Interface)
我々は producers を標準的なイテレータとして使えることを望みます。そのために、驚くなかれ、イテレータ返す iterator メソッドを実装します。その hasNext と next メソッドは、仕事を遂行するために coordinator アクターにメッセージを送ります。見てみましょう:
def iterator = new Iterator[T] {
private var current: Any = Undefined
private def lookAhead = {
if (current == Undefined) current = coordinator !? Next
current
}
def hasNext: boolean = lookAhead match {
case Some(x) => true
case None => { coordinator ! Stop; false }
}
def next: T = lookAhead match {
case Some(x) => current = Undefined; x.asInstanceOf[T]
}
}
非公開の lookAhead メソッドを使ってイテレータロジックを実装します。next 値がまだ未探索の場合は、current 変数は単なるプレースホルダーオブジェクトである値 Undefined を持ちます。
private val Undefined = new Object
ちょっと興味を引くことが lookAhead メソッドの中にあります。current 値が Undefined のとき、これは next 値を手に入れなければならないことを意味します。そのために、同期メッセージ送信演算子 !? を使います。これは coordinatorに Next メッセージを送信しますが、しかし、通常の(非同期)メッセージ送信のように戻る代わりに、coordinator からの応答を待ちます。応答は !? の戻り値です。!? を使って送られたメッセージは、reply を使って応答されます。 sender に単純にメッセージを送っても機能しないことに注意してください! それは、!? がメールボックスの代わりに 非公開の reply チャネルからのメッセージ受信を待つからです。これは "true" 応答を、たまたまメールボックスにある古いメッセージに起因する "fake(偽物)" から区別するために必要です。
producers の例は doc/scala-devel/scala/examples/actors/producers.scala 下の Scala ディストリビューションに同じく含まれています。
最終更新:2011年04月14日 21:03