路由(使用Java客户端)
在先前的指南中,我们建立了一个简单德日志系统。我们可以将我们的日志信息广播到多个接收者。 绑定在之前的例子里我们已经创建绑定。你可以回顾下代码: channel.queueBind(queueName, EXCHANGE_NAME, "");
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
绑定可以带上额外的路由关键字参数。为了消除对 channel.queueBind(queueName, EXCHANGE_NAME, "black");
这绑定关键字的意义取决于交易所类型。这 直接交换
我们当前的日志系统将所有消息广播到所有消费者。我们想扩展它,让其允许依据其严格的规则过滤消息。例如我们可能想让一个往硬盘中写日志消息的程序仅仅接收关键的错误,而不是将硬盘空间浪费在警告和信息的日志消息上。
我们可以使用
为了说明那个,考虑接下来结构: 多种绑定
发出日志
我们将会为我们的日志系统使用这个模型。使用 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 我们准备发送一个消息: channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化这个事情,我们保证这 订阅
接收消息如先前那样工作,有一个例外,我们会把每一个我们感兴趣的 String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } 把它们放在一起
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. }
java.lang.InterruptedException { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
如平常那样编译(看指南第一部分,编译和类路径的建议)。为了方便,当我们运行实例是,我们现在使用一个环境变量 $ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log 如果你想在你的屏幕上看所有的日志信息,打开一个新的终端并键入: $ java -cp $CP ReceiveLogsDirect info warning error [*] Waiting for logs. To exit press CTRL+C 例如,为了发布一个错误日志信息,仅需要键入: $ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.' EmitLogDirect.java source和ReceiveLogsDirect.java source的所有源代码。
阅览指南第五部分,查看如何根据一个模式来监听消息。 转载请保留固定链接: https://linuxeye.com/Linux/RabbitMQ.html |