zoukankan      html  css  js  c++  java
  • (转)RabbitMQ学习之路由(java)

    http://blog.csdn.net/zhu_tianwei/article/details/40887755

    参考:http://blog.csdn.NET/lmj623565791/article/details/37669573

    使用direct类型实现:消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。例如:将不同的日志发送到不同的消费端。

    1.发送日志端SendLogDirect.Java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.route;  
    2.   
    3. import java.util.Random;  
    4. import java.util.UUID;  
    5.   
    6. import com.rabbitmq.client.AMQP;  
    7. import com.rabbitmq.client.Channel;  
    8. import com.rabbitmq.client.Connection;  
    9. import com.rabbitmq.client.ConnectionFactory;  
    10.   
    11. //随机发送6条随机类型(routing key)的日志给转发器~~  
    12. public class SendLogDirect {  
    13.       
    14.      //交换名称  
    15.      private static final String EXCHANGE_NAME = "ex_logs_direct";    
    16.      //日志分类  
    17.      private static final String[] SEVERITIES = { "info", "warning", "error" };    
    18.           
    19.     public static void main(String[] args) throws Exception {  
    20.         //创建连接和频道    
    21.         ConnectionFactory factory = new ConnectionFactory();    
    22.         factory.setHost("192.168.101.174");  
    23.         // 指定用户 密码  
    24.         factory.setUsername("admin");  
    25.         factory.setPassword("admin");  
    26.         // 指定端口  
    27.         factory.setPort(AMQP.PROTOCOL.PORT);  
    28.         Connection connection = factory.newConnection();    
    29.         Channel channel = connection.createChannel();    
    30.         // 声明转发器的类型    
    31.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");    
    32.     
    33.         //发送6条消息    
    34.         for (int i = 0; i < 6; i++)    
    35.         {    
    36.             String severity = getSeverity();    
    37.             String message = severity + "_log :" + UUID.randomUUID().toString();    
    38.             // 发布消息至转发器,指定routingkey    
    39.             channel.basicPublish(EXCHANGE_NAME, severity, null, message  .getBytes());    
    40.             System.out.println(" [x] Sent '" + message + "'");    
    41.         }    
    42.     
    43.         channel.close();    
    44.         connection.close();    
    45.     }  
    46.       
    47.     /**  
    48.      * 随机产生一种日志类型  
    49.      *   
    50.      * @return  
    51.      */    
    52.     private static String getSeverity()    
    53.     {    
    54.         Random random = new Random();    
    55.         int ranVal = random.nextInt(3);    
    56.         return SEVERITIES[ranVal];    
    57.     }    
    58.   
    59. }  

    2.接收日志端ReceiveLogsDirect.java

    [java] view plain copy
     
     print?
    1. package cn.slimsmart.rabbitmq.demo.route;  
    2.   
    3. import java.util.Random;  
    4.   
    5. import com.rabbitmq.client.AMQP;  
    6. import com.rabbitmq.client.Channel;  
    7. import com.rabbitmq.client.Connection;  
    8. import com.rabbitmq.client.ConnectionFactory;  
    9. import com.rabbitmq.client.QueueingConsumer;  
    10.   
    11. //接收端随机设置一个日志严重级别(binding_key)。。。  
    12. public class ReceiveLogsDirect {  
    13.       
    14.     private static final String EXCHANGE_NAME = "ex_logs_direct";    
    15.     private static final String[] SEVERITIES = { "info", "warning", "error" };    
    16.   
    17.     public static void main(String[] args) throws Exception {  
    18.           // 创建连接和频道    
    19.         ConnectionFactory factory = new ConnectionFactory();    
    20.         factory.setHost("192.168.101.174");  
    21.         // 指定用户 密码  
    22.         factory.setUsername("admin");  
    23.         factory.setPassword("admin");  
    24.         // 指定端口  
    25.         factory.setPort(AMQP.PROTOCOL.PORT);  
    26.         Connection connection = factory.newConnection();    
    27.         Channel channel = connection.createChannel();    
    28.         // 声明direct类型转发器    
    29.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");    
    30.     
    31.         String queueName = channel.queueDeclare().getQueue();    
    32.         String severity = getSeverity();    
    33.         // 指定binding_key    
    34.         channel.queueBind(queueName, EXCHANGE_NAME, severity);    
    35.         System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");    
    36.     
    37.         QueueingConsumer consumer = new QueueingConsumer(channel);    
    38.         channel.basicConsume(queueName, true, consumer);    
    39.     
    40.         while (true)    
    41.         {    
    42.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
    43.             String message = new String(delivery.getBody());    
    44.     
    45.             System.out.println(" [x] Received '" + message + "'");    
    46.         }    
    47.     }    
    48.     
    49.     /**  
    50.      * 随机产生一种日志类型  
    51.      *   
    52.      * @return  
    53.      */    
    54.     private static String getSeverity()    
    55.     {    
    56.         Random random = new Random();    
    57.         int ranVal = random.nextInt(3);    
    58.         return SEVERITIES[ranVal];    
    59.     }    
    60. }  

    启动几个接收端服务,再启动发送端,接收端对应绑定的键收到对应的消息。

    注:发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。

  • 相关阅读:
    [ Algorithm ] N次方算法 N Square 动态规划解决
    [ Algorithm ] LCS 算法 动态规划解决
    sql server全文索引使用中的小坑
    关于join时显示no join predicate的那点事
    使用scvmm 2012的动态优化管理群集资源
    附加数据库后无法创建发布,error 2812 解决
    浅谈Virtual Machine Manager(SCVMM 2012) cluster 过载状态检测算法
    windows 2012 r2下安装sharepoint 2013错误解决
    sql server 2012 数据引擎任务调度算法解析(下)
    sql server 2012 数据引擎任务调度算法解析(上)
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124609.html
Copyright © 2011-2022 走看看