0

0

Java并发消息发送系统中的会话管理与wait/notify机制深度解析

霞舞

霞舞

发布时间:2025-11-11 17:15:01

|

671人浏览过

|

来源于php中文网

原创

Java并发消息发送系统中的会话管理与wait/notify机制深度解析

java并发消息发送系统中的会话管理与`wait`/`notify`机制深度解析。本文将探讨如何利用java的`wait`/`notify`机制在多线程环境中实现短信批量发送与会话重连。我们将分析常见的同步问题,特别是因不当的`isempty()`检查和共享资源访问导致的`arrayindexoutofboundsexception`,并提供正确同步共享资源和管理线程状态的策略,以构建健壮的并发操作。

1. 引言:并发消息发送与会话管理挑战

在企业级应用中,批量发送短信(或其他消息)是一个常见需求。为了提高吞吐量,通常会采用多线程并发发送的策略。然而,消息发送依赖于与外部服务器(如SMSC)建立的会话(SMPPSession)。这种会话可能因网络波动、服务器重启等原因中断,此时需要一个机制来重新建立会话,并在会话重连期间暂停所有发送操作,待会话恢复后再继续。

本教程将深入探讨如何使用Java的Object.wait()和Object.notifyAll()机制来协调多个消息发送线程和一个会话管理线程,以实现上述功能。我们将分析在并发场景下可能遇到的同步问题,并提供一套健壮的解决方案。

2. wait()与notify()机制详解

wait()和notify()(或notifyAll())是Java中用于线程间协作的基础机制,它们允许线程在特定条件下暂停执行并等待,直到另一个线程通知它条件满足。

  • wait(): 当一个线程调用wait()方法时,它会释放当前持有的对象锁,并进入等待状态,直到被notify()或notifyAll()唤醒,或者被中断。
  • notify(): 唤醒在该对象上等待的一个任意线程。
  • notifyAll(): 唤醒在该对象上等待的所有线程。

关键点:

立即学习Java免费学习笔记(深入)”;

  1. 必须在synchronized块内调用:wait()、notify()和notifyAll()方法必须在持有对象监视器(即synchronized块所锁定的对象)的情况下调用,否则会抛出IllegalMonitorStateException。
  2. 操作同一个监视器对象:所有等待和通知操作都必须针对同一个对象进行,这个对象充当了线程间通信的“信号量”。
  3. 虚假唤醒与条件检查:wait()方法可能会在没有收到通知的情况下被唤醒(虚假唤醒)。因此,wait()调用通常应该放在一个while循环中,不断检查等待的条件是否真正满足。

3. 原始代码中的同步问题分析

原始代码尝试使用Client.messages列表作为监视器对象来协调线程。然而,其中存在几个关键的同步问题,导致了ArrayIndexOutOfBoundsException和线程不同步。

3.1 while (!Client.messages.isEmpty())的竞态条件

在Sender和SessionProducer线程的run()方法中,外部的while (!Client.messages.isEmpty())循环条件是在synchronized (Client.messages)块外部检查的。

// Sender线程示例
while (!Client.messages.isEmpty()){ // 问题:在同步块外检查
    synchronized (Client.messages){
        // ...
    }
}

问题分析: 假设Client.messages中只剩一条消息。多个Sender线程可能同时执行到while (!Client.messages.isEmpty()),它们都发现列表不为空,然后都尝试进入synchronized (Client.messages)块。当第一个线程进入同步块并成功移除消息后,列表变为空。此时,后续进入同步块的线程在执行Client.messages.remove(0)时,就会因为列表已空而抛出ArrayIndexOutOfBoundsException。

3.2 remove(0)的并发访问问题

即使isEmpty()检查放在同步块内,remove(0)操作也需要谨慎。如果多个线程同时尝试移除,且消息数量不足,仍然可能导致问题。在CopyOnWriteArrayList中,remove(0)本身是线程安全的,但它并不能阻止在列表为空时尝试移除。

多墨智能
多墨智能

多墨智能 - AI 驱动的创意工作流写作工具

下载

3.3 不当的通知机制

Sender线程在成功发送消息后调用了Client.messages.notifyAll()。然而,此时SessionProducer线程可能正在等待会话断开,或者其他Sender线程可能正在等待消息或会话。这种通知通常是不必要的,并且可能导致不必要的线程唤醒,甚至掩盖真正的等待条件。

3.4 wait()的条件检查缺失

原始代码中的wait()没有放在while循环中检查条件。

// 原始Sender线程的等待逻辑
} else {
    try {
        Client.messages.wait(); // 问题:没有在while循环中检查条件
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

这可能导致线程在被唤醒后,其等待的条件(例如smppSession.isBind()为true或Client.messages不为空)实际上并未满足,从而导致逻辑错误或再次进入等待状态。

4. 改进方案与最佳实践

为了解决上述问题,我们需要对代码进行重构,遵循以下核心原则:

  1. 统一监视器对象:选择一个能代表共享状态的唯一对象作为所有wait()和notifyAll()操作的监视器。在本例中,SMPPSession对象本身是一个很好的选择,因为它代表了会话的绑定状态。
  2. 所有共享资源访问都需同步:包括isEmpty()、remove()等操作,都必须在持有监视器锁的同步块内执行。
  3. wait()必须在while循环中检查条件:防止虚假唤醒和条件不满足时继续执行。
  4. 明确通知时机:只有当某个线程改变了其他线程正在等待的条件时,才调用notifyAll()。

4.1 改进SMPPSession类

SMPPSession作为共享资源,其bind状态是所有线程关注的焦点。我们可以将它作为监视器对象。

public class SMPPSession {
    private boolean bind = false; // 初始状态为未绑定
    private static final Random idGenerator = new Random();

    public synchronized int sendMessage(String msg) { // 保持sendMessage同步
        try {
            Thread.sleep(100L); // 模拟发送延迟
            System.out.println("Sending message: " + msg);
            return Math.abs(idGenerator.nextInt());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Message sending interrupted: " + e.getMessage());
        }
        return -1;
    }

    public synchronized void reBind() { // reBind方法也同步
        try {
            System.out.println("Rebinding...");
            Thread.sleep(2000L); // 模拟重连延迟
            this.bind = true;
            System.out.println("Session established!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Rebinding interrupted: " + e.getMessage());
        }
    }

    public synchronized boolean isBind() { // isBind方法也同步
        return this.bind;
    }

    public synchronized void setBind(boolean bind) { // 允许外部设置绑定状态
        this.bind = bind;
    }
}

4.2 改进Sender线程

Sender线程需要等待两个条件:SMPPSession已绑定,且消息队列不为空。

import java.util.concurrent.CopyOnWriteArrayList;

public class Sender extends Thread {
    private SMPPSession smppSession;
    private CopyOnWriteArrayList messages; // 引用共享消息列表
    private volatile boolean running = true; // 控制线程生命周期

    public Sender(String name, SMPPSession smppSession, CopyOnWriteArrayList messages) {
        this.setName(name);
        this.smppSession = smppSession;
        this.messages = messages;
    }

    public void terminate() {
        this.running = false;
        // 确保线程不会无限等待,如果正在wait(),需要被中断或notify
        synchronized (smppSession) {
            smppSession.notifyAll();
        }
    }

    @Override
    public void run() {
        while (running) {
            synchronized (smppSession) { // 使用smppSession作为监视器
                // 等待条件:会话未绑定 或 消息队列为空
                while (!smppSession.isBind() || messages.isEmpty()) {
                    // 如果消息已全部发送且会话已绑定,则此Sender可以退出
                    if (messages.isEmpty() && smppSession.isBind()) {
                        System.out.println(getName() + ":所有消息已发送完毕,线程退出。");
                        running = false; // 标记为停止
                        smppSession.notifyAll(); // 通知其他可能等待的线程
                        break; // 跳出内部while循环
                    }
                    try {
                        System.out.println(getName() + ":等待中... 会话绑定状态: " + smppSession.isBind() + ", 消息队列是否为空: " + messages.isEmpty());
                        smppSession.wait(); // 等待在smppSession对象上
                    } catch (InterruptedException e) {
                        System.out.println(getName() + ":被中断,线程退出。");
                        Thread.currentThread().interrupt();
                        running = false; // 标记为停止
                        break; // 跳出内部while循环
                    }
                }
                if (!running) { // 如果在等待过程中被标记为停止,则退出外部while循环
                    break;
                }

                // 条件满足:smppSession已绑定且messages不为空
                final String msg = messages.remove(0); // 安全移除消息
                final int msgId = smppSession.sendMessage(msg);
                System.out.println(Thread.currentThread().getName() + " 发送消息并收到ID: " + msgId + "。剩余消息数:" + messages.size());
                // 发送消息后,如果消息队列变空,可能需要通知其他Sender线程退出
                // 或者如果Producer在等待消息队列状态,则需要通知
                // 这里暂时不需要notifyAll,因为发送消息通常不改变Producer的等待条件
                // 但如果messages.isEmpty()是Producer的等待条件之一,则需要
            }
            // 考虑在发送消息后短暂休眠,避免发送过快
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }
}

4.3 改进SessionProducer线程

SessionProducer线程负责在会话未绑定时进行重连,并在重连成功后通知所有Sender线程。

import java.util.concurrent.CopyOnWriteArrayList;

public class SessionProducer extends Thread {
    private SMPPSession smppSession;
    private CopyOnWriteArrayList messages; // 引用共享消息列表,用于判断是否还有消息需要发送
    private volatile boolean running = true;

    public SessionProducer(String name, SMPPSession smppSession, CopyOnWriteArrayList messages) {
        this.setName(name);
        this.smppSession = smppSession;
        this.messages = messages;
    }

    public void terminate() {
        this.running = false;
        synchronized (smppSession) {
            smppSession.notifyAll();
        }
    }

    @Override
    public void run() {
        while (running) {
            synchronized (smppSession) { // 使用smppSession作为监视器
                // 如果会话已绑定,且所有消息都已发送完毕,则Producer可以退出
                if (smppSession.isBind() && messages.isEmpty()) {
                    System.out.println(getName() + ":所有消息已发送完毕,会话已绑定,线程退出。");
                    running = false;
                    smppSession.notifyAll(); // 通知所有线程可以退出
                    break;
                }

                // 等待条件:会话已绑定 或 消息队列为空(如果 Producer 也需要关注消息队列状态)
                // 这里主要关注会话绑定状态
                while (smppSession.isBind() && !messages.isEmpty()) { // 如果会话已绑定且还有消息要发,Producer等待
                    try {
                        System.out.println(getName() + ":等待中... 会话已绑定,等待会话断开或所有消息发送完毕。");
                        smppSession.wait(); // 等待在smppSession对象上
                    } catch (InterruptedException e) {
                        System.out.println(getName() + ":被中断,线程退出。");
                        Thread.currentThread().interrupt();
                        running = false;
                        break;
                    }
                }
                if (!running) {
                    break;
                }

                // 此时,会话可能未绑定,或者消息队列为空(如果上面条件包含)
                if (!smppSession.isBind()) { // 如果会话未绑定,则进行重连
                    smppSession.reBind();
                    System.out.println(Thread.currentThread().getName()

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

843

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

742

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

740

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

397

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

400

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

446

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

431

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16926

2023.08.03

菜鸟裹裹入口以及教程汇总
菜鸟裹裹入口以及教程汇总

本专题整合了菜鸟裹裹入口地址及教程分享,阅读专题下面的文章了解更多详细内容。

0

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.8万人学习

C# 教程
C# 教程

共94课时 | 7.3万人学习

Java 教程
Java 教程

共578课时 | 49.4万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号