研究了netty的服务端创建过程。至于netty的优势,可以参照网络其他文章。《Netty系列之Netty 服务端创建》是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货。但抛开技术来说,也存在一些瑕疵。
缺点如下
-
代码衔接不连贯,上下不连贯。
-
代码片段是截图,对阅读代理不便(可能和阅读习惯有关)
本篇主要内容,参照《Netty系列之Netty 服务端创建》,梳理出自己喜欢的阅读风格。
1.整体逻辑图
整体将服务端创建分为2部分:(1)绑定端口,提供服务过程;(2)轮询网络请求
1.1 绑定端口序列图
1.2 类图
类图仅仅涵盖了绑定过程中比较重要的几个组件
1.3 代码分析
step 2 doBind 绑定本地端口,启动服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private
ChannelFuture doBind(
final
SocketAddress localAddress) {
final
ChannelFuture regFuture = initAndRegister();
//1
final
Channel channel = regFuture.channel();
if
(regFuture.cause() !=
null
) {
return
regFuture;
}
final
ChannelPromise promise;
if
(regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
//2
}
else
{
// Registration future is almost always fulfilled already, but just in case it's not.
promise =
new
DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(
new
ChannelFutureListener() {
@Override
public
void
operationComplete(ChannelFuture future)
throws
Exception {
doBind0(regFuture, channel, localAddress, promise);
//2
}
});
}
return
promise;
}
|
主要分为2个处理单元
step3 initAndRegister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
final
ChannelFuture initAndRegister() {
Channel channel;
try
{
channel = createChannel();
}
catch
(Throwable t) {
return
VoidChannel.INSTANCE.newFailedFuture(t);
}
try
{
init(channel);
}
catch
(Throwable t) {
channel.unsafe().closeForcibly();
return
channel.newFailedFuture(t);
}
//注册NioServerSocketChannel到Reactor线程的多路复用器上
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
if
(regFuture.cause() !=
null
) {
if
(channel.isRegistered()) {
channel.close();
}
else
{
channel.unsafe().closeForcibly();
}
}
return
regFuture;
}
|
createChannel由子类ServerBootstrap实现,创建新的NioServerSocketChannel,并完成Channel初始化,以及注册。
4.ServerBootstrap.createChannel
1
2
3
4
|
Channel createChannel() {
EventLoop eventLoop = group().next();
return
channelFactory().newChannel(eventLoop, childGroup);
}
|
它有两个参数,参数1是从父类的NIO线程池中顺序获取一个NioEventLoop,它就是服务端用于监听和接收客户端连接的Reactor线程。第二个参数就是所谓的workerGroup线程池,它就是处理IO读写的Reactor线程组。
5.ServerBootstrap.init
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
void
init(Channel channel)
throws
Exception {
//设置Socket参数和NioServerSocketChannel的附加属性
final
Map<ChannelOption<?>, Object> options = options();
synchronized
(options) {
channel.config().setOptions(options);
}
final
Map<AttributeKey<?>, Object> attrs = attrs();
synchronized
(attrs) {
for
(Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings
(
"unchecked"
)
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//将AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中
ChannelPipeline p = channel.pipeline();
if
(handler() !=
null
) {
p.addLast(handler());
}
final
ChannelHandler currentChildHandler = childHandler;
final
Entry<ChannelOption<?>, Object>[] currentChildOptions;
final
Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized
(childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized
(childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//将用于服务端注册的Handler ServerBootstrapAcceptor添加到ChannelPipeline中
p.addLast(
new
ChannelInitializer<Channel>() {
@Override
public
void
initChannel(Channel ch)
throws
Exception {
ch.pipeline().addLast(
new
ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
currentChildAttrs));
}
});
}
|
到此处,Netty服务端监听的相关资源已经初始化完毕。
6.AbstractChannel.AbstractUnsafe.register
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
final
void
register(
final
ChannelPromise promise) {
//首先判断是否是NioEventLoop自身发起的操作,如果是,则不存在并发操作,直接执行Channel注册;
if
(eventLoop.inEventLoop()) {
register0(promise);
}
else
{
//如果由其它线程发起,则封装成一个Task放入消息队列中异步执行。
try
{
eventLoop.execute(
new
Runnable() {
@Override
public
void
run() {
register0(promise);
}
});
}
catch
(Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}"
,
AbstractChannel.
this
, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
|
7.register0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private
void
register0(ChannelPromise promise) {
try
{
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if
(!ensureOpen(promise)) {
return
;
}
doRegister();
registered =
true
;
promise.setSuccess();
pipeline.fireChannelRegistered();
if
(isActive()) {
//完成绑定时,不会调用该代码段
pipeline.fireChannelActive();
}
}
catch
(Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if
(!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. "
+
"Swallowing the cause of the registration failure:"
, t);
}
}
}
|
触发事件
8.doRegister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
protected
void
doRegister()
throws
Exception {
boolean
selected =
false
;
for
(;;) {
try
{
//将NioServerSocketChannel注册到NioEventLoop的Selector上
selectionKey = javaChannel().register(eventLoop().selector,
0
,
this
);
return
;
}
catch
(CancelledKeyException e) {
if
(!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected =
true
;
}
else
{
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw
e;
}
}
}
}
|
大伙儿可能会很诧异,应该注册OP_ACCEPT(16)到多路复用器上,怎么注册0呢?0表示只注册,不监听任何网络操作。这样做的原因如下:
注册方法是多态的,它既可以被NioServerSocketChannel用来监听客户端的连接接入,也可以用来注册SocketChannel,用来监听网络读或者写操作;
通过SelectionKey的interestOps(int ops)方法可以方便的修改监听操作位。所以,此处注册需要获取SelectionKey并给AbstractNioChannel的成员变量selectionKey赋值。
本文转自 randy_shandong 51CTO博客,原文链接:http://blog.51cto.com/dba10g/1863497,如需转载请自行联系原作者