65.9K
CodeProject 正在变化。 阅读更多。
Home

agsMSMQ v1.1 - 消息队列实现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.80/5 (5投票s)

2001年12月20日

2分钟阅读

viewsIcon

132477

downloadIcon

3535

消息队列(MSMQ)的类包装器。

引言

这个类封装了消息队列。

  • 特点
  • API 参考
  • 在应用程序中使用

特点

  • 兼容 MFC 类
  • 易于使用和简单
  • 易于追踪错误

API 参考

这个 agsMSMQ 类在头文件中有如下特性

 struct MQmsg{
    CString label;
    CString msg;
 };

 class agsMSMQ  
 {
 public:
    BOOL closeMSMQ();
    MQmsg getMessageQ();
    int getTotalMessage();
    HRESULT getMSMQmsg();
    BOOL PrepapeGetMessage(CString srv,CString path,int modeMSMQ);
    BOOL SendPrivateQ(CString serv, CString pathmsg,CString labelmsg,CString msg);
    BOOL SendPublicQ(CString serv, CString pathmsg,CString labelmsg,CString msg);
    BOOL DeletePrivateQ(CString serv, CString pathmsg);
    BOOL DeletePublicQ(CString serv, CString pathmsg);
    BOOL CreatePrivateQ(CString serv, CString pathmsg,CString label);
    BOOL CreatePublicQ(CString serv, CString pathmsg,CString label);
    CString getErrorMsg();
    agsMSMQ();
    virtual ~agsMSMQ();

 private:
    void InitialMQMSG();
    void InitialPropidBody(CString label,CString msg);
    void InitialPropidQ();
    CString m_sError;
    void InitialMQQueue();
    DWORD            dwIDprop;
    DWORD            dwBufferLength;
    WCHAR            wszBuffer[250];
    CString            m_sPathQ;
    CString            m_sMSMQserv;
    CString            m_sLabelQ;
    unsigned short        wszPath[256];
    unsigned short        wszLabel[1024];
    DWORD            dwDestFormatLength;
    WCHAR            wszDestFormat[256];
    unsigned char        pMessage[2048];
    DWORD            dwAccessMode;
    DWORD            dwShareMode;
    DWORD            dwReaction; 
    int            m_nMessage;
    MQmsg            msgQ;
    

 protected:
    MQQUEUEPROPS        QProps;
    MQMSGPROPS        QPropMsg;
    MQPROPVARIANT        QPropVar[NUMBER_OF_PROPERTIES];    
    QUEUEPROPID        QPropID[NUMBER_OF_PROPERTIES];
    MSGPROPID        QMsg[NUMBER_OF_PROPERTIES];
    HRESULT            QStatus[NUMBER_OF_PROPERTIES];
    HRESULT            hr;
    HANDLE            hrTrack;
    HANDLE            hQue;
    PSECURITY_DESCRIPTOR    pSecurity;
    
 };

以下是函数的解释

  • BOOL CreatePublicQ(CString serv, CString pathmsg,CString label)

    用于在服务器队列 serv 上创建公共队列,队列名称为 pathmsg,队列标签为 label
  • BOOL CreatePrivateQ(CString serv, CString pathmsg,CString label)

    用于在服务器队列 serv 上创建私有队列,队列名称为 pathmsg,队列标签为 label
  • BOOL SendPublicQ(CString serv, CString pathmsg,CString labelmsg,CString msg);

    用于在 serv 队列服务器上发送公共消息队列, pathmsg 路径队列,消息标签为 labelmsg ,内容消息为 msg
  • BOOL SendPrivateQ(CString serv, CString pathmsg,CString labelmsg,CString msg);

    用于在 serv 队列服务器上发送私有消息队列, pathmsg 路径队列,消息标签为 labelmsg,内容消息为 msg
  • BOOL PrepapeGetMessage(CString srv,CString path,int modeMSMQ);

    准备从 serv 上的路径队列 path 获取消息队列,modeMSMQ 参数标识队列类型。 如果 modeMSMQ 参数的值为 0,则为公共队列。 否则,如果 modeMSMQ 的值为 1,则为私有队列。
  • MQmsg getMessageQ(void)

    获取数据队列消息。 MQmsg 是一个结构体

     struct MQmsg{
        CString label;
        CString msg;
     };
    
  • int getTotalMessage(void)

    获取消息数量。
  • HRESULT getMSMQmsg(void)

    用于检查在获取队列消息时是否发生错误。 
  • BOOL closeMSMQ(void)

    用于关闭队列服务器。
  • CString getErrorMsg(void)

    用于获取错误消息。

在应用程序中使用

这是实现 agsMSMQ 类的 GUI

不要忘记将头文件 (#include "agsMSMQ.h") 放在应用程序的顶部。 除此之外,您必须将库 (mqrt.lib) 添加到您的项目中,如下图所示

创建公共队列

agsMSMQ mque;    

UpdateData();
CWaitCursor wait;

if(!mque.CreatePublicQ(m_sServer,m_sPath,m_sLabel))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

AfxMessageBox("Create Public Queue succesfully");

创建私有队列

agsMSMQ mque;    

UpdateData();
CWaitCursor wait;

if(!mque.CreatePrivateQ(m_sServer,m_sPath,m_sLabel))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

AfxMessageBox("Create Private Queue succesfully");

删除公共队列

agsMSMQ mque;    

UpdateData();
CWaitCursor wait;

if(!mque.DeletePublicQ(m_sServer,m_sPath))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

AfxMessageBox("Delete Public Queue succesfully");

删除私有队列

agsMSMQ mque;    

UpdateData();
CWaitCursor wait;

if(!mque.DeletePrivateQ(m_sServer,m_sPath))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

AfxMessageBox("Delete Private Queue succesfully");

发送公共队列消息

agsMSMQ mque;    

UpdateData();
CWaitCursor wait;

if(!mque.SendPublicQ(m_sServer,m_sPath,m_sMessageLabel,m_sMessage))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

AfxMessageBox("Send Public Message succesfully");    

发送私有队列消息

agsMSMQ mque;    

UpdateData();
CWaitCursor wait;

if(!mque.SendPrivateQ(m_sServer,m_sPath,m_sMessageLabel,m_sMessage))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

AfxMessageBox("Send Public Message succesfully");    

获取所有公共队列消息

agsMSMQ mque;    
int current;
MQmsg msg;
HRESULT hr;

UpdateData();
CWaitCursor wait;
m_cMessage.DeleteAllItems();
if(!mque.PrepareGetMessage(m_sServer,m_sPath,0))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

msg    = mque.getMessageQ();
current = m_cMessage.InsertItem(0,msg.label);
m_cMessage.SetItemText(current,1, msg.msg);    
do
{
        hr    = mque.getMSMQmsg(); 
        if(hr!=0) break;
        msg        = mque.getMessageQ();
        current = m_cMessage.InsertItem(0,msg.label);
        m_cMessage.SetItemText(current,1, msg.msg);        
            
        
}while(hr==0);
    
if(!mque.closeMSMQ())
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

if(mque.getTotalMessage()==0)
{
    AfxMessageBox("No message in Message Queue");
else
{
    CString mseg;
    mseg.Format("Get %d message(s) from public queue successfully",
                 mque.getTotalMessage());
    AfxMessageBox(mseg);        
}

获取所有私有队列消息

agsMSMQ mque;    
int current;
MQmsg msg;
HRESULT hr;

UpdateData();
CWaitCursor wait;
m_cMessage.DeleteAllItems();
if(!mque.PrepareGetMessage(m_sServer,m_sPath,1))
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

msg        = mque.getMessageQ();
current = m_cMessage.InsertItem(0,msg.label);
m_cMessage.SetItemText(current,1, msg.msg);    

do
{
    hr    = mque.getMSMQmsg();
    if(hr!=0) break;

    msg = mque.getMessageQ();
    current = m_cMessage.InsertItem(0,msg.label);
    m_cMessage.SetItemText(current,1, msg.msg);
}while(hr==0);
    
if(!mque.closeMSMQ())
{
    AfxMessageBox(mque.getErrorMsg());
    return;
}

if(mque.getTotalMessage()==0)
{
    AfxMessageBox("No message in Message Queue");
}else
{
    CString mseg;
    mseg.Format("Get %d message(s) from private queue successfully",
                 mque.getTotalMessage());
    AfxMessageBox(mseg);
}    

参考

消息队列 (MSMQ) 的平台 SDK。

历史

版本 1.0 首次发布:2001 年 11 月 16 日

版本 1.1 修改了 PrepareGetMessage() 和 getMSMQmsg() 函数

© . All rights reserved.