JDK Future接口
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
-
idDone()方法返回 true 的情况:- 成功完成
- 取消
- 发生异常
-
get()是阻塞方法,会等待完成。
FutureTask类
FutureTask类实现了RunnableFuture接口,该接口即继承了Future接口,又继承了Runnable接口,代表一个有返回结果的、可执行的任务。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask类的构造函数支持Runnable和Callable接口的实现类,其中Runnable实例通过工具类Executors.callable方法转换为Callable实例,并赋值给实例变量callable。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
}
FutureTask的实例可以提交到ExecutorService中执行。例如:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1 + 2;
}
});
System.out.println(future.get());
}
RunableAdapter的适配器模式
看一下Executors.callable的实现,创建了一个RunnableAdapter实例:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
RunnableAdapter类是工具栏Executors的静态内部类,实现了Callable接口定义的call方法。它持有Runnable类型的任务task对象和返回结果result:
static final class RunnableAdapter<T> implements Callable<T> {
//持有目标对象
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
call方法的执行逻辑是调用task对象的run方法,然后将传入的结果result返回。
Netty提供的Future接口
Netty的Future接口继承了JDK的Future接口,同时提供了更多的方法:
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> sync() throws InterruptedException;
Future<V> await() throws InterruptedException;
V getNow();
}
-
任务成功完成后
isSuccess()返回true -
任务执行过程中有异常,
cause()会返回异常对象 -
任务被取消执行,父接口方法
isCancelled返回true -
以上3种情况
isDone()均为true//任务完成 if (task.isDone()) { if (task.isSuccess()) { // 成功 } else if (task.isCancelled()) { // 被取消 } else { // 异常 System.out.print(task.cause()) } } -
await和sync都会阻塞,并等待任务完成 -
getNow()不会阻塞,会立即返回,但任务尚未执行完成时,会返回null -
addListener方法在当前Future对象中添加监听器,当任务完成时,会通知所有的监听器。
ChannelFuture接口
ChannelFuture继承了Netty的Future接口,代表 Netty channel的I/O操作的执行结果。在Netty中所有的I/O操作都是异步的,会立即返回一个代表I/O操作的结果,即ChannelFuture。
在获得执行结果时,推荐使用添加监听器,监听执行完成事件
operaionCompleted,而不要使用await方法。
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
//当任务完成时,会被调用
void operationComplete(F future) throws Exception;
}
不能在
ChannelHandler中调用await,会造成死锁。因为ChannelHandler中的方法通常是I/O线程调用的,再调用await会造成I/O阻塞。
//错误
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.awaitUninterruptibly();
// Perform post-closure operation
// ...
}
// 正确
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
}
});
}
即使是通过添加
ChannelFutureListener的方式获取执行结果,但要注意的是:回调方法operationComplete也是由I/O线程调用的,所以也不能在其中执行耗时任务。如必须,则启用线程池执行。
ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.bind(8899)
.sync();
bind方法是异步的,其返回值是ChannelFuture类型。需要调用sync()同步方法,等待绑定动作执行完成。












网友评论