zoukankan      html  css  js  c++  java
  • 2.2_springboot2.x消息RabbitMQ整合&amqpAdmin管理组件的使用

    5.1.1、基本测试

    1.引入 spring-boot-starter-amqp 依赖

     <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    

    自动配置原理说明 RabbitAutoConfiguration

    1、自动配置了连接工厂 CachingConnectionFactory,用于获取与 RabbitMQ 的连接信息

    @Configuration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {
    
    	@Configuration
    	@ConditionalOnMissingBean(ConnectionFactory.class)
    	protected static class RabbitConnectionFactoryCreator {
    
    		@Bean
    		public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
    				ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
                
               ... 
            }
            ...
    

    2、RabbitProperties封装了 RabbitMQ的配置

    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitProperties {
    
    	/**
    	 * RabbitMQ host.
    	 */
    	private String host = "localhost";
    
    	/**
    	 * RabbitMQ port.
    	 */
    	private int port = 5672;
        ....
    

    application.yml配置

    spring:
      rabbitmq:
        host: xxx.xxx.xxx.xxx
        username: guest
        password: guest
        port: 5672
    

    3、RabbitTemplate:给RabbitMQ发送和接收消息

    @Configuration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {
        ....
            
    	@Configuration
    	@Import(RabbitConnectionFactoryCreator.class)
    	protected static class RabbitTemplateConfiguration {
    
    		private final RabbitProperties properties;
    
    		private final ObjectProvider<MessageConverter> messageConverter;
    
    		private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
    
    		public RabbitTemplateConfiguration(RabbitProperties properties,
    				ObjectProvider<MessageConverter> messageConverter,
    				ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
    			this.properties = properties;
    			this.messageConverter = messageConverter;
    			this.retryTemplateCustomizers = retryTemplateCustomizers;
    		}
    
    		@Bean
    		@ConditionalOnSingleCandidate(ConnectionFactory.class)
    		@ConditionalOnMissingBean(RabbitOperations.class)
    		public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    			PropertyMapper map = PropertyMapper.get();
    			RabbitTemplate template = new RabbitTemplate(connectionFactory);
    			MessageConverter messageConverter = this.messageConverter.getIfUnique();
    			if (messageConverter != null) {
    				template.setMessageConverter(messageConverter);
    			}
    			template.setMandatory(determineMandatoryFlag());
    			RabbitProperties.Template properties = this.properties.getTemplate();
    			if (properties.getRetry().isEnabled()) {
    				template.setRetryTemplate(new RetryTemplateFactory(
    						this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())).createRetryTemplate(
    								properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER));
    			}
    			map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
    					.to(template::setReceiveTimeout);
    			map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
    			map.from(properties::getExchange).to(template::setExchange);
    			map.from(properties::getRoutingKey).to(template::setRoutingKey);
    			map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
    			return template;
    		}
    }
    

    4、AmqpAdmin:RabbitMQ系统管理组件,用来声明队列、交换器等。当没有在网页端手动创建queue、exchange、Binding时,可以使用AmqpAdmin来创建和删除queue、exchange、Binding

    @Configuration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {
        
        ...
            @Bean
    		@ConditionalOnSingleCandidate(ConnectionFactory.class)
    		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    		@ConditionalOnMissingBean
    		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    			return new RabbitAdmin(connectionFactory);
    		}
    }
    

    2.测试RabbitMQ

    1)单播-点对点

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot02AmqpApplicationTests {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
          @Test
        public void contextLoads() {
            // Unicast (point-to-point) send example.
            //
            // Two ways to publish with RabbitTemplate:
            //   1. send(exchange, routingKey, message): the caller builds the
            //      Message (body and properties) by hand.
            //   2. convertAndSend(exchange, routingKey, object): the commonly used
            //      form; the given object is treated as the message body and is
            //      converted automatically before it is sent to RabbitMQ.
            //
            // With the default converter the object is serialized using JDK
            // serialization before it is sent.
            rabbitTemplate.convertAndSend("exchange.direct","jatpeo.news",new Book("西游记","吴承恩"));
        }
        // Receives one message from the "jatpeo.news" queue and prints the
        // converted payload's class and value.
        // Open question left by the author: how can the data be converted to
        // JSON automatically when it is sent?
        @Test
        public void receive(){

            Object received = rabbitTemplate.receiveAndConvert("jatpeo.news");
            // receiveAndConvert yields null when no message is waiting in the
            // queue; guard so an empty queue does not end in a NullPointerException.
            if (received == null) {
                System.out.println("No message available in queue jatpeo.news");
                return;
            }
            System.out.println(received.getClass());
            System.out.println(received);
        }
    

    常用的convertAndSend:消息体会自动转换,传入的object默认被当成消息体;只要传入要发送的对象,它就会被自动序列化后发送给RabbitMQ(默认使用JDK序列化)。

    源码分析:

    public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
    		implements BeanFactoryAware, RabbitOperations, MessageListener,
    			ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware {
                    
                    	private MessageConverter messageConverter = new SimpleMessageConverter();
    
    

    调用SimpleMessageConverter

    public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
    
          content = new String(message.getBody(), encoding);
                    } catch (UnsupportedEncodingException var8) {
                        throw new MessageConversionException("failed to convert text-based Message content", var8);
                    }
                } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                    try {
                        content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
                    } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
                        throw new MessageConversionException("failed to convert serialized Message content", var7);
                    }
                }
    
    }
    

    自定义MessageConverter

    新建MyAMQPConfig

    /**
     * AMQP configuration class.
     *
     * Turns on annotation-based RabbitMQ support via {@code @EnableRabbit} and
     * registers a JSON message converter bean.
     */
    @Configuration
    @EnableRabbit
    public class MyAMQPConfig {

        /**
         * Provides the {@link MessageConverter} bean.
         *
         * @return a new {@link Jackson2JsonMessageConverter}
         */
        @Bean
        public MessageConverter messageConverter() {
            MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
            return jsonConverter;
        }
    }
    

    2)广播

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot02AmqpApplicationTests {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
         /**
          * 2. Broadcast (fanout) send example.
          *
          * The routing key does not matter for a broadcast, so an empty string
          * is passed.
          */
        @Test
        public void Test(){
            final String fanoutExchange = "exchange.fanout";
            final String unusedRoutingKey = "";
            Book payload = new Book("红楼梦","曹雪芹");

            rabbitTemplate.convertAndSend(fanoutExchange, unusedRoutingKey, payload);
        }
    

    (原文此处有一张截图,因外链图片转存失败而无法显示。)

    5.1.2、@RabbitListener和@EnableRabbit

    @EnableRabbit + @RabbitListener 监听消息队列的内容

    @RabbitListener:监听队列

    @EnableRabbit:开启基于注解的RabbitMQ

    /**
     * Service holding the {@code @RabbitListener} methods of this example.
     *
     * For the listener annotations to take effect, {@code @EnableRabbit} has to
     * be switched on in a configuration class.
     */
    @Service
    public class BookService {

        /**
         * Invoked whenever the "jatpeo.news" queue receives a message; prints a
         * notice followed by the received book.
         *
         * @param book the message payload converted to a {@code Book}
         */
        @RabbitListener(queues = "jatpeo.news")
        public void receive(Book book){
            System.out.println("收到消息。。。打印");
            // Print the payload as well, so the listener really shows the message.
            System.out.println(book);
        }

        /**
         * Invoked for messages on the "jatpeo" queue; prints the raw message
         * body and the message properties.
         *
         * @param message the raw AMQP message (body plus properties)
         */
        @RabbitListener(queues = "jatpeo")
        public void receive02(Message message){
            // getBody() is a byte[]; printing the array directly only shows its
            // identity ("[B@..."), so decode it to text first.
            // NOTE(review): assumes the body is UTF-8 text (e.g. JSON) - confirm
            // against the converter used by the producer.
            System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(message.getMessageProperties());
        }
    }
    

    5.1.3、AmqpAdmin:RabbitMQ系统管理组件

    AmqpAdmin:RabbitMQ系统管理组件,用来声明队列,交换器等

    当没有在网页端自己创建queue、exchange、Binding时,可采用AmqpAdmin创建和删除queue、exchange、Binding

    注入AmqpAdmin

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Springboot02AmqpApplicationTests {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Autowired
        AmqpAdmin amqpAdmin;
    
    
        @Test
        public void createExchange(){
            // Step 1 (currently disabled): create the direct exchange
            //   amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
            //   System.out.println("创建完成");
            // Step 2 (currently disabled): create the queue
            //   amqpAdmin.declareQueue(new Queue("amqpAdmin.queue",true));
            //
            // NOTE(review): presumably steps 1 and 2 were run once beforehand so
            // that the exchange and queue referenced below already exist - confirm.

            // Step 3: create the binding rule between the queue and the exchange,
            // using routing key "amqpAdmin.haha" and no extra arguments.
            Binding queueBinding = new Binding(
                    "amqpAdmin.queue",
                    Binding.DestinationType.QUEUE,
                    "amqpAdmin.exchange",
                    "amqpAdmin.haha",
                    null);
            amqpAdmin.declareBinding(queueBinding);
        }
    

    网页端查看:

    在这里插入图片描述

    等。

  • 相关阅读:
    android 打包错误
    mysql innoDB 挂了的临时解决方案
    android notification 传值关键
    maven eclipse 插件下载地址
    微信html5开发选哪一个
    android AsyncTask 只能在线程池里单个运行的问题
    关于Fragment 不响应onActivityResult的情况分析 (
    Android-BaseLine基础性开发框架
    linux网络流量实时监控工具之iptraf
    android 圆角按钮和按钮颜色
  • 原文地址:https://www.cnblogs.com/jatpeo/p/11767476.html
Copyright © 2011-2022 走看看