zoukankan      html  css  js  c++  java
  • SpringBoot+RabbitMq

    代码整合

    • maven依赖
           <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.56</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
    • yml配置
    server:
      port: 8021
    spring:
      #配置rabbitMq 服务器
      rabbitmq:
        host: 192.168.100.120
        port: 5672
        username: test
        password: test
        #Vhost
        virtual-host: testRabbit
    
    • 配置DirectConfig
    @Configuration
    public class DirectConfig {
        /**
         * 交换机
         */
        public static final String DESTINATION_NAME = "rabbitMq_direct";
        /**
         * 队列名称
         */
        public static final String SMS_QUEUE = "Sms_msg";
        public static final String EMAIL_QUEUE = "Email_msg";
        //RouteKey
        public static final String SMS_ROUTINGKEY = "sms";
        public static final String EMAIL_ROUTINGKEY = "email";
        //配置队列
        @Bean
        public Queue smsDirectQueue() {
            return new Queue(SMS_QUEUE, true);
        }
        @Bean
        public Queue emailDirectQueue() {
            return new Queue(EMAIL_QUEUE, true);
        }
        //配置交换机
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(DESTINATION_NAME);
        }
        //交换机与队列绑定
        @Bean
        Binding smsBindingDirect() {
            return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with(SMS_ROUTINGKEY);
        }
        @Bean
        Binding emailBindingDirect() {
            return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with(EMAIL_ROUTINGKEY);
        }
    }
    
    • 发送方配置
    @RestController
    @Slf4j
    public class DirectSendController {
        @Autowired
        private RabbitTemplate template;
    
        @GetMapping("/sendSms")
        private String sendSms(@RequestParam("msg") String message) throws Exception {
            User user = new User();
            user.setId(UUID.randomUUID().toString().replace("-", ""));
            user.setPassword("sendSms");
            user.setUsername("sendSms");
            user.setMsg(message);
            String userJson = JSON.toJSONString(user);
            log.info("sendSms:{}", userJson);
            //发送的时候需要指定队列  指定的交换机,指定的ROUTINGKEY
            template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.SMS_ROUTINGKEY, userJson);
            return "OK,sendSms:" + message;
        }
    
        @GetMapping("/sendEmail")
        private String sendEmail(@RequestParam("msg") String message) throws Exception {
            User user = new User();
            user.setId(UUID.randomUUID().toString().replace("-", ""));
            user.setPassword("sendEmail");
            user.setUsername("sendEmail");
            user.setMsg(message);
            String userJson = JSON.toJSONString(user);
            log.info("sendEmail:{}", userJson);
            //发送的时候需要指定队列  指定的交换机,指定的ROUTINGKEY
            template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.EMAIL_ROUTINGKEY, user);
            return "OK,sendEmail:" + message;
        }
    }
    
    • 消费者
    @Component
    @Slf4j
    public class DirectConsumer {
        @RabbitListener(queues = DirectConfig.SMS_QUEUE)
        public void sms_msg(Message message) throws IOException {
            System.out.println("sms_msg消费者收到消息 : "+new String(message.getBody(),"UTF-8"));
        }
        @RabbitListener(queues = DirectConfig.EMAIL_QUEUE)
        public void email_msg(User user) {
            System.out.println("email_msg消费者收到消息 : "+JSON.toJSONString(user));
        }
    }
    

    在这里插入图片描述
    其他的几种方式都类似。

    消息的手动签收、消息退回、消息的回调

    • yml增加
       # 是否开启消息确认机制
        publisher-confirms: true
        # 开启消息发送到队列失败返回
        publisher-returns: true
    
    • 增加RabbitMq配置
    @Configuration
    public class RabbitMqConfig {
        /**
         * 使用SimpleMessageListenerContainer容器设置消费队列监听
         * 不使用@RabbitListener注解
         */
    //    @Bean
    //    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    //        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    //        simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
    //        simpleMessageListenerContainer.setMessageListener(new MessageListener() {
    //            @Override
    //            public void onMessage(Message message) {
    //
    //            }
    //        });
    //        //simpleMessageListenerContainer.setQueueNames("","");
    //        //impleMessageListenerContainer.addQueueNames();
    //        //手动确认
    //        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    //        return simpleMessageListenerContainer;
    //    }
    
        /**
         * @return
         * @RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。
         */
        @Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
                    new SimpleRabbitListenerContainerFactory();
            //这个connectionFactory就是我们自己配置的连接工厂直接注入进来
            simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
            //这边设置消息确认方式由自动确认变为手动确认
            simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //设置消息预取数量
    //        simpleRabbitListenerContainerFactory.setPrefetchCount(1);
            return simpleRabbitListenerContainerFactory;
        }
    
        //如果是单例的
    
        /**
         * 每个rabbitTemplate方法只可以有一个回调,不然会报错 only one ConfirmCallback is supported by each RabbitTemplate,解决办法是配成多利的
         *
         * @param connectionFactory
         * @return
         */
        @Bean
    //    @Scope("prototype")
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            //成功回调
            template.setConfirmCallback(new Callback());
            // 开启mandatory模式(开启失败回调)
            template.setMandatory(true);
            //失败回调
            template.setReturnCallback(new Callback());
    
            return template;
        }
    
    • 增加回调类
    public class Callback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
            System.out.println("ConfirmCallback:     " + "确认情况:" + ack);
            System.out.println("ConfirmCallback:     " + "原因:" + cause);
            if (ack) {
                System.out.println("消息发送确认成功");
            } else {
                System.out.println("消息发送确认失败");
            }
        }
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            //失败的回调
            try {
                System.out.println("ReturnCallback:     " + "消息:" + new String(message.getBody(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
            System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
            System.out.println("ReturnCallback:     " + "交换机:" + exchange);
            System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
    
        }
    }
    
    • 发送方增加一个CorrelationData
      每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
            CorrelationData correlationData = new CorrelationData(id);
            template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.SMS_ROUTINGKEY, userJson,correlationData);
    
    • 消费方
     @RabbitListener(queues = DirectConfig.SMS_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
        public void sms_msg(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
            long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            try {
                System.out.println("sms_msg消费者收到消息 : " + new String(message.getBody(), "UTF-8"));
                /**
                 * 手动ack
                 * deliveryTag:该消息的index
                 * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                 */
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                //消息退回 (可以在可视化界面看到)
                //批量退回 退回之后重回消息队列 true  false的话就是丢弃这条信息,如果配置了死信队列,那这条消息会进入死信队列
                channel.basicNack(deliveryTag, false, true);
                //单条退回 channel.basicReject();
            }
        }
    
  • 相关阅读:
    SQL Server 2005 数据定义语言触发器(Data Definition Language Triggers)[翻译]
    sqlserver 存储过程例子
    微软CEO鲍尔默:科技产业终将成为经济救世主
    poj:2689用筛选法选素数求区间[L,U]的所有素数
    ZOJ Problem Set 1002 Fire Net
    去除多余括号
    模板元编程:求N的阶乘
    算法导论10.2习题
    奇数阶魔方算法
    TSQL 编程常用例子
  • 原文地址:https://www.cnblogs.com/yangk1996/p/12673830.html
Copyright © 2011-2022 走看看