饭饭TXT > 学习管理 > 《实战Java高并发程序设计(出书版)》作者:葛一鸣/郭超【完结】 > 实战Java高并发程序设计.txt

第5章 并行模式与算法.4

作者:葛一鸣/郭超 当前章节:15373 字 更新时间:2026-6-23 07:00

为了能够统计服务器线程在一个客户端上花费了多少时间,这里还需要定义一个与时间统计有关的类:

public static Map<Socket,Long> time_stat=new HashMap<Socket,Long>(10240);

它用于统计在某一个Socket上花费的时间,time_stat的key为Socket,value为时间戳(可以记录处理开始时间)。

下面就可以来看一下NIO服务器的核心代码,下面的startServer()方法用于启动NIO Server:

01 private void startServer() throws Exception { 02 selector = SelectorProvider.provider().openSelector(); 03 ServerSocketChannel ssc = ServerSocketChannel.open(); 04 ssc.configureBlocking(false); 05 06 InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 8000); 07 InetSocketAddress isa = new InetSocketAddress(8000); 08 ssc.socket().bind(isa); 09 10 SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); 11 12 for (;;) { 13 selector.select(); 14 Set readyKeys = selector.selectedKeys(); 15 Iterator i = readyKeys.iterator(); 16 long e=0; 17 while (i.hasNext()) { 18 SelectionKey sk = (SelectionKey) i.next(); 19 i.remove(); 20 21 if (sk.isAcceptable()) { 22 doAccept(sk); 23 } 24 else if (sk.isValid() && sk.isReadable()) { 25 if(!time_stat.containsKey(((SocketChannel)sk.channel()).socket())) 26 time_stat.put(((SocketChannel)sk.channel()).socket(), 27 System.currentTimeMillis()); 28 doRead(sk); 29 } 30 else if (sk.isValid() && sk.isWritable()) { 31 doWrite(sk); 32 e=System.currentTimeMillis(); 33 long b=time_stat.remove(((SocketChannel)sk.channel()).socket()); 34 System.out.println("spend:"+(e-b)+"ms"); 35 } 36 } 37 } 38 }

上述代码第2行,通过工厂方法获得一个Selector对象的实例。第3行,获得表示服务端的SocketChannel实例。第4行,将这个SocketChannel设置为非阻塞模式。实际上,Channel也可以像传统的Socket那样按照阻塞的方式工作。但在这里,更倾向于让其工作在非阻塞模式,在这种模式下,我们才可以向Channel注册感兴趣的事件,并且在数据准备好时,得到必要的通知。接着,在第6~8行进行端口绑定,将这个Channel绑定在8000端口。

在第10行,将这个ServerSocketChannel绑定到Selector上,并注册它感兴趣的时间为Accept。这样,Selector就能为这个Channel服务了。当Selector发现ServerSocketChannel有新的客户端连接时,就会通知ServerSocketChannel进行处理。方法register()的返回值是一个SelectionKey,SelectionKey表示一对Selector和Channel的关系。当Channel注册到Selector上时,就相当于确立了两者的服务关系,那么SelectionKey就是这个契约。当Selector或者Channel被关闭时,它们对应的SelectionKey就会失效。

第12~37行是一个无穷循环,它的主要任务就是等待-分发网络消息。

第13行的select()方法是一个阻塞方法。如果当前没有任何数据准备好,它就会等待。一旦有数据可读,它就会返回。它的返回值是已经准备就绪的SelectionKey的数量。这里简单地将其忽略。

第14行获取那些准备好的SelectionKey。因为Selector同时为多个Channel服务,因此已经准备就绪的Channel就有可能是多个。所以,这里得到的自然是一个集合。得到这个就绪集合后,剩下的就是遍历这个集合,挨个处理所有的Channel数据。

第15行得到这个集合的迭代器。第17行使用迭代器遍历整个集合。第18行根据迭代器获得一个集合内的SelectionKey实例。

第19行将这个元素移除!注意,这个非常重要,否则就会重复处理相同的SelectionKey。当你处理完一个SelectionKey后,务必将其从集合内删除。

第21行判断当前SelectionKey所代表的Channel是否在Acceptable状态,如果是,就进行客户端的接收(执行doAccept()方法)。

第24行判断Channel是否已经可以读了,如果是就进行读取(doRead()方法)。这里为了统计系统处理每一个连接的时间,在第25~27行记录了在读取数据之前的一个时间戳。

第30行判断通道是否准备好进行写。如果是就进行写入(doWrite()方法),同时在写入完成后,根据读取前的时间戳,输出处理这个Socket连接的耗时。

在了解服务端的整体框架后,下面让我们从细节着手,学习一下几个主要方法的内部实现。首先是doAccept()方法,它与客户端建立连接:

01 private void doAccept(SelectionKey sk) { 02 ServerSocketChannel server = (ServerSocketChannel) sk.channel(); 03 SocketChannel clientChannel; 04 try { 05 clientChannel = server.accept(); 06 clientChannel.configureBlocking(false); 07 08 // Register this channel for reading. 09 SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); 10 // Allocate an EchoClient instance and attach it to this selection key. 11 EchoClient echoClient = new EchoClient(); 12 clientKey.attach(echoClient); 13 14 InetAddress clientAddress = clientChannel.socket().getInetAddress(); 15 System.out.println("Accepted connection from " + clientAddress.getHostAddress() + "."); 16 } catch (Exception e) { 17 System.out.println("Failed to accept new client."); 18 e.printStackTrace(); 19 } 20 }

和Socket编程很类似,当有一个新的客户端连接接入时,就会有一个新的Channel产生代表这个连接。上述代码第5行,生成的clientChannel就表示和客户端通信的通道。第6行,将这个Channel配置为非阻塞模式,也就是要求系统在准备好IO后,再通知我们的线程来读取或者写入。

第9行很关键,它将新生成的Channel注册到selector选择器上,并告诉Selector,我现在对读(OP_READ)操作感兴趣。这样,当Selector发现这个Channel已经准备好读时,就能给线程一个通知。

第11行新建一个对象实例,一个EchoClient实例代表一个客户端。在第12行,我们将这个客户端实例作为附件,附加到表示这个连接的SelectionKey上。这样在整个连接的处理过程中,我们都可以共享这个EchoClient实例。

EchoClient的定义很简单,它封装了一个队列,保存在需要回复给这个客户端的所有信息,这样,再进行回复时,只要从outq对象中弹出元素即可。

class EchoClient { private LinkedList<ByteBuffer> outq; EchoClient() { outq = new LinkedList<ByteBuffer>(); } public LinkedList<ByteBuffer> getOutputQueue() { return outq; } public void enqueue(ByteBuffer bb) { outq.addFirst(bb); } }

下面来看一下另外一个重要的方法doRead()。当Channel可以读取时,doRead()方法就会被调用。

01 private void doRead(SelectionKey sk) { 02 SocketChannel channel = (SocketChannel) sk.channel(); 03 ByteBuffer bb = ByteBuffer.allocate(8192); 04 int len; 05 06 try { 07 len = channel.read(bb); 08 if (len < 0) { 09 disconnect(sk); 10 return; 11 } 12 } catch (Exception e) { 13 System.out.println("Failed to read from client."); 14 e.printStackTrace(); 15 disconnect(sk); 16 return; 17 } 18 19 bb.flip(); 20 tp.execute(new HandleMsg(sk,bb)); 21 }

方法doRead()接收一个SelectionKey参数,通过这个SelectionKey可以得到当前的客户端Channel(第2行)。在这里,我们准备8K的缓冲区读取数据,所有读取的数据存放在变量bb中(第7行)。读取完成后,重置缓冲区,为数据处理做准备(第19行)。

在这个示例中,我们对数据的处理很简单。但是为了模拟复杂的场景,还是使用了线程池进行数据处理(第20行)。这样,如果数据处理很复杂,就能在单独的线程中进行,而不用阻塞任务派发线程。

HandleMsg的实现也很简单:

01 class HandleMsg implements Runnable{ 02 SelectionKey sk; 03 ByteBuffer bb; 04 public HandleMsg(SelectionKey sk,ByteBuffer bb){ 05 this.sk=sk; 06 this.bb=bb; 07 } 08 @Override 09 public void run() { 10 EchoClient echoClient = (EchoClient) sk.attachment(); 11 echoClient.enqueue(bb); 12 sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); 13 //强迫selector立即返回 14 selector.wakeup(); 15 } 16 }

上述代码,简单地将接收到的数据压入EchoClient的队列(第11行)。如果需要处理业务逻辑,就可以在这里进行处理。

在数据处理完成后,就可以准备将结果回写到客户端,因此,重新注册感兴趣的消息事件,将写操作(OP_WRITE)也作为感兴趣的事件进行提交(第12行)。这样在通道准备好写入时,就能通知线程。

写入操作使用doWrite()函数实现:

01 private void doWrite(SelectionKey sk) { 02 SocketChannel channel = (SocketChannel) sk.channel(); 03 EchoClient echoClient = (EchoClient) sk.attachment(); 04 LinkedList<ByteBuffer> outq = echoClient.getOutputQueue(); 05 06 ByteBuffer bb = outq.getLast(); 07 try { 08 int len = channel.write(bb); 09 if (len == -1) { 10 disconnect(sk); 11 return; 12 } 13 14 if (bb.remaining() == 0) { 15 // The buffer was completely written, remove it. 16 outq.removeLast(); 17 } 18 } catch (Exception e) { 19 System.out.println("Failed to write to client."); 20 e.printStackTrace(); 21 disconnect(sk); 22 } 23 24 if (outq.size() == 0) { 25 sk.interestOps(SelectionKey.OP_READ); 26 } 27 }

函数doWrite()也接收一个SelectionKey,当然针对一个客户端来说,这个SelectionKey实例和doRead()拿到的SelectionKey是同一个。因此,通过SelectionKey我们就可以在这两个操作中共享EchoClient实例。上述代码第3~4行,我们取得EchoClient实例以及它的发送内容列表。第6行,获得列表顶部元素,准备写回客户端。第8行进行写回操作。如果全部发送完成,则移除这个缓存对象(第16行)。

在doWrite()中最重要的,也是最容易被忽略的是在全部数据发送完成后(也就是outq的长度为0),需要将写事件(OP_WRITE)从感兴趣的操作中移除(第25行)。如果不这么做,每次Channel准备好写时,都会来执行doWrite()方法。而实际上,你又无数据可写,这显然是不合理的。因此,这个操作很重要。

上面我们已经介绍了主要的核心代码,现在使用这个NIO服务器来处理上一节中客户端的访问。同样的,客户端也是要花费将近6秒钟,才能完成一次消息的发送,那使用NIO技术后,服务端线程需要花费多少时间来处理这些请求呢?答案如下:

spend:2ms spend:2ms spend:2ms spend:2ms spend:3ms spend:3ms spend:0ms spend:0ms spend:2ms spend:3ms

可以看到,在使用NIO技术后,即使客户端迟钝或者出现了网络延迟等现象,并不会给服务器带来太大的问题。

5.10.3 使用NIO来实现客户端

在前面的案例中,我们使用Socket编程来构建我们的客户端,使用NIO来实现服务端。实际上,使用NIO也可以用来创建客户端。这里,我们再演示一下使用NIO创建客户端的例子。

和构造服务器类似,核心的元素也是Selector、Channel和SelectionKey。

首先,我们需要初始化Selector和Channel:

01 private Selector selector; 02 public void init(String ip, int port) throws IOException { 03 SocketChannel channel = SocketChannel.open(); 04 channel.configureBlocking(false); 05 this.selector = SelectorProvider.provider().openSelector(); 06 channel.connect(new InetSocketAddress(ip, port)); 07 channel.register(selector, SelectionKey.OP_CONNECT); 08 }

上述代码第3行,创建一个SocketChannel实例,并设置为非阻塞模式。第5行创建了一个Selector。第6行,将SocketChannel绑定到Socket上。但由于当前Channel是非阻塞的,因此,connect()方法返回时,连接并不一定建立成功,在后续使用这个连接时,还需要使用finishConnect()再次确认。第7行,将这个Channel和Selector进行绑定,并注册了感兴趣的事件作为连接(OP_CONNECT)。

初始化完成后,就是程序的主要执行逻辑:

01 public void working() throws IOException { 02 while (true) { 03 if (!selector.isOpen()) 04 break; 05 selector.select(); 06 Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); 07 while (ite.hasNext()) { 08 SelectionKey key = ite.next(); 09 ite.remove(); 10 // 连接事件发生 11 if (key.isConnectable()) { 12 connect(key); 13 } else if (key.isReadable()) { 14 read(key); 15 } 16 } 17 } 18 }

在上述代码中,第5行通过Selector得到已经准备好的事件。如果当前没有任何事件准备就绪,这里就会阻塞。这里的整个处理机制和服务端非常类似,主要处理两个事件,首先是表示连接就绪的Connct事件(由connect()函数处理)以及表示通道可读的Read事件(由read()函数处理)。

函数connect()的实现如下:

01 public void connect(SelectionKey key) throws IOException { 02 SocketChannel channel = (SocketChannel) key.channel(); 03 // 如果正在连接,则完成连接 04 if (channel.isConnectionPending()) { 05 channel.finishConnect(); 06 } 07 channel.configureBlocking(false); 08 channel.write(ByteBuffer.wrap(new String("hello server!\r\n") 09 .getBytes())); 10 channel.register(this.selector, SelectionKey.OP_READ); 11 }

上述connect()函数接收SelectionKey作为其参数。在第4~6行,它首先判断是否连接已经建立,如果没有,则调用finishConnect()完成连接。建立连接后,向Channel写入数据,并同时注册读事件为感兴趣的事件(第10行)。

当Channel可读时,会执行read()方法,进行数据读取:

01 public void read(SelectionKey key) throws IOException { 02 SocketChannel channel = (SocketChannel) key.channel(); 03 // 创建读取的缓冲区 04 ByteBuffer buffer = ByteBuffer.allocate(100); 05 channel.read(buffer); 06 byte[] data = buffer.array(); 07 String msg = new String(data).trim(); 08 System.out.println("客户端收到信息:" + msg); 09 channel.close(); 10 key.selector().close(); 11 }

上述read()函数首先创建了100字节的缓冲区(第4行),接着从Channel中读取数据,并将其打印在控制台上。最后,关闭Channel和Selector。

5.11 读完了再通知我:AIO

AIO是异步IO的缩写,即Asynchronized。虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身还是同步的。

但对于AIO来说,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此,AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。

下面,我将通过AIO来实现一个简单的EchoServer以及对应的客户端。

5.11.1 AIO EchoServer的实现

异步IO需要使用异步通道(AsynchronousServerSocketChannel):

public final static int PORT = 8000; private AsynchronousServerSocketChannel server; public AIOEchoServer() throws IOException { server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT)); }

上述代码绑定了8000端口为服务器端口,并使用AsynchronousServerSocketChannel异步Channel作为服务器,变量名为server。

我们使用这个server来进行客户端的接收和处理:

01 public void start() throws InterruptedException, ExecutionException, TimeoutException { 02 System.out.println("Server listen on " + PORT); 03 //注册事件和事件完成后的处理器 04 server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { 05 final ByteBuffer buffer = ByteBuffer.allocate(1024); 06 public void completed(AsynchronousSocketChannel result, Object attachment) { 07 System.out.println(Thread.currentThread().getName()); 08 Future<Integer> writeResult=null; 09 try { 10 buffer.clear(); 11 result.read(buffer).get(100, TimeUnit.SECONDS); 12 buffer.flip(); 13 writeResult=result.write(buffer); 14 } catch (InterruptedException | ExecutionException e) { 15 e.printStackTrace(); 16 } catch (TimeoutException e) { 17 e.printStackTrace(); 18 } finally { 19 try { 20 server.accept(null, this); 21 writeResult.get(); 22 result.close(); 23 } catch (Exception e) { 24 System.out.println(e.toString()); 25 } 26 } 27 } 28 29 @Override 30 public void failed(Throwable exc, Object attachment) { 31 System.out.println("failed: " + exc); 32 } 33 }); 34 }

上述定义的start()方法开启了服务器。值得注意的是,这个方法除了第2行的打印语句外,只调用了一个函数server.accept()。之后,你看到的那一大堆代码只是这个函数的参数。

AsynchronousServerSocketChannel.accept()方法会立即返回。它并不会真的去等待客户端的到来。在这里使用的accept()方法的签名为:

public final <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)

它的第一个参数是一个附件,可以是任意类型,作用是让当前线程和后续的回调方法可以共享信息,它会在后续调用中,传递给handler。它的第二个参数是CompletionHandler接口。这个接口有两个方法:

void completed(V result, A attachment); void failed(Throwable exc, A attachment);

这两个方法分别在异步操作accept()成功或者失败时被回调。

因此AsynchronousServerSocketChannel.accept()实际上做了两件事,第一是发起accept请求,告诉系统可以开始监听端口了。第二,注册CompletionHandler实例,告诉系统,一旦有客户端前来连接,如果成功连接,就去执行CompletionHandler.completed()方法;如果连接失败,就去执行CompletionHandler.failed()方法。

所以,server.accept()方法不会阻塞,它会立即返回。

下面,来分析一下CompletionHandler.completed()的实现。当completed()被执行时,意味着已经有客户端成功连接了。在第11行,使用read()方法读取客户的数据。这里要注意,AsynchronousSocketChannel.read()方法也是异步的,换句话说它不会等待读取完成了再返回,而是立即返回,返回的结果是一个Future,因此这里就是Future模式的典型应用。为了编程方便,我在这里直接调用Future.get()方法,进行等待,将这个异步方法变成了同步方法。因此,在第11行执行完成后,数据读取就已经完成了。

之后,将数据回写给客户端(第13行)。这里调用的是AsynchronousSocketChannel.write()方法。这个方法不会等待数据全部写完,也是立即返回的。同样,它返回的也是Future对象。

再之后,在第20行,服务器进行下一个客户端连接的准备。同时关闭当前正在处理的客户端连接。但在关闭之前,得先确保之前的write()操作已经完成,因此,使用Future.get()方法进行等待(第21行)。

接下来,我们只需要在主函数中调用这个start()方法就可以开启服务器了:

01 public static void main(String args[]) throws Exception { 02 new AIOEchoServer().start(); 03 // 主线程可以继续自己的行为 04 while (true) { 05 Thread.sleep(1000); 06 } 07 }

上述代码第2行,调用start()方法开启服务器。但由于start()方法里使用的都是异步方法,因此它会马上返回,它并不像阻塞方法那样会进行等待。因此,如果想让程序驻守执行,第4~6行的等待语句是必需的。否则,在start()方法结束后,不等客户端到来,程序已经运行完成,主线程就将退出。

5.11.2 AIO Echo客户端实现

在服务端的实现中,我们使用Future.get()方法将异步调用转为了一个同步等待。在客户端的实现里,我们将全部使用异步回调实现:

01 public class AIOClient { 02 public static void main(String[] args) throws Exception { 03 final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); 04 client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() { 05 @Override 06 public void completed(Void result, Object attachment) { 07 client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler<Integer, Object>() { 08 @Override 09 public void completed(Integer result, Object attachment) { 10 try { 11 ByteBuffer buffer = ByteBuffer.allocate(1024); 12 client.read(buffer,buffer,new CompletionHandler<Integer, ByteBuffer>(){ 13 @Override 14 public void completed(Integer result, ByteBuffer buffer) { 15 buffer.flip(); 16 System.out.println(new String(buffer.array())); 17 try { 18 client.close(); 19 } catch (IOException e) { 20 e.printStackTrace(); 21 } 22 } 23 @Override 24 public void failed(Throwable exc, ByteBuffer attachment) { 25 } 26 }); 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 } 31 @Override 32 public void failed(Throwable exc, Object attachment) { 33 } 34 }); 35 } 36 @Override 37 public void failed(Throwable exc, Object attachment) { 38 } 39 }); 40 //由于主线程马上结束,这里等待上述处理全部完成 41 Thread.sleep(1000); 42 } 43 }

上面的AIO客户端看起来代码很长,但实际上只有三个语句。

第一个语句为第3行,打开AsynchronousSocketChannel通道。第二个语句是第4~39行,它让客户端去连接指定的服务器,并注册了一系列事件。第三个语句是第41行,让线程进行等待。虽然第2个语句看起来很长,但是它完全是异步的,因此会很快返回,并不会等待在连接操作的过程中。如果不进行等待,客户端会马上退出,也就无法继续工作了。

第4行,客户端进行网络连接,并注册了连接成功的回调函数CompletionHandler<Void, Object>。待连接成功后,就会进入代码第7行。第7行进行数据写入,向服务端发送数据。这个过程也是异步的,会很快返回。写入完成后,会通知回调接口CompletionHandler<Integer, Object>,进入第10行。第10行开始,准备进行数据读取,从服务端读取回写的数据。当然,第12行的read()函数也是立即返回的,成功读取所有数据后,会回调CompletionHandler<Integer, ByteBuffer>接口,进入第15行。在第15~16行,打印接收到的数据。

5.12 参考文献

有关disruptor的性能测试

https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results

disruptor的小例子

https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

伪共享的案例

http://mechanical-sympathy.blogspot.jp/2011/07/false-sharing.html

有关并行排序的更详细资料

The Art of Concurrency: A Thread Monkey's Guide to Writing Parallel Applications

并发的艺术Clay Brebears著

目录
设置
设置
阅读主题
字体风格
雅黑 宋体 楷书 卡通
字体大小
适中 偏大 超大
保存设置
恢复默认
手机
手机阅读
扫码获取链接,使用浏览器打开
书架同步,随时随地,手机阅读
首 页 < 上一章 章节列表 下一章 > 尾 页