
在并发编程中,不当的线程管理方式常常导致意想不到的问题,例如线程“卡住”或资源利用率低下。原始方法中存在几个核心缺陷:
Java中的Thread.start()方法只能被调用一次。一旦线程的run()方法执行完毕,线程就进入了死亡状态,不能再次通过start()方法启动。尝试对一个已死亡的线程调用start()会抛出IllegalThreadStateException。
在原始实现中,当一个线程运行1分钟后,它会设置threads[thread_num] = false并返回,这意味着该线程对象已完成其生命周期。onMessage方法随后检查threads[0] == false,如果为真,则尝试再次调用thread0.start()。这正是导致线程无法重新启动,最终只有少数(或一个)线程能够持续运行的根本原因。
原始线程的run()方法中包含以下逻辑:
while(System.currentTimeMillis() < endThreadTime) {
if(!global.isEmpty()) {
recordToUse = global.remove();
System.out.println("Successful removal: Thread-"+ thread_num);
} else {
continue; // If the queue is empty, keep checking until it is not empty.
}
// ... more operations
}当global队列为空时,线程会进入一个紧密的continue循环,不断检查队列是否为空,而不释放CPU资源。这种“忙等待”机制会极大地消耗CPU,尤其是在消息不频繁的场景下,导致系统性能下降和不必要的能源消耗。
立即学习“Java免费学习笔记(深入)”;
虽然原始描述中没有明确指出global队列的具体类型,但如果它是一个非线程安全的集合(如ArrayList或LinkedList),在多个线程同时进行add()和remove()操作时,将可能导致数据不一致、丢失或ConcurrentModificationException。即使使用了ConcurrentLinkedQueue,上述线程生命周期和忙等待的问题依然存在。
将线程设置为固定运行1分钟后自动终止,并试图通过外部逻辑重新启动,这是一种复杂的且易出错的模式。对于需要持续处理任务的场景,线程应被设计为长期运行,并在有任务时处理,无任务时等待。
为了解决上述问题,最佳实践是采用生产者-消费者模式,并利用Java并发包中提供的java.util.concurrent.BlockingQueue。这种模式能够优雅地处理并发任务,确保线程安全、高效和可维护性。
我们将使用LinkedBlockingQueue作为消息队列,并利用ExecutorService来管理消费者线程。
首先,定义一个共享的BlockingQueue来存储消息。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageProcessor {
// 使用LinkedBlockingQueue作为消息队列,它是一个无界队列,也可以指定容量
private static final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
// 定义一个特殊的“毒丸”消息,用于通知消费者线程退出
public static final String POISON_PILL = "POISON_PILL_SHUTDOWN";
// ... 其他组件
}onMessage方法现在只负责将消息放入队列,无需关心消费者线程的状态。
// 在你的Spring Boot服务中
public class BusinessService {
public static void onMessage(String record) {
try {
messageQueue.put(record); // 使用put()方法,队列满时会阻塞
System.out.println("Producer added message: " + record);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
System.err.println("Producer was interrupted while adding message: " + e.getMessage());
}
}
// ... 其他业务逻辑
}创建一个Runnable实现,作为消费者线程的任务。这些线程将从BlockingQueue中取出消息并处理。
import java.util.concurrent.BlockingQueue;
public class ConsumerWorker implements Runnable {
private final BlockingQueue<String> queue;
private final int workerId;
public ConsumerWorker(BlockingQueue<String> queue, int workerId) {
this.queue = queue;
this.workerId = workerId;
}
@Override
public void run() {
System.out.println("ConsumerWorker-" + workerId + " started.");
try {
while (true) {
String record = queue.take(); // 队列为空时,线程会在此阻塞
// 检查是否是“毒丸”消息,用于优雅关闭
if (MessageProcessor.POISON_PILL.equals(record)) {
System.out.println("ConsumerWorker-" + workerId + " received poison pill, shutting down.");
break; // 退出循环,线程结束
}
System.out.println("ConsumerWorker-" + workerId + " successfully removed and processing: " + record);
// 模拟消息处理时间
Thread.sleep(50 + (long)(Math.random() * 100)); // 模拟处理耗时
// 这里可以添加更多对消息进行的操作
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
System.err.println("ConsumerWorker-" + workerId + " was interrupted: " + e.getMessage());
} finally {
System.out.println("ConsumerWorker-" + workerId + " stopped.");
}
}
}使用ExecutorService来管理和运行消费者线程,这是推荐的Java并发实践。它提供了线程复用、统一管理和优雅关闭的能力。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ApplicationRunner {
private static ExecutorService executorService;
private static final int NUM_CONSUMERS = 8; // 定义消费者线程的数量
public static void main(String[] args) throws InterruptedException {
// 初始化线程池
executorService = Executors.newFixedThreadPool(NUM_CONSUMERS);
// 启动消费者线程
for (int i = 0; i < NUM_CONSUMERS; i++) {
executorService.submit(new ConsumerWorker(MessageProcessor.messageQueue, i));
}
System.out.println(NUM_CONSUMERS + " consumer workers started.");
// 模拟生产者不断生成消息
for (int i = 0; i < 100; i++) {
BusinessService.onMessage("Message-" + i);
Thread.sleep(50); // 模拟消息产生间隔
}
// 模拟运行一段时间后,进行优雅关闭
Thread.sleep(5000); // 让生产者继续产生一些消息
// 优雅关闭:发送“毒丸”消息给每个消费者
System.out.println("Initiating graceful shutdown...");
for (int i = 0; i < NUM_CONSUMERS; i++) {
MessageProcessor.messageQueue.put(MessageProcessor.POISON_PILL);
}
// 关闭ExecutorService
shutdownAndAwaitTermination(executorService);
}
// 优雅关闭ExecutorService的方法
private static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // 启动有序关闭,不再接受新任务
try {
// 等待所有任务执行完毕,最多等待60秒
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 立即关闭,尝试停止所有正在执行的任务
// 再次等待,以防万一
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (重新)取消
pool.shutdownNow();
// 保留中断状态
Thread.currentThread().interrupt();
}
}
}通过采用BlockingQueue和ExecutorService,我们构建了一个健壮、高效且易于管理的生产者-消费者系统,解决了原始方法中线程管理不当和资源浪费的问题。
通过上述改进,您的应用程序将能够更稳定、高效地处理并发消息,避免了因线程管理不当而产生的各种问题。
以上就是正确管理和重启Java线程:基于BlockingQueue的生产者-消费者模型的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号