65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 ActiveMQ 实现 Jms 到 Jms 的桥接

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (2投票s)

2009年4月28日

CPOL

6分钟阅读

viewsIcon

65947

downloadIcon

438

集成现有 jms 与 ActiveMq jms 系统

引言

不久前,我被分配到一个需要与另一个 JMS 系统集成的项目。当我搜索网络寻找详细的教程时,我没有找到任何能帮助我的详细教程。最终,我弄清楚了所有我需要设计和实现才能实现这种 JMS 到 JMS 桥接集成的部分。我决定在这篇文章中总结我学到的东西,这不仅是为了我自己的记录,也是为了帮助其他需要类似信息的人。我假设读者了解 Spring 和 JMS 等技术。

本文的目的是提供一个关于以下主题的详细教程

• 本文所需的软件。

• 如何发布消息到队列并进行消费。

• 如何将一个 Open-JMS 桥接到另一个 ActiveMQ-JMS。

我希望读者会喜欢这篇文章!如果您有任何问题或评论,请在 FAQ 部分留言。

背景

ActiveMQ 是一个开源的、Apache 2.0 许可的消息代理和 JMS 1.1 实现,以及企业集成模式提供者,它可以无缝集成到 Geronimo、轻量级容器和任何 Java 应用程序中。Apache ActiveMQ 速度快,支持多种跨语言客户端和协议,提供易于使用的企业集成模式和许多高级功能,同时完全支持 JMS 1.1 和 J2EE 1.4。

ActiveMQ 提供与其他实现 JMS 1.0.2 及以上规范的 JMS 提供者的桥接功能。JMS 桥可以与 ActiveMQ 代理共存,也可以远程运行。为了支持 JMS 1.0.2,队列和主题之间是分离的。

特点

• 支持多种跨语言客户端和协议,包括 Java、C、C++、C#、Ruby、Perl、Python、PHP

• OpenWire,用于 Java、C、C++、C# 中的高性能客户端

• Stomp 支持,使得客户端可以轻松地用 C、Ruby、Perl、Python、PHP、ActionScript/Flash、Smalltalk 编写,以便与 ActiveMQ 以及任何其他流行的消息代理进行通信

• 完全支持 JMS 客户端和消息代理中的企业集成模式

• 支持许多高级功能,如消息组、虚拟目的地、通配符和复合目的地

• 完全支持 JMS 1.1 和 J2EE 1.4,支持瞬态、持久化、事务性和 XA 消息传递

• Spring 支持,因此 ActiveMQ 可以轻松地嵌入到 Spring 应用程序中,并使用 Spring 的 XML 配置机制进行配置

• 在 Geronimo、JBoss 4、GlassFish 和 WebLogic 等流行的 J2EE 服务器中进行过测试

• 包括用于入站和出站消息传递的 JCA 1.5 资源适配器,因此 ActiveMQ 应该可以在任何符合 J2EE 1.4 的服务器中自动部署

• 支持可插入的传输协议,如 in-VM、TCP、SSL、NIO、UDP、组播、JGroups 和 JXTA 传输

• 使用 JDBC 支持非常快速的持久化,以及高性能日志

• 专为高性能集群、客户端-服务器、点对点通信设计

• REST API,提供面向消息传递的、与技术无关的、语言中立的 Web API

• Ajax 支持,支持使用纯 DHTML 的 Web 流式传输,使 Web 浏览器可以成为消息传递结构的一部分

• CXF 和 Axis 支持,因此 ActiveMQ 可以轻松地集成到这两个 Web 服务堆栈中,以提供可靠的消息传递

• 可用作内存中的 JMS 提供者,非常适合单元测试 JMS 可用作内存中的 JMS 提供者,非常适合单元测试 JMS

OpenJMS 是 Sun Microsystems 的 Java Message Service API 1.1 规范的一个开源实现

特点

• 点对点和发布-订阅消息模型

• 消息的保证送达

• 同步和异步消息送达

• 使用 JDBC 进行持久化

• 本地事务

• 使用 SQL92 类选择器进行消息过滤

• 身份验证

• 管理 GUI

• 基于 XML 的配置文件

• 内存和数据库垃圾回收

• 自动客户端断线检测

• Applet 支持

• 与 Jakarta Tomcat 等 Servlet 容器集成

• 支持 TCP、RMI、HTTP 和 SSL 协议栈

• 支持大量目的地和订阅者

安装和配置所需软件

本教程需要设置和配置以下软件包

• JDK 1.5 (或更高版本)。

• Open Jms 0.7.7 下载链接< http://openjms.sourceforge.net/downloads.html

• Apache ActiveMQ 5.1 下载链接 https://activemq.apache.ac.cn/activemq-510-release.html

我也推荐使用 Eclipse IDE。但是,对于本教程,所有工作都可以使用简单的代码编辑器(如 UltraEdit32、Crimson Editor 或 Notepad)、命令提示符来完成。为了方便起见,本教程是在 Windows XP 上创建的。迁移到其他平台应该相对容易。

在 Windows XP 上安装 JDK 非常简单,只需下载 MSI 安装程序,然后将其安装到“Program Files”默认位置或直接安装到“C:\”。安装 JDK 后,建议配置系统变量

1. 创建系统环境变量“JAVA_HOME”,并将其指向 JDK 的基本目录(即 C:\Program Files\Java\jdk-1.5.0_17 或 C:\jdk-1.5.0_17)。

配置系统变量后,打开命令提示符并输入“java -version”。输出将显示系统中安装的 JDK 版本。这应该有助于验证 JDK 安装的成功。验证后,关闭命令提示符。

安装 OpenJMS 和 ActiveMQ

安装 OpenJMS 也非常简单,从 Open JMS 网站(http://openjms.sourceforge.net/downloads.html)下载二进制可执行归档文件。然后将 openjms-0.7.7-beta-1.zip 归档文件解压到“C:\”。这将把归档文件解压到“C:\ ”。“C:\ openjms-0.7.7-beta-1”将是 OpenJMS 的基本目录。

安装 ActiveMQ 也非常简单,从 Apache ActiveMQ 网站(https://activemq.apache.ac.cn/activemq-510-release.html)下载二进制可执行归档文件。然后将 apache-activemq-5.1.0-bin.zip 归档文件解压到“C:\”。这将把归档文件解压到“C:\ ”。“C:\ apache-activemq-5.1.0”将是 ActiveMQ 的基本目录。

创建一个简单的 Java 类用于发布消息

import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SimpleMessageSender {

	private Context jndiContext;
	
	private QueueConnectionFactory factory;
	
	private QueueConnection queueConnection;
	
	private static final String URL = "tcp://:3035";
	
	private static final String CONNECTION_FACTORY = "ConnectionFactory";
	
	private static final String QUEUE_NAME = "jmstojmsBridgeQueue";
	
	/**
	 * Constructor initialize queue connection factory and connection.
	 * @throws NamingException 
	 * @throws JMSException 
	 */
	public SimpleMessageSender() throws NamingException, JMSException
	{
		Hashtable environment = new Hashtable();
		environment.put(Context.INITIAL_CONTEXT_FACTORY, org.exolab.jms.jndi.InitialContextFactory.class.getName());
		environment.put(Context.PROVIDER_URL, URL);
		jndiContext = new InitialContext(environment);
		factory = (QueueConnectionFactory)jndiContext.lookup(CONNECTION_FACTORY);
		queueConnection = factory.createQueueConnection();
	}
	/**
	 * @param message
	 */
	public void sendMessage(String message){
		 
		try {
		
			QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
			Queue queue = queueSession.createQueue(QUEUE_NAME);
			QueueSender sender = queueSession.createSender(queue);
			TextMessage txtMessage = queueSession.createTextMessage("OPEN_JMS :"+message);
			sender.send(txtMessage);
			System.out.println("Message has been send successfully");
			close();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (NamingException e) {
			
			e.printStackTrace();
		}
	}
	private void close() throws JMSException, NamingException
	{
	   if(queueConnection!=null){
		   queueConnection.close();
	   }
	   if(jndiContext!=null){
		   jndiContext.close();
	   }
	   factory=null;
	}
	
	
	public static void main(String args[]) throws Exception{
		if (args.length<1){
			System.out.println("Usage : java SimpleMessageSender ");
			return;
		}
		SimpleMessageSender simpleMsgSender = new SimpleMessageSender();
		simpleMsgSender.sendMessage(args[0]);
		
	}
}

		

创建一个简单的 Java 类用于消费消息

import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class SimpleMessageListener implements MessageListener{ private static final String QUEUE_NAME = "jmstojmsBridgeQueue"; public void onMessage(Message message) { if (message!=null && message instanceof TextMessage){ TextMessage txtMessage = (TextMessage)message; try { System.out.println("Message :: "+txtMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public SimpleMessageListener() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); QueueConnection queueConnection = factory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(QUEUE_NAME); QueueReceiver receiver = queueSession.createReceiver(queue); receiver.setMessageListener(this); } public static void main(String args[])throws Exception { new SimpleMessageListener(); System.out.println("Waiting for Message"); } }

在编译代码之前,您必须将所需的 jar 文件设置到类路径中

c:\jms>set CLASSPATH=.;<OPEN_JMS_INSTALL_DIR>\lib\jms-1.1.jar; <OPEN_JMS_INSTALL_DIR>\lib\openjms-0.7.7-beta-1.jar;%CLASSPATH%

c:\jms>javac SimpleMessageSender.java

D:\jms>set CLASSPATH=<ACTIVEMQ_INSTALL_DIR>\activemq-all-5.1.0.jar;%CLASSPATH%

c:\jms>javac SimpleMessageListener.java

添加 JMS 桥接连接器

打开 <ACTIVEMQ_INSTALLED_DIR>/conf/activmq.xml 文件。

在 </transportConnectors> 元素之后添加以下代码。

<jmsBridgeConnectors> <jmsQueueConnector name="OpenJMSBridge-Inbound" jndiOutboundTemplate="#remoteJndi" outboundQueueConnectionFactoryName="ConnectionFactory" localQueueConnectionFactory="#localFactory"> <inboundQueueBridges> <inboundQueueBridge inboundQueueName="jmstojmsBridgeQueue"/> </inboundQueueBridges> </jmsQueueConnector> </jmsBridgeConnectors> Add the following code after the </broker> element <bean id="remoteJndi" class="org.springframework.jndi.JndiTemplate"> <property name="environment"> <props> <prop key="java.naming.factory.initial">org.exolab.jms.jndi.InitialContextFactory <prop key="java.naming.provider.url">tcp://localhost:3035 </props> </property> </bean> <bean id="localFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://:61616" /> </bean>

打开 <OPENJMS_INSTALLED_DIR>/config/Openjms.xml 文件。

注释掉以下代码。

<!-- Connector scheme="rmi"> <ConnectionFactories> <QueueConnectionFactory name="JmsQueueConnectionFactory" /> <TopicConnectionFactory name="JmsTopicConnectionFactory" /> <ConnectionFactory name="RMIConnectionFactory"/> </ConnectionFactories> </Connector -->

注意:如果您不想注释掉上述代码,则应修改 ActiveMQ 中的 JMX 监听端口,因为默认情况下 activemq 监听 1099 端口。openjms rmi 连接器也监听相同的端口号。

运行示例代码的步骤

步骤 1:将 Open-JMS jar 文件复制到 <ActiveMQ-INSTALLED_DIR>/lib 目录。

步骤 2:使用以下批处理文件启动 Open-JMS。

<openjms_install_dir>\bin>start startup.bat

步骤 3:使用以下批处理文件启动 ActiveMQ-JMS。

<activemq_install_dir>\bin\start activemq.bat

步骤 4:使用以下命令运行 SimpleMessageSender 类

java SimpleMessageSender “This is a Sample Message”

SimpleMessageSender 在控制台上的输出

步骤 5

打开另一个 cmd 窗口,设置类路径和 Java 路径,然后运行 SimpleMessageListener 类。

java SimpleMessageListener

SimpleMessageListener 在控制台上的输出:

以上是您需要编写的所有 Java 代码来发布和消费 Java 消息。让我们检查一下 activemq.xml jmsBridgeConnectors 元素

jmsBridgeConnectors:是 jms 桥接连接器的根元素。

jmsQueueConnector: 桥接到其他 JMS 队列提供商。

jndiOutboundTemplate: 如果未设置 localTopicConnection 或 localTopicConnectionFactory,则用于定位 ActiveMQ Connection 的 Connection Factory。

outboundQueueConnectionFactoryName: 用于初始化外部 JMS Connection(如果未设置 localQueueConnection)。

localQueueConnectionFactory: 用于初始化 ActiveMQ JMS Connection(如果未设置 localQueueConnection)。

inboundQueueBridge: 创建一个入站队列桥接。

inboundQueueName: 要从中接收的外部队列名称。有关这些元素的更多信息,请访问以下链接: ActiveMQ

历史

版本 1.0

© . All rights reserved.