
本文旨在介绍如何利用 Java 并行流高效地处理大型列表,尤其是在每个元素的处理过程耗时较长的情况下。并行流能够将列表分割成多个子任务,并在多个线程上并发执行,从而显著提升处理速度。但同时,并发编程也带来了共享资源同步的问题,需要谨慎处理。
假设我们有一个 Foo 类,其 process 方法需要处理一个 Bar 类型的列表,并且 handle 方法的处理过程比较耗时。为了提高效率,我们可以将列表分割成多个子列表,然后使用并行流并发处理每个子列表。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
class Foo {
private int len;
public Foo(int len) {
this.len = len;
}
public void process(List<Bar> list) {
List<List<Bar>> sublists = new ArrayList<>();
for (int i = 0; i < list.size(); i += len) {
sublists.add(list.subList(i, Math.min(i + len, list.size())));
}
// 并行处理子列表
sublists.parallelStream()
.forEach(this::handle);
}
private void handle(List<Bar> sublist) {
// 耗时的处理逻辑
System.out.println("Processing sublist: " + sublist);
try {
Thread.sleep(100); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Bar {
private int id;
public Bar(int id) {
this.id = id;
}
@Override
public String toString() {
return "Bar{" +
"id=" + id +
'}';
}
}
public class Main {
public static void main(String[] args) {
List<Bar> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(new Bar(i));
}
Foo foo = new Foo(3);
foo.process(list);
}
}在这个例子中,我们首先将原始列表分割成多个大小为 len 的子列表。然后,我们使用 sublists.parallelStream().forEach(this::handle) 并行处理每个子列表。parallelStream() 方法将列表转换为并行流,forEach() 方法对流中的每个元素执行指定的操作。
如果 handle 方法返回一个结果,并且我们需要收集所有结果,可以使用 map 和 collect 方法。
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
class Foo {
private int len;
public Foo(int len) {
this.len = len;
}
public List<String> process(List<Bar> list) {
List<List<Bar>> sublists = new ArrayList<>();
for (int i = 0; i < list.size(); i += len) {
sublists.add(list.subList(i, Math.min(i + len, list.size())));
}
// 并行处理子列表并收集结果
return sublists.parallelStream()
.map(this::handle)
.collect(Collectors.toList());
}
private String handle(List<Bar> sublist) {
// 耗时的处理逻辑,并返回结果
System.out.println("Processing sublist: " + sublist);
try {
Thread.sleep(100); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Result of " + sublist;
}
}
class Bar {
private int id;
public Bar(int id) {
this.id = id;
}
@Override
public String toString() {
return "Bar{" +
"id=" + id +
'}';
}
}
public class Main {
public static void main(String[] args) {
List<Bar> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(new Bar(i));
}
Foo foo = new Foo(3);
List<String> results = foo.process(list);
System.out.println("Results: " + results);
}
}在这个例子中,handle 方法返回一个字符串结果。我们使用 sublists.parallelStream().map(this::handle).collect(Collectors.toList()) 并行处理每个子列表,并将结果收集到一个列表中。map 方法将流中的每个元素转换为另一个元素,collect 方法将流中的所有元素收集到一个集合中。
需要注意的是,当 handle 方法访问共享资源时,需要进行同步处理,以避免出现线程安全问题。例如,如果 handle 方法需要修改一个共享的变量,可以使用 synchronized 关键字或 java.util.concurrent 包中的并发工具类来保证线程安全。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
class Foo {
private int len;
private AtomicInteger counter = new AtomicInteger(0); // 使用 AtomicInteger 保证线程安全
public Foo(int len) {
this.len = len;
}
public void process(List<Bar> list) {
List<List<Bar>> sublists = new ArrayList<>();
for (int i = 0; i < list.size(); i += len) {
sublists.add(list.subList(i, Math.min(i + len, list.size())));
}
// 并行处理子列表
sublists.parallelStream()
.forEach(this::handle);
}
private void handle(List<Bar> sublist) {
// 耗时的处理逻辑,并访问共享资源
System.out.println("Processing sublist: " + sublist);
try {
Thread.sleep(100); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
// 原子性地增加计数器
counter.addAndGet(sublist.size());
}
public int getCounter() {
return counter.get();
}
}
class Bar {
private int id;
public Bar(int id) {
this.id = id;
}
@Override
public String toString() {
return "Bar{" +
"id=" + id +
'}';
}
}
public class Main {
public static void main(String[] args) {
List<Bar> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(new Bar(i));
}
Foo foo = new Foo(3);
foo.process(list);
// 等待所有任务完成
try {
Thread.sleep(2000); // 确保所有任务都已完成
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Counter: " + foo.getCounter());
}
}在这个例子中,我们使用 AtomicInteger 来保证计数器的线程安全。AtomicInteger 提供了原子性的 addAndGet 方法,可以安全地增加计数器的值。
并行流是 Java 中一种强大的并发处理工具,可以显著提高列表处理的效率。但是,在使用并行流时,需要注意共享资源的同步问题,并选择合适的并发工具类来保证线程安全。同时,需要合理分割任务,避免过多的线程切换带来的性能损耗。
以上就是并发处理共享列表并收集结果的方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号