
在java并发编程中,future接口代表一个异步计算的结果。当我们将一个任务提交给executorservice时,submit()方法会返回一个future对象,通过它可以查询任务是否完成、取消任务以及获取任务的最终结果。future的核心在于其表示的是一个未来可获得且一旦计算完成便不可变的结果。
原始代码中尝试使用List<Future<Integer>>来存储和直接修改整数值:
List<Future<Integer>> elements = new ArrayList<>(); // ... elements.set(firstIndex, elements.get(firstIndex).get() - randomAmount);
这里存在两个主要问题:
因此,Future不应被用作直接存储和修改可变共享数据的容器。当我们需要一个可变整数数组并在多个线程中对其进行操作时,应该选择更合适的并发数据结构。
对于需要在多个线程间共享并进行修改的整数数组,我们应该避免使用Future列表。最直接的替代方案是使用List<Integer>或int[]。然而,ArrayList和普通数组本身都不是线程安全的。当多个线程同时对它们进行读写操作时,可能会发生竞态条件,导致数据不一致。
立即学习“Java免费学习笔记(深入)”;
为了确保线程安全,Java提供了多种并发工具。在本场景中,AtomicIntegerArray是管理整数数组并提供原子性操作的理想选择。
AtomicIntegerArray提供了对数组中每个元素的原子性操作,例如get()、set()、compareAndSet()、getAndAdd()等,这些操作能够保证在多线程环境下的数据一致性,避免了手动加锁的复杂性。
原始代码中,在提交第一批初始化任务后就调用了ex.shutdown():
// ... 初始化 elements 列表后 ex.shutdown(); // 过早关闭 ExecutorService // ... 之后尝试提交新的更新任务
ExecutorService一旦调用了shutdown()方法,它将不再接受新的任务,但会完成所有已提交的任务。如果过早调用shutdown(),后续需要提交的更新任务将无法被执行,通常会抛出RejectedExecutionException。
正确的做法是,在所有任务都提交完毕并且我们希望等待它们全部执行完成后,才调用shutdown()。为了确保所有任务都已完成,我们通常会结合使用shutdown()和awaitTermination()方法。awaitTermination()会阻塞当前线程,直到所有任务执行完毕或者达到指定的超时时间。
为了解决原始问题中的编译错误和潜在的并发问题,我们将采用AtomicIntegerArray并正确管理ExecutorService的生命周期。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.ThreadLocalRandom;
public class ConcurrentArrayUpdateTutorial {
public static void main(String[] args) throws InterruptedException {
// 1. 初始化 ExecutorService,用于管理并发任务
var ex = Executors.newFixedThreadPool(10);
// 2. 使用 AtomicIntegerArray 存储可变整数,提供线程安全
// AtomicIntegerArray 内部管理一个 int[],并为每个元素提供原子操作
AtomicIntegerArray elements = new AtomicIntegerArray(100);
for (int i = 0; i < 100; i++) {
elements.set(i, 1000); // 初始化每个元素为1000
}
// 计算初始总和
int initialSum = 0;
for (int i = 0; i < elements.length(); i++) {
initialSum += elements.get(i);
}
System.out.println("Initial sum: " + initialSum);
// 3. 提交大量并发更新任务
int numberOfUpdates = 10_000;
// 使用 CountDownLatch 等待所有更新任务完成
CountDownLatch latch = new CountDownLatch(numberOfUpdates);
for (int i = 0; i < numberOfUpdates; i++) {
ex.submit(() -> {
try {
int firstIndex = ThreadLocalRandom.current().nextInt(100);
// int secondIndex = ThreadLocalRandom.current().nextInt(100); // 原始问题中提及,但在逻辑中未被使用
int randomAmount = ThreadLocalRandom.current().nextInt(1000);
// 确保减法操作的原子性及条件判断
boolean updated = false;
while (!updated) {
int currentValue = elements.get(firstIndex); // 原子性获取当前值
// 检查是否有足够的量可以减去
if (currentValue >= randomAmount) {
// 使用 compareAndSet 尝试原子性地更新
// 如果 currentValue 在此期间被其他线程修改,compareAndSet 会失败,
// 循环会再次执行,获取新的 currentValue 并重试。
if (elements.compareAndSet(firstIndex, currentValue, currentValue - randomAmount)) {
updated = true; // 更新成功,退出循环
// 如果需要将 randomAmount 加到 secondIndex,可以在这里进行原子操作
// elements.getAndAdd(secondIndex, randomAmount);
}
} else {
// 当前值不足以减去 randomAmount,放弃本次更新
updated = true; // 退出循环
}
}
} finally {
latch.countDown(); // 任务完成,计数器减一
}
});
}
// 4. 等待所有任务完成
latch.await(); // 阻塞当前线程,直到所有任务都调用了 countDown()
ex.shutdown(); // 提交所有任务后,关闭 ExecutorService
// 等待 ExecutorService 中的所有任务真正终止
if (!ex.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate in time. Forcing shutdown.");
ex.shutdownNow(); // 如果超时,则强制关闭
}
// 5. 计算最终总和
int finalSum = 0;
for (int i = 0; i < elements.length(); i++) {
finalSum += elements.get(i);
}
System.out.println("Final sum: " + finalSum);
}
}代码解析与注意事项:
通过遵循这些原则,可以编写出更健壮、更高效的Java并发应用程序。
以上就是Java并发编程:掌握Future、线程安全与原子操作的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号