0

0

Java中利用CompletableFuture高效并行处理大型列表数据

心靈之曲

心靈之曲

发布时间:2025-07-28 15:42:18

|

1056人浏览过

|

来源于php中文网

原创

Java中利用CompletableFuture高效并行处理大型列表数据

本文深入探讨了在Java中如何利用CompletableFuture和ExecutorService高效并行处理大型列表数据。针对将耗时操作并行化的常见需求,文章分析了在并行处理中可能遇到的陷阱,特别是过早调用CompletableFuture::join导致任务串行执行的问题。通过提供正确的并行处理策略和示例代码,指导读者实现真正的并发执行,并有效聚合结果,从而显著提升数据处理性能。

1. 引言:并行处理大型列表的必要性

在现代数据处理场景中,我们经常需要对包含数万甚至数十万条记录的大型列表执行耗时操作,例如网络请求、数据库查询、复杂计算或文件i/o。如果采用传统的顺序处理方式,即使单条记录的处理时间很短,累积起来也可能导致整个流程耗时数小时,严重影响系统吞吐量和用户体验。

Java 8引入的CompletableFuture为异步编程和并行处理提供了强大的支持,它能够帮助我们有效地将这些耗时任务分解并并行执行,从而显著缩短总处理时间。然而,不恰当的使用方式也可能导致并行能力无法充分发挥,甚至退化为串行执行。

2. 初始尝试与常见陷阱分析

许多开发者在尝试使用CompletableFuture进行并行处理时,可能会遇到一个常见问题:尽管代码看起来像是并行的,但实际执行却仍然是串行的。以下是一个典型的错误示例:

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// 假设存在以下辅助类和方法
// class ListItem { /* ... */ }
// class ProcessResult { /* ... */ }
// class OutputBean { /* ... */ }
// class MyService { public Optional methodA(ListItem item) { /* ... */ } }
// class MyProcessor {
//     private MyService service = new MyService();
//     private OutputBean mapToBean(ProcessResult result, ListItem originalItem) { /* ... */ }
//     public List executeListPart(List subList) {
//         return subList.stream()
//                       .map(listItem -> service.methodA(listItem)
//                                               .map(result -> mapToBean(result, listItem)))
//                       .flatMap(Optional::stream)
//                       .collect(Collectors.toList());
//     }
// }

public class ParallelProcessingIncorrect {

    // 假设这是您的列表和处理器实例
    private static List largeList = /* 初始化一个包含50k ListItem的列表 */;
    private static MyProcessor processor = new MyProcessor();

    public static void main(String[] args) {
        int noOfCores = Runtime.getRuntime().availableProcessors();
        ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);

        try {
            long startTime = System.currentTimeMillis();
            List results = Lists.partition(largeList, 500).stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> processor.executeListPart(item), service))
                    // 核心问题:在这里调用 CompletableFuture::join
                    .map(CompletableFuture::join)
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
            long endTime = System.currentTimeMillis();
            System.out.println("Incorrect approach total time: " + (endTime - startTime) + " ms");
            System.out.println("Processed " + results.size() + " items.");

        } finally {
            service.shutdown();
        }
    }
}

上述代码的问题在于 .map(CompletableFuture::join) 这一行。CompletableFuture.join() 方法是一个阻塞操作,它会等待当前 CompletableFuture 完成并返回其结果。这意味着,当 Stream 处理第一个分区的 CompletableFuture 时,它会立即阻塞并等待该分区的所有任务完成,然后才能继续处理下一个分区的 CompletableFuture。结果是,尽管每个分区内部的任务可能在单独的线程中执行,但不同分区之间的处理却是严格串行的,从而失去了并行处理的优势。

3. 正确的CompletableFuture并行处理策略

要实现真正的并行执行,关键在于将异步任务的创建和结果的等待(join)分离。我们应该首先创建并提交所有异步任务,将它们的CompletableFuture实例收集到一个列表中,然后在一个单独的步骤中等待所有这些CompletableFuture完成。

立即学习Java免费学习笔记(深入)”;

以下是正确的并行处理代码示例:

用Apache Spark进行大数据处理
用Apache Spark进行大数据处理

本文档主要讲述的是用Apache Spark进行大数据处理——第一部分:入门介绍;Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。 在这个Apache Spark文章系列的第一部分中,我们将了解到什么是Spark,它与典型的MapReduce解决方案的比较以及它如何为大数据处理提供了一套完整的工具。希望本文档会给有需要的朋友带来帮助;感

下载
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// 辅助类定义(与上述示例相同,此处省略以保持简洁)
// class ListItem { /* ... */ }
// class ProcessResult { /* ... */ }
// class OutputBean { /* ... */ }
// class MyService { /* ... */ }
// class MyProcessor { /* ... */ }


public class ParallelProcessingCorrect {

    private static List largeList; // 假设已初始化,例如:
    static {
        largeList = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            largeList.add(new ListItem("item_" + i));
        }
    }
    private static MyProcessor processor = new MyProcessor();

    public static void main(String[] args) throws InterruptedException {
        int noOfCores = Runtime.getRuntime().availableProcessors();
        ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1); // 推荐线程池大小为核心数-1或根据IO/CPU密集型任务调整

        try {
            long startTime = System.currentTimeMillis();

            // 1. 创建并提交所有异步任务,收集CompletableFuture实例
            List>> futures = Lists.partition(largeList, 500).stream()
                    .map(itemPart -> CompletableFuture.supplyAsync(() -> processor.executeListPart(itemPart), service))
                    .collect(Collectors.toList());

            // 2. 等待所有CompletableFuture完成并获取结果
            // 使用 CompletableFuture.allOf() 可以等待所有Future完成,但其本身不返回结果
            // 更好的做法是遍历futures列表,逐个join或使用allof().join()后,再map获取结果

            // 方法一:遍历futures列表,逐个join(更直接,但仍然是顺序join)
            // List results = futures.stream()
            //                                  .map(CompletableFuture::join) // 此时join是等待所有任务提交后才开始
            //                                  .flatMap(List::stream)
            //                                  .collect(Collectors.toList());

            // 方法二:使用 CompletableFuture.allOf() 结合 thenApply/thenCompose(更优雅,推荐)
            CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

            List results = allOf.thenApply(v -> futures.stream()
                                                .map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的
                                                .flatMap(List::stream)
                                                .collect(Collectors.toList()))
                                        .join(); // 等待所有结果收集完成

            long endTime = System.currentTimeMillis();
            System.out.println("Correct approach total time: " + (endTime - startTime) + " ms");
            System.out.println("Processed " + results.size() + " items.");

        } finally {
            // 确保线程池关闭
            service.shutdown();
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("ExecutorService did not terminate in the specified time.");
                service.shutdownNow();
            }
        }
    }
}

工作原理:

  1. 任务提交: Lists.partition(largeList, 500).stream().map(...) 这部分会遍历所有分区,并为每个分区创建一个 CompletableFuture 任务。CompletableFuture.supplyAsync() 会将任务提交给 ExecutorService 立即执行,而不会阻塞当前的流处理。
  2. Future收集: 所有的 CompletableFuture 实例被收集到一个 List>> futures 中。此时,所有任务可能已经开始并行执行了。
  3. 统一等待:
    • CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 创建了一个新的 CompletableFuture,它会在所有传入的 CompletableFuture 都完成后才完成。
    • .thenApply(...) 定义了在 allOf 完成后执行的逻辑,即遍历 futures 列表,对每个 CompletableFuture 调用 join()。此时,由于 allOf 已经确保所有子任务都已完成,因此这些 join() 调用将是非阻塞的,能够立即获取结果。
    • 最后的 .join() 是等待整个结果聚合过程完成。

通过这种方式,所有分区的处理任务几乎同时开始执行,只有在需要聚合所有结果时,主线程才会被阻塞,从而实现了真正的并行加速。

4. ExecutorService的管理与考量

ExecutorService 是管理线程池的核心组件。在并行处理中,它的配置和生命周期管理至关重要。

  • 创建: Executors.newFixedThreadPool(noOfCores - 1) 是一个常见的选择,它创建一个固定大小的线程池。线程池的大小应根据任务类型(CPU密集型或I/O密集型)和系统可用资源进行调整。对于CPU密集型任务,通常设置为 CPU核心数 - 1 或 CPU核心数;对于I/O密集型任务,可以设置得更大,因为线程在等待I/O时不会占用CPU。
  • 生命周期: ExecutorService 是一个重量级资源,应该在不再需要时显式关闭。
    • service.shutdown():启动有序关闭,不再接受新任务,但会完成已提交的任务。
    • service.awaitTermination(timeout, unit):等待已提交任务完成,或直到超时。这通常与 shutdown() 配合使用,以确保所有任务在程序退出前完成。
    • service.shutdownNow():尝试立即停止所有正在执行的任务,并停止等待中的任务。这通常在 awaitTermination 超时后作为强制关闭的手段。

在应用程序的整个生命周期中,如果会频繁地进行并行处理,通常推荐复用同一个 ExecutorService 实例,而不是每次都创建和关闭新的实例,以减少资源开销。

5. 注意事项与性能优化

  • 任务粒度: 适当的任务分块大小(如 Lists.partition(list, 500))非常重要。如果分块过小,会增加任务提交和线程调度的开销;如果分块过大,则可能导致某些线程负载过重,无法充分利用并行性。最佳分块大小通常需要根据实际任务的复杂度和执行时间进行测试和调整。
  • 异常处理: 在并行任务中,异常处理变得更为复杂。CompletableFuture 提供了 exceptionally() 和 handle() 等方法来处理异步任务中可能抛出的异常。在 join() 或 get() 时,如果任务抛出异常,它们会将异常重新抛出(通常是 CompletionException 或 ExecutionException),因此需要捕获并处理。
  • 结果聚合: CompletableFuture.allOf() 结合 thenApply 是一个优雅的聚合方式。如果需要聚合不同类型的 CompletableFuture 结果,可以使用 CompletableFuture.supplyAsync(() -> future1.join()).thenCombine(future2, (r1, r2) -> ...) 等组合方法。
  • 异步上下文: 确保传递给 CompletableFuture.supplyAsync() 的 ExecutorService 是合适的,避免使用默认的 ForkJoinPool.commonPool(),因为它可能被其他库或系统任务占用,导致资源争抢。

6. 总结

通过 CompletableFuture 进行大型列表的并行处理是提升Java应用性能的有效手段。核心在于避免在任务提交阶段就阻塞性地等待结果。正确的做法是先将所有任务异步提交并收集其 CompletableFuture 实例,待所有任务均已启动或完成时,再统一进行结果的聚合。合理配置 ExecutorService、选择合适的任务粒度以及完善异常处理机制,将确保您的并行处理方案既高效又健壮。

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

835

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

739

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

735

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

397

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

399

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

446

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

430

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16926

2023.08.03

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

43

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Django 教程
Django 教程

共28课时 | 3.2万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.1万人学习

Sass 教程
Sass 教程

共14课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号