这段时间系统的学习了Scala,编程思想上可谓收获不少,想从里面挑些值得写的东西分享给大家,Scala的Actor
可谓这门语言的一个亮点,函数式编程核心价值在于多核编程,所以就打算说说这个Actor,总结一下学习心得。先很俗
套的简单谈谈概念,然后会拿四个例子做补充。主要内容包括基本原理,设计思想,单机环境并发测试。
Actor是一种基于事件的轻量级线程,在以前的并发模型中,我们需要关注共享的数据结构,而使用Actor则需要
关注操作数据的代码结构,因为减少了数据的共享。Actor的主要能力来源于消息传递,而不是采用阻塞调用的处理形式。
如果创建直接或间接扩展 Actor的类,要确保对对象的所有调用都通过消息传递进行。
我把Actor的设计思想归为两类,使用目的归为两类。从设计思想上来说Scala推荐的是以消息传递为核心的设计
思想,由于Scala可以无缝使用Java类库,所以也可以采用以共享数据为核心的设计,当然也可以写出混合式设计风格的
。使用目的主要有两种,一种是Scala提供API给Java调用,另一种就是Scala自给自足。举三个例子,例子很简单,是一
个累加器。
1、以消息传递为核心的设计:使用Actor的actor方法,使用不可变对象,不考虑数据共享问题,以消息传递为设计核心。
1 import actors._, Actor._ 2 /* 3 * Author:ShiYang 4 * Blog: http://shiyangxt.cnblogs.com 5 * */ 6 object SendMessageStyle { 7 8 def main(args: Array[String]): Unit = { 9 val caller = self 10 val accumulator = actor { 11 var continue = true 12 var sum = 0 13 loopWhile( continue ) { 14 reactWithin( 500 ) { 15 case number: Int => sum += number 16 case TIMEOUT => 17 continue = false 18 caller ! sum 19 } 20 } 21 } 22 accumulator ! 1 23 accumulator ! 2 24 accumulator ! 3 25 receiveWithin( 1000 ) { 26 case result => println( " Total is " + result) 27 } 28 } 29 }
2、以共享数据为核心的设计:构建由Actor继承共享数据操作类,以共享数据为核心。
1 import actors._, Actor._ 2 3 /* 4 * Author:ShiYang 5 * Blog: http://shiyangxt.cnblogs.com 6 * */ 7 object SharedDataStyle { 8 case class Add(number: Int) 9 case class GetResult(sender: Actor) 10 11 class AddActor extends Actor { 12 override def act(): Unit = process( 0 ) 13 def process(value: Int): Unit = { 14 reactWithin( 500 ) { 15 case Add(number) => process(value + number) 16 case GetResult(a) => a ! value; process(value) 17 case _ => process(value) 18 } 19 } 20 } 21 22 def main(args: Array[String]): Unit = { 23 val addActor = new AddActor 24 addActor.start() 25 addActor ! Add( 1 ) 26 addActor ! Add( 2 ) 27 addActor ! Add( 3 ) 28 addActor ! GetResult(self) 29 receiveWithin( 1000 ) { 30 case result => println( " Total is " + result) 31 } 32 } 33 }
3、以API形式提供给Java程序使用:由于Java不能直接向Actor发消息,所以需要对Scala的!()方法进行封装。
1 import actors._, Actor._ 2 /* 3 * Author:ShiYang 4 * Blog: http://shiyangxt.cnblogs.com 5 * */ 6 object ForJavaStyle { 7 case class Add(number: Int) 8 case class GetResult(sender: Actor) 9 private class AddActor extends Actor { 10 override def act(): Unit = process( 0 ) 11 def process(value: Int): Unit = { 12 reactWithin( 500 ) { 13 case Add(number) => process(value + number) 14 case GetResult(a) => a ! value; process(value) 15 case _ => process(value) 16 } 17 } 18 } 19 private val addActor = new AddActor 20 addActor.start() 21 private def add(sender: Actor, num: Int): Unit = { 22 sender ! Add(num) 23 } 24 private def getResult(sender: Actor): Int = { 25 sender ! GetResult(self) 26 receiveWithin( 1000 ) { 27 case result: Int => result 28 } 29 } 30 def addForJava(num: Int): Unit = { 31 add(addActor, num) 32 } 33 def getResultForJava(): Int = { 34 getResult(addActor) 35 } 36 } 1 /* 2 * Author:ShiYang 3 * Blog: http://shiyangxt.cnblogs.com 4 * */ 5 public class GetFromScala { 6 7 public static void main(String[] args) { 8 ForJavaStyle$.MODULE$.addForJava( 1 ); 9 ForJavaStyle$.MODULE$.addForJava( 2 ); 10 ForJavaStyle$.MODULE$.addForJava( 3 ); 11 System.out.println( " Total is " 12 + ForJavaStyle$.MODULE$.getResultForJava()); 13 } 14 } 通过上面的例子可见Scala对Java语言有非常大的补充,提高了生产力。为Java提供了轻松实现多核并行编程的能
力。为了进一步测试Actor的并发性能,于是做了一个简单的单机环境并发测试。程序是构建一个Actor动态有序数组,
并发创建N个Actor对象,为了证明这些对象全都可用,顺序从数组的第一个Actor发消息到最后一个Actor,只有当一个
Actor接收到前一个Actor发送的消息后,才向后一个Actor发送消息。当最后一个数组元素接收到消息后,再把消息从数组
尾部用同样处理过程逆序发送到数组头部。这个消息发送过程不是并发处理,是顺序处理。这里只是为了证明这些对象全都
可用。如果为了测试并发处理,可以修改程序,让每个数组元素给后一位数组元素发消息。这样就会看到输出混乱的发送信
息,因为并发是无序的。
测试环境:双核4G内存,Windows XP,Sun JVM1.6,单机环境,Scala版本2.9.0.1
测试结果:当使用Receive方法接收消息时,由于Receive会在结束任务前一直持有线程,而Scala在后台默认只给Receive
方法启动256个线程,我的程序又是顺序的发消息,而且不是临时接收器(只处理一次消息),所以Receive在这种情况下,
只有255个并发。React接收器由于不需要长期持有线程,空闲即释放线程。所以React在本程序中可以跑20w的并发,如果
简单优化一下JVM,就可以达到100w的并发量。默认React接收器后台会调用4个线程组成的线程池。如果修改程序让每个数
组元素给后一位数组元素并发的发消息,那么在不阻塞线程的情况下,Receive方法也可以达到和React一样的并发量。因为
这个测试程序是顺序发送消息,所以就没有设置超时,如果是并发环境,建议加上超时,避免线程阻塞。
1 import actors._, Actor._, java.util._ 2 /* 3 * Author:ShiYang 4 * Blog: http://shiyangxt.cnblogs.com 5 * */ 6 object ConcurrentTest { 7 8 val actors = new ArrayList[Actor] 9 val length = 1000000 10 var startTime = System.nanoTime 11 12 def main(args: Array[String]): Unit = { 13 for (i <- 0 to length) 14 actors.add(actor { 15 info( " react: " + i + " actor created " ) 16 reactMessage 17 }) 18 actors.get( 0 ) ! ( 0 , 0 ) 19 } 20 21 def info(msg: String) = println(msg + " received by " + 22 Thread.currentThread) 23 24 def receiveMessage { 25 var continue = true 26 while ( continue ) { 27 receive { 28 case (id: Int, direction: Int) => 29 sendMessage(id: Int, direction: Int) 30 case " finish " => 31 continue = false 32 val endTime = System.nanoTime 33 println( " Finish, spend time: " + 34 (endTime - startTime) / 1000000000.0 + " secs " ) 35 case _ => println( " input error " ) 36 } 37 } 38 } 39 40 def reactMessage { 41 var continue = true 42 loopWhile( continue ) { 43 react { 44 case (id: Int, direction: Int) => 45 sendMessage(id: Int, direction: Int) 46 case " finish " => 47 continue = false 48 val endTime = System.nanoTime 49 println( " Finish, spend time: " + 50 (endTime - startTime) / 1000000000.0 + " secs " ) 51 case _ => println( " input error " ) 52 } 53 } 54 } 55 56 // direction=0->sendLatter;direction=1->sendFormer 57 def sendMessage(id: Int, direction: Int) { 58 if (direction == 0 && id != length) { 59 info( " Actor " + id + " send message to the Actor " + (id + 1 )) 60 actors.get(id + 1 ) ! (id + 1 , 0 ) 61 } else if (id != 0 && direction == 1 ) { 62 info( " Actor " + id + " send message to the Actor " + (id - 1 )) 63 actors.get(id - 1 ) ! (id - 1 , 1 ) 64 } else if (direction == 0 && id == length) { 65 actors.get(length) ! (length, 1 ) 66 } else if (id == 0 && direction == 1 ) { 67 actors.get( 0 ) ! " finish " 68 } 69 } 70 }
前些天看到Scala正在努力支持.net平台,不过我觉得.net平台有F#,所以对于.net程序员来说Scala提供不了什么
附加价值。如果非要找出来这样做的优点的话,就是Scala可以同时支持两个主流平台,野心不小。如果看后有所收获,请
推荐让更多人看到,算我为Scala的推广尽了点绵薄之力。行文仓促,如果有不对的地方,欢迎指正。