出版和订阅(使用java 客户端)在先前的指南中,我们创建了一个工作队列。这工作队列后面的假想是每一个任务都被准确的传递给工作者。在这部分我们将会做一些完全不同的事情–我们将一个消息传递给多个消费者。这部分被认知为“出版和订阅”。 为了说明这个部分,我们会建立一个简单德日志系统,它是由两个程序组成–第一个发出日志消息,第二个接收和打印它们。
在我们的日志系统中,每一个运行的接收者拷贝程序将会获得信息。通过这个方式我们可以运行一个接收者,直接的把日志记录到硬盘中;在同一时间我们可以运行另一个接收者,在屏幕上看这些日志。 交换
在先前指南部分,我们将消息发送到队列里,并从队列中接收消息。现在是时候介绍RabbitMQ中全消息模型。
一个发送消息的生产者是一个用户程序。
相反,生产者仅能将消息发送到一个交易所。一个交易所是一个非常简单的事物。在它的一遍,它从生产者那里接收消息,另一边将消息推送到队列中。这个 交换所必须清楚的知道它所接收到的消息要如何处理。是否将它附加到一个特别的队列中?是否将它附加到多个队列中?或者是否它应该被丢弃。规则的定义是由交 换类型决定的。 channel.exchangeDeclare("logs", "fanout");
这
现在,我们可以发布我们自己命名的交易所: channel.basicPublish( "logs", "", null, message.getBytes()); 临时队列
你可能会想起先前我们使用的队列是有特定的名字的(是否记得 String queueName = channel.queueDeclare().getQueue();
在这点,队列名中包含一个随机队列名。例如名字像 绑定
我们已经创建了一个 channel.queueBind(queueName, "logs", ""); 从现在开始,日志交换所将要附加消息到我们的队列中。
把所有放在一起
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
(EmitLog.java source)
如果队列还没有绑定到交易所上,消息将会丢失,但是这个对我们来说是ok的;如果没有消费者正在监听,我们可以安全的丢弃消息。 java.lang.InterruptedException { import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
(ReceiveLogs.java source) $ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java 如果你想把日志保存到文件中,仅仅打开一个控制平台,键入: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log 如果你想在你的屏幕上看这些日志, 新建一个终端并且运行: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs 当然,为了发出日志键入: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
使用 $ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
这结果的解释是直白简单的:来自交易所的日志流向服务器安排的两个队列中。并且那确实我们所期望的。 转载请保留固定链接: https://linuxeye.com/Linux/RabbitMQ.html |