Netty学习6-源码跟踪ChannelPipeline和ChanelHandler工作原理

  1. 云栖社区>
  2. 博客>
  3. 正文

Netty学习6-源码跟踪ChannelPipeline和ChanelHandler工作原理

徐胖子 2016-12-24 16:13:00 浏览2316
展开阅读全文

1 概述

Netty中的ChannelPipeline类似于servlet,chanelHandler类似于filter。这类拦截器就是职责链设计模式,主要是事件拦截和用户业务逻辑定制。演示代码采用的是netty 3.10.5版本。调试步骤和示例代码如下:

步骤1 下载完成后导入为maven项目。
步骤2 需要测试的项目在configure build path时不要直接导入netty3的jar包,直接导入project项目。
步骤3 对关注的方法打断点。
步骤4 跟踪调用链。
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;

/**
 * netty服务端入门
 */
public class Server {

	public static void main(String[] args) {

		// 服务类
		ServerBootstrap bootstrap = new ServerBootstrap();

		// boss线程监听端口,worker线程负责数据读写
		ExecutorService boss = Executors.newCachedThreadPool();
		ExecutorService worker = Executors.newCachedThreadPool();

		// 设置niosocket工厂
		bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

		// 设置管道的工厂
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline = Channels.pipeline();
				pipeline.addLast("decoder", new StringDecoder());
				pipeline.addLast("xyHelloHandler", new HelloHandler());
				return pipeline;
			}
		});

		bootstrap.bind(new InetSocketAddress(10101));
		System.out.println("server3 start!!!");
	}
}


import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
 * 消息接受处理类
 */
public class HelloHandler extends SimpleChannelHandler {

	// 接收消息
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {

		String messageReceived = (String) e.getMessage();
		System.out.println(messageReceived);
		super.messageReceived(ctx, e);
	}

	// 捕获异常
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		System.out.println("exceptionCaught");
		super.exceptionCaught(ctx, e);
	}

	// 新连接
	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		System.out.println("channelConnected");
		super.channelConnected(ctx, e);
	}

	// 必须是链接已经建立,关闭通道的时候才会触发
	@Override
	public void channelDisconnected(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {
		System.out.println("channelDisconnected");
		super.channelDisconnected(ctx, e);
	}

	// channel关闭的时候触发
	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		System.out.println("channelClosed");
		super.channelClosed(ctx, e);
	}
}

handler是通过pipeline的addLast方法添加的,那首先将断点定位该处。

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline pipeline = Channels.pipeline();
		pipeline.addLast("decode", new StringDecoder());
		pipeline.addLast("xyHelloHandler", new HelloHandler());
		return pipeline;
	}
});

2 server启动

在示例代码中,Debug方式运行Server,耐心一步步跟着断点往下走,注意查看调用链。

A DefaultChannelPileline@addLast


断点如约而至,从调用链中可看出入口是ServerBootStrap的bindAsync方法。

public ChannelFuture bindAsync(final SocketAddress localAddress) {
      if (localAddress == null) {
          throw new NullPointerException("localAddress");
      }
      Binder binder = new Binder(localAddress);
      ChannelHandler parentHandler = getParentHandler();


      ChannelPipeline bossPipeline = pipeline();
      bossPipeline.addLast("binder", binder);


      if (parentHandler != null) {
          bossPipeline.addLast("userHandler", parentHandler);
      }
}

bossPipeline.addLast("binder", binder)赋值了一个名为binder的处理器。继续跟进addLast方法。

private final Map<String, DefaultChannelHandlerContext> name2ctx =new HashMap<String, DefaultChannelHandlerContext>(4);
public synchronized void addLast(String name, ChannelHandler handler) {
        if (name2ctx.isEmpty()) {
            init(name, handler);
        } else {
            checkDuplicateName(name);
            DefaultChannelHandlerContext oldTail = tail;
            DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
            callBeforeAdd(newTail);
            oldTail.next = newTail;
            tail = newTail;
            name2ctx.put(name, newTail);
            callAfterAdd(newTail);
        }
}
B DefaultChannelPileline@init

name2ctx是保存handlerContext的集合对象,此时是空的。断点走进init方法,逐行分析方法内的这几行代码。


private void init(String name, ChannelHandler handler) {
    DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
    callBeforeAdd(ctx);
    head = tail = ctx;
    name2ctx.clear();
    name2ctx.put(name, ctx);
    callAfterAdd(ctx);
}

第1行 声明了DefaultChannelHandlerContext对象,传入name值是binder,handler是名为binder的处理器,这两个值初始化的地方就在上文看到的调用链的BootStrap的bind方法中。先看DefaultChannelHandlerContext该类使用到的属性。很明显这是链表的数据结构,prev指向上个对象,next指向下个对象。

private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
     volatile DefaultChannelHandlerContext next;
     volatile DefaultChannelHandlerContext prev;
     private final String name;
     private final ChannelHandler handler;
}

第2行 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。

private static void callBeforeAdd(ChannelHandlerContext ctx) {
    if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
        return;
}

第3行 将链表中的第1个head和最后一个tail都赋值给了第1行中声明的对象。

private volatile DefaultChannelHandlerContext head;
private volatile DefaultChannelHandlerContext tail;

第4行 清除Map对象中的值。

第5行 将第1行声明的对象赋值给map集合。

第6行 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。第一个需要关注的断点到此结束。

private void callAfterAdd(ChannelHandlerContext ctx) {
    if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
        return;
    }
}

3 client启动

利用telnet启动一个客户端。telnet 127.0.0.1 10101。这次设置断点在位置Server类中的Channels.pipeline()位置

A channels.pipeline()


新生成了一个ChannelPipeline对象。从调用链可看出【1】boss线程池负责接收连接。【2】上游是NioServerBoss@registerAcceptedChannel。在建立连接时,该ChannelPipeline的所有handler将被设置完成。

private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,Thread currentThread) {
try {
          ChannelSink sink = parent.getPipeline().getSink();
          ChannelPipeline pipeline =parent.getConfig().getPipelineFactory().getPipeline();
    }
}
B pipeline.addLast(decoder)

添加StringDecoder这个处理器,流程和4.2中的所有步骤一致,都是走init方法,同样在callBeforeAdd和callAfterAdd中return了。唯一不同的是name叫做decoder。

C pipeline.addLast(xyHelloHandler)

name2ctx不再empty,进入else分支。


checkDuplicateName检查是否有重名,有重名则直接抛出异常。
oldTail表示原来尾对象,新申明的xyHelloHandler将被作为新尾对象。再把原oldTail的next指向xyHelloHandler,再放入name2ctx集合。

D AbstractNioSelector@select

继续跟着断点走,会走到AbstractNioSelector的select方法,select会阻塞,等待下一次事件。


4 client发送信息

在telnet窗口按下ctrl+] 会进入telnet的发送窗口。现在关注以下2个方法的调用链,StringDecoder@decode、HelloHandler@messageReceived断点到这两个方法。

A StringDecoder@decode



从NioWorker的read方法开始,执行到DefaultChannelPipeline的sendUpstream,首先执行该pipeline的head的handler,而这个管道的head handler是StringDecoder。

  public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        if (head == null) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "The pipeline contains no upstream handlers; discarding: " + e);
            }
            return;
        }
        sendUpstream(head, e);
}

进入sendUpstream(head, e)方法,会先调用StringDecoder父类OnetoOneDecoder的handleUpStream()方法。

    public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
        if (!(evt instanceof MessageEvent)) {
            ctx.sendUpstream(evt);
            return;
        }
        MessageEvent e = (MessageEvent) evt;
        Object originalMessage = e.getMessage();
        Object decodedMessage = decode(ctx, e.getChannel(), originalMessage);
        if (originalMessage == decodedMessage) {
            ctx.sendUpstream(evt);
        } else if (decodedMessage != null) {
            fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress());
        }
    }

父类方法调用子类的decode方法,如此这般StringDecoder的decode方法就被调用到了。

B HelloHandler@messageReceived

注意上述方法中还调用fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress())。一层一层进入该方法,最终会执行到DefaultChannelPipeline的内部类DefaultChannelHandlerContext的sendUpstream方法

        public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
            if (next != null) {
                DefaultChannelPipeline.this.sendUpstream(next, e);
            }
        }
next相当于获取了head的下一个handler对象即HelloHanlder。当然helloHandler中也有所谓的链式调用,继续调用fireMessageReceived,若它的next还指向其他handler则会继续调用。
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		String messageReceived = (String) e.getMessage();
		System.out.println(messageReceived);
		super.messageReceived(ctx, e);
	}


5 简要流程图


流程图参考地址 http://blog.csdn.net/zxhoo/article/details/17264263

网友评论

登录后评论
0/500
评论
徐胖子
+ 关注