对JMS不怎么了解,最近在尝试应用整合spring+activeMQ.
如果要在spring,activeMQ整合后,实现任意两用户间的点对点消息发送及广播消息功能,这样该如何实现呢?
希望大家多多指教,特别是在spring整合activeMQ的配置,及实现上述功能的思路.感谢!!
如果要在spring,activeMQ整合后,实现任意两用户间的点对点消息发送及广播消息功能,这样该如何实现呢?
希望大家多多指教,特别是在spring整合activeMQ的配置,及实现上述功能的思路.感谢!!
调试欢乐多
http://www.ibm.com/developerworks/cn/java/wa-spring4/
点对点域
发布/订阅域
http://www.blogjava.net/hk2000c/archive/2007/11/16/161069.html
Class JmsTemplate102
You must specify the domain or style of messaging to be either Point-to-Point (Queues) or Publish/Subscribe (Topics), using the "pubSubDomain" property. Point-to-Point (Queues) is the default domain. The "pubSubDomain" property is an important setting due to the use of similar but separate class hierarchies in the JMS 1.0.2 API. JMS 1.1 provides a new domain-independent API that allows for easy mix-and-match use of Point-to-Point and Publish/Subscribe domains.
public JmsTemplate102(ConnectionFactory connectionFactory, boolean pubSubDomain)
1,spring好像都用的是同样的类接口和方法,但是[发布订阅模式]要设置一下初始化的boolean值:
JmsTemplate102(ConnectionFactory connectionFactory, boolean pubSubDomain=true)
2,默认的是Point-to-Point (Queues)点对点的传输!spring的调用就那样,详细的可以参考 http://static.springframework.org/spring/docs/1.1.5/api/org/springframework/jms/core/JmsTemplate102.html
3,activeMQ应该还是要另外设置模式的,呵呵!
JmsTemplate默认是jms1.1规范的,点对点可以发,收消息,参考的是springside的代码,散列及负载均衡,我这样配置的:<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">
<!--
  Uses Spring's listener containers. Messages are stored persistently, so
  they are not lost when the server restarts.
-->
<amq:broker persistent="true" useJmx="false">
  <amq:networkConnectors>
    <!-- Static network bridge to the broker at 192.168.0.37:61616 -->
    <amq:networkConnector conduitSubscriptions="true"
                          dynamicOnly="false"
                          uri="static:(tcp://192.168.0.37:61616)"/>
  </amq:networkConnectors>
  <amq:persistenceAdapter>
    <!--
      Author's notes on useDatabaseLock:
      MySQL can run with useDatabaseLock="true", which is the default.
      For HSQLDB and MS SQL set useDatabaseLock="false"; with "true" they fail:
        MS SQL error: FOR UPDATE clause allowed only for DECLARE CURSOR
        HSQLDB error: FOR in statement [SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE]
      See http://www.nabble.com/ActiveMQ-JDBC-Persistence-with-SQL-Server-tf2022248.html#a5560296
    -->
    <amq:jdbcPersistenceAdapter id="jdbcAdapter"
                                createTablesOnStartup="true"
                                dataSource="#hsql-ds"
                                useDatabaseLock="false"/>
  </amq:persistenceAdapter>
</amq:broker>
<!-- ActiveMQ connectionFactory -->
<!-- Client connection factory; talks to the broker at 192.168.0.37:61616 over TCP -->
<amq:connectionFactory brokerURL="tcp://192.168.0.37:61616?wireFormat.maxInactivityDuration=0"
                       id="jmsConnectionFactory"/>
<!-- ActiveMQ destinations -->
<!-- <amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/> -->
<!-- Topic (publish/subscribe) and queue (point-to-point); both use the same physical name -->
<amq:topic physicalName="com.model.bo.Author" name="Topic"/>
<amq:queue physicalName="com.model.bo.Author" name="Queue"/>
<!--
  Default destination used by jmsTemplate.
  javax.jms.Destination is an interface and cannot be instantiated by Spring,
  so a concrete ActiveMQ queue is declared instead. The queue name is the same
  physical name used by the "Queue" destination.
-->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="com.model.bo.Author"/>
</bean>
<!-- The DataSource that will be used by the Broker -->
<!--
  Pooled DataSource referenced by the broker's JDBC persistence adapter as "#hsql-ds".
  Despite the bean id, it is configured with the MySQL driver and a MySQL URL.
-->
<bean id="hsql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url">
    <!-- '&' must be written as '&amp;' inside XML, otherwise the file is not well-formed -->
    <value>jdbc:mysql://192.168.0.37:3306/test?useUnicode=true&amp;characterEncoding=utf8&amp;relaxAutoCommit=true</value>
  </property>
  <property name="username" value="root"/>
  <property name="password" value=""/>
  <property name="poolPreparedStatements" value="true"/>
</bean>
<!-- Spring JmsTemplate config -->
<!-- JmsTemplate used by the producers to send messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory">
    <!--
      Wrapped in SingleConnectionFactory so a connection is not created per send.
      This is a wrapper around the target factory, not a connection pool.
    -->
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory">
        <ref bean="jmsConnectionFactory"/>
      </property>
    </bean>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="messageTConverter"/>
  </property>
  <!-- Destination used when a send call does not name one -->
  <property name="defaultDestination">
    <ref bean="destination"/>
  </property>
</bean>
<!-- Message converter -->
<!-- Custom message converter referenced by jmsTemplate and by both listener adapters -->
<bean id="messageTConverter" class="com.activemq.MessageTConverter"/>
<!-- POJOs which send messages using the Spring JmsTemplate -->
<!-- Disabled variant: both producers send to the shared "destination" bean.
<bean id="topicMessageTProducer" class="com.activemq.TopicMessageTProducerImpl">
<property name="jmsTemplate" ref="jmsTemplate"/>
<property name="destination" ref="destination"/>
</bean>
<bean id="queueMessageTProducer" class="com.activemq.QueueMessageTProducerImpl">
<property name="jmsTemplate" ref="jmsTemplate"/>
<property name="destination" ref="destination"/>
</bean> -->
<!-- Producer that sends to the "Topic" destination (publish/subscribe) -->
<bean id="topicMessageTProducer" class="com.activemq.TopicMessageTProducerImpl">
  <property name="jmsTemplate">
    <ref bean="jmsTemplate"/>
  </property>
  <property name="destination">
    <ref bean="Topic"/>
  </property>
</bean>
<!-- Producer that sends to the "Queue" destination (point-to-point) -->
<bean id="queueMessageTProducer" class="com.activemq.QueueMessageTProducerImpl">
  <property name="jmsTemplate">
    <ref bean="jmsTemplate"/>
  </property>
  <property name="destination">
    <ref bean="Queue"/>
  </property>
</bean>
<!-- Message Driven POJO (MDP) -->
<!-- Adapter that forwards topic messages to a plain consumer object -->
<bean id="topicMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <!-- The delegate that actually handles the message -->
  <constructor-arg>
    <bean class="com.activemq.TopicMessageTConsumerImpl"/>
  </constructor-arg>
  <!--
    Delegate method invoked for each message; another method may be used.
    NOTE(review): the name is spelled "findMesageT"; confirm it matches the consumer class.
  -->
  <property name="defaultListenerMethod">
    <value>findMesageT</value>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="messageTConverter"/>
  </property>
</bean>
<!-- Adapter that forwards queue messages to a plain consumer object -->
<bean id="queueMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <!-- The delegate that actually handles the message -->
  <constructor-arg>
    <bean class="com.activemq.QueueMessageTConsumerImpl"/>
  </constructor-arg>
  <!--
    Delegate method invoked for each message; another method may be used.
    NOTE(review): the name is spelled "findMesageT"; confirm it matches the consumer class.
  -->
  <property name="defaultListenerMethod">
    <value>findMesageT</value>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="messageTConverter"/>
  </property>
</bean>
<!-- Listener containers; the MDP does not need to implement an interface -->
<!-- Container that receives from "Topic" and hands each message to topicMessageListener -->
<bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory">
    <ref bean="jmsConnectionFactory"/>
  </property>
  <property name="destination">
    <ref bean="Topic"/>
  </property>
  <property name="messageListener">
    <ref bean="topicMessageListener"/>
  </property>
</bean>
<!-- Container that receives from "Queue" and hands each message to queueMessageListener -->
<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory">
    <ref bean="jmsConnectionFactory"/>
  </property>
  <property name="destination">
    <ref bean="Queue"/>
  </property>
  <property name="messageListener">
    <ref bean="queueMessageListener"/>
  </property>
</bean>
</beans>但是有点晕:比如我以A用户登录,想给B用户发送消息,只能B用户收到消息,在这种整合下如何实现?
感谢!!
http://www.ibm.com/developerworks/cn/java/wa-spring4/
发个收藏的链接我也会,我也有,关于activeMQ与spring整合,我在网上也google了好多资料,
可能是咱悟性比较差,水平很低,看了spring的资料,看了activeMQ的资料,也没清楚弄明白spring+activeMQ像1+1这么简单的事情。只想看到有实际spring+activeMQ整合经验的朋友来指点下,发链接的朋友还是不用了吧。我想发链接的朋友连我上边的那个配置都没看吧,我也不指望发链接的朋友给我指出那个配置问题了。谢谢
但是有点晕:比如我以A用户登录,想给B用户发送消息,只能B用户收到消息,在这种整合下如何实现?
这里有两种方式,点对点,这种方式就不说了,你只需要创建一个接收者,其实就是你上面的queueListenerContainer,只需要指向的路径和发布者是一样的,就会接收到信息,也就是说A发送到B。
如果是订阅和发布的形式,那么创建接收者的时候你就需要指定他是连接到哪个TOPIC,还可以对TOPIC上的消息进行筛选(消息选择器),筛选条件的写法和SQL语句的where条件基本一样,比如下面这样
TopicSubscriber receiver = session.createDurableSubscriber(topic, name,"id not in ('11','22','33','44','55')",true);
参数的第一个就是创建的TOPIC;第二个参数是持久订阅的名称(它和连接上设置的clientID一起唯一标识这个订阅);第三个参数是消息选择器,根据发布消息的属性来决定接收哪些消息;第四个参数noLocal为true时,不接收本连接自己发布的消息。
其中clientid一般都是像下面这样去设置
TopicConnection conn = factory.createTopicConnection();
conn.setClientID ("1111");
而要让查询条件生效,比如上面有一个id not in ('11','22','33','44','55'),那么发布的时候就需要在消息上设置id这个属性(属性名区分大小写),例如 message.setStringProperty("id", "66")。
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0-SNAPSHOT.xsd" >
<!--
  Recommended version: uses Spring's listener containers. The original note says
  messages are persisted to a database and are not lost when the server restarts.
  NOTE(review): the data source below is commented out and no persistence adapter
  is declared in this part of the file; confirm where persistence is configured.
-->
<!-- Connect to an external ActiveMQ broker instead:
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
-->
<!-- ActiveMQ connectionFactory: connects to the in-VM broker -->
<amq:connectionFactory brokerURL="vm://localhost" id="jmsConnectionFactory"/>
<!-- ActiveMQ destinations: queue (point-to-point) -->
<amq:queue physicalName="JMS-TEST-QUEUE" name="QUEUE"/>
<!-- Topic (publish/subscribe) -->
<amq:topic physicalName="JMS-TEST-TOPIC" name="TOPIC"/>
<!-- The MS SQL data source that would be used by the broker (currently disabled)
<bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="net.sourceforge.jtds.jdbc.Driver"/>
<property name="url">
<value>jdbc:jtds:sqlserver://localhost:1433/wmjqxgl;SelectMethod=cursor;charset=GBK;tds=8.0;lastupdatecount=true</value>
</property>
<property name="username" value="sa"/>
<property name="password" value="sa"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
-->
<!-- Spring JmsTemplate used by the producers to send messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory">
    <!--
      Wrapped in SingleConnectionFactory so a connection is not created per send.
      This is a wrapper around the target factory, not a connection pool.
    -->
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory">
        <ref bean="jmsConnectionFactory"/>
      </property>
    </bean>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="orderMessageConverter"/>
  </property>
</bean>
<!-- OrderMessage converter -->
<!-- Converter referenced by jmsTemplate and by the listener adapters -->
<bean id="orderMessageConverter" class="com.work.activemq.OrderMessageConverter"/>
<!-- POJO which sends messages to QUEUE using the Spring JmsTemplate -->
<bean id="orderMessageProducer" class="com.work.activemq.OrderMessageProducer">
  <property name="template">
    <ref bean="jmsTemplate"/>
  </property>
  <property name="destination">
    <ref bean="QUEUE"/>
  </property>
</bean>
<!-- Message sender for the topic (publish/subscribe) destination -->
<bean id="topicMessageProducer" class="com.work.activemq.TopicMessageProducer">
  <property name="template">
    <ref bean="jmsTemplate"/>
  </property>
  <property name="destination">
    <ref bean="TOPIC"/>
  </property>
</bean>
<!-- Message consumers for the topic: consumer 1 -->
<bean class="com.work.activemq.TopicConsumerA" id="topicConsumerA"/>
<!-- Consumer 2 for the same topic -->
<bean class="com.work.activemq.TopicConsumerB" id="topicConsumerB"/>
<!-- Message listener adapters for the two topic consumers -->
<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <ref bean="topicConsumerA"/>
  </constructor-arg>
  <!-- Delegate method invoked for each message; another method may be used -->
  <property name="defaultListenerMethod">
    <value>receive</value>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="orderMessageConverter"/>
  </property>
</bean>
<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <ref bean="topicConsumerB"/>
  </constructor-arg>
  <!-- Delegate method invoked for each message; another method may be used -->
  <property name="defaultListenerMethod">
    <value>receive</value>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="orderMessageConverter"/>
  </property>
</bean>
<!-- Message Driven POJO (MDP) for the queue: one sender, one receiver -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="com.work.activemq.OrderMessageConsumer">
      <!-- <property name="mailService" ref="mailService"/> -->
    </bean>
  </constructor-arg>
  <!-- Delegate method invoked for each message; another method may be used -->
  <property name="defaultListenerMethod">
    <value>sendEmail</value>
  </property>
  <!-- Custom MessageConverter -->
  <property name="messageConverter">
    <ref bean="orderMessageConverter"/>
  </property>
</bean>
<!-- Listener container; the MDP does not need to implement an interface -->
<!-- Container that receives from QUEUE and hands each message to messageListener -->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory">
    <ref bean="jmsConnectionFactory"/>
  </property>
  <property name="destination">
    <ref bean="QUEUE"/>
  </property>
  <property name="messageListener">
    <ref bean="messageListener"/>
  </property>
</bean>
<!-- Listener containers for TOPIC, one per consumer; the MDPs do not need to implement an interface -->
<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory">
    <ref bean="jmsConnectionFactory"/>
  </property>
  <property name="destination">
    <ref bean="TOPIC"/>
  </property>
  <property name="messageListener">
    <ref bean="topicListenerA"/>
  </property>
</bean>
<bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory">
    <ref bean="jmsConnectionFactory"/>
  </property>
  <property name="destination">
    <ref bean="TOPIC"/>
  </property>
  <property name="messageListener">
    <ref bean="topicListenerB"/>
  </property>
</bean>
<!-- Bean that is given both the queue producer and the topic producer -->
<bean id="orderNotify" class="com.work.activemq.OrderNotifyImpl">
  <property name="orderMessageProducer">
    <ref bean="orderMessageProducer"/>
  </property>
  <property name="topicMessageProducer">
    <ref bean="topicMessageProducer"/>
  </property>
</bean>
<!-- -->
</beans>