
本教程深入探讨了在Java多线程环境中,如何使用`wait()`和`notifyAll()`机制实现并发消息发送与会话重连的同步控制。文章分析了共享资源访问中的常见陷阱,特别是`ArrayIndexOutOfBoundsException`的根源,并提供了基于`wait/notifyAll`的正确同步方案。此外,教程还介绍了`volatile`关键字的重要性以及`java.util.concurrent`包中更高级的并发工具,旨在帮助开发者构建健壮、高效的多线程应用。
理解Java中的wait()和notifyAll()机制
在Java中,wait()、notify()和notifyAll()是Object类提供的核心方法,用于线程间的协作。它们必须在synchronized代码块内部调用,并且作用于同一个锁对象。
- wait(): 使当前线程进入等待状态,并释放它所持有的锁。线程会一直等待,直到被其他线程调用notify()或notifyAll()唤醒,或者等待超时。被唤醒后,线程会尝试重新获取锁,然后从wait()方法返回。
- notify(): 唤醒在该锁对象上等待的单个线程。如果有多个线程在等待,JVM会选择其中一个进行唤醒,具体是哪一个线程是不确定的。
- notifyAll(): 唤醒在该锁对象上等待的所有线程。所有被唤醒的线程都会尝试重新获取锁,但只有一个线程能成功获取并继续执行。
这些机制是构建生产者-消费者模型或处理共享资源状态变化时不可或缺的工具。
场景描述:多线程消息发送与会话管理
设想一个场景:我们需要通过SMPP协议发送大量短信。存在多个“发送者”(Sender)线程并发地从一个共享的消息队列中取出消息并发送。同时,有一个独立的“会话管理器”(SessionProducer)线程负责维护SMPP会话的连接状态。当会话断开时,会话管理器需要重新建立连接,在此期间,所有发送者线程必须暂停发送。一旦会话恢复,发送者线程应继续工作。SMPPSession对象是所有线程共享的资源,其isBind()方法指示会话是否有效,reBind()方法用于重新建立会话。
立即学习“Java免费学习笔记(深入)”;
原始代码分析与问题识别
给定的原始代码尝试使用wait/notifyAll来协调发送者和会话管理器。然而,它存在一个关键的同步问题,导致了ArrayIndexOutOfBoundsException。
原始代码结构(简化)
// SMPPSession 类 (共享资源)
public class SMPPSession {
private boolean bind; // 会话绑定状态
public int sendMessage(String msg) { /* ... */ }
public void reBind() { /* ... */ this.bind = true; }
public boolean isBind() { return this.bind; }
}
// Sender 线程
public class Sender extends Thread {
private SMPPSession smppSession;
// ... 构造函数
@Override
public void run() {
while (!Client.messages.isEmpty()){ // 问题点1: 在同步块外部检查
synchronized (Client.messages){ // 锁定共享消息列表
if (smppSession.isBind()){
final String msg = Client.messages.remove(0); // 移除消息
// ... 发送消息
Client.messages.notifyAll(); // 问题点2: 在此调用notifyAll可能不合适
} else {
try {
Client.messages.wait(); // 等待会话恢复
} catch (InterruptedException e) { /* ... */ }
}
}
}
}
}
// SessionProducer 线程
public class SessionProducer extends Thread {
private SMPPSession smppSession;
// ... 构造函数
@Override
public void run() {
while (!Client.messages.isEmpty()){ // 问题点3: 生产者不应依赖消息列表是否为空
synchronized (Client.messages){ // 锁定共享消息列表
if (!smppSession.isBind()){
smppSession.reBind(); // 重新绑定
Client.messages.notifyAll(); // 唤醒所有等待线程
} else{
try {
Client.messages.wait(); // 等待会话断开
} catch (InterruptedException e) { /* ... */ }
}
}
}
}
}
// Client 类 (主程序)
public class Client {
public static final List messages = new CopyOnWriteArrayList<>(); // 共享消息列表
// ... main 方法启动线程
} 导致ArrayIndexOutOfBoundsException的原因
异常的根本原因在于Sender线程在synchronized (Client.messages)块外部执行了while (!Client.messages.isEmpty())检查。
一套傻瓜式的建站程序,由前台购物、后台管理、在线支付三部分组成介绍说明:1.注册与否均可购物(同类程序大多要求注册才能购物),方便了那些懒得注册的客户。降低用户使用门槛,自然可抓住更多潜在商机。2.会员等级和折扣功能。管理员可方便的为会员设置不同等级,不同等级的员会可享受不同的购物折扣。3.站内短信、留言发布,沟通无极限。会员和游客均可发送短信和留言。4.完美融合在线支付功能,无需编程、无需修改源
- 竞态条件: 假设Client.messages列表中还剩一条消息。
- 多个发送者线程: 多个Sender线程(例如Sender1, Sender2, Sender3, Sender4)同时检查!Client.messages.isEmpty(),都发现列表不为空,因此都尝试进入synchronized (Client.messages)块。
- 顺序执行: 线程会依次获取锁。假设Sender1首先获取锁。
- Sender1执行: Sender1进入同步块,检查smppSession.isBind()为真,然后成功执行Client.messages.remove(0),将列表清空。它发送消息后,释放锁。
- 其他Sender线程: 紧接着,Sender2获取锁。当它进入同步块时,Client.messages已经为空。然而,它并没有再次检查isEmpty(),而是直接尝试执行Client.messages.remove(0)。由于列表已空,这将抛出ArrayIndexOutOfBoundsException。Sender3和Sender4也会遇到同样的问题。
此外,SessionProducer线程的while (!Client.messages.isEmpty())循环条件也不合理。作为会话管理器,它应该持续运行以监控和维护会话状态,而不是仅仅在消息队列不为空时才工作。
改进的同步方案
为了解决上述问题,我们需要确保所有对共享状态(smppSession.isBind()和Client.messages)的检查和修改都发生在同步块内部,并且使用while循环来包裹wait()调用,以处理虚假唤醒和条件变化。
1. SMPPSession的改进
为了确保bind状态的可见性,应将其声明为volatile。
public class SMPPSession {
private volatile boolean bind = false; // 初始化为false,并声明为volatile
private static final Random idGenerator = new Random();
public int sendMessage(String msg){
try{
Thread.sleep(1000L);
System.out.println("Sending message: " + msg);
return Math.abs(idGenerator.nextInt());
} catch (InterruptedException e){
Thread.currentThread().interrupt(); // 重新设置中断标志
e.printStackTrace();
}
return -1;
}
public void reBind(){
try{
System.out.println("Rebinding...");
Thread.sleep(1000L);
this.bind = true; // 更新绑定状态
System.out.println("Session established!");
} catch (InterruptedException e){
Thread.currentThread().interrupt(); // 重新设置中断标志
e.printStackTrace();
}
}
public boolean isBind(){
return this.bind;
}
}2. Sender线程的改进
Sender线程应该在获取锁后,在一个while循环中检查两个条件:会话是否绑定,以及消息队列是否为空。只有当这两个条件都满足时,才能尝试发送消息。
public class Sender extends Thread{
private SMPPSession smppSession;
public Sender(String name, SMPPSession smppSession){
this.setName(name);
this.smppSession = smppSession;
}
@Override
public void run(){
while (true){ // 持续运行,直到所有消息发送完毕或被中断
String msgToSend = null;
synchronized (Client.messages){ // 锁定共享消息列表
// 条件检查:会话未绑定 或 消息列表为空
while (!smppSession.isBind() || Client.messages.isEmpty()){
// 如果消息列表为空且会话已绑定,说明所有消息已发送,此发送者线程可以退出
if (Client.messages.isEmpty() && smppSession.isBind()){
System.out.println(getName() + " finished sending all messages. Terminating.");
return; // 退出线程
}
try {
System.out.println(getName() + " waiting. Bind status: " + smppSession.isBind() + ", Messages empty: " + Client.messages.isEmpty());
Client.messages.wait(); // 等待条件满足
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断标志
System.out.println(getName() + " interrupted while waiting. Terminating.");
return; // 退出线程
}
}
// 条件满足:会话已绑定且有消息可发送
msgToSend = Client.messages.remove(0); // 在同步块内安全移除消息
// 注意:这里不需要notifyAll,因为Sender线程移除消息不会改变SessionProducer的等待条件,
// 也不会让其他Sender线程立即从等待中恢复(除非它们也等待消息出现,但这里它们等待的是isBind和消息非空)
}
// 在同步块外部执行耗时的消息发送操作
final int msgId = smppSession.sendMessage(msgToSend);
System.out.println(getName() + " sent msg and received msgId: " + msgId);
}
}
}3. SessionProducer线程的改进
SessionProducer的主要职责是确保SMPPSession始终处于绑定状态。它应该持续运行,并在会话未绑定时进行重连。
public class SessionProducer extends Thread{
private SMPPSession smppSession;
public SessionProducer(String name, SMPPSession smppSession){
this.setName(name);
this.smppSession = smppSession;
}
@Override
public void run(){
while (true){ // 持续运行,监控并维护会话状态
synchronized (Client.messages){ // 锁定共享消息列表
// 条件检查:如果会话已绑定,则等待
while (smppSession.isBind()){
// 如果所有消息都已发送,且会话已绑定,SessionProducer可以等待新的消息出现或等待终止信号
// 暂时让它等待,因为它的核心职责是确保会话可用。
if (Client.messages.isEmpty()) {
System.out.println(getName() + " session is bound and messages are empty. Waiting for potential new messages or session break.");
} else {
System.out.println(getName() +









