饭饭TXT > 学习管理 > 《实战Java高并发程序设计(出书版)》作者:葛一鸣/郭超【完结】 > 实战Java高并发程序设计.txt

第3章 JDK并发包.4

作者:葛一鸣/郭超 当前章节:15458 字 更新时间:2026-6-23 07:00

熟悉的异常又回来了!现在,我们不仅可以得到异常发生的Runnable实现内的信息,我们也知道了这个任务是在哪里提交的。如此丰富的信息,我相信可以帮助我们瞬间定位问题!

3.2.9 分而治之:Fork/Join框架

“分而治之”一直是一个非常有效地处理大量数据的方法。著名的MapReduce也是采取了分而治之的思想。简单来说,就是如果你要处理1000个数据,但是你并不具备处理1000个数据的能力,那么你可以只处理其中的10个,然后,分阶段处理100次,将100次的结果进行合成,那就是最终想要的对原始1000个数据的处理结果。

Fork一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux平台中,函数fork()用来创建子进程,使得系统进程可以多一个执行分支。在Java中也沿用了类似的命名方式。

而join()的含义在之前的章节中已经解释过,这里也是相同的意思,表示等待。也就是使用fork()后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此join()就表示等待。

在实际使用中,如果毫无顾忌地使用fork()开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出了一个ForkJoinPool线程池,对于fork()方法并不急着开启线程,而是提交给ForkJoinPool线程池进行处理,以节省系统资源。使用Fork/Join进行数据处理时的总体结构如图3.8所示。

图3.8 Fork/Join执行逻辑

由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。因此,在实际执行过程中,可能遇到这么一种情况:线程A已经把自己的任务都执行完成了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助”线程B,从线程B的任务队列中拿一个任务过来处理,尽可能地达到平衡。如图3.9所示,显示了这种互相帮助的精神。一个值得注意的地方是,当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试图执行自己的任务时,则是从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。

图3.9 互相帮助的线程

下面我们来看一下ForkJoinPool的一个重要的接口:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

你可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join()等待的任务。ForkJoinTask有两个重要的子类,RecursiveAction和RecursiveTask。它们分别表示没有返回值的任务和可以携带返回值的任务。图3.10显示了这两个类的作用和区别。

图3.10 RecursiveAction和RecursiveTask

下面我们简单地展示Fork/Join框架的使用,这里用来计算数列求和。

01 public class CountTask extends RecursiveTask<Long>{ 02 private static final int THRESHOLD = 10000; 03 private long start; 04 private long end; 05 06 public CountTask(long start,long end){ 07 this.start=start; 08 this.end=end; 09 } 10 11 public Long compute(){ 12 long sum=0; 13 boolean canCompute = (end-start)<THRESHOLD; 14 if(canCompute){ 15 for(long i=start;i<=end;i++){ 16 sum +=i; 17 } 18 }else{ 19 //分成100个小任务 20 long step=(start+end)/100; 21 ArrayList<CountTask> subTasks=new ArrayList<CountTask>(); 22 long pos=start; 23 for(int i=0;i<100;i++){ 24 long lastOne=pos+step; 25 if(lastOne>end)lastOne=end; 26 CountTask subTask=new CountTask(pos,lastOne); 27 pos+=step+1; 28 subTasks.add(subTask); 29 subTask.fork(); 30 } 31 for(CountTask t:subTasks){ 32 sum+=t.join(); 33 } 34 } 35 return sum; 36 } 37 38 public static void main(String[]args){ 39 ForkJoinPool forkJoinPool = new ForkJoinPool(); 40 CountTask task = new CountTask(0,200000L); 41 ForkJoinTask<Long> result = forkJoinPool.submit(task); 42 try{ 43 long res = result.get(); 44 System.out.println("sum="+res); 45 }catch(InterruptedException e){ 46 e.printStackTrace(); 47 }catch(ExecutionException e){ 48 e.printStackTrace(); 49 } 50 } 51 }

由于计算数列的和必然是需要函数返回值的,因此选择RecursiveTask作为任务的模型。上述代码第39行,建立ForkJoinPool线程池。在第40行,构造一个计算1到200000求和的任务。在第41行将任务提交给线程池,线程池会返回一个携带结果的任务,通过get()方法可以得到最终结果(第43行)。如果在执行get()方法时,任务没有结束,那么主线程就会在get()方法时等待。

下面来看一下CountTask的实现。首先CountTask继承自RecursiveTask,可以携带返回值,这里的返回值类型设置为long。第2行定义的THRESHOLD设置了任务分解的规模,也就是如果需要求和的总数大于THRESHOLD个,那么任务就需要再次分解,否则就可以直接执行。这个判断逻辑在第14行有体现。如果任务可以直接执行,那么直接进行求和,返回结果。否则,就对任务再次分解。每次分解时,简单地将原有任务划分成100个等规模的小任务,并使用fork()提交子任务。之后,等待所有的子任务结束,并将结果再次求和(第31~33行)。

在使用ForkJoin时需要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:第一,系统内的线程数量越积越多,导致性能严重下降。第二,函数的调用层次变得很深,最终导致栈溢出。不同版本的JDK内部实现机制可能有差异,从而导致其表现不同。

下面的StackOverflowError异常就是加深本例的调用层次,在JDK 8上得到的错误。

java.util.concurrent.ExecutionException: java.lang.StackOverflowError at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1000) at geym.conc.ch3.fork.CountTask.main(CountTask.java:51) Caused by: java.lang.StackOverflowError

此外,ForkJoin线程池使用一个无锁的栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从栈中唤醒这些线程。

3.3 不要重复发明轮子:JDK的并发容器

除了提供诸如同步控制,线程池等基本工具外,为了提高开发人员的效率,JDK还为大家准备了一大批好用的容器类,可以大大减少开发工作量。大家应该都听说过一种说法,所谓程序就是“算法+数据结构”,这些容器类就是为大家准备好的线程数据结构。你可以在里面找到链表、HashMap、队列等。当然,它们都是线程安全的。

在这里,我也打算花一些篇幅为大家介绍一下这些工具类。这些容器类的封装都是非常完善并且“平易近人”的,也就是说只要你有那么一点点的编程经验,就可以非常容易地使用这些容器。因此,我可能会花更多的时间来分析这些工具的具体实现,希望起到抛砖引玉的作用。

3.3.1 超好用的工具类:并发集合简介

JDK提供的这些容器大部分在java.util.concurrent包中。我先提纲挈领地介绍一下它们,初次露脸,大家只需要知道他们的作用即可。有关具体的实现和注意事项,在后面我会慢慢道来。

ConcurrentHashMap:这是一个高效的并发HashMap。你可以理解为一个线程安全的HashMap。

CopyOnWriteArrayList:这是一个List,从名字看就是和ArrayList是一族的。在读多写少的场合,这个List的性能非常好,远远好于Vector。

ConcurrentLinkedQueue:高效的并发队列,使用链表实现。可以看做一个线程安全的LinkedList。

BlockingQueue:这是一个接口,JDK内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。

ConcurrentSkipListMap:跳表的实现。这是一个Map,使用跳表的数据结构进行快速查找。

除了以上并发包中的专有数据结构外,java.util下的Vector是线程安全的(虽然性能和上述专用工具没得比),另外Collections工具类可以帮助我们将任意集合包装成线程安全的集合。

3.3.2 线程安全的HashMap

在之前的章节中,已经给大家展示了在多线程环境中使用HashMap所带来的问题。那如果需要一个线程安全的HashMap应该怎么做呢?一种可行的方法是使用Collections.synchronizedMap()方法包装我们的HashMap。如下代码,产生的HashMap就是线程安全的:

public static Map m=Collections.synchronizedMap(new HashMap());

Collections.synchronizedMap()会生成一个名为SynchronizedMap的Map。它使用委托,将自己所有Map相关的功能交给传入的HashMap实现,而自己则主要负责保证线程安全。

具体参考下面的实现,首先SynchronizedMap内包装了一个Map。

private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize

通过mutex实现对这个m的互斥操作。比如,对于Map.get()方法,它的实现如下:

public V get(Object key) { synchronized (mutex) {return m.get(key);} }

而其他所有相关的Map操作都会使用这个mutex进行同步。从而实现线程安全。

这个包装的Map可以满足线程安全的要求。但是,它在多线程环境中的性能表现并不算太好。无论是对Map的读取或者写入,都需要获得mutex的锁,这会导致所有对Map的操作全部进入等待状态,直到mutex锁可用。如果并发级别不高,一般也够用。但是,在高并发环境中,我们也有必要寻求新的解决方案。

一个更加专业的并发HashMap是ConcurrentHashMap。它位于java.util.concurrent包内。它专门为并发进行了性能优化,因此,更加适合多线程的场合。

有关ConcurrentHashMap的具体实现细节,大家可以参考“第4章锁的优化及注意事项”一章。我们将在那里给出更加详细的实现说明。

3.3.3 有关List的线程安全

队列、链表之类的数据结构也是极其常用的,几乎所有的应用程序都会与之相关。在Java中,ArrayList和Vector都是使用数组作为其内部实现。两者最大的不同在于Vector是线程安全的,而ArrayList不是。此外,LinkedList使用链表的数据结构实现了List。但是很不幸,LinkedList并不是线程安全的,不过参考前面对HashMap的包装,在这里我们也可以使用Collections.synchronizedList()方法来包装任意List,如下所示:

public static List<String> l=Collections.synchronizedList(new LinkedList<String>());

此时生成的List对象就是线程安全的。

3.3.4 高效读写的队列:深度剖析ConcurrentLinkedQueue

队列Queue也是常用的数据结构之一。在JDK中提供了一个ConcurrentLinkedQueue类用来实现高并发的队列。从名字可以看到,这个队列使用链表作为其数据结构。有关ConcurrentLinkedQueue的性能测试,大家可以自行尝试。这里限于篇幅就不再给出性能测试的代码。大家只要知道ConcurrentLinkedQueue应该算是在高并发环境中性能最好的队列就可以了。它之所有能有很好的性能,是因为其内部复杂的实现。

在这里,我更加愿意花一些篇幅来简单介绍一下ConcurrentLinkedQueue的具体实现细节。不过在深入ConcurrentLinkedQueue之前,我强烈建议大家先阅读一下第4章,补充一下有关无锁操作的一些知识。

作为一个链表,自然需要定义有关链表内的节点,在ConcurrentLinkedQueue中,定义的节点Node核心如下:

private static class Node<E> { volatile E item; volatile Node<E> next;

其中item是用来表示目标元素的。比如,当列表中存放String时,item就是String类型。字段next表示当前Node的下一个元素,这样每个Node就能环环相扣,串在一起了。如图3.11所示,显示了ConcurrentLinkedQueue的基本结构。

图3.11 ConcurrentLinkedQueue基本结构

对Node进行操作时,使用了CAS操作。

boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }

方法casItem()表示设置当前Node的item值。它需要两个参数,第一个参数为期望值,第二个参数为设置目标值。当当前值等于cmp期望值时,就会将目标设置为val。同样casItem()方法也是类似的,但是它是用来设置next字段,而不是item字段。

ConcurrentLinkedQueue内部有两个重要的字段,head和tail,分别表示链表的头部和尾部,它们都是Node类型。对于head来说,它永远不会为null,并且通过head以及succ()后继方法一定能完整地遍历整个链表。对于tail来说,它自然应该表示队列的末尾。

但ConcurrentLinkedQueue的内部实现非常复杂,它允许在运行时链表处于多个不同的状态。以tail为例,一般来说,我们期望tail总是为链表的末尾,但实际上,tail的更新并不是及时的,而是可能会产生拖延现象。如图3.12所示,显示了插入时,tail的更新情况,可以看到tail的更新会产生滞后,并且每次更新会跳跃两个元素。

图3.12 插入节点时tail的更新

可以看到tail并不总是在更新。下面就是ConcurrentLinkedQueue中向队列中添加元素的offer()方法(本节中使用JDK 7u40的代码,不同版本的代码可能存在差异):

01 public boolean offer(E e) { 02 checkNotNull(e); 03 final Node<E> newNode = new Node<E>(e); 04 05 for (Node<E> t = tail, p = t;;) { 06 Node<E> q = p.next; 07 if (q == null) { 08 // p 是最后一个节点 09 if (p.casNext(null, newNode)) { 10 //每2次,更新一下tail 11 if (p != t) 12 casTail(t, newNode); 13 return true; 14 } 15 //CAS竞争失败,再次尝试 16 } 17 else if (p == q) 18 //遇到哨兵节点,从都head开始遍历。 19 //但如果tail被修改,则使用tail(因为可能被修改正确了) 20 p = (t != (t = tail)) ? t : head; 21 else 22 // 取下一个节点或者最后一个节点 23 p = (p != t && t != (t = tail)) ? t : q; 24 } 25 }

首先值得注意的是,这个方法没有任何锁操作。线程安全完全由CAS操作和队列的算法来保证。整个方法的核心是for循环,这个循环没有出口,直到尝试成功,这也符合CAS操作的流程。当第一次加入元素时,由于队列为空,因此p.next为null。程序进入第8行。并将p的next节点赋值为newNode,也就是将新的元素加入到队列中。此时p==t成立,因此不会执行第12行的代码更新tail末尾。如果casNext()成功,程序直接返回,如果失败,则再进行一次循环尝试,直到成功。因此,增加一个元素后,tail并不会被更新。

当程序试图增加第2个元素时,由于t还在head的位置上,因此p.next指向实际的第一个元素,因此第6行的q!=null,这表示q不是最后的节点。由于往队列中增加元素需要最后一个节点的位置,因此,循环开始查找最后一个节点。于是,程序会进入第23行,获得最后一个节点。此时,p实际上是指向链表中的第一个元素,而它的next为null,故在第2个循环时,进入第8行。p更新自己的next,让它指向新加入的节点。如果成功,由于此时p!=t成功,则会更新t所在位置,将t移动到链表最后。

在第17行,处理了p==q的情况。这种情况是由于遇到了哨兵(sentinel)节点导致的。所谓哨兵节点,就是next指向自己的节点。这种节点在队列中的存在价值不大,主要表示要删除的节点,或者空节点。当遇到哨兵节点时,由于无法通过next取得后续的节点,因此很可能直接返回head,期望通过从链表头部开始遍历,进一步查找到链表末尾。但一旦发生在执行过程中,tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾(这样就避免了重新查找tail的开销)。

如果大家对Java不是特别熟悉,可能会对类似下面的代码产生疑惑(第20行):

p = (t != (t = tail)) ? t : head;

这句代码虽然只有短短一行,但是包含的信息比较多。首先“!=”并不是原子操作,它是可以被中断的。也就是说,在执行“!=”是,程序会先取得t的值,再执行t=tail,并取得新的t的值。然后比较这两个值是否相等。在单线程时,t!=t这种语句显然不会成立。但是在并发环境中,有可能在获得左边的t值后,右边的t值被其他线程修改。这样,t!=t就可能成立。这里就是这种情况。如果在比较过程中,tail被其他线程修改,当它再次赋值给t时,就会导致等式左边的t和右边的t不同。如果两个t不相同,表示tail在中途被其他线程篡改。这时,我们就可以用新的tail作为链表末尾,也就是这里等式右边的t。但如果tail没有被修改,则返回head,要求从头部开始,重新查找尾部。

作为简化问题,我们考察t!=t的字节码(注意这里假设t为静态整形变量):

11: getstatic #10 // Field t:I 14: getstatic #10 // Field t:I 17: if_icmpeq 24

可以看到,在字节码层面,t被先后取了两次,在多线程环境下,我们自然无法保证两次对t的取值会是相同的,如图3.13所示,显示了这种情况。

图3.13 t!=t成立的情况

下面我们来看一下哨兵节点是如何产生的:

ConcurrentLinkedQueue<String> q=new ConcurrentLinkedQueue<String>(); q.add("1"); q.poll();

上述代码第3行,弹出队列内的元素。其执行过程如下:

01 public E poll() { 02 restartFromHead: 03 for (;;) { 04 for (Node<E> h = head, p = h, q;;) { 05 E item = p.item; 06 if (item != null && p.casItem(item, null)) { 07 if (p != h) 08 updateHead(h, ((q = p.next) != null) ? q : p); 09 return item; 10 } 11 else if ((q = p.next) == null) { 12 updateHead(h, p); 13 return null; 14 } 15 else if (p == q) 16 continue restartFromHead; 17 else 18 p = q; 19 } 20 } 21 }

由于队列中只有一个元素,根据前文的描述,此时tail并没有更新,而是指向和head相同的位置。而此时,head本身的item域为null,其next为列表第一个元素。故在第一个循环中,代码直接进入第18行,将p赋值为q,而q就是p.next,也是当前列表中的第一个元素。接着,在第2轮循环中,p.item显然不为null(为字符串1)。因此,代码应该可以顺利进入第7行(如果CAS操作成功)。进入第7行,也意味着p的item域被设置为null(因为这是弹出元素,自然需要删除)。同时,此时p和h是不相等的(因为p已经指向原有的第一个元素了)。故执行了第8行的updateHead()操作,其实现如下:

final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }

可以看到,在updateHead中,就将p作为新的链表头部(通过casHead()实现),而原有的head就被设置为哨兵(通过lazySetNext()实现)。

这样一个哨兵节点就产生了,而由于此时原有的head头部和tail实际上是同一个元素。因此,再次offer()插入元素时,就会遇到这个tail,也就是哨兵。这就是offer()代码中,第17行的判断的意义。

通过这些说明,大家应该可以明显感觉到,不使用锁而单纯地使用CAS操作会要求在应用层面保证线程安全,并处理一些可能存在的不一致问题,大大增加了程序设计和实现的难度。但是它带来的好处就是可以得到性能的飞速提升。因此,在有些场合也是值得的。

3.3.5 高效读取:不变模式下的CopyOnWriteArrayList

在很多应用场景中,读操作可能会远远大于写操作。比如,有些系统级别的信息,往往只需要加载或者修改很少的次数,但是会被系统内所有模块频繁的访问。对于这种场景,我们最希望看到的就是读操作可以尽可能地快,而写即使慢一些也没有太大关系。

由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问List的内部数据,毕竟读取操作是安全的。根据读写锁的思想,读锁和读锁之间确实也不冲突。但是,读操作会受到写操作的阻碍,当写发生时,读就必须等待,否则可能读到不一致的数据。同理,如果读操作正在进行,程序也不能进行写入。

为了将读取的性能发挥到极致,JDK中提供了CopyOnWriteArrayList类。对它来说,读取是完全不用加锁的,并且更好的消息是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?

从这个类的名字我们可以看到,所谓CopyOnWrite就是在写入操作时,进行一次自我复制。换句话说,当这个List需要修改时,我并不修改原有的内容(这对于保证当前在读线程的数据一致性非常重要),而是对原有的数据进行一次复制,将修改的内容写入副本中。写完之后,再将修改完的副本替换原来的数据。这样就可以保证写操作不会影响读了。

下面的代码展示了有关读取的实现:

private volatile transient Object[] array; public E get(int index) { return get(getArray(), index); } final Object[] getArray() { return array; } private E get(Object[] a, int index) { return (E) a[index]; }

需要注意的是:读取代码没有任何同步控制和锁操作,理由就是内部数组array不会发生修改,只会被另外一个array替换,因此可以保证数据安全。大家也可以参考“5.2不变模式”一节,相信可以有更深的认识。

和简单的读取相比,写入操作就有些麻烦了:

01 public boolean add(E e) { 02 final ReentrantLock lock = this.lock; 03 lock.lock(); 04 try { 05 Object[] elements = getArray(); 06 int len = elements.length; 07 Object[] newElements = Arrays.copyOf(elements, len + 1); 08 newElements[len] = e; 09 setArray(newElements); 10 return true; 11 } finally { 12 lock.unlock(); 13 } 14 }

首先,写入操作使用锁,当然这个锁仅限于控制写-写的情况。其重点在于第7行代码,进行了内部元素的完整复制。因此,会生成一个新的数组newElements。然后,将新的元素加入newElements。接着,在第9行,使用新的数组替换老的数组,修改就完成了。整个过程不会影响读取,并且修改完后,读取线程可以立即“察觉”到这个修改(因为array变量是volatile类型)。

3.3.6 数据共享通道:BlockingQueue

前文中,我们已经提到了ConcurrentLinkedQueue作为高性能的队列。对于并发程序而言,高性能自然是一个我们需要追求的目标。但多线程的开发模式还会引入一个问题,那就是如何进行多个线程间的数据共享呢?比如,线程A希望给线程B发一个消息,用什么方式告知线程B是比较合理的呢?

一般来说,我们总是希望整个系统是松散耦合的。比如,你所在小区的物业希望可以得到一些业主的意见,设立了一个意见箱,如果对物业有任何要求和或者意见都可以投到意见箱里。这时,作为业主的你并不需要直接找到物业相关的领导表达你的意见。实际上,物业的工作人员也可能经常发生变动,直接找工作人员未必是一件方便的事情。而你投递到意见箱的意见总是会被物业的工作人员看到,不管是否发生了人员的变动。这样,你就可以很容易地表达自己的诉求了。你既不需要直接和他们对话,又可以轻松提出自己的建议(这里假定我们物业公司的员工都是尽心尽责的好员工)。

将这个模式映射到我们程序中。就是说我们既希望线程A能够通知线程B,又希望线程A不知道线程B的存在。这样,如果将来进行重构或者升级,我们完全可以不修改线程A,而直接把线程B升级为线程C,保证系统的平滑过渡。而这中间的“意见箱”就可以使用BlockingQueue来实现。

与之前提到的ConcurrentLinkedQueue或者CopyOnWriteArrayList不同,BlockingQueue是一个接口,并非一个具体的实现。它的主要实现有下面一些,如图3.14所示。

图3-14 BlockingQueue的主要实现

这里我们主要介绍ArrayBlockingQueue和LinkedBlockingQueue。从名字应该可以得知,ArrayBlockingQueue是基于数组实现的,而LinkedBlockingQueue基于链表。也正因为如此,ArrayBlockingQueue更适合做有界队列,因为队列中可容纳的最大元素需要在队列创建时指定(毕竟数组的动态扩展不太方便)。而LinkedBlockingQueue适合做无界队列,或者那些边界值非常大的队列,因为其内部元素可以动态增加,它不会因为初值容量很大,而一口气吃掉你一大半的内存。

而BlockingQueue之所有适合作为数据共享的通道,其关键还在于Blocking上。Blocking是阻塞的意思,当服务线程(服务线程指不断获取队列中的消息,进行处理的线程)处理完成队列中所有的消息后,它如何知道下一条消息何时到来呢?

一种最傻瓜化的做法是让这个线程按照一定的时间间隔不停地循环和监控这个队列。这是可行的一种方案,但显然造成了不必要的资源浪费,而循环周期也难以确定。而BlockingQueue很好地解决了这个问题。它会让服务线程在队列为空时,进行等待,当有新的消息进入队列后,自动将线程唤醒,如图3.15所示。那它是如何实现的呢?我们以ArrayBlockingQueue为例,来一探究竟。

图3.15 BlockingQueue的工作模式

ArrayBlockingQueue的内部元素都放置在一个对象数组中:

final Object[] items;

向队列中压入元素可以使用offer()方法和put()方法。对于offer()方法,如果当前队列已经满了,它就会立即返回false。如果没有满,则执行正常的入队操作。所以,我们不讨论这个方法。现在,我们需要关注的是put()方法。put()方法也是将元素压入队列末尾。但如果队列满了,它会一直等待,直到队列中有空闲的位置。

从队列中弹出元素可以使用poll()方法和take()方法。它们都从队列的头部获得一个元素。不同之处在于:如果队列为空poll()方法直接返回null,而take()方法会等待,直到队列内有可用元素。

因此,put()方法和take()方法才是体现Blocking的关键。为了做好等待和通知两件事,在ArrayBlockingQueue内部定义了以下一些字段:

final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;

当执行take()操作时,如果队列为空,则让当前线程等待在notEmpty上。新元素入队时,则进行一次notEmpty上的通知。

下面的代码显示了take()的过程:

01 public E take() throws InterruptedException { 02 final ReentrantLock lock = this.lock; 03 lock.lockInterruptibly(); 04 try { 05 while (count == 0) 06 notEmpty.await(); 07 return extract(); 08 } finally { 09 lock.unlock(); 10 } 11 }

第6行代码,就要求当前线程进行等待。当队列中有新元素时,线程会得到一个通知。下面是元素入队时的一段代码:

1 private void insert(E x) { 2 items[putIndex] = x; 3 putIndex = inc(putIndex); 4 ++count; 5 notEmpty.signal(); 6 }

注意第5行代码,当新元素进入队列后,需要通知等待在notEmpty上的线程,让他们继续工作。

同理,对于put()操作也是一样的,当队列满时,需要让压入线程等待,如下面第7行。

01 public void put(E e) throws InterruptedException { 02 checkNotNull(e); 03 final ReentrantLock lock = this.lock; 04 lock.lockInterruptibly(); 05 try { 06 while (count == items.length) 07 notFull.await(); 08 insert(e); 09 } finally { 10 lock.unlock(); 11 } 12 }

当有元素从队列中被挪走,队列中出现空位时,自然也需要通知等待入队的线程:

1 private E extract() { 2 final Object[] items = this.items; 3 E x = this.<E>cast(items[takeIndex]); 4 items[takeIndex] = null; 5 takeIndex = inc(takeIndex); 6 --count; 7 notFull.signal(); 8 return x; 9 }

目录
设置
设置
阅读主题
字体风格
雅黑 宋体 楷书 卡通
字体大小
适中 偏大 超大
保存设置
恢复默认
手机
手机阅读
扫码获取链接,使用浏览器打开
书架同步,随时随地,手机阅读
首 页 < 上一章 章节列表 下一章 > 尾 页