
在java并行流中对共享可变状态(如外部列表)进行操作时,由于多线程并发访问,可能导致不可预测的行为,例如`list.size()`的非预期变化。本文将深入探讨并行流中状态操作引发的竞态条件,并提供使用并发锁等机制进行有效控制的方法,以确保数据一致性和程序正确性。
Java 8引入的Stream API极大地简化了集合操作。并行流(Parallel Stream)是Stream API的一个强大特性,它允许我们将流操作并行化,从而利用多核处理器的优势来提高处理速度。然而,并行流的强大能力也伴随着对并发编程的挑战。
当流操作是“无状态的”(stateless)时,即每个元素的操作独立于其他元素,并且不修改任何外部共享状态时,并行流能很好地工作。但如果流操作是“有状态的”(stateful),例如在lambda表达式中访问或修改一个外部变量(如一个List),那么就可能引入并发问题。
考虑以下示例代码,它尝试在一个并行流中根据条件向一个外部List添加元素:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ParallelStreamStatefulExample {
static void statefulParallelLambdaSetProblem() {
Set<Integer> s = new HashSet<>(
Arrays.asList(1, 2, 3, 4, 5, 6)
);
List<Integer> list = new ArrayList<>();
int sum = s.parallelStream().mapToInt(e -> {
// 问题:list.size() 在管道操作执行期间可能发生变化
// mapToInt 的 lambda 表达式依赖于此值,因此它是“有状态的”
if (list.size() <= 3) {
list.add(e);
return e;
} else {
return 0;
}
}).sum();
System.out.println("计算结果 sum: " + sum);
System.out.println("最终 list: " + list);
System.out.println("最终 list size: " + list.size());
}
public static void main(String[] args) {
statefulParallelLambdaSetProblem();
}
}在上述代码中,list.size()和list.add(e)都在并行流的lambda表达式中被访问和修改。由于并行流会使用多个线程同时处理数据,这些对共享list的操作会交错执行,导致不可预测的结果。
立即学习“Java免费学习笔记(深入)”;
当多个线程同时访问和修改同一个共享资源,并且至少有一个操作是写入操作时,如果这些操作的最终结果取决于线程执行的时序,就称之为发生了“竞态条件”(Race Condition)。在上述示例中,list.size()的非预期变化正是竞态条件的一个典型表现。
具体来说,当一个线程执行if (list.size() <= 3)时,它读取了list的当前大小。但在它判断完条件并准备执行list.add(e)之前,CPU调度器可能将执行权切换给另一个线程。这个新线程也可能读取list.size(),并根据它自己的判断添加一个元素,从而改变了list的实际大小。当控制权再次回到第一个线程时,它之前读取的list.size()值已经过时,但它仍会基于这个过时的值做出判断并执行list.add(e)。
这种线程执行顺序的不确定性,加上对非线程安全的ArrayList的并发修改,使得list.size()的值在不同的执行时刻和不同的线程看来可能不同,最终导致:
为了解决并行流中状态操作引发的竞态条件,我们需要引入并发控制机制,确保对共享资源的访问是同步的(Synchronized)和原子性的(Atomic)。Java提供了多种并发工具,其中最常用的是synchronized关键字和java.util.concurrent.locks包下的锁。
synchronized关键字可以用于方法或代码块,确保在任何给定时刻只有一个线程可以执行被同步的代码。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ParallelStreamStatefulExampleSynchronized {
static void statefulParallelLambdaSetSynchronized() {
Set<Integer> s = new HashSet<>(
Arrays.asList(1, 2, 3, 4, 5, 6)
);
List<Integer> list = new ArrayList<>();
// 使用一个专门的锁对象,或者直接同步在list对象上(如果list本身不是线程安全的,需要谨慎)
// 这里为了清晰,使用一个独立的锁对象
final Object lock = new Object();
int sum = s.parallelStream().mapToInt(e -> {
int result = 0;
synchronized (lock) { // 同步访问 list.size() 和 list.add()
if (list.size() <= 3) {
list.add(e);
result = e;
}
}
return result;
}).sum();
System.out.println("同步后的 sum: " + sum);
System.out.println("同步后的 list: " + list);
System.out.println("同步后的 list size: " + list.size());
}
public static void main(String[] args) {
statefulParallelLambdaSetSynchronized();
}
}通过将if (list.size() <= 3)和list.add(e)操作放入synchronized (lock)块中,我们确保了在任何时刻只有一个线程能够执行这段代码,从而避免了竞态条件。
ReentrantLock提供了比synchronized更灵活的锁定机制,例如可以尝试获取锁、定时获取锁等。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ParallelStreamStatefulExampleReentrantLock {
static void statefulParallelLambdaSetReentrantLock() {
Set<Integer> s = new HashSet<>(
Arrays.asList(1, 2, 3, 4, 5, 6)
);
List<Integer> list = new ArrayList<>();
final Lock listLock = new ReentrantLock(); // 创建一个可重入锁
int sum = s.parallelStream().mapToInt(e -> {
int result = 0;
listLock.lock(); // 获取锁
try {
if (list.size() <= 3) {
list.add(e);
result = e;
}
} finally {
listLock.unlock(); // 确保在finally块中释放锁
}
return result;
}).sum();
System.out.println("ReentrantLock 同步后的 sum: " + sum);
System.out.println("ReentrantLock 同步后的 list: " + list);
System.out.println("ReentrantLock 同步后的 list size: " + list.size());
}
public static void main(String[] args) {
statefulParallelLambdaSetReentrantLock();
}
}使用ReentrantLock时,需要手动调用lock()获取锁和unlock()释放锁,并且通常建议将unlock()放在finally块中,以确保在发生异常时也能正确释放锁。
Java并行流是提高程序性能的强大工具,但它要求开发者对并发编程有深入的理解。在并行流中使用有状态操作,特别是对共享可变状态进行读写时,极易引发竞态条件,导致程序行为不可预测。通过理解竞态条件的本质,并合理运用synchronized关键字或java.util.concurrent.locks包下的锁机制,我们可以有效地控制并发访问,确保数据的一致性和程序的正确性。然而,最好的实践是尽量设计无状态的流操作,或利用Java并发API提供的线程安全结构,以最小化锁的开销,充分发挥并行流的优势。
以上就是Java并行流中状态操作的陷阱:理解竞态条件与并发控制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号