Phaser支持多阶段同步与动态注册,示例中3线程协作完成三阶段任务,主线程注册后启动动态线程,各阶段通过arriveAndAwaitAdvance阻塞等待,phase递增至2后终止。

在Java并发编程中,当多个线程需要分阶段协同执行任务时,Phaser 是比 CountDownLatch 或 CyclicBarrier 更灵活的选择。它不仅支持动态注册/注销线程,还能控制多阶段的同步流程。下面介绍如何使用 Phaser 实现分阶段任务的并发控制。
理解Phaser的核心机制
Phaser 可以看作是 CountDownLatch 和 CyclicBarrier 的结合体,但它更强大:
- 支持多阶段同步:每个阶段结束后自动进入下一阶段
- 动态参与:线程可以在任意阶段加入或退出
- 可重用:不像 CountDownLatch 只能用一次
- 批量到达:通过 arriveAndAwaitAdvance() 阻塞等待本阶段所有参与者到达
Phaser 使用 phase 编号表示当前所处阶段(从0开始),每完成一轮同步 phase 值递增。
基本使用:实现三阶段协同任务
以下示例展示三个线程协作完成三个阶段的任务:
立即学习“Java免费学习笔记(深入)”;
import java.util.concurrent.Phaser;
public class PhaseTaskExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 初始化3个参与者
for (int i = 1; i <= 3; i++) {
new Thread(new Worker(phaser, "Worker-" + i)).start();
}
}
static class Worker implements Runnable {
private final Phaser phaser;
private final String name;
Worker(Phaser phaser, String name) {
this.phaser = phaser;
this.name = name;
}
@Override
public void run() {
for (int phase = 0; phase < 3; phase++) {
System.out.println(name + " 正在执行第 " + phase + " 阶段");
// 模拟工作
try { Thread.sleep(500); } catch (InterruptedException e) {}
// 到达并等待其他线程完成本阶段
int currentPhase = phaser.arriveAndAwaitAdvance();
System.out.println(name + " 完成第 " + currentPhase + " 阶段");
}
}
}
}
输出会显示每个线程按阶段顺序推进,所有线程都完成后才进入下一阶段。
高级技巧:动态注册与分阶段回调
Phaser 允许在运行时注册新线程,并可在每阶段结束时执行自定义逻辑:
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("【阶段回调】当前阶段: " + phase + ", 参与者数: " + registeredParties);
return phase >= 2 || registeredParties == 0; // 阶段>=2时终止
}
};
// 主线程先注册
phaser.register();
new Thread(() -> {
phaser.register(); // 动态注册新线程
workAndArrive(phaser, "LateWorker");
}).start();
// 主线程执行三阶段
for (int i = 0; i < 3; i++) {
System.out.println("Main 执行阶段 " + i);
try { Thread.sleep(400); } catch (InterruptedException e) {}
phaser.arriveAndAwaitAdvance();
}
onAdvance 方法在每个阶段结束、所有参与者到达后自动调用,返回 true 表示 Phaser 应该终止。
实用建议与注意事项
使用 Phaser 时应注意以下几点:
- 调用 register() 增加参与者数量,unregister() 减少
- 子 Phaser 可用于构建树形同步结构,提升大规模并发性能
- 避免长时间阻塞在 arriveAndAwaitAdvance() 中影响整体进度
- 若某线程提前退出,应调用 forceTermination() 或 unregister()
- 适合场景包括并行算法的迭代同步、分批数据处理、流水线任务等
基本上就这些。Phaser 提供了精细的阶段控制能力,合理使用能让复杂并发流程更清晰可控。










