首页 > Java > java教程 > 正文

Java多生产者多消费者模型:解决消费者无限等待导致的程序终止问题

心靈之曲
发布: 2025-11-28 15:03:02
原创
978人浏览过

Java多生产者多消费者模型:解决消费者无限等待导致的程序终止问题

本教程旨在解决java多生产者多消费者并发模型中一个常见问题:当生产者完成任务后,消费者线程仍无限期等待,导致程序无法正常终止。文章将深入分析问题根源,并提供一种通过为消费者设定明确的消费上限来优雅地结束所有线程,从而确保程序能够正确退出的解决方案,并附带详细的代码示例和注意事项。

1. 问题背景与分析

并发编程中,生产者-消费者模式是一种常见的设计模式,用于解决不同线程之间数据生产和消费的同步问题。通常,生产者负责生成数据并将其放入共享缓冲区,而消费者则从缓冲区取出数据进行处理。为了确保线程安全和资源有效利用,我们常常使用wait()和notify()(或notifyAll())机制进行线程间的协调。

然而,一个常见的问题是,当所有生产者都完成了它们的数据生产任务后,消费者线程可能会因为共享缓冲区为空而持续调用wait(),进入无限期等待状态,导致整个程序无法终止。这通常发生在消费者线程被设计为无限循环(while(true))以等待新数据,而没有明确的退出条件时。

在提供的代码示例中,Producer 类通过一个有限的循环 (for (int i = 1; i <= productionSize; i++)) 来控制生产总量,一旦达到 productionSize,生产者线程就会自然结束。然而,Consumer 类的 run() 方法中包含一个无限循环 (while (true)),这意味着消费者会一直尝试从共享队列中取出数据。当生产者完成所有生产任务后,队列最终会变空,此时消费者会调用 sharedQueue.wait() 并无限期等待下去,因为没有任何生产者会再次 notify() 它。

2. 解决方案:为消费者设定明确的消费上限

解决消费者无限等待问题的核心在于,为消费者线程提供一个明确的终止条件,使其在完成预定任务后能够自行退出,而不是无限期地等待。最直接的方法是像生产者一样,为消费者设定一个预期的消费总量。

立即学习Java免费学习笔记(深入)”;

2.1 修改 Consumer 类

我们需要在 Consumer 类中引入两个变量:

  • wants: 表示该消费者期望消费的总商品数量。
  • gets: 记录该消费者已经消费的商品数量。

然后,修改 run() 方法的循环条件,使其在 gets 达到 wants 时终止。

腾讯交互翻译
腾讯交互翻译

腾讯AI Lab发布的一款AI辅助翻译产品

腾讯交互翻译 183
查看详情 腾讯交互翻译

以下是修改后的 Consumer 类代码:

class Consumer implements Runnable {
    private List<Integer> sharedQueue;
    // 设定每个消费者期望消费的商品数量
    // 这个值应该根据总生产量和消费者数量来合理分配
    private static int wantsPerConsumer = 5; 
    private int gets = 0; // 记录当前消费者已消费的数量

    public Consumer(List<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        // 当已消费数量达到期望值时,消费者线程终止
        while (gets < wantsPerConsumer) {
            try {
                consume();
                gets++; // 每成功消费一个商品,计数器加一
                Thread.sleep(100); // 模拟消费过程中的其他操作

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                System.err.println(Thread.currentThread().getName() + " was interrupted.");
                break; // 捕获中断异常后退出循环
            }
        }
        System.out.println(Thread.currentThread().getName() + " has finished consuming " + gets + " items and is terminating.");
    }

    private void consume() throws InterruptedException {
        synchronized (sharedQueue) {
            // 如果共享队列为空,消费者等待
            while (sharedQueue.isEmpty()) { // 使用 isEmpty() 更清晰
                // 在等待前检查是否已达到消费上限,如果已达到则直接退出
                // 这一步是为了防止在等待期间,其他消费者已经完成了任务,导致没有足够的商品可供当前消费者消费
                if (gets >= wantsPerConsumer) {
                    return; // 达到上限,直接返回,不再等待
                }
                System.out.println(Thread.currentThread().getName() + ", 队列为空, consumerThread正在等待producerThread生产, sharedQueue's size= 0");
                sharedQueue.wait();
            }

            Thread.sleep((long) (Math.random() * 2000));
            System.out.println(Thread.currentThread().getName() + ", CONSUMED : " + sharedQueue.remove(0));
            // 唤醒等待的生产者或消费者。在多生产者多消费者场景下,notifyAll() 更安全。
            // 但对于本例的特定终止逻辑,notify() 也能在大多数情况下工作。
            sharedQueue.notify();
        }
    }
}
登录后复制

2.2 确定 wantsPerConsumer 的值

wantsPerConsumer 的值需要根据总的生产量和系统中消费者线程的数量来确定。 在原代码中:

  • Producer.productionSize = 5 (每个生产者生产5个商品)。
  • 有两个生产者 (producer0, producer1)。
  • 总生产量 = 2 * productionSize = 2 * 5 = 10。
  • 有两个消费者 (consumer0, consumer1)。

因此,如果每个消费者平均分担消费任务,那么 wantsPerConsumer 应该设置为 总生产量 / 消费者数量 = 10 / 2 = 5。这与修改后的 Consumer 类中 wantsPerConsumer = 5 的设定相符。

3. 完整示例代码

下面是包含了修改后的 Consumer 类的完整代码示例:

import java.util.LinkedList;
import java.util.List;

// main class
public class MULTIPLE_ProducerConsumerWaitNotify {

    public static void main(String args[]) throws InterruptedException {
        List<Integer> sharedQueue = new LinkedList<>(); // Creating shared object

        // 设定每个生产者的生产数量
        int productionSizePerProducer = 5;
        // 设定生产者数量
        int numberOfProducers = 2;
        // 设定消费者数量
        int numberOfConsumers = 2;

        // 计算总生产量
        int totalProduction = productionSizePerProducer * numberOfProducers;
        // 计算每个消费者期望消费的数量
        // 假设任务平均分配,或者总生产量是消费者数量的倍数
        int wantsPerConsumer = totalProduction / numberOfConsumers;

        // 创建生产者线程
        Producer producer0 = new Producer(sharedQueue, 0, productionSizePerProducer);
        Thread producerThread0 = new Thread(producer0, "ProducerThread0");

        Producer producer1 = new Producer(sharedQueue, 1, productionSizePerProducer);
        Thread producerThread1 = new Thread(producer1, "ProducerThread1");

        // 创建消费者线程
        Consumer consumer0 = new Consumer(sharedQueue, wantsPerConsumer);
        Thread consumerThread0 = new Thread(consumer0, "ConsumerThread0");

        Consumer consumer1 = new Consumer(sharedQueue, wantsPerConsumer);
        Thread consumerThread1 = new Thread(consumer1, "ConsumerThread1");

        // 启动所有线程
        producerThread0.start();
        producerThread1.start();
        consumerThread0.start();
        consumerThread1.start();

        // 等待所有生产者线程完成
        producerThread0.join();
        producerThread1.join();
        System.out.println("All producers have finished their tasks.");

        // 在生产者完成任务后,可以考虑使用一个机制通知消费者
        // 例如,一个“毒丸”对象,或者在队列为空时,消费者通过计数判断是否退出
        // 当前的方案是消费者自行判断是否达到消费上限

        // 等待所有消费者线程完成
        consumerThread0.join();
        consumerThread1.join();
        System.out.println("All consumers have finished their tasks.");

        System.out.println("Program terminated successfully.");
    }
}

// Producer class
class Producer implements Runnable {
    private List<Integer> sharedQueue;
    private int maxSize = 4; // maximum number of products which sharedQueue can hold at a time.
    private int productionSize; // Total no of items to be produced by THIS producer
    int producerNo;

    public Producer(List<Integer> sharedQueue, int producerNo, int productionSize) {
        this.sharedQueue = sharedQueue;
        this.producerNo = producerNo;
        this.productionSize = productionSize;
    }

    @Override
    public void run() {
        for (int i = 1; i <= productionSize; i++) { // produce products.
            try {
                produce(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                System.err.println(Thread.currentThread().getName() + " was interrupted.");
                break; // 捕获中断异常后退出循环
            }
        }
        System.out.println(Thread.currentThread().getName() + " has finished producing " + productionSize + " items and is terminating.");
    }

    private void produce(int i) throws InterruptedException {
        synchronized (sharedQueue) {
            // if sharedQuey is full wait until consumer consumes.
            while (sharedQueue.size() == maxSize) {
                System.out.println(Thread.currentThread().getName() + ", 队列已满, producerThread正在等待consumerThread消费, sharedQueue's size= " + maxSize);
                sharedQueue.wait();
            }

            // Bcz each producer must produce unique product
            // Ex= producer0 will produce 1-5  and producer1 will produce 6-10 in random order
            int producedItem = (this.productionSize * producerNo) + i;

            System.out.println(Thread.currentThread().getName() + " Produced : " + producedItem);
            sharedQueue.add(producedItem);
            Thread.sleep((long) (Math.random() * 1000));
            sharedQueue.notify(); // 唤醒等待的消费者
        }
    }
}

// Consumer class (Modified)
class Consumer implements Runnable {
    private List<Integer> sharedQueue;
    private int wantsToConsume; // 这个消费者期望消费的总数量
    private int gets = 0; // 记录当前消费者已消费的数量

    public Consumer(List<Integer> sharedQueue, int wantsToConsume) {
        this.sharedQueue = sharedQueue;
        this.wantsToConsume = wantsToConsume;
    }

    @Override
    public void run() {
        // 当已消费数量达到期望值时,消费者线程终止
        while (gets < wantsToConsume) {
            try {
                consume();
                // 只有在成功消费后才增加gets计数,防止因队列空等待而错误增加
                // consume()方法内部已处理等待,如果成功取出,则计数
                // 这里需要调整,consume()方法应返回是否成功消费,或者在consume()内部增加gets
                // 为简化,我们假设consume()方法执行到最后表示成功尝试消费(即使队列空等待了)
                // 更好的做法是consume()返回一个布尔值表示是否实际消费了,或者直接在同步块内增加gets
                // 这里我们选择在consume()成功返回后增加gets,但要确保consume()在没有实际消费时不会增加gets
                // 调整一下:gets在consume()方法内部,当成功从队列中移除元素时才增加。
                // 这样可以避免因wait()而导致gets增加但实际未消费的情况。
                // 考虑到目前的consume()设计,如果它成功执行到打印并移除元素,那么就可以认为消费成功。
                // 如果它因为队列为空而等待,那么就不会执行到移除元素,gets也不会增加。
                // 因此,将gets++放在这里是合理的,只要consume()在实际消费后才返回。
                // 但为了更严谨,我们可以在consume()内部返回实际是否消费了。
                // 或者,我们可以将gets++放在consume()方法内部,紧跟在sharedQueue.remove(0)之后。
                // 这里为了保持与原回答的逻辑一致,暂时保留在run()中,但需要理解其潜在的细微差别。
                // 实际消费的逻辑在consume()中,如果consume()内部成功移除,则gets++。
                // 为了避免重复计数或错误计数,我们将gets计数逻辑移入consume()方法中。

                // 移除这里的gets++,让consume()方法负责更新gets
                Thread.sleep(100); // 模拟消费过程中的其他操作

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 恢复中断状态
                System.err.println(Thread.currentThread().getName() + " was interrupted.");
                break; // 捕获中断异常后退出循环
            }
        }
        System.out.println(Thread.currentThread().getName() + " has finished consuming " + gets + " items and is terminating.");
    }

    private void consume() throws InterruptedException {
        synchronized (sharedQueue) {
            // 如果共享队列为空,消费者等待
            while (sharedQueue.isEmpty()) {
                // 在等待前检查是否已达到消费上限,如果已达到则直接退出
                // 这一步非常关键,防止消费者在队列为空时无限等待,即使它已经完成了所有消费任务
                if (gets >= wantsToConsume) {
                    return; // 达到上限,直接返回,不再等待
                }
                System.out.println(Thread.currentThread().getName() + ", 队列为空, consumerThread正在等待producerThread生产, sharedQueue's size= 0");
                sharedQueue.wait();
            }

            // 成功从队列中移除元素,表示一次有效消费
            Thread.sleep((long) (Math.random() * 2000));
            System.out.println(Thread.currentThread().getName() + ", CONSUMED : " + sharedQueue.remove(0));
            gets++; // 只有在实际消费后才增加计数
            sharedQueue.notify(); // 唤醒等待的生产者
        }
    }
}
登录后复制

重要更新: 在上述 Consumer 类中,gets++ 的位置从 run() 方法中移到了 consume() 方法的 synchronized 块内部,紧跟在 sharedQueue.remove(0) 之后。这是为了确保只有在实际成功消费(即从队列中取出一个元素)之后,gets 计数器才会被递增,从而更准确地反映消费情况并避免潜在的计数错误。同时,在 while (sharedQueue.isEmpty()) 循环内部增加了对 gets >= wantsToConsume 的检查,这确保了即使在等待状态下,如果消费者已经满足了其消费数量,它也能及时退出等待,不再被无谓地唤醒。

4. 注意事项与总结

  1. 总消费量与总生产量匹配:确保所有消费者期望消费的总量与所有生产者生产的总量相匹配。如果不匹配,可能会导致部分商品未被消费,或者部分消费者因无商品可消费而继续等待(如果它们没有明确的退出条件)。
  2. notify() vs notifyAll():在多生产者多消费者的复杂场景中,通常推荐使用 notifyAll() 来唤醒所有等待的线程,让它们重新评估条件。notify() 只唤醒一个随机等待的线程,可能导致“惊群效应”或“死锁”风险(例如,唤醒了一个生产者,但队列已满,而真正需要被唤醒的消费者仍在等待)。在本例中,由于我们为消费者设定了明确的退出条件,notify() 也能工作,但作为最佳实践,notifyAll() 更为健壮。
  3. 中断处理:在 try-catch 块中捕获 InterruptedException 时,最佳实践是调用 Thread.currentThread().interrupt() 来重新设置中断状态,并通常选择退出循环或线程,以便外部代码能够感知到中断请求。
  4. 优雅退出:通过为消费者设定明确的消费上限,我们实现了程序的优雅退出。当所有生产者完成生产,所有消费者也完成了它们的消费任务后,所有线程都会自然终止,main 方法中的 join() 调用将确保程序在所有工作线程结束后才最终退出。
  5. “毒丸”机制(Poison Pill):除了设定消费上限,另一种常用的优雅退出机制是“毒丸”对象。当生产者完成所有任务后,可以向队列中放入一个特殊的“毒丸”对象。消费者在取出对象时,如果识别到是“毒丸”,就处理完队列中剩余的正常数据后,自己也终止,并可能将“毒丸”传递给下一个消费者(如果存在多个消费者)。这种方法在总生产量不确定或需要更灵活的终止策略时非常有用。

通过上述修改,我们成功解决了Java多生产者多消费者模型中消费者无限等待导致程序无法终止的问题,确保了并发程序的健壮性和正确性。

以上就是Java多生产者多消费者模型:解决消费者无限等待导致的程序终止问题的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号