美文网首页
Java NIO API

Java NIO API

作者: _大叔_ | 来源:发表于2021-07-20 15:46 被阅读0次

概述

Non-Blocking I/O,是一种非阻塞通信模型。在java.nio式jdk1.4版本引入的一套API,我们可以利用这套API实现非阻塞的网络编程模型。

大数据和实时计算的兴起,高性能RPC框架与网络编程技术再次成伟焦点。比图Fackebook的Thrift框架,scala的Akka框架,实时流领域的Storm、Spark框架,又或者开源分布式数据库的Mycat、VoltDB,这些框架的底层通信都采用了NIO通信技术。而java领域里大名鼎鼎的NIO框架——Netty,则被众多的开源或商业软件所采用。

NIO适合高并发、高访问量、段请求

Buffer

Buffer 缓冲区,是NIO通讯时数据的载体。常用的缓冲区是 ByteBuffer(字节缓冲区)。

// 创建10个字节的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(capacity);

缓冲区的属性:

  • capacity (容量):决定了存储容量的上线,一经写定,不能更改。
  • limit (限制):限制的初始位置=capacity
  • position (位置):初始值是0,但每当插入一个字节,就会向后移动一位

ByteBuffer 默认用的子类是 HeapByteBuffer(堆内字节缓冲区),这种类型的缓冲区,在JVM的堆中创建的,即缓冲区的生命周期由 (GC) JVM 管理的。MappedByteBuffer(堆外字节缓冲区),可以使用操作系统的内存,使用场景当创建大的字节缓冲区时,注意:如果使用堆外,生命周期的管理需要自来实现。

缓冲区的方法:

  • get() 方法会根据当前position的位置取值,才外,get() 没调用一次,位置会移动一位。
  • buffer.limit(buffer.position()) 获取buffer的position位置,并赋予 limit限制
  • buffer.flip() 和buffer.limit(buffer.position()) 类似
  • ByteBuffer.wrap(”test“.getByte()) 根据传入的字节数组创建对应大小的缓冲区,并写入数据,写完后,会自动掉用 flip()方法
  • clear() 该方法不会清除缓冲区的数据,只会把position重置为0,让后面的(3字节)新数据,覆盖前面的(6字节)老数据,但还有3字节的数据没有覆盖,可以使用 flip() 来清除掉未覆盖的老数据。
  • hasRemaining() 判断 limit 和 position之间是否还有元素可读,有返回true,无返回false

Channel

Channel 是通道,就像告诉公路从A城市通往B城市,而 buffer 就是货车。和socket的连接方式差不多,具体代码如下:

server端

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        // 设置 socket server 为非阻塞通信
        server.configureBlocking(false);
        // 绑定 本地 ip和8888 做为服务器连接口
        server.bind(new InetSocketAddress(8888));

        SocketChannel sc = null;
        // 等待接收到一个通信连接
        while (sc == null){
            sc = server.accept();
        }
        // 设置改通信连接 也为非阻塞模式
        sc.configureBlocking(false);

        ByteBuffer buffer = ByteBuffer.allocate(10);
        // 读取写道Buffer
        sc.read(buffer);
        String str = new String(buffer.array());
        System.out.println("服务收到消息:"+str);
    }

client端

    public static void main(String[] args) throws Exception {
        SocketChannel sock = SocketChannel.open();
        sock.connect(new InetSocketAddress("127.0.0.1",8888));
        ByteBuffer buffer = ByteBuffer.wrap("hellow".getBytes());
        sock.write(buffer);
    }

Selector

Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

服务端代码

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(8888));
        // 获取多路复用选择器
        Selector selector = Selector.open();
        // 服务端注册 接收客户端的 监听事件
        server.register(selector, SelectionKey.OP_ACCEPT);
        while(true){
            // 该方法会阻塞,直到有事件到达u,才会放心
            selector.select();
            // 获取所有事件的key,走到这代表有事件来了
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 准备迭代所有事件
            Iterator<SelectionKey> selectionKeyIterable = selectionKeys.iterator();
            while (selectionKeyIterable.hasNext()){
                // 获取事件
                SelectionKey selectionKey = selectionKeyIterable.next();
                // 表示有客户连接
                if(selectionKey.isAcceptable()){
                    // 得到连接
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                    // 建立和对应客户通信的通道
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // 谁知为非阻塞通信
                    socketChannel.configureBlocking(false);
                    // 注册拥有读写事件
                    socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                // 表示有数据发送到服务端,这里可以不用继续设置 configureBlocking 为非阻塞,在连接的时候通道已经标记过了
                if(selectionKey.isReadable()){
                    // 得到连接
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(20);
                    socketChannel.read(byteBuffer);
                    while (byteBuffer.hasRemaining()){
                        System.out.println("收到客户端的通信: "+new String(byteBuffer.array()));
                    }
                    
                }
                // 给客户端发送数据
                if(selectionKey.isWritable()){
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
                    // 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
                    while (byteBuffer.hasRemaining()){
                        socketChannel.write(byteBuffer);
                    }
                }
                // 处理完毕移除事件,避免重复
                selectionKeyIterable.remove();
            }
        }
    }

客户端代码

    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
        Selector selector = Selector.open();
        socketChannel.register(selector,SelectionKey.OP_CONNECT);
        while(true){
            selector.select();
            Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
            while (selectionKeyIterator.hasNext()){
                SelectionKey selectionKey = selectionKeyIterator.next();
                if(selectionKey.isConnectable()){
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    sc.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                }
                if(selectionKey.isReadable()){
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                    while (byteBuffer.hasRemaining()){
                        System.out.println("收到服务端消息:"+new String(byteBuffer.array()));
                    } 
                }
                if(selectionKey.isWritable()){
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = ByteBuffer.wrap("3333333".getBytes());
                    // 读写时非阻塞的,为了确保读写完整性,需要加上 hasRemaining
                    while (byteBuffer.hasRemaining()){
                        sc.write(byteBuffer);
                    }
                }
                selectionKeyIterator.remove();
            }
        }
    }

FileChannel

    public static void main(String[] args) throws Exception {
        FileChannel write = new FileOutputStream(new File("1.txt")).getChannel();
        ByteBuffer writeByteBuffer = ByteBuffer.wrap("2321312".getBytes());
        if(writeByteBuffer.hasRemaining()) {
            write.write(writeByteBuffer);
        }
        write.close();

        FileChannel read = new FileInputStream(new File("1.txt")).getChannel();
        ByteBuffer readByteBuffer = ByteBuffer.wrap("2321312".getBytes());
        read.read(readByteBuffer);
        if(readByteBuffer.hasRemaining()) {
            System.out.println("文件内容:"+new String(readByteBuffer.array()));
        }
        read.close();

    }

相关文章

网友评论

      本文标题:Java NIO API

      本文链接:https://www.haomeiwen.com/subject/egzirltx.html