Java网络应用框架Netty简单入门法

这篇文章旨在采用简单的方式快速入门Java网络应用框架Netty。

Netty应用实例

首先给出本文分析的实例,采用自官方User Guide,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Server
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new Handler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
new DiscardServer(port).run();
}
}
// Handler
public class Handler extends ChannelInboundHandlerAdapter {
@Override
// 方法在收到消息时被调用,msg就是收到的消息,然后被该方法"洗礼"
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
((ByteBuf) msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}

这是一个单线程模型,采用此实例便于和NIO.2实现的单线程多路复用做对比。

框架结构与NIO.2实现的多路复用进行对比

本小节主要将前文代码和站内文《再次更新的NIO:NIO2实现多路复用》中的代码进行对比。

在Netty框架里,NioServerSocketChannel对应Java API的ServerSocketChannel,所以.channel(NioServerSocketChannel.class)相当于ServerSocketChannel ssChannel = ServerSocketChannel.open();

.childHandler(new ChannelInitializer<SocketChannel>()...)这句相当于:

1
2
3
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);

至于socketChannel.pipeline().addLast(new Handler());主要就是用来处理监控到的事件,对标的代码是:

1
2
3
4
5
6
7
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
buffer.flip();
sc.write(buffer);
}

.option.childOption一个是设置ServerSocketChannel对象,一个是设置SocketChannel对象。

可见Netty属于更高一层的封装,相比原本Java API层面的实现,的确清晰明了了许多。

代码中的选项设定

ChannelOption.SO_KEEPALIVE的含义

首先需要明白.childOption(ChannelOption.SO_KEEPALIVE, true);这句的作用是什么?

在《UNIX网络编程卷1:套接字联网API》这本书的相关章节已经有所描述。

When the keep-alive option is set for a TCP socket and no data has been exchanged across the socket in either direction for two hours, TCP automatically sends a keep-alive probe to the peer. This probe is a TCP segment to which the peer must respond.

也就是说对一个TCP socket设置了这个选项后,如果两个小时在该socket上的任意方向都没有数据交换,那么它就会网peer发送一个keep-alive probe,这个probe是一个segment对面peer必须响应。具体响应的细节可以参考书中的内容。由此也会产生TCP中已有SO_KEEPALIVE选项,为什么还要在应用层加入心跳包机制??的问题。大体上来说,前者判断连接的存活状态,但并不代表连接能正常传输数据。

所以源代码中的选项设置不是严格限定的,并不能取代应用级别的心跳检测。

ChannelOption.SO_BACKLOG的含义

这个选项在《UNIX网络编程卷1:套接字联网API》这本书的相关章节也有所描述。该书的84页写到,要理解backlog就必须认识到内核为任何一个给定的监听套接字维护两个队列:未完成连接队列和已完成连接队列。

TCP连接队列

该图片来自:http://www.cnxct.com/something-about-phpfpm-s-backlog/

这涉及到一些TCP协议的知识,本人将另起一篇文章单独进行说明和分析,目前只需知道backlog参数和已完成连接队列的大小有关。

Netty应用的收尾工作

涉及到的相关代码:

1
2
3
4
5
6
...
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}

sync()

先来看看sync():

1
2
3
4
5
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> sync() throws InterruptedException;

这是一个同步操作,它等待当前的future对象is done或者返回future发生错误的原因,在这里暂且不讨论该方法的具体实现。

b.bind(port)

b是一个ServerBootstrap,它继承自AbstractBootstrap,而bind(int)也是后者实现的方法,这个方法进一步封装了doBind(SocketAddress):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
...
}
}

因为sync()的存在,所以在这里暂时只用去解析if (regFuture.isDone())条件下的代码。doBind0方法会执行线程池进行绑定,来看看这里的channel实例是如何产生的。

它由ChannelFuture实例regFuture通过channel方法得到,而后者又产生自initAndRegister方法,此方法内存在一个赋值语句:

1
channel = channelFactory.newChannel();

那么这个channelFactory是何时初始化的呢?

不难猜到这和下面这段代码有关:

1
b.group(group).channel(NioServerSocketChannel.class)

接着就来验证一下,b是一个ServerBootstrap,它存在一个group方法,此方法主要用来传递group,也就是本人起初new的单线程池包装器对象(NioEventLoopGroup毕竟只是封装,不能等同于线程池)new NioEventLoopGroup(1)。group方法返回的是当前的ServerBootstrap实例,也就是b,相当于采用了构建者模式。同样的手法在channel方法再进行一次,channelFactory就在该方法内完成了赋值。一旦channelFactory调用newChannel(),那么传入的NioServerSocketChannel.class就通过反射产生了Channel对象。

so,当f.channel()时,得到的就是一个NioServerSocketChannel对象。

.closeFuture()

注释是这么说的:

1
2
3
4
/**
* Returns the {@link ChannelFuture} which will be notified when this
* channel is closed. This method always returns the same future instance.
*/

注意代码中语句和close()的区别,close()是主动关闭当前的Channel对象,而closeFuture()搭配sync()相当于监听Channel关闭的事件,如何关闭Netty程序可以参考技术贴:Shutdown netty programmatically

至此,实例中较难理解的部分就分析完毕。

小结

Netty随着版本的迭代,对很多方法和对象进行了高度的封装,但不要因为换了马甲就不识其原本的真正面目。将Netty代码和多路复用代码进行对照学习,是一种快速入门Netty框架的方式,本文末尾的参考添加了更多的Netty实例,体现了Netty的简单和强大。

参考

How to stop netty bootstrap from listening and accepting new connetions

Closing ChannelFuture after bind: I believe I found out…

Shutdown netty programmatically

UNIX网络编程 卷1:套接字联网API(第3版)

如何在连接闲置时发送心跳来维持连接

Netty聊天功能实例

Netty 实现 WebSocket 聊天功能

TCP中已有SO_KEEPALIVE选项,为什么还要在应用层加入心跳包机制??