答案:C++多线程观察者模式通过线程安全的观察者列表管理、异步事件分发、weak_ptr避免循环引用、事件队列与工作线程解耦通知过程,确保并发环境下的安全性与高性能。

在C++中实现观察者模式并结合多线程进行事件通知,本质上是为了构建一个解耦、响应迅速且可扩展的系统。当一个“被观察者”(Subject)发生状态变化时,它能够通知所有注册的“观察者”(Observer),而多线程的引入则让这个通知过程可以异步进行,避免阻塞主逻辑,并允许观察者在各自的线程中处理事件,这对于构建高性能、低延迟的并发应用至关重要。
要实现一个健壮的C++多线程观察者模式,我们需要关注几个核心点:线程安全的观察者列表管理、异步事件分发机制、以及观察者生命周期的妥善处理。
首先,定义观察者和被观察者的接口。一个
IObserver
update
ISubject
attach
detach
notify
// 事件基类,用于传递不同类型的事件数据
struct Event {
enum Type {
Generic,
DataChanged,
StatusUpdate
// ...更多事件类型
};
Type type = Generic;
// 可以添加时间戳、源ID等通用信息
virtual ~Event() = default;
};
// 观察者接口
class IObserver {
public:
virtual ~IObserver() = default;
virtual void update(const Event& event) = 0;
};
// 被观察者接口
class ISubject {
public:
virtual ~ISubject() = default;
virtual void attach(std::shared_ptr<IObserver> observer) = 0;
virtual void detach(std::shared_ptr<IObserver> observer) = 0;
virtual void notify(const Event& event) = 0;
};接着,实现一个具体的
Subject
std::vector<std::weak_ptr<IObserver>>
std::mutex
立即学习“C++免费学习笔记(深入)”;
#include <vector>
#include <memory>
#include <mutex>
#include <algorithm> // for std::remove_if
#include <iostream> // for example output
// 前面定义的Event和IObserver/ISubject接口
class ConcreteSubject : public ISubject {
public:
void attach(std::shared_ptr<IObserver> observer) override {
std::lock_guard<std::mutex> lock(mtx_);
// 避免重复添加
for (const auto& w_obs : observers_) {
if (auto s_obs = w_obs.lock(); s_obs == observer) {
return;
}
}
observers_.push_back(observer);
}
void detach(std::shared_ptr<IObserver> observer) override {
std::lock_guard<std::mutex> lock(mtx_);
observers_.erase(
std::remove_if(observers_.begin(), observers_.end(),
[&](const std::weak_ptr<IObserver>& w_obs) {
return w_obs.lock() == observer;
}),
observers_.end());
}
void notify(const Event& event) override {
// 这里的通知可以是同步的,也可以是异步的
// 在多线程环境中,通常会选择异步通知
// 对于同步通知,需要注意锁的粒度和潜在的死锁风险
std::vector<std::shared_ptr<IObserver>> active_observers;
{
std::lock_guard<std::mutex> lock(mtx_);
// 复制一份活跃的观察者列表,避免在通知过程中修改列表
// 同时清理已失效的weak_ptr
observers_.erase(
std::remove_if(observers_.begin(), observers_.end(),
[&](const std::weak_ptr<IObserver>& w_obs) {
if (auto s_obs = w_obs.lock()) {
active_observers.push_back(s_obs);
return false; // 仍然有效
}
return true; // 已失效,移除
}),
observers_.end());
}
// 异步通知的实现通常会将事件放入一个队列,由专门的线程处理
// 这里只是同步通知的示例,实际多线程会更复杂
for (const auto& observer : active_observers) {
// 在多线程场景下,这里不应该直接调用update,而是将事件推送到一个队列
// observer->update(event); // 同步调用会阻塞
// 假设我们有一个异步事件分发器
EventDispatcher::getInstance().dispatchEvent(observer, event);
}
}
private:
std::vector<std::weak_ptr<IObserver>> observers_;
std::mutex mtx_; // 保护observers_列表
};异步通知是多线程环境的关键。我们可以引入一个全局的
EventDispatcher
Subject
notify
Observer::update
EventDispatcher
update
#include <queue>
#include <thread>
#include <condition_variable>
#include <atomic>
// 前面定义的Event和IObserver接口
class EventDispatcher {
public:
static EventDispatcher& getInstance() {
static EventDispatcher instance;
return instance;
}
void dispatchEvent(std::shared_ptr<IObserver> observer, const Event& event) {
{
std::lock_guard<std::mutex> lock(queue_mtx_);
event_queue_.push({observer, std::make_shared<Event>(event)}); // 复制事件,确保生命周期
}
cv_.notify_one(); // 通知一个工作线程有新事件
}
void start(int num_threads = 2) {
if (running_.exchange(true)) return; // 避免重复启动
for (int i = 0; i < num_threads; ++i) {
worker_threads_.emplace_back(&EventDispatcher::worker_loop, this);
}
}
void stop() {
if (!running_.exchange(false)) return; // 避免重复停止
cv_.notify_all(); // 唤醒所有等待的线程
for (std::thread& t : worker_threads_) {
if (t.joinable()) {
t.join();
}
}
worker_threads_.clear();
}
private:
struct QueuedEvent {
std::shared_ptr<IObserver> observer;
std::shared_ptr<Event> event_data; // 使用shared_ptr管理事件数据生命周期
};
EventDispatcher() = default;
~EventDispatcher() {
stop(); // 确保在析构时停止所有线程
}
EventDispatcher(const EventDispatcher&) = delete;
EventDispatcher& operator=(const EventDispatcher&) = delete;
void worker_loop() {
while (running_) {
QueuedEvent q_event;
{
std::unique_lock<std::mutex> lock(queue_mtx_);
cv_.wait(lock, [this]{ return !running_ || !event_queue_.empty(); });
if (!running_ && event_queue_.empty()) {
break; // 停止信号且队列为空,退出循环
}
if (!event_queue_.empty()) {
q_event = event_queue_.front();
event_queue_.pop();
} else {
continue; // 队列为空但未停止,继续等待
}
} // 解锁,允许其他线程添加事件
if (q_event.observer) { // 检查观察者是否仍然存在
q_event.observer->update(*q_event.event_data);
}
}
}
std::queue<QueuedEvent> event_queue_;
std::mutex queue_mtx_;
std::condition_variable cv_;
std::vector<std::thread> worker_threads_;
std::atomic<bool> running_ = false;
};确保C++观察者模式在多线程环境下的线程安全,在我看来,主要涉及到对共享资源的保护和事件通知机制的选择。这可不是简单地加几个锁就能万事大吉的,需要一套组合拳。
首先,最直观也是最关键的一点,就是保护观察者列表。
Subject
std::vector<std::weak_ptr<IObserver>>
attach
detach
notify
std::mutex
observers_
std::lock_guard
std::unique_lock
其次,妥善处理观察者的生命周期。这是个老生常谈但又容易出错的地方。如果
Subject
std::shared_ptr<IObserver>
IObserver
std::shared_ptr<ISubject>
std::weak_ptr
Subject
std::weak_ptr<IObserver>
Subject
weak_ptr::lock()
nullptr
再者,事件通知的同步与异步选择。在多线程环境下,直接在
notify
update
Subject
update
Subject
最后,事件数据本身的生命周期管理。当事件数据从
Subject
Observer
const&
std::shared_ptr<const Event>
在我看来,线程安全不仅仅是技术细节,更是一种设计哲学,它要求我们对系统中所有共享状态和并发操作有清晰的认识和预判。
异步事件通知机制在C++多线程观察者模式中,可以说是一剂“灵丹妙药”,它带来的优势是显而易见的,并且实现起来也有其精妙之处。
核心优势:
Subject
Observer
Subject
Observer
update
Subject
实现细节:
异步事件通知的实现,通常围绕着一个线程安全的队列和一组工作线程(或一个单独的事件分发线程)展开,这本质上是一个生产者-消费者模型。
线程安全队列:
std::queue
std::mutex
push
pop
std::condition_variable
Subject
push
notify_one()
notify_all()
pop
wait()
std::shared_ptr<IObserver>
std::shared_ptr<Event>
shared_ptr
事件分发器(Dispatcher)与工作线程:
dispatchEvent
Subject
std::condition_variable::wait
update
shared_ptr<IObserver>
join
std::atomic<bool>
std::condition_variable::notify_all()
running_
false
坦白说,实现一个高效且健壮的异步事件分发器,需要对C++并发编程有深入的理解,尤其是对生命周期管理、异常安全和锁的细粒度控制。
在C++观察者模式中,随着系统复杂度的提升,事件数据不再是简单的整型或字符串,回调函数也不再是单一的
update
处理复杂的事件数据:
Event
MouseClickEvent
KeyboardEvent
NetworkDataReceivedEvent
struct MouseClickEvent : public Event {
MouseClickEvent(int x, int y) : x_pos(x), y_pos(以上就是C++观察者模式与多线程事件通知实现的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号