给Future一个Promise

对java开发者来说,经常需要在一个线程中另起一个线程来异步干其他事,就涉及到熟悉的ThreadRunnable。使用方式如下:

System.out.println("Do something ...");
new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("Async do something ....");
    }
});
System.out.println("Done  ...");

或者java8使用线程池的方式:

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
    System.out.println("Async do something ....");
});
executorService.shutdown();

有时候,我们希望知道执行结果或者拿到线程执行的返回值,Future就是为了解决这个问题,可以判断任务(线程)是否完成,可以中断任务,可以获取执行结果。如下:

JDK实现

ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});
System.out.println("Do something ...");
try {
    while (true) {
        if (future.isDone()) {
            Integer integer = future.get();
            System.out.println("Future return value " + integer);
            break;
        }
        System.out.println("wait.....");
        Thread.sleep(500);
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
executor.shutdown();
}

Do something … wait….. wait….. wait….. wait….. Future return value 100

或者ScheduledExecutorService:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 5, TimeUnit.SECONDS);
while (true) {
    done = future.isDone();
    if (done) {
        break;
    }
    System.out.printf("Remaining Delay: %sms ",future.getDelay(TimeUnit.MILLISECONDS));
    Thread.sleep(1000);
}
executor.shutdown();

Remaining Delay: 4999ms Remaining Delay: 3973ms Remaining Delay: 2969ms Remaining Delay: 1966ms Remaining Delay: 962ms Scheduling: 148060026198219

但代码中也体现出一个问题,如果获取返回值,只能阻塞等待,或者定时查询是否执行完,增加编码的复杂度。如何做到异步非阻塞呢(线程执行完后,主动通知调用者)。

Guava 实现

GUAVA提供了一个ListenableFuture,继承JDK的Future接口,可以实现完成后出发回调,如下:

ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<Integer> future = executor.submit(() -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 100;
});
System.out.println("Do something ...");
Futures.addCallback(future, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        System.out.println("Future return value " + result);
    }
    @Override
    public void onFailure(Throwable t) {

    }
});
System.out.println("... Main thread Done ...");
executor.shutdown();

Do something … … Main thread Done … Future return value 100

可以看到,要获取执行者的放回值,不需要get()阻塞获取,继续执行后续逻辑,线程结束后会通知调用者。

Netty 实现

Netty util包中也提供了异步非阻塞的实现:

EventExecutorGroup group = new DefaultEventExecutorGroup(1);
System.out.println("Do something ...");
Future<Integer> future = group.submit(() -> {
    System.out.println("Sleep......");
    Thread.sleep(1000);
    return 99;
});
future.addListener(future1 -> System.out.println("result:" + future1.get()));
System.out.println("Done ...");

Do something … Done … Sleep…… result:99

而Scala语言本身就支持Future-promise模型,可以看看这篇文章:Scala Future and Promise


TODO 从概念上好好解释

CONTENTS