原创

Java Nio基本概念Channel、Buffer、Selector及搭建简单的Nio聊天室

核心组件:
1、Channel(通道)
2、Buffer(缓冲区)
3、Selector(选择器)
核心组件
定义
作用
特点
具体使用、应用
Channel
(通道)
Java NIO数据的源头/目的地
(是缓冲区[Buffer]对象的唯一接口)
1、向缓冲区写数据
2、读取缓冲区数据
1、双向读、写。
即:可从通道中读书数据,又可写数据到通道)
2、异步读、写。
3、数据来源/流向总是缓冲区。
(即:通道数据总是先读到缓冲区/或写入缓冲区)
数据来源不同而对应不同Channel。
(含:文件IO、TCP、UDP网络IO)
1、FileChannel:文件读写数据。
2、DataChannel:UDP读写网络数据。
3、SocketChannel:TCP读写网络数据。
4、ServerSocketChannel:监听新进来的TCP连接,对每一个新进来的连接创建一个对应的SocketChannel。
Buffer
(缓冲区)
Java NIO数据读/写的中转地
(一块连续的内存块)
数据缓存
适用于所有基本数据类型
(除了布尔boolean类型)
1、根据缓存的数据类型不同而不同
(含:能通过IO发送的基本数据类型:byte、short、int、long、float、double、char)
(1)ByteBuffer
(2)ShortBuffer
(3)IntBuffer
(4)LongBuffer
(5)FloatBuffer
(6)DoubleBuffer
(7)CharBuffer
2、类型之间相互转换:asXXXBuffer()方法
如:将ByteBuffer转为FloatBuffer。
ByteBuffer buf = ByteBuffer.allocate(1024);
buff.asFloatBuffer();
Selector
(选择器)
异步IO的核心类
实现异步、非阻塞IO各种操作
允许1个Selector线程 管理&处理 多个通道(Channel)
1、使用一个Selector线程检测1个/多个通道(Channel)上的事件&基于事件驱动 分发事件。
2、不需要为每个Channel去分配一个线程。
3、事件驱动机制 = 事件到的时候出发,而非同步监视事件。
使用步骤:
1、创建Selector对象。
2、向Selector注册通道Channel。
3、调用Selector的select()方法。
注:
该方法会一直阻塞到某个注册的Channel有事件。
一旦该方法返回,线程就可以处理这些注册事件,
事件驱动,如:新连接、数据接收、数据发送等。

Buffer:缓冲区
buffer类结构
XXXBuffer读写类、XXXBufferR只读类。
XXXBufferR类继承XXXBuffer类并重写了所有的可修改 buffer 的方法。把所有能修改 buffer 的方法都直接 throw ReadOnlyBufferException,来保证只读。

Buffer四个属性:
这四个属性之间总是遵循以下关系:
0 <= mark <= position <= limit <= capacity
1、capacity:容量。
缓冲区能够容纳的数据元素的最大数量。这一容量在缓冲区创建时被设定,并且永远不能被改变。

2、limit:上界。
缓冲区的第一个不能被读或写的元素。或者说,缓冲区中现存元素的计数。

3、position:指针位置。
下一个要被读或写的元素的索引。位置会自动由相应的 get( )和 put( )函数更新。

4、mark:标记位置。
一个备忘位置。调用 mark( )来设定 mark = postion。调用 reset( )设定 position = mark。标记在设定前是-1。

Buffer三种异常:
1、BufferOverflowException:缓冲区溢出异常。
发生在put()时,如果操作的position大于上界limit时,则抛出BufferOverflowException。

2、BufferUnderflowException:缓冲区下溢异常。
发生在get()时,如果操作的position大于上界limit时,则抛出BufferUnderflowException。

3、IndexOutOfBoundsException:索引越界异常。
put(byte[] src, int offset, int length)get(byte[] dst, int offset, int length)时导致数组索引越界,则抛出IndexOutOfBoundsException。
注:offset在position的基础上偏移。

Buffer常用方法:
  • final int capacity():返回缓冲区的容量。
  • final int position( ) :返回缓冲区的指针位置。
  • final Buffer position(int newPosition):设置缓冲区的位置。
  • final int limit( ) :返回缓冲区的上界。
  • final Buffer limit (int newLimit) :设置缓冲区的上界。
  • final Buffer mark( ) :设置缓冲区的标记。
mark = position。
  • final int remaining( ) :返回当为位置与上界之间的元素数。
return limit - position;
  • final boolean hasRemaining( ) :缓存的位置与上界之间是否还有元素。
return position < limit;
  • abstract boolean isReadOnly( ):缓冲区是否为只读缓冲区。

  • final Buffer reset( ) :将缓冲区的位置重置为标记的位置。
mark = position。
  • final Buffer clear( ) :清除缓冲区为初始状态。
mark = -1;
position = 0;
limit = capacity;
  • final Buffer flip( ) :反转缓冲区
mark = -1;
limit = position;
position = 0;
  • final Buffer rewind( ) :回退缓冲区。
mark = -1;
position = 0;

  • ByteBuffer put():将数据放入缓冲区,并移动position。
  • ByteBuffer get():获取缓冲区数据,并移动position。
  • ByteBuffer compact():压缩缓冲区。
System.arrayCopy(hb, postion, hb, 0, remaining);
mark = -1;
position = remaining;
limit = capacity;
  • ByteBuffer duplicate():复制缓冲区。
创建了一个与原始缓冲区相似的新缓冲区。两个缓冲区共享数据元素,拥有同样的容量,但每个缓冲区拥有各自的位置position,上界limit和标记mark属性。对一个缓冲区内的数据元素所做的改变会反映在另外一个缓冲区上。这一副本缓冲区具有与原始缓冲区同样的数据视图。如果原始的缓冲区为只读,或者为直接缓冲区,新的缓冲区将继承这些属性。
  • ByteBuffer asReadOnlyBuffer():复制为只读缓冲区。
与duplicate相同,只是不允许put(),isReadOnly()为true。
  • ByteBuffer slice():分割缓冲区
创建一个从原始缓冲区的当前 position 开始的新缓冲区,并且其容量capacity是原始缓冲区的剩余元素数量remaining(limit - position),limit为分割创建的capacity。这个新缓冲区与原始缓冲区共享一段数据元素子序列。分割出来的缓冲区也会继承只读和直接属性。


Selector:多路复用选择器
用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。

1、SelectionKey:维护Channel与Selector的对应关系。

selectionKey的相关操作。
key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。
key.channel(); // 返回该SelectionKey对应的channel。
key.selector(); // 返回该SelectionKey对应的Selector。
key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask
key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。

判断Selector是否对某个Channel的某种event事件感兴趣。
int interestSet = selectionKey.interestOps(); 
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

判断某个Channel的操作是否准备就绪。
//创建ready集合的方法
int readySet = selectionKey.readyOps();
//检查这些操作是否就绪的方法
boolean isAcceptable=key.isAcceptable();//是否可读,是返回 true
boolean isWritable()://是否可写,是返回 true
boolean isConnectable()://是否可连接,是返回 true
boolean isAcceptable()://是否可接收,是返回 true


通过SelectionKey访问其对应的Channel于Selector。
Channel channel = key.channel();
Selector selector = key.selector();

SelectionKey四种事件类型:
  • SelectionKey.OP_ACCEPT:服务端接收到新请求事件。
  • SelectionKey.OP_CONNECT:客户端连接上服务端事件。
  • SelectionKey.OP_READ:有IO可读数据事件。
  • SelectionKey.OP_WRITE:有IO可写数据事件。
Channel类型
描述
OP_ACCEPT
OP_CONNECT
OP_READ
OP_WRITE
SocketChannel
客户端Channel
Y
Y
Y
ServerSocketChannel
服务端Channel
Y
SocketChannel
服务端建立与客户端通讯的Channel
Y
Y
DatagramChannel
UDP通讯Channel
Y
Y

2、Selector
3个重要的SelectionKey集合:
(1)HashSet<SelectionKey> keys:所有注册到Selector的Channel所表示的SelectionKey都会存在于该集合中。keys元素的添加会在Channel注册到Selector时发生。这里的keys不代表所有的SelectionKey都是有效的。

(2)Set<SelectionKey> selectedKey:该集合中的每个SelectionKey都是其对应的Channel在上一次操作selection期间被检查到至少有一种SelectionKey中所注册的事件操作已经准备好被处理。该集合是keys的一个子集。

(3)Set<SelectionKey> cancelledKeys:执行了取消操作的SelectionKey会被放入到该集合中。该集合是keys的一个子集。

相关操作方法:
(1)Selector.open():创建一个Selector对象。

(2)selector.select():返回事件就绪的selectionKey数据(准备就绪的Channel数)。
该方法会一直阻塞直到至少一个Channel被选择(该Channel注册的event事件发生)为止,除非当前线程发生中断或者selector的wakeup方法被调用。

(3)selector.selectNow():不会阻塞,立即返回事件就绪的selectionKey数据(准备就绪的Channel数)。

(4)selector.selectedKeys():返回已经就绪的SelectionKey集合。
该方法一般会在selector.select()之后调用。

(5)selector.wakeup():唤醒当前select()阻塞线程。下一次select()还是会阻塞。

(6)Selector.close():唤醒所有阻塞线程,清除绑定在selector上的channel事件,对channel本身无影响。


示例:java nio实现简单聊天室
server.java
package com.weilai.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author weilai
* @email 352342845@qq.com
* @date 2019-11-22 21:27
*/
public class Server {

private static ConcurrentHashMap<String, SocketChannel> clientMap = new ConcurrentHashMap<>();

public static void main(String[] args) throws IOException {
// 创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 得ServerSocket
ServerSocket serverSocket = serverSocketChannel.socket();
// 绑定端口
serverSocket.bind(new InetSocketAddress(8000));
// 创建selector对象
Selector selector = Selector.open();
// 给ServerSocketChannel注册accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 返回事件就绪的selectionKey数据,一直阻塞直到至少一个Channel被选择(该Channel注册的event事件发生)为止,除非当前线程发生中断或者selector的wakeup方法被调用。
selector.select();
// 获取事件触发的selectedKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 移除selectedKey
iterator.remove();
// 如果触发accept事件
if (selectionKey.isAcceptable()) {
// 得到ServerSocketChannel
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
// 获取accept事件创建的客户端SocketChannel
SocketChannel clientSocketChannel = serverChannel.accept();
System.out.println("客户端IP:" + clientSocketChannel.socket().getInetAddress().getHostAddress() + ",客户端端口:" + clientSocketChannel.socket().getPort());
// 设置客户端SocketChannel为非阻塞
clientSocketChannel.configureBlocking(false);
// 注册客户端SocketChannel可读事件到selector
clientSocketChannel.register(selector, SelectionKey.OP_READ);
// 保存到对应关系中
clientMap.put(UUID.randomUUID().toString(), clientSocketChannel);
// 如果触发了可读事件
} else if (selectionKey.isReadable()) {
// 初始化buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 得到客户端SocketChannel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
try {
// 读取数据
int read = socketChannel.read(byteBuffer);
if (read > 0) {
// 重置buffer指针
byteBuffer.flip();
Charset charset = Charset.forName("utf-8");
// 将buffer转为string
String receiveMsg = String.valueOf(charset.decode(byteBuffer).array());
System.out.println(socketChannel + ":" + receiveMsg);
// 发送给其他客户端socket
for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
SocketChannel returnSocketChannel = entry.getValue();
ByteBuffer msgByffer = ByteBuffer.wrap(("senderkey:" + entry.getKey() + receiveMsg).getBytes());
msgByffer.flip();
returnSocketChannel.write(msgByffer);
}
}
} catch (IOException e) {
e.printStackTrace();
System.out.println(socketChannel.getRemoteAddress().toString() + "断开连接");
// 关闭对应的socketChannel
socketChannel.close();
String senderkey = null;
for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
if (entry.getValue() == socketChannel) {
senderkey = entry.getKey();
break;
}
}
clientMap.remove(senderkey, socketChannel);
}
}
}
}

}
}


client.java
package com.weilai.nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author weilai
* @email 352342845@qq.com
* @date 2019-11-22 21:49
*/
public class Client {

public static void main(String[] args) throws IOException {
// 创建客户端SocketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞模式
socketChannel.configureBlocking(false);
// 创建selector对象
Selector selector = Selector.open();
// 客户端SocketChannel注册连接事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接服务端
socketChannel.connect(new InetSocketAddress(8000));
while (true) {
// 返回事件就绪的selectionKey数据,一直阻塞直到至少一个Channel被选择(该Channel注册的event事件发生)为止,除非当前线程发生中断或者selector的wakeup方法被调用。
selector.select();
// 获取事件触发的selectedKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 移除触发的selectedKey
iterator.remove();
// 如果触发连接事件
if (selectionKey.isConnectable()) {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
// 如果正在连接
if (clientChannel.isConnectionPending()) {
// 完成连接
clientChannel.finishConnect();
// 创建buffer并写入数据
ByteBuffer byteBuffer = ByteBuffer.wrap((LocalDateTime.now() + clientChannel.getLocalAddress().toString() + "连接成功").getBytes());
// 重置buffer指定
byteBuffer.flip();
// 数据写入channel中(即发送到服务端)
clientChannel.write(byteBuffer);
//接受键盘输入,因为非阻塞,所以启动线程接收不阻塞主线程
ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
executorService.submit(() -> {
while (true) {
try {
// 获取键盘输入
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String msg = bufferedReader.readLine();
// 清空buffer
byteBuffer.clear();
// 将数据放入buffer
byteBuffer.put(msg.getBytes());
// 重置buffer指针
byteBuffer.flip();
// 数据写入channel中(即发送到服务端)
clientChannel.write(byteBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 注册可读事件
clientChannel.register(selector, SelectionKey.OP_READ);
// 如果触发了可读事件
} else if (selectionKey.isReadable()) {
// 得到客户端SocketChannel
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
// 分配buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
// 读取数据
int read = clientChannel.read(byteBuffer);
if (read > 0) {
// 重置buffer指针
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, read));
}
} catch (Exception e) {
System.out.println(clientChannel.getRemoteAddress().toString() + "断开连接");
// 关闭socketChannel
clientChannel.close();
}

}
}
}
}
}



正文到此结束
本文目录