
在Java中,线程的停止并非强制性的,而是通过一种“合作”机制实现。当一个线程被请求中断时(例如,通过调用Thread.interrupt()方法),它并不会立即停止执行。相反,Java虚拟机只是设置了该线程的一个“中断标志”(interrupted status)。线程需要周期性地检查这个标志,并根据其状态决定是否终止当前操作、清理资源并退出。
线程检查中断标志的常用方法有两种:
此外,当线程在执行某些阻塞操作(如Thread.sleep(), Object.wait(), BlockingQueue.take()等)时被中断,这些操作会抛出InterruptedException。捕获此异常是处理中断请求的另一种重要方式。
ExecutorService是Java并发包中管理线程池的核心接口。当需要停止ExecutorService中的所有任务时,通常会想到使用shutdownNow()方法。
executor.shutdownNow()方法的作用是:
然而,shutdownNow()的副作用是它会彻底关闭ExecutorService,使其无法再接受新的任务提交。 这与用户提出的“我不想关闭executor(我不能再提交线程)”的需求相悖。如果业务场景需要ExecutorService在处理完当前一批任务后,仍然能够继续处理后续的任务批次,那么shutdownNow()并非合适的解决方案。
为了在不关闭整个ExecutorService的前提下,实现对特定任务的选择性中断或取消,我们需要利用ExecutorService.submit()方法返回的Future对象。
当您通过executor.submit(Runnable task)或executor.submit(Callable<T> task)提交任务时,ExecutorService会返回一个Future对象。这个Future对象代表了异步计算的结果,同时也提供了控制任务执行的方法,其中最关键的就是cancel()方法。
Future.cancel(boolean mayInterruptIfRunning)方法:
因此,为了实现超时后取消特定任务组的需求,我们应该在CountDownLatch超时时,遍历该组任务对应的Future对象,并调用cancel(true)。
以下是基于用户原始代码的改进版本,演示了如何在CountDownLatch超时后,取消该批次中尚未完成的任务,同时保持ExecutorService的可用性:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ExecutorServiceTaskCancellation {
// 模拟一个执行耗时任务的方法
private static void doTask(Object obj) {
try {
System.out.println(Thread.currentThread().getName() + " - 开始处理: " + obj);
// 模拟任务执行,并定期检查中断状态
for (int i = 0; i < 10; i++) {
if (Thread.interrupted()) { // 检查中断标志
System.out.println(Thread.currentThread().getName() + " - 任务 " + obj + " 被中断,提前退出。");
return; // 响应中断,退出任务
}
Thread.sleep(500); // 模拟耗时操作
}
System.out.println(Thread.currentThread().getName() + " - 完成处理: " + obj);
} catch (InterruptedException e) {
// 捕获InterruptedException,同样表示中断
System.out.println(Thread.currentThread().getName() + " - 任务 " + obj + " 捕获到InterruptedException,提前退出。");
// 重新设置中断标志,因为捕获InterruptedException会清除中断标志
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
// 创建一个固定大小的线程池,例如5个线程
ExecutorService executor = Executors.newFixedThreadPool(5);
// 模拟一个大的对象列表
List<String> objectList = new ArrayList<>();
for (int i = 0; i < 20; i++) {
objectList.add("Task-" + (i + 1));
}
// 将对象列表分成每组5个
// 假设这里使用Guava的Lists.partition,实际项目中请引入Guava库
// 或者自己实现分区逻辑
List<List<String>> objectGroups = partitionList(objectList, 5);
int groupCount = 0;
for (List<String> eachGroup : objectGroups) {
groupCount++;
System.out.println("\n--- 开始处理第 " + groupCount + " 组任务 ---");
CountDownLatch latch = new CountDownLatch(eachGroup.size());
List<Future<?>> futures = new ArrayList<>(); // 存储当前组的所有Future对象
for (String obj : eachGroup) {
Future<?> future = executor.submit(() -> {
try {
doTask(obj);
} finally {
latch.countDown(); // 无论任务成功、失败或中断,都减少计数
}
});
futures.add(future); // 将Future添加到列表中
}
try {
// 等待当前组任务完成,最长等待15分钟
if (!latch.await(15, TimeUnit.SECONDS)) { // 将15分钟改为15秒方便测试
System.out.println("警告:第 " + groupCount + " 组任务在15秒内未能全部完成,尝试取消未完成任务。");
// 超时发生,尝试取消所有尚未完成的任务
for (Future<?> future : futures) {
if (!future.isDone()) { // 检查任务是否已经完成
boolean cancelled = future.cancel(true); // 尝试中断任务
System.out.println(" - 尝试取消任务: " + (cancelled ? "成功" : "失败或已完成") + " for " + future);
}
}
} else {
System.out.println("第 " + groupCount + " 组任务全部完成。");
}
} catch (InterruptedException e) {
System.out.println("等待第 " + groupCount + " 组任务时被中断: " + e.getMessage());
Thread.currentThread().interrupt(); // 重新设置中断标志
}
}
// 所有组处理完毕后,优雅地关闭ExecutorService
System.out.println("\n所有任务组处理完毕,准备关闭ExecutorService。");
executor.shutdown(); // 阻止新任务提交,等待已提交任务完成
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("ExecutorService未能及时关闭,尝试强制关闭。");
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
System.err.println("关闭ExecutorService时被中断: " + e.getMessage());
executor.shutdownNow(); // 捕获中断异常时,也强制关闭
Thread.currentThread().interrupt();
}
System.out.println("ExecutorService已关闭。");
}
// 模拟Lists.partition方法,实际项目中可使用Guava库
private static <T> List<List<T>> partitionList(List<T> list, int size) {
List<List<T>> partitions = new ArrayList<>();
if (list == null || list.isEmpty() || size <= 0) {
return partitions;
}
for (int i = 0; i < list.size(); i += size) {
partitions.add(new ArrayList<>(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
}上述示例的关键在于doTask方法内部对中断的响应。一个可中断的任务应该:
在doTask示例中,我们展示了这两种处理方式:通过Thread.interrupted()在循环中检查,以及通过try-catch InterruptedException处理阻塞操作。
在ExecutorService中停止任务是一个常见的需求,但理解其背后的Java线程中断机制至关重要。直接使用shutdownNow()虽然能中断所有任务,但会使ExecutorService无法复用。通过存储Future对象并在需要时调用future.cancel(true),我们可以实现对特定任务的选择性中断,同时保持ExecutorService的活性。最重要的是,任务代码本身必须是“中断友好”的,即能够主动检查中断状态并响应中断信号,这是实现优雅任务取消的基石。
以上就是在ExecutorService中实现可控的任务中断与取消的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号