基于Netty的IM简单实现原理

简介:

最近在开发MobIM,实现了消息传输和群等功能的IM功能。SDK功能包小,而功能全面。可以与原来的系统进行无缝整合。
自己抽空也实现了一套IM Server和IMClient的业务通信模式。没有实现复杂的UI界面,实现简单的登录注册,发消息,收消息。服务器端与客户端都使用Netty通信。
Netty基于非阻塞(nio),事件驱动的网络应用程序框架和工具。
通过Netty面对大规模的并发请求可以处理的得心用手。用来替代原来的bio网络应用请求框架。

BIO通信即平时使用的基于Socket,ServerSocket的InputStream和OutStream。
Netty神奇的地方在于是否是阻塞的。

while(true){
//主线程死循环等待新连接到来
 Socket socket = serverSocket.accept();
//为新的连接创建新的线程,客户端与服务器上的线程数1:1
 executor.submit(new ConnectIOnHandler(socket));

在BIO模型中,服务器通过ServerSocket来开启监听,每当有请求的时候开启一个线程来接受处理和维持状态。这种思想在低并发,小吞吐的应用还可以应付,一旦遇到大并发,大吞吐的请求,必然歇菜。线程和客户端保持着1:1的对应关系,维持着线程。维持那么的多的线程,JVM必然不堪重负,服务器必然崩溃,宕机。
而在非阻塞的Netty中,却可以应付自如。从容应对。Tomcat就是基于BIO的网络通信模式(Tomcat可以通过一定配置,改成非阻塞模式),而JBoss却是基于非阻塞的NIO实现。
NIO的网络通信模式很强劲,但是上手却一点都不容易。其中解决和牵扯到好多网络问题。如:网络延时,TCP的粘包/拆包,网络故障等一堆一堆的问题。而Netty呢,针对nio复杂的编程难题而进行一系列的封装实现,提供给广大开发者一套开源简单,方便使用的API类库,甚至青出于蓝而胜于蓝,甚至几乎完美的解决CPU突然飙升到100%的bug :http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 (其实也没有真正的解决,只是把复现的概率降到了最低而已)。
用Netty来实现IM实在太合适了。可以在最短的时间里整出一套思路清晰,架构简明的IM通信底层模型。提下需求,底层用JSON 字符串String进行通信,对象通过JSON序列化成JSON String。收到JSON数据后再反序列化成对象。
首先,我们先看服务器是怎么实现的。

private static final StringDecoder DECODER = new StringDecoder();
    private static final StringEncoder ENCODER = new StringEncoder();
...
         //boss线程监听端口,worker线程负责数据读写
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        //辅助启动类
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            //设置线程池
            bootstrap.group(bossGroup, workerGroup);
            //设置socket工厂
        bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            //设置管道工厂
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //获取管道
                    ChannelPipeline pipe = socketChannel.pipeline();
                    
                    // Add the text line codec combination first,
                    pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                    // the encoder and decoder are static as these are sharable
                    //字符串编码器
                    pipe.addLast(DECODER);
                    //字符串解码器
                    pipe.addLast(ENCODER);
                    //业务处理类
                    pipe.addLast(new IMServerHandle());
                }
            });
            //绑定端口
            // Bind and start to accept incoming connections.
            ChannelFuture f = bootstrap.bind(port).sync();
            if (f.isSuccess()) {
                Log.debug("server start success... port: " + port + ", main work thread: "
                        + Thread.currentThread().getId());
            }
            ////等待服务端监听端口关闭
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

以上是Netty服务器启动的代码。其中需要注意childHandler方法。需要把我们要添加的业务处理handler来添加到这里。通过ChannelPipeline 添加ChannelHandler。而处理字符串的就在IMServerHandle里实现。IMServerHandle继承了SimpleChannelInboundHandler类。其中泛型T就是要转换成的对象。客户端与服务器端通信是本质上通过字节码byte[]通信的,而通过StringDecoder 和StringEncoder工具类对byte[]进行转换,在IMServerHandle中获取到String进行处理即可。
看下IMServerHandle的实现方式。

/***
 * 面向IM通信操作的业务类
 * @author xhj
 *
 */
public class IMServerHandle extends SimpleChannelInboundHandler<String> {
    /**
     * user操作业务类
     */
    private UserBiz userBiz = new UserBiz();
    /***
     * 消息操作的业务类
     */
    private IMMessageBiz immessagebiz = new IMMessageBiz();
    
    /***
     * 处理接受到的String类型的JSON数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(" get msg >> "+msg);
        //把JSON数据进行反序列化
         Request req = JSON.parseObject(msg, Request.class);
         Response respon = new Response();
         respon.setSendTime(System.currentTimeMillis());
         //判断是否是合法的请求
         if(req != null ) {
             System.out.println("the req method >> "+req.getMethod());
             //获取操作类型
             if(req.getMethod() == IMProtocol.LOGIN) {
                 //获取要操作的对象
                 User user = JSON.parseObject(req.getBody(),User.class);
                 //设置返回数据的操作类型
                 respon.setMethod(IMProtocol.LOGIN);
                 //执行业务操作
                 boolean bl = userBiz.login(user);
                 if(bl) {//检验用户有效
                     //设置响应数据
                     respon.setBody("login ok");
                     //设置状态
                     respon.setStatus(0);
                     //登录成功将连接channel保存到Groups里
                     ChannelGroups.add(ctx.channel());
                     //将用户的uname和channelId进行绑定,服务器向指定用户发送消息的时候需要用到uname和channelId
                     ChannelGroups.putUser(user.getUname(), ctx.channel().id());
                     //发送广播通知某人登录成功了
                     userBiz.freshUserLoginStatus(user);
                 } else {//用户密码错误
                     //设置错误描述
                     respon.setErrorStr("pwd-error");
                     //设置状态描述码
                     respon.setStatus(-1);
                 }
                 //将Response序列化为json字符串
                 msg = JSON.toJSONString(respon);
                 //发送josn字符串数据,注意后面一定要加"\r\n"
                 ctx.writeAndFlush(msg+"\r\n");
             } else if(req.getMethod() == IMProtocol.SEND) {
                 IMMessage immsg = JSON.parseObject(req.getBody(), IMMessage.class);
                 immsg.setSendTime(System.currentTimeMillis());     c

通过IMServerHandle可以十分方便的处理获取到的String字符串。处理完后,可以直接通过ChannelHandlerContext的writeAndFlush方法发送数据。
再看下Netty客户端如何实现。

private BlockingQueue<Request> requests = new LinkedBlockingQueue<>();

   /**
    * String字符串解码器
    */
private static final StringDecoder DECODER = new StringDecoder();

   /***
    * String字符串编码器
    */
private static final StringEncoder ENCODER = new StringEncoder();

   /**
    * 客户端业务处理Handler
    */
   private IMClientHandler clientHandler ;

   /**
    * 添加发送请求Request
    * @param request
    */
   public void addRequest(Request request) {
       try {
           requests.put(request);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }

   /**
    * 是否继续进行运行
    */
   private boolean run = true;

   public void run() {
       //远程IP
       String host = "172.20.10.7";
       //端口号
       int port = 10000;
       //工作线程
       EventLoopGroup workerGroup = new NioEventLoopGroup();
       try {
           //辅助启动类
           Bootstrap b = new Bootstrap(); // (1)
           //设置线程池
           b.group(workerGroup); // (2)
           //设置socket工厂 不是ServerSocket而是Socket
           b.channel(NioSocketChannel.class); // (3)
           b.handler(new LoggingHandler(LogLevel.INFO));
           //设置管道工厂
           b.handler(new ChannelInitializer<SocketChannel>() {
               public void initChannel(SocketChannel ch) throws Exception {
                   ChannelPipeline pipe = ch.pipeline();
                   // Add the text line codec combination first,
                   pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                   // the encoder and decoder are static as these are sharable
                   //字符串解码器
                   pipe.addLast(DECODER);
                   //字符串编码器
                   pipe.addLast(ENCODER);
                   clientHandler = new IMClientHandler();
                   //IM业务处理类
                   pipe.addLast(clientHandler);
               }
           });

           // Start the client.
           ChannelFuture f = b.connect(host, port).sync(); // (5)

           Channel ch = f.channel();
           ChannelFuture lastWriteFuture = null;
           while(run) {
               //将要发送的Request转化为JSON String类型
               String line = JSON.toJSONString(requests.take());
               if(line != null && line.length() > 0) {//判断非空
                   // Sends the received line to the server.
                   //发送数据到服务器
                   lastWriteFuture = ch.writeAndFlush(line + "\r\n");
               }
           }
           // Wait until all messages are flushed before closing the channel.
           //关闭写的端口
           if (lastWriteFuture != null) {
               lastWriteFuture.sync();
           }
       } catch(Exception ex){
           ex.printStackTrace();
       } finally {
           //优雅的关闭工作线程
           workerGroup.shutdownGracefully();
       }
   }

   /**
    * 增加消息监听接受接口
    * @param messgeReceivedListener
    */
   public void addMessgeReceivedListener(MessageSender.MessgeReceivedListener messgeReceivedListener) {
       clientHandler.addMessgeReceivedListener(messgeReceivedListener);
   }

   /***
    *  移除消息监听接口
    * @param messgeReceivedListener
    */
   public void remove(MessageSender.MessgeReceivedListener messgeReceivedListener) {
       clientHandler.remove(messgeReceivedListener);
   }

Netty的client端实现和Server实现方式大同小异。比Server端要简要些了。少一个NIOEventLoop。在Bootstrap 的handle方法中增加ChannelInitializer初始化监听器,并增加了IMClientHandler的监听操作。其中IMClientHandler具体处理服务器返回的通信信息。
通过ChannelFuture 获取Channel,通过Channel在一个循环里发送请求。如果消息队列BlockingQueue非空的时候,获取Request并发送。以上发送,如何接受数据呢?接受到的json被反序列化直接变成了对象Response,对Response进行处理即可。
定义了一个消息接受到的监听接口。

public static interface MessgeReceivedListener {
    public void onMessageReceived(Response msg);
    public void onMessageDisconnect();
    public void onMessageConnect();
}

在接口onMessageReceived方法里直接对获取成功的响应进行处理。

而服务器端对某个客户端进行发送操作,把Channel添加到ChannelGroup里,将uname和channelid对应起来。需要对某个用户发送消息的时候通过uname获取channelid,通过channelid从ChannelGroup里获取channel,通过channel发送即可。
具体操作如下:

public void transformMessage(IMMessage message) {
        
        Channel channel = ChannelGroups.getChannel(ChannelGroups.getChannelId(message.getTo()));
        if(channel != null && channel.isActive()) {
            Response response = new Response();
            response.setBody(JSON.toJSONString(message));
            response.setStatus(0);
            response.setMethod(IMProtocol.REV);
            response.setSendTime(System.currentTimeMillis());
            channel.writeAndFlush(JSON.toJSON(response)+"\r\n");
        }
        
    }
ChannelGroups的代码实现:
public class ChannelGroups {

    private static final Map<String,ChannelId> userList = new ConcurrentHashMap();
    
    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups",
            GlobalEventExecutor.INSTANCE);

    public static void putUser(String uname,ChannelId id) {
        userList.put(uname,id);
    }

通过以上代码解析应该对IM的通信模式有了比较全面的认识。具体实现过程可以下载源代码进行查看。欢迎大家反馈提出问题。
https://github.com/sinxiao/NettyIMServerAndAndroidClient
_1
运行效果图。

目录
相关文章
|
10月前
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13146 1
|
4月前
|
JSON 算法 Dubbo
Netty入门实践-模拟IM聊天
本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为2个部分:Netty的核心概念、IM聊天简易实现。
|
4月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
85 1
|
网络协议 前端开发 Java
SpringBoot+Netty开发IM即时通讯系列(一)
简单来讲,Netty是一个提供了易于使用的API的客户端/服务端框架。Netty并发非常高,一个非阻塞的IO,Netty传输速度也非常快,因为他是0拷贝,什么是零拷贝?NIO中的特性之一就是零拷贝,在Java中,内存分为堆和栈以及字符串常量值等等,如果有一些数据从IO中读取并且放到堆里面,中间会经过一些缓冲区。
938 0
SpringBoot+Netty开发IM即时通讯系列(一)
|
12月前
|
消息中间件 前端开发 JavaScript
太顶了,使用 Netty 实现了一个 IM 即时通讯系统
太顶了,使用 Netty 实现了一个 IM 即时通讯系统
|
移动开发 安全 网络协议
手把手教你为基于Netty的IM生成自签名SSL/TLS证书
本文要分享的是如何使用OpenSSL生成在基于Netty的IM中真正可用的SSL/TLS证书,内容包括:证书的创建、创建过程中的注意点,以及在Server端、Android端、iOS端、Java桌面端、H5端使用证书的代码范例。
353 0
手把手教你为基于Netty的IM生成自签名SSL/TLS证书
|
存储 编解码 安全
基于Netty的IM聊天加密技术学习:一文理清常见的加密概念、术语等
本文正好借此机会,以Netty编写的IM聊天加密为例,为入门者理清什么是PKI体系、什么是SSL、什么是OpenSSL、以及各类证书和它们间的关系等,并在文末附上简短的Netty代码实示例,希望能助你通俗易懂地快速理解这些知识和概念!
184 0
基于Netty的IM聊天加密技术学习:一文理清常见的加密概念、术语等
|
前端开发 JavaScript API
SpringBoot+Netty开发IM即时通讯系列(二)
通过JS以Ajax异步地让浏览器每隔一段时间(10S)发送请求到后端,去询问服务端是否有新消息、新状态等,如果有则取出并通过前端再渲染。但这很容易造成无限循环,也就是前端Ajax会不停地循环后端的数据
435 0
SpringBoot+Netty开发IM即时通讯系列(二)
|
缓存 安全 NoSQL
基于Netty,从零开发IM(四):编码实践篇(系统优化)
虽然 Netty 的性能很高,但是也不能保证随意写出来的项目就是性能很高的,所以本篇将主要讲解几个基于Netty的IM系统的优化实战技术点。
183 0
基于Netty,从零开发IM(四):编码实践篇(系统优化)
|
存储 Java Go
基于Netty,从零开发IM(三):编码实践篇(群聊功能)
接上两篇《IM系统设计篇》、《编码实践篇(单聊功能)》,本篇主要讲解的是通过实战编码实现IM的群聊功能,内容涉及群聊技术实现原理、编码实践等知识。
182 0
基于Netty,从零开发IM(三):编码实践篇(群聊功能)