通过工作队列分配任务

发表时间:2017-12-04 11:19:49 浏览量( 18 ) 留言数( 0 )

学习目标:

1、通过对Rabbit的官方入门教程了解Rabbit的消息发送。


学习过程:

    有很多时候我们生产者效率比较高,而消费者比较慢,为了提供运行消息,我们可以配置多个消费者,把任务均匀得分配给消费者,如果生产者非常块,而消费者相对比较慢,为了不让消费者宕机,我们也可以控制消费者每次可以同时消费多少条任务。

    消费者在处理消息时,有可能会宕机,这时候这个消息可能还没有完全处理完毕,这样就会导致这条消息没有处理了,为了解决这个问题,Rabbit可以调整消息确认得模式为手动确认,你可以在消费者处理完毕后才确认消息,这时Rabbit才会把这条消息从队列中删除,否则MQ会把这个消息重新发送给其他得消费者,还有一点就是生命得队列需要持久化处理    

如下图:

attcontent/b1461cea-1ee3-4865-960c-44d77f57dbe3.png


1、生产者

public class Producer {
	
	private final static String QUEUE_NAME = "work-queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {

		// 建立链接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		//一次性得发送10条消息
		for(int i=0;i<10;i++) {
			String message = getMessage(args)+i;
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}
		
	}

	private static String getMessage(String[] strings) {
		if (strings.length < 1)
			return "Hello.World!";
		return joinStrings(strings, " ");
	}

	private static String joinStrings(String[] strings, String delimiter) {
		int length = strings.length;
		if (length == 0)
			return "";
		StringBuilder words = new StringBuilder(strings[0]);
		for (int i = 1; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}

}

2、消费者

public class Consumer {

	private final static String QUEUE_NAME = "work-queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		
		//使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。
		//在这种情况下发送端会尝试把消息发送给下一个not busy的接收端。
		channel.basicQos(1); // accept only one unack-ed message at a time (see below)

		final DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");

				System.out.println(" [x] Received '" + message + "'");
				try {
					doWork(message);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} finally {
					 channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
					System.out.println(" [x] Done");
				}
			}
		};
		boolean autoAck = false; // 不自动确认,需要配置上面得channel.basicAck手动确认后,消息确认后才会
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);

	}

	
	/**
	 * 模拟处理任务,使用sleep耗时的操作
	 * @param task
	 * @throws InterruptedException
	 */
	private static void doWork(String task) throws InterruptedException {
		for (char ch : task.toCharArray()) {
			if (ch == '.')
				Thread.sleep(1000);
		}
	}

}

1、channel.queueDeclare(QUEUE_NAME, true, false, false, null);声明消息队列,并使消息队列durable

2、在使用channel.basicConsume接收消息时使autoAck为false,即不自动会发ack,由channel.basicAck()在消息处理完成后发送消息。

3、使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端。


3、测试

当然我们可以一下子给队列发送十条消息,你可以自己简单得修改一下上面生产者得代码。

我们可以启动两个消费者端,看看输出

消费者1:

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received 'Hello.World!0'

 [x] Done

 [x] Received 'Hello.World!2'

 [x] Done

 [x] Received 'Hello.World!4'

 [x] Done

 [x] Received 'Hello.World!7'

 [x] Done

 [x] Received 'Hello.World!9'

 [x] Done


消费者2

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received 'Hello.World!1'

 [x] Done

 [x] Received 'Hello.World!3'

 [x] Done

 [x] Received 'Hello.World!5'

 [x] Done

 [x] Received 'Hello.World!6'

 [x] Done

 [x] Received 'Hello.World!8'

 [x] Done