使用 ActiveMQ 实现 Jms 到 Jms 的桥接





4.00/5 (2投票s)
集成现有 jms 与 ActiveMq jms 系统
引言
不久前,我被分配到一个需要与另一个 JMS 系统集成的项目。当我搜索网络寻找详细的教程时,我没有找到任何能帮助我的详细教程。最终,我弄清楚了所有我需要设计和实现才能实现这种 JMS 到 JMS 桥接集成的部分。我决定在这篇文章中总结我学到的东西,这不仅是为了我自己的记录,也是为了帮助其他需要类似信息的人。我假设读者了解 Spring 和 JMS 等技术。
本文的目的是提供一个关于以下主题的详细教程
• 本文所需的软件。
• 如何发布消息到队列并进行消费。
• 如何将一个 Open-JMS 桥接到另一个 ActiveMQ-JMS。
我希望读者会喜欢这篇文章!如果您有任何问题或评论,请在 FAQ 部分留言。
背景
ActiveMQ 是一个开源的、Apache 2.0 许可的消息代理和 JMS 1.1 实现,以及企业集成模式提供者,它可以无缝集成到 Geronimo、轻量级容器和任何 Java 应用程序中。Apache ActiveMQ 速度快,支持多种跨语言客户端和协议,提供易于使用的企业集成模式和许多高级功能,同时完全支持 JMS 1.1 和 J2EE 1.4。
ActiveMQ 提供与其他实现 JMS 1.0.2 及以上规范的 JMS 提供者的桥接功能。JMS 桥可以与 ActiveMQ 代理共存,也可以远程运行。为了支持 JMS 1.0.2,队列和主题之间是分离的。
特点
• 支持多种跨语言客户端和协议,包括 Java、C、C++、C#、Ruby、Perl、Python、PHP• OpenWire,用于 Java、C、C++、C# 中的高性能客户端
• Stomp 支持,使得客户端可以轻松地用 C、Ruby、Perl、Python、PHP、ActionScript/Flash、Smalltalk 编写,以便与 ActiveMQ 以及任何其他流行的消息代理进行通信
• 完全支持 JMS 客户端和消息代理中的企业集成模式
• 支持许多高级功能,如消息组、虚拟目的地、通配符和复合目的地
• 完全支持 JMS 1.1 和 J2EE 1.4,支持瞬态、持久化、事务性和 XA 消息传递
• Spring 支持,因此 ActiveMQ 可以轻松地嵌入到 Spring 应用程序中,并使用 Spring 的 XML 配置机制进行配置
• 在 Geronimo、JBoss 4、GlassFish 和 WebLogic 等流行的 J2EE 服务器中进行过测试
• 包括用于入站和出站消息传递的 JCA 1.5 资源适配器,因此 ActiveMQ 应该可以在任何符合 J2EE 1.4 的服务器中自动部署
• 支持可插入的传输协议,如 in-VM、TCP、SSL、NIO、UDP、组播、JGroups 和 JXTA 传输
• 使用 JDBC 支持非常快速的持久化,以及高性能日志
• 专为高性能集群、客户端-服务器、点对点通信设计
• REST API,提供面向消息传递的、与技术无关的、语言中立的 Web API
• Ajax 支持,支持使用纯 DHTML 的 Web 流式传输,使 Web 浏览器可以成为消息传递结构的一部分
• CXF 和 Axis 支持,因此 ActiveMQ 可以轻松地集成到这两个 Web 服务堆栈中,以提供可靠的消息传递
• 可用作内存中的 JMS 提供者,非常适合单元测试 JMS 可用作内存中的 JMS 提供者,非常适合单元测试 JMS
OpenJMS 是 Sun Microsystems 的 Java Message Service API 1.1 规范的一个开源实现
特点
• 点对点和发布-订阅消息模型
• 消息的保证送达
• 同步和异步消息送达
• 使用 JDBC 进行持久化
• 本地事务
• 使用 SQL92 类选择器进行消息过滤
• 身份验证
• 管理 GUI
• 基于 XML 的配置文件
• 内存和数据库垃圾回收
• 自动客户端断线检测
• Applet 支持
• 与 Jakarta Tomcat 等 Servlet 容器集成
• 支持 TCP、RMI、HTTP 和 SSL 协议栈
• 支持大量目的地和订阅者
安装和配置所需软件
本教程需要设置和配置以下软件包
• JDK 1.5 (或更高版本)。
• Open Jms 0.7.7 下载链接< http://openjms.sourceforge.net/downloads.html
• Apache ActiveMQ 5.1 下载链接 https://activemq.apache.ac.cn/activemq-510-release.html
我也推荐使用 Eclipse IDE。但是,对于本教程,所有工作都可以使用简单的代码编辑器(如 UltraEdit32、Crimson Editor 或 Notepad)、命令提示符来完成。为了方便起见,本教程是在 Windows XP 上创建的。迁移到其他平台应该相对容易。
在 Windows XP 上安装 JDK 非常简单,只需下载 MSI 安装程序,然后将其安装到“Program Files”默认位置或直接安装到“C:\”。安装 JDK 后,建议配置系统变量
1. 创建系统环境变量“JAVA_HOME”,并将其指向 JDK 的基本目录(即 C:\Program Files\Java\jdk-1.5.0_17 或 C:\jdk-1.5.0_17)。
配置系统变量后,打开命令提示符并输入“java -version”。输出将显示系统中安装的 JDK 版本。这应该有助于验证 JDK 安装的成功。验证后,关闭命令提示符。
安装 OpenJMS 和 ActiveMQ
安装 OpenJMS 也非常简单,从 Open JMS 网站(http://openjms.sourceforge.net/downloads.html)下载二进制可执行归档文件。然后将 openjms-0.7.7-beta-1.zip 归档文件解压到“C:\”。这将把归档文件解压到“C:\ ”。“C:\ openjms-0.7.7-beta-1”将是 OpenJMS 的基本目录。
安装 ActiveMQ 也非常简单,从 Apache ActiveMQ 网站(https://activemq.apache.ac.cn/activemq-510-release.html)下载二进制可执行归档文件。然后将 apache-activemq-5.1.0-bin.zip 归档文件解压到“C:\”。这将把归档文件解压到“C:\ ”。“C:\ apache-activemq-5.1.0”将是 ActiveMQ 的基本目录。

创建一个简单的 Java 类用于发布消息
import java.util.Hashtable; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; public class SimpleMessageSender { private Context jndiContext; private QueueConnectionFactory factory; private QueueConnection queueConnection; private static final String URL = "tcp://:3035"; private static final String CONNECTION_FACTORY = "ConnectionFactory"; private static final String QUEUE_NAME = "jmstojmsBridgeQueue"; /** * Constructor initialize queue connection factory and connection. * @throws NamingException * @throws JMSException */ public SimpleMessageSender() throws NamingException, JMSException { Hashtable environment = new Hashtable(); environment.put(Context.INITIAL_CONTEXT_FACTORY, org.exolab.jms.jndi.InitialContextFactory.class.getName()); environment.put(Context.PROVIDER_URL, URL); jndiContext = new InitialContext(environment); factory = (QueueConnectionFactory)jndiContext.lookup(CONNECTION_FACTORY); queueConnection = factory.createQueueConnection(); } /** * @param message */ public void sendMessage(String message){ try { QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(QUEUE_NAME); QueueSender sender = queueSession.createSender(queue); TextMessage txtMessage = queueSession.createTextMessage("OPEN_JMS :"+message); sender.send(txtMessage); System.out.println("Message has been send successfully"); close(); } catch (JMSException e) { e.printStackTrace(); } catch (NamingException e) { e.printStackTrace(); } } private void close() throws JMSException, NamingException { if(queueConnection!=null){ queueConnection.close(); } if(jndiContext!=null){ jndiContext.close(); } factory=null; } public static void main(String args[]) throws Exception{ if (args.length<1){ System.out.println("Usage : java SimpleMessageSender" ); return; } SimpleMessageSender simpleMsgSender = new SimpleMessageSender(); simpleMsgSender.sendMessage(args[0]); } }
创建一个简单的 Java 类用于消费消息
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class SimpleMessageListener implements MessageListener{ private static final String QUEUE_NAME = "jmstojmsBridgeQueue"; public void onMessage(Message message) { if (message!=null && message instanceof TextMessage){ TextMessage txtMessage = (TextMessage)message; try { System.out.println("Message :: "+txtMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public SimpleMessageListener() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); QueueConnection queueConnection = factory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(QUEUE_NAME); QueueReceiver receiver = queueSession.createReceiver(queue); receiver.setMessageListener(this); } public static void main(String args[])throws Exception { new SimpleMessageListener(); System.out.println("Waiting for Message"); } }
在编译代码之前,您必须将所需的 jar 文件设置到类路径中
c:\jms>set CLASSPATH=.;<OPEN_JMS_INSTALL_DIR>\lib\jms-1.1.jar; <OPEN_JMS_INSTALL_DIR>\lib\openjms-0.7.7-beta-1.jar;%CLASSPATH%
c:\jms>javac SimpleMessageSender.java
D:\jms>set CLASSPATH=<ACTIVEMQ_INSTALL_DIR>\activemq-all-5.1.0.jar;%CLASSPATH%
c:\jms>javac SimpleMessageListener.java添加 JMS 桥接连接器
打开 <ACTIVEMQ_INSTALLED_DIR>/conf/activmq.xml 文件。
在 </transportConnectors> 元素之后添加以下代码。
<jmsBridgeConnectors> <jmsQueueConnector name="OpenJMSBridge-Inbound" jndiOutboundTemplate="#remoteJndi" outboundQueueConnectionFactoryName="ConnectionFactory" localQueueConnectionFactory="#localFactory"> <inboundQueueBridges> <inboundQueueBridge inboundQueueName="jmstojmsBridgeQueue"/> </inboundQueueBridges> </jmsQueueConnector> </jmsBridgeConnectors> Add the following code after the </broker> element <bean id="remoteJndi" class="org.springframework.jndi.JndiTemplate"> <property name="environment"> <props> <prop key="java.naming.factory.initial">org.exolab.jms.jndi.InitialContextFactory <prop key="java.naming.provider.url">tcp://localhost:3035 </props> </property> </bean> <bean id="localFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://:61616" /> </bean>
打开 <OPENJMS_INSTALLED_DIR>/config/Openjms.xml 文件。
注释掉以下代码。
<!-- Connector scheme="rmi"> <ConnectionFactories> <QueueConnectionFactory name="JmsQueueConnectionFactory" /> <TopicConnectionFactory name="JmsTopicConnectionFactory" /> <ConnectionFactory name="RMIConnectionFactory"/> </ConnectionFactories> </Connector -->
注意:如果您不想注释掉上述代码,则应修改 ActiveMQ 中的 JMX 监听端口,因为默认情况下 activemq 监听 1099 端口。openjms rmi 连接器也监听相同的端口号。
运行示例代码的步骤
步骤 1:将 Open-JMS jar 文件复制到 <ActiveMQ-INSTALLED_DIR>/lib 目录。
步骤 2:使用以下批处理文件启动 Open-JMS。
<openjms_install_dir>\bin>start startup.bat

步骤 3:使用以下批处理文件启动 ActiveMQ-JMS。
<activemq_install_dir>\bin\start activemq.bat

步骤 4:使用以下命令运行 SimpleMessageSender 类
java SimpleMessageSender “This is a Sample Message”
SimpleMessageSender 在控制台上的输出

步骤 5
打开另一个 cmd 窗口,设置类路径和 Java 路径,然后运行 SimpleMessageListener 类。
java SimpleMessageListener
SimpleMessageListener 在控制台上的输出:
以上是您需要编写的所有 Java 代码来发布和消费 Java 消息。让我们检查一下 activemq.xml jmsBridgeConnectors 元素
jmsBridgeConnectors:是 jms 桥接连接器的根元素。
jmsQueueConnector: 桥接到其他 JMS 队列提供商。
jndiOutboundTemplate: 如果未设置 localTopicConnection 或 localTopicConnectionFactory,则用于定位 ActiveMQ Connection 的 Connection Factory。
outboundQueueConnectionFactoryName: 用于初始化外部 JMS Connection(如果未设置 localQueueConnection)。
localQueueConnectionFactory: 用于初始化 ActiveMQ JMS Connection(如果未设置 localQueueConnection)。
inboundQueueBridge: 创建一个入站队列桥接。
inboundQueueName: 要从中接收的外部队列名称。有关这些元素的更多信息,请访问以下链接: ActiveMQ。
历史
版本 1.0