
1. 理解Future的本质与误用
在java并发编程中,future接口代表一个异步计算的结果。当我们将一个任务提交给executorservice时,submit()方法会返回一个future对象,通过它可以查询任务是否完成、取消任务以及获取任务的最终结果。future的核心在于其表示的是一个未来可获得且一旦计算完成便不可变的结果。
原始代码中尝试使用List
List> elements = new ArrayList<>(); // ... elements.set(firstIndex, elements.get(firstIndex).get() - randomAmount);
这里存在两个主要问题:
-
类型不匹配:elements.set()方法的第二个参数期望的是一个Future
对象,但代码中传入的是一个int类型的值(elements.get(firstIndex).get() - randomAmount的结果)。这直接导致了编译错误:set > is not applicable to arguments (int,int)。 - Future的不可变性:即使类型匹配,Future对象一旦其内部结果被设定,其结果本身是不可变的。你不能直接“修改”一个Future所持有的值。如果需要修改,实际上是需要创建一个新的Future对象来替代旧的,但这通常不是管理可变共享数据的正确方式。
因此,Future不应被用作直接存储和修改可变共享数据的容器。当我们需要一个可变整数数组并在多个线程中对其进行操作时,应该选择更合适的并发数据结构。
2. 选择正确的共享数据结构
对于需要在多个线程间共享并进行修改的整数数组,我们应该避免使用Future列表。最直接的替代方案是使用List
立即学习“Java免费学习笔记(深入)”;
为了确保线程安全,Java提供了多种并发工具。在本场景中,AtomicIntegerArray是管理整数数组并提供原子性操作的理想选择。
AtomicIntegerArray提供了对数组中每个元素的原子性操作,例如get()、set()、compareAndSet()、getAndAdd()等,这些操作能够保证在多线程环境下的数据一致性,避免了手动加锁的复杂性。
3. ExecutorService的生命周期管理
原始代码中,在提交第一批初始化任务后就调用了ex.shutdown():
// ... 初始化 elements 列表后 ex.shutdown(); // 过早关闭 ExecutorService // ... 之后尝试提交新的更新任务
ExecutorService一旦调用了shutdown()方法,它将不再接受新的任务,但会完成所有已提交的任务。如果过早调用shutdown(),后续需要提交的更新任务将无法被执行,通常会抛出RejectedExecutionException。
正确的做法是,在所有任务都提交完毕并且我们希望等待它们全部执行完成后,才调用shutdown()。为了确保所有任务都已完成,我们通常会结合使用shutdown()和awaitTermination()方法。awaitTermination()会阻塞当前线程,直到所有任务执行完毕或者达到指定的超时时间。
4. 实现线程安全的数组更新
为了解决原始问题中的编译错误和潜在的并发问题,我们将采用AtomicIntegerArray并正确管理ExecutorService的生命周期。
示例代码:使用AtomicIntegerArray实现线程安全的数组更新
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.ThreadLocalRandom;
public class ConcurrentArrayUpdateTutorial {
public static void main(String[] args) throws InterruptedException {
// 1. 初始化 ExecutorService,用于管理并发任务
var ex = Executors.newFixedThreadPool(10);
// 2. 使用 AtomicIntegerArray 存储可变整数,提供线程安全
// AtomicIntegerArray 内部管理一个 int[],并为每个元素提供原子操作
AtomicIntegerArray elements = new AtomicIntegerArray(100);
for (int i = 0; i < 100; i++) {
elements.set(i, 1000); // 初始化每个元素为1000
}
// 计算初始总和
int initialSum = 0;
for (int i = 0; i < elements.length(); i++) {
initialSum += elements.get(i);
}
System.out.println("Initial sum: " + initialSum);
// 3. 提交大量并发更新任务
int numberOfUpdates = 10_000;
// 使用 CountDownLatch 等待所有更新任务完成
CountDownLatch latch = new CountDownLatch(numberOfUpdates);
for (int i = 0; i < numberOfUpdates; i++) {
ex.submit(() -> {
try {
int firstIndex = ThreadLocalRandom.current().nextInt(100);
// int secondIndex = ThreadLocalRandom.current().nextInt(100); // 原始问题中提及,但在逻辑中未被使用
int randomAmount = ThreadLocalRandom.current().nextInt(1000);
// 确保减法操作的原子性及条件判断
boolean updated = false;
while (!updated) {
int currentValue = elements.get(firstIndex); // 原子性获取当前值
// 检查是否有足够的量可以减去
if (currentValue >= randomAmount) {
// 使用 compareAndSet 尝试原子性地更新
// 如果 currentValue 在此期间被其他线程修改,compareAndSet 会失败,
// 循环会再次执行,获取新的 currentValue 并重试。
if (elements.compareAndSet(firstIndex, currentValue, currentValue - randomAmount)) {
updated = true; // 更新成功,退出循环
// 如果需要将 randomAmount 加到 secondIndex,可以在这里进行原子操作
// elements.getAndAdd(secondIndex, randomAmount);
}
} else {
// 当前值不足以减去 randomAmount,放弃本次更新
updated = true; // 退出循环
}
}
} finally {
latch.countDown(); // 任务完成,计数器减一
}
});
}
// 4. 等待所有任务完成
latch.await(); // 阻塞当前线程,直到所有任务都调用了 countDown()
ex.shutdown(); // 提交所有任务后,关闭 ExecutorService
// 等待 ExecutorService 中的所有任务真正终止
if (!ex.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate in time. Forcing shutdown.");
ex.shutdownNow(); // 如果超时,则强制关闭
}
// 5. 计算最终总和
int finalSum = 0;
for (int i = 0; i < elements.length(); i++) {
finalSum += elements.get(i);
}
System.out.println("Final sum: " + finalSum);
}
}代码解析与注意事项:
-
AtomicIntegerArray:它取代了List
>,直接存储整数,并提供了线程安全的get()和set()方法。更重要的是,对于条件性更新(如if (value - amount > 0)),我们使用了compareAndSet循环模式,这是一种确保原子性操作的经典方法。 - elements.get(firstIndex):原子性地获取指定索引的值。
- elements.compareAndSet(firstIndex, currentValue, currentValue - randomAmount):这是一个原子操作。它会检查firstIndex处的当前值是否仍然是currentValue。如果是,则将其原子性地更新为currentValue - randomAmount并返回true;否则(说明在get()和compareAndSet()之间,该值已被其他线程修改),它不执行更新并返回false。通过while循环,我们可以不断重试,直到更新成功或条件不满足。
- ExecutorService生命周期:ex.shutdown()被移到了所有任务提交之后,latch.await()确保所有任务执行完毕。awaitTermination()则用于等待线程池优雅关闭。
- CountDownLatch:用于主线程等待所有提交的子任务完成。每个任务完成时调用latch.countDown(),主线程通过latch.await()阻塞直到计数器归零。
- ThreadLocalRandom:在多线程环境中生成随机数时,ThreadLocalRandom比Random更高效且避免了竞争。
5. 最佳实践与总结
- 区分Future与可变数据:Future用于获取异步计算的结果,不应用于存储和修改共享的可变数据。
- 选择合适的并发数据结构:当需要并发访问和修改共享数据时,优先考虑java.util.concurrent包中的并发集合(如ConcurrentHashMap、CopyOnWriteArrayList)或java.util.concurrent.atomic包中的原子类(如AtomicInteger、AtomicLong、AtomicReference、AtomicIntegerArray)。它们提供了比手动加锁更高效、更安全的并发控制机制。
- 正确管理ExecutorService生命周期:确保在所有任务提交完成后才调用shutdown(),并使用awaitTermination()等待任务完成,以避免资源泄露和任务丢失。
- 原子性操作:对于涉及读取、判断和写入的复合操作,简单的get()和set()组合不足以保证线程安全。应使用原子类提供的compareAndSet()或getAndAdd()等方法来确保整个操作的原子性。当需要条件性更新时,compareAndSet循环模式是常见的解决方案。
- 避免过早优化:在设计并发程序时,首先要保证正确性,然后才是性能。线程安全是并发编程的基石。
通过遵循这些原则,可以编写出更健壮、更高效的Java并发应用程序。











