
在处理大量数据时,为了提高处理速度,我们通常会考虑使用并行化技术。java 8引入的completablefuture为异步和并行编程提供了强大的支持。然而,不恰当的使用方式可能导致预期的并行效果无法实现,甚至退化为串行执行。
一个常见的错误模式是在流式操作(Stream API)中直接调用CompletableFuture::join。考虑以下代码片段:
// 错误示例:导致串行执行
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);
List<ResultBean> results = Lists.partition(largeList, 500).stream()
.map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service))
.map(CompletableFuture::join) // 错误:在这里调用join会阻塞当前流的执行,直到当前Future完成
.flatMap(List::stream)
.collect(Collectors.toList());上述代码的意图是并行处理列表的各个分区。然而,由于在stream管道中紧接着map(CompletableFuture::join),这意味着每次迭代都会等待当前CompletableFuture完成并获取其结果后,才会继续处理流中的下一个元素。这实际上将并行提交的任务变成了串行等待,失去了并行处理的优势。尽管每个任务可能在不同的线程中执行,但主线程(或驱动流的线程)在等待,从而导致整体执行时间并未显著缩短。
要实现真正的并行执行,关键在于将异步任务的提交与结果的收集/等待操作分离。正确的做法是先将所有异步任务提交到线程池,并收集它们返回的CompletableFuture实例,然后再统一等待这些CompletableFuture全部完成并聚合结果。
首先,我们需要一个ExecutorService来管理线程池,以便CompletableFuture可以在其中执行异步任务。然后,将大型列表划分为更小的分区(这有助于管理内存和任务粒度),并为每个分区提交一个异步任务。每个任务都返回一个CompletableFuture,这些CompletableFuture实例会被收集到一个列表中。
立即学习“Java免费学习笔记(深入)”;
import com.google.common.collect.Lists; // 假设使用Guava的Lists.partition
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.stream.Collectors;
// 假设的ListItem和ResultBean类
class ListItem {}
class ResultBean {}
class SomeService {
public Optional<Object> methodA(ListItem item) {
// 模拟耗时操作
try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return Optional.of(new Object());
}
}
public class ParallelDataProcessor {
private static SomeService service = new SomeService(); // 假设的服务实例
// 假设的mapToBean方法
private static ResultBean mapToBean(Object result, ListItem item) {
// 实际的映射逻辑
return new ResultBean();
}
// 模拟的executeListPart方法,它处理一个ListItem分区并返回List<ResultBean>
private static List<ResultBean> executeListPart(List<ListItem> partition) {
return partition.stream()
.map(listItem -> service.methodA(listItem)
.map(result -> mapToBean(result, listItem)))
.flatMap(Optional::stream)
.collect(Collectors.toList());
}
public static void main(String[] args) throws InterruptedException {
int noOfCores = Runtime.getRuntime().availableProcessProcessors();
ExecutorService executor = Executors.newFixedThreadPool(noOfCores - 1);
// 模拟一个大型列表
List<ListItem> largeList = new java.util.ArrayList<>();
for (int i = 0; i < 50000; i++) {
largeList.add(new ListItem());
}
// 1. 将大型列表分区
List<List<ListItem>> partitionedList = Lists.partition(largeList, 500);
// 2. 提交异步任务并收集CompletableFuture实例
List<CompletableFuture<List<ResultBean>>> futures = partitionedList.stream()
.map(partition -> CompletableFuture.supplyAsync(() -> executeListPart(partition), executor))
.collect(Collectors.toList());
// ... 后续等待和结果收集
// 3. 等待所有CompletableFuture完成并收集结果
List<ResultBean> finalResults = futures.stream()
.map(CompletableFuture::join) // 在所有Future都已提交后,统一等待并获取结果
.flatMap(List::stream) // 将List<List<ResultBean>>扁平化为List<ResultBean>
.collect(Collectors.toList());
System.out.println("Total processed items: " + finalResults.size());
// 4. 关闭ExecutorService
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
}
}在这个阶段,map操作只负责创建并返回CompletableFuture,它本身是非阻塞的。所有的异步任务几乎同时被提交到executor管理的线程池中,实现了真正的并行执行。
在所有CompletableFuture实例都被收集到列表后,我们可以统一等待它们完成。最直接的方式是遍历这个CompletableFuture列表,并对每个Future调用join()方法。由于此时所有的异步任务都已经启动,join()操作将按顺序阻塞并获取每个已完成任务的结果。
// 承接上一步的代码
List<ResultBean> finalResults = futures.stream()
.map(CompletableFuture::join) // 在所有Future都已提交后,统一等待并获取结果
.flatMap(List::stream) // 将List<List<ResultBean>>扁平化为List<ResultBean>
.collect(Collectors.toList()); // 收集所有结果通过这种方式,我们确保了所有任务都在并行执行,并且只在所有任务都启动后才开始等待它们的完成。
在使用ExecutorService时,合理管理其生命周期至关重要。
如果你的应用程序生命周期中会频繁地执行类似的批处理任务,那么保持ExecutorService实例的存活并复用它会更高效,而不是每次都创建和销毁。在这种情况下,你可能不会在每次任务完成后立即调用shutdown()。
数据分区(Partitioning): 将大型列表划分为较小的分区是并行处理大数据集的常用策略。这有助于:
线程池大小: Executors.newFixedThreadPool(noOfCores - 1)是一个常见的起点,但最佳线程池大小取决于任务类型:
异常处理: CompletableFuture提供了丰富的异常处理机制,例如exceptionally()、handle()等。在实际应用中,务必考虑异步任务中可能出现的异常,并进行适当的捕获和处理,以防止任务失败导致整个批处理流程中断。
结果聚合: 如果需要将所有分区的结果聚合到一个单一的列表中,如示例所示,flatMap(List::stream)是常见的模式。确保你的executeListPart方法返回的是一个列表,以便后续的扁平化操作。
通过将CompletableFuture的提交与结果的join操作分离,我们能够有效地利用Java的并行处理能力来加速大数据集的处理。核心思想是:先启动所有异步任务,让它们在后台并行执行,然后统一等待这些任务的完成并收集结果。同时,合理配置ExecutorService和数据分区策略,并注意异常处理,是构建健壮、高效并行处理系统的关键。
以上就是Java CompletableFuture并行处理大数据列表的优化实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号