通用通知器:C++ 中的消息传递






4.28/5 (13投票s)
2004年4月27日
5分钟阅读

82071

1824
本文描述了对象之间同步/异步通信的设计与实现
引言
通知器使得系统中的对象之间可以进行匿名通信。由于是匿名的,通信的对象彼此之间没有了解,因此与它们通信的对象是独立的。它们也易于理解,为新开发人员引入项目提供了无缝迁移。其他语言(尤其是 Smalltalk)内置了此功能;C++ 允许用户自由创建自己的。
设计
有兴趣发送消息给订阅者的,通过通知器类方法进行;有兴趣接收消息的,实现订阅者接口并注册到相应的通知器。通知器负责消息路由;对于每种类型的消息,都有一个不同的通知器。通知器可以解耦消息的发送者和接收者(订阅者)。
线程安全的通知器
通知器必须满足三个要求
- 简单
- 可扩展
- 线程安全
简洁是一个模糊的术语。我选择从复杂性来定义它——代码的复杂性越低,它就越容易维护、解释、优化和测试。代码的复杂性越低,它就越简单。方便?;)
可扩展性是一个难以衡量的指标——虽然有许多定义,但它们之间共同的主题是能够将新功能或技术进步添加和整合到现有代码库中。我理想中的可扩展性是代码允许我在不进行大量原始代码工作的情况下随时间维护它。模板通过参数化关键类型并允许在实现之外使用这些类型来实现这一点。
线程安全与以下事实有关:在多线程环境中使用通知器的客户端不必担心线程同步问题——订阅者需要,但通知器不应该。
通知器具有以下职责
- 注册订阅者
- 注销订阅者
- 通知订阅者
为此,会维护一个订阅者映射。订阅者作为键,一个指示订阅者是否已注册的标志作为数据。使用标志是为了使注销不会影响(从订阅映射中删除条目)订阅映射。如果订阅者在同步通知期间选择退订,这一点很重要。
注册后,订阅者将被插入订阅者映射;如果它已存在,则进行检查以确定它是否已被注销。如果订阅者已被标记为注销但尚未从订阅者映射中删除,则将其状态重置为“已注册
”。
通知涉及获取关键部分的锁,遍历映射并通知订阅者。任何被标记为未注册的订阅者都不会被通知。订阅者被通知后,所有在订阅者映射中被标记为未注册的订阅者都将被删除。
注销涉及将订阅者标记为注销。下次发生通知时,订阅者将从映射中删除。
异步通知
使用 Windows Thread Pooling 中创建的线程池,可以轻松实现异步通知。定义一个工作单元,该工作单元在处理过程中通知订阅者事件。
void async_notifier<T_state>::process() {
notifier<T_state>::instance().notify(m_state);
}
仅此而已!
演示
我们之所以需要通知器,是因为我们创建了一个启动套接字监听的服务。在等待传入连接时,调用线程会休眠,因为它等待通知。
// wait for accept
WSANETWORKEVENTS c_events = {0};
wait_for_event(e_event_accept, c_events, INFINITE);
// accept
SOCKADDR_IN sa_client = {0};
int n_sa = sizeof(sa_client);
smart_socket_handle sp_conn(accept((SOCKADDR*)&c_sa_client, n_sa));
如果服务需要干净地关闭,我们需要一种方法来信号化等待的套接字关闭。此外,还有几个对象需要服务关闭的通知。事实证明,通知器是最佳解决方案,因为我们可以快速将其绑定到 shutdown
事件。
namespace events
{
enum service
{
service_start,
service_pause,
service_resume,
service_stop
};
typedef notifier<service> service_notifier;
events::service
是通知数据;发布者只需通知我们的订阅者
template<class T_connection>
struct listener :
socket,
core::work_unit,
subscriber<events::service>
关于事件
void on_shutdown()
{
// notify our listener(s) to stop
events::service_notifier::instance()->notify(events::service_stop);
监听器中的处理程序负责繁重的工作。
void on_notify(const state& event)
{
switch (event)
{
case events::service_start:
case events::service_resume:
start_listening();
break;
case events::service_stop:
case events::service_pause:
stop_listening();
break;
default: throw std::invalid_argument("listener::on_notify");
}
}
Using the Code
要创建订阅者,请从 subscriber 派生您的类。您必须实现 on_notify
方法。
struct work : core::work_unit, subscriber<events>
{
work();
void process();
void on_notify(const events& e);
bool m_work;
double m_data;
};
要接收事件通知,您的订阅者必须使用 subscribe
方法订阅。
void work::process()
{
// we subscribe in process because
// we are created during notification. if
// we were to subscribe during work::work
// we would be added to the notifier map
// and spawning would never cease to end!
subscribe();
要停止事件通知,您的订阅者必须使用 unsubscribe
方法退订。
void work::on_notify(const events& e)
{
switch (e)
{
case die:
unsubscribe();
std::cout << "dying: " << this << std::endl;
m_work = false;
break;
要同步通知订阅者,请使用 notifier::notify
。
void input::process()
{
std::cout << "(S)pawn/(D)ie: ";
switch (::tolower(::getch()))
{
// die and leave
case 'd':
notifier<events>::instance().notify(die);
要异步通知订阅者,请创建并排队一个 async_notifier
实例。
// and then kill it asynchronously
global::thread_pool::instance().queue_request(new async_notifier<events>(die));
关于演示项目
演示项目模拟了上述场景。创建了工作线程,它们执行工作直到收到关机通知。收到通知后,工作线程退出,系统优雅地关闭。可以生成更多工作线程来模拟代码的繁重负载。
- 线程池已初始化。
- 工作已排队,并使用通知器异步杀死。
- 已排队一个生成工作单元和一个输入工作单元。
- 主线程等待输入工作单元死亡时发出的事件。
在查看测试平台时,需要牢记一些重要注意事项。我们的主线程等待一个在输入工作单元即将死亡时发出的事件。根据待处理工作的数量,立即退出应用程序可能会是灾难性的。
为了理解,请考虑以下场景:
用户已指示输入退出;输入同步通知所有订阅者退出,并信号化我们的主线程退出。
为了论证,假设有 100 个工作单元已排队并待处理。线程池中的线程在所有工作完成并收到关机标志之前不会退出。同时,主线程正在回溯并销毁进程堆。我们的工作单元最终在最后一点工作完成后被释放。没有堆,会发生什么?
为了保持简单,我暂停了主线程,在此期间机器会平稳关闭。可以使用更复杂的同步机制。但是,在关机期间,我不受性能要求的限制,并且倾向于简单的实现。您的经验可能不同。
结论
通知器是连接对状态变化感兴趣但又不想相互绑定的系统各部分的一种好方法。模板为通知子系统与不同状态数据之间的重用提供了自然的机制。同步和异步通知提供了除最晦涩的场景之外的所有情况所需的灵活性。
无论您选择使用免费提供的实现还是自己动手,通知器都能使在任何语言中构建复杂解决方案更加易于管理。祝您编码愉快!
历史
- 2004/05/09:已更新链接;感谢 Phillipe Lhoste 指出链接错误
- 2004/04/27:文章发布
许可证
本文未附加明确的许可证,但可能在文章文本或下载文件本身中包含使用条款。如有疑问,请通过下面的讨论区联系作者。
作者可能使用的许可证列表可以在此处找到。