zoukankan      html  css  js  c++  java
  • RabbitMQ发布/订阅模式

    1、生产者

    package com.ys.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.ys.utils.ConnectionUtil;

    /**
    * Create by YSOcean
    */
    public class Producer {
    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
    //1、获取连接
    Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
    //2、声明信道
    Channel channel = connection.createChannel();
    //3、声明交换器
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    //4、创建消息
    String message = "hello rabbitmq";
    //5、发布消息
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println("[x] Sent'" + message + "'");
    //6、关闭通道
    channel.close();
    //7、关闭连接
    connection.close();
    }
    }

    2、消费者

      消费者1:

    package com.ys.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.ys.utils.ConnectionUtil;


    /**
    * Create by YSOcean
    */
    public class Consumer1 {

    private final static String QUEUE_NAME = "fanout_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
    //1、获取连接
    Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
    //2、声明通道
    Channel channel = connection.createChannel();
    //3、声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //4、绑定队列到交换机
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    //同一时刻服务器只会发送一条消息给消费者
    channel.basicQos(1);
    //5、定义队列的消费者
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    //6、监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
    //6、获取消息
    while (true){
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 消费者1:" + message + "'");
    //消费者1接收一条消息后休眠10毫秒
    Thread.sleep(10);
    //返回确认状态
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }

    }

    消费者:2

    package com.ys.ps;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.ys.utils.ConnectionUtil;


    /**
    * Create by YSOcean
    */
    public class Consumer2 {

    private final static String QUEUE_NAME = "fanout_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception{
    //1、获取连接
    Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
    //2、声明通道
    Channel channel = connection.createChannel();
    //3、声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //4、绑定队列到交换机
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    //同一时刻服务器只会发送一条消息给消费者
    channel.basicQos(1);
    //5、定义队列的消费者
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    //6、监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
    //6、获取消息
    while (true){
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 消费者2:" + message + "'");
    //消费者2接收一条消息后休眠10毫秒
    Thread.sleep(1000);
    //返回确认状态
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }

    消费者1和消费者2都监听了被同一个交换器绑定的队列。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。

  • 相关阅读:
    20210815 图论模拟赛
    20210813 杂项の模拟赛
    20210812dp模拟赛
    20210811数据结构
    html问题记录20180514
    Oracle导出表空间的创建语句、导入、导出dmp文件
    浮动子div撑开父div的几种方法、给select赋值、zoom样式的含义、实现select下拉框readonly
    Fidder教程
    JQuery插件的写法和规范
    box-sizing position calc() @media
  • 原文地址:https://www.cnblogs.com/yinzhou/p/11213639.html
Copyright © 2011-2022 走看看