
本文旨在解决使用CompletableFuture并行处理大型列表时遇到的性能瓶颈问题。通过移除导致串行执行的join操作,并提供两种等待所有任务完成的方法,帮助开发者充分利用多线程优势,显著提升数据处理速度。重点讲解如何正确地提交任务到线程池,并确保所有任务并行执行并最终完成,从而优化应用程序的性能。
在使用CompletableFuture处理大型列表时,并行执行可以显著提高处理速度。然而,不当的使用方式可能会导致看似并行,实则串行执行,从而达不到预期的性能提升效果。本文将针对这一问题,提供一种解决方案,并解释其背后的原理。
问题分析
原始代码中,在流式处理过程中使用了.map(CompletableFuture::join)。这个操作会导致主线程等待每个CompletableFuture完成,然后才能继续处理下一个CompletableFuture。因此,虽然每个CompletableFuture都在不同的线程中运行,但它们实际上是按顺序启动和完成的,导致并行处理失效。
解决方案
核心思想是先将所有CompletableFuture提交到线程池,然后等待所有任务完成,最后再收集结果。
代码示例
以下是修改后的代码示例:
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class CompletableFutureParallel {
public static void main(String[] args) {
int noOfCores = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);
List list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); // 示例数据
// 将任务提交到线程池
List>> completableFutureList = Lists.partition(list, 2).stream()
.map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service))
.collect(Collectors.toList());
// 等待所有任务完成
completableFutureList.forEach(CompletableFuture::join);
// 收集结果
List result = completableFutureList.stream()
.map(CompletableFuture::join) // 再次join,确保获取到结果
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("Result: " + result);
service.shutdown();
}
// 模拟耗时操作
private static List executeListPart(List item) {
try {
Thread.sleep(1000); // 模拟处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Processing: " + item + " in thread: " + Thread.currentThread().getName());
return item.stream().map(i -> i * 2).collect(Collectors.toList()); // 示例操作
}
} 代码解释
- 创建线程池: 使用Executors.newFixedThreadPool(noOfCores - 1)创建一个固定大小的线程池,线程数量通常设置为CPU核心数减1,以避免过度竞争。
- 分割列表: 使用Lists.partition(list, 500)将大型列表分割成多个小列表,每个小列表作为一个任务提交到线程池。
- 提交任务: 使用CompletableFuture.supplyAsync(() -> executeListPart(item), service)将每个小列表的处理任务提交到线程池。supplyAsync方法异步执行executeListPart方法,并返回一个CompletableFuture对象。
- 收集CompletableFuture: 将所有CompletableFuture对象收集到一个列表中。
- 等待任务完成: 使用completableFutureList.forEach(CompletableFuture::join)等待所有CompletableFuture对象完成。join方法会阻塞当前线程,直到对应的CompletableFuture完成。
- 收集结果: 使用流式操作收集每个CompletableFuture的结果。
注意事项
- 异常处理: 在executeListPart方法中,需要妥善处理可能抛出的异常。可以将异常记录到日志中,或者向上抛出,由主线程处理。
- 线程池管理: 在程序结束时,需要关闭线程池,释放资源。可以使用service.shutdown()方法关闭线程池。
- 数据同步: 如果executeListPart方法需要访问共享数据,需要确保数据同步,避免出现线程安全问题。
- 资源限制: 需要根据实际情况调整线程池大小和列表分割大小,避免资源耗尽。
另一种等待任务完成的方法
除了使用forEach(CompletableFuture::join),还可以使用CompletableFuture.allOf等待所有任务完成。
CompletableFutureallFutures = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])); allFutures.join();
CompletableFuture.allOf方法返回一个新的CompletableFuture,当所有输入的CompletableFuture都完成时,该CompletableFuture才完成。
总结
通过移除导致串行执行的join操作,并使用forEach(CompletableFuture::join)或CompletableFuture.allOf等待所有任务完成,可以实现CompletableFuture的并行执行,从而显著提高大型列表的处理速度。 在实际应用中,需要根据具体情况调整线程池大小和列表分割大小,并注意异常处理和数据同步,以确保程序的正确性和性能。










