远程过程调用(RPC)(使用Java客户端)在指南的第二部分,我们学习了如何使用工作队列将耗时的任务分布到多个工作者中。 但是假如我们需要调用远端计算机的函数,等待结果呢?好吧,这又是另一个故事了。这模式通常被称为远程过程调用或RPC。 在这部分,我们将会使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们还没有值得分散的耗时任务,我们将会创建一个虚拟的RPC服务,用来返回Fibonacci(斐波纳契数列)。 用户接口
为了说明RPC服务如何使用,我们将会创建一个简单德客户端类。它会暴露一个叫 FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
思想中煎熬,考虑下接下来的建议: 给你的系统加上文档,让组件之间的依赖项清晰可见的。
处理错误事件。当RPC服务器很久没有响应了,客户端应该如何响应? 回收队列
一般来说在RabbitMQ上做RPC是容易的。一个客户端发送一个请求消息,一个服务器返回响应消息。为了接受到响应,我们需要再请求中带上一个 callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
我们需要这个新的引用: import com.rabbitmq.client.AMQP.BasicProperties; 相关性ID (原:Correlation Id)
在当前方法中我们建议为每一个RPC请求创建一个回收队列。这个效率十分低下的,但幸运的是有一个更好的方式- 让我们为每一个客户端创建一个单一的回收队列。 你可能会问,为什么我们要忽略哪些在回收队列中未知的消息,而不是以一个错误结束?因为在服务器竟态条件下,这种情况是可能的。RPC服务器发送给 我们答应之后,在发送一个确认消息之前,就死掉了,虽然这种可能性不大,但是它依旧存在可能。如果这事情发生了,RPC服务器重启之后,将会再一次处理请 求。这就是为什么我们要温和地处理重复的响应,这RPC理想情况下是幂等的。 摘要
把所有的放在一起斐波那契任务: private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
我们声明我们的斐波那契函数。它假定一个合法的正整数做为输入参数。(不要期望这个可以处理大量数字,它可能是最慢的递归实现了)。 private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
这服务器代码是相当简单明了的:
我们RPC客户端 private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery =consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }
The client code is slightly more involved: 制造客户端请求: RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
现在是时候让我们回顾下我们RPCClient.java和RPCServer.java中的全部例子的源码(包含基本的异常处理)。 $ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java 我们的RPC服务现在准备好了,我们启动着服务器: $ java -cp $CP RPCServer [x] Awaiting RPC requests 为了请求一个斐波那契数字,运行客户端: $ java -cp $CP RPCClient [x] Requesting fib(30)
现在的设计不仅仅可以实现一个RPC服务,并且它还有几项重要的优势:
我们的代码一直是十分简单的,不能试着解决更复杂(但是重要)的问题,比如:
转载请保留固定链接: https://linuxeye.com/Linux/RabbitMQ.html |