zoukankan      html  css  js  c++  java
  • 30. Spring Boot ActiveMQ

    1.引入依赖

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2. 配置文件设置

    spring.application.name=springboot-rabbitmq
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.virtual-host=/

    3. 配置类

    package org.springframework.boot.autoconfigure.amqp;
    import 。。。。
    @Configuration
    @ConditionalOnClass({RabbitTemplate.class, Channel.class})
    @EnableConfigurationProperties({RabbitProperties.class})
    @Import({RabbitAnnotationDrivenConfiguration.class})
    public class RabbitAutoConfiguration {
        public RabbitAutoConfiguration() {
        }
    
        @Configuration
        @ConditionalOnClass({RabbitMessagingTemplate.class})
        @ConditionalOnMissingBean({RabbitMessagingTemplate.class})
        @Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class})
        protected static class MessagingTemplateConfiguration {
            protected MessagingTemplateConfiguration() {
            }
    
            @Bean
            @ConditionalOnSingleCandidate(RabbitTemplate.class)
            public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
                return new RabbitMessagingTemplate(rabbitTemplate);
            }
        }
    
        @Configuration
        @Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class})
        protected static class RabbitTemplateConfiguration {
            private final RabbitProperties properties;
            private final ObjectProvider<MessageConverter> messageConverter;
            private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
    
            public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
                this.properties = properties;
                this.messageConverter = messageConverter;
                this.retryTemplateCustomizers = retryTemplateCustomizers;
            }
    
            @Bean
            @ConditionalOnSingleCandidate(ConnectionFactory.class)
            @ConditionalOnMissingBean
            public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                PropertyMapper map = PropertyMapper.get();
                RabbitTemplate template = new RabbitTemplate(connectionFactory);
                MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
                if (messageConverter != null) {
                    template.setMessageConverter(messageConverter);
                }
    
                template.setMandatory(this.determineMandatoryFlag());
                Template properties = this.properties.getTemplate();
                if (properties.getRetry().isEnabled()) {
                    template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER));
                }
    
                properties.getClass();
                map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
                properties.getClass();
                map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
                properties.getClass();
                map.from(properties::getExchange).to(template::setExchange);
                properties.getClass();
                map.from(properties::getRoutingKey).to(template::setRoutingKey);
                properties.getClass();
                map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
                return template;
            }
    
            private boolean determineMandatoryFlag() {
                Boolean mandatory = this.properties.getTemplate().getMandatory();
                return mandatory != null ? mandatory : this.properties.isPublisherReturns();
            }
    
            @Bean
            @ConditionalOnSingleCandidate(ConnectionFactory.class)
            @ConditionalOnProperty(
                prefix = "spring.rabbitmq",
                name = {"dynamic"},
                matchIfMissing = true
            )
            @ConditionalOnMissingBean
            public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
                return new RabbitAdmin(connectionFactory);
            }
        }
    
        @Configuration
        @ConditionalOnMissingBean({ConnectionFactory.class})
        protected static class RabbitConnectionFactoryCreator {
            protected RabbitConnectionFactoryCreator() {
            }
    
            @Bean
            public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
                PropertyMapper map = PropertyMapper.get();
                CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject());
                properties.getClass();
                map.from(properties::determineAddresses).to(factory::setAddresses);
                properties.getClass();
                map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
                properties.getClass();
                map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
                org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
                channel.getClass();
                map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
                channel.getClass();
                map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout);
                Connection connection = properties.getCache().getConnection();
                connection.getClass();
                map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
                connection.getClass();
                map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
                connectionNameStrategy.getClass();
                map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
                return factory;
            }
    
            private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception {
                PropertyMapper map = PropertyMapper.get();
                RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
                properties.getClass();
                map.from(properties::determineHost).whenNonNull().to(factory::setHost);
                properties.getClass();
                map.from(properties::determinePort).to(factory::setPort);
                properties.getClass();
                map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
                properties.getClass();
                map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
                properties.getClass();
                map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
                properties.getClass();
                map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
                Ssl ssl = properties.getSsl();
                if (ssl.isEnabled()) {
                    factory.setUseSSL(true);
                    ssl.getClass();
                    map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
                    ssl.getClass();
                    map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
                    ssl.getClass();
                    map.from(ssl::getKeyStore).to(factory::setKeyStore);
                    ssl.getClass();
                    map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
                    ssl.getClass();
                    map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
                    ssl.getClass();
                    map.from(ssl::getTrustStore).to(factory::setTrustStore);
                    ssl.getClass();
                    map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
                    ssl.getClass();
                    map.from(ssl::isValidateServerCertificate).to((validate) -> {
                        factory.setSkipServerCertificateValidation(!validate);
                    });
                    ssl.getClass();
                    map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);
                }
    
                properties.getClass();
                map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout);
                factory.afterPropertiesSet();
                return factory;
            }
        }
    }

    默认配置

    @ConfigurationProperties(
        prefix = "spring.rabbitmq"
    )
    public class RabbitProperties {
        private String host = "localhost";
        private int port = 5672;
        private String username = "guest";
        private String password = "guest";
        private final RabbitProperties.Ssl ssl = new RabbitProperties.Ssl();
        private String virtualHost;
        private String addresses;
        @DurationUnit(ChronoUnit.SECONDS)
        private Duration requestedHeartbeat;
        private boolean publisherConfirms;
        private boolean publisherReturns;
        private Duration connectionTimeout;
        private final RabbitProperties.Cache cache = new RabbitProperties.Cache();
        private final RabbitProperties.Listener listener = new RabbitProperties.Listener();
        private final RabbitProperties.Template template = new RabbitProperties.Template();
        private List<RabbitProperties.Address> parsedAddresses;

    SpringBoot 启动类

    /**
     * 自动配置
     *  1、RabbitAutoConfiguration
     *  2、有自动配置了连接工厂ConnectionFactory;
     *  3、RabbitProperties 封装了 RabbitMQ的配置
     *  4、 RabbitTemplate :给RabbitMQ发送和接受消息;
     *  5、 AmqpAdmin : RabbitMQ系统管理功能组件;
     *      AmqpAdmin:创建和删除 Queue,Exchange,Binding
     *  6、@EnableRabbit +  @RabbitListener 监听消息队列的内容
     *
     */
    @EnableRabbit  //开启基于注解的RabbitMQ模式
    @SpringBootApplication
    public class Springboot02AmqpApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(Springboot02AmqpApplication.class, args);
        }
    }

    使序列化到MQ的对象消息能够以JSON的形式展现而不是乱码

    @Configuration
    public class MyAMQPConfig {
    
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }

    测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot02AmqpApplicationTests {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Autowired
        AmqpAdmin amqpAdmin;
    
        @Test
        public void createExchange(){
    
    //        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
    //        System.out.println("创建完成");
    
    //        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
            //创建绑定规则
    
    //        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));
    
            //amqpAdmin.de
        }
    
        /**
         * 1、单播(点对点)
         */
        @Test
        public void contextLoads() {
            //Message需要自己构造一个;定义消息体内容和消息头
            //rabbitTemplate.send(exchage,routeKey,message);
    
            //object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
            //rabbitTemplate.convertAndSend(exchage,routeKey,object);
            Map<String,Object> map = new HashMap<>();
            map.put("msg","这是第一个消息");
            map.put("data", Arrays.asList("helloworld",123,true));
            //对象被默认序列化以后发送出去
            rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西游记","吴承恩"));
    
        }
    
        //接受数据,如何将数据自动的转为json发送出去
        @Test
        public void receive(){
            Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
            System.out.println(o.getClass());
            System.out.println(o);
        }
    
        /**
         * 广播
         */
        @Test
        public void sendMsg(){
            rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹"));
        }
    
    }

      

    相关文章:https://www.cnblogs.com/boshen-hzb/p/6841982.html

  • 相关阅读:
    组合算法实现
    Memcached 和 Redis 分布式锁方案
    CLR 内存分配和垃圾收集 GC
    Windbg 的使用和常用命令
    Geohash 算法学习
    经纬度计算
    Windbg 分析CPU上涨
    Windbg 分析内存上涨
    django基于存储在前端的token用户认证
    非常详细的Django使用Token(转)
  • 原文地址:https://www.cnblogs.com/guchunchao/p/10333434.html
Copyright © 2011-2022 走看看