zoukankan      html  css  js  c++  java
  • RabbitMQ 消费消息

    1, 创建一个 springboot 项目, 导入依赖(和生产者一致)

    2, application.properties (基础配置和生产者一致, 消费者需要再额外配置一些)

    # rabbitmq
    spring.rabbitmq.addresses=106.12.35.176:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    # rabbitmq 消费者
    # 并发数
    spring.rabbitmq.listener.simple.concurrency=5
    # 最大并发数
    spring.rabbitmq.listener.simple.max-concurrency=10
    # 签收模式
    spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
    # 限流, 避免同时处理大量消息导致服务器 down 机, 根据线程数来决定
    spring.rabbitmq.listener.simple.prefetch=1
    
    # 服务端口
    server.port=9002
    
    # 格式化时间
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8

    3, 创建实体类, 实现序列化接口, 这个实体类要和生产者实体类一致, 因为发什么消息就应该接收什么消息

    4,  消费消息

      1) @RabbitListener 是一个强大的注解, 主要作用有二: 

        1, 监听 Queue

        2, 自动创建 exchange, queue, routing key

      2) @RabbitHandler 用于发现有消息产生就立即触发方法来消费

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import com.ictpaas.pojo.Order;
    
    @Component
    public class OrderRevicer {
        
        /**
         * 消费消息
         * @param order    消息内容
         * @param headers    消息 properties
         */
        @RabbitListener(bindings = @QueueBinding(
                    value = @Queue(value = "order-queue", durable = "true"),
                    exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
                    key = "order.#"
                )
        )
        @RabbitHandler
        public void onOrderMsg(@Payload Order order, @Headers Map<String, Object> headers) {
            System.out.println("-------- 收到消息, 开始消费 ---------");
            System.out.println("订单 ID : " + order.getId());
            System.out.println("消息 ID : " + order.getMsgId());
        };
    }

    5, 测试

    开发中一般会通过注解来创建 exchange, queue, routing key,

    先启动消费者服务来创建所需 rabbitMQ 组件, 因为要监听 queue 所以消费者服务不能停止, 一直要处于启动状态

    调用生产者服务来生成消息并发送

    结果会每生产一条消息, 消费者服务就会立马打印出相关信息

  • 相关阅读:
    我的2018:OCR、实习和秋招
    【OCR技术系列之六】文本检测CTPN的代码实现
    【OCR技术系列之五】自然场景文本检测技术综述(CTPN, SegLink, EAST)
    如何免费使用谷歌搜索
    CUDA编程之快速入门
    我在北京实习的四个月
    在C++98基础上学习C++11新特性
    Linux编程之线程池的设计与实现(C++98)
    ASP.NET Core中使用IOC三部曲(三.采用替换后的Autofac来实现AOP拦截)
    ASP.NET Core文件上传与下载(多种上传方式)
  • 原文地址:https://www.cnblogs.com/huanggy/p/9695934.html
Copyright © 2011-2022 走看看