《NETTY官方文档》4.0的新特性及注意点(二)

  1. 云栖社区>
  2. 博客>
  3. 正文

《NETTY官方文档》4.0的新特性及注意点(二)

青衫无名 2017-05-18 14:20:00 浏览2374
展开阅读全文

半关闭套接字(Half-closed sockets)

TCP及SCTP允许在不完全关闭socket的前提下关闭socket的出站传输。这样的socket称之为 ‘a half-closed socket’,用户可以通过调用 SocketChannel.shutdownOutput() 方法来产生半关闭socket。如果远端节点关闭了出站传输,SocketChannel.read(..) 就会返回 -1,看起来跟关闭的连接似乎没区别。

3.x没有 shutdownOutput() 操作。并且 当 SocketChannel.read(..) 返回 -1 时总是会关闭连接。

4.0中加入了 SocketChannel.shutdownOutput() 方法来支持半关闭socket,同时,用户可以设置 ChannelOption 为 ‘ALLOW_HALF_CLOSURE’ 来防止Netty在 SocketChannel.read(..) 返回 -1 时自动关闭连接。

灵活的 I/O 线程分配

3.x通过 ChannelFactory 创建 Channel,并且新创建的 Channel 会自动注册到一个隐藏的 I/O 线程上。4.0用新接口 EventLoopGroup 替代了 ChannelFactory,它由一个或者多个 EventLoop 组成。并且,新建的 Channel 不会自动注册到 EventLoopGroup,你必须显式调用 EventLoopGroup.register() 来完成注册。

基于此变更(即:ChannelFactory 与 I/O 线程的分离)就可以把不同的 Channel 实现注册到同样的 EventLoopGroup 上,或者同样的 Channel 实现注册到不同的 EventLoopGroup 上。例如,你可以运行NIO server socket, NIO client sockets, NIO UDP sockets及in-VM local channels在同样的 I/O 线程上。当编写需要极低延迟的代理服务器的时候,这将十分有用。

从已存在的 JDK socket 中创建Channel

3.x无法从已存在的 JDK socket 中创建 Channel,如 java.nio.channels.SocketChannel。4.0可以了。

从 I/O 线程中注销及重新注册Channel

3.x中,一旦 Channel 创建了,它就会绑定到一个 I/O 线程上,直到这个线程关闭为止。4.0中,用户可以把 Channel 从它的 I/O 线程中注销来获得它底层的 JDK sokcet 的完全控制权。比如,你可以利用高级non-blocking I/O Netty支持( high-level non-blocking I/O Netty provides)来处理复杂的协议,然后可以注销 Channel ,再切换为阻塞模式来传输文件,以达到最大的吞吐。当然,也可以把 Channel 再重新注册回去。

java.nio.channels.FileChannel myFile = ...;
java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open();

// 执行一些阻塞操作
...

// Netty 接管
SocketChannel ch = new NioSocketChannel(mySocket);
EventLoopGroup group = ...;
group.register(ch);
...

// 从 Netty 注销
ch.deregister().sync();

// 执行一些阻塞操作
mySocket.configureBlocking(true);
myFile.transferFrom(mySocket, ...);

// 重新注册到另一个 event loop group
EventLoopGroup anotherGroup = ...;
anotherGroup.register(ch);

使用 I/O 线程调度任意任务

把 Channel 注册到 EventLoopGroup 时,实际上是注册到了 EventLoopGroup 管理的一个 EventLoop 上。EventLoop 实现了 java.util.concurrent.ScheduledExecutorService。这意味着,用户可以在该channel所属的 I/O 线程上执行或者调度(execute or schedule)任意 Runnable 或者 Callable。基于后面会讲到的新的设计良好的线程模型,实现一个线程安全的handler将会十分容易。

public class MyHandler extends ChannelOutboundHandlerAdapter {
    ...
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise p) {
        ...
        ctx.write(msg, p);

        // 调度一个写超时任务
        ctx.executor().schedule(new MyWriteTimeoutTask(p), 30, TimeUnit.SECONDS);
        ...
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        // 使用 I/O 线程运行任意任务
        Channel ch = ...;
        ch.executor().execute(new Runnable() { ... });
    }
}

简化了的关闭

没有 releaseExternalResources() 了。你可以使用 EventLoopGroup.shutdownGracefully() 立即关闭所有已经打开的channel以及让所有的 I/O 线程自行停止。

类型安全的 ChannelOption

Netty有两种方式可以配置 Channel 的socket参数。一种是显式调用 ChannelConfig 的setters,如 SocketChannelConfig.setTcpNoDelay(true)。这是最类型安全的方式了。另一种是调用 ChannelConfig.setOption() 方法。有时候你认为有些socket选项是运行时配置的,这个方法刚好适用于这种场景。但3.x中,因为用户传入一个string和一个object,所以很容易出错。当用户传入错误的选项名或者值时,用户可能会收到一个 ClassCastException 错误,或者干脆只是被默默忽略掉。

4.0引入了新的类 ChannelOption 来提供类型安全的socket配置。

ChannelConfig cfg = ...;

// Before:
cfg.setOption("tcpNoDelay", true);
cfg.setOption("tcpNoDelay", 0);  // 运行时 ClassCastException
cfg.setOption("tcpNoDelays", true); // 打错了配置名 —— 静默忽略

// After:
cfg.setOption(ChannelOption.TCP_NODELAY, true);
cfg.setOption(ChannelOption.TCP_NODELAY, 0); // 编译错误

AttributeMap

应用户要求,现在你可以在 Channel 及 ChannelHandlerContext 上附加任何对象了。Channel 及 ChannelHandlerContext都实现了 AttributeMap 这个新接口。同时,ChannelLocal 及 Channel.attachment 被移除了。当 Channel 被GC时,其相应的属性值会被一起GC。

public class MyHandler extends ChannelInboundHandlerAdapter {

    private static final AttributeKey<MyState> STATE =
            AttributeKey.valueOf("MyHandler.state");

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        ctx.attr(STATE).set(new MyState());
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MyState state = ctx.attr(STATE).get();
    }
    ...
}

新的 bootstrap API

bootstrap API 被完全重写了,当然,用途跟原来是一样的。它遵循了常见的样例代码中运行server或client的典型步骤。

新的bootstrap还支持流式API。

public static void main(String[] args) throws Exception {
    // 配置 server.
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .localAddress(8080)
         .childOption(ChannelOption.TCP_NODELAY, true)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast(handler1, handler2, ...);
             }
         });

        // 启动 server.
        ChannelFuture f = b.bind().sync();

        // 等待socket关闭
        f.channel().closeFuture().sync();
    } finally {
        // 关闭所有的event loop来终止所有线程
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

        // 等待所有线程终止
        bossGroup.terminationFuture().sync();
        workerGroup.terminationFuture().sync();
    }
}

ChannelPipelineFactory → ChannelInitializer

你可能注意到上面例子中已经没有 ChannelPipelineFactory 了。它已经替换为支持更多 Channel 及 ChannelPipeline 配置的ChannelInitializer 了。

注意,不要自己创建 ChannelPipeline。Netty项目组根据至今报道的大量用例推断,用户创建自己的pipline实现或者继承其默认实现都不会带来什么好处。因此,ChannelPipeline 不再由用户创建了,而会被 Channel 自动创建。

ChannelFuture → ChannelFuture 及ChannelPromise

ChannelFuture 被拆分为 ChannelFuture 和 ChannelPromise。这不仅是生产者与消费者的异步操作的明确约定,同时可以更安全的使用链中(如filtering)返回的 ChannelFuture 了。因为 ChannelFuture 的状态是不可变的。

基于此变化,部分方法现在接受 ChannelPromise 而不是 ChannelFuture 来修改状态。

良好定义的线程模型

3.x中线程模型定义的并不好,尽管3.5尝试进行了改良也仍然不好。4.0定义了严格的线程模型,这样用户在编写ChannelHandler时不用再过多的担忧线程安全了。

  • Netty不会并发的调用 ChannelHandler 的方法,除非加了 @Sharable 注解。无论入站,出站或者生命周期事件handler方法都一样。
    • 用户不再需要同步入站或者出站事件handler方法了。
    • 4.0仅允许标记 @Sharable 注解的 ChannelHandler 被添加多次。
  • 每个Netty的 ChannelHandler 方法的调用都存在 happens-before 关系。
    • 用户不需要定义 volatile 字段来保存handler的状态
  • 用户在添加handler到 ChannelPipeline 时可以指定 EventExecutor
    • 如果指定了, 则总会使用指定的 EventExecutor 来调用 ChannelHandler 的方法
    • 如果未指定,则总是使用其关联的 Channel 中注册的 EventLoop 来调用handler的方法
  • 分配给handler或者channel的 EventExecutor 及 EventLoop 线程总是单个线程
    • handler的方法总会在同一个线程中执行
    • 如果指定了多线程的 EventExecutor 或者 EventLoop,首先会选中一个线程,并且直到注销为止都会使用这个线程
    • 如果同一个pipeline中的两个handler分配了不同的 EventExecutor,他们会被同时调用。用户就需要关注pipeline中的共享数据的线程安全,即使共享数据只是被读取。
  • 附加到 ChannelFuture 上的 ChannelFutureListeners 总是运行在future关联的 Channel 被分配的 EventLoop 线程上
  • The ChannelFutureListeners added to ChannelFuture are always invoked by the EventLoop thread assigned to the future’s associated Channel.
  • 可以使用 ChannelHandlerInvoker 控制 Channel 的事件顺序。DefaultChannelHandlerInvoker 会立即执行 EventLoop 线程的事件和其他线程提交到 EventExecutor 的 Runnable 对象。下面的例子展示了在 EventLoop 线程中以及其他线程中与Channel交互时的潜在影响。
写排序 – 混合了 EventLoop 线程和其他线程
Channel ch = ...;
ByteBuf a, b, c = ...;

// 线程1 - 非EventLoop线程
ch.write(a);
ch.write(b);

// .. 发生一些事情

// EventLoop线程
ch.write(c);

// a,b,c写入底层传输通道的顺序是未定义的。
// 如果出现了线程间交互而顺序又很重要,那么如何保证顺序性就是用户的职责了

没有 ExecutionHandler 了——移到了核心模块里

在添加 ChannelHandler 到 ChannelPipeline 的时候,可以指定 EventExecutor。这样pipeline 就总会使用指定的 EventExecutor 来调用handler方法。

Channel ch = ...;
ChannelPipeline p = ch.pipeline();
EventExecutor e1 = new DefaultEventExecutor(16);
EventExecutor e2 = new DefaultEventExecutor(8);

p.addLast(new MyProtocolCodec());
p.addLast(e1, new MyDatabaseAccessingHandler());
p.addLast(e2, new MyHardDiskAccessingHandler());

编解码器框架变更

基于4.0中handler创建和管理它自己的buffer(参考本文档中的Per-handler buffer章节),因此编解码框架内部进行了大量的变更。不过用户层面的变化倒不是很大。

  • 核心编解码器类移到了 io.netty.handler.codec 包中
  • FrameDecoder 重命名为 ByteToMessageDecoder
  • OneToOneEncoder 及 OneToOneDecoder 替换为 MessageToMessageEncoder 及 MessageToMessageDecoder
  • decode()decodeLast()encode() 的方法签名进行了些许调整,可支持泛型了,并且移除了多余的参数

Codec embedder → EmbeddedChannel

Codec embedder 替换为 io.netty.channel.embedded.EmbeddedChannel,用户可以测试包含编解码器在内的的任何类型的pipline了。

HTTP 编解码器

HTTP解码器会将单条HTTP消息解码为多个消息对象。

1       * HttpRequest / HttpResponse
0 - n   * HttpContent
1       * LastHttpContent

参照最新的 HttpSnoopServer 样例获取更多细节。如果对于单条HTTP消息你不想处理多个消息对象,你可以传入 HttpObjectAggregator 到pipline中。HttpObjectAggregator 会将多个消息对象转变为单个 FullHttpRequest 或者 FullHttpResponse

传输实现的变更

新增加的transport:

  • OIO SCTP transport
  • UDT transport

用例学习:移植Factorial样例

本节简单的展示了如何将Factorial样例从3.x移植到4.0。移植到4.0的Factorial样例已经放到了 io.netty.example.factorial 包里。请查看源码来了解所有细节修改。

移植服务端

  1. 使用新的bootstrap API来重写 FactorialServer.run()
  2. 没有 ChannelFactory 了,请自行实例化 NioEventLoopGroup (一个是接受入站连接,另一个则是处理已接受的连接)
  3. 重命名 FactorialServerPipelineFactory 为 FactorialServerInitializer
  4. 使其继承 ChannelInitializer<Channel>
  5. 通过 Channel.pipeline() 获取 ChannelPipeline 而不是新建一个
  6. 使 FactorialServerHandler 继承 ChannelInboundHandlerAdapter
  7. 用 channelInactive() 替换 channelDisconnected()
  8. handleUpstream() 没用了
  9. messageReceived() 重命名为 channelRead(),并且请根据方法签名调整参数
  10. ctx.write() 替换为 ctx.writeAndFlush()
  11. 使 BigIntegerDecoder 继承 ByteToMessageDecoder<BigInteger>
  12. 使 NumberEncoder 继承 MessageToByteEncoder<Number>
  13. encode() 不返回buffer了。使用 ByteToMessageDecoder 填充encode过的数据到buffer里。

移植客户端

大部分跟移植服务端一样,不过当你要写入的流很大时则需要多加注意。

  1. 使用新的bootstrap API重写 FactorialClient.run()
  2. FactorialClientPipelineFactory 重命名为 FactorialClientInitializer
  3. 使 FactorialClientHandler 继承 ChannelInboundHandler

转载自 并发编程网 - ifeve.com

网友评论

登录后评论
0/500
评论
青衫无名
+ 关注