对JMS不怎么了解,最近在尝试应用整合sping+activeMQ.
如果要在sping,activeMQ整合后,实现任意两用户间的点对点消息发送及广播消息功能,这样该如何实现呢?
希望大家多多指教,特别是在sping整合activeMQ的配置,及实现上述功能的思路.感谢!!

解决方案 »

  1.   

    收藏此帖,过几天也要用jms,望楼主多多赐教!
      

  2.   

    http://whitesock.javaeye.com/blog/164925
    http://www.ibm.com/developerworks/cn/java/wa-spring4/点对点域
    发布/订阅域
    http://www.blogjava.net/hk2000c/archive/2007/11/16/161069.html
      

  3.   

    感谢楼上的这位朋友,那些链接的文章我前几天也看过了可能是我领悟能力比较差吧,现在还是不清楚在sping+activeMQ整合情况下如何来实现P2p,及订阅.在sping中如何来配置实现上述功能?
      

  4.   

    org.springframework.jms.core
    Class JmsTemplate102You must specify the domain or style of messaging to be either Point-to-Point (Queues) or Publish/Subscribe (Topics), using the "pubSubDomain" property. Point-to-Point (Queues) is the default domain.The "pubSubDomain" property is an important setting due to the use of similar but seperate class hierarchies in the JMS 1.0.2 API. JMS 1.1 provides a new domain-independent API that allows for easy mix-and-match use of Point-to-Point and Publish/Subscribe domain. public JmsTemplate102(ConnectionFactory connectionFactory,
                          boolean pubSubDomain)1,spring好像都用的是同样的类接口和方法,但是[发布订阅模式]要设置一下初始化的boolean值
    JmsTemplate102(ConnectionFactory connectionFactory,
                          boolean pubSubDomain=true)2,默认的是Point-to-Point (Queues)点对点的传输!spring的调用就那样,详细的可以参考http://static.springframework.org/spring/docs/1.1.5/api/org/springframework/jms/core/JmsTemplate102.html3activeMQ应该还是要另外设置模式的,呵呵!
      

  5.   

    spring就是实例化JmsTemplate102这个类的时候指定参数就好了!
      

  6.   


    JmsTemplate默认是jms1.1规范的,点对点可以发,收消息,参考的是springside的代码,散列及负载均衡,我这样配置的:<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.org/config/1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">
    <!--
        使用spring的listenerContainer,消息用持久化保存,服务器重启不会丢失
    -->
    <amq:broker useJmx="false" persistent="true">
    <amq:networkConnectors>
    <amq:networkConnector uri="static:(tcp://192.168.0.37:61616)" dynamicOnly="false" conduitSubscriptions="true"></amq:networkConnector>
    </amq:networkConnectors>
    <amq:persistenceAdapter>
    <amq:jdbcPersistenceAdapter id="jdbcAdapter" dataSource="#hsql-ds" createTablesOnStartup="true"
    useDatabaseLock="false"/>
    <!-- 
    Mysql can setup useDatabaseLock="true",this is defualt
    HSQLDB,MSSQL plz setup useDatabaseLock="false",
    if u setup useDatabaseLock="true",u will catch error:
    MSSQL Error Info:FOR UPDATE clause allowed only for DECLARE CURSOR 
    HSQLDB Error Info:FOR in statement [SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE] see http://www.nabble.com/ActiveMQ-JDBC-Persistence-with-SQL-Server-tf2022248.html#a5560296
    -->
    </amq:persistenceAdapter>

    </amq:broker> <!--  ActiveMQ connectionFactory  -->
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://192.168.0.37:61616?wireFormat.maxInactivityDuration=0"/> <!--  ActiveMQ destinations  -->
    <!-- <amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/> -->

    <amq:topic name="Topic" physicalName="com.model.bo.Author">
     </amq:topic>
    <amq:queue name="Queue" physicalName="com.model.bo.Author">
    </amq:queue>
    <bean id="destination" class="javax.jms.Destination"></bean> <!-- The HSQL Datasource that will be used by the Broker -->
    <bean id="hsql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url">
    <value>jdbc:mysql://192.168.0.37:3306/test?useUnicode=true&amp;characterEncoding=utf8&amp;relaxAutoCommit=true</value>
    </property>
    <property name="username" value="root"/>
    <property name="password" value=""/>
    <property name="poolPreparedStatements" value="true"/>
    </bean> <!--  Spring JmsTemplate config -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
    <!--  lets wrap in a pool to avoid creating a connection per send -->
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    </bean>
    </property>
    <!-- custom MessageConverter -->
    <property name="messageConverter" ref="messageTConverter"/>
    <property name="defaultDestination" ref="destination">
    </property>
    </bean> <!--  Message converter  -->
    <bean id="messageTConverter" class="com.activemq.MessageTConverter"/>
    <!-- POJO which send Message uses  Spring JmsTemplate -->
    <!-- <bean id="topicMessageTProducer" class="com.activemq.TopicMessageTProducerImpl">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="destination" ref="destination"/>
    </bean>
    <bean id="queueMessageTProducer" class="com.activemq.QueueMessageTProducerImpl">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="destination" ref="destination"/>
    </bean> -->
    <bean id="topicMessageTProducer" class="com.activemq.TopicMessageTProducerImpl">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="destination" ref="Topic"/>
    </bean>
    <bean id="queueMessageTProducer" class="com.activemq.QueueMessageTProducerImpl">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="destination" ref="Queue"/>
    </bean> <!--  Message Driven POJO (MDP) -->
    <bean id="topicMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
    <bean class="com.activemq.TopicMessageTConsumerImpl">
    </bean>
    </constructor-arg>
    <!--  may be other method -->
    <property name="defaultListenerMethod" value="findMesageT"/>
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="messageTConverter"/>
    </bean>
    <bean id="queueMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
    <bean class="com.activemq.QueueMessageTConsumerImpl">
    </bean>
    </constructor-arg>
    <!--  may be other method -->
    <property name="defaultListenerMethod" value="findMesageT"/>
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="messageTConverter"/>
    </bean> <!--  listener container,MDP无需实现接口 -->
    <bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="destination" ref="Topic"/>
    <property name="messageListener" ref="topicMessageListener"/>
    </bean>
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="destination" ref="Queue"/>
    <property name="messageListener" ref="queueMessageListener"/>
    </bean>
    </beans>但是有点晕:比如我以A用户登录,想给B用户发送消息,只能B用户收到消息,在这种整合下如何实现?
      

  7.   

    哪位熟悉spring+activeMQ整合开发的高手给指点指点啊- -...
    感谢!!
      

  8.   

    http://whitesock.javaeye.com/blog/164925 
    http://www.ibm.com/developerworks/cn/java/wa-spring4/ 
      

  9.   

    我也不想说什么了,感谢那些关注的朋友!
    发个收藏的链接我也会,我也有,关于activeMQ与spring整合,我在网上也google了好多资料,
    可能是咱悟性比较差,水平很低,看了spring的资料,看了activeMQ的资料,也没清楚弄明白spring+activeMQ像1+1这么简单的事情只想看到有实际spring+activeMQ整合经验的朋友来指点下,发链接的朋友还是不用了吧我想发链接的朋友连我上边的那个配置都没看吧,我也不指望发链接的朋友给我指出那个配置问题了。谢谢
      

  10.   

    不好意思 前段时间时间比较忙,说给你发些资料的,一直没上CSDN,我看了你上面的配置文件,基本上配置齐全了,你说如何实现
    但是有点晕:比如我以A用户登录,想给B用户发送消息,只能B用户收到消息,在这种整合下如何实现?
    这里有两种方式,点对点,这种方式就不说了,你只需要创建一个接收者,其实就是你上面的queueListenerContainer,只需要指向的路径和发布者是一样的,就会接收到信息,也就是说A发送到B。
    如果是订阅和发布的形式,那么创建的接收者的时候你就需要指定他是连接到那个TOPIC,还可以根据TOPIC的消息进行检索,这个检索的条件和SQL语句是一模一样的,比如下面这样
    TopicSubscriber receiver = session.createDurableSubscriber(topic, name,"id not in ('11','22','33','44','55')",true);
    参数的第一个就是创建的TOPIC,第二个参数,就是你一个客户端去订阅到时候的clientid,然后后面就是根据发布的消息去接收了,
    其中clientid一般都是向下面这样去设置
    TopicConnection conn = factory.createTopicConnection();
    conn.setClientID ("1111");
    而要获得查询条件 比如上面有一个id not in ('11','22','33','44','55'),那么发布的时候就需要设置ID这个属性。
      

  11.   

    我倒是配置成功了,不过,发布订阅方式,每条生产的信息总是被消费两次,不知道是为什么。
    <beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.org/config/1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0-SNAPSHOT.xsd" >
     <!-- 
         推荐版本,使用spring的listenerContainer,消息用数据库持久化保存,服务器重启不会丢失
         -->
         <!-- 连接外部的activeMQ
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
    -->
      
    <!--  ActiveMQ connectionFactory  连接内部的-->
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost"/>  <!--  ActiveMQ destinations  使用Queue方式 -->
    <amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE"/>
    <!--  使用topic方式-->
    <amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" /> <!-- The msSQL Datasource that will be used by the Broker
    <bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="net.sourceforge.jtds.jdbc.Driver"/>
    <property name="url">
    <value>jdbc:jtds:sqlserver://localhost:1433/wmjqxgl;SelectMethod=cursor;charset=GBK;tds=8.0;lastupdatecount=true</value>
    </property>
    <property name="username" value="sa"/>
    <property name="password" value="sa"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
     -->
    <!--  Spring JmsTemplate config -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
    <!--  lets wrap in a pool to avoid creating a connection per send -->
    <bean class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    </bean>
    </property>
    <!-- custom MessageConverter -->
    <property name="messageConverter" ref="orderMessageConverter"/>
    </bean> <!--  OrderMessage converter  -->
    <bean id="orderMessageConverter" class="com.work.activemq.OrderMessageConverter"/> <!-- POJO which send Message uses  Spring JmsTemplate -->
    <bean id="orderMessageProducer" class="com.work.activemq.OrderMessageProducer">
    <property name="template" ref="jmsTemplate"/>
    <property name="destination" ref="QUEUE"/>
    </bean>
    <!-- topic 方式信息发送者 -->
    <bean id="topicMessageProducer" class="com.work.activemq.TopicMessageProducer">
    <property name="template" ref="jmsTemplate" />
    <property name="destination" ref="TOPIC" />
    </bean>

        <!-- consumer1 for topic a 消息消费者 -->
        <bean id="topicConsumerA" class="com.work.activemq.TopicConsumerA" />    <!-- consumer2 for topic a -->
        <bean id="topicConsumerB" class="com.work.activemq.TopicConsumerB" />
        
        
        <!-- Message Listener for  -->
    <bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="topicConsumerA" />
    <!--  may be other method -->
    <property name="defaultListenerMethod" value="receive" />
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="orderMessageConverter" />
    </bean> <bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="topicConsumerB" />
    <!--  may be other method -->
    <property name="defaultListenerMethod" value="receive" />
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="orderMessageConverter" />
    </bean>
        
    <!--  Message Driven POJO (MDP)  通过queue的方式发送消息,一个发送一个接收-->
    <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
    <bean class="com.work.activemq.OrderMessageConsumer">
    <!-- <property name="mailService" ref="mailService"/>  -->
    </bean>
    </constructor-arg>
    <!--  may be other method -->
    <property name="defaultListenerMethod" value="sendEmail"/>
    <!-- custom MessageConverter define -->
    <property name="messageConverter" ref="orderMessageConverter"/>
    </bean> <!--  listener container,MDP无需实现接口 -->
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="destination" ref="QUEUE"/>
    <property name="messageListener" ref="messageListener"/>
    </bean>
            <!--  listener container,MDP无需实现接口 -->
    <bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="TOPIC" />
    <property name="messageListener" ref="topicListenerA" />
    </bean>    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="TOPIC" />
    <property name="messageListener" ref="topicListenerB" />
    </bean>
    <bean id="orderNotify" class="com.work.activemq.OrderNotifyImpl">
    <property name="orderMessageProducer" ref="orderMessageProducer" />
    <property name="topicMessageProducer" ref="topicMessageProducer" />
    </bean>
    <!--  -->
    </beans>
      

  12.   

    http://www.javaeye.com/topic/234101?page=3
      

  13.   

    有没有spring配置持久化topic的的朋友?我topic配置成功,但是持久化topic的不知道咋配置啊