关于Java NIO的Selector

[toc]

Selector 简介

Selector(选择器)是 Channel 的多路复用器,它可以同时监控多个 Channel 的 IO 状况,允许单个线程来操作多个 Channel。如下:

img

Selector 的作用是什么?

Selector 提供选择执行已经就绪的任务的能力。从底层来看,Selector 提供了询问 Channel 是否已经准备好执行每个 I/O 操作的能力。Selector 允许单线程处理多个 Channel。仅用单个线程来处理多个 Channels 的好处是,只需要更少的线程来处理 Channel 。事实上,可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。

Selector 的使用

使用 Selector 的主要流程如下:

  1. 打开 Selector
  2. 将 Channel 注册到 Selector 中,并设置要监听的事件
  3. 轮询处理 IO 操作

打开 Selector

和 SocketChannel 相似,调用 Selector.open() 就可以打开一个选择器实例。

1
Selector selector = Selector.open();

注册 Selector

为了将 Channel 和 Selector 配合使用,我们需要将 Channel 注册到对应的 Selector 上,调用 SelectableChannel.register() 方法来实现。

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_ACCEPT);

这里有一个要注意的地方,所有注册到 Selector 中的 Channel 都必须是非阻塞的。怎么判断 Channel 是否可以设置为非阻塞呢?判断它是否继承了SelectableChannel,SelectableChannel 是一个抽象类,它提供了实现 Channel 的可选择性所需要的公共方法。而 FileChannel 没有继承 SelectableChannel ,所以它不能使用 Selector。

register() 提供了两个参数,一个是要注册的 Selector 是谁,第二个参数是对什么事件感兴趣。事件类型有四种:

  • 连接 : SelectionKey.OP_CONNECT

  • 接收 : SelectionKey.OP_ACCEPT

  • 可读 : SelectionKey.OP_READ

  • 可写 : SelectionKey.OP_WRITE

如果感兴趣的事件不止一个,则可以使用“位运算 | ” 来组合多个事件,如: SelectionKey.OP_CONNECT | SelectionKey.OP_ACCEPT

需要提醒的是,Selector 关注的不是 Channel 的操作,而是 Channel的某个操作的一种就绪状态。一旦 Channel 具备完成某个操作的条件,表示该 Channel 的某个操作已经就绪,就可以被 Selector 查询到,程序可以对该 Channel 进行对应的操作。比如说,某个 SocketChannel 可以连接到一个服务器,则处于“连接就绪”(OP_CONNECT)。某给 ServerSocketChannel 可以接收新的连接,则处理“接收就绪”(SelectionKey.OP_ACCEPT)。

轮询处理 IO 操作

将 Channel 注册到 Selector 并关注相对应的时间后,就可以轮询处理 IO 事件了。

Selector 提供了方法 select(),该方法可以查询出已经就绪的 Channel操作,如果没有事件发生,则该方法会一直阻塞,直到有事件。select() 有三个重载方法:

  • select(): 阻塞到至少有一个通道在你注册的事件上就绪了。·
  • select(long timeout):和select()一样,但最长阻塞事件为timeout毫秒。
  • selectNow():非阻塞,只要有通道就绪就立刻返回。

select() 返回值为 int 类型,该值表示有多少 Channel 的操作已经就绪,更准确地说是上一次 select() 到这一次 select() 方法之间的时间段内,有多少 Channel 变成了就绪状态。

当 select() 返回后,如果返回值大于 0 ,则可以调用 selectedKeys() 方法,该方法返回一个 Set 集合,该集合是一个 SelectionKey 的集合,SelectionKey 表示的是可选择通道 SelectableChannel 和一个特定的 Selector之间的注册关系。

  • SelectionKey 是一个抽象类,表示 SelectableChannel 在 Selector 中注册的标识.每个 Channel 向 Selector 注册时,都将会创建一个selectionKey

  • SelectionKey 是 SelectableChannel 与 Selector 的建立关系,并维护了 Channel 事件

  • 可以通过 cancel() 方法取消 key,取消的 key 不会立即从 Selector 中移除,而是添加到 cancelledKeys 中,在下一次 select() 操作时移除它.所以在调用某个 key 时,需要使用 isValid 进行校验。

SelectionKey 提供了两个非常重要的 “Set”:interest set 和 ready set。

  • interest set 表示感兴趣的事件,我们可以通过以下方式获取:
    int interestSet = selectionKey.interestOps();
    1
    2
    3
    4
    boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
    boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
    boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
    boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
  • ready set:代表了 Channel 所准备好了的操作。
1
2
3
4
5
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

以下代码是一个处理 IO 操作的完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
while (true) {
selector.select();

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

// 这段代码非常重要,后面演示
key.remove();
}
}

这里有一段非常重要的代码 key.remove(),这行代码表示,我已经在处理该 IO 事件了,需要删除。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public static void main(String[] args) throws Exception {
// 创建 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 设置为非阻塞
serverSocketChannel.configureBlocking(false);

// 绑定 8081 端口
serverSocketChannel.bind(new InetSocketAddress(8081));

// 打开 Selector
Selector selector = Selector.open();

// 将 SocketChannel 注册到 Selector
// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
// select 方法,一直阻塞直到有事件发生
selector.select();

// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();

// 获取一个 SelectionKey 后,我们要将其删除掉,表示我们已经处理了这个事件
iterator.remove();

if (key.isAcceptable()) {
// 连接时间发生
// 当客户端连接服务端的时候,我们需要服务单与之建立连接
// 需要注意的是在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel
ServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();
// 需要从 socketChannel 获取 SocketChanel
SocketChannel clientChannel = socketChannel.accept();
log.info("{} 建立连接",clientChannel);
// 设置 clientChannel 为非阻塞
clientChannel.configureBlocking(false);

clientChannel.register(selector,SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 获取的为 SocketChannel
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(64);
int size = clientChannel.read(byteBuffer);
if (size < 0) {
// 小于 0 表示客户端断开连接,需要关闭该 SocketChannel
log.info("{},断开了连接",clientChannel);
clientChannel.close();
} else {
byteBuffer.flip();

CharBuffer charBuffer = Charset.forName("utf-8").decode(byteBuffer);

log.info("{},发来了消息,消息内容是:{}",clientChannel,charBuffer.toString());

// 服务端接收消息后,给客户端发送给客户端
Scanner scanner = new Scanner(System.in);
String string = scanner.nextLine();
ByteBuffer writeBuffer = Charset.forName("utf-8").encode(string);
clientChannel.write(writeBuffer);

if (writeBuffer.hasRemaining()) {
// 如果不能一次性发完只需要触发 write 事件去发
key.attach(writeBuffer);
key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
}
}
} else if (key.isWritable() && key.isValid()) {
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
SocketChannel clientChannel = (SocketChannel) key.channel();
byteBuffer.flip();

clientChannel.write(byteBuffer);

if (!byteBuffer.hasRemaining()) {
// 如果已完,则只无须关注 write 事件
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}

分析为什么要:key.remove()
这里拿上面那个问题来说明,为什么要加这 key.remove() 代码呢?首先这段代码的意思是说获取一个 SelectionKey 后,我们需要将其删除,表示我们已经对该 IO 事件进行了处理,如果没有这样代码会有什么后果呢?报 NullPointerException!

为什么会这样呢?这里我们来梳理整个流程。

  • 首先服务端创建一个 Selector,该 Selector 与 ServerSocketChannel 绑定,且关注 accept 事件。如下

img

  • 当客户端发起连接时,selector.selectedKeys() 会返回 Set 集合,该集合包含了已经准备就绪的 SelectionKey,这个时候只有连接事件,相对应的 SelectionKey 为 2b71fc7e

    img

  • 当服务端与客户端建立连接后,绑定 Selector 并关注 read 事件。这里需要注意的是 Selector 并不会主动去删除 SelectionKey,它只会增加,所以这个时候 Selector 里面有两个 SelectionKey,一个是 2b71fc7e(accept),一个是 1de0aca6(read)。建立连接后,事件处理完成,会该事件与之对应的事件去掉,也就是 2b71fc7e 的 SelectionKey 绑定的 ServerSocketChannel ,但是 Selector 里面对应的 SelectionKey 还是存在的。

img

  • 当客户端给服务端发送消息时,服务端监测到有事件发生,会将发生时间的 SelectionKey@1de0aca6 加入到 selectedKey 中,如下:

    img

  • 在迭代过程第一次取的是 SelectionKey@1de0aca6,这个是读事件,可以正常读,打印客户端发送过来的,但是第二次读取的是 SelectionKey@2b71fc7e,但是这个 Key 与之相绑定的事件已经处理了,它为 null,那肯定会报 NullPointerException。所以在使用 NIO 时一定要主动删除已经处理过的 SelectionKey ,既主动调用 key.remove(),删除该 SelectionKey。


原文链接:
https://blog.csdn.net/chenssy/article/details/134407984