Java连接ActiveMQ

发表时间:2017-07-19 22:04:51 浏览量( 44 ) 留言数( 0 )



、新建java项目测试



pom.xml需要导入的包文件

           <dependency>

                <groupId>org.apache.activemq</groupId>

                <artifactId>activemq-core</artifactId>

                <version>5.7.0</version>

           </dependency>



    

           <!-- spring-jms -->

           <dependency>

                <groupId>org.springframework</groupId>

                <artifactId>spring-jms</artifactId>

                <version>${spring.version}</version>

           </dependency>

           <dependency>

                <groupId>org.apache.xbean</groupId>

                <artifactId>xbean-spring</artifactId>

                <version>4.5</version>

           </dependency>


发送方,有两种方式,队列和主题

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:security="http://www.springframework.org/schema/security"

     xmlns:p="http://www.springframework.org/schema/p"

     xsi:schemaLocation="http://www.springframework.org/schema/security http://www.springframework.org/schema/security/spring-security-3.0.xsd

           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

     <!-- 扫描文件(自动将servicec层注入) -->

     <context:component-scan base-package="com.hnair.scheduleplatform.appsend.web.appsend.test">

     </context:component-scan>

     <!-- 配置JMS连接工厂 -->

     <bean id="flightChangeCachingConnectionFactory"     class="org.springframework.jms.connection.CachingConnectionFactory">

           <!-- Session缓存数量 -->

           <property name="sessionCacheSize" value="10" />

           <!-- 接收者ID  

接收者ID,用于Topic订阅者的永久订阅 clientId 作为客户端的标识,连接同一服务的客户端不能拥有相同的

-->

           <!-- <property name="clientId" value="client_119" /> -->

           <property name="targetConnectionFactory">

                <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                     <property name="brokerURL" value="tcp://localhost:61616" /> 

                   <property name="userName" value="liubao" /> 

                   <property name="password" value="123456" /> 

                </bean>

           </property>

     </bean>

     <!-- 定义JmsTemplate的Queue类型 -->

     <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">

           <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

           <constructor-arg ref="flightChangeCachingConnectionFactory" />

           <!-- 非pub/sub模型(发布/订阅),即队列模式 -->

           <property name="pubSubDomain" value="false" />

     </bean>

     <!-- 定义JmsTemplate的Topic类型 -->

     <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">

           <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

           <constructor-arg ref="flightChangeCachingConnectionFactory" />

           <!-- pub/sub模型(发布/订阅) -->

           <property name="pubSubDomain" value="true" />

     </bean>

</beans>



Java实现

队列

@Component("queueSender")

public class QueueSender {

      

      

      @Autowired

      @Qualifier("jmsQueueTemplate")   //注释的是队列

      private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean

      

      /**

       * 发送一条消息到指定的队列(目标)

       * @param queueName 队列名称

       * @param message 消息内容

       */

      public void send(String queueName,final String message){

                  jmsTemplate.send(queueName, new MessageCreator() {

                  @Override

                  public Message createMessage(Session session) throws JMSException {

                        return session.createTextMessage(message);

                  }

            });

      }

      

}


主题

@Component("topicSender")

public class TopicSender {

      

      @Autowired

      @Qualifier("jmsTopicTemplate")    //注释的是主题发布

      private JmsTemplate jmsTemplate;

      

      /**

       * 发送一条消息到指定的队列(目标)

       * @param queueName 队列名称

       * @param message 消息内容

       */

      public void send(String topicName,final String message){

            jmsTemplate.send(topicName, new MessageCreator() {

                  @Override

                  public Message createMessage(Session session) throws JMSException {

                        return session.createTextMessage(message);

                  }

            });

      }

}


6、客户端监听器


分别可以定义监听队列还是主题,也可以写上需要监听那个队列名称

<beans xmlns="http://www.springframework.org/schema/beans"

    xmlns:context="http://www.springframework.org/schema/context"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"

    xmlns:jms="http://www.springframework.org/schema/jms"

    xsi:schemaLocation="http://www.springframework.org/schema/beans

        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   

        http://www.springframework.org/schema/context   

        http://www.springframework.org/schema/context/spring-context-4.0.xsd

        http://www.springframework.org/schema/jms

        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd

        http://activemq.apache.org/schema/core

        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">


    <context:property-placeholder location="classpath:sysconfig/activemq.properties" />

    <bean id="qReceiver" class="com.outsideasy.activemq.service.QReceiver"/>


    <!-- ActiveMQ 连接工厂 -->

     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->

    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->

    <amq:connectionFactory id="amqConnectionFactory"

        brokerURL="tcp://localhost:61618" userName="admin" password="topsun" /> 


    <!-- Spring Caching连接工厂 -->

     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory 

     org.springframework.jms.connection.CachingConnectionFactory

     org.apache.activemq.pool.PooledConnectionFactory

     --> 

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 

          <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>

          <!-- 同上,同理 -->

        <!-- <constructor-arg ref="amqConnectionFactory" /> -->

        <!-- Session缓存数量 -->

        <property name="sessionCacheSize" value="100" />

        <!-- 接收者ID,用于Topic订阅者的永久订阅-->

        <property name="clientId" value="client-C" /> 

    </bean>

      <!-- 消息消费者 start-->

    <!-- 定义Queue监听器   只有一个会受到信息 -->

    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">

        <jms:listener destination="test.queue" ref="queueReceiver1"/>

        <jms:listener destination="test.queue" ref="queueReceiver2"/>

    </jms:listener-container>

    <!-- 定义Topic监听器  两个都会受到信息 -->

    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">

        <jms:listener destination="test.topic" ref="topicReceiver1"/>

        <jms:listener destination="test.topic" ref="topicReceiver2"/>

    </jms:listener-container>

    <!-- 消息消费者 end -->


实现类:

@Component

public class QReceiver implements MessageListener{


    @Override

    public void onMessage(Message message) {

        try {

            /*System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());*/

            ObjectMessage objmsg=(ObjectMessage) message;

            PurchaseOrderSender purchaseOrderSender=(PurchaseOrderSender)objmsg.getObject(); //此消息对象和发送者的对象路径相同

            System.out.println("QueueReceiver1接收到消息:"+purchaseOrderVo.getPur_order_id());

            System.out.println("****:"+purchaseOrderVo.toString());

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }


}