ActiveMQ的ACK机制

发表时间:2017-07-19 22:06:18 浏览量( 22 ) 留言数( 0 )





1.写RedeliveryPolicy配置文件

      既然ctiveMQ提供了消息重发机制(RedeliveryPolicy),那么我们只需要在Spring ActiveMQ 整合(一): 一个简单的demo,测试消息的发送与接收  的基础上, 完善activemq的xml配置文件即可。

[html] view plain copy

      

<span style="font-family:Microsoft YaHei;">      </span><!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 http://www.kuqin.com/shuoit/20140419/339344.html -->  

    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  

        <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->  

        <property name="useExponentialBackOff" value="true"></property>  

        <!--重发次数,默认为6次   这里设置为1次 -->  

        <property name="maximumRedeliveries" value="1"></property>  

        <!--重发时间间隔,默认为1秒 -->  

        <property name="initialRedeliveryDelay" value="1000"></property>  

        <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->  

        <property name="backOffMultiplier" value="2"></property>  

        <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第   

            二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->  

        <property name="maximumRedeliveryDelay" value="1000"></property>  

    </bean>  


这一段配置就是消息重发机制(RedeliveryPolicy)的实现,  每一行都有详细的注释。

2.引用RedeliveryPolicy的配置:

[html] view plain copy

       <!--创建连接工厂 -->  

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  

    <property name="brokerURL" value="tcp://localhost:61616"></property>  

    <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重发机制 -->  

</bean>  


在链接工厂里面引入刚才配置好的RedeliveryPolicy。


配置完成后,消息一旦发送失败,就会按照消息重发机制配置好的重发次数,时间间隔等因素,再次发送。


可以看到配置文件中:<property name="maximumRedeliveries" value="1"></property>

设置的重发一次,但是控制台打印出来两个条消息“2”。这就是第一次发送,失败后她又发送了一次,也就是两次。


以上就是消息重发机制的所有实现。





ACK

继上篇文章之后,我消息发送失败后,可以重新发送了。但是至于别人有没有收到,这就不得而而而知了。

这时候就要用到ACK确认机制了。



1.ACK机制:


      ACK (Acknowledgement),即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。

     JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:

         AUTO_ACKNOWLEDGE = 1    自动确认

         CLIENT_ACKNOWLEDGE = 2    客户端手动确认   

         DUPS_OK_ACKNOWLEDGE = 3    自动批量确认

         SESSION_TRANSACTED = 0    事务提交并确认


         此外AcitveMQ补充了一个自定义的ACK_MODE:    INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认

     我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer与Broker之间建立一种简单的“担保”机制. 

   

     Client端指定了ACK_MODE,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不同的ACK_TYPE将传递着消息的状态,broker可以根据不同的ACK_TYPE对消息进行不同的操作。

 

     比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):


         DELIVERED_ACK_TYPE = 0    消息"已接收",但尚未处理结束

         STANDARD_ACK_TYPE = 2    "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了

         POSION_ACK_TYPE = 1    消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)

         REDELIVERED_ACK_TYPE = 3    消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息

         INDIVIDUAL_ACK_TYPE = 4    表示只确认"单条消息",无论在任何ACK_MODE下    

         UNMATCHED_ACK_TYPE = 5    BROKER间转发消息时,接收端"拒绝"消息

    到目前为止,我们已经清楚了大概的原理: Client端在不同的ACK_MODE时,将意味着在不同的时机发送ACK指令,每个ACK Command中会包含ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作. 

    上面这一段呢就是:ACK_MODE与ACK_TYPE.   想要更多的了解,有兴趣的可以自行去看看,这里由于本文的重点是怎么实现,具体原理,可以自行了解一下。

2.实现消息确认:


1.首先在配置文件中配置你的应答方式是什么。我这里配置的是点对点的消息确认方式。



[html] view plain copy

<!-- 消息监听容器 消息接收监听器用于异步接收消息 -->  

    <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="destination" ref="destinationOne" />  

        <property name="messageListener" ref="consumerMessageListenerOfOne" />  

[html] view plain copy

<pre name="code" class="html">                <!-- 应答模式是 INDIVIDUAL_ACKNOWLEDGE http://blog.csdn.net/yueding_h/article/details/54944254 --></pre>  <property name="sessionAcknowledgeMode" value="4"></property> </bean>  

<pre></pre>  

<br>  

<h4>2.在接收的代码中,手动确认 </h4>  

  :给消息发送者一个回应“我收到你的消息了,你可以出队了”,activemq在没有配置应答方式的时候,他是默认确认的。但是需求的各不同,就需要有不同的配置

[html] view plain copy

//只要被确认后  就会出队,接受失败没有确认成功,会在原队列里面  

an style="font-family:Microsoft YaHei;">     </span>textMsg.acknowledge();  


以上就完成了activemq的消息确认。