BlockingQueue简化了生产者消费者模式的实现,其核心是通过线程安全的阻塞队列自动处理同步与通信。使用ArrayBlockingQueue等实现可避免手动控制wait/notify的复杂性,提升代码健壮性。常见实现包括有界固定容量的ArrayBlockingQueue、高吞吐的LinkedBlockingQueue、支持优先级的PriorityBlockingQueue、延迟执行的DelayQueue及无缓冲的SynchronousQueue,各自适用于不同场景。需注意优雅停机(如“毒丸”机制)、中断处理、队列容量选择与监控,以确保系统稳定高效。

在Java中,利用
BlockingQueue
BlockingQueue
要实现生产者消费者模式,我们通常会定义一个共享的
BlockingQueue
BlockingQueue
put()
take()
下面是一个基础的Java代码示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
// 产品类
class Data {
private int id;
private String content;
public Data(int id, String content) {
this.id = id;
this.content = content;
}
@Override
public String toString() {
return "Data{" +
"id=" + id +
", content='" + content + '\'' +
'}';
}
}
// 生产者
class Producer implements Runnable {
private final BlockingQueue<Data> queue;
private volatile boolean isRunning = true; // 控制生产者的运行状态
private static final AtomicInteger counter = new AtomicInteger(0);
public Producer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (isRunning) {
int id = counter.incrementAndGet();
Data data = new Data(id, "Product-" + id);
// 模拟生产耗时
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 500));
if (queue.offer(data, 2, TimeUnit.SECONDS)) { // 尝试在2秒内放入,避免无限阻塞
System.out.println(Thread.currentThread().getName() + " 生产了: " + data);
} else {
System.out.println(Thread.currentThread().getName() + " 生产失败,队列已满或超时。");
}
}
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " 生产者被中断。");
Thread.currentThread().interrupt(); // 重新设置中断标志
} finally {
System.out.println(Thread.currentThread().getName() + " 生产者停止运行。");
}
}
public void stop() {
isRunning = false;
}
}
// 消费者
class Consumer implements Runnable {
private final BlockingQueue<Data> queue;
private volatile boolean isRunning = true; // 控制消费者的运行状态
public Consumer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (isRunning || !queue.isEmpty()) { // 当isRunning为false时,也要把队列中剩余的消费完
Data data = queue.poll(2, TimeUnit.SECONDS); // 尝试在2秒内取出,避免无限阻塞
if (data != null) {
// 模拟消费耗时
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(300, 800));
System.out.println(Thread.currentThread().getName() + " 消费了: " + data);
} else if (!isRunning) { // 如果已经停止运行且队列为空,则退出
System.out.println(Thread.currentThread().getName() + " 队列为空,且消费者已停止接收新任务。");
break;
}
}
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " 消费者被中断。");
Thread.currentThread().interrupt();
} finally {
System.out.println(Thread.currentThread().getName() + " 消费者停止运行。");
}
}
public void stop() {
isRunning = false;
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Data> queue = new ArrayBlockingQueue<>(10); // 容量为10的阻塞队列
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Thread p1Thread = new Thread(producer1, "生产者-A");
Thread p2Thread = new Thread(producer2, "生产者-B");
Thread c1Thread = new Thread(consumer1, "消费者-X");
Thread c2Thread = new Thread(consumer2, "消费者-Y");
p1Thread.start();
p2Thread.start();
c1Thread.start();
c2Thread.start();
// 运行一段时间后,尝试停止生产者
TimeUnit.SECONDS.sleep(10);
System.out.println("\n--- 停止生产者 ---");
producer1.stop();
producer2.stop();
// 等待生产者线程结束
p1Thread.join();
p2Thread.join();
// 确保所有产品被消费后,再停止消费者
// 实际场景可能需要更复杂的判断,例如通过“毒丸”或计数器
while (!queue.isEmpty()) {
System.out.println("等待队列清空,当前大小: " + queue.size());
TimeUnit.SECONDS.sleep(1);
}
System.out.println("\n--- 停止消费者 ---");
consumer1.stop();
consumer2.stop();
// 等待消费者线程结束
c1Thread.join();
c2Thread.join();
System.out.println("\n所有线程已停止,程序结束。");
}
}这段代码展示了如何通过
ArrayBlockingQueue
offer()
poll()
put()
take()
立即学习“Java免费学习笔记(深入)”;
说实话,我个人觉得,当你第一次接触
wait()
notify()
notifyAll()
synchronized
notify()
notifyAll()
BlockingQueue
put()
take()
synchronized
wait()
notify()
java.util.concurrent
BlockingQueue
BlockingQueue
ArrayBlockingQueue
ArrayBlockingQueue
LinkedBlockingQueue
Integer.MAX_VALUE
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
Comparable
Comparator
PriorityBlockingQueue
DelayQueue
Delayed
SynchronousQueue
put
take
SynchronousQueue
ThreadPoolExecutor
选择哪种
BlockingQueue
虽然
BlockingQueue
一个比较常见的挑战是优雅停机。生产者和消费者线程可能需要在一个合适的时间点停止运行。如果只是简单地中断线程,可能会导致队列中还有未处理的数据被丢弃。一个常见的做法是使用“毒丸”(Poison Pill)机制。当生产者决定停止生产时,它不是直接退出,而是向队列中放入一个特殊的“毒丸”对象(比如一个表示终止的特殊
Data
另一个需要考虑的是异常处理。
put()
take()
InterruptedException
Thread.currentThread().interrupt()
队列容量的选择也至关重要。对于有界队列(如
ArrayBlockingQueue
LinkedBlockingQueue
在性能方面,尽管
BlockingQueue
LinkedBlockingQueue
ArrayBlockingQueue
最后,监控也是一个容易被忽视但非常重要的环节。我们应该考虑如何监控
BlockingQueue
以上就是如何在Java中使用BlockingQueue实现生产者消费者模式的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号