RPC远程方法调用

发表时间:2017-12-04 14:28:51 浏览量( 25 ) 留言数( 0 )

学习目标:

1、通过对Rabbit的官方入门教程了解Rabbit的消息发送。


学习过程:

    应为rabbit的稳定性,所以我们使用rabbit实现RPC也是可以的,不过一般来说很少这样使用了,这里我们主要使用属性配置,在请求和返回的使用都带上Correlation Id,这样就可以标识本次请求。

attcontent/544c82a6-9fed-4b4b-a1cc-02b40760176c.png

一、服务提供者

public class RPCServer {

	private static final String RPC_QUEUE_NAME = "rpc_queue";

	public static void main(String[] argv) {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");

		Connection connection = null;
		try {
			connection = factory.newConnection();
			final Channel channel = connection.createChannel();

			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

			channel.basicQos(1);

			System.out.println(" [x] Awaiting RPC requests");

			DefaultConsumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
							.correlationId(properties.getCorrelationId()).build();

					String response = "";

					try {
						String message = new String(body, "UTF-8");
						int n = Integer.parseInt(message);

						System.out.println(" [.] fib(" + message + ")");
						response += fib(n);
					} catch (RuntimeException e) {
						System.out.println(" [.] " + e.toString());
					} finally {
						
						//计算完毕通过MQ把结果发回去。
						channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

						//确实消费成功
						channel.basicAck(envelope.getDeliveryTag(), false);

						// RabbitMq consumer worker thread notifies the RPC server owner thread
						synchronized (this) {
							this.notify();
						}
					}
				}

				
			};

			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

			// Wait and be prepared to consume the message from RPC client.
			while (true) {
				synchronized (consumer) {
					try {
						consumer.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null)
				try {
					connection.close();
				} catch (IOException _ignore) {
				}
		}
	}
	
	private static int fib(int n) {
	    if (n == 0) return 0;
	    if (n == 1) return 1;
	    return fib(n-1) + fib(n-2);
	}
	
}

二、服务消费者

public class RPCClient {

	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;

	public RPCClient() throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");

		connection = factory.newConnection();
		channel = connection.createChannel();

		replyQueueName = channel.queueDeclare().getQueue();
	}

	public String call(String message) throws IOException, InterruptedException {
		final String corrId = UUID.randomUUID().toString();

		AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
				.build();

		channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

		//使用BlockingQueue阻塞线程,达到同步的效果
		final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

		channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				if (properties.getCorrelationId().equals(corrId)) {
					response.offer(new String(body, "UTF-8"));//
				}
			}
		});

		return response.take();//等待结构
	}

	public void close() throws IOException {
		connection.close();
	}

}

三、测试

public class Run {
	public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
		RPCClient fibonacciRpc = new RPCClient();
		String result = fibonacciRpc.call("4");
		System.out.println("fib(4) is " + result);
	}
}