非阻塞IO---NIO

  1. 云栖社区>
  2. 博客>
  3. 正文

非阻塞IO---NIO

chenjian44 2019-01-29 16:32:11 浏览1199
展开阅读全文

NIO编程

NIO(Non-block IO, 非阻塞IO),与Socket类和ServerSocket类相对应,NIO也提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现,这两种新增的通道都支持阻塞和非阻塞模式。阻塞模式使用简单,但是性能和可靠性不好,非阻塞模式刚好相反。一般来说,低负载、低并发的应用程序可以选择同步阻塞IO来降低编程复杂度,但是对于高并发、高负载的应用,需要使用NIO的非阻塞IO进行开发。

NIO类库简介

  • 缓冲区Buffer
    Buffer是一个对象,包含要写入或者要读出的数据,在NIO类库中加入Buffer对象,使得在面向流的IO中,可以直接将数据写入或者直接将数据读到Stream对象中。在NIO中,所有数据都是通过缓冲区进行操作的,缓冲区实质上是一个数组,提供了对数据的结构化访问以及维护读写位置等信息。

image

  • 通道Channel
    Channel是一个通道,可以通过它读写网络数据,与流的不同之处在于,流是单向的(一个流必须是InputStream或者OutputStream的子类),而Channel可以用于读、写或者同时读写,Channel是全双工的,一般分为两类,分别是用于网络读写的SelectableChannel和用于文件操作的FileChannel

image

  • 多路复用器Selector
    多路复用器提供选择已经就绪的任务的能力,Selector会不断轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入 、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取Channel的集合,进行后续的IO操作;一个多路复用器Selector可以同时轮询多个Channel,由于JDK底层使用了epoll()代替了select的实现,所以没有最大连接句柄的限制。

NIO服务端序列图

image
1.打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道

ServerSocketChannel acceptorServ = ServerSocketChannel.open();

2.绑定监听端口,设置连接为非阻塞模式

acceptorServ.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"), port));
acceptorServ.configureBlocking(false);

3.创建Reactor线程,创建多路复用器并启动线程

Selector selector = Selector.open();
new Thread(new ReactorTask()).start();

4.将ServerSocketChannel注册到Reactor线程的多路复用器上,监听ACCEPT事件

SelectionKey key = acceptorServ.register(selector, SelectionKey.OP_ACCEPT, ioHandler);

5.多路复用器在线程run方法的无限循环体内轮询准备就绪的Key

int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while(it.hasNext()) {
    SelectionKey key = (SelectionKey)it.next();
    //deal with IO event...
}

6.多路复用器监听到有新的客户端接入,处理新的请求,完成三次握手,建立物理链路

SocketChannel channel = svrChannel.accept();

7.设置客户端链路为非阻塞链路

channel.configureBlocking(false);
channel.socket().setReuseAddress(true);

8.将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络信息

SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ, ioHandler);

9.异步读取客户端请求到缓冲区

int readNumber = channel.read(reveicedBuffer);

10.对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑变排

Object message = null;
while(buffer.hasRemain()) {
    byteBuffer.mark();
    Object message = decode(byteBuffer);
    if (message == null) {
        byteBuffer.reset();
        break;
    }
    messageList.add(message);
}
if(!byteBuffer.hasRemain()) {
    byteBuffer.clear();
} else {
   byteBuffer.compact();
}
if(messageList != null & !messageList.isEmpty()) {
    for (Object messageE: messageList )
        handerTask(messageE);
}

11.将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端

socketChannel.write(buffer);

NIO客户端序列图

image
1.打开SocketChannel,绑定本地客户端地址

SocketChannel clientChannel = SocketChannel.open();

2.设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数

clientChannel.configureBlocking(false);
socket.setReuseAddres(true);
socket.setReveiceBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);

3.异步连接服务端

boolean connected = clientChannel.connect(new InetSocketAddress("ip", port));

4.判断是否连接成功,如果连接成功,则直接注册状态位到多路复用器中,如果当前没有连接成功,返回false,说明客户端已发送sync包,服务端没有返回ack包,物理链路还没建立

if(connected)
    clientChannel.register(selector, SelectionKey.OP_READ, ioHandler);
else
    clientChannel.register(selector, SelectionKey.OP_CONNECT, ioHandler);

5.向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCPACK应答

clientChannel.register(selector, SelectionKey.OP_CONNECT, ioHandler);

6.创建Reactor线程,创建多路复用器并启动线程

Selector selector = Selector.open();
new Thread(new ReactorTask()).start();

7.多路复用器在线程run方法的无限循环体内轮询准备就绪的Key

int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while(it.hasNext()) {
    SelectionKey key = (SelectionKey)it.next();
    //deal with IO event...
}

8.接受connect事件进行处理

if (key.isConnectable())
    //handlerConnect()

9.判断连接结果,如果连接成功,注册读事件到多路复用器

if (channel.finishConnect())
     registerRead();

10.注册读事件到多路复用器

clientChannel.register(selector, SelectionKey.OP_READ, ioHandler);

11.异步读取客户端请求到缓冲区

int readNumber = channel.read(reveicedBuffer);

12.对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑变排

Object message = null;
while(buffer.hasRemain()) {
    byteBuffer.mark();
    Object message = decode(byteBuffer);
    if (message == null) {
        byteBuffer.reset();
        break;
    }
    messageList.add(message);
}
if(!byteBuffer.hasRemain()) {
    byteBuffer.clear();
} else {
   byteBuffer.compact();
}
if(messageList != null & !messageList.isEmpty()) {
    for (Object messageE: messageList )
        handerTask(messageE);
}

13.将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端

socketChannel.write(buffer);

NIO创建的TimeServer和TimeClient源码

https://github.com/chenjian44/netty_readings_note/tree/master/nio

对比BIO的优点总结

  • 客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞
  • SocketChannel的读写操作都是异步的,如果没有可以写的数据它不会同步等待,直接返回,这样IO通信线程就可以处理其他链路,不需要同步等待这个链路可用
  • 线程模型的优化:jdk的Selector在Linux服务器上通过epoll实现,没有连接句柄的限制,意味着一个Selector线程可用同时处理上千万的客户端连接,而且性能不会随着客户端连接数增加而线性下降,非常适合做高性能、高负载的网络服务器

网友评论

登录后评论
0/500
评论
chenjian44
+ 关注