上述代码中,第11行和第21行分别模拟了一个非常耗时的操作,让线程耗时1秒钟。它们分别对应读耗时和写耗时。代码第34和45行,分别是读线程和写线程。在这里,第34行使用读锁,第35行使用写锁。第53~55行开启18个读线程,第57~59行,开启两个写线程。由于这里使用了读写分离,因此,读线程完全并行,而写会阻塞读,因此,实际上这段代码运行大约2秒多就能结束(写线程之间是实际串行的)。而如果使用第35行代替第34行,使用第46行代替第45行执行上述代码,即,使用普通的重入锁代替读写锁。那么所有的读和写线程之间都必须相互等待,因此整个程序的执行时间将长达20余秒。
3.1.5 倒计时器:CountDownLatch
CountDownLatch是一个非常实用的多线程控制工具类。“Count Down”在英文中意为倒计数,Latch为门闩的意思。如果翻译成为倒计数门闩,我想大家都会觉得不知所云吧!因此,这里简单地称之为倒计数器。在这里,门闩的含义是:把门锁起来,不让里面的线程跑出来。因此,这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
对于倒计时器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。只有等所有的检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程等待所有检查线程全部完工后,再执行。
CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。
public CountDownLatch(int count)
下面这个简单的示例,演示了CountDownLatch的使用。
01 public class CountDownLatchDemo implements Runnable { 02 static final CountDownLatch end = new CountDownLatch(10); 03 static final CountDownLatchDemo demo=new CountDownLatchDemo(); 04 @Override 05 public void run() { 06 try { 07 //模拟检查任务 08 Thread.sleep(new Random().nextInt(10)*1000); 09 System.out.println("check complete"); 10 end.countDown(); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 } 15 public static void main(String[] args) throws InterruptedException { 16 ExecutorService exec = Executors.newFixedThreadPool(10); 17 for(int i=0;i<10;i++){ 18 exec.submit(demo); 19 } 20 //等待检查 21 end.await(); 22 //发射火箭 23 System.out.println("Fire!"); 24 exec.shutdown(); 25 } 26 }
上述代码第2行,生成一个CountDownLatch实例。计数数量为10。这表示需要有10个线程完成任务,等待在CountDownLatch上的线程才能继续执行。代码第10行,使用了CountDownLatch.countdown()方法,也就是通知CountDownLatch,一个线程已经完成了任务,倒计时器可以减1啦。第21行,使用CountDownLatch.await()方法,要求主线程等待所有10个检查任务全部完成。待10个任务全部完成后,主线程才能继续执行。
上述案例的执行逻辑可以用图3.1简单表示。
图3.1 CountDownLatch示意图
主线程在CountDownLatch上等待,当所有检查任务全部完成后,主线程方能继续执行。
3.1.6 循环栅栏:CyclicBarrier
CyclicBarrier是另外一种多线程并发控制实用工具。和CountDownLatch非常类似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大。
CyclicBarrier可以理解为循环栅栏。栅栏就是一种障碍物,比如,通常在私人宅邸的周围就可以围上一圈栅栏,阻止闲杂人等入内。这里当然就是用来阻止线程继续执行,要求线程在栅栏处等待。前面Cyclic意为循环,也就是说这个计数器可以反复使用。比如,假设我们将计数器设置为10,那么凑齐第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程,这就是循环栅栏内在的含义。
CyclicBarrier的使用场景也很丰富。比如,司令下达命令,要求10个士兵一起去完成一项任务。这时,就会要求10个士兵先集合报道,接着,一起雄赳赳气昂昂地去执行任务。当10个士兵把自己手头的任务都执行完成了,那么司令才能对外宣布,任务完成!
比CountDownLatch略微强大一些,CyclicBarrier可以接收一个参数作为barrierAction。所谓barrierAction就是当计数器一次计数完成后,系统会执行的动作。如下构造函数,其中,parties表示计数总数,也就是参与的线程总数。
public CyclicBarrier(int parties, Runnable barrierAction)
下面的示例使用CyclicBarrier演示了上述司令命令士兵完成任务的场景。
01 public class CyclicBarrierDemo { 02 public static class Soldier implements Runnable { 03 private String soldier; 04 private final CyclicBarrier cyclic; 05 06 Soldier(CyclicBarrier cyclic, String soldierName) { 07 this.cyclic = cyclic; 08 this.soldier = soldierName; 09 } 10 11 public void run() { 12 try { 13 //等待所有士兵到齐 14 cyclic.await(); 15 doWork(); 16 //等待所有士兵完成工作 17 cyclic.await(); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } catch (BrokenBarrierException e) { 21 e.printStackTrace(); 22 } 23 } 24 25 void doWork() { 26 try { 27 Thread.sleep(Math.abs(new Random().nextInt()%10000)); 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } 31 System.out.println(soldier + ":任务完成"); 32 } 33 } 34 35 public static class BarrierRun implements Runnable { 36 boolean flag; 37 int N; 38 public BarrierRun(boolean flag, int N) { 39 this.flag = flag; 40 this.N = N; 41 } 42 43 public void run() { 44 if (flag) { 45 System.out.println("司令:[士兵" + N + "个,任务完成!]"); 46 } else { 47 System.out.println("司令:[士兵" + N + "个,集合完毕!]"); 48 flag = true; 49 } 50 } 51 } 52 53 public static void main(String args[]) throws InterruptedException { 54 final int N = 10; 55 Thread[] allSoldier=new Thread[N]; 56 boolean flag = false; 57 CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); 58 //设置屏障点,主要是为了执行这个方法 59 System.out.println("集合队伍!"); 60 for (int i = 0; i < N; ++i) { 61 System.out.println("士兵 "+i+" 报道!"); 62 allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i)); 63 allSoldier[i].start(); 64 } 65 } 66 }
上述代码第57行,创建了CyclicBarrier实例,并将计数器设置为10,并要求在计数器达到指标时,执行第43行的run()方法。每一个士兵线程会执行第11行定义的run()方法。在第14行,每一个士兵线程都会等待,直到所有的士兵都集合完毕。集合完毕后,意味着CyclicBarrier的一次计数完成,当再一次调用CyclicBarrier.await()时,会进行下一次计数。第15行,模拟了士兵的任务。当一个士兵任务执行完毕后,他就会要求CyclicBarrier开始下一次计数,这次计数主要目的是监控是否所有的士兵都已经完成了任务。一旦任务全部完成,第35行定义的BarrierRun就会被调用,打印相关信息。
上述代码的执行输出如下:
集合队伍! 士兵 0 报道! //篇幅有限,省略其他几个士兵 士兵 9 报道! 司令:[士兵10个,集合完毕!] 士兵 0:任务完成 //篇幅有限,省略其他几个士兵 士兵 4:任务完成 司令:[士兵10个,任务完成!]
整个工作过程的图示如图3.2所示。
图3.2 CyclicBarrier工作示意图
CyclicBarrier.await()方法可能会抛出两个异常。一个是InterruptedException,也就是在等待过程中,线程被中断,应该说这是一个非常通用的异常。大部分迫使线程等待的方法都可能会抛出这个异常,使得线程在等待时依然可以响应外部紧急事件。另外一个异常则是CyclicBarrier特有的BrokenBarrierException。一旦遇到这个异常,则表示当前的CyclicBarrier已经破损了,可能系统已经没有办法等待所有线程到齐了。如果继续等待,可能就是徒劳无功的,因此,还是就地散货,打道回府吧!上述代码第18~22行处理了这2种异常。
如果我们在上述代码的第63行后,插入以下代码,使得第5个士兵线程产生中断:
if(i==5){ allSoldier[0].interrupt(); }
如果这样做,我们很可能就会得到1个InterruptedException和9个BrokenBarrierException。这个InterruptedException就是被中断线程抛出的。而其他9个BrokenBarrierException,则是等待在当前CyclicBarrier上的线程抛出的。这个异常可以避免其他9个线程进行永久的、无谓的等待(因为其中一个线程已经被中断,等待是没有结果的)。
3.1.7 线程阻塞工具类:LockSupport
LockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。
LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。它们实现了一个限时的等待。
大家应该还记得,我们在第2章中提到的那个有关suspend()永久卡死线程的例子吧!现在,我们可以用LockSupport重写这个程序:
01 public class LockSupportDemo { 02 public static Object u = new Object(); 03 static ChangeObjectThread t1 = new ChangeObjectThread("t1"); 04 static ChangeObjectThread t2 = new ChangeObjectThread("t2"); 05 06 public static class ChangeObjectThread extends Thread { 07 public ChangeObjectThread(String name){ 08 super.setName(name); 09 } 10 @Override 11 public void run() { 12 synchronized (u) { 13 System.out.println("in "+getName()); 14 LockSupport.park(); 15 } 16 } 17 } 18 19 public static void main(String[] args) throws InterruptedException { 20 t1.start(); 21 Thread.sleep(100); 22 t2.start(); 23 LockSupport.unpark(t1); 24 LockSupport.unpark(t2); 25 t1.join(); 26 t2.join(); 27 } 28 }
注意,这里只是将原来的suspend()和resume()方法用park()和unpark()方法做了替换。当然,我们依然无法保证unpark()方法发生在park()方法之后。但是执行这段代码,你会发现,它自始至终都可以正常的结束,不会因为park()方法而导致线程永久性的挂起。
这是因为LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可(也就是将许可变为不可用),如果许可不可用,就会阻塞。而unpark()则使得一个许可变为可用(但是和信号量不同的是,许可不能累加,你不可能拥有超过一个许可,它永远只有一个)。
这个特点使得:即使unpark()操作发生在park()之前,它也可以使下一次的park()操作立即返回。这也就是上述代码可顺利结束的主要原因。
同时,处于park()挂起状态的线程不会像suspend()那样还给出一个令人费解的Runnable的状态。它会非常明确地给出一个WAITING状态,甚至还会标注是park()引起的:
"t1" #8 prio=5 os_prio=0 tid=0x00b1a400 nid=0x1994 waiting on condition [0x1619f000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) at geym.conc.ch3.ls.LockSupportDemo$ChangeObjectThread.run(LockSupportDemo.java:18) - locked <0x048b2680> (a java.lang.Object)
这使得分析问题时格外方便。此外,如果你使用park(Object)函数,还可以为当前线程设置一个阻塞对象。这个阻塞对象会出现在线程Dump中。这样在分析问题时,就更加方便了。
比如,如果我们将上述代码第14行的park()改为:
LockSupport.park(this);
那么在线程Dump时,你可能会看到如下信息:
"t1" #8 prio=5 os_prio=0 tid=0x0117ac00 nid=0x2034 waiting on condition [0x15d0f000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x048b4738> (a geym.conc.ch3.ls.LockSupport- Demo$ChangeObjectThread) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at geym.conc.ch3.ls.LockSupportDemo$ChangeObjectThread.run (LockSupportDemo.java:18) - locked <0x048b2808> (a java.lang.Object)
注意,在堆栈中,我们甚至还看到了当前线程等待的对象,这里就是ChangeObjectThread实例。
除了有定时阻塞的功能外,LockSupport.park()还能支持中断影响。但是和其他接收中断的函数很不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,但是我们可以从Thread.interrupted()等方法获得中断标记。
01 public class LockSupportIntDemo { 02 public static Object u = new Object(); 03 static ChangeObjectThread t1 = new ChangeObjectThread("t1"); 04 static ChangeObjectThread t2 = new ChangeObjectThread("t2"); 05 06 public static class ChangeObjectThread extends Thread { 07 public ChangeObjectThread(String name){ 08 super.setName(name); 09 } 10 @Override 11 public void run() { 12 synchronized (u) { 13 System.out.println("in "+getName()); 14 LockSupport.park(); 15 if(Thread.interrupted()){ 16 System.out.println(getName()+" 被中断了"); 17 } 18 } 19 System.out.println(getName()+"执行结束"); 20 } 21 } 22 23 public static void main(String[] args) throws InterruptedException { 24 t1.start(); 25 Thread.sleep(100); 26 t2.start(); 27 t1.interrupt(); 28 LockSupport.unpark(t2); 29 } 30 }
注意上述代码在第27行,中断了处于park()状态的t1。之后,t1可以马上响应这个中断,并且返回。之后在外面等待的t2才可以进入临界区,并最终由LockSupport.unpark(t2)操作使其运行结束。
in t1 t1 被中断了 t1 执行结束 in t2 t2执行结束
3.2 线程复用:线程池
多线程的软件设计方法确实可以最大限度地发挥现代多核处理器的计算能力,提高生产系统的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响。
一种最为简单的线程创建和回收的方法类似如下代码:
new Thread(new Runnable(){ @Override public void run() { //do sth. } }).start();
以上代码创建了一个线程,并在run()方法结束后,自动回收该线程。在简单的应用系统中,这段代码并没有太多问题。但是在真实的生产环境中,系统由于真实环境的需要,可能会开启很多线程来支撑其应用。而当线程数量过大时,反而会耗尽CPU和内存资源。
首先,虽然与进程相比,线程是一种轻量级的工具,但其创建和关闭依然需要花费时间,如果为每一个小的任务都创建一个线程,很有可能出现创建和销毁线程所占用的时间大于该线程真实工作所消耗的时间的情况,反而会得不偿失。
其次,线程本身也是要占用内存空间的,大量的线程会抢占宝贵的内存资源,如果处理不当,可能会导致Out of Memory异常。即便没有,大量的线程回收也会给GC带来很大的压力,延长GC的停顿时间。
因此,对线程的使用必须掌握一个度,在有限的范围内,增加线程的数量可以明显提高系统的吞吐量,但一旦超出了这个范围,大量的线程只会拖垮应用系统。因此,在生产环境中使用线程,必须对其加以控制和管理。
注意:在实际生产环境中,线程的数量必须得到控制。盲目的大量创建线程对系统性能是有伤害的。
3.2.1 什么是线程池
为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。如果大家进行过数据库开发,对数据库连接池应该不会陌生。为了避免每次数据库查询都重新建立和销毁数据库连接,我们可以使用数据库连接池维护一些数据库连接,让他们长期保持在一个激活状态。当系统需要使用数据库时,并不是创建一个新的连接,而是从连接池中获得一个可用的连接即可。反之,当需要关闭连接时,并不真的把连接关闭,而是将这个连接“还”给连接池即可。通过这种方式,可以节约不少创建和销毁对象的时间。
线程池也是类似的概念。线程池中,总有那么几个活跃线程。当你需要使用线程时,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这个线程退回到池子,方便其他人使用。
简而言之,在使用线程池后,创建线程变成了从线程池获得空闲线程,关闭线程变成了向池子归还线程,如图3.3所示。
图3-3 线程池的作用
3.2.2 不要重复发明轮子:JDK对线程池的支持
为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。它的核心成员如图3.4所示。
图3-4 Executor框架结构图
以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。从UML图中亦可知,ThreadPoolExecutor类实现了Executor接口,因此通过这个接口,任何Runnable的对象都可以被ThreadPoolExecutor线程池调度。
Executor框架提供了各种类型的线程池,主要有以下工厂方法:
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newSingleThreadScheduledExecutor() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
以上工厂方法分别返回具有不同工作特性的线程池。这些线程池工厂方法的具体说明如下。
newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。
1. 固定大小的线程池
这里,我们以newFixedThreadPool()为例,简单地展示线程池的使用:
01 public class ThreadPoolDemo { 02 public static class MyTask implements Runnable { 03 @Override 04 public void run() { 05 System.out.println(System.currentTimeMillis() + ":Thread ID:" 06 + Thread.currentThread().getId()); 07 try { 08 Thread.sleep(1000); 09 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } 12 } 13 } 14 15 public static void main(String[] args) { 16 MyTask task = new MyTask(); 17 ExecutorService es = Executors.newFixedThreadPool(5); 18 for (int i = 0; i < 10; i++) { 19 es.submit(task); 20 } 21 } 22 }
上述代码中,第17行创建了固定大小的线程池,内有5个线程。在第19行,依次向线程池提交了10个任务。此后,线程池就会安排调度这10个任务。每个任务都会将自己的执行时间和执行这个线程的ID打印出来,并且在这里,安排每个任务要执行1秒钟。
执行上述代码,可以得到类似以下输出:
1426510293450:Thread ID:8 1426510293450:Thread ID:9 1426510293450:Thread ID:12 1426510293450:Thread ID:10 1426510293450:Thread ID:11 1426510294450:Thread ID:12 1426510294450:Thread ID:11 1426510294450:Thread ID:8 1426510294450:Thread ID:10 1426510294450:Thread ID:9
这个输出就表示这10个线程的执行情况。很显然,前5个任务和后5个任务的执行时间正好相差1秒钟(注意时间戳的单位是毫秒),并且前5个任务的线程ID和后5个任务也是完全一致的(都是8、9、10、11、12)。这说明在这10个任务中,是分成2批次执行的。这也完全符合一个只有5个线程的线程池的行为。
有兴趣的读者可以将其改造成newCachedThreadPool(),看看任务的分配情况会有何变化?
2. 计划任务
另外一个值得注意的方法是newScheduledThreadPool()。它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的一些主要方法如下:
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
与其他几个线程池不同,ScheduledExecutorService并不一定会立即安排执行任务。它其实是起到了计划任务的作用。它会在指定的时间,对任务进行调度。如果大家使用过Linux下的crontab工具应该就能很容易地理解它了。
作为说明,这里给出了三个方法。方法schedule()会在给定时间,对任务进行一次调度。方法scheduleAtFixedRate()和scheduleWithFixedDelay()会对任务进行周期性的调度。但是两者有一点小小的区别,如图3.5所示。
图3.5 FixedRate和FixDelay区别
对于FixedRate方式来说,任务调度的频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDelay则是在上一个任务结束后,再经过delay时间进行任务调度。
由于担心我的解释不够周全,我也很乐意将官方文档中的描述贴出来供大家参考,从而可以更精确地理解两者的差别:
scheduleAtFixedRate
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on.
翻译:创建一个周期性任务。任务开始于给定的初始延时。后续的任务按照给定的周期进行:后续第一个任务将会在initialDelay+period时执行,后续第二个任务将在initialDelay+2*period时进行,依此类推。
scheduleWithFixedDelay
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.
翻译:创建并执行一个周期性任务。任务开始于初始延时时间,后续任务将会按照给定的延时进行,即上一个任务的结束时间到下一个任务的开始时间的时间差。
下面的例子使用scheduleAtFixedRate()方法调度一个任务。这个任务会执行1秒钟时间,调度周期是2秒。也就是说每2秒钟,任务就会被执行一次。
01 public class ScheduledExecutorServiceDemo { 02 public static void main(String[] args) { 03 ScheduledExecutorService ses=Executors.newScheduledThreadPool(10); 04 //如果前面的任务没有完成,则调度也不会启动 05 ses.scheduleAtFixedRate(new Runnable() { 06 @Override 07 public void run() { 08 try { 09 Thread.sleep(1000); 10 System.out.println(System.currentTimeMillis()/1000); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 } 15 }, 0, 2, TimeUnit.SECONDS); 16 } 17 }
执行上述代码,一种输出的可能如下:
1426515345 1426515347 1426515349 1426515351
上述输出的单位是秒。可以看到,时间间隔是2秒。
这里还想说一个有意思的事情,如果任务的执行时间超过调度时间,会发生什么情况呢?比如,这里调度周期是2秒,如果任务的执行时间是8秒,是不是会出现多个任务堆叠在一起呢?
实际上,ScheduledExecutorService不会让任务堆叠出现。我们将第9行的代码改为: