再次更新的NIO:NIO2实现多路复用

本篇将用NIO2实现一个简单的IO多路复用程序并分析它,其中涉及到了ServerSocketChannel的configureBlocking()、Selector的select()、ServerSocketChannel的accept()等方法的理解。

单线程模型实例

先放上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ServerNio2 {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssChannel = ServerSocketChannel.open();
int port = 1024;
ssChannel.bind(new InetSocketAddress(port));
Selector selector = Selector.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key
.channel();
// 常常要判断sc是否为null,但这里用key规避掉了
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
buffer.flip();
sc.write(buffer);
}
}
keys.clear();
}
}
}

命令行执行如下代码:

1
yin@chenguoMacBook-Pro:~$telnet 127.0.0.1 1024

输入想要打印的内容会出现echo效果。

这段代码有三个比较重要的节点,分别是configureBlocking()的调用、selector.select()的调用以及ssc.accept()的调用,后续讲对其进行展开分析。

configureBlocking()

所谓非阻塞同步的非阻塞,说的就是这里。在Stackoverflow: Socket Channel configureBlocking to false下的答案给出了该方法的使用说明,通道默认是阻塞模式(也就是true),调用每个I/O操作都将引起通道的阻塞,直到操作完成。在register()之前应该设置其为false,如果取消了注册,通道可能不会返回阻塞模式。

selector.select()

接下来看看select()调用的底层脉络。

Selector是抽象类,代码中是通过Selector.open()来返回具体实例的:

1
2
3
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

查看provider()代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

当provider为空则通过provider = sun.nio.ch.DefaultSelectorProvider.create();产生实例。继续追踪有:

1
2
3
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}

以及:

1
2
3
public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this);
}

因为本实验环境为MacOX,所以Selector.open()返回的对象是一个KQueueSelectorImpl,它是一个Selector实现类。该类方法select()的实现在抽象父类SelectorImpl中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public int select(long var1) throws IOException {
if (var1 < 0L) {
throw new IllegalArgumentException("Negative timeout");
} else {
return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
}
}
public int select() throws IOException {
return this.select(0L);
}
public int selectNow() throws IOException {
return this.lockAndDoSelect(0L);
}

继续查看lockAndDoSelect()有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int lockAndDoSelect(long var1) throws IOException {
synchronized(this) {
if (!this.isOpen()) {
throw new ClosedSelectorException();
} else {
Set var4 = this.publicKeys;
int var10000;
synchronized(this.publicKeys) {
Set var5 = this.publicSelectedKeys;
synchronized(this.publicSelectedKeys) {
var10000 = this.doSelect(var1);
}
}
return var10000;
}
}
}

这个doSelect()经由KQueueSelectorImpl实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected int doSelect(long var1) throws IOException {
boolean var3 = false;
if (this.closed) {
throw new ClosedSelectorException();
} else {
this.processDeregisterQueue();
int var7;
try {
this.begin();
var7 = this.kqueueWrapper.poll(var1);
} finally {
this.end();
}
this.processDeregisterQueue();
return this.updateSelectedKeys(var7);
}
}

poll()源码为:

1
2
3
4
5
int poll(long var1) {
this.updateRegistrations();
int var3 = this.kevent0(this.kq, this.keventArrayAddress, 128, var1);
return var3;
}

kevent0()是一个native方法:private native int kevent0(int kq, long keventAddress, int keventCount, long timeout);

本地方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
JNIEXPORT jint JNICALL
Java_sun_nio_ch_KQueueArrayWrapper_kevent0(JNIEnv *env, jobject this, jint kq,
jlong kevAddr, jint kevCount,
jlong timeout)
{
struct kevent *kevs = (struct kevent *)jlong_to_ptr(kevAddr);
struct timespec ts;
struct timespec *tsp;
int result;
// Java timeout is in milliseconds. Convert to struct timespec.
// Java timeout == -1 : wait forever : timespec timeout of NULL
// Java timeout == 0 : return immediately : timespec timeout of zero
if (timeout >= 0) {
ts.tv_sec = timeout / 1000;
ts.tv_nsec = (timeout % 1000) * 1000000; //nanosec = 1 million millisec
tsp = &ts;
} else {
tsp = NULL;
}
result = kevent(kq, NULL, 0, kevs, kevCount, tsp);
if (result < 0) {
if (errno == EINTR) {
// ignore EINTR, pretend nothing was selected
result = 0;
} else {
JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue failed");
}
}
return result;
}

结合之前本人关于Kqueue的文章,就不难理解这段代码的原理,select()底层还是调用的kevent(),所以Java NIO.2的相关方法是对底层的更高级抽象。注释表明,Java的timeout参数小于0时,相当于给kevent()的timeout参数传入NULL,表示一直等待。

ssc.accept()

ServerSocketChannel是一个可以监听新的TCP连接的通道。看看该通道accept()的注释:

Accepts a connection made to this channel’s socket.

If this channel is in non-blocking mode then this method will
immediately return null if there are no pending connections.
Otherwise it will block indefinitely until a new connection is available
or an I/O error occurs.

The socket channel returned by this method, if any, will be in
blocking mode regardless of the blocking mode of this channel.

也就是说,在非阻塞模式,如果没有连接将立即返回,在阻塞模式将一直阻塞等待新的可用连接或者一个I/O错误的发生。

返回的SocketChannel通道将为阻塞模式,无论ServerSocketChannel方法调用之前是什么模式。

小结

本文分析的多路复用程序可以看作是Epoll、Kqueue在Java层面的实现,Java将底层的系统调用进行了很好的封装。本篇文章为后续Netty框架的学习进一步打下了基础。

参考

Socket Channel configureBlocking to false

Stackoverflow: What is “jobject thiz” in JNI and what is it used for?