zoukankan      html  css  js  c++  java
  • RABBITMQ/JAVA (主题)

    上篇博文中,我们进一步改良了日志系统。即使用Direct类型的转换器,使得接受者有能力进行选择性的接收日志,而非fanout那样,只能够无脑的转发。

    虽然使用Direct类型的转换器改进了日志系统。但它仍然有一定的局限性----不能根据多重条件进行路由选择。

    在我们的日志系统中,我们可能不仅仅根据严重性订阅日志,也想根据发送源订阅。你可能根据从unix工具syslog了解过这个概念,它可以根据严重性(info/warning/crit...)和设备(auth/cron/kern)转发日志。

    这将给我们更多的灵活性---我们可以订阅来自元‘cron’的致命错误日志,同事也可以订阅‘kern’的所有日志。

    为了在我们的日志系统中实现上述需求,我们需要了解更复杂的主题类型的转发器--Topic Exchange.

    Topic Exchange(主题转发器)

    发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字符。

    绑定键也必须以相同的格式,主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替任意一个标识符;#(井号)可以代替零个或多个标识符。

    举个简单的例子,如下:

    上图的例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个物种:“速度.颜色.物种”。

    路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

    如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

           另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

           注:主题类型的转发器非常强大,可以实现其他类型的转发器。当队列绑定#绑定键,可以接受任何消息,类似于fanout转发器。当特殊字符*和#不包含在绑定键中,这个主题转发器就像一个direct类型的转发器。

     

    完整例子:

    我们将主题类型的转发器应用到日志系统中,路由格式为:“<设备>.<严重级别>”。

     EmitLogTopic.java

    package TOPIC;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    
    
    public class EmitLogTopic {
        private final static String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws IOException {
            /**
             * 创建连接连接到MabbitMQ
             */
            ConnectionFactory factory = new ConnectionFactory();
            // 设置MabbitMQ所在主机ip或者主机名
             factory.setHost("115.159.181.204");
                factory.setPort(5672);
                factory.setUsername("admin");
                factory.setPassword("admin");
            // 创建一个连接
            Connection connection = factory.newConnection();
            // 创建一个频道
            Channel channel = connection.createChannel();
            // 指定转发——广播
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            //所有设备和日志级别
            String[] facilities ={"auth","cron","kern","auth.A"};
            String[] severities={"error","info","warning"};
            
            for(int i=0;i<4;i++){
                for(int j=0;j<3;j++){
                //每一个设备,每种日志级别发送一条日志消息
                String routingKey = facilities[i]+"."+severities[j%3];
                
                // 发送的消息
                String message =" Hello World!";
                //参数1:exchange name
                //参数2:routing key
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                System.out.println(" [x] Sent [" + routingKey +"] : '"+ message + "'");
                }
            }
            // 关闭频道和连接
            channel.close();
            connection.close();
        }
    }

     ReceiveLogs2Console.java

    package TOPIC;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    
    public class ReceiveLogs2Console {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws IOException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
             factory.setHost("115.159.181.204");
                factory.setPort(5672);
                factory.setUsername("admin");
                factory.setPassword("admin");
            // 打开连接和创建频道,与发送端一样
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            //声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 声明一个随机队列
            String queueName = channel.queueDeclare().getQueue();
    
            String[] routingKeys ={"auth.*","*.info","#.warning"};//关注所有的授权日志、所有info和waring级别的日志
            for (String routingKey : routingKeys) {
                //关注所有级别的日志(多重绑定)
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            
            // 创建队列消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received ["  + envelope.getRoutingKey() + "] :'" + message + "'");
                  }
                };
                channel.basicConsume(queueName, true, consumer);
        }
    }

     ReceiveLogs2File.java

    package TOPIC;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    
    
    public class ReceiveLogs2File {
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws IOException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
             factory.setHost("115.159.181.204");
                factory.setPort(5672);
                factory.setUsername("admin");
                factory.setPassword("admin");
            // 打开连接和创建频道,与发送端一样
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // 声明一个随机队列
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            String severity="kern.error";//只关注核心错误级别的日志,然后记录到文件中去。
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
            
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            
            // 创建队列消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    //记录日志到文件:
                    print2File( "["+ envelope.getRoutingKey() + "] "+message);
                  }
                };
                channel.basicConsume(queueName, true, consumer);
        }
        
        private static void print2File(String msg) {
            try {
                String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
                String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
                File file = new File(dir, logFileName + ".log");
                FileOutputStream fos = new FileOutputStream(file, true);
                fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "
    ").getBytes());
                fos.flush();
                fos.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }  
    }

    运行结果:

  • 相关阅读:
    阿里云Ubuntu环境搭建Docker服务
    Cocos2d-x手机游戏开发中-组合动作
    Java中将时间戳转化为Date类型
    Ubuntu14.04+eclipse下cocos2d-x3.0正式版环境的搭建
    hdu 4901 The Romantic Hero(dp)
    scikit-learn:3.4. Model persistence
    桥接模式和NAT模式差别
    JavaScript入门:004—JS凝视的写法和基本运算符
    MySQL 创建用户 与 授权
    【观点见解】解读大数据的5个误区
  • 原文地址:https://www.cnblogs.com/tinmh/p/6137949.html
Copyright © 2011-2022 走看看