
在处理大量数据时,将耗时操作并行化是提升性能的有效手段。Java 8引入的CompletableFuture为异步编程提供了强大的支持。然而,不恰当的使用方式可能导致预期的并行效果无法实现。
考虑以下场景:有一个包含大量数据(例如50,000条记录)的列表,需要对每个列表项执行一个耗时操作,并将结果映射到Java对象,最终写入CSV文件。如果采用顺序处理,例如:
list.stream()
.map(listItem -> service.methodA(listItem).map(result -> mapToBean(result, listItem)))
.flatMap(Optional::stream)
.collect(Collectors.toList());当数据量较大时,这种方式可能非常慢,例如处理2,000条数据就需要4小时。为了加速,开发者可能会尝试使用CompletableFuture进行并行化,常见的错误尝试如下:
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);
Lists.partition(list, 500).stream() // 将大列表分成小块
.map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service)) // 提交异步任务
.map(CompletableFuture::join) // 立即等待每个任务完成
.flatMap(List::stream)
.collect(Collectors.toList());尽管代码中使用了CompletableFuture.supplyAsync将任务提交到线程池,但紧随其后的.map(CompletableFuture::join)操作是导致性能问题的关键。CompletableFuture::join是一个阻塞操作,它会暂停当前流的执行,直到对应的CompletableFuture完成并返回结果。这意味着,尽管每个任务可能在不同的线程中执行,但流本身是按顺序处理每个CompletableFuture的,一个任务完成后,流才会处理下一个任务。这实际上将并行执行变成了顺序等待,从而失去了并行化的优势。
立即学习“Java免费学习笔记(深入)”;
要实现真正的并行,核心思想是:先创建并启动所有异步任务,然后统一等待它们完成并收集结果。 避免在创建任务的同一流式管道中立即阻塞等待。
以下是实现这一策略的步骤和示例代码:
import com.google.common.collect.Lists; // 假设使用Guava的Lists.partition
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ParallelProcessingExample {
// 假设这是您的业务逻辑方法,处理列表的一个分片并返回结果列表
// executeListPart(List<MyItem> partition) 应该返回 List<MyProcessedBean>
private List<MyProcessedBean> executeListPart(List<MyItem> partition) {
// 模拟耗时操作
try {
Thread.sleep(100); // 假设每个分片处理100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 实际业务逻辑:处理partition中的每个MyItem,并生成MyProcessedBean
return partition.stream()
.map(item -> new MyProcessedBean("Processed_" + item.getId())) // 示例转换
.collect(Collectors.toList());
}
public List<MyProcessedBean> processLargeListInParallel(List<MyItem> largeList, int partitionSize, int threadPoolSize) {
// 1. 创建并配置线程池
// 建议线程池大小根据CPU核心数和任务类型(IO密集型/CPU密集型)调整
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
try {
// 2. 将大列表分成小块,并为每个小块创建异步任务
// CompletableFuture<List<MyProcessedBean>> 表示每个任务会返回一个MyProcessedBean列表
List<CompletableFuture<List<MyProcessedBean>>> futures = Lists.partition(largeList, partitionSize).stream()
.map(partition -> CompletableFuture.supplyAsync(() -> executeListPart(partition), executorService))
.collect(Collectors.toList());
// 3. 创建一个CompletableFuture,等待所有子任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// 4. 当所有子任务完成后,聚合结果
List<MyProcessedBean> finalResults = allOf.thenApply(v ->
futures.stream()
.map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的
.flatMap(List::stream) // 扁平化List<List<MyProcessedBean>>为List<MyProcessedBean>
.collect(Collectors.toList())
).join(); // 阻塞等待最终结果的聚合
return finalResults;
} finally {
// 5. 关闭线程池,释放资源
executorService.shutdown();
// 可选:等待线程池终止,确保所有任务都已完成
// try {
// if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
// executorService.shutdownNow();
// }
// } catch (InterruptedException ex) {
// executorService.shutdownNow();
// Thread.currentThread().interrupt();
// }
}
}
// 示例数据类
static class MyItem {
private String id;
public MyItem(String id) { this.id = id; }
public String getId() { return id; }
}
static class MyProcessedBean {
private String processedId;
public MyProcessedBean(String processedId) { this.processedId = processedId; }
public String getProcessedId() { return processedId; }
@Override
public String toString() { return "MyProcessedBean{" + "processedId='" + processedId + '\'' + '}'; }
}
public static void main(String[] args) {
ParallelProcessingExample app = new ParallelProcessingExample();
// 构造一个大型列表
List<MyItem> largeList = new java.util.ArrayList<>();
for (int i = 0; i < 5000; i++) {
largeList.add(new MyItem("item_" + i));
}
long startTime = System.currentTimeMillis();
List<MyProcessedBean> results = app.processLargeListInParallel(largeList, 500, Runtime.getRuntime().availableProcessors() - 1);
long endTime = System.currentTimeMillis();
System.out.println("Processed " + results.size() + " items in " + (endTime - startTime) + " ms");
// System.out.println("First 10 results: " + results.subList(0, Math.min(10, results.size())));
}
}线程池管理:
列表分片:
错误处理:
结果聚合:
通过将CompletableFuture的创建和结果的join操作分离,我们能够充分利用多核CPU的优势,实现真正意义上的并行处理。这种模式是处理大量数据或执行耗时操作时提升Java应用程序性能的关键。理解CompletableFuture的非阻塞特性以及如何正确地聚合结果,是编写高效、并发代码的重要一步。
以上就是Java并行处理大型列表:使用CompletableFuture提升性能的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号