结合观察者模式与事件回调可构建灵活解耦的事件系统,通过定义事件类型、创建发布者与观察者、注册回调函数及触发事件实现;为避免循环依赖,可采用事件分级、过滤、依赖注入等策略;在多线程环境下,需使用线程安全数据结构、事件队列和锁机制保障并发安全;Lambda表达式可简化回调注册,提升代码简洁性与可读性。

C++观察者模式和事件回调结合使用,可以构建灵活且解耦的事件处理系统。观察者模式定义了对象之间的一对多依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知并自动更新。事件回调则是一种更直接的函数调用机制,允许对象在特定事件发生时执行预定义的回调函数。结合使用这两种模式,既能实现观察者模式的广播通知,又能利用事件回调的精确控制,从而实现更细粒度的事件处理。
解决方案
将观察者模式与事件回调结合,通常涉及以下几个步骤:
定义事件类型: 首先,定义需要通知的事件类型。这可以是一个枚举类,也可以是字符串常量。
立即学习“C++免费学习笔记(深入)”;
创建事件发布者(Subject): 事件发布者维护一个观察者列表,并提供注册和取消注册观察者的方法。此外,发布者还需要提供触发特定事件的方法,该方法会遍历观察者列表,并调用每个观察者注册的、与该事件类型关联的回调函数。
创建观察者(Observer): 观察者实现一个或多个回调函数,用于处理来自发布者的事件通知。观察者需要向发布者注册,指定其感兴趣的事件类型以及对应的回调函数。
事件回调函数注册: 观察者通过发布者提供的注册方法,将事件类型和回调函数关联起来。
事件触发: 当发布者的状态发生改变,需要通知观察者时,它会调用相应的事件触发方法。该方法会遍历观察者列表,找到注册了该事件类型的观察者,并调用它们的回调函数。
#include <iostream>
#include <vector>
#include <map>
#include <functional>
// 事件类型枚举
enum class EventType {
DATA_READY,
DATA_PROCESSED,
ERROR
};
// 观察者接口 (可选,如果每个观察者需要实现相同的接口)
class Observer {
public:
virtual void onEvent(EventType type, const std::string& data) = 0;
};
// 事件发布者
class Subject {
public:
using Callback = std::function<void(const std::string&)>;
void registerObserver(EventType type, Callback callback) {
observers_[type].push_back(callback);
}
void unregisterObserver(EventType type, Callback callback) {
// 简化:这里只移除第一个匹配的回调函数。实际使用中可能需要更复杂的逻辑
auto& callbacks = observers_[type];
for (auto it = callbacks.begin(); it != callbacks.end(); ++it) {
if (*it == callback) {
callbacks.erase(it);
break;
}
}
}
void notify(EventType type, const std::string& data) {
for (const auto& callback : observers_[type]) {
callback(data);
}
}
private:
std::map<EventType, std::vector<Callback>> observers_;
};
// 具体观察者
class DataProcessor {
public:
DataProcessor(Subject& subject) : subject_(subject) {
subject_.registerObserver(EventType::DATA_READY,
std::bind(&DataProcessor::onDataReady, this, std::placeholders::_1));
subject_.registerObserver(EventType::ERROR,
std::bind(&DataProcessor::onError, this, std::placeholders::_1));
}
~DataProcessor() {
subject_.unregisterObserver(EventType::DATA_READY,
std::bind(&DataProcessor::onDataReady, this, std::placeholders::_1));
subject_.unregisterObserver(EventType::ERROR,
std::bind(&DataProcessor::onError, this, std::placeholders::_1));
}
void onDataReady(const std::string& data) {
std::cout << "DataProcessor received data: " << data << std::endl;
// 模拟数据处理
std::cout << "Processing data..." << std::endl;
// 处理后触发另一个事件
subject_.notify(EventType::DATA_PROCESSED, "Processed: " + data);
}
void onError(const std::string& errorMessage) {
std::cerr << "DataProcessor received error: " << errorMessage << std::endl;
}
private:
Subject& subject_;
};
class DataConsumer {
public:
DataConsumer(Subject& subject) : subject_(subject) {
subject_.registerObserver(EventType::DATA_PROCESSED,
std::bind(&DataConsumer::onDataProcessed, this, std::placeholders::_1));
}
~DataConsumer() {
subject_.unregisterObserver(EventType::DATA_PROCESSED,
std::bind(&DataConsumer::onDataProcessed, this, std::placeholders::_1));
}
void onDataProcessed(const std::string& data) {
std::cout << "DataConsumer received processed data: " << data << std::endl;
}
private:
Subject& subject_;
};
int main() {
Subject subject;
DataProcessor processor(subject);
DataConsumer consumer(subject);
// 模拟数据准备好
subject.notify(EventType::DATA_READY, "Raw data from sensor");
// 模拟发生错误
subject.notify(EventType::ERROR, "File not found");
return 0;
}循环依赖是指观察者A依赖于观察者B,而观察者B又依赖于观察者A的情况。这会导致无限循环的事件触发,最终导致程序崩溃。
避免循环依赖的几种策略:
事件分级: 将事件划分为不同的级别,确保低级别事件的观察者不会触发高级别事件。例如,可以将数据处理事件分为“数据校验”、“数据转换”、“数据存储”等级别,确保数据校验事件的观察者不会触发数据转换或数据存储事件。
事件过滤: 在观察者中添加事件过滤机制,只处理特定条件的事件。例如,观察者可以根据事件的来源或事件携带的数据,决定是否处理该事件。
依赖注入: 使用依赖注入容器管理对象之间的依赖关系,可以更容易地检测和解决循环依赖。
避免双向关联: 尽量避免观察者和发布者之间的双向关联。如果观察者需要向发布者发送消息,可以使用回调函数或事件总线等机制。
状态管理: 使用集中的状态管理系统,例如Redux或状态机,可以避免对象之间的直接依赖,从而减少循环依赖的风险。
临时取消注册: 在处理事件时,临时取消观察者的注册,防止事件再次触发自身。处理完毕后再重新注册。这种方法需要谨慎使用,确保在取消注册期间不会丢失重要的事件通知。
在多线程环境中使用观察者模式和事件回调需要特别注意线程安全问题。
线程安全的数据结构: 观察者列表必须使用线程安全的数据结构,例如
std::mutex
std::vector
std::shared_mutex
std::map
避免在回调函数中执行耗时操作: 回调函数应该尽可能简单快速,避免执行耗时操作,以免阻塞事件发布线程。如果需要执行耗时操作,可以将任务提交到线程池中异步执行。
使用线程安全的事件队列: 可以使用线程安全的事件队列来缓冲事件,然后由单独的线程从队列中取出事件并通知观察者。这样可以避免事件发布线程被阻塞。
避免死锁: 在回调函数中访问共享资源时,需要使用锁机制,但要避免死锁的发生。可以使用
std::lock_guard
std::unique_lock
原子操作: 对于简单的状态更新,可以使用原子操作,例如
std::atomic
读写锁: 如果读操作远多于写操作,可以使用读写锁(
std::shared_mutex
避免在回调函数中修改观察者列表: 在回调函数中修改观察者列表可能会导致并发问题。应该避免这种情况,或者使用线程安全的数据结构和锁机制来保护观察者列表。
#include <iostream>
#include <vector>
#include <map>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
// 线程安全的事件队列
template <typename T>
class ThreadSafeQueue {
public:
void enqueue(T item) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(item);
condition_.notify_one();
}
T dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty(); });
T item = queue_.front();
queue_.pop();
return item;
}
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable condition_;
};
// 线程安全的Subject
class ThreadSafeSubject {
public:
using Callback = std::function<void(const std::string&)>;
using EventQueueItem = std::pair<EventType, std::string>;
ThreadSafeSubject() : event_queue_(), is_running_(true) {
// 启动事件处理线程
event_thread_ = std::thread([this]() {
while (is_running_) {
try {
EventQueueItem event = event_queue_.dequeue();
notifyObservers(event.first, event.second);
} catch (const std::exception& e) {
std::cerr << "Exception in event thread: " << e.what() << std::endl;
}
}
});
}
~ThreadSafeSubject() {
is_running_ = false;
event_queue_.enqueue({EventType::ERROR, "Shutdown"}); // Unblock queue if waiting
if (event_thread_.joinable()) {
event_thread_.join();
}
}
void registerObserver(EventType type, Callback callback) {
std::lock_guard<std::mutex> lock(observers_mutex_);
observers_[type].push_back(callback);
}
void unregisterObserver(EventType type, Callback callback) {
std::lock_guard<std::mutex> lock(observers_mutex_);
auto& callbacks = observers_[type];
for (auto it = callbacks.begin(); it != callbacks.end(); ++it) {
if (*it == callback) {
callbacks.erase(it);
break;
}
}
}
void notify(EventType type, const std::string& data) {
event_queue_.enqueue({type, data}); // 将事件放入队列
}
private:
void notifyObservers(EventType type, const std::string& data) {
std::lock_guard<std::mutex> lock(observers_mutex_);
for (const auto& callback : observers_[type]) {
callback(data);
}
}
std::map<EventType, std::vector<Callback>> observers_;
std::mutex observers_mutex_;
ThreadSafeQueue<EventQueueItem> event_queue_;
std::thread event_thread_;
std::atomic<bool> is_running_;
};
// 线程安全的观察者示例
class ThreadSafeDataProcessor {
public:
ThreadSafeDataProcessor(ThreadSafeSubject& subject) : subject_(subject) {
subject_.registerObserver(EventType::DATA_READY,
std::bind(&ThreadSafeDataProcessor::onDataReady, this, std::placeholders::_1));
subject_.registerObserver(EventType::ERROR,
std::bind(&ThreadSafeDataProcessor::onError, this, std::placeholders::_1));
}
~ThreadSafeDataProcessor() {
subject_.unregisterObserver(EventType::DATA_READY,
std::bind(&ThreadSafeDataProcessor::onDataReady, this, std::placeholders::_1));
subject_.unregisterObserver(EventType::ERROR,
std::bind(&ThreadSafeDataProcessor::onError, this, std::placeholders::_1));
}
void onDataReady(const std::string& data) {
std::cout << "Thread " << std::this_thread::get_id() << ": DataProcessor received data: " << data << std::endl;
// 模拟耗时的数据处理
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Thread " << std::this_thread::get_id() << ": Processing data..." << std::endl;
subject_.notify(EventType::DATA_PROCESSED, "Processed: " + data);
}
void onError(const std::string& errorMessage) {
std::cerr << "Thread " << std::this_thread::get_id() << ": DataProcessor received error: " << errorMessage << std::endl;
}
private:
ThreadSafeSubject& subject_;
};
class ThreadSafeDataConsumer {
public:
ThreadSafeDataConsumer(ThreadSafeSubject& subject) : subject_(subject) {
subject_.registerObserver(EventType::DATA_PROCESSED,
std::bind(&ThreadSafeDataConsumer::onDataProcessed, this, std::placeholders::_1));
}
~ThreadSafeDataConsumer() {
subject_.unregisterObserver(EventType::DATA_PROCESSED,
std::bind(&ThreadSafeDataConsumer::onDataProcessed, this, std::placeholders::_1));
}
void onDataProcessed(const std::string& data) {
std::cout << "Thread " << std::this_thread::get_id() << ": DataConsumer received processed data: " << data << std::endl;
}
private:
ThreadSafeSubject& subject_;
};
int main() {
ThreadSafeSubject subject;
ThreadSafeDataProcessor processor(subject);
ThreadSafeDataConsumer consumer(subject);
// 模拟多个线程同时产生数据
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back([&subject, i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(50 * i)); // 错开时间
subject.notify(EventType::DATA_READY, "Raw data from sensor " + std::to_string(i));
});
}
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
// 模拟发生错误
subject.notify(EventType::ERROR, "File not found");
// 等待一段时间,确保所有事件处理完成
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}Lambda表达式可以简化事件回调的注册,避免创建单独的回调函数。
#include <iostream>
#include <vector>
#include <map>
#include <functional>
// 事件类型枚举
enum class EventType {
BUTTON_CLICKED,
TEXT_CHANGED
};
// 事件发布者
class Button {
public:
using Callback = std::function<void(const std::string&)>;
void registerObserver(EventType type, Callback callback) {
observers_[type].push_back(callback);
}
void click() {
notify(EventType::BUTTON_CLICKED, "Button clicked!");
}
void setText(const std::string& text) {
text_ = text;
notify(EventType::TEXT_CHANGED, text_);
}
private:
void notify(EventType type, const std::string& data) {
for (const auto& callback : observers_[type]) {
callback(data);
}
}
std::map<EventType, std::vector<Callback>> observers_;
std::string text_;
};
int main() {
Button button;
// 使用Lambda表达式注册回调函数
button.registerObserver(EventType::BUTTON_CLICKED, [](const std::string& data) {
std::cout << "Button click event: " << data << std::endl;
});
button.registerObserver(EventType::TEXT_CHANGED, [](const std::string& text) {
std::cout << "Text changed event: " << text << std::endl;
});
button.click();
button.setText("Hello, world!");
return 0;
}使用Lambda表达式可以使代码更简洁易懂,尤其是在回调函数逻辑比较简单的情况下。它避免了定义单独的函数,并将回调函数的定义直接嵌入到注册代码中。
以上就是C++观察者模式与事件回调结合使用的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号