
1. 问题解析:Future的误用与类型不匹配
在java并发编程中,java.util.concurrent.future接口代表一个异步计算的结果。它允许我们检查计算是否完成、等待计算完成以及获取计算结果。然而,将list
原始代码片段中,elements被声明为List
List> elements = new ArrayList<>(); // ... elements.set(firstIndex, elements.get(firstIndex).get() - randomAmount);
这里存在两个核心问题:
-
类型不匹配:List.set(int index, E element)方法期望第二个参数的类型与列表的泛型类型E一致。在List
>的上下文中,这意味着set方法期望传入一个Future 对象。然而,表达式elements.get(firstIndex).get() - randomAmount的结果是一个原始的int类型值(或者装箱后的Integer)。尝试将一个int类型的值直接赋给一个Future 类型的槽位,会导致编译错误,提示set > is not applicable to arguments (int,int)。 - Future的不可变性:Future对象一旦创建并关联到某个异步任务,其所代表的计算结果在任务完成后是不可变的。你不能通过Future接口的方法来“修改”它所持有的值。Future的get()方法只是返回计算完成时的结果,而不是提供一个可修改的引用。如果需要修改数据,那么Future本身不是一个合适的容器。
此外,初始代码在填充elements列表时,即使是简单的初始值1000,也通过ex.submit(() -> { int val = 1000; return val; })来获取Future。这种做法对于静态初始值而言是过度设计,且没有必要。如果只是为了存储初始值,直接使用List
2. ExecutorService生命周期管理
另一个关键问题是ExecutorService的生命周期管理。在原始代码中,首次提交完100个初始化任务后,立即调用了ex.shutdown():
立即学习“Java免费学习笔记(深入)”;
// ...
for (int i = 0; i < 100; i++) {
elements.add(ex.submit(() -> { /* ... */ }));
}
ex.shutdown(); // 过早关闭
// ...
for (int i = 0; i < 10_000; i++) {
ex.submit(() -> { /* ... */ }); // 此处会因ExecutorService已关闭而失败
}ExecutorService.shutdown()方法会平缓地关闭线程池,不再接受新的任务提交,但会等待已提交任务执行完成。如果在此之后尝试提交新任务(如后续的10,000个转账任务),将会抛出RejectedExecutionException。正确的做法是,只有在所有任务都已提交且不再需要线程池时,才调用shutdown()。对于本例,shutdown()应该在所有转账任务提交完毕之后再调用。
3. 正确的共享数据结构选择与并发修改
鉴于上述问题,如果目标是存储和修改一组整数值,并允许多线程并发访问,我们需要选择一个合适的并发数据结构。直接使用ArrayList
3.1 解决方案:使用 AtomicIntegerArray
java.util.concurrent.atomic.AtomicIntegerArray是Java并发包提供的一个高效且线程安全的数组。它内部的每个元素都是一个AtomicInteger,支持原子性的读取、写入、更新等操作,无需显式使用synchronized关键字或锁。这非常适合本例中对数组中特定索引位置的整数进行并发修改的需求。
AtomicIntegerArray的优势:
- 原子性操作:getAndSet(), addAndGet(), compareAndSet()等方法保证了对单个元素的读-改-写操作是原子性的,有效避免了竞态条件。
- 性能优越:相比于使用synchronized块对整个ArrayList进行锁定,AtomicIntegerArray通常在并发性能上表现更好,因为它利用了底层的CAS(Compare-And-Swap)指令。
3.2 示例代码:使用AtomicIntegerArray实现并发转账
以下是修正后的代码,它使用AtomicIntegerArray来存储和修改元素,并正确管理了ExecutorService的生命周期:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;
public class ConcurrentMoneyTransfer {
public static void main(String[] args) throws InterruptedException {
// 1. 初始化 ExecutorService
// 使用固定大小的线程池,例如10个线程
ExecutorService ex = Executors.newFixedThreadPool(10);
// 2. 使用 AtomicIntegerArray 存储可变整数值
// 初始包含100个元素,每个元素值为1000
AtomicIntegerArray elements = new AtomicIntegerArray(100);
for (int i = 0; i < 100; i++) {
elements.set(i, 1000);
}
// 计算初始总和
int initialSum = 0;
for (int i = 0; i < elements.length(); i++) {
initialSum += elements.get(i);
}
System.out.println("Initial sum: " + initialSum); // 预期输出 100 * 1000 = 100000
// 3. 提交并发转账任务
// 模拟10,000次转账操作
for (int i = 0; i < 10_000; i++) {
ex.submit(() -> {
int firstIndex = ThreadLocalRandom.current().nextInt(100);
int secondIndex = ThreadLocalRandom.current().nextInt(100); // 尽管未使用,保留原意
int randomAmount = ThreadLocalRandom.current().nextInt(1000);
// 原子性地获取并检查余额
int currentFirstValue = elements.get(firstIndex);
if (currentFirstValue - randomAmount >= 0) { // 确保余额足够
// 原子性地减少第一个账户的金额
elements.getAndAdd(firstIndex, -randomAmount);
// 如果有转入操作,也需要原子性增加
// elements.getAndAdd(secondIndex, randomAmount); // 如果需要转入,这里可以添加
}
});
}
// 4. 关闭 ExecutorService
// 等待所有提交的任务完成
ex.shutdown();
// 设置一个超时,防止无限等待
if (!ex.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("ExecutorService did not terminate in time.");
ex.shutdownNow(); // 强制关闭
}
// 5. 计算最终总和
int finalSum = 0;
for (int i = 0; i < elements.length(); i++) {
finalSum += elements.get(i);
}
System.out.println("Final sum: " + finalSum); // 预期输出小于等于 initialSum
}
}代码说明:
- AtomicIntegerArray elements = new AtomicIntegerArray(100);: 创建了一个长度为100的AtomicIntegerArray,其所有元素默认初始化为0。
- elements.set(i, 1000);: 在初始化阶段,为每个索引设置初始值。set()方法是原子性的。
- elements.get(firstIndex): 原子性地获取指定索引的值。
- elements.getAndAdd(firstIndex, -randomAmount): 这是关键的原子操作。它会先获取firstIndex位置的当前值,然后将该值与-randomAmount相加,并将结果原子性地写回该位置。这个操作保证了在多线程环境下,减少金额的过程不会被中断,避免了竞态条件。
- ex.shutdown() 和 ex.awaitTermination(): 确保在所有任务提交并执行完成后,再安全地关闭线程池,防止资源泄露。awaitTermination提供了一个等待机制和超时处理。
4. 注意事项与最佳实践
- 明确Future的职责:Future主要用于获取异步任务的结果,它不是一个可变的容器。如果需要存储和修改数据,应选择其他数据结构。
-
选择合适的并发数据结构:
- 对于原子性地修改单个变量,使用AtomicInteger、AtomicLong、AtomicReference等。
- 对于原子性地修改数组中的元素,使用AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。
- 对于更复杂的集合操作,考虑java.util.concurrent包下的并发集合类,如ConcurrentHashMap、CopyOnWriteArrayList等。
- 对于需要更细粒度控制的同步,可以使用synchronized块或java.util.concurrent.locks包下的锁(如ReentrantLock)。
-
合理管理线程池生命周期:
- ExecutorService应该在所有任务提交完毕且不再需要时才关闭。
- 使用shutdown()启动平缓关闭,并结合awaitTermination()等待任务完成。
- 如果需要立即停止所有任务,可以使用shutdownNow(),但要注意处理未完成任务。
- 警惕竞态条件:任何时候,当多个线程访问和修改共享的可变状态时,都必须采取适当的同步措施来防止竞态条件和数据不一致。
5. 总结
在Java并发编程中,理解Future对象的本质及其不可变性至关重要。将其误用作可变数据容器不仅会导致编译错误,还会混淆并发模型。对于需要多线程并发修改的共享数据,应选择AtomicIntegerArray等原子类或并发集合,它们提供了高效且线程安全的机制。同时,正确管理ExecutorService的生命周期,确保在所有任务提交和执行完成后再关闭线程池,是编写健壮并发程序的关键。通过遵循这些原则,可以有效地避免常见的并发陷阱,构建出高性能、高可靠性的并发应用。











