美文网首页
NioEventLoop

NioEventLoop

作者: Pillar_Zhong | 来源:发表于2019-08-01 16:40 被阅读0次
1563541630038.png

一个 EventLoop 将由一个永远都不会改变的Thread 驱动,同时任务(Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。根据配置和可用核心的不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel。

EventLoop 就是一个单例的线程池,里面含有一个死循环的线程不断的做着3件事情:监听端口,处理端口事件,处理队列事件。每个 EventLoop 都可以绑定多个 Channel,而每个 Channel 始终只能由一个 EventLoop 来处理。

初始化

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 创建selector
    selector = openSelector();
    selectStrategy = strategy;
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 保存executor
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // 初始化taskqueue,mpsc
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

启动

bind

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
// NioEventLoop的启动在执行AbstractBootstrap.doBind0方法时触发
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // 拿到channel绑定的NioEventLoop,然后调用execute方法来执行bind
    // 这里实际内部会将传入的Runnable封装成task,加到queue里面,等待进一步处理
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

execute

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    // 当前调用线程和分配给EventLoop的线程是否一致
    // 因为是启动流程,所以这里线程还没有启动,返回false
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        // 如果是,加到EventLoop里的taskqueue里,等待执行
        addTask(task);
    } else {
        // 启动线程
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

startThread

private void doStartThread() {
    // 首先这里的executor就是ThreadPerTaskExecutor
    // 而execute就会启动一个新的线程来执行Runnable
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 而这里就是设置NioEventLoop的绑定线程
            thread = Thread.currentThread();
            ...
            try {
                // 调用NioEventLoop的run方法
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
               ...
            } finally {
               ...
            }
        }
    });
}

执行

protected void run() {
    for (;;) {
        try {
            // 检测IO事件
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:                 
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 处理IO事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // 执行reactor线程任务
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // 处理IO事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 执行reactor线程任务
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

检测IO事件

private void select(boolean oldWakenUp) throws IOException {

try {
    int selectCnt = 0;
    long currentTimeNanos = System.nanoTime();
    // 拿到定时任务最早的截止事件
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    // 自旋select
    for (;;) {
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        // 如果截止事件已经到了,定时任务要开始执行,立即结束select自旋
        // 如果之前还没有select过,那么执行selectNow,尝试看看有没有IO事件
        if (timeoutMillis <= 0) {
            if (selectCnt == 0) {
                selector.selectNow();
                selectCnt = 1;
            }
            break;
        }

        // 如果有任务需要执行,且wakenUp是false,那么立即结束select自旋
        // 否则继续自旋
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        // 超时阻塞select
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;

        // 如果有IO事件或select被人唤醒或有任务或有定时任务,满足其一,那么立即结束自旋
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
            // - Selected something,
            // - waken up by user, or
            // - the task queue has a pending task.
            // - a scheduled task is ready for processing
            break;
        }       

        // fix jdk epoll 空轮询 bug
        long time = System.nanoTime();
        // 因为上面是带超时时间的阻塞select,那么如果执行到这里的时间差比阻塞的时间小,说明上面的select
        // 提前返回了,如果比阻塞时间大,说明上面的select起码没有提前返回,在这里我们认为它执行成功了。
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // timeoutMillis elapsed without anything selected.
            selectCnt = 1;
            // 如果提前返回的次数超过一定阈值,那么说明空轮询的bug重现,我们需要rebuild下selector
            // 来规避
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.    

            rebuildSelector();
            selector = this.selector;

            // Select again to populate selectedKeys.
            selector.selectNow();
            selectCnt = 1;
            break;
        }

        currentTimeNanos = time;
    }

} catch (CancelledKeyException e) {
    ...
}
}

重建selector

public void rebuildSelector() {
    // 线程安全,必须在NioEventLoop绑定的线程中执行
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }

    final Selector oldSelector = selector;
    final Selector newSelector;

    if (oldSelector == null) {
        return;
    }

    try {
        // 创建新的selector
        newSelector = openSelector();
    } catch (Exception e) {
        ...
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (;;) {
        try {
            // 拿到所有旧的key
            for (SelectionKey key: oldSelector.keys()) {
                // 以及key的attachment,也就是Netty封装的AbstractNioChannel
                Object a = key.attachment();
                try {
                    // 如果key无效或key已经跟新的selector绑定,那么continue
                    if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                        continue;
                    }
                    
                    // 拿到key的interestOps
                    int interestOps = key.interestOps();
                    // 取消监听
                    key.cancel();
                    //将新的selector和channel绑定,并注册原有的IO事件
                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                    if (a instanceof AbstractNioChannel) {
                        // Update SelectionKey
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
                   ...
                }
            }
        } catch (ConcurrentModificationException e) {        
            continue;
        }

        break;
    }

    // 替换NioEventLoop的selector
    selector = newSelector;

    try {        
        // close老的selector
        oldSelector.close();
    } catch (Throwable t) {
        ...
    }
}

处理IO事件

private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 通过SelectedSelectionKeySet的flip来获取selectedKeys的数组
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        // 无优化,直接来
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    // 遍历selectedKeys数组
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // 拿到的同时,去除数组元素的引用,GC有利
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            for (;;) {
                i++;
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
            }

            selectAgain();

            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 拿到当前channel绑定的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
       ...
    }

    try {
        // 拿到selectedkey就绪的事件
        int readyOps = k.readyOps();
        // 如果是CONNECT事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 不再关心CONNECT事件,调用finishConnect,代表连接成功。
            // SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,
            // 就可以进行读、写操作了。
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 如果是WRITE事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // 如果是READ事件或ACCEPT
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

任务执行

protected boolean runAllTasks(long timeoutNanos) {
    // 这里如果能找到定时任务会加到taskqueue里面
    fetchFromScheduledTaskQueue();
    // 去taskqueue拿task
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // 设置超时时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // catch住task异常,以免影响其他任务的执行
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        // 每执行64个任务后,统计是否超时,否则结束自旋
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        
        // 自旋拿取任务
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

定时任务

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    // 根据时间拿到符合条件的定时任务
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        // 加到taskqueue,也就是普通队列中,等待被EvenLoop调用
        if (!taskQueue.offer(scheduledTask)) {
            // 否则还是加到定时任务queue里面
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        // 循环poll,直到没有定时任务为止
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

优化

final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

// 拿到SelectorImpl的class对象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
        } catch (ClassNotFoundException e) {
            return e;
        } catch (SecurityException e) {
            return e;
        }
    }
});

// 判断是否是nio selector对象派生的实现类
if (!(maybeSelectorImplClass instanceof Class) ||
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
    if (maybeSelectorImplClass instanceof Exception) {
        Exception e = (Exception) maybeSelectorImplClass;
    }
    return selector;
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

// 通过反射来替换SelectorImpl里面字段对应的HashSet类型为SelectedSelectionKeySet, 这样来达到优化的目的
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    public Object run() {
        try {
            // 分别获取selectedKeys和publicSelectedKeys的field
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            // 替换为SelectedSelectionKeySet
            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);
            return null;
        } catch (NoSuchFieldException e) {
            ...
        }
    }
});

SelectedSelectionKeySet

// 名为Set,实际已经退化成一个数组。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[1024];
        keysB = keysA.clone();
    }

    // 真正的hashset的add要考虑的比这个多得多
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size ++] = o;
            keysASize = size;
            if (size == keysA.length) {
                // 扩容
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size ++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }

        return true;
    }

    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    @Override
    public int size() {
        if (isA) {
            return keysASize;
        } else {
            return keysBSize;
        }
    }
}

相关文章

网友评论

      本文标题:NioEventLoop

      本文链接:https://www.haomeiwen.com/subject/yercdctx.html