饭饭TXT > 学习管理 > 《实战Java高并发程序设计(出书版)》作者:葛一鸣/郭超【完结】 > 实战Java高并发程序设计.txt

第7章 使用Akka构建高并发程序

作者:葛一鸣/郭超 当前章节:16008 字 更新时间:2026-6-23 07:00

我们知道,写出一个正确的、高性能并且可扩展的并发程序是相当困难的,那么是否有一个好的框架可以帮助我们轻松构建这么一个应用呢?答案是肯定的,那就是Akka。Akka是一款遵循Aapche 2许可的开源人员,这意味着你可以无偿并且几乎没有限制地使用它,包括将它应用于商业环境中。

Akka是用Scala创建的,但由于Scala和Java一样,都是Java虚拟机上的语言,本质上说,两者并没有什么不同,因此,我们也可以在Java中使用Akka。考虑到Java开发人员的数量远远高于Scala,为了方便大众,在这里,我将全程使用Java来作为Akka的宿主语言(本书使用Akka 2.11-2.3.7作为演示)。但我并不打算在这里把对Akka的介绍写成一个Akka使用手册,因此,不会对Akka进行全方位完整的API介绍。只是希望在这里对Akka的主要功能进行简单的描述,帮助大家尽快理解Akka的基本思想。

那么使用Akka能够给我们带来什么好处呢?

首先Akka提供了一种称为Actor的并发模型,其粒度比线程更小,这意味着你可以在系统中启用极其大量的Actor。

其次,Akka中提供了一套容错机制,允许在Actor出现异常时进行一些恢复或者重置操作。

最后,通过Akka不仅可以在单机上构建高并发程序,也可以在网络中构建分布式程序,并提供位置透明的Actor定位服务。

下面就让我们正式开启Akka之旅吧!

7.1 新并发模型:Actor

对于并发程序来说,线程始终作为并发程序的基本执行单元。但在Akka中,你可以完全忘记线程了。当你使用Akka时,你就有一个全新的执行单元——Actor。Actor是什么呢?

简单来说,你可以把Actor比喻成一个人。多个人之间可以使用语言进行交流。比如,老师问同学5乘以5是多少呀?同学听到问题后,想了想,回答说是25。Actor之间的通信方式和上述对话形式几乎是一模一样的。

传统Java并行程序,还是完全基于面向对象的方法。我们还是通过对象的方法调用进行信息的传递。这时,如果对象的方法会修改对象本身的状态,那么在多线程情况下,就有可能出现对象状态的不一致,所以我们必须对这类方法调用进行同步。当然,同步往往就是以牺牲性能为代价的。

在Actor模型中,我们失去了对象的方法调用,我们并不是通过调用Actor对象的某一个方法来告诉Actor你需要做什么,而是给Actor发送一条消息。当一个Actor收到消息后,它有可能会根据消息的内容做出某些行为,包括更改自身状态。但是,在这种情况下,这个状态的更改是Actor自己进行的,并不是由外界被迫进行的。

7.2 Akka之Hello World

在了解了Actor的基本行为模式后,我们通过简单的Hello World程序来进一步了解一下Akka的开发。

首先让我们看一下,第1个Actor的实现:

01 public class Greeter extends UntypedActor { 02 public static enum Msg { 03 GREET, DONE; 04 } 05 06 @Override 07 public void onReceive(Object msg) { 08 if (msg == Msg.GREET) { 09 System.out.println("Hello World!"); 10 getSender().tell(Msg.DONE, getSelf()); 11 } else 12 unhandled(msg); 13 } 14 }

上述代码中,定义了一个欢迎者(Greeter)Actor,它继承自UntypedActor(它自然就是Akka中的核心成员了)。UntypedActor就是我们所说的Actor,之所以这里强调是无类型的,那是因为在Akka中,还支持一种有类型的Actor。有类型的Actor可以使用系统中的其他类型构造,可以缓解Java单继承的问题。因为你在继承了UntypedActor后,就不能再继承系统中的其他类了。如果你一定想这么做,那么就只能选择有类型的Actor。否则,UntypedActor应该就是你的首选。

在这里,代码第2~4行,定义了消息类型。这里只有两种类型,欢迎(GREET)以及完成(DONE)。当Greeter收到GREET消息时,就会在控制台打印“Hello World”,并且向消息发送方发送DONE信息(第10行)。

与Greeter交流的另外一个Actor是HelloWorld,它的实现如下:

01 public class HelloWorld extends UntypedActor { 02 ActorRef greeter; 03 04 @Override 05 public void preStart() { 06 greeter = getContext().actorOf(Props.create(Greeter.class), "greeter"); 07 System.out.println("Greeter Actor Path:" + greeter.path()); 08 greeter.tell(Greeter.Msg.GREET, getSelf()); 09 } 10 11 @Override 12 public void onReceive(Object msg) { 13 if (msg == Greeter.Msg.DONE) { 14 greeter.tell(Greeter.Msg.GREET, getSelf()); 15 getContext().stop(getSelf()); 16 } else 17 unhandled(msg); 18 } 19 }

上述代码实现了一个名为HelloWorld的Actor。第5行的preStart()方法为Akka的回调方法,在Actor启动前,会被Akka框架调用,完成一些初始化的工作。在这里,我们在HelloWorld中创建了Greeter的实例(第6行),并且向它发送GREET消息(第8行)。此时,由于创建Greeter时使用的是HelloWorld的上下文,因此,它属于HelloWorld的子Actor。

第12行定义的onReceive()函数为HelloWorld的消息处理函数。在这里,只处理DONE的消息。在收到DONE消息后,它会再向Greeter发送GREET消息,接着将自己停止。

因此,Greeter会前后收到两条GREET消息,会打印两次“Hello World”。

最后,让我们看一下主函数main():

1 public class HelloMainSimple { 2 public static void main(String[] args) { 3 ActorSystem system = ActorSystem.create("Hello",ConfigFactory.load("samplehello.conf")); 4 ActorRef a = system.actorOf(Props.create(HelloWorld.class),"helloWorld"); 5 System.out.println("HelloWorld Actor Path:" + a.path()); 6 } 7 }

程序第3行,创建了ActorSystem,表示管理和维护Actor的系统。一般来说,一个应用程序只需要一个ActorSystem就够用了。ActorSystem.create()的第1个参数“Hello”为系统名称,第2个参数为配置文件。

第4行通过ActorSystem创建一个顶级的Actor(HelloWorld)。

配置文件samplehello.conf的内容如下:

akka { loglevel = INFO }

在这里,只是简单地配置了一下日志级别为INFO。

执行上述代码,可以看到以下输出:

1 HelloWorld Actor Path:akka://Hello/user/helloWorld 2 Greeter Actor Path:akka://Hello/user/helloWorld/greeter 3 Hello World! 4 Hello World! 5 [INFO] [05/13/2015 21:15:01.299] [Hello-akka.actor.default-dispatcher-2] [akka://Hello/user/helloWorld] Message [geym.akka.demo.hello.Greeter$Msg] from Actor[akka://Hello/user/helloWorld/greeter#-1698722495] to Actor[akka://Hello/user/helloWorld#-1915075849] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

第1行打印了HelloWorld Actor的路径。它是系统内第1个被创建的Actor。它的路径为:akka://Hello/user/helloWorld。其中第1个Hello表示ActorSystem的系统名,可以看一下我们构造这ActorSystem时,传入的第1个参数就是Hello。接着user表示用户Actor。所有的用户Actor都会挂载在user这个路径下。第3个helloWorld就是这个Actor的名字。

同理,第2个Greeter Actor的路径结构和HelloWorld是完全一致的。输出的第3、4行显示了Greeter打印的两条信息。第5行表示系统遇到了一条消息投递失败,失败的原因是HelloWorld将自己终止了,导致Greeter发送的信息无法投递。

可以看到,当使用Actor进行并行程序开发时,我们的关注点已经不在线程上了。实际上,线程调度已经被Akka框架进行封装,我们只需要关注Actor对象即可。而Actor对象之间的交流和普通的对象的函数调用有明显区别。它们是通过显示的消息发送来传递信息的。

当系统内有多个Actor存在时,Akka会自动在线程池中选择线程来执行我们的Actor。因此,多个不同的Actor有可能会被同一个线程执行,同时,一个Actor也有可能被不同的线程执行。因此,一个值得注意的地方是:不要在一个Actor中执行耗时的代码,这样可能会导致其他Actor的调度出现问题。

7.3 有关消息投递的一些说明

整个Akka应用是由消息驱动的。消息是除了Actor之外最重要的核心组件。作为在并发程序中的核心组件,在Actor之间传递的消息应该满足不可变性,也就是不变模式。因为可变的消息无法高效的在并发环境中使用。理论上Akka中的消息可以使用任何对象实例,但实际使用中,强烈推荐使用不可变的对象。一个典型的不可变对象的实现如下:

01 public final class ImmutableMessage { 02 private final int sequenceNumber; 03 04 private final List<String> values; 05 06 public ImmutableMessage(int sequenceNumber, List<String> values) { 07 this.sequenceNumber = sequenceNumber; 08 this.values = Collections.unmodifiableList(new ArrayList<String>(values)); 09 } 10 11 public int getSequenceNumber() { 12 return sequenceNumber; 13 } 14 15 public List<String> getValues() { 16 return values; 17 } 18 }

上述代码实现了一个不可变的消息。注意代码中对final的使用,它申明了当前消息中的几个字段都是常量,在消息构造完成后,就不能再发生改变了。更加需要注意的是,对于values字段,final关键字只能保证values引用的不可变性,并无法保证values对象的不可变性。为了实现彻底的不可变性,代码第8行构造了一个不可变的List对象。

对于消息投递,大家可能还有另外一个疑问,那就是消息投递究竟是以何种策略进行的呢?也就是发出去的消息一定会被对方接收到吗?如果接收不到会重发吗?有没有可能重复接收消息呢?

实际上,对于消息投递,我们可以有3种不同的策略:

第1种,称为至多一次投递。在这种策略中,每一条消息最多会被投递一次。在这种情况下,可能偶尔会出现消息投递失败,而导致消息丢失。

第2种称为至少一次投递。在这种策略中,每一条消息至少会被投递一次,直到成功为止。因此在一些偶然的场合,接受者可能会收到重复的消息,但不会发生消息丢失。

第3种称为精确的消息投递。也就是所有的消息保证被精确地投递并成功接收一次,既不会有丢失,也不会有重复接收。

很明显,第1种策略是最高性能,最低成本的。因为系统只要负责把消息送出去就可以了,不需要关注是否成功。第2种策略则需要保存消息投递的状态并不断充实。而第3种策略则是成本最高且最不容易实现的。

那我们是否真的需要保证消息投递的可靠性呢?

答案是否定的。实际上,我们没有必要在Akka层保证消息的可靠性。这样做,成本太高了,也是没有必要的。消息的可靠性更应该在应用的业务层去维护,因为也许在有些时候,丢失一些消息完全是符合应用要求的。因此,在使用Akka时,需要在业务层对此进行保证。

此外,对于消息投递Akka可以在一定程度上保证顺序性。比如,Actor A1向A2顺序发送了M1、M2和M3三条消息。Actor A3向A2顺序发送了M4、M5和M6三条消息。那么系统可以保证:

(1)如果M1没有丢失,那它一定先于M2和M3被A2收到。

(2)如果M2没有丢失,那它一定先于M3被A2收到。

(3)如果M4没有丢失,那它一定先于M5和M6被A2收到。

(4)如果M5没有丢失,那它一定先于M6被A2收到。

(5)对A2来说,来自A1和A3的消息可能交织在一起,没有顺序保证。

在这里,值得注意的一点是,这种消息投递规则不具备可传递性,比如:

Actor A向C发送了M1,接着,Actor A向B发送了M2,B将M2转发给Actor C。那么在这种情况下,C收到M1和M2的先后顺序是没有保证的。

7.4 Actor的生命周期

Actor在系统中产生后,也存在着“生老病死”的活动周期。Akka框架提供了若干回调函数,让我们得以在Actor的活动周期内进行一些业务相关的行为。Actor的生命周期如图7.1所示。

图7.1 Actor的生命周期

一个Actor在actorOf()函数被调用后开始建立,Actor实例创建后,会回调preStart()方法。在这个方法里,我们可以进行一些资源的初始化工作。在Actor的工作过程中,可能会出现一些异常,这种情况下,Actor会需要重启。当Actor被重启时,会回调preRestart()方法(在老的实例上),接着系统会创建一个新的Actor对象实例(虽然是新的实例,但它们都表示同一个Actor)。当新的Actor实例创建后,会回调postRestart()方法,表示启动完成,同时新的实例将会代替旧的实例。停止一个Actor也有很多方式,你可以调用stop()方法或者给Actor发送一个PosionPill(毒药丸)。Actor停止时,postStop()方法会被调用,同时这个Actor的监视者会收到一个Terminated消息。

下面让我们建立一个带有生命周期回调函数的Actor:

public class MyWorker extends UntypedActor { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); public static enum Msg { WORKING, DONE,CLOSE; } @Override public void preStart(){ System.out.println("MyWorker is starting"); } @Override public void postStop(){ System.out.println("MyWorker is stopping"); } @Override public void onReceive(Object msg) { if (msg == Msg.WORKING) { System.out.println("I am working"); } if (msg == Msg.DONE) { System.out.println("Stop working"); }if (msg == Msg.CLOSE) { System.out.println("I will shutdown"); getSender().tell(Msg.CLOSE, getSelf()); getContext().stop(getSelf()); } else unhandled(msg); } }

上述代码定义了一个名为MyWorker的Actor。它重载了preStart()和postStop()两个方法。一般来说,我们可以使用preStart()来初始化一些资源,使用postStop()来进行资源的释放。这个Actor很简单,当它收到WORKING消息时,就打印“I am working”,收到DONE消息时,打印“Stop working”。

接着,我们为MyWorker指定一个监视者,监视者就如同一个劳动监工,一旦MyWorker因为意外停止工作,监视者就会收到一个通知。

01 public class WatchActor extends UntypedActor { 02 private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); 03 04 public WatchActor(ActorRef ref) { 05 getContext().watch(ref); 06 } 07 08 @Override 09 public void onReceive(Object msg) { 10 if (msg instanceof Terminated) { 11 System.out.println(String.format("%s has terminated, shutting down system", 12 ((Terminated) msg).getActor().path())); 13 getContext().system().shutdown(); 14 } else { 15 unhandled(msg); 16 } 17 } 18 }

上述代码定义了一个监视者WatchActor,它本质上也是一个Actor,但不同的是,它会在它的上下文中watch一个Actor(第5行)。如果将来这个被监视的Actor的退出终止,WatchActor就能收到一条Terminated消息(代码第10行)。在这里,我们将简单地打印终止消息Terminated中的相关Actor路径,并且关闭整个ActorSystem(第13行)。

主函数如下:

01 public class DeadMain { 02 public static void main(String[] args) { 03 ActorSystem system = ActorSystem 04 .create("deadwatch", ConfigFactory.load("samplehello.conf")); 05 ActorRef worker = system.actorOf(Props.create(MyWorker.class), "worker"); 06 system.actorOf(Props.create(WatchActor.class, worker), "watcher"); 07 worker.tell(MyWorker.Msg.WORKING, ActorRef.noSender()); 08 worker.tell(MyWorker.Msg.DONE, ActorRef.noSender()); 09 worker.tell(PoisonPill.getInstance(), ActorRef.noSender()); 10 } 11 }

上述代码中,我们首先创建ActorSystem全局实例(第3~4行),接着创建MyWorker Actor和WatchActor。注意第6行的Props.create()方法,它的第1个参数为要创建的Actor类型,第2个参数为这个Actor的构造函数的参数(在这里,就是要调用WatchActor的构造函数)。接着,向MyWorker先后发送WORKING和DONE两条消息。最后在第9行,发送一条特殊的消息PoisonPill。PoisonPill就是毒药丸,它会直接毒死接收方,让其终止。

执行上述代码,系统输出如下:

MyWorker is starting I am working Stop working MyWorker is stopping akka://deadwatch/user/worker has terminated, shutting down system

从这个输出中可以看到,MyWorker生命周期中的两个回调函数以及消息处理函数都被正常调用。最后一行输出也显示WatchActor正常监视到MyWorker的终止。

7.5 监督策略

如果一个Actor在执行过程中发生意外,比如没有处理某些异常,导致出错,那么这个时候应该怎么办呢?系统是应该当做什么都没发生过,继续执行,还是认为遇到了一个系统性的错误而重启Actor甚至是它所有的兄弟Actor呢?

对于这种情况,Akka框架给予了我们足够的控制权。在Akka框架内,父Actor可以对子Actor进行监督,监控Actor的行为是否有异常。大体上,监督策略可以分为两种:一种是OneForOneStrategy的监督,另外一种是AllForOneStrategy。

对于OneForOneStrategy的策略,父Actor只会对出问题的子Actor进行处理,比如重启或者停止,而对于AllForOneStrategy,父Actor会对出问题的子Actor以及它所有的兄弟都进行处理。很显然,对于AllForOneStrategy策略,它更加适合于各个Actor联系非常紧密的场景,如果多个Actor间只要有一个Actor出现故障,则宣告整个任务失败,就比较适合使用AllForOneStrategy,否则,在更多的场景中,应该使用OneForOneStrategy。当然了,OneForOneStrategy也是Akka的默认策略。

在一个指定的策略中,我们可以对Actor的失败情况进行相应的处理,比如:当失败时,我们可以无视这个错误,继续执行Actor,就像什么事都没发生过一样。或者可以重启这个Actor,甚至可以让这个Actor彻底停止工作。要指定这些监督行为,只要构造一个自定义的监督策略即可。

下面让我们简单看一下SupervisorStrategy的使用和设置。首先,需要定一个父Actor,它作为所有子Actor的监督者:

01 public class Supervisor extends UntypedActor { 02 private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create(1, TimeUnit.MINUTES), 03 new Function<Throwable, Directive>() { 04 @Override 05 public Directive apply(Throwable t) { 06 if (t instanceof ArithmeticException) { 07 System.out.println("meet ArithmeticException,just resume"); 08 return SupervisorStrategy.resume(); 09 } else if (t instanceof NullPointerException) { 10 System.out.println("meet NullPointerException,restart"); 11 return SupervisorStrategy.restart(); 12 } else if (t instanceof IllegalArgumentException) { 13 return SupervisorStrategy.stop(); 14 } else { 15 return SupervisorStrategy.escalate(); 16 } 17 } 18 }); 19 20 @Override 21 public SupervisorStrategy supervisorStrategy() { 22 return strategy; 23 } 24 25 public void onReceive(Object o) { 26 if (o instanceof Props) { 27 getContext().actorOf((Props) o,"restartActor"); 28 } else { 29 unhandled(o); 30 } 31 } 32 }

上述代码第2~18行,定义了一个OneForOneStrategy的监督策略。在这个监督策略中,运行Actor在遇到错误后,在1分钟内进行3次重试。如果超过这个频率,那么就会直接杀死Actor。具体的策略由第5~16行定义。这里的含义是,当遇到ArithmeticException异常时(比如除以0的错误),继续指定这个Actor,不做任何处理(第8行);当遇到空指针时,进行Actor的重启(第11行)。如果遇到IllegalArgumentException异常,则直接停止Actor(第13行)。对于在这个函数中没有涉及的异常,则向上抛出,由更顶层的Actor处理(第15行)。

第20~23行覆盖父类的supervisorStrategy()方法,设置使用自定义的监督策略。

第27行用来新建一个名为restartActor的子Actor,这个子Actor就由当前的Supervisor进行监督了。当Supervisor接收一个Props对象时,就会根据这个Props配置生成一个restartActor。

RestartActor的实现如下:

01 public class RestartActor extends UntypedActor { 02 public enum Msg { 03 DONE, RESTART 04 } 05 06 @Override 07 public void preStart() { 08 System.out.println("preStart hashcode:" + this.hashCode()); 09 } 10 11 @Override 12 public void postStop() { 13 System.out.println("postStop hashcode:" + this.hashCode()); 14 } 15 16 @Override 17 public void postRestart(Throwable reason) throws Exception { 18 super.postRestart(reason); 19 System.out.println("postRestart hashcode:" + this.hashCode()); 20 } 21 22 @Override 23 public void preRestart(Throwable reason,Option opt) throws Exception { 24 System.out.println("preRestart hashcode:" + this.hashCode()); 25 } 26 27 @Override 28 public void onReceive(Object msg) { 29 if (msg == Msg.DONE) { 30 getContext().stop(getSelf()); 31 } else if (msg == Msg.RESTART) { 32 System.out.println(((Object)null).toString()); 33 //抛出异常 默认会被restart,但这里会resume 34 double a = 0 / 0; 35 } 36 unhandled(msg); 37 } 38 }

第6~25行,定义了一些Actor的生命周期的回调接口。目的是更好地观察Actor的活动情况。在第32~34行模拟了一些异常情况,第32行会抛出NullPointerException,而第34行因为除以零,所以会抛出ArithmeticException。

主函数如下定义:

01 public static void customStrategy(ActorSystem system){ 02 ActorRef a = system.actorOf(Props.create(Supervisor.class), "Supervisor"); 03 a.tell(Props.create(RestartActor.class), ActorRef.noSender()); 04 05 ActorSelection sel=system.actorSelection("akka://lifecycle/user/Supervisor/restartActor"); 06 07 for(int i=0;i<100;i++){ 08 sel.tell(RestartActor.Msg.RESTART, ActorRef.noSender()); 09 } 10 } 11 public static void main(String[] args) { 12 ActorSystem system = ActorSystem.create("lifecycle", ConfigFactory.load("lifecycle.conf")); 13 customStrategy(system); 14 }

上述代码中,第12行代码创建了全局ActorSystem,接着在customStrategy()函数中创建了Supervisor Actor,并且对Supervisor发送一个RestartActor的Props(第3行,这个消息会使得Supervisor创建RestartActor)。

接着,选中RestartActor实例(第5行)。第7~9行,向这个RestartActor发送100条RESTART消息。这会使得RestartActor抛出NullPointerException。

执行上述代码,部分输出如下(由于输出太多,这里只截取重要的部分):

01 preStart hashcode:7302437 02 meet NullPointerException,restart 03 preRestart hashcode:7302437 04 [ERROR] [lifecycle-akka.actor.default-dispatcher-3] [akka://lifecycle/user/Supervisor/ restartActor] null 05 java.lang.NullPointerException 06 at geym.akka.demo.lifecycle.RestartActor.onReceive(RestartActor.java:46) 07 at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) 08 at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 09 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) 10 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 11 at akka.actor.ActorCell.invoke(ActorCell.scala:487) 12 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 13 at akka.dispatch.Mailbox.run(Mailbox.scala:221) 14 at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 15 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 16 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 17 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 18 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 19 20 preStart hashcode:23269863 21 postRestart hashcode:23269863 22 meet NullPointerException,restart 23 preRestart hashcode:23269863 24 preStart hashcode:24918371 25 postRestart hashcode:24918371 26 meet NullPointerException,restart 27 preRestart hashcode:24918371 28 preStart hashcode:12844205 29 postRestart hashcode:12844205 30 [ERROR] [lifecycle-akka.actor.default-dispatcher-2] [akka://lifecycle/user/Supervisor/restartActor] n ull 31 meet NullPointerException,restart 32 ..... 33 postStop hashcode:12844205

目录
设置
设置
阅读主题
字体风格
雅黑 宋体 楷书 卡通
字体大小
适中 偏大 超大
保存设置
恢复默认
手机
手机阅读
扫码获取链接,使用浏览器打开
书架同步,随时随地,手机阅读
首 页 < 上一章 章节列表 下一章 > 尾 页