
一个 EventLoop 将由一个永远都不会改变的Thread 驱动,同时任务(Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。根据配置和可用核心的不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel。
EventLoop 就是一个单例的线程池,里面含有一个死循环的线程不断的做着3件事情:监听端口,处理端口事件,处理队列事件。每个 EventLoop 都可以绑定多个 Channel,而每个 Channel 始终只能由一个 EventLoop 来处理。
初始化
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建selector
selector = openSelector();
selectStrategy = strategy;
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 保存executor
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// 初始化taskqueue,mpsc
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
启动
bind
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// NioEventLoop的启动在执行AbstractBootstrap.doBind0方法时触发
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 拿到channel绑定的NioEventLoop,然后调用execute方法来执行bind
// 这里实际内部会将传入的Runnable封装成task,加到queue里面,等待进一步处理
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 当前调用线程和分配给EventLoop的线程是否一致
// 因为是启动流程,所以这里线程还没有启动,返回false
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
// 如果是,加到EventLoop里的taskqueue里,等待执行
addTask(task);
} else {
// 启动线程
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
startThread
private void doStartThread() {
// 首先这里的executor就是ThreadPerTaskExecutor
// 而execute就会启动一个新的线程来执行Runnable
executor.execute(new Runnable() {
@Override
public void run() {
// 而这里就是设置NioEventLoop的绑定线程
thread = Thread.currentThread();
...
try {
// 调用NioEventLoop的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
...
} finally {
...
}
}
});
}
执行
protected void run() {
for (;;) {
try {
// 检测IO事件
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 处理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 执行reactor线程任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// 处理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 执行reactor线程任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
检测IO事件
private void select(boolean oldWakenUp) throws IOException {
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 拿到定时任务最早的截止事件
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
// 自旋select
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果截止事件已经到了,定时任务要开始执行,立即结束select自旋
// 如果之前还没有select过,那么执行selectNow,尝试看看有没有IO事件
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果有任务需要执行,且wakenUp是false,那么立即结束select自旋
// 否则继续自旋
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 超时阻塞select
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 如果有IO事件或select被人唤醒或有任务或有定时任务,满足其一,那么立即结束自旋
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
// fix jdk epoll 空轮询 bug
long time = System.nanoTime();
// 因为上面是带超时时间的阻塞select,那么如果执行到这里的时间差比阻塞的时间小,说明上面的select
// 提前返回了,如果比阻塞时间大,说明上面的select起码没有提前返回,在这里我们认为它执行成功了。
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 如果提前返回的次数超过一定阈值,那么说明空轮询的bug重现,我们需要rebuild下selector
// 来规避
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
...
}
}
重建selector
public void rebuildSelector() {
// 线程安全,必须在NioEventLoop绑定的线程中执行
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
// 创建新的selector
newSelector = openSelector();
} catch (Exception e) {
...
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
// 拿到所有旧的key
for (SelectionKey key: oldSelector.keys()) {
// 以及key的attachment,也就是Netty封装的AbstractNioChannel
Object a = key.attachment();
try {
// 如果key无效或key已经跟新的selector绑定,那么continue
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
// 拿到key的interestOps
int interestOps = key.interestOps();
// 取消监听
key.cancel();
//将新的selector和channel绑定,并注册原有的IO事件
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
...
}
}
} catch (ConcurrentModificationException e) {
continue;
}
break;
}
// 替换NioEventLoop的selector
selector = newSelector;
try {
// close老的selector
oldSelector.close();
} catch (Throwable t) {
...
}
}
处理IO事件
private void processSelectedKeys() {
if (selectedKeys != null) {
// 通过SelectedSelectionKeySet的flip来获取selectedKeys的数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
// 无优化,直接来
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
// 遍历selectedKeys数组
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// 拿到的同时,去除数组元素的引用,GC有利
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 拿到当前channel绑定的unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
...
}
try {
// 拿到selectedkey就绪的事件
int readyOps = k.readyOps();
// 如果是CONNECT事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 不再关心CONNECT事件,调用finishConnect,代表连接成功。
// SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,
// 就可以进行读、写操作了。
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 如果是WRITE事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// 如果是READ事件或ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
任务执行
protected boolean runAllTasks(long timeoutNanos) {
// 这里如果能找到定时任务会加到taskqueue里面
fetchFromScheduledTaskQueue();
// 去taskqueue拿task
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// 设置超时时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// catch住task异常,以免影响其他任务的执行
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 每执行64个任务后,统计是否超时,否则结束自旋
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// 自旋拿取任务
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
定时任务
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// 根据时间拿到符合条件的定时任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
// 加到taskqueue,也就是普通队列中,等待被EvenLoop调用
if (!taskQueue.offer(scheduledTask)) {
// 否则还是加到定时任务queue里面
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
// 循环poll,直到没有定时任务为止
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
优化
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 拿到SelectorImpl的class对象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
return e;
} catch (SecurityException e) {
return e;
}
}
});
// 判断是否是nio selector对象派生的实现类
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
if (maybeSelectorImplClass instanceof Exception) {
Exception e = (Exception) maybeSelectorImplClass;
}
return selector;
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// 通过反射来替换SelectorImpl里面字段对应的HashSet类型为SelectedSelectionKeySet, 这样来达到优化的目的
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
// 分别获取selectedKeys和publicSelectedKeys的field
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
// 替换为SelectedSelectionKeySet
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
...
}
}
});
SelectedSelectionKeySet
// 名为Set,实际已经退化成一个数组。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
SelectedSelectionKeySet() {
keysA = new SelectionKey[1024];
keysB = keysA.clone();
}
// 真正的hashset的add要考虑的比这个多得多
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
// 扩容
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;
keysBSize = 0;
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
@Override
public int size() {
if (isA) {
return keysASize;
} else {
return keysBSize;
}
}
}
网友评论