zoukankan      html  css  js  c++  java
  • java/rabbitmp发布订阅示例(转)

    原文:http://www.cnblogs.com/tinmh/p/6134875.html

    发布/订阅模式即生产者将消息发送给多个消费者。

    下面介绍几个在发布/订阅模式中的关键概念--

    1. Exchanges (转发器)

    可能原来我们都是基于一个队列发送和接收消息。现在介绍一下完整的消息传递模式。

    Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列。实际上,生产者都不知道这个消息是发送给哪个队列的。相反,生产者只能发送消息给转发器。

    转发器一方面接收生产者的消息,另一方面向队列推送消息。

    转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否丢弃?这些规则通过转发器的类型进行定义。类型有:Direct、Topic、Headers、Fanout。

    这里我们关注最后一个。现在让我们创建一个Fanout类型的转发器,定义如下:

    channel.exchangeDeclare("logs", "fanout"); 

    2. Nameless exchange(匿名转发)

    之前我们对转发器可能一无所知,但还是可以将消息发送到队列,那是因为我们用了默认的转发器,转发器名为空字符串" "。之前我们发布消息的代码是:

    channel.basicPublish("", "hello", null, message.getBytes());

    第一个参数就是转发器的名字。空字符串表示匿名的转发器。消息通过队列的routingKey路由到指定的队列中去。

    现在我们就可以指定转发器的名字了;

    channel.basicPublish( "logs", "", null, message.getBytes());

    3.Temporary queues(临时队列)

    当我们需要为消费者指定同一个队列的时候,队列有名字对我们来说是非常重要的。

    但有时我们并不关心这个问题,我们只对当前流动的消息感兴趣。这个时候我们采取以下两个步骤解决:

    1)当我们连接到RabbitMQ时,需要一个新的空队列,为此我们需要创建一个随机名字的空队列,或者更好的。让服务器选好一个随机名字的空队列直接给我们。

    2)一旦消费者断开连接,队列将自动删除。

    这里我们提供一个无参的queueDeclare()方法,创建一个非持久化、独立的、自动删除的队列,且名字是随机生成的。

    String queueName = channel.queueDeclare().getQueue();

    queueName是一个随机队列名。

    4.Bindings(绑定)

    我们已经创建了一个广播的转发器和一个随机队列。现在需要告诉转发器转发消息到队列。这个关联转发器和队列我们叫他Binding。

    channel.queueBind(queueName, "logs", "");

    这样,转发器附加到日志队列上去。

    下面是一个关于日志系统的完整例子:

    发送端代码(生产者)EmitLog.java

    复制代码
    package sublog;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLog {
        private final static String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException {
            /**
             * 创建连接连接到MabbitMQ
             */
            ConnectionFactory factory = new ConnectionFactory();
            // 设置MabbitMQ所在主机ip或者主机名
            factory.setHost("115.159.181.204");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 创建一个连接
            Connection connection = factory.newConnection();  
            // 创建一个频道  
            Channel channel = connection.createChannel();  
    
            // 指定转发——广播
            ((com.rabbitmq.client.Channel) channel).exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            for(int i=0;i<3;i++){
                // 发送的消息
                String message = "Hello World!";
                ((com.rabbitmq.client.Channel) channel).basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
    
            // 关闭频道和连接
            channel.close();
            connection.close();
        }
    }
    复制代码

    消费者1 ReceiveLogs2Console.java

    复制代码
    package sublog;
    
    import java.io.IOException;
    
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.AMQP.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    
    public class ReceiveLogs2Console {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws IOException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("115.159.181.204");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 打开连接和创建频道,与发送端一样
            com.rabbitmq.client.Connection connection =factory.newConnection();
            final Channel channel =  connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 声明一个随机队列
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            
            // 创建队列消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                  }
                };
                channel.basicConsume(queueName, true, consumer);
        }
    }
    复制代码

    消费者2 ReceiveLogs2File.java

    复制代码
    package sublog;
    
    
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.AMQP.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class ReceiveLogs2File {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws IOException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("115.159.181.204");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            // 打开连接和创建频道,与发送端一样
            com.rabbitmq.client.Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 声明一个随机队列
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            
            // 创建队列消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    print2File(message);
    //                System.out.println(" [x] Received '" + message + "'");
                  }
                };
                channel.basicConsume(queueName, true, consumer);
        }
        
        private static void print2File(String msg) {
            try {
                String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
                String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
                File file = new File(dir, logFileName + ".log");
                FileOutputStream fos = new FileOutputStream(file, true);
                fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "
    ").getBytes());
                fos.flush();
                fos.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }  
    }
    复制代码

    我们用1个生产者用于发送log消息,2个消费者,一个用于打印接收到的消息,另一个除了打印接收到的消息还写有日志信息的文件。

    生产者声明了一个广播模式的转换器,订阅这个转换器的消费者都可以收到每一条消息。可以看到在生产者中,没有声明队列。这也验证了之前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的。

    2个消费者,一个打印日志,一个写入文件,除了这2个地方不一样,其他地方一模一样。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会创建一个随机实例,这个在管理页面可以看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。

    注:运行的时候要先运行2个消费者实例,然后在运行生产者实例。否则获取不到实例。

  • 相关阅读:
    《C++必知必会》读书笔记
    看美图是一种享受
    C指针-指向另一指针的指针
    顺序队列基本操作
    Using Windows Live Writer to write first offline blog
    堆和栈的区别 [摘录]
    进程与线程的区别
    《Effective C#》
    析构函数virtual与非virtual区别
    常用SQL语句技法
  • 原文地址:https://www.cnblogs.com/boshen-hzb/p/6839434.html
Copyright © 2011-2022 走看看