美文网首页
2021-08-14_Nio多线程模型时序图分析

2021-08-14_Nio多线程模型时序图分析

作者: kikop | 来源:发表于2021-08-14 16:57 被阅读0次

20210814_Nio多线程模型时序图分析

1概述

1.6关于Nio多线程的时序图

1.6.1服务端初始化

从左到右,依次为:Main-->MyBossHaneler-->MyRefactor

[图片上传失败...(image-9715d-1628931332173)]


image-20210814164326402.png

1.6.2MyBossHandler

1.6.3MyReactorTaskRunnable

1.6.3.1轮询Boss事件1

[图片上传失败...(image-e5bc3c-1628931332173)]


image-20210814164506381.png

1.6.3.2轮询Work事件2

[图片上传失败...(image-e58fbc-1628931332173)]


image-20210814164528239.png

1.6.4MyWorkHandler

[图片上传失败...(image-ef36df-1628931332173)]


image-20210814164647998.png

2代码实战(多线程版本)

2.1maven依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>${netty.version}</version>
</dependency>

2.2配置

2.3服务端

2.3.1MultiThreadEchoServerReactor

package com.kikop.myreactor.multipthreadapp.server;


import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
import com.kikop.myreactor.multipthreadapp.server.accept.MyBossHandler;
import com.kikop.myreactor.multipthreadapp.server.refactor.MyReactorTaskRunnable;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myrefactor_multipthreadapp
 * @file Name: EchoServerReactor
 * @desc 多线程反应器(一个Runnable任务)
 * @date 2021/6/22
 * @time 9:30
 * @by IDE: IntelliJ IDEA
 */
public class MultiThreadEchoServerReactor {

    private ServerSocketChannel serverSocketChannel;

    // 2个 selector 选择器
    private Selector[] selectors = new Selector[2];

    // 2个子反应器线程,boss,work
    private MyReactorTaskRunnable[] myReactorTaskRunnables = null;

    public MultiThreadEchoServerReactor() throws IOException {

        // 1.初始化多个selector选择器
        selectors[0] = Selector.open(); //new WindowsSelectorProvider
        selectors[1] = Selector.open();

        // 2.创建 serverSocketChannel
        serverSocketChannel = ServerSocketChannel.open();

        // 2.1.开启服务端监听
        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocketChannel.socket().bind(address);

        // 2.2.设置非阻塞
        serverSocketChannel.configureBlocking(false);

        // 3.创建选择键

        // 3.1.构建SelectionKey
        // 通过第一个 selector,负责监控新连接OP_ACCEPT事件
        // 对应通道:serverSocket
        SelectionKey sk = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT);
        // 3.1.创建附件参数
        MyBossHandler attachObject = new MyBossHandler(selectors, serverSocketChannel);

        // 3.2.绑定附件参数:MyBossHandler
        // 附加新连接处理 MyBossHandler 处理器到 SelectionKey(选择键)
        sk.attach(attachObject);

        // 4.构建连个反应器线程
        // 4.1.第一个子反应器,一子反应器负责一个选择器
        MyReactorTaskRunnable subReactor1 = new MyReactorTaskRunnable(selectors[0]);
        // 4.2.第二个子反应器,一子反应器负责一个选择器
        MyReactorTaskRunnable subReactor2 = new MyReactorTaskRunnable(selectors[1]);
        myReactorTaskRunnables = new MyReactorTaskRunnable[]{subReactor1, subReactor2};
    }

    private void startService() {

        System.out.println("服务端开始启动...");
        // 2个线程,不断轮询、监听
        new Thread(myReactorTaskRunnables[0]).start(); // as server.boss,负责:OP_ACCEPT和事件分发
        new Thread(myReactorTaskRunnables[1]).start(); // as server.work,负载IO事件读写节和业务逻辑
        System.out.println("服务端启动成功!");
    }


    public static void main(String[] args) throws IOException {

        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();

        server.startService();
    }

}

2.3.2MyBossHandler(accept连接建立,构建MyWorkHandler)

package com.kikop.myreactor.multipthreadapp.server.accept;


import com.kikop.myreactor.multipthreadapp.server.handler.MyWorkHandler;

import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: BossHandler
 * @desc Boss连接处理器, 等同 于netty:bossNioEventLoop
 * boosGroup用于Accetpt连接建立事件
 * @date 2021/6/22
 * @time 10:30
 * @by IDE: IntelliJ IDEA
 */
public class BossHandler
        implements Runnable {

    AtomicInteger next = new AtomicInteger(0);

    private Selector[] selectors;
    private ServerSocketChannel serverSocket;

    public BossHandler(Selector[] selectors, ServerSocketChannel serverSocket) {
        this.selectors = selectors;
        this.serverSocket = serverSocket;
    }

    /**
     * 事件分发 BossHandler 对应的处理逻辑
     * 构建 MyWorkHandler,如果是多线程,则会将此时的 SocketChannel 注册到另外一个 selector
     */
    public void run() {
        try {

            SocketChannel channel = serverSocket.accept();
            if (channel != null) {

                // 将 channel 动态分配到 selector,通道IO事件就绪时,
                // 由 selector 进行分发(MyReactorTaskRunnable不断轮询监听,调用对应的 handlerXXX.run)

                // 顺序选择,每次连接对应的选择器
                // 第一个 selector连接压力比较大:负责连接监听 ServerSocketChannel +业务处理 SocketChannel
                // 第二个 selector 连接压力比较小:只负责业务处理 SocketChannel
                new MyWorkHandler(selectors[next.get()], channel);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (next.incrementAndGet() == selectors.length) { // 只能是:0,1
            next.set(0);
        }
    }
}

2.3.3MyReactorTaskRunnable(所有事件分发线程)

处理附件类型如下:

如果是 ServerSocketChannel(代表服务端),则 handler为 BossHandler
如果是 SocketChannel(代表连接的某个客户端网络通道),则 handler为 MyWorkHandler

package com.kikop.myreactor.multipthreadapp.server.handler;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;


/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: MyReactorTask
 * @desc boss,work统一的线程,不同的是通道绑定的附件不一样
 * @date 2021/6/22
 * @time 10:30
 * @by IDE: IntelliJ IDEA
 */
public class MyReactorTaskRunnable implements Runnable {

    // 每个线程负责一个选择器的查询
    final Selector selector;

    public MyReactorTaskRunnable(Selector selector) {
        this.selector = selector;
    }

    public void run() {
        try {

            while (!Thread.interrupted()) { // 非阻塞等待

                // 负载某个 seleector 等待就绪事件
                selector.select();

                // 有读写,获取待就绪事件
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> it = keySet.iterator();

                while (it.hasNext()) {
                    // Reactor负责 dispatch收到的事件
                    SelectionKey sk = it.next();
                    dispatch(sk);
                }
                keySet.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * 获得就绪IO事件并完成事件分发
     *
     * @param sk
     */
    void dispatch(SelectionKey sk) {

        // 1.获取 handler
        // 1.1.如果是 ServerSocketChannel(代表服务端),则 handler为 BossHandler
        // 1.2.如果是 SocketChannel(代表连接的某个客户端网络通道),则 handler为 MyWorkHandler
        Runnable handler = (Runnable) sk.attachment();

        // 在当前线程中处理,调用之前 attach 绑定到选择键的handler处理器对象
        if (handler != null) {
            System.out.println("dispatch:"+handler.getClass().toString());
            handler.run();
        }
    }
}

2.3.4MyWorkHandler

package com.kikop.myreactor.multipthreadapp.server.handler;


import com.kikop.util.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: MyWorkHandler
 * @desc 执行业务处理逻辑(在一个主线程main开辟的线程池中)
 * 负责单个通道的同步sync读写
 * @date 2021/6/22
 * @time 10:30
 * @by IDE: IntelliJ IDEA
 */
public class MyWorkHandler implements Runnable {

    final SocketChannel channel;
    final SelectionKey sk;

    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);


    static final int RECIEVING = 0, SENDING = 1;

    int state = RECIEVING;

    // 主线程 main:引入线程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    /**
     * @param selector 可能是1或2
     * @param c
     * @throws IOException
     */
    public MyWorkHandler(Selector selector, SocketChannel c) throws IOException {

        channel = c;
        c.configureBlocking(false);

        // 取得选择键,设置感兴趣的IO事件
        sk = channel.register(selector, 0);

        // 将本 Handler:MultiThreadEchoHandler作为 MyReactorTaskRunnable.dispatch sk选择键的附件
        // 方便事件 dispatch
        sk.attach(this);

        // 向 SelectionKey 选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);

        // 使尚未返回的第一个选择操作立即返回,唤醒的
        // 原因是:注册了新的channel或者事件;channel关闭,取消注册;优先级更高的事件触发(如定时器事件),希望及时处理。
        selector.wakeup();
    }

    // MyReactorTaskRunnable.dispatch来触发
    public void run() {
        // 异步任务,在独立的线程池中执行
        pool.execute(new AysncPoolTask(this));
    }

    /**
     * synchronized 感觉没有意义 todo
     */
    public synchronized void process() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);
                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                // 从通道读
                int length = 0;
                // 注意:channel.read 定义1024,kernel 内核有个缓存
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册 write就绪事件

        
                sk.interestOps(SelectionKey.OP_WRITE);
                // 读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();

        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }


}

2.3.4.1AysncPoolTask

package com.kikop.myreactor.multipthreadapp.server.handler;


/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: AysncPoolTask
 * @desc 线程程中的某个任务
 * @date 2021/6/22
 * @time 10:30
 * @by IDE: IntelliJ IDEA
 */
public class AysncPoolTask implements Runnable {

    MyWorkHandler workHandler;

    public AysncPoolTask(MyWorkHandler workHandler) {
        this.workHandler = workHandler;
    }

    public void run() {
        this.workHandler.process();
    }
}

2.4客户端

2.4.1EchoClient

package com.kikop.myreactor.multipthreadapp.client;


import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
import com.kikop.myreactor.singlethreadapp.client.handler.MyTaskRunnable;
import com.kikop.util.Print;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: EchoClient
 * @desc EchoClient
 * @date 2021/6/22
 * @time 10:30
 * @by IDE: IntelliJ IDEA
 */
public class EchoClient {

    public void start() throws IOException {

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);

        // 1、获取网络通道
        SocketChannel socketChannel = SocketChannel.open(address);

        // 2、切换成非阻塞模式
        socketChannel.configureBlocking(false);

        // 不断的自旋、等待与服务端的连接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }
        Print.tcfo("客户端启动成功!");

        // 3.启动业务处理线程
        MyTaskRunnable myTaskRunnable = new MyTaskRunnable(socketChannel);
        new Thread(myTaskRunnable).start();

    }

    public static void main(String[] args) throws IOException {
        new EchoClient().start();
    }
}

2.4.2MyTaskRunnable

package com.kikop.myreactor.multipthreadapp.client.handler;

import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
import com.kikop.util.Dateutil;
import com.kikop.util.Logger;
import com.kikop.util.Print;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * @author kikop
 * @version 1.0
 * @project Name: mynettydemo
 * @file Name: MyTaskRunnable
 * @desc 客户端业务处理线程
 * @date 2021/6/22
 * @time 10:30
 * @by IDE: IntelliJ IDEA
 */
public class MyTaskRunnable implements Runnable {

    // 客户端选择器(提供通道的IO读、写事件的注册)
    private final Selector selector;
    private final SocketChannel channel;

    /**
     * MyTaskRunnable
     *
     * @param channel
     * @throws IOException
     */
    public MyTaskRunnable(SocketChannel channel) throws IOException {
        selector = Selector.open(); // Reactor初始化
        this.channel = channel;

        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    public void run() {

        try {

            while (!Thread.interrupted()) {

                selector.select();
                // 获取就绪事件列表,底层用两个数组进行筛选,Buffer有读写事件,操作系统产生中断
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();

                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    if (sk.isWritable()) { // 通道对应的缓存可写
                        ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

                        Scanner scanner = new Scanner(System.in);
                        Print.tcfo("请输入发送内容:");

                        if (scanner.hasNext()) {

                            SocketChannel socketChannel = (SocketChannel) sk.channel();
                            String next = scanner.next();

                            // 先写数据到buffer
                            buffer.put((Dateutil.getNow() + " >>" + next).getBytes());

                            // buffer切为读
                            buffer.flip();

                            // 发送数据
                            // 通过 DatagramChannel数据报通道
                            socketChannel.write(buffer);

                            // 清空缓存
                            buffer.clear();
                        }

                    }
                    if (sk.isReadable()) { // 通道对应的缓存可读
                        // 若选择键的IO事件是“可读”事件,读取数据
                        SocketChannel socketChannel = (SocketChannel) sk.channel();

                        //读取数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

                        int length = 0;
                        // 注意:socketChannel.read需要等到一定的数量
                        while ((length = socketChannel.read(byteBuffer)) > 0) {

                            byteBuffer.flip(); // 方便下一次写数据到buffer
                            Logger.info("client recv:" + new String(byteBuffer.array(), 0, length));
                            // 业务处理完成,清空缓存
                            byteBuffer.clear();
                        }
                    }
                    // 处理结束了, 这里不能关闭 select key,需要重复使用
                    // selectionKey.cancel();
                }

                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

总结

1.1优化演进

模式演进分为2个方面:
1)升级Refactor反应器。引入多个selector选择器,提升选择大量通道的能力。

2)升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以使用线程池。

总体如下:
1)将负责输入输出处理的IO Handler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。

2)如果服务器为多核CPU,可以将反应器线程拆分为多个子反应器线程;同时,引入多个选择器,每一个反应器线程负责一个选择器,这样,充分释放系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。

参考

1大规模分布式系统架构与设计实战(彭渊)

http://code.google.com/p/fourinone

https://github.com/fourinone/fourinone

2 fourinone分布式协调设计解析

https://blog.51cto.com/3503265/1058623

3Netty面试题(2021 最新版)

https://blog.csdn.net/qq_17231297/article/details/117324371

相关文章

网友评论

      本文标题:2021-08-14_Nio多线程模型时序图分析

      本文链接:https://www.haomeiwen.com/subject/ovdebltx.html