饭饭TXT > 学习管理 > 《实战Java高并发程序设计(出书版)》作者:葛一鸣/郭超【完结】 > 实战Java高并发程序设计.txt

第6章 Java 8与并发.2

作者:葛一鸣/郭超 当前章节:15363 字 更新时间:2026-6-23 07:00

public class PrimeUtil { public static boolean isPrime(int number) { int tmp = number; if (tmp < 2) { return false; } for (int i = 2; Math.sqrt(tmp) >= i; i++) { if (tmp % i == 0) { return false; } } return true; } }

上述函数给定一个数字,如果这个数字是质数就返回true,否则返回false。

接着,使用函数式编程统计给定范围内所有的质数:

IntStream.range(1, 1000000).filter(PrimeUtil::isPrime).count();

上述代码首先生成一个1到1000000的数字流。接着使用过滤函数,只选择所有的质数,最后进行数量统计。

上述代码是串行的,将它改造成并行计算非常简单,只需要将流并行化即可:

IntStream.range(1, 1000000).parallel().filter(PrimeUtil::isPrime).count();

上述代码中,首先parallel()方法得到一个并行流,接着,在并行流上进行过滤,此时,PrimeUtil.isPrime()函数会被多线程并发调用,应用于流中的所有元素。

6.4.2 从集合得到并行流

在函数式编程中,我们可以从集合得到一个流或者并行流。下面这段代码试图统计集合内所有学生的平均分:

List<Student> ss=new ArrayList<Student>(); double ave=ss.stream().mapToInt(s->s.score).average().getAsDouble();

从集合对象List中,我们使用stream()方法可以得到一个流。如果希望将这段代码并行化,则可以使用parallelStream()函数。

double ave=ss.parallelStream().mapToInt(s->s.score).average().getAsDouble();

可以看到,将原有的串行方式改造成并行执行是非常容易的。

6.4.3 并行排序

除了并行流外,对于普通数组,Java 8中也提供了简单的并行功能。比如,对于数组排序,我们有Arrays.sort()方法。当然这是串行排序,但在Java 8中,我们可以使用新增的Arrays. parallelSort()方法直接使用并行排序。

比如,你可以这样使用:

int[] arr=new int[10000000]; Arrays.parallelSort(arr);

除了并行排序外,Arrays中还增加了一些API用于数组中数据的赋值,比如:

public static void setAll(int[] array, IntUnaryOperator generator)

这是一个函数式味道很浓的接口,它的第2个参数是一个函数式接口。如果我们想给数组中每一个元素都附上一个随机值,则可以这么做:

Random r=new Random(); Arrays.setAll(arr, (i)->r.nextInt());

当然,以上过程是串行的。但是只要使用setAll()对应的并行版本,你就可以很快将它执行在多个CPU上:

Random r=new Random(); Arrays.parallelSetAll (arr, (i)->r.nextInt());

6.5 增强的Future:CompletableFuture

CompletableFuture是Java 8新增的一个超大型工具类。为什么说它大呢?因为一方面,它实现了Future接口,而更重要的是,它也实现了CompletionStage接口。CompletionStage接口也是在Java 8中新增的。而CompletionStage接口拥有多达约40种方法!是的,你没有看错,这看起来完全不符合设计原则中所谓的“单方法接口”,但是在这里,它就这么存在了。这个接口之所以拥有如此众多的方法,是为了函数式编程中的流式调用准备的。通过CompletionStage提供的接口,我们可以在一个执行结果上进行多次流式调用,以此可以得到最终结果。比如,你可以在一个CompletionStage上进行如下调用:

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())

这一连串的调用就会挨个执行。

6.5.1 完成了就通知我

CompletableFuture和Future一样,可以作为函数调用的契约。如果你向CompletableFuture请求一个数据,如果数据还没有准备好,请求线程就会等待。而让人惊喜的是,通过CompletableFuture,我们可以手动设置CompletableFuture的完成状态。

01 public static class AskThread implements Runnable { 02 CompletableFuture<Integer> re = null; 03 04 public AskThread(CompletableFuture<Integer> re) { 05 this.re = re; 06 } 07 08 @Override 09 public void run() { 10 int myRe = 0; 11 try { 12 myRe = re.get() * re.get(); 13 } catch (Exception e) { 14 } 15 System.out.println(myRe); 16 } 17 } 18 19 public static void main(String[] args) throws InterruptedException { 20 final CompletableFuture<Integer> future = new CompletableFuture<>(); 21 new Thread(new AskThread(future)).start(); 22 // 模拟长时间的计算过程 23 Thread.sleep(1000); 24 // 告知完成结果 25 future.complete(60); 26 }

上述代码在第1~17行,定义了一个AskThread线程。它接收一个CompletableFuture作为其构造函数,它的任务是计算CompletableFuture表示的数字的平方,并将其打印。

代码第20行,我们创建一个CompletableFuture对象实例,第21行,我们将这个对象实例传递给这个AskThread线程,并启动这个线程。此时,AskThread在执行到第12行代码时会阻塞,因为CompletableFuture中根本没有它所需要的数据,整个CompletableFuture处于未完成状态。第23行用于模拟长时间的计算过程。当计算完成后,可以将最终数据载入CompletableFuture,并标记为完成状态(第25行)。

当第25行代码执行后,表示CompletableFuture已经完成,因此AskThread就可以继续执行了。

6.5.2 异步执行任务

通过CompletableFuture提供的进一步封装,我们很容易实现Future模式那样的异步调用。比如:

01 public static Integer calc(Integer para) { 02 try { 03 // 模拟一个长时间的执行 04 Thread.sleep(1000); 05 } catch (InterruptedException e) { 06 } 07 return para*para; 08 } 09 10 public static void main(String[] args) throws InterruptedException, ExecutionException { 11 final CompletableFuture<Integer> future = 12 CompletableFuture.supplyAsync(() -> calc(50)); 13 System.out.println(future.get()); 14 }

上述代码中,第11~12行使用CompletableFuture.supplyAsync()方法构造一个CompletableFuture实例,在supplyAsync()函数中,它会在一个新的线程中,执行传入的参数。在这里,它会执行calc()方法。而calc()方法的执行可能是比较慢的,但是这不影响CompletableFuture实例的构造速度,因此supplyAsync()会立即返回,它返回的CompletableFuture对象实例就可以作为这次调用的契约,在将来任何场合,用于获得最终的计算结果。代码第13行,试图获得calc()的计算结果,如果当前计算没有完成,则调用get()方法的线程就会等待。

在CompletableFuture中,类似的工厂方法有以下几个:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

其中supplyAsync()方法用于那些需要有返回值的场景,比如计算某个数据等。而runAsync()方法用于没有返回值的场景,比如,仅仅是简单地执行某一个异步动作。

在这两对方法中,都有一个方法可以接收一个Executor参数。这就使我们可以让Supplier <U>或者Runnable在指定的线程池中工作。如果不指定,则在默认的系统公共的ForkJoinPool.common线程池中执行。

注意:在Java 8中,新增了ForkJoinPool.commonPool()方法。它可以获得一个公共的ForkJoin线程池。这个公共线程池中的所有线程都是Daemon线程。这意味着如果主线程退出,这些线程无论是否执行完毕,都会退出系统。

6.5.3 流式调用

在前文中我已经简单的提到,CompletionStage的约40个接口是为函数式编程做准备的。在这里,就让我们看一下,如何使用这些接口进行函数式的流式API调用:

01 public static Integer calc(Integer para) { 02 try { 03 // 模拟一个长时间的执行 04 Thread.sleep(1000); 05 } catch (InterruptedException e) { 06 } 07 return para*para; 08 } 09 10 public static void main(String[] args) throws InterruptedException, ExecutionException { 11 CompletableFuture<Void> fu=CompletableFuture.supplyAsync(() -> calc(50)) 12 .thenApply((i)->Integer.toString(i)) 13 .thenApply((str)->"\""+str+"\"") 14 .thenAccept(System.out::println); 15 fu.get(); 16 }

上述代码中,使用supplyAsync()函数执行一个异步任务。接着连续使用流式调用对任务的处理结果进行再加工,直到最后的结果输出。

这里,我们在第15行执行CompletableFuture.get()方法,目的是等待calc()函数执行完成。如果不进行这个等待调用,由于CompletableFuture异步执行的缘故,主函数不等calc()方法执行完毕就会退出,随着主线程的结束,所有的Daemon线程都会立即退出,从而导致calc()方法无法正常完成。

6.5.4 CompletableFuture中的异常处理

如果CompletableFuture在执行过程中遇到异常,我们可以用函数式编程的风格来优雅地处理这些异常。CompletableFuture提供了一个异常处理方法exceptionally():

01 public static Integer calc(Integer para) { 02 return para / 0; 03 } 04 05 public static void main(String[] args) throws InterruptedException,ExecutionException { 06 CompletableFuture<Void> fu = CompletableFuture 07 .supplyAsync(() -> calc(50)) 08 .exceptionally(ex -> { 09 System.out.println(ex.toString()); 10 return 0; 11 }) 12 .thenApply((i) -> Integer.toString(i)) 13 .thenApply((str) -> "\"" + str + "\"") 14 .thenAccept(System.out::println); 15 fu.get(); 16 }

在上述代码中,第8行对当前的CompletableFuture进行异常处理。如果没有异常发生,则CompletableFuture就会返回原有的结果。如果遇到了异常,就可以在exceptionally()中处理异常,并返回一个默认的值。在上例中,我们忽略了异常堆栈,只是简单地打印异常的信息。

执行上述函数,我们将得到输出:

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero "0"

6.5.5 组合多个CompletableFuture

CompletableFuture还允许你将多个CompletableFuture进行组合。一种方法是使用thenCompose(),它的签名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

一个CompletableFuture可以在执行完成后,将执行结果通过Function传递给下一个CompletionStage进行处理(Function接口返回新的CompletionStage实例):

01 public static Integer calc(Integer para) { 02 return para/2; 03 } 04 05 public static void main(String[] args) throws InterruptedException, ExecutionException { 06 CompletableFuture<Void> fu = 07 CompletableFuture.supplyAsync(() -> calc(50)) 08 .thenCompose((i)->CompletableFuture.supplyAsync(() -> calc(i))) 09 .thenApply((str)->"\"" + str + "\"").thenAccept(System.out::println); 10 fu.get(); 11 }

上述代码第8行,将处理后的结果传递给thenCompose(),并进一步传递给后续新生成的CompletableFuture实例。以上代码的输出如下:

"12"

另外一种组合多个CompletableFuture的方法是thenCombine(),它的签名如下:

public <U,V> CompletableFuture<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

方法thenCombine()首先完成当前CompletableFuture和other的执行。接着,将这两者的执行结果传递给BiFunction(该接口接收两个参数,并有一个返回值),并返回代表BiFunction实例的CompletableFuture对象:

01 public static Integer calc(Integer para) { 02 return para / 2; 03 } 04 05 public static void main(String[] args) throws InterruptedException,ExecutionException { 06 CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> calc(50)); 07 CompletableFuture<Integer> intFuture2 = CompletableFuture.supplyAsync(() -> calc(25)); 08 09 CompletableFuture<Void> fu = intFuture.thenCombinet(intFuture2, (i, j) -> (i + j)) 10 .thenApply((str) -> "\"" + str + "\"") 11 .thenAccept(System.out::println); 12 fu.get(); 13 }

上述代码中,首先生成两个CompletableFuture实例(第6~7行),接着使用thenCombine()组合这两个CompletableFuture,将两者的执行结果进行累加(由第9行的(i, j) -> (i + j)实现),并将其累加结果转为字符串,并输出。上述代码的输出是:

"37"

6.6 读写锁的改进:StampedLock

StampedLock是Java 8中引入的一种新的锁机制。简单的理解,可以认为它是读写锁的一个改进版本。读写锁虽然分离了读和写的功能,使得读与读之间可以完全并发。但是,读和写之间依然是冲突的。读锁会完全阻塞写锁,它使用的依然是悲观的锁策略,如果有大量的读线程,它也有可能引起写线程的“饥饿”。

而StampedLock则提供了一种乐观的读策略。这种乐观的锁非常类似无锁的操作,使得乐观锁完全不会阻塞写线程。

6.6.1 StampedLock使用示例

StampedLock的使用并不困难,下面是StampedLock的使用示例:

01 public class Point { 02 private double x, y; 03 private final StampedLock sl = new StampedLock(); 04 05 void move(double deltaX, double deltaY) { // 这是一个排它锁 06 long stamp = sl.writeLock(); 07 try { 08 x += deltaX; 09 y += deltaY; 10 } finally { 11 sl.unlockWrite(stamp); 12 } 13 } 14 15 double distanceFromOrigin() { // 只读方法 16 long stamp = sl.tryOptimisticRead(); 17 double currentX = x, currentY = y; 18 if (!sl.validate(stamp)) { 19 stamp = sl.readLock(); 20 try { 21 currentX = x; 22 currentY = y; 23 } finally { 24 sl.unlockRead(stamp); 25 } 26 } 27 return Math.sqrt(currentX * currentX + currentY * currentY); 28 } 29 }

上述代码出自JDK的官方文档。它定义了一个点Point类,内部有两个元素x和y,表示点的坐标。第3行,定义了StampedLock锁。第15行定义的distanceFromOrigin()方法是一个只读方法,它只会读取Point的x和y坐标。在读取时,首先使用了StampedLock.tryOptimisticRead()方法。这个方法表示试图尝试一次乐观读。它会返回一个类似于时间戳的邮戳整数stamp。这个stamp就可以作为这一次锁获取的凭证。

接着,在第17行,读取x和y的值。当然,这时我们并不确定这个x和y是否是一致的(在读取x的时候,可能其他线程改写了y的值,使得currentX和currentY处于不一致的状态),因此,我们必须在第18行,使用validate()方法,判断这个stamp是否在读过程发生期间被修改过。如果stamp没有被修改过,则认为这次读取是有效的,因此就可以跳转到第27行,进行数据处理。反之,如果stamp是不可用的,则意味着在读取的过程中,可能被其他线程改写了数据,因此,有可能出现了脏读。如果出现这种情况,我们可以像处理CAS操作那样在一个死循环中一直使用乐观读,直到成功为止。

也可以升级锁的级别。在本例中,我们升级乐观锁的级别,将乐观锁变为悲观锁。在第19行,当判断乐观读失败后,使用readLock()获得悲观的读锁,并进一步读取数据。如果当前对象正在被修改,则读锁的申请可能导致线程挂起。

写入的情况可以参考第5行定义的move()函数。使用writeLock()函数可以申请写锁。这里的含义和读写锁是类似的。

在退出临界区时,不要忘记释放写锁(第11行)或者读锁(第24行)。

可以看到,StampedLock通过引入乐观读来增加系统的并行度。

6.6.2 StampedLock的小陷阱

StampedLock内部实现时,使用类似于CAS操作的死循环反复尝试的策略。在它挂起线程时,使用的是Unsafe.park()函数,而park()函数在遇到线程中断时,会直接返回(注意,不同于Thread.sleep(),它不会抛出异常)。而在StampedLock的死循环逻辑中,没有处理有关中断的逻辑。因此,这就会导致阻塞在park()上的线程被中断后,会再次进入循环。而当退出条件得不到满足时,就会发生疯狂占用CPU的情况。这一点值得我们注意,下面演示了这个问题:

01 public class StampedLockCPUDemo { 02 static Thread[] holdCpuThreads = new Thread[3]; 03 static final StampedLock lock = new StampedLock(); 04 public static void main(String[] args) throws InterruptedException { 05 new Thread() { 06 public void run() { 07 long readLong = lock.writeLock(); 08 LockSupport.parkNanos(600000000000L); 09 lock.unlockWrite(readLong); 10 } 11 }.start(); 12 Thread.sleep(100); 13 for (int i = 0; i < 3; ++i) { 14 holdCpuThreads[i] = new Thread(new HoldCPUReadThread()); 15 holdCpuThreads[i].start(); 16 } 17 Thread.sleep(10000); 18 //线程中断后,会占用CPU 19 for (int i = 0; i < 3; ++i) { 20 holdCpuThreads[i].interrupt(); 21 } 22 } 23 24 private static class HoldCPUReadThread implements Runnable { 25 public void run() { 26 long lockr = lock.readLock(); 27 System.out.println(Thread.currentThread().getName()+ " 获得读锁"); 28 lock.unlockRead(lockr); 29 } 30 } 31 }

在上述代码中,首先开启线程占用写锁(第7行),注意,为了演示效果,这里使写线程不释放锁而一直等待。接着,开启3个读线程,让它们请求读锁。此时,由于写锁的存在,所有读线程都会被最终挂起。

下面是其中一个读线程在挂起时的信息:

"Thread-2" #10 prio=5 os_prio=0 tid=0x14b1d800 nid=0xafc waiting on condition [0x153ef000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x046b54c8> (a java.util.concurrent.locks.StampedLock) at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215) at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428) at geym.conc.ch6.stamped.StampedLockCPUDemo$HoldCPUReadThread.run (StampedLockCPUDemo.java:35) at java.lang.Thread.run(Thread.java:745)

可以看到,这个线程因为park()的操作而进入了等待状态,这种情况是正常的。

而在10秒以后(代码第17行执行了10秒等待),系统中断了这3个读线程,之后,你就会发现,你的CPU占用率极有可能会飙升。这是因为中断导致park()函数返回,使线程再次进入运行状态,下面是同一个线程在中断后的信息:

"Thread-2" #10 prio=5 os_prio=0 tid=0x14b1d800 nid=0xafc runnable [0x153ef000] java.lang.Thread.State: RUNNABLE at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x046b54c8> (a java.util.concurrent.locks.StampedLock) at java.util.concurrent.locks.StampedLock.acquireRead(StampedLock.java:1215) at java.util.concurrent.locks.StampedLock.readLock(StampedLock.java:428) at geym.conc.ch6.stamped.StampedLockCPUDemo$HoldCPUReadThread.run (StampedLockCPUDemo.java:35) at java.lang.Thread.run(Thread.java:745)

此时,这个线程的状态是RUNNABLE,这是我们不愿意看到的。它会一直存在并耗尽CPU资源,直到自己抢占到了锁。

6.6.3 有关StampedLock的实现思想

StampedLock的内部实现是基于CLH锁的。CLH锁是一种自旋锁,它保证没有饥饿发生,并且可以保证FIFO(First-In-First-Out)的服务顺序。

CLH锁的基本思想如下:锁维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程),保存一个标记位(locked),用于判断当前线程是否已经释放锁。

当一个线程试图获得锁时,取得当前等待队列的尾部节点作为其前序节点,并使用类似如下代码判断前序节点是否已经成功释放锁:

while (pred.locked) { }

只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行,因此会自旋等待。

反之,如果前序线程已经释放锁,则当前线程可以继续执行。

释放锁时,也遵循这个逻辑,线程会将自身节点的locked位置标记为false,那么后续等待的线程就能继续执行了。

如图6.4所示,显示了CLH队列锁的基本思想。

图6-4 CLH队列锁

StampedLock正是基于这种思想,但是实现上更为复杂。

在StampedLock内部,会维护一个等待链表队列:

01 /** Wait nodes */ 02 static final class WNode { 03 volatile WNode prev; 04 volatile WNode next; 05 volatile WNode cowait; // 读节点链表 06 volatile Thread thread; // 当可能被暂停时非空 07 volatile int status; // 0, WAITING, or CANCELLED 08 final int mode; // RMODE or WMODE 09 WNode(int m, WNode p) { mode = m; prev = p; } 10 } 11 12 /** CLH 队列头部 */ 13 private transient volatile WNode whead; 14 /** CLH 队列尾部 */ 15 private transient volatile WNode wtail;

上述代码中,WNode为链表的基本元素,每一个WNode表示一个等待线程。字段whead和wtail分别指向等待链表的头部和尾部。

另外一个重要的字段为state:

private transient volatile long state;

字段state表示当前锁的状态。它是一个long型,有64位,其中,倒数第8位表示写锁状态,如果该位为1,表示当前由写锁占用。

对于一次乐观读的操作,它会执行如下操作:

public long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; }

一次成功的乐观读必须保证当前锁没有写锁占用。其中WBIT用来获取写锁状态位,值为0x80。如果成功,则返回当前state的值(末尾7位清零,末尾7位表示当前正在读取的线程数量)。

如果在乐观读后,有线程申请了写锁,那么state的状态就会改变:

1 public long writeLock() { 2 long s, next; // bypass acquireWrite in fully unlocked case only 3 return ((((s = state) & ABITS) == 0L && 4 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? 5 next : acquireWrite(false, 0L)); 6 }

上述代码中第4行,设置写锁位为1(通过加上WBIT(0x80))。这样,就会改变state的取值。那么在乐观锁确认(validate)时,就会发现这个改动,而导致乐观锁失效。

public boolean validate(long stamp) { U.loadFence(); return (stamp & SBITS) == (state & SBITS); }

上述validate()函数比较当前stamp和发生乐观锁时取得的stamp,如果不一致,则宣告乐观锁失败。

乐观锁失败后,则可以提升锁级别,使用悲观读锁。

1 public long readLock() { 2 long s = state, next; // bypass acquireRead on common uncontended case 3 return ((whead == wtail && (s & ABITS) < RFULL && 4 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? 5 next : acquireRead(false, 0L)); 6 }

悲观读会尝试设置state状态(第4行),它会将state加1(前提是读线程数量没有溢出,对于读线程数量溢出的情况,会使用辅助的readerOverflow进行统计,我们在这里不做过于烦琐的讨论),用于统计读线程的数量。如果失败,则进入acquireRead()二次尝试锁获取。

目录
设置
设置
阅读主题
字体风格
雅黑 宋体 楷书 卡通
字体大小
适中 偏大 超大
保存设置
恢复默认
手机
手机阅读
扫码获取链接,使用浏览器打开
书架同步,随时随地,手机阅读
首 页 < 上一章 章节列表 下一章 > 尾 页