Topics模式

发表时间:2017-12-04 14:18:56 浏览量( 18 ) 留言数( 0 )

学习目标:

1、通过对Rabbit的官方入门教程了解Rabbit的消息发送。


学习过程:

  和direct类似,不过更加强大,发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。

attcontent/eb5d6bc3-28eb-47e8-8468-dade7fa46c8e.png

一、生产者

public class EmitLogTopic {
	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "topic");

			String routingKey = getRouting(argv);
			String message = getMessage(argv);

			channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
			System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}

	private static String getRouting(String[] strings) {
		if (strings.length < 1)
			return "anonymous.info";
		return strings[0];
	}

	private static String getMessage(String[] strings) {
		if (strings.length < 2)
			return "Hello World!";
		return joinStrings(strings, " ", 1);
	}

	private static String joinStrings(String[] strings, String delimiter, int startIndex) {
		int length = strings.length;
		if (length == 0)
			return "";
		if (length < startIndex)
			return "";
		StringBuilder words = new StringBuilder(strings[startIndex]);
		for (int i = startIndex + 1; i < length; i++) {
			words.append(delimiter).append(strings[i]);
		}
		return words.toString();
	}
}

1、exchange的type为topic

2、发送消息的routing key不是固定的单词,而是匹配字符串,如"*.lu.#",*匹配一个单词,#匹配0个或多个单词。

二、消费者

public class ReceiveLogsTopic {
	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost("localhost");

			connection = factory.newConnection();
			channel = connection.createChannel();

			channel.exchangeDeclare(EXCHANGE_NAME, "topic");
			String queueName = channel.queueDeclare().getQueue();

			if (argv.length < 1) {
				System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
				System.exit(1);
			}

			for (String bindingKey : argv) {
				channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
			}

			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true, consumer);

			while (true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String message = new String(delivery.getBody());
				String routingKey = delivery.getEnvelope().getRoutingKey();

				System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

三、测试

可以参考一下官方网站的各种不同的topic的写法,其实也还是比较简单的了。

To receive all the logs:

java -cp $CP ReceiveLogsTopic "#"

To receive all logs from the facility "kern":

java -cp $CP ReceiveLogsTopic "kern.*"

Or if you want to hear only about "critical" logs:

java -cp $CP ReceiveLogsTopic "*.critical"

You can create multiple bindings:

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"