概述
Non-Blocking I/O,是一种非阻塞通信模型。在java.nio式jdk1.4版本引入的一套API,我们可以利用这套API实现非阻塞的网络编程模型。
大数据和实时计算的兴起,高性能RPC框架与网络编程技术再次成伟焦点。比图Fackebook的Thrift框架,scala的Akka框架,实时流领域的Storm、Spark框架,又或者开源分布式数据库的Mycat、VoltDB,这些框架的底层通信都采用了NIO通信技术。而java领域里大名鼎鼎的NIO框架——Netty,则被众多的开源或商业软件所采用。
NIO适合高并发、高访问量、段请求
Buffer
Buffer 缓冲区,是NIO通讯时数据的载体。常用的缓冲区是 ByteBuffer(字节缓冲区)。
// 创建10个字节的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(capacity);
缓冲区的属性:
- capacity (容量):决定了存储容量的上线,一经写定,不能更改。
- limit (限制):限制的初始位置=capacity
- position (位置):初始值是0,但每当插入一个字节,就会向后移动一位
ByteBuffer 默认用的子类是 HeapByteBuffer(堆内字节缓冲区),这种类型的缓冲区,在JVM的堆中创建的,即缓冲区的生命周期由 (GC) JVM 管理的。MappedByteBuffer(堆外字节缓冲区),可以使用操作系统的内存,使用场景当创建大的字节缓冲区时,注意:如果使用堆外,生命周期的管理需要自来实现。
缓冲区的方法:
- get() 方法会根据当前position的位置取值,才外,get() 没调用一次,位置会移动一位。
- buffer.limit(buffer.position()) 获取buffer的position位置,并赋予 limit限制
- buffer.flip() 和buffer.limit(buffer.position()) 类似
- ByteBuffer.wrap(”test“.getByte()) 根据传入的字节数组创建对应大小的缓冲区,并写入数据,写完后,会自动掉用 flip()方法
- clear() 该方法不会清除缓冲区的数据,只会把position重置为0,让后面的(3字节)新数据,覆盖前面的(6字节)老数据,但还有3字节的数据没有覆盖,可以使用 flip() 来清除掉未覆盖的老数据。
- hasRemaining() 判断 limit 和 position之间是否还有元素可读,有返回true,无返回false
Channel
Channel 是通道,就像告诉公路从A城市通往B城市,而 buffer 就是货车。和socket的连接方式差不多,具体代码如下:
server端
public static void main(String[] args) throws Exception {
ServerSocketChannel server = ServerSocketChannel.open();
// 设置 socket server 为非阻塞通信
server.configureBlocking(false);
// 绑定 本地 ip和8888 做为服务器连接口
server.bind(new InetSocketAddress(8888));
SocketChannel sc = null;
// 等待接收到一个通信连接
while (sc == null){
sc = server.accept();
}
// 设置改通信连接 也为非阻塞模式
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(10);
// 读取写道Buffer
sc.read(buffer);
String str = new String(buffer.array());
System.out.println("服务收到消息:"+str);
}
client端
public static void main(String[] args) throws Exception {
SocketChannel sock = SocketChannel.open();
sock.connect(new InetSocketAddress("127.0.0.1",8888));
ByteBuffer buffer = ByteBuffer.wrap("hellow".getBytes());
sock.write(buffer);
}
Selector
Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
服务端代码
public static void main(String[] args) throws Exception {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8888));
// 获取多路复用选择器
Selector selector = Selector.open();
// 服务端注册 接收客户端的 监听事件
server.register(selector, SelectionKey.OP_ACCEPT);
while(true){
// 该方法会阻塞,直到有事件到达u,才会放心
selector.select();
// 获取所有事件的key,走到这代表有事件来了
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 准备迭代所有事件
Iterator<SelectionKey> selectionKeyIterable = selectionKeys.iterator();
while (selectionKeyIterable.hasNext()){
// 获取事件
SelectionKey selectionKey = selectionKeyIterable.next();
// 表示有客户连接
if(selectionKey.isAcceptable()){
// 得到连接
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
// 建立和对应客户通信的通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 谁知为非阻塞通信
socketChannel.configureBlocking(false);
// 注册拥有读写事件
socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
// 表示有数据发送到服务端,这里可以不用继续设置 configureBlocking 为非阻塞,在连接的时候通道已经标记过了
if(selectionKey.isReadable()){
// 得到连接
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(20);
socketChannel.read(byteBuffer);
while (byteBuffer.hasRemaining()){
System.out.println("收到客户端的通信: "+new String(byteBuffer.array()));
}
}
// 给客户端发送数据
if(selectionKey.isWritable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
// 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
while (byteBuffer.hasRemaining()){
socketChannel.write(byteBuffer);
}
}
// 处理完毕移除事件,避免重复
selectionKeyIterable.remove();
}
}
}
客户端代码
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
Selector selector = Selector.open();
socketChannel.register(selector,SelectionKey.OP_CONNECT);
while(true){
selector.select();
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()){
SelectionKey selectionKey = selectionKeyIterator.next();
if(selectionKey.isConnectable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
sc.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if(selectionKey.isReadable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
while (byteBuffer.hasRemaining()){
System.out.println("收到服务端消息:"+new String(byteBuffer.array()));
}
}
if(selectionKey.isWritable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
// 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
while (byteBuffer.hasRemaining()){
sc.write(byteBuffer);
}
}
selectionKeyIterator.remove();
}
}
}
FileChannel
public static void main(String[] args) throws Exception {
FileChannel write = new FileOutputStream(new File("1.txt")).getChannel();
ByteBuffer writeByteBuffer = ByteBuffer.wrap("2321312".getBytes());
if(writeByteBuffer.hasRemaining()) {
write.write(writeByteBuffer);
}
write.close();
FileChannel read = new FileInputStream(new File("1.txt")).getChannel();
ByteBuffer readByteBuffer = ByteBuffer.wrap("2321312".getBytes());
read.read(readByteBuffer);
if(readByteBuffer.hasRemaining()) {
System.out.println("文件内容:"+new String(readByteBuffer.array()));
}
read.close();
}













网友评论