Java实现生产者消费者模型应优先使用BlockingQueue,因其天然线程安全且避免唤醒丢失、虚假唤醒等问题;需高级控制时才用ReentrantLock+Condition。

Java 中实现生产者消费者模型,核心不是手写 wait/notify,而是优先用 java.util.concurrent 包里现成的线程安全组件——它们已解决唤醒丢失、虚假唤醒、锁粒度、公平性等底层问题。
用 BlockingQueue 实现最简可靠版本
这是最推荐的起点:避免自己管理锁和条件变量,BlockingQueue 的 put() 和 take() 方法天然阻塞且线程安全。
-
ArrayBlockingQueue:固定容量、基于数组、可选公平策略,适合容量明确、注重吞吐或响应公平性的场景 -
LinkedBlockingQueue:默认无界(实际是Integer.MAX_VALUE),注意 OOM 风险;有界构造时性能略低于ArrayBlockingQueue -
SynchronousQueue:不存储元素,每个put()必须等待配对take(),适合“直传”型任务交接,常用于线程池的DirectHandoff
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
String item = "item-" + i;
System.out.println("Produced: " + item);
queue.put(item); // 自动阻塞直到有空间
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String item = queue.take(); // 自动阻塞直到有元素
System.out.println("Consumed: " + item);
if (item.equals("item-4")) break; // 简单退出条件
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
为什么不用 wait()/notify() 手写?
除非教学或特殊调度逻辑,否则手写极易出错:
- 必须在
synchronized块内调用,否则抛IllegalMonitorStateException -
notify()可能唤醒错误线程(比如多个生产者+多个消费者共用一把锁),应优先用notifyAll() - 必须用
while而非if检查条件,否则遭遇虚假唤醒(spurious wakeup)会直接跳过判断导致逻辑崩溃 - 无法控制唤醒顺序,容易造成线程饥饿(如生产者一直抢到锁,消费者永远等不到)
需要自定义逻辑时:用 ReentrantLock + Condition
当需分离「生产就绪」和「消费就绪」两个等待队列,或需尝试获取、超时获取、中断响应等高级行为时,才考虑该组合。
立即学习“Java免费学习笔记(深入)”;
- 一个
Lock对应多个Condition:例如notFull和notEmpty,避免notifyAll()唤醒无关线程 -
awaitNanos(long)或awaitUntil(Date)支持超时,比wait(long)更灵活 -
lockInterruptibly()让阻塞中的线程能响应中断,比synchronized更可控
class BoundedBuffer{ private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final Object[] items; private int putIndex, takeIndex, count; BoundedBuffer(int capacity) { this.items = new Object[capacity]; } public void put(T x) throws InterruptedException { lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); // 等待非满 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; ++count; notEmpty.signal(); // 唤醒一个消费者 } finally { lock.unlock(); } } @SuppressWarnings("unchecked") public T take() throws InterruptedException { lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); // 等待非空 Object x = items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; --count; notFull.signal(); // 唤醒一个生产者 return (T) x; } finally { lock.unlock(); } } }
真正难的从来不是写出能跑的代码,而是让边界条件(如中断、容量耗尽、多线程竞争、JVM 优化重排序)下的行为完全可预期。用好 BlockingQueue 就规避了 90% 的坑;真要手写,ReentrantLock + Condition 是唯一值得投入精力的底层路径。










