第1行的preStart表示RestartActor正在初始化,注意它的HashCode为7302437。接着,这个Actor遇到了NullPointerException。根据自定义的策略,这将导致它重启,因此,这就有了第3行的preRestart,因为preRestart在正式重启之前调用,因此HashCode还是7302437,表示当前Actor和上一个Actor还是同一个实例。接着,第4~19行打印了异常信息。
第20行进入了preStart()方法,它的HashCode为23269863。这说明系统已经为这个RestartActor生成了一个新的实例,原有的实例因为重启而被回收。新的实例将代替原有实例继续工作。这说明同一个RestartActor在系统的工作始终,未必能保持同一个实例。重启完成后,调用postRestart()方法(第21行)。实际上,Actor重启后的preStart()方法,就是在postRestart()中调用的(Actor父类的postRestart()会调用preStart()方法)。
在经过3次重启后,超过了监督策略中的单位时间内的重试上限。因此,系统不会再进行尝试,而是直接关闭RestartActor。上述输出中第33行就显示了这个过程,在最后一个RestartActor实例上,执行了停止方法。
7.6 选择Actor
在一个ActorSystem中,可能存在大量的Actor。如何才能有效地对大量Actor进行批量的管理和通信呢?Akka为我们提供了一个ActorSelection类,用来批量进行消息发送。限于篇幅有限,这里不再给出完整的代码,示意代码如下:
1 for(int i=0;i<WORDER_COUNT;i++){ 2 workers.add(system.actorOf(Props.create(MyWorker.class,i), "worker_"+i)); 3 } 4 5 ActorSelection selection = getContext().actorSelection("/user/worker_*"); 6 selection.tell(5, getSelf());
上述代码第1~3行,批量生成了大量Actor。接着,我们要给这些worker发送消息,通过actorSelection()方法提供的选择通配符(第5行),可以得到代表所有满足条件的ActorSelection。第6行,通过这个ActorSelection实例,便可以向所有woker Actor发送消息。
7.7 消息收件箱(Inbox)
我们已经知道,所有Actor之间的通信都是通过消息来进行的。这是否意味着我们必须构建一个Actor来控制整个系统呢?答案是否定的,我们并不一定要这么做,Akka框架已经为我们准备了一个叫做“收件箱”的组件,使用收件箱,可以很方便地对Actor进行消息发送和接收,大大方便了应用程序与Actor之间的交互。
下面定义了当前示例中唯一一个Actor:
01 public class MyWorker extends UntypedActor { 02 private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 03 public static enum Msg { 04 WORKING, DONE,CLOSE; 05 } 06 07 @Override 08 public void onReceive(Object msg) { 09 if (msg == Msg.WORKING) { 10 log.info("I am working"); 11 } 12 if (msg == Msg.DONE) { 13 log.info("Stop working"); 14 }if (msg == Msg.CLOSE) { 15 log.info("I will shutdown"); 16 getSender().tell(Msg.CLOSE, getSelf()); 17 getContext().stop(getSelf()); 18 } else 19 unhandled(msg); 20 } 21 }
上述代码中,MyWorker会根据收到的消息打印自己的工作状态。当接收到CLOSE消息时(第14行),会关闭自己,结束运行。
而在本例中,与这个MyWorker Actor交互的,并不是一个Actor,而是一个邮箱,邮箱的使用很简单:
01 public static void main(String[] args) { 02 ActorSystem system = ActorSystem.create("inboxdemo", ConfigFactory.load("samplehello.conf")); 03 ActorRef worker = system.actorOf(Props.create(MyWorker.class), "worker"); 04 05 final Inbox inbox = Inbox.create(system); 06 inbox.watch(worker); 07 inbox.send(worker, MyWorker.Msg.WORKING); 08 inbox.send(worker, MyWorker.Msg.DONE); 09 inbox.send(worker, MyWorker.Msg.CLOSE); 10 11 while(true){ 12 Object msg = inbox.receive(Duration.create(1, TimeUnit.SECONDS)); 13 if(msg==MyWorker.Msg.CLOSE){ 14 System.out.println("My worker is Closing"); 15 }else if(msg instanceof Terminated){ 16 System.out.println("My worker is dead"); 17 system.shutdown(); 18 break; 19 }else{ 20 System.out.println(msg); 21 } 22 } 23 }
上述代码中,第5行,根据ActorSystem构造了一个与之绑定的邮箱Inbox。接着使用邮箱监视MyWorker(第6行),这样就能在MyWorker停止后得到一个消息通知。第7~9行,通过邮箱向MyWorker发送消息。
在第11~21行,进行消息接收,如果发现MyWorker已经停止工作,则关闭整个ActorSystem(第17行)。
执行上述代码,输出如下(为节省版面,我对输出进行了一些简单的删减):
[INFO] [inboxdemo-akka.actor.default-dispatcher-3] [akka://inboxdemo/user/worker] I am working [INFO] [inboxdemo-akka.actor.default-dispatcher-3] [akka://inboxdemo/user/worker] Stop working [INFO] [inboxdemo-akka.actor.default-dispatcher-3] [akka://inboxdemo/user/worker] I will shutdown My worker is Closing My worker is dead
上述输出的前3行为MyWorker的输出日志,表示MyWorker Actor的工作状态。后两行为主函数main()中对MyWorker消息的处理。
7.8 消息路由
Akka提供了非常灵活的消息发送机制。有时候,我们也许会使用一组Actor而不是一个Actor来提供一项服务。这一组Actor中所有的Actor都是对等的,也就是说你可以找任何一个Actor来为你服务。这种情况下,如何才能快速有效地找到合适的Actor呢?或者说如何调度这些消息,才可以使负载更为均衡地分配在这一组Actor中。
为了解决这个问题,Akka使用一个路由器组件(Router)来封装消息的调度。系统提供了几种实用的消息路由策略,比如,轮询选择Actor进行消息发送,随机消息发送,将消息发送给最为空闲的Actor,甚至是在组内广播消息。
下面就来演示一下消息路由的使用方式:
01 public class WatchActor extends UntypedActor { 02 private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 03 public Router router; 04 { 05 List<Routee> routees=new ArrayList<Routee>(); 06 for(int i=0;i<5;i++){ 07 ActorRef worker = getContext().actorOf(Props.create(MyWorker.class),"worker_"+i); 08 getContext().watch(worker); 09 routees.add(new ActorRefRoutee(worker)); 10 } 11 router=new Router(new RoundRobinRoutingLogic(),routees); 12 } 13 14 @Override 15 public void onReceive(Object msg) { 16 if(msg instanceof MyWorker.Msg){ 17 router.route(msg, getSender()); 18 }else if (msg instanceof Terminated) { 19 router=router.removeRoutee(((Terminated)msg).actor()); 20 System.out.println(((Terminated)msg).actor().path()+" is closed,routees="+router. routees().size()); 21 if(router.routees().size()==0){ 22 System.out.println("Close system"); 23 RouteMain.flag.send(false); 24 getContext().system().shutdown(); 25 } 26 } else { 27 unhandled(msg); 28 } 29 } 30 }
上述代码中定义了WatchActor。第3行,就是路由器组件Router,在构造Router时,需要指定路由策略和一组被路由的Actor(Routee),如第11行所示。这里使用了RoundRobinRoutingLogic路由策略,也就是对所有的Routee进行轮询消息发送。在本例中,Routee由5个MyWorker Actor构成(第6~10行,MyWorker与上一节中的相同,故不再给出代码)。
当有消息需要传递给这5个MyWorker时,只需要将消息投递给这个Router即可(上述代码第17行)。Router就会根据给定的消息路由策略进行消息投递。当一个MyWorker停止工作时,还可以简单地将其从工作组中移除(第19行)。在这里,如果发现系统中没有可用的Actor,就会直接关闭系统。
主函数比较简单,如下:
01 public class RouteMain { 02 public static Agent<Boolean> flag=Agent.create(true, ExecutionContexts.global()); 03 public static void main(String[] args) throws InterruptedException { 04 ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf")); 05 ActorRef w=system.actorOf(Props.create(WatchActor.class), "watcher"); 06 int i=1; 07 while(flag.get()){ 08 w.tell(MyWorker.Msg.WORKING, ActorRef.noSender()); 09 if(i%10==0)w.tell(MyWorker.Msg.CLOSE, ActorRef.noSender()); 10 i++; 11 Thread.sleep(100); 12 } 13 } 14 }
上述代码向WatchActor发送大量消息,其中夹杂着几条关闭Actor的消息。这会使得MyWorker Actor逐一被关闭,最终程序将退出。
这段程序的部分输出如下(做过适量裁剪):
[INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_0] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_1] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_2] I am working [INFO][route-akka.actor.default-dispatcher-4] [akka://route/user/watcher/worker_3] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_4] I am working [INFO][route-akka.actor.default-dispatcher-3] [akka://route/user/watcher/worker_0] I am working ... [INFO][route-akka.actor.default-dispatcher-2] [akka://route/user/watcher/worker_0] I will shutdown akka://route/user/watcher/worker_1 is closed,routees=0 Close system
可以看到,WORKING消息被轮流发送给这5个worker。大家可以修改路由策略,观察不同路由策略下的消息投递方式(除了RoundRobinRoutingLogic外,还可以尝试BroadcastRoutingLogic广播策略、RandomRoutingLogic随机投递策略、SmallestMailboxRoutingLogic空闲Actor优先投递策略)。
7.9 Actor的内置状态转换
在很多场景下,Actor的业务逻辑可能比较复杂,Actor可能需要根据不同的状态对同一条消息作出不同的处理。Akka已经为我们考虑到了这一点,一个Actor内部消息处理函数可以拥有多个不同的状态,在特定的状态下,可以对同一消息进行不同的处理,状态之间也可以任意切换。
现在让我们模拟一个婴儿Actor,假设婴儿会拥有两种不同的状态,开心或者生气。当你带他玩的时候,他总是会表现出开心状态,当你让他睡觉时,他就会非常生气,小孩子总是拥有用不完的精力,入睡困难可能是一种通病吧!
在我们这个简单的场景模拟中,我们会给这个婴儿Actor发送睡觉和玩两种指令。如果婴儿正在生气,你还让他睡觉,他就会说“我已经生气了”,如果你让他去玩,他就会变得开心起来。同样,如果他正玩得高兴,你让他继续玩,他就会说“我已经很愉快了”,如果让他睡觉,他就马上变得很生气。
下面的这个BabyActor就模拟了上述场景:
01 public class BabyActor extends UntypedActor { 02 private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 03 public static enum Msg { 04 SLEEP, PLAY, CLOSE; 05 } 06 07 Procedure<Object> angry = new Procedure<Object>() { 08 @Override 09 public void apply(Object message) { 10 System.out.println("angryApply:"+message); 11 if (message == Msg.SLEEP) { 12 getSender().tell("I am already angry", getSelf()); 13 System.out.println("I am already angry"); 14 } else if (message == Msg.PLAY) { 15 System.out.println("I like playing"); 16 getContext().become(happy); 17 } 18 } 19 }; 20 21 Procedure<Object> happy = new Procedure<Object>() { 22 @Override 23 public void apply(Object message) { 24 System.out.println("happyApply:"+message); 25 if (message == Msg.PLAY) { 26 getSender().tell("I am already happy :-)", getSelf()); 27 System.out.println("I am already happy :-)"); 28 } else if (message == Msg.SLEEP) { 29 System.out.println("I don't want to sleep"); 30 getContext().become(angry); 31 } 32 } 33 }; 34 35 @Override 36 public void onReceive(Object msg) { 37 System.out.println("onReceive:"+msg); 38 if (msg == Msg.SLEEP) { 39 getContext().become(angry); 40 } else if (msg == Msg.PLAY) { 41 getContext().become(happy); 42 } else { 43 unhandled(msg); 44 } 45 } 46 }
上述代码中,使用了become()方法用于切换Actor的状态(第39、41行)。方法become()接收一个Procedure参数。Procedure在这里可以表示一种Actor的状态,同时,更重要的是它封装了在这种状态下的消息处理逻辑。
在这个BabyActor中,定义了两种Procedure,一是angry生气(第7行),另一个是happy开心(第21行)。
在初始状态下,BabyActor既没有生气也不开心。因此angry处理函数和happy处理函数都不会工作。当BabyActor接收到消息时,系统会调用onReceive()方法来处理这个消息。
令人吃惊的魔法就在这个onReceive()函数中。当onReceive()处理SLEEP消息时,它会切换当前Actor的状态为angry(第39行)。如果是PLAY消息,则切换状态为happy。
一旦完成状态切换,当后续有新的消息送达时,就不会再由onReceive()函数处理了。由于angry和happy本身就是消息处理函数。因此,后续的消息就直接交由当前状态处理(angry或者happy),从而很好地封装了Actor的多个不同处理逻辑。
下面的代码向我们的婴儿Actor发送了几条PLAY和SLEEP的消息:
1 ActorSystem system = ActorSystem.create("become", ConfigFactory.load("samplehello.conf")); 2 ActorRef child = system.actorOf(Props.create(BabyActor.class), "baby"); 3 system.actorOf(Props.create(WatchActor.class, child), "watcher"); 4 child.tell(BabyActor.Msg.PLAY, ActorRef.noSender()); 5 child.tell(BabyActor.Msg.SLEEP, ActorRef.noSender()); 6 child.tell(BabyActor.Msg.PLAY, ActorRef.noSender()); 7 child.tell(BabyActor.Msg.PLAY, ActorRef.noSender()); 8 9 child.tell(PoisonPill.getInstance(), ActorRef.noSender());
其输出如下(进行过适量裁剪):
onReceive:PLAY happyApply:SLEEP I don't want to sleep angryApply:PLAY I like playing happyApply:PLAY I am already happy :-) [INFO][akka://become/user/watcher] akka://become/user/baby has terminated, shutting down system
可以看到,当第一个PLAY消息到来时,是由onReceive()函数进行处理的,在onReceive()中,将Actor切换为happy状态。因此,当SLEEP消息达到时,由happy.apply()函数处理,接着Actor切换为angry状态。当PLAY消息再次到达时,由angry.apply()函数处理。由此可见,Akka为Actor提供了灵活的状态切换机制,处于不同状态的Actor可以绑定不同的消息处理函数进行消息处理,这对构造结构化应用有着重要的帮助。
7.10 询问模式:Actor中的Future
由于Actor之间都是通过异步消息通信的。当你发送一条消息给一个Actor后,你通常只能等待Actor的返回。与同步方法不同,在你发送异步消息后,接受消息的Actor可能还根本来不及处理你的消息,而调用方就已经返回了。
这种模式与我们之前提到的Future模式非常相像。不同之处只是在传统的异步调用中,我们进行的是函数调用,但在这里,我们发送了一条消息。
因为两者的行为方式是如此相像,因此我们就会很自然地想到,当我们需要一个有返回值的调用时,Actor是不是也应该给我们一个契约(Future)呢?这样,就算我们当下没有办法立即获得Actor的处理结果,在将来,通过这个契约还是可以追踪到我们的请求的。
01 import static akka.pattern.Patterns.ask; 02 import static akka.pattern.Patterns.pipe; 03 04 public class AskMain { 05 06 public static void main(String[] args) throws Exception { 07 ActorSystem system = ActorSystem.create("askdemo", ConfigFactory.load("samplehello.conf")); 08 ActorRef worker = system.actorOf(Props.create(MyWorker.class), "worker"); 09 ActorRef printer = system.actorOf(Props.create(Printer.class), "printer"); 10 system.actorOf(Props.create(WatchActor.class, worker), "watcher"); 11 12 //等待future返回 13 Future<Object> f = ask(worker, 5, 1500); 14 int re = (int) Await.result(f, Duration.create(6, TimeUnit.SECONDS)); 15 System.out.println("return:" + re); 16 17 //直接导向其他Actor,pipe不会等待 18 f = ask(worker, 6, 1500); 19 pipe(f, system.dispatcher()).to(printer); 20 21 worker.tell(PoisonPill.getInstance(), ActorRef.noSender()); 22 } 23 }
上述代码给出了两处在Actor交互中使用Future的例子。
在第13行,使用ask()方法给worker发送消息,消息内容是5,也就说worker会接收到一个Integer消息,值为5。当workder接收到消息后,就可以进行计算处理,并且将结果返回给发送者。当然,这个处理过程可能需要花费一点时间。
方法ask()不会等待worker处理,会立即返回一个Future对象(第13行)。在第14行,我们使用Await方法等待worker的返回,接着在第15行打印返回结果。
在这种方法中,我们间接地将一个异步调用转为同步阻塞调用。虽然比较容易理解,但是在有些场合可能会出现性能问题。另外一种更为有效的方法是使用pipe()函数。
代码第18行使用ask()再次询问worker,并传递数值6给worker。接着并不进行等待,而是使用pipe()将这个Future重定向到另外一个称为printer的Actor。pipe()函数不会阻塞程序,会立即返回。
这个printer的实现很简单的,只是简单地输出得到的数据:
01 @Override 02 public void onReceive(Object msg) { 03 if (msg instanceof Integer) { 04 System.out.println("Printer:"+msg); 05 } 06 if (msg == Msg.DONE) { 07 log.info("Stop working"); 08 }if (msg == Msg.CLOSE) { 09 log.info("I will shutdown"); 10 getSender().tell(Msg.CLOSE, getSelf()); 11 getContext().stop(getSelf()); 12 } else 13 unhandled(msg); 14 }
上述代码就是Printer Actor的实现,它会通过pipe()方法得到worker的输出结果,并打印在控制台上(第4行)。
在本例中,worker Actor接受一个整数,并计算它的平方,并给予返回。如下:
01 @Override 02 public void onReceive(Object msg) { 03 if (msg instanceof Integer) { 04 int i=(Integer)msg; 05 try { 06 Thread.sleep(1000); 07 } catch (InterruptedException e) {} 08 getSender().tell(i*i, getSelf()); 09 } 10 if (msg == Msg.DONE) { 11 log.info("Stop working"); 12 }if (msg == Msg.CLOSE) { 13 log.info("I will shutdown"); 14 getSender().tell(Msg.CLOSE, getSelf()); 15 getContext().stop(getSelf()); 16 } else 17 unhandled(msg); 18 }
上述代码第5~7行,模拟了一个耗时的调用,为了更明显地说明ask()和pipe()方法的用途。第8行,worker计算了给定数值的平方,并把它“告诉”请求者。
7.11 多个Actor同时修改数据:Agent
在Actor的编程模型中,Actor之间主要通过消息进行信息传递。因此,很少发生多个Actor需要访问同一个共享变量的情况。但在实际开发中,这种情况很难完全避免。那如果多个Agent需要对同一个共享变量进行读写时,如何保证线程安全呢?
在Akka中,使用一种叫做Agent的组件来实现这个功能。一个Agent提供了对一个变量的异步更新。当一个Actor希望改变Agent的值时,它会向这个Agent下发一个动作(action)。当多个Actor同时改变Agent时,这些action将会在ExecutionContext中被并发调度执行。在任意时刻,一个Agent最多只能执行一个action,对于某一个线程来说,它执行action的顺序与它的发生顺序一致,但对于不同线程来说,这些action可能会交织在一起。
Agent的修改可以使用两个方法send()或者alter()。它们都可以向Agent发送一个修改动作。但是send()方法没有返回值,而alter()方法会返回一个Future对象便于跟踪Agent的执行。
下面让我们模拟这么一个场景:有10个Actor,它们一起对一个Agent执行累加操作,每个agent累加10000次,如果没有意外,那么agent最终的值将是100000,如果Actor间的调度出现问题,那么这个值可能小于100000。
01 public class CounterActor extends UntypedActor { 02 Mapper addMapper = new Mapper<Integer, Integer>() { 03 @Override 04 public Integer apply(Integer i) { 05 return i+1; 06 } 07 }; 08 09 @Override 10 public void onReceive(Object msg) { 11 if (msg instanceof Integer) { 12 for (int i = 0; i < 10000; i++) { 13 //我希望能够知道future何时结束 14 Future<Integer> f = AgentDemo.counterAgent.alter(addMapper); 15 AgentDemo.futures.add(f); 16 } 17 getContext().stop(getSelf()); 18 } else 19 unhandled(msg); 20 } 21 }
上述代码定义了一个累加的Actor:CounterActor。第2~7行,定义了累计动作action addMapper。它的作用是对Agent的值进行修改,这里简单地加1。
CounterActor的消息处理函数onReceive()中,对全局的counterAgent进行累加操作,alter()指定了累加动作addMapper(第14行)。由于我们希望在将来知道累加行为是否完成,因此在这里将返回的Future对象进行收集(第15行)。完成任务后,Actor自行退出(第17行)。
程序的主函数如下:
01 public class AgentDemo { 02 public static Agent<Integer> counterAgent = Agent.create(0, ExecutionContexts.global()); 03 static ConcurrentLinkedQueue<Future<Integer>> futures = new ConcurrentLinkedQueue<Future <Integer>>(); 04 05 public static void main(String[] args) throws InterruptedException { 06 final ActorSystem system = ActorSystem.create("agentdemo", 07 ConfigFactory.load("samplehello.conf")); 08 ActorRef[] counter = new ActorRef[10]; 09 for (int i = 0; i < counter.length; i++) { 10 counter[i] = system.actorOf(Props.create(CounterActor.class), "counter_" + i); 11 } 12 final Inbox inbox = Inbox.create(system); 13 for (int i = 0; i < counter.length; i++) { 14 inbox.send(counter[i], 1); 15 inbox.watch(counter[i]); 16 } 17 18 int closeCount = 0; 19 //等待所有Actor全部结束 20 while (true) { 21 Object msg = inbox.receive(Duration.create(1, TimeUnit.SECONDS)); 22 if (msg instanceof Terminated) { 23 closeCount++; 24 if (closeCount == counter.length) { 25 break; 26 } 27 } else { 28 System.out.println(msg); 29 } 30 } 31 // 等待所有的累加线程完成,因为他们都是异步的 32 Futures.sequence(futures, system.dispatcher()).onComplete( 33 new OnComplete<Iterable<Integer>>() { 34 @Override 35 public void onComplete(Throwable arg0, Iterable<Integer> arg1) throws Throwable { 36 System.out.println("counterAgent=" + counterAgent.get()); 37 system.shutdown(); 38 } 39 }, system.dispatcher()); 40 } 41 }