RabbitMQ和spring的整合

发表时间:2017-12-01 09:25:53 浏览量( 38 ) 留言数( 0 )

学习目标:

1、了解如何使用Spring整合RabbitMQ


学习过程:

一般我们会使用spring整合RabbitMQ,这样会使得RabbitMQ更易于使用。具体步骤如下:

一、导入相关包

<!-- RabbitMQ Java client (com.rabbitmq:amqp-client) -->
<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>4.3.0</version>
		</dependency>

		<!-- Spring AMQP integration for RabbitMQ (org.springframework.amqp:spring-rabbit) -->
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>1.7.5.RELEASE</version>
		</dependency>

二、新建配置

一般的fanout、topic、direct等exchange的配置都比较简单,直接看配置文件即可。

	<!-- Connection settings: host, credentials, port and virtual host are read
	     from the mq.* property placeholders -->
	<rabbit:connection-factory id="connectionFactory"
		host="${mq.host}" username="${mq.username}" password="${mq.password}"
		port="${mq.port}" virtual-host="${mq.vhost}" />


	<!-- RabbitAdmin bean bound to the connection factory above -->
	<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />




	<!-- RabbitTemplate declaration; uses jsonMessageConverter as its message converter -->
	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
		message-converter="jsonMessageConverter" />

	<!-- JSON message converter (Jackson 2) referenced by the template -->
	<bean id="jsonMessageConverter"
		class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

	<!-- Queue declarations. Each queue's broker-side name equals its bean id so
	     that the two document queues are really two distinct queues. If they
	     shared one name, both bindings and both listeners would target a single
	     queue and the fanout/topic examples would deliver only one copy of each
	     message. -->

	<rabbit:queue id="test_queue_key" name="test_queue_key"
		durable="true" auto-delete="false" exclusive="false" />

	<rabbit:queue id="oa1.document.pro" name="oa1.document.pro"
		durable="true" auto-delete="false" exclusive="false" />

	<rabbit:queue id="oa2.document.pro" name="oa2.document.pro"
		durable="true" auto-delete="false" exclusive="false" />

	<!-- Direct exchange with a single binding to the test_queue_key queue -->
	<rabbit:direct-exchange name="test-mq-exchange"
		durable="true" auto-delete="false" id="test-mq-exchange">
		<rabbit:bindings>
			<!-- "key" is the routing key of the binding -->
			<rabbit:binding queue="test_queue_key" key="test_queue_key" />
		</rabbit:bindings>
	</rabbit:direct-exchange>


	<!-- Fanout exchange bound to both document queue beans; no key is given on the bindings -->
	<rabbit:fanout-exchange name="hello_fanout_exchange"
		durable="true" auto-delete="false" id="hello_fanout_exchange">
		<rabbit:bindings>
			<rabbit:binding queue="oa1.document.pro"></rabbit:binding>
			<rabbit:binding queue="oa2.document.pro"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:fanout-exchange>

	<!-- Topic exchange; both document queue beans are bound with the pattern #.document.pro -->
	<rabbit:topic-exchange name="hello_topic_exchange"
		id="hello_topic_exchange" durable="true" auto-delete="false">
		<rabbit:bindings>
			<rabbit:binding queue="oa1.document.pro" pattern="#.document.pro"></rabbit:binding>
			<rabbit:binding queue="oa2.document.pro" pattern="#.document.pro"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:topic-exchange>


	<!-- Listeners: each container attaches one listener bean (ref) to one queue
	     bean (queues), using the shared connection factory and acknowledge="auto" -->
	<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="test_queue_key" ref="queueListenter" />
	</rabbit:listener-container>

	<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="oa1.document.pro" ref="documentOneLintenter" />
	</rabbit:listener-container>

	<rabbit:listener-container
		connection-factory="connectionFactory" acknowledge="auto">
		<rabbit:listener queues="oa2.document.pro" ref="documentTwoLintenter" />
	</rabbit:listener-container>

三、监听处理类

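上面的配置引用了三个队列监听器(queueListenter、documentOneLintenter、documentTwoLintenter),这里只给出QueueListenter的实现,另外两个监听器同样需要注册为对应名称的bean。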

@Component
public class QueueListenter implements MessageListener {

	public void onMessage(Message msg) {
		try {
			System.out.println("我是QueueListenter:" + msg.toString());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

四、测试

@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath*:application-mq.xml",
		"classpath*:application-content.xml" })
public class RabbitTest {

	@Autowired
	private RabbitTemplate amqpTemplate;
	
	@Test
	public void testDirectExchange() {

		//direct-exchange不需要第一個參數
		amqpTemplate.convertAndSend("", "test_queue_key", "liubao");

	}
	
	@Test
	public void testFanoutExchange() {

		//FanoutExchange广播需要要routing key
		amqpTemplate.convertAndSend("hello_fanout_exchange", "", "FanoutExchange");
		
		Scanner scanner=new Scanner(System.in);
		scanner.next();

	}
	
	@Test
	public void testTopicExchange() {

		//FanoutExchange广播需要要routing key
		amqpTemplate.convertAndSend("hello_topic_exchange", "all.document.pro", "testTopicExchange");

	}

	

}


五、通过程序建立队列等

配置文件需要配置rabbitAdmin

<!-- RabbitAdmin bean (id "rabbitAdmin") created on top of connectionFactory -->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

测试代码如下:

	/** Admin bean injected by Spring; used to declare the exchange, queues and bindings programmatically. */
	@Autowired
	private RabbitAdmin rabbitAdmin;
	@Test
	public void test() {

	
		// BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
		// rabbitAdmin.declareBinding(binding);//声明绑定关系源代码有这些方法:

		FanoutExchange fanoutExchange = new FanoutExchange("goods", true, true);

		rabbitAdmin.declareExchange(fanoutExchange);

		Queue queue1 = new Queue("hello1", true, false, true);
		Queue queue2 = new Queue("hello2", true, false, true);
		Queue queue3 = new Queue("hello3", true, false, true);

		Binding binding1 = BindingBuilder.bind(queue1).to(fanoutExchange);
		Binding binding2 = BindingBuilder.bind(queue2).to(fanoutExchange);
		Binding binding3 = BindingBuilder.bind(queue3).to(fanoutExchange);

		rabbitAdmin.declareQueue(queue1);
		rabbitAdmin.declareQueue(queue2);
		rabbitAdmin.declareQueue(queue3);

		rabbitAdmin.declareBinding(binding1);
		rabbitAdmin.declareBinding(binding2);
		rabbitAdmin.declareBinding(binding3);

		amqpTemplate.convertAndSend("goods", "", "liubao");

	}