0

0

Java8 CompletableFuture 如何实现异步多线程编程?

WBOY

WBOY

发布时间:2023-04-27 08:22:06

|

3460人浏览过

|

来源于亿速云

转载

1、一个示例回顾Future

一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。

JDK5新增了Future接口,用于描述一个异步计算的结果。

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

这两种处理方式都不是很优雅,相关代码如下:

    @Test
    public void testFuture() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "hello";
        });
        System.out.println(future.get());
        System.out.println("end");
    }

与此同时,Future无法解决多个异步任务需要相互依赖的场景,简单点说就是,主线程需要等待子线程任务执行完毕之后在进行执行,这个时候你可能想到了「CountDownLatch」,没错确实可以解决,代码如下。

立即学习Java免费学习笔记(深入)”;

这里定义两个Future,第一个通过用户id获取用户信息,第二个通过商品id获取商品信息。

    @Test
    public void testCountDownLatch() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CountDownLatch downLatch = new CountDownLatch(2);
        long startTime = System.currentTimeMillis();
        Future userFuture = executorService.submit(() -> {
            //模拟查询商品耗时500毫秒
            Thread.sleep(500);
            downLatch.countDown();
            return "用户A";
        });
 
        Future goodsFuture = executorService.submit(() -> {
            //模拟查询商品耗时500毫秒
            Thread.sleep(400);
            downLatch.countDown();
            return "商品A";
        });
 
        downLatch.await();
        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
 
    }

「运行结果」

获取用户信息:用户A获取商品信息:商品A总共用时1110ms

从运行结果可以看出结果都已经获取,而且如果我们不用异步操作,执行时间应该是:500+400+600 = 1500,用异步操作后实际只用1110。

但是Java8以后我不在认为这是一种优雅的解决方式,接下来来了解下CompletableFuture的使用。

2、通过CompletableFuture实现上面示例

    @Test
    public void testCompletableInfo() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
 
        //调用用户服务获取用户基本信息
        CompletableFuture userFuture = CompletableFuture.supplyAsync(() ->
                //模拟查询商品耗时500毫秒
        {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "用户A";
        });
 
        //调用商品服务获取商品基本信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() ->
                //模拟查询商品耗时500毫秒
        {
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });
 
        System.out.println("获取用户信息:" + userFuture.get());
        System.out.println("获取商品信息:" + goodsFuture.get());
 
        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }

运行结果

获取用户信息:用户A
获取商品信息:商品A
总共用时1112ms

通过CompletableFuture可以很轻松的实现CountDownLatch的功能,你以为这就结束了,远远不止,CompletableFuture比这要强多了。

比如可以实现:任务1执行完了再执行任务2,甚至任务1执行的结果,作为任务2的入参数等等强大功能,下面就来学学CompletableFuture的API。

3、CompletableFuture创建方式

3.1、常用的4种创建方式

CompletableFuture源码中有四个静态方法用来执行异步任务

public static  CompletableFuture supplyAsync(Supplier supplier){..}
public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor){..}
public static CompletableFuture runAsync(Runnable runnable){..}
public static CompletableFuture runAsync(Runnable runnable,Executor executor){..}

一般我们用上面的静态方法来创建CompletableFuture,这里也解释下他们的区别:

  • 「supplyAsync」执行任务,支持返回值。

  • 「runAsync」执行任务,没有返回值。

3.1.1、「supplyAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static  CompletableFuture supplyAsync(Supplier supplier)
//自定义线程,根据supplier构建执行任务
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)

3.1.2、「runAsync方法」

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture runAsync(Runnable runnable) 
//自定义线程,根据runnable构建执行任务
public static CompletableFuture runAsync(Runnable runnable,  Executor executor)

3.2、结果获取的4种方式

对于结果的获取CompltableFuture类提供了四种方式

//方式一
public T get()
//方式二
public T get(long timeout, TimeUnit unit)
//方式三
public T getNow(T valueIfAbsent)
//方式四
public T join()

说明:

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常

  • 「getNow」 => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值

  • 「join」 => 方法里不会抛出异常

示例

    @Test
    public void testCompletableGet() throws InterruptedException, ExecutionException {
 
        CompletableFuture cp1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });
 
        // getNow方法测试 
        System.out.println(cp1.getNow("商品B"));
 
        //join方法测试 
        CompletableFuture cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp2.join());
        System.out.println("-----------------------------------------------------");
        //get方法测试
        CompletableFuture cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp3.get());
    }

「运行结果」:

  • 第一个执行结果为 「商品B」,因为要先睡上1秒结果不能立即获取

  • join方法获取结果方法里不会抛异常,但是执行结果会抛异常,抛出的异常为CompletionException

  • get方法获取结果方法里将抛出异常,执行结果抛出的异常为ExecutionException

4、异步回调方法

Java8 CompletableFuture异步多线程怎么实现

4.1、thenRun/thenRunAsync

通俗点讲就是,「做完第一个任务后,再做第二个任务,第二个任务也没有返回值」

示例

    @Test
    public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
 
        CompletableFuture cp1 = CompletableFuture.runAsync(() -> {
            try {
                //执行任务A
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 
        });
 
        CompletableFuture cp2 = cp1.thenRun(() -> {
            try {
                //执行任务B
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
 
        // get方法测试
        System.out.println(cp2.get());
 
        //模拟主程序耗时时间
        Thread.sleep(600);
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }
 
    //运行结果
    /**
     *  null
     *  总共用时1610ms
     */

「thenRun 和thenRunAsync有什么区别呢?」

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。

说明: 后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个。

4.2、thenAccept/thenAcceptAsync

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

示例

    @Test
    public void testCompletableThenAccept() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        CompletableFuture cp1 = CompletableFuture.supplyAsync(() -> {
            return "dev";
 
        });
        CompletableFuture cp2 = cp1.thenAccept((a) -> {
            System.out.println("上一个任务的返回结果为: " + a);
        });
 
        cp2.get();
    }

4.3、 thenApply/thenApplyAsync

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

示例

    @Test
    public void testCompletableThenApply() throws ExecutionException, InterruptedException {
        CompletableFuture cp1 = CompletableFuture.supplyAsync(() -> {
            return "dev";
 
        }).thenApply((a) -> {
            if (Objects.equals(a, "dev")) {
                return "dev";
            }
            return "prod";
        });
 
        System.out.println("当前环境为:" + cp1.get());
 
        //输出: 当前环境为:dev
    }

5、异常回调

当CompletableFuture的任务不论是正常完成还是出现异常它都会调用「whenComplete」这回调函数。

  • 「正常完成」:whenComplete返回结果和上级任务一致,异常为null;

  • 「出现异常」:whenComplete返回结果为null,异常为上级任务的异常;

    Vondy
    Vondy

    下一代AI应用平台,汇集了一流的工具/应用程序

    下载

即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常。

下面来看看示例

5.1、只用whenComplete

    @Test
    public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
 
            if (Math.random() < 0.5) {
                throw new RuntimeException("出错了");
            }
            System.out.println("正常结束");
            return 0.11;
 
        }).whenComplete((aDouble, throwable) -> {
            if (aDouble == null) {
                System.out.println("whenComplete aDouble is null");
            } else {
                System.out.println("whenComplete aDouble is " + aDouble);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        });
        System.out.println("最终返回的结果 = " + future.get());
    }

正常完成,没有异常时:

正常结束
whenComplete aDouble is 0.11
whenComplete throwable is null
最终返回的结果 = 0.11

出现异常时:get()会抛出异常

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出错了
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

5.2、whenComplete + exceptionally示例

    @Test
    public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("出错了");
            }
            System.out.println("正常结束");
            return 0.11;
 
        }).whenComplete((aDouble, throwable) -> {
            if (aDouble == null) {
                System.out.println("whenComplete aDouble is null");
            } else {
                System.out.println("whenComplete aDouble is " + aDouble);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        }).exceptionally((throwable) -> {
            System.out.println("exceptionally中异常:" + throwable.getMessage());
            return 0.0;
        });
 
        System.out.println("最终返回的结果 = " + future.get());
    }

当出现异常时,exceptionally中会捕获该异常,给出默认返回值0.0。

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出错了
exceptionally中异常:java.lang.RuntimeException: 出错了
最终返回的结果 = 0.0

6、多任务组合回调

Java8 CompletableFuture异步多线程怎么实现

6.1、AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:「当任务一和任务二都完成再执行任务三」

区别在于:

  • 「runAfterBoth」 不会把执行结果当做方法入参,且没有返回值

  • 「thenAcceptBoth」: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值

  • 「thenCombine」:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值

示例

    @Test
    public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务2结束");
            return result;
        }, executorService);
 
        //任务组合
        CompletableFuture task3 = task.thenCombineAsync(task2, (f1, f2) -> {
            System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
            System.out.println("任务1返回值:" + f1);
            System.out.println("任务2返回值:" + f2);
            return f1 + f2;
        }, executorService);
 
        Integer res = task3.get();
        System.out.println("最终结果:" + res);
    }

「运行结果」

异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
异步任务2结束
执行任务3,当前线程是:19
任务1返回值:2
任务2返回值:2
最终结果:4

6.2、OR组合关系

applyToEither / acceptEither / runAfterEither 都表示:「两个任务,只要有一个任务完成,就执行任务三」

区别在于:

  • 「runAfterEither」:不会把执行结果当做方法入参,且没有返回值

  • 「acceptEither」: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值

  • 「applyToEither」:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

示例

    @Test
    public void testCompletableEitherAsync() {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
 
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);
 
        //任务组合
        task.acceptEitherAsync(task2, (res) -> {
            System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
            System.out.println("上一个任务的结果为:" + res);
        }, executorService);
    }

运行结果

//通过结果可以看出,异步任务2都没有执行结束,任务3获取的也是1的执行结果
异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:18
执行任务3,当前线程是:19
上一个任务的结果为:2

注意

如果把上面的核心线程数改为1也就是

 ExecutorService executorService = Executors.newFixedThreadPool(1);

运行结果就是下面的了,会发现根本没有执行任务3,显然是任务3直接被丢弃了。

异步任务1,当前线程是:17
异步任务1结束
异步任务2,当前线程是:17

6.3、多任务组合

  • 「allOf」:等待所有任务完成

  • 「anyOf」:只要有一个任务完成

示例

allOf:等待所有任务完成

    @Test
    public void testCompletableAallOf() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);
 
        //开启异步任务3
        CompletableFuture task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 3;
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务3结束");
            return result;
        }, executorService);
 
        //任务组合
        CompletableFuture allOf = CompletableFuture.allOf(task, task2, task3);
 
        //等待所有任务完成
        allOf.get();
        //获取任务的返回结果
        System.out.println("task结果为:" + task.get());
        System.out.println("task2结果为:" + task2.get());
        System.out.println("task3结果为:" + task3.get());
    }

anyOf: 只要有一个任务完成

    @Test
    public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture task = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 1;
            return result;
        }, executorService);
 
        //开启异步任务2
        CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 2;
            return result;
        }, executorService);
 
        //开启异步任务3
        CompletableFuture task3 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 3;
            return result;
        }, executorService);
 
        //任务组合
        CompletableFuture anyOf = CompletableFuture.anyOf(task, task2, task3);
        //只要有一个有任务完成
        Object o = anyOf.get();
        System.out.println("完成的任务的结果:" + o);
    }

7、CompletableFuture使用有哪些注意点

Java8 CompletableFuture异步多线程怎么实现

 CompletableFuture 使我们的异步编程更加便利的、代码更加优雅的同时,我们也要关注下它,使用的一些注意点。

7.1、Future需要获取返回值,才能获取异常信息

    @Test
    public void testWhenCompleteExceptionally() {
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("出错了");
            }
            return 0.11;
        });
 
        //如果不加 get()方法这一行,看不到异常信息
        //future.get();
    }

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。

小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。

7.2、CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

//反例
 CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);

7.3、不建议使用默认线程池

CompletableFuture代码中又使用了默认的「ForkJoin线程池」,处理的线程个数是电脑「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

7.4、自定义线程池时,注意饱和策略

CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。

相关专题

更多
Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

37

2026.01.14

php与html混编教程大全
php与html混编教程大全

本专题整合了php和html混编相关教程,阅读专题下面的文章了解更多详细内容。

19

2026.01.13

PHP 高性能
PHP 高性能

本专题整合了PHP高性能相关教程大全,阅读专题下面的文章了解更多详细内容。

37

2026.01.13

MySQL数据库报错常见问题及解决方法大全
MySQL数据库报错常见问题及解决方法大全

本专题整合了MySQL数据库报错常见问题及解决方法,阅读专题下面的文章了解更多详细内容。

19

2026.01.13

PHP 文件上传
PHP 文件上传

本专题整合了PHP实现文件上传相关教程,阅读专题下面的文章了解更多详细内容。

16

2026.01.13

PHP缓存策略教程大全
PHP缓存策略教程大全

本专题整合了PHP缓存相关教程,阅读专题下面的文章了解更多详细内容。

6

2026.01.13

jQuery 正则表达式相关教程
jQuery 正则表达式相关教程

本专题整合了jQuery正则表达式相关教程大全,阅读专题下面的文章了解更多详细内容。

3

2026.01.13

交互式图表和动态图表教程汇总
交互式图表和动态图表教程汇总

本专题整合了交互式图表和动态图表的相关内容,阅读专题下面的文章了解更多详细内容。

45

2026.01.13

nginx配置文件详细教程
nginx配置文件详细教程

本专题整合了nginx配置文件相关教程详细汇总,阅读专题下面的文章了解更多详细内容。

9

2026.01.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.5万人学习

C# 教程
C# 教程

共94课时 | 6.7万人学习

Java 教程
Java 教程

共578课时 | 45.7万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号