zoukankan      html  css  js  c++  java
  • RabbitMQ发布/订阅模式

    1、生产者

    package com.ys.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.ys.utils.ConnectionUtil;

    /**
    * Create by YSOcean
    */
    public class Producer {
    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
    //1、获取连接
    Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
    //2、声明信道
    Channel channel = connection.createChannel();
    //3、声明交换器
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    //4、创建消息
    String message = "hello rabbitmq";
    //5、发布消息
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println("[x] Sent'" + message + "'");
    //6、关闭通道
    channel.close();
    //7、关闭连接
    connection.close();
    }
    }

    2、消费者

      消费者1:

    package com.ys.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.ys.utils.ConnectionUtil;


    /**
    * Create by YSOcean
    */
    public class Consumer1 {

    private final static String QUEUE_NAME = "fanout_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
    //1、获取连接
    Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
    //2、声明通道
    Channel channel = connection.createChannel();
    //3、声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //4、绑定队列到交换机
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    //同一时刻服务器只会发送一条消息给消费者
    channel.basicQos(1);
    //5、定义队列的消费者
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    //6、监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
    //6、获取消息
    while (true){
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 消费者1:" + message + "'");
    //消费者1接收一条消息后休眠10毫秒
    Thread.sleep(10);
    //返回确认状态
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }

    }

    消费者:2

    package com.ys.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.ys.utils.ConnectionUtil;


    /**
    * Create by YSOcean
    */
    public class Consumer2 {

    private final static String QUEUE_NAME = "fanout_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
    //1、获取连接
    Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
    //2、声明通道
    Channel channel = connection.createChannel();
    //3、声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //4、绑定队列到交换机
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    //同一时刻服务器只会发送一条消息给消费者
    channel.basicQos(1);
    //5、定义队列的消费者
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    //6、监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
    //6、获取消息
    while (true){
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 消费者2:" + message + "'");
    //消费者2接收一条消息后休眠10毫秒
    Thread.sleep(1000);
    //返回确认状态
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }

    消费者1和消费者2都监听了被同一个交换器绑定的队列。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。

  • 相关阅读:
    深入了解CSS3新特性(转)
    微小,但是美好的改变 G2 2.2发布
    可视化框架设计-数据调整
    可视化框架设计-图表类型
    可视化框架设计-数据流
    人之初,性本动
    可视化框架设计-坐标系
    可视化框架设计-视觉通道
    可视化框架设计-数据类型
    可视化框架设计-整体思路
  • 原文地址:https://www.cnblogs.com/yinzhou/p/11213639.html
Copyright © 2011-2022 走看看