zoukankan      html  css  js  c++  java
  • RabbitMQ 消费消息

    1, 创建一个 springboot 项目, 导入依赖(和生产者一致)

    2, application.properties (基础配置和生产者一致, 消费者需要再额外配置一些)

    # rabbitmq
    spring.rabbitmq.addresses=106.12.35.176:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    # rabbitmq 消费者
    # 并发数
    spring.rabbitmq.listener.simple.concurrency=5
    # 最大并发数
    spring.rabbitmq.listener.simple.max-concurrency=10
    # 签收模式
    spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
    # 限流, 避免同时处理大量消息导致服务器 down 机, 根据线程数来决定
    spring.rabbitmq.listener.simple.prefetch=1
    
    # 服务端口
    server.port=9002
    
    # 格式化时间
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8

    3, 创建实体类, 实现序列化接口, 这个实体类要和生产者实体类一致, 因为发什么消息就应该接收什么消息

    4,  消费消息

      1) @RabbitListener 是一个强大的注解, 主要作用有二: 

        1, 监听 Queue

        2, 自动创建 exchange, queue, routing key

      2) @RabbitHandler 用于发现有消息产生就立即触发方法来消费

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import com.ictpaas.pojo.Order;
    
    @Component
    public class OrderRevicer {
        
        /**
         * 消费消息
         * @param order    消息内容
         * @param headers    消息 properties
         */
        @RabbitListener(bindings = @QueueBinding(
                    value = @Queue(value = "order-queue", durable = "true"),
                    exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
                    key = "order.#"
                )
        )
        @RabbitHandler
        public void onOrderMsg(@Payload Order order, @Headers Map<String, Object> headers) {
            System.out.println("-------- 收到消息, 开始消费 ---------");
            System.out.println("订单 ID : " + order.getId());
            System.out.println("消息 ID : " + order.getMsgId());
        };
    }

    5, 测试

    开发中一般会通过注解来创建 exchange, queue, routing key,

    先启动消费者服务来创建所需 rabbitMQ 组件, 因为要监听 queue 所以消费者服务不能停止, 一直要处于启动状态

    调用生产者服务来生成消息并发送

    结果会每生产一条消息, 消费者服务就会立马打印出相关信息

  • 相关阅读:
    ZOJ 2158 Truck History
    Knight Moves (zoj 1091 poj2243)BFS
    poj 1270 Following Orders
    poj 2935 Basic Wall Maze (BFS)
    Holedox Moving (zoj 1361 poj 1324)bfs
    ZOJ 1083 Frame Stacking
    zoj 2193 Window Pains
    hdu1412{A} + {B}
    hdu2031进制转换
    openjudge最长单词
  • 原文地址:https://www.cnblogs.com/huanggy/p/9695934.html
Copyright © 2011-2022 走看看