0

0

Java并行处理大型列表:使用CompletableFuture提升性能

霞舞

霞舞

发布时间:2025-07-28 15:02:01

|

743人浏览过

|

来源于php中文网

原创

java并行处理大型列表:使用completablefuture提升性能

本文旨在解决在Java中使用CompletableFuture进行并行处理时常见的性能陷阱。许多开发者尝试通过在流式操作中直接调用CompletableFuture::join来并行化任务,但这往往导致任务实际串行执行。本教程将详细解释这一现象,并提供一种正确的、高效的并行处理策略,通过分离异步任务的创建与结果的聚合,结合CompletableFuture.allOf实现真正的并行计算,最终将分散的结果合并成一个单一的列表。

理解并行处理的常见误区

在处理大量数据时,将耗时操作并行化是提升性能的有效手段。Java 8引入的CompletableFuture为异步编程提供了强大的支持。然而,不恰当的使用方式可能导致预期的并行效果无法实现。

考虑以下场景:有一个包含大量数据(例如50,000条记录)的列表,需要对每个列表项执行一个耗时操作,并将结果映射到Java对象,最终写入CSV文件。如果采用顺序处理,例如:

list.stream()
    .map(listItem -> service.methodA(listItem).map(result -> mapToBean(result, listItem)))
    .flatMap(Optional::stream)
    .collect(Collectors.toList());

当数据量较大时,这种方式可能非常慢,例如处理2,000条数据就需要4小时。为了加速,开发者可能会尝试使用CompletableFuture进行并行化,常见的错误尝试如下:

ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);
Lists.partition(list, 500).stream() // 将大列表分成小块
    .map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service)) // 提交异步任务
    .map(CompletableFuture::join) // 立即等待每个任务完成
    .flatMap(List::stream)
    .collect(Collectors.toList());

尽管代码中使用了CompletableFuture.supplyAsync将任务提交到线程池,但紧随其后的.map(CompletableFuture::join)操作是导致性能问题的关键。CompletableFuture::join是一个阻塞操作,它会暂停当前流的执行,直到对应的CompletableFuture完成并返回结果。这意味着,尽管每个任务可能在不同的线程中执行,但流本身是按顺序处理每个CompletableFuture的,一个任务完成后,流才会处理下一个任务。这实际上将并行执行变成了顺序等待,从而失去了并行化的优势。

立即学习Java免费学习笔记(深入)”;

正确的并行处理策略

要实现真正的并行,核心思想是:先创建并启动所有异步任务,然后统一等待它们完成并收集结果。 避免在创建任务的同一流式管道中立即阻塞等待。

以下是实现这一策略的步骤和示例代码:

佳蓝在线销售系统(创业版) 佳蓝在线销售
佳蓝在线销售系统(创业版) 佳蓝在线销售

1、对ASP内核代码进行DLL封装,从而大大提高了用户的访问速度和安全性;2、采用后台生成HTML网页的格式,使程序访问速度得到进一步的提升;3、用户可发展下级会员并在下级购买商品时获得差额利润;4、全新模板选择功能;5、后台增加磁盘绑定功能;6、后台增加库存查询功能;7、后台增加财务统计功能;8、后台面值类型批量设定;9、后台财务曲线报表显示;10、完善订单功能;11、对所有传输的字符串进行安全

下载
  1. 创建并启动所有异步任务: 遍历数据分片,为每个分片创建一个CompletableFuture,并将其提交到ExecutorService中执行。将这些CompletableFuture实例收集到一个列表中。
  2. 统一等待所有任务完成: 使用CompletableFuture.allOf()方法创建一个新的CompletableFuture,它将在所有已提交的任务都完成时才完成。
  3. 聚合所有任务的结果: 当CompletableFuture.allOf()完成时,表明所有子任务都已完成,此时可以安全地对之前收集的CompletableFuture列表调用join()方法,并对结果进行扁平化和收集。
import com.google.common.collect.Lists; // 假设使用Guava的Lists.partition
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelProcessingExample {

    // 假设这是您的业务逻辑方法,处理列表的一个分片并返回结果列表
    // executeListPart(List partition) 应该返回 List
    private List executeListPart(List partition) {
        // 模拟耗时操作
        try {
            Thread.sleep(100); // 假设每个分片处理100ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 实际业务逻辑:处理partition中的每个MyItem,并生成MyProcessedBean
        return partition.stream()
                .map(item -> new MyProcessedBean("Processed_" + item.getId())) // 示例转换
                .collect(Collectors.toList());
    }

    public List processLargeListInParallel(List largeList, int partitionSize, int threadPoolSize) {
        // 1. 创建并配置线程池
        // 建议线程池大小根据CPU核心数和任务类型(IO密集型/CPU密集型)调整
        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);

        try {
            // 2. 将大列表分成小块,并为每个小块创建异步任务
            // CompletableFuture> 表示每个任务会返回一个MyProcessedBean列表
            List>> futures = Lists.partition(largeList, partitionSize).stream()
                    .map(partition -> CompletableFuture.supplyAsync(() -> executeListPart(partition), executorService))
                    .collect(Collectors.toList());

            // 3. 创建一个CompletableFuture,等待所有子任务完成
            CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

            // 4. 当所有子任务完成后,聚合结果
            List finalResults = allOf.thenApply(v ->
                    futures.stream()
                            .map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的
                            .flatMap(List::stream)       // 扁平化List>为List
                            .collect(Collectors.toList())
            ).join(); // 阻塞等待最终结果的聚合

            return finalResults;

        } finally {
            // 5. 关闭线程池,释放资源
            executorService.shutdown();
            // 可选:等待线程池终止,确保所有任务都已完成
            // try {
            //     if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            //         executorService.shutdownNow();
            //     }
            // } catch (InterruptedException ex) {
            //     executorService.shutdownNow();
            //     Thread.currentThread().interrupt();
            // }
        }
    }

    // 示例数据类
    static class MyItem {
        private String id;
        public MyItem(String id) { this.id = id; }
        public String getId() { return id; }
    }

    static class MyProcessedBean {
        private String processedId;
        public MyProcessedBean(String processedId) { this.processedId = processedId; }
        public String getProcessedId() { return processedId; }
        @Override
        public String toString() { return "MyProcessedBean{" + "processedId='" + processedId + '\'' + '}'; }
    }

    public static void main(String[] args) {
        ParallelProcessingExample app = new ParallelProcessingExample();

        // 构造一个大型列表
        List largeList = new java.util.ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            largeList.add(new MyItem("item_" + i));
        }

        long startTime = System.currentTimeMillis();
        List results = app.processLargeListInParallel(largeList, 500, Runtime.getRuntime().availableProcessors() - 1);
        long endTime = System.currentTimeMillis();

        System.out.println("Processed " + results.size() + " items in " + (endTime - startTime) + " ms");
        // System.out.println("First 10 results: " + results.subList(0, Math.min(10, results.size())));
    }
}

注意事项与最佳实践

  1. 线程池管理:

    • ExecutorService是管理线程的关键。对于CPU密集型任务,线程池大小通常设置为Runtime.getRuntime().availableProcessors()或noOfCores - 1。对于IO密集型任务,可以适当增加线程池大小,因为线程在等待IO时不会占用CPU。
    • 在任务完成后,务必调用executorService.shutdown()来优雅地关闭线程池,释放资源。如果线程池是应用程序生命周期内的单例,则可以在应用程序关闭时统一管理。
    • awaitTermination()可以用于等待所有已提交的任务完成,但对于一次性任务聚合,CompletableFuture.allOf().join()通常就足够了。
  2. 列表分片:

    • 将大列表分片(例如使用Guava的Lists.partition)是一个很好的策略。每个分片的大小需要根据任务的粒度和系统资源进行调整。过小的分片会增加任务调度开销,过大的分片可能导致部分线程长时间空闲。
    • 确保executeListPart方法是线程安全的,并且不依赖于共享的可变状态,或者对共享状态进行适当的同步。
  3. 错误处理:

    • CompletableFuture提供了丰富的错误处理机制,例如exceptionally()、handle()、whenComplete()等。在生产环境中,应为异步任务添加健壮的错误处理逻辑,以防止单个任务失败导致整个流程中断。
    • 当使用CompletableFuture.allOf()时,如果任何一个子CompletableFuture异常完成,那么allOf也会异常完成。你可以通过.exceptionally()或.handle()来捕获和处理这些异常。
  4. 结果聚合:

    • CompletableFuture.allOf()返回的是CompletableFuture,因为它本身不关心子任务的结果,只关心它们是否完成。
    • 要获取所有子任务的结果,需要像示例中那样,在allOf完成后,再次遍历原始的futures列表,并调用join()(此时是非阻塞的),然后进行结果的flatMap和collect。

总结

通过将CompletableFuture的创建和结果的join操作分离,我们能够充分利用多核CPU的优势,实现真正意义上的并行处理。这种模式是处理大量数据或执行耗时操作时提升Java应用程序性能的关键。理解CompletableFuture的非阻塞特性以及如何正确地聚合结果,是编写高效、并发代码的重要一步。

相关文章

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

835

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

740

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

736

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

397

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

399

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

446

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

430

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16926

2023.08.03

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

43

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号