对java开发者来说,经常需要在一个线程中另起一个线程来异步干其他事,就涉及到熟悉的Thread
和Runnable
。使用方式如下:
System.out.println("Do something ...");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Async do something ....");
}
});
System.out.println("Done ...");
或者java8使用线程池的方式:
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
System.out.println("Async do something ....");
});
executorService.shutdown();
有时候,我们希望知道执行结果或者拿到线程执行的返回值,Future就是为了解决这个问题,可以判断任务(线程)是否完成,可以中断任务,可以获取执行结果。如下:
JDK实现
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
System.out.println("Do something ...");
try {
while (true) {
if (future.isDone()) {
Integer integer = future.get();
System.out.println("Future return value " + integer);
break;
}
System.out.println("wait.....");
Thread.sleep(500);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
Do something … wait….. wait….. wait….. wait….. Future return value 100
或者ScheduledExecutorService:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 5, TimeUnit.SECONDS);
while (true) {
done = future.isDone();
if (done) {
break;
}
System.out.printf("Remaining Delay: %sms ",future.getDelay(TimeUnit.MILLISECONDS));
Thread.sleep(1000);
}
executor.shutdown();
Remaining Delay: 4999ms Remaining Delay: 3973ms Remaining Delay: 2969ms Remaining Delay: 1966ms Remaining Delay: 962ms Scheduling: 148060026198219
但代码中也体现出一个问题,如果获取返回值,只能阻塞等待,或者定时查询是否执行完,增加编码的复杂度。如何做到异步非阻塞呢(线程执行完后,主动通知调用者)。
Guava 实现
GUAVA提供了一个ListenableFuture
,继承JDK的Future接口,可以实现完成后出发回调,如下:
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<Integer> future = executor.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
System.out.println("Do something ...");
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out.println("Future return value " + result);
}
@Override
public void onFailure(Throwable t) {
}
});
System.out.println("... Main thread Done ...");
executor.shutdown();
Do something … … Main thread Done … Future return value 100
可以看到,要获取执行者的放回值,不需要get()
阻塞获取,继续执行后续逻辑,线程结束后会通知调用者。
Netty 实现
Netty util包中也提供了异步非阻塞的实现:
EventExecutorGroup group = new DefaultEventExecutorGroup(1);
System.out.println("Do something ...");
Future<Integer> future = group.submit(() -> {
System.out.println("Sleep......");
Thread.sleep(1000);
return 99;
});
future.addListener(future1 -> System.out.println("result:" + future1.get()));
System.out.println("Done ...");
Do something … Done … Sleep…… result:99
而Scala语言本身就支持Future-promise模型,可以看看这篇文章:Scala Future and Promise
TODO 从概念上好好解释