zoukankan      html  css  js  c++  java
  • spring boot 整合 RabbitMq (注解)

    1、增加rabbitmq的依赖包

    <!-- ampq 依赖包 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>

    2、application.yaml文件中配置

    spring:
      rabbitmq:
          host: localhost
          port: 5672
          username: admin
          password: admin
          publisher-confirms: true
          virtual-host: /

    3、RabbitMq的工厂连接和模板创建

    @Configuration
    public class RabbitConfig
    {
        @Value("${spring.rabbitmq.host}")
        private String host;
    
        @Value("${spring.rabbitmq.port}")
        private String port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        @Value("${spring.rabbitmq.publisher-confirms}")
        private Boolean publisherConfirms;
    
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        //创建工厂连接
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(this.host);
            connectionFactory.setUsername(this.username);
            connectionFactory.setPassword(this.password);
            connectionFactory.setVirtualHost(this.virtualHost);
            connectionFactory.setPublisherConfirms(this.publisherConfirms); //必须要设置
            return connectionFactory;
        }
    
        //rabbitmq的模板配置
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
            //template.setConfirmCallback(); 设置消息确认
            //template.setReturnCallback();
            return template;
        }
    }

    4、创建交换机、创建队列、绑定交换机和队列

    @Configuration
    public class RabbitExchangeConfig
    {
        //直连交换机
        @Bean
        public DirectExchange defaultExchange() {
            return new DirectExchange(RabbitConstant.EXCHANGE_NAME); 
        }
    
        //队列
        @Bean
        public Queue queue() {
            return QueueBuilder.durable(RabbitConstant.QUEUE_NAME).build();
        }
    
        //绑定
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(this.queue()).to(this.defaultExchange()).with(RabbitConstant.ROUTING_KEY);
        }
    }

    5、消费者

    @Component
    public class RabbitSender implements RabbitTemplate.ConfirmCallback
    {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendMsg(String content) {
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
            this.rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_NAME,
                    RabbitConstant.ROUTING_KEY, content, correlationId);
        }
    
        //回调
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println(" 回调id:" + correlationData);
            if (ack) {
                System.out.println("消息成功消费");
            } else {
                System.out.println("消息消费失败:" + cause);
            }
        }
    }

    6、消费者

    @Configuration
    public class RabbitReceive
    {
        @Autowired
        private RabbitConfig rabbitConfig;
    
        @Autowired
        private RabbitExchangeConfig rabbitExchangeConfig;
    
        @Bean
        public SimpleMessageListenerContainer messageContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitConfig.connectionFactory());
            container.setQueues(rabbitExchangeConfig.queue()); //设置要监听的队列
            container.setExposeListenerChannel(true);
            container.setMaxConcurrentConsumers(1);
            container.setConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
            container.setMessageListener(new ChannelAwareMessageListener() {
                    @Override
                    public void onMessage(Message message, Channel channel) throws Exception {
                        byte[] body = message.getBody();
                        System.out.println("receive msg : " + new String(body));
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
                }
            });
            return container;
        }
    }
  • 相关阅读:
    .NET牛人应该知道些什么
    秋梦
    感谢你走进我的生命里
    漂流有感
    QQ在跳舞
    青春无痕
    名人只是传说,你我才是传奇
    祝福紫秋
    可以用在任何人身上:百战百胜人生10大成功秘诀
    c#发送邮件
  • 原文地址:https://www.cnblogs.com/yufeng218/p/8075941.html
Copyright © 2011-2022 走看看