.NET IBM MQ Series Adapter






3.83/5 (12投票s)
一个自定义组件,用于帮助连接到 IBM MQ 系列服务器。
引言
在系统集成和遗留系统集成过程中,您可能需要处理不同的平台。业务集成的重点是连接不同的计算机系统、不同的地理位置和不同的 IT 基础设施,以便运行无缝操作。IBM MQ 系列是一个广泛使用的平台。IBM MQ 系列为应用程序之间或用户与一组应用程序之间在不同系统上提供通信。由于其对 35 多种平台的支持以及集成不同的自动化系统的能力,随着应用程序通过互联网提供,它的普及程度不断提高。有关更多详细信息,请查看 IBM 网站:http://www-01.ibm.com/software/integration/wmq/。
背景
要开始本文,您需要熟悉异步通信、消息传递平台基础知识、Microsoft .NET 框架和 Microsoft C#.NET。
使用代码
这个 IBM MQ 系列示例的工作方式如下:我们有两个主要组件,MQAdapter
和 Utilities
。MQAdapter
包含所有负责定义 MQ 系列队列管理器、通道属性、打开和关闭连接以及执行 Push 和 Pop 功能的类。此外,我们还拥有自定义异常处理机制,负责将 MQ 错误包装成可读的错误,用于日志记录和审计。
MQAdapter
类:包含一个构造函数,用于实例化 MQ 适配器的新的实例,该实例接受多个参数。
public MQAdapter(string mqManager,string channel, string ipAddress,string putQueue,
string getQueue,int timeout, int charSet, int port)
{
try
{
// IBM MQ Series server address
MQEnvironment.Hostname = ipAddress;
// server channel name
MQEnvironment.Channel = channel;
MQEnvironment.Port = 1000;
mqQueueManagerName = mqManager;
mqRequestQueueName = putQueue;
mqResponseQueueName = getQueue;
characterSet = charSet;
pollingTimeout = timeout;
// Connect to an MQ Manager, and share the connection handle with other threads
mqQueueManager = new MQQueueManager(mqManager,channel, ipAddress);
// Open Queue for Inquiry, Put Message in, and fail if Queue Manager is stopping
mqPutQueue = mqQueueManager.AccessQueue(putQueue, MQC.MQOO_INQUIRE |
MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING|MQC.MQOO_SET_IDENTITY_CONTEXT);
mqGetQueue = mqQueueManager.AccessQueue(getQueue,MQC.MQOO_INPUT_AS_Q_DEF +
MQC.MQOO_FAIL_IF_QUIESCING);
}
catch (MQException mqe)
{
throw new MQAdapterException("Error Code: " +
MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason));
}
}
Push MQ 消息:此函数负责连接到 MQ 服务器并将消息推送到提供的 put 队列中。
public string PushMQRequestMessage(string message)
{
try
{
MQMessage requestMessage = new MQMessage();
requestMessage.Persistence = 0;
requestMessage.ReplyToQueueName = mqResponseQueueName;
requestMessage.ReplyToQueueManagerName = mqQueueManagerName;
requestMessage.Format = MQC.MQFMT_STRING;
requestMessage.CharacterSet = characterSet;
requestMessage.MessageType = MQC.MQMT_REQUEST;
requestMessage.MessageId = HexaDecimalUtility.ConvertToBinary(GenerateMQMsgId());
requestMessage.CorrelationId = requestMessage.MessageId;
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.Options = MQC.MQPMO_SET_IDENTITY_CONTEXT;
requestMessage.WriteString(message);
mqPutQueue.Put(requestMessage, pmo);
string _msgId = BinaryUtility.ConvertToHexaDecimal(requestMessage.MessageId);
return _msgId;
}
catch (MQException mqe)
{
// Close request Queue if still opened
if(mqPutQueue.OpenStatus)
mqPutQueue.Close();
// Close Queue manager if still opened
if(mqQueueManager.OpenStatus)
mqQueueManager.Close();
throw new MQAdapterException("Error Code: " +
MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason));
}
}
POP MQ 消息:此函数负责连接到 MQ 服务器并弹出 MQ 消息(FIFO)。
public string GetMQResponseMessage(string correlationId)
{
MQMessage rsMsg = new MQMessage();
rsMsg.CorrelationId = HexaDecimalUtility.ConvertToBinary(correlationId);
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options = MQC.MQGMO_WAIT;
gmo.MatchOptions = MQC.MQMO_MATCH_CORREL_ID;
gmo.WaitInterval = pollingTimeout;
try
{
mqGetQueue.Get(rsMsg,gmo);
return rsMsg.ReadString(rsMsg.DataLength);
}
catch(MQException mqe)
{
// Close Reponse Queue if still opened
if(mqGetQueue.OpenStatus)
mqGetQueue.Close();
// Close Queue manager if still opened
if(mqQueueManager.OpenStatus)
mqQueueManager.Close();
// Check if it a timeout exception
if(MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason) ==
"MQRC_NO_MSG_AVAILABLE")
throw new MQAdapterTimeoutException("Message with correlation Id " +
correlationId + " Timed out");
// MQ Exception
throw new MQAdapterException("Error Code: " +
MQAdapterErrorReasons.GetMQFailureReasonErrorCode(mqe.Reason));
}
}
为了关联请求和响应,并且由于我们使用的是异步通信方法,您必须提供一个唯一的键,以便 MQ 系列可以将请求与响应关联起来。(MQ 系列支持关联)。为了应用同步支持,实现了一个 SendMQRequestSync
函数,用于在回复队列上等待特定时间以获取响应。
要使用该组件,请按以下步骤操作
MQAdapter adapter = new MQAdapter("MqmanagerName",
"RequestChannelName","Queue.RequestName",
"Queue.ResponseName",timeout,characterset, port);
adapater.SendMQRequestSync(strMessage);
执行流程将是
- 打开与 MQ 服务器的连接。
- 将 MQ 请求推送到请求队列。
- 从响应队列中弹出 MQ 响应。
- 等待超时过期并抛出异常。
P.S.:所有 MQ 系列异常都将被包装。
历史
版本 1.0。