RabbitMQ和Sprin Boot的整合

发表时间:2017-12-01 09:26:22 浏览量( 55 ) 留言数( 0 )

学习目标:

1、了解使用spring boot整合rabbit


学习过程:

spring boot也已经越来越流行了。所以我们也说一下使用spring boot整合rabbit。

一、导入包

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
			<version>1.5.9.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<version>1.5.9.RELEASE</version>
			<scope>test</scope>
		</dependency>

二、配置文件

@Configuration
public class AmqpConfig {

	@Autowired
	private Environment env;

	@Bean
	@ConfigurationProperties(prefix = "spring.rabbitmq")
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		return connectionFactory;
	}

	@Bean
	@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
	// 必须是prototype类型
	public RabbitTemplate rabbitTemplate() {
		RabbitTemplate template = new RabbitTemplate(connectionFactory());
		return template;
	}

	@Bean
	public Queue queue() {
		return new Queue(env.getProperty("client.mq.queueName"), true); // 队列持久

	}

	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(env.getProperty("client.mq.directExchange"));
	}

	@Bean
	public TopicExchange topicExchange() {
		return new TopicExchange(env.getProperty("client.mq.topicExchange"));
	}

	@Bean
	public Binding bindingDirectExchange() {
		return BindingBuilder.bind(queue()).to(directExchange()).with(env.getProperty("client.mq.queueName"));
	}

	@Bean
	public Binding bindingTopicExchange() {
		return BindingBuilder.bind(queue()).to(topicExchange()).with(env.getProperty("client.mq.bindTopic"));
	}

	@Bean
	public SimpleMessageListenerContainer messageContainer() {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
		container.setQueues(queue());// 需要监听的队列
		container.setExposeListenerChannel(true);
		container.setMaxConcurrentConsumers(1);
		container.setConcurrentConsumers(1);
		container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置确认模式手工确认
		container.setMessageListener(new ClientMessageListener());
		return container;
	}

}

队列的监听处理类

public class ClientMessageListener implements ChannelAwareMessageListener {
	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
		byte[] body = message.getBody();
		System.out.println("receive msg : " + new String(body));
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息成功消费

	}
}

三、定义一个发送的工具类

@Component 
public class SendUtil {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 构造方法注入  
     */  
    @Autowired  
    public SendUtil(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        rabbitTemplate.setConfirmCallback(new ConfirmCallBackListener()); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容  
        rabbitTemplate.setReturnCallback(new ReturnCallBackListener()); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容  
    }  
  
    public void sendMsg(String routingKey,String content,String msgId) {  
        CorrelationData correlationId = new CorrelationData(msgId);  
        rabbitTemplate.convertAndSend("",routingKey, content, correlationId);  
    }  
  
    public void sendMsg(String exChange,String routingKey,String content,String msgId) {  
        CorrelationData correlationId = new CorrelationData(msgId);  
        rabbitTemplate.convertAndSend(exChange, routingKey, content, correlationId);  
    }   
  
    
}

每一次发送我们都会有一个回调的监听类

/**
 * 成功发送到MQ确认后回调
 * @author liubao
 *
 */
@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		
		
		//可以在这里把公文修改成为已发送
		
		System.out.println(" 回调id:" + correlationData);
		if (ack) {
			System.out.println("成功发送到MQ");
		} else {
			System.out.println("失败:" + cause);
		}
	}
}

这个应该是发送失败

/**
 * 消息发送失败返回监听器
 * @author liubao
 *
 */
@Service("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback{
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
		
		//发送失败,可以记录日志
		
		System.out.println(message.getMessageProperties().getConsumerQueue());
		System.out.println(message.getMessageProperties().getCorrelationIdString());
		System.out.println(replyCode);
		System.out.println(replyText);
		System.out.println(exchange);
		System.out.println(routingKey);
		
	}  
}

四、spring boot的启动类

@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan(basePackages = "org.rabbitspringboot")
@EnableTransactionManagement
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
		Scanner scanner=new Scanner(System.in);
		String input=scanner.nextLine();
		if("quit".equals(input)) {
			System.exit(0);
		}
	}
}

五、测试类

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class SendUtilTest {
	
	@Autowired
	private Environment env;
	@Autowired
	SendUtil sendUtil;

	
	@Test
	public void testDirectExchange() {
		//DirectExchange 
		 
		sendUtil.sendMsg("msg.0001.common","服务器单独发给客户端的。",UUID.randomUUID().toString());
	}
	
	
	@Test
	public void testTopic() {
		sendUtil.sendMsg(env.getProperty("client.mq.topicExchange"), "msg.all.common","服务器广播给所有的客户端的。",UUID.randomUUID().toString());
	}
	
}