Netty实践(三):实际场景下的数据通信

简介:

数据通信的场景:长连接 OR 短连接

在实际场景中,我们如何使用Netty进行通信呢?大致有3种方式:

第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态。如果服务器性能足够好,并且我们的客户端数量也比较少的情况下,是适合使用长连接的通道。

第二种,采用短连接方式,一次性批量提交数据,也就是我们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者通过定时任务轮询提交。这种情况是有弊端的,就是无法做到实时传输。如果应用程序对实时性要求不高,可以考虑使用。

第三种,采用一种特殊的长连接。特殊在哪里呢?在指定的某一时间之内,服务器与某台客户端没有任何通信,则断开连接,如果断开连接后,客户端又需要向服务器发送请求,那么再次建立连接。这里有点CachedThreadPool的味道。

本篇博客将采用Netty来实现第三种方式的数据通信,接下来我们一起来看看吧~



Netty数据通信代码实例


请求消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package  day3;
 
import  java.io.Serializable;
 
public  class  Request  implements  Serializable{
 
    private  static  final  long   SerialVersionUID = 1L;
    
    private  String id ;
    private  String name ;
    private  String requestMessage ;
    
    public  String getId() {
       return  id;
    }
    public  void  setId(String id) {
       this .id = id;
    }
    public  String getName() {
       return  name;
    }
    public  void  setName(String name) {
       this .name = name;
    }
    public  String getRequestMessage() {
       return  requestMessage;
    }
    public  void  setRequestMessage(String requestMessage) {
       this .requestMessage = requestMessage;
    }
 
 
}


响应消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package  day3;
 
import  java.io.Serializable;
 
public  class  Response  implements  Serializable{
    
    private  static  final  long  serialVersionUID = 1L;
    
    private  String id;
    private  String name;
    private  String responseMessage;
    
    public  String getId() {
       return  id;
    }
    public  void  setId(String id) {
       this .id = id;
    }
    public  String getName() {
       return  name;
    }
    public  void  setName(String name) {
       this .name = name;
    }
    public  String getResponseMessage() {
       return  responseMessage;
    }
    public  void  setResponseMessage(String responseMessage) {
       this .responseMessage = responseMessage;
    }
    
 
}


编解码处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package  day3;
 
import  io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import  io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import  io.netty.handler.codec.marshalling.MarshallerProvider;
import  io.netty.handler.codec.marshalling.MarshallingDecoder;
import  io.netty.handler.codec.marshalling.MarshallingEncoder;
import  io.netty.handler.codec.marshalling.UnmarshallerProvider;
 
import  org.jboss.marshalling.MarshallerFactory;
import  org.jboss.marshalling.Marshalling;
import  org.jboss.marshalling.MarshallingConfiguration;
 
/**
  * Marshalling工厂
  */
public  final  class  MarshallingCodeCFactory {
 
     /**
      * 创建Jboss Marshalling解码器MarshallingDecoder
      * @return MarshallingDecoder
      */
     public  static  MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
       final  MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" );
       //创建了MarshallingConfiguration对象,配置了版本号为5 
       final  MarshallingConfiguration configuration =  new  MarshallingConfiguration();
       configuration.setVersion( 5 );
       //根据marshallerFactory和configuration创建provider
       UnmarshallerProvider provider =  new  DefaultUnmarshallerProvider(marshallerFactory, configuration);
       //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
       MarshallingDecoder decoder =  new  MarshallingDecoder(provider,  1024 );
       return  decoder;
     }
 
     /**
      * 创建Jboss Marshalling编码器MarshallingEncoder
      * @return MarshallingEncoder
      */
     public  static  MarshallingEncoder buildMarshallingEncoder() {
       final  MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory( "serial" );
       final  MarshallingConfiguration configuration =  new  MarshallingConfiguration();
       configuration.setVersion( 5 );
       MarshallerProvider provider =  new  DefaultMarshallerProvider(marshallerFactory, configuration);
       //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
       MarshallingEncoder encoder =  new  MarshallingEncoder(provider);
       return  encoder;
     }
}

注意,在上一篇博客《Netty实践(二):TCP拆包、粘包问题》中,我们是自己继承ByteToMessageDecoder、MessageToByteEncoder来实现ByteBuff与消息对象的转化的,其实这是有点麻烦的。在实际中,我们完全可以利用相关序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)来帮助我们快速完成编解码,这里我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具体来说,客户端和服务端交互的消息对象只需要实现JDK默认的序列化接口,同时利用JBoss Marshalling 生成编码器和解码器,用于后续Client/Server端即可。


Client Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package  day3;
 
import  io.netty.channel.ChannelHandlerAdapter;
import  io.netty.channel.ChannelHandlerContext;
import  io.netty.util.ReferenceCountUtil;
 
public  class  ClientHandler  extends  ChannelHandlerAdapter{
    
    @Override
    public  void  channelRead(ChannelHandlerContext ctx, Object msg)  throws  Exception {
       try  {
          Response resp = (Response)msg;
          System.out.println( "Client : "  + resp.getId() +  ", "  + resp.getName() +  ", "  + resp.getResponseMessage());       
       finally  {
          ReferenceCountUtil.release(msg);
       }
    }
 
    @Override
    public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  throws  Exception {
       ctx.close();
    }
    
}

在这里可以清楚的看到,我们直接将Object转化成了自定义消息响应对象,可见JBoss Marshalling与Netty结合后,编解码是如此简单。



Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package  day3;
 
import  io.netty.bootstrap.Bootstrap;
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelInitializer;
import  io.netty.channel.EventLoopGroup;
import  io.netty.channel.nio.NioEventLoopGroup;
import  io.netty.channel.socket.SocketChannel;
import  io.netty.channel.socket.nio.NioSocketChannel;
import  io.netty.handler.logging.LogLevel;
import  io.netty.handler.logging.LoggingHandler;
import  io.netty.handler.timeout.ReadTimeoutHandler;
 
import  java.util.concurrent.TimeUnit;
 
 
/**
  *
  */
public  class  Client {
    
    private  static  class  SingletonHolder {
       static  final  Client instance =  new  Client();
    }
    
    public  static  Client getInstance(){
       return  SingletonHolder.instance;
    }
    
    private  EventLoopGroup group;
    private  Bootstrap b;
 
    //通过ChannelFuture实现读写操作
    private  ChannelFuture cf ;
    
    private  Client(){
          group =  new  NioEventLoopGroup();
          b =  new  Bootstrap();
          b.group(group)
           .channel(NioSocketChannel. class )
           .handler( new  LoggingHandler(LogLevel.INFO))
           .handler( new  ChannelInitializer<SocketChannel>() {
                @Override
                protected  void  initChannel(SocketChannel sc)  throws  Exception {
 
                   sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                   sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                   //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭相应的通道,主要为减小服务端资源占用)
                   sc.pipeline().addLast( new  ReadTimeoutHandler( 3 )); 
                   sc.pipeline().addLast( new  ClientHandler());
                }
           });
    }
    
    public  void  connect(){
       try  {
          this .cf = b.connect( "127.0.0.1" 8765 ).sync();
          System.out.println( "远程服务器已经连接, 可以进行数据交换.." );            
       catch  (Exception e) {
          e.printStackTrace();
       }
    }
 
    //这里是通道关闭,再次建立连接的核心代码
    public  ChannelFuture getChannelFuture(){
       
       if ( this .cf ==  null ){
          this .connect();
       }
       if (! this .cf.channel().isActive()){
          this .connect();
       }
       
       return  this .cf;
    }
    
    public  static  void  main(String[] args)  throws  Exception{
 
       final  Client c = Client.getInstance();
 
       //注意client好像没有调用connect()方法进行连接,但是实际上在下面的代码中做了
       ChannelFuture cf = c.getChannelFuture();
 
       for ( int  i =  1 ; i <=  3 ; i++ ){
          Request request =  new  Request();
          request.setId( ""  + i);
          request.setName( "pro"  + i);
          request.setRequestMessage( "数据信息"  + i);
          cf.channel().writeAndFlush(request);
          TimeUnit.SECONDS.sleep( 4 );
       }
 
       cf.channel().closeFuture().sync();
       
       //通道关闭后,通过另一个线程模拟客户端再次建立连接发送请求
       new  Thread( new  Runnable() {
          @Override
          public  void  run() {
             try  {
                System.out.println( "进入子线程..." );
                ChannelFuture cf = c.getChannelFuture();
                System.out.println(cf.channel().isActive());
                System.out.println(cf.channel().isOpen());
                
                //再次发送数据
                Request request =  new  Request();
                request.setId( ""  4 );
                request.setName( "pro"  4 );
                request.setRequestMessage( "数据信息"  4 );
                cf.channel().writeAndFlush(request);               
                cf.channel().closeFuture().sync();
                System.out.println( "子线程结束..." );
 
             catch  (InterruptedException e) {
                e.printStackTrace();
             }
          }
       }).start();
       
       System.out.println( "断开连接,主线程结束.." );
       
    }
    
    
    
}

这里对Client进行了初步的封装,采用静态内部类实现单例。

Client的Handler不仅仅有Marshalling的编解码器,还加入了Netty自带的ReadTimeoutHandler,这是客户端与服务端一段时间没有通信就断开连接的依据。从这里也看到Netty的强大之处了,通过提供一些预定义的Handler让你的代码变得简单,只需要专注于业务实现即可。客户端超时断开通道后,如何再次建立连接进行通信呢?通过getChannelFuture()你会知道。

客户端代码模拟了一个线程通信超时,关闭通道后,另一个线程与服务器端再次通信。



Server Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package  day3;
 
import  io.netty.channel.ChannelHandlerAdapter;
import  io.netty.channel.ChannelHandlerContext;
 
public  class  ServerHandler  extends  ChannelHandlerAdapter{
 
    @Override
    public  void  channelRead(ChannelHandlerContext ctx, Object msg)  throws  Exception {
       Request request = (Request)msg;
       System.out.println( "Server : "  + request.getId() +  ", "  + request.getName() +  ", "  + request.getRequestMessage());
       Response response =  new  Response();
       response.setId(request.getId());
       response.setName( "response"  + request.getId());
       response.setResponseMessage( "响应内容"  + request.getId());
       ctx.writeAndFlush(response); //.addListener(ChannelFutureListener.CLOSE);
    }
 
    @Override
    public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  throws  Exception {
       ctx.close();
    }
 
    
    
}


Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package  day3;
 
import  io.netty.bootstrap.ServerBootstrap;
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelInitializer;
import  io.netty.channel.ChannelOption;
import  io.netty.channel.EventLoopGroup;
import  io.netty.channel.nio.NioEventLoopGroup;
import  io.netty.channel.socket.SocketChannel;
import  io.netty.channel.socket.nio.NioServerSocketChannel;
import  io.netty.handler.logging.LogLevel;
import  io.netty.handler.logging.LoggingHandler;
import  io.netty.handler.timeout.ReadTimeoutHandler;
 
public  class  Server {
 
    public  static  void  main(String[] args)  throws  Exception{
       
       EventLoopGroup pGroup =  new  NioEventLoopGroup();
       EventLoopGroup cGroup =  new  NioEventLoopGroup();
       
       ServerBootstrap b =  new  ServerBootstrap();
       b.group(pGroup, cGroup)
        .channel(NioServerSocketChannel. class )
        .option(ChannelOption.SO_BACKLOG,  1024 )
        //设置日志
        .handler( new  LoggingHandler(LogLevel.INFO))
        .childHandler( new  ChannelInitializer<SocketChannel>() {
          protected  void  initChannel(SocketChannel sc)  throws  Exception {
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
             sc.pipeline().addLast( new  ReadTimeoutHandler( 3 ));
             sc.pipeline().addLast( new  ServerHandler());
          }
       });
       
       ChannelFuture cf = b.bind( 8765 ).sync();
       
       cf.channel().closeFuture().sync();
       pGroup.shutdownGracefully();
       cGroup.shutdownGracefully();
       
    }
}



运行结果

wKiom1h63b_ROhaOAAB6W0wcmzc404.png

wKiom1h63c3ALRKyAAA7IlAsS98780.png


说明:由于客户端一开始是发送3条消息给服务端,但是每条消息发送间隔4S,由于超时设置为3S,于是发送第一条消息后,通道便断开连接。接下来,客户端又启动了一个线程再次与服务端通信。



到这里,这篇博客就结束了,对你有用吗?

下周我们再来看Netty在心跳检测方面的应用,^_^


本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1892045,如需转载请自行联系原作者
相关文章
|
算法 网络协议 Linux
Netty 中ChannelOption的含义以及使用的场景
Netty 中ChannelOption的含义以及使用的场景 转自:http://www.cnblogs.com/googlemeoften/p/6082785.html 1、ChannelOption.
1747 0
|
7月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
38 0
|
7月前
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
44 0
|
7月前
|
Java
由浅入深Netty基础知识NIO三大组件原理实战 1
由浅入深Netty基础知识NIO三大组件原理实战
59 0
|
2月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
2月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
7月前
|
存储 Java Docker
由浅入深Netty基础知识NIO网络编程 2
由浅入深Netty基础知识NIO网络编程
45 0
|
7月前
|
Java Maven Spring
使用netty实现nio web服务器
使用netty实现nio web服务器
59 0
|
3月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
77 0