zoukankan      html  css  js  c++  java
  • RabbitMQ(三) RabbitMQ高级整合应用

    RabbitMQ整合Spring AMQP实战

    常用组件介绍

    • RabbitAdmin

    • Spring AMQP声明 通过@Bean注解进行声明

    • RabbitTemplate

    • SimpleMessageListenerContainer 对消息消费进行详细配置和优化

    • MessageListenerAdapter 消息监听适配器,建立在监听器基础之上

    • MessageConverter

    RabbitAdmin

    • RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可

    • 注意:autoSatrtup必须设置为true,否则spring容器不会加载RabbitAdmin类

    • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明;

    • 底层使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作;

    RabbitMQ简单使用

    pom.xml

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>	
    

    配置

    @Configuration
    public class RabbitMqConfig1 {
        /**
         * 设置连接
         * @return ConnectionFactory
         */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        /**
         * 创建RabbitAdmin
         * @return RabbitAdmin
         */
        @Bean
        public RabbitAdmin rabbitAdmin() {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
            //默认就是true
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    }
    

    测试

     @Autowired
        private RabbitAdmin rabbitAdmin;
    
        /**
         * RabbitAdmin api应用
         */
        @Test
        public void testAdmin() {
    
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
    
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
    
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
    
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    
            //绑定
            rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                    Binding.DestinationType.QUEUE,
                    "test.direct", "direct", new HashMap<>()));
    
            //使用 BindingBuilder 创建绑定
            // https://docs.spring.io/spring-amqp/docs/2.1.16.BUILD-SNAPSHOT/reference/html/#builder-api
            rabbitAdmin.declareBinding(
                    BindingBuilder
                            .bind(new Queue("test.topic.queue", false))        //直接创建队列
                            .to(new TopicExchange("test.topic", false, false))    //直接创建交换机 建立关联关系
                            .with("user.#"));    //指定路由Key
    
    
            //FanoutExchange 类型exchange不走路由键
            rabbitAdmin.declareBinding(
                    BindingBuilder
                            .bind(new Queue("test.fanout.queue", false))
                            .to(new FanoutExchange("test.fanout", false, false)));
    
            //清空队列数据
            // rabbitAdmin.purgeQueue("test.topic.queue", false);
        }
    

    SpringAMQP声明(Exchange、Queue、Binding)

    在RabbitMQ基础AP里面声明一个Exchange、声明一个绑定、一个队列

    //基础API声明一个exchange
    channel.exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
    //基础API 声明一个队列
    channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) 
    //基础API 声明binding
    channel.queueBind(String queue, String exchange, String routingKey)
    

    使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式

       //声明Topic 类型的exchange 
       @Bean
        public TopicExchange topicExchange() {
            //exchange 持久化
            // Exchange springEchange = ExchangeBuilder.topicExchange("spring_amqp_test_echange").durable(true).build();
            return new TopicExchange("spring_amqp_test_echange", true, false);
        }
    
        //声明队列
        @Bean
        public Queue queue() {
            //   Queue spring_amqp_test_echange = QueueBuilder.durable("spring_amqp_test_echange").build();
            return new Queue("spring_amqp_test_queue");
        }
    
       //建立绑定
        @Bean
        public Binding binding(TopicExchange topicExchange, Queue queue) {
            return BindingBuilder.bind(queue).to(topicExchange).with("spring.*");
        }
    

    消息模板 RabbitTemplate

    • RabbitTemplate,即消息模板。

    • 在与SpringAMQP整合的时候进行发送消息的关键类

    • 该类提供了丰富的发送消息的方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进入注入到Spring容器中,然后直接使用;

    • 在与Spring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可;

    RabbitTemplate简单使用

    配置

    
    @Configuration
    public class RabbitMqConfig3 {
        /**
         * 设置连接
         *
         * @return ConnectionFactory
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        /**
         * 创建RabbitAdmin
         *
         * @return RabbitAdmin
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            //默认就是true
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
        /**
         * 消息模板
         *
         * @param connectionFactory connectionFactory
         * @return RabbitTemplate
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    
        /**
         * 针对消费者配置
         * 1. 设置交换机类型
         * 2. 将队列绑定到交换机
         * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
         * HeadersExchange :通过添加属性key-value匹配
         * DirectExchange:按照routingkey分发到指定队列
         * TopicExchange:多关键字匹配
         */
        @Bean
        public TopicExchange exchange001() {
            return new TopicExchange("topic001", true, false);
        }
    
        @Bean
        public Queue queue001() {
            return new Queue("queue001", true); //队列持久
        }
    
        @Bean
        public Binding binding001(TopicExchange exchange001, Queue queue001) {
            return BindingBuilder.bind(queue001).to(exchange001).with("spring.*");
        }
    }
    
    

    测试

       @Test
        public void testSendMessage() {
            //1 创建消息
            //AMQP消息的消息属性
            //MessageBuilder(也可以构建Message) 使用流利的API从byte[]主体或其他消息构建Spring AMQP消息。
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc", "信息描述..");
            messageProperties.getHeaders().put("type", "自定义消息类型..");
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
    
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.err.println("------添加额外的设置---------");
                    message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                    message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                    return message;
                }
            });
        }
    

    队列queue001
    在这里插入图片描述

     @Test
    public void testSendMessage2() throws Exception {
            //1 创建消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("mq 消息1234".getBytes(), messageProperties);
    
            rabbitTemplate.send("topic001", "spring.abc", message);
    
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
        }
    

    队列queue001
    在这里插入图片描述
    队列queue002
    在这里插入图片描述

    简单消息监听容器:SimpleMessageListenerContainer

    • 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
    • 监听队列(多个队列)、自动启动、自动声明功能
    • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
    • 设置消费者数量、最小最大数量、批量消费
    • 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
    • 设置消费者标签生成策略、是否独占模式、消费者属性等
    • 设置具体的监听器、消息转换器等等

    注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大;
    思考一下:SimpleMessageListenerContainer为什么可以动态感知配置变更?
    在这里插入图片描述
    在这里插入图片描述

    配置

     @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            // container.setQueueNames(); 接收字符串的队列名
            //
            container.setQueues(queue001(), queue002(), queue003());
            //当前消费者数量
            container.setConcurrentConsumers(1);
            //最大消费者数量
            container.setMaxConcurrentConsumers(5);
            //是否使用重队列
            container.setDefaultRequeueRejected(false);
            //自动签收
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
    
            //消费端的标签策略
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
    
            //设置消息监听
            //必须设置消息监听 否则 报  No message listener specified - see property 'messageListener'
            container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
    
                    String msg = new String(message.getBody());
                    System.err.println("----------消费者: " + msg);
                    //做消息处理....
                }
            });
    
    
            return container;
        }
    

    消息监听适配器:MessageListenerAdapter

    通过MessageListenerAdapter的代码我们可以看出如下核心属性:

    • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
    • Delegate委托对象:实际真实的委托对象,用于处理消息、
    • queueOrTagToMethodName: 队列标识与方法名称组成的集合
    • 可以一一进行队列与方法名称的匹配;
    • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接收处理;

    配置

    public class MessageDelegate1 {
    
        public void handleMessage(byte[] messageBody) {
            System.err.println("默认方法, 消息内容:" + new String(messageBody));
        }
    
        public void consumeMessage(byte[] messageBody) {
            System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
        }
    
        public void consumeMessage(String messageBody) {
            System.err.println("字符串方法, 消息内容:" + messageBody);
        }
    
        public void method1(String messageBody) {
            System.err.println("method1 收到消息内容:" + new String(messageBody));
        }
    
        public void method2(String messageBody) {
            System.err.println("method2 收到消息内容:" + new String(messageBody));
        }
    
    
        public void consumeMessage(Map messageBody) {
            System.err.println("map方法, 消息内容:" + messageBody);
        }
    }
    
      @Bean
      public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            // container.setQueueNames(); 接收字符串的队列名
            //
            container.setQueues(queue001(), queue002(), queue003());
            //当前消费者数量
            container.setConcurrentConsumers(1);
            //最大消费者数量
            container.setMaxConcurrentConsumers(5);
            //是否使用重队列
            container.setDefaultRequeueRejected(false);
            //自动签收
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
    
            //消费端的标签策略
            container.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queue) {
                    return queue + "_" + UUID.randomUUID().toString();
                }
            });
    
    
            //1 适配器方式. 默认是有自己的方法名字的:handleMessage
            // 可以自己指定一个方法的名字: consumeMessage
            // 也可以添加一个转换器: 从字节数组转换为String
            //MessageDelegate1如何写 MessageListenerAdapter 源码里面也给出了一些建议
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate1());
            //默认的方法是 	public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
            adapter.setDefaultListenerMethod("consumeMessage");
            //TextMessageConverter 自定义的消息转换器
            //new TextMessageConverter()-->consumeMessage(byte[] messageBody))->MessageProperties.setContentType("text/plian")
            //new Jackson2JsonMessageConverter()--->consumeMessage(Map messageBody))->MessageProperties.setContentType("application/json")
             // adapter.setMessageConverter(new Jackson2JsonMessageConverter());
            container.setMessageListener(adapter);
            return container;
        }
    

    MessageConverter消息转换器

    • 我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter;

    • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口

    • 重写下面两个方法:

      • toMessage:java对象转换为Message
      • fromMessage:Message对象转换为java对象
    • MessageConverter消息转换器:

    • Json转换器:Jackson2JsonMessageConverter:可以进行java对象的转换功能;

    • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系;

    • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体等

    使用转换器的目的是当传入不同的类型的数据(如json,类,PDF,图片等)时,在消息的接收方接收到时也总是以传入的类型接收结果对象;我们通过写入不同的转换器以达到此种效果。具体可百度。

    JSON格式转换

    默认监听方法的参数为Map

    public class Order {
    
    	private String id;
    	
    	private String name;
    	
    	private String content;
    
    	public Order() {
    	}
    	
    	public Order(String id, String name, String content) {
    		this.id = id;
    		this.name = name;
    		this.content = content;
    	}
    
    	public String getId() {
    		return id;
    	}
    
    	public void setId(String id) {
    		this.id = id;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public String getContent() {
    		return content;
    	}
    
    	public void setContent(String content) {
    		this.content = content;
    	}
    	
    }
    

    配置

         // 1.1 支持json格式的转换器
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            //  public void consumeMessage(Map messageBody) {
            //        System.err.println("map方法, 消息内容:" + messageBody);
            //    }
            //对应map参数方法
            adapter.setDefaultListenerMethod("consumeMessage");
    
    
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            adapter.setMessageConverter(jackson2JsonMessageConverter);
    
            container.setMessageListener(adapter);
    

    测试

      @Test
        public void testSendJsonMessage() throws Exception {
    
            Order order = new Order();
            order.setId("001");
            order.setName("消息订单");
            order.setContent("描述信息");
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(order);
            System.err.println("order 4 json: " + json);
    
            MessageProperties messageProperties = new MessageProperties();
            //这里注意一定要修改contentType为 application/json
            messageProperties.setContentType("application/json");
            Message message = new Message(json.getBytes(), messageProperties);
    
            rabbitTemplate.send("topic001", "spring.order", message);
        }
    
    JSON格式转换支持Java对象

    默认监听方法的参数为Java对象

    委托对象方法

    public void consumeMessage(Order order) {
    		System.err.println("order对象, 消息内容, id: " + order.getId() + 
    				", name: " + order.getName() + 
    				", content: "+ order.getContent());
    	}
    

    配置

         // 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换
    
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
    
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    
            DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    
            //信任所有的包,否则会报 报不信任
            javaTypeMapper.setTrustedPackages("*");
    
            jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            container.setMessageListener(adapter);
    

    测试

     @Test
        public void testSendJavaMessage() throws Exception {
    
            Order order = new Order();
            order.setId("001");
            order.setName("订单消息");
            order.setContent("订单描述信息");
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(order);
            System.err.println("order 4 json: " + json);
    
            MessageProperties messageProperties = new MessageProperties();
            //这里注意一定要修改contentType为 application/json
            messageProperties.setContentType("application/json");
            //__TypeId__ 这个是固定写法
            messageProperties.getHeaders().put("__TypeId__", "com.niugang.spring.entity.Order");
            Message message = new Message(json.getBytes(), messageProperties);
    
            rabbitTemplate.send("topic001", "spring.order", message);
        }
    

    输出

    order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
    
    JSON格式转换支持Java对象(二)

    委托对象方法

    	public void consumeMessage(Order order) {
    		System.err.println("order对象, 消息内容, id: " + order.getId() + 
    				", name: " + order.getName() + 
    				", content: "+ order.getContent());
    	}
    	
    	public void consumeMessage(Packaged pack) {
    		System.err.println("package对象, 消息内容, id: " + pack.getId() + 
    				", name: " + pack.getName() + 
    				", content: "+ pack.getDescription());
    	}
    

    配置

     //1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
    
             MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
             adapter.setDefaultListenerMethod("consumeMessage");
             Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
             DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    
             Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
             idClassMapping.put("order", com.niugang.spring.entity.Order.class);
             idClassMapping.put("packaged", com.niugang.spring.entity.Packaged.class);
    
             javaTypeMapper.setIdClassMapping(idClassMapping);
    
             jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
             adapter.setMessageConverter(jackson2JsonMessageConverter);
             container.setMessageListener(adapter);
    

    测试

       @Test
        public void testSendMappingMessage() throws Exception {
    
            ObjectMapper mapper = new ObjectMapper();
    
            Order order = new Order();
            order.setId("001");
            order.setName("订单消息");
            order.setContent("订单描述信息");
    
            String json1 = mapper.writeValueAsString(order);
            System.err.println("order 4 json: " + json1);
    
            MessageProperties messageProperties1 = new MessageProperties();
            //这里注意一定要修改contentType为 application/json
            messageProperties1.setContentType("application/json");
            messageProperties1.getHeaders().put("__TypeId__", "order");
            Message message1 = new Message(json1.getBytes(), messageProperties1);
            rabbitTemplate.send("topic001", "spring.order", message1);
    
            Packaged pack = new Packaged();
            pack.setId("002");
            pack.setName("包裹消息");
            pack.setDescription("包裹描述信息");
    
            String json2 = mapper.writeValueAsString(pack);
            System.err.println("pack 4 json: " + json2);
    
            MessageProperties messageProperties2 = new MessageProperties();
            //这里注意一定要修改contentType为 application/json
            messageProperties2.setContentType("application/json");
            messageProperties2.getHeaders().put("__TypeId__", "packaged");
            Message message2 = new Message(json2.getBytes(), messageProperties2);
            rabbitTemplate.send("topic001", "spring.pack", message2);
        }
    
    全局消息转化器与自定义转化器

    自定义文本转化器

    public class TextMessageConverter implements MessageConverter {
    
    	@Override
    	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    		return new Message(object.toString().getBytes(), messageProperties);
    	}
    
    	@Override
    	public Object fromMessage(Message message) throws MessageConversionException {
    		String contentType = message.getMessageProperties().getContentType();
    		if(null != contentType && contentType.contains("text")) {
    			return new String(message.getBody());
    		}
    		return message.getBody();
    	}
    
    }
    
    

    自定义图片转化器

    /**
     * 图片转化器
     */
    public class ImageMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error ! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("-----------Image MessageConverter----------");
    
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            String extName = _extName == null ? "png" : _extName.toString();
    
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            //目录必须存在
            String path = "d:/springbootlog/" + fileName + "." + extName;
            File f = new File(path);
            try {
                //拷贝到指定路径
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    
    }
    
    

    自定义pdf转化器

    public class PDFMessageConverter implements MessageConverter {
    
    	@Override
    	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    		throw new MessageConversionException(" convert error ! ");
    	}
    
    	@Override
    	public Object fromMessage(Message message) throws MessageConversionException {
    		System.err.println("-----------PDF MessageConverter----------");
    		
    		byte[] body = message.getBody();
    		String fileName = UUID.randomUUID().toString();
    		String path = "d:/springbootlog/" + fileName + ".pdf";
    		File f = new File(path);
    		try {
    			Files.copy(new ByteArrayInputStream(body), f.toPath());
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    		return f;
    	}
    
    }
    

    委托对象

        public void consumeMessage(File file) {
            System.err.println("文件对象 方法, 消息内容:" + file.getName());
        }
    

    配置

         //1.4 ext convert
    
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
    
            //全局的转换器:
            ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    
            TextMessageConverter textConvert = new TextMessageConverter();
            convert.addDelegate("text", textConvert);
            convert.addDelegate("html/text", textConvert);
            convert.addDelegate("xml/text", textConvert);
            convert.addDelegate("text/plain", textConvert);
    
            Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
            convert.addDelegate("json", jsonConvert);
            convert.addDelegate("application/json", jsonConvert);
    
            ImageMessageConverter imageConverter = new ImageMessageConverter();
            convert.addDelegate("image/png", imageConverter);
            convert.addDelegate("image", imageConverter);
    
            PDFMessageConverter pdfConverter = new PDFMessageConverter();
            convert.addDelegate("application/pdf", pdfConverter);
    
    
            adapter.setMessageConverter(convert);
            container.setMessageListener(adapter);
    

    测试

    @Test
        public void testSendExtConverterMessage() throws Exception {
            byte[] body = Files.readAllBytes(Paths.get("C:\Users\Administrator\Desktop\公众号", "spring.png"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("image/png");
            messageProperties.getHeaders().put("extName", "png");
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "image_queue", message);
    
            byte[] body1 = Files.readAllBytes(Paths.get("D:\Documents\技术书籍", "Java huashan-2019-06-20.pdf"));
            MessageProperties messageProperties1 = new MessageProperties();
            messageProperties.setContentType("application/pdf");
            Message message1 = new Message(body1, messageProperties);
            rabbitTemplate.send("", "pdf_queue", message1);
        }
    

    SpringBoot整合配置详解(生产端)

    • publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback
    • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCallback

    注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效;生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。

    生产端代码示例

    application.properties

    spring.rabbitmq.addresses=localhost:5672
    #spring.rabbitmq.host=localhost
    #spring.rabbitmq.port=5762
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    # 消息确认模式
    spring.rabbitmq.publisher-confirms=true
    # 消息返回模式
    spring.rabbitmq.publisher-returns=true
    # 为true 消息返回模式才生效
    spring.rabbitmq.template.mandatory=true
    
    

    配置

    /**
     * springboot  消息生产者
     *
     * @author niugang
     */
    @Configuration
    public class RabbitMqConfig {
           /**
         * 自动注入RabbitTemplate模板类
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 回调函数: confirm确认
         */
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if (!ack) {
                    System.err.println("异常处理....");
                }
            }
        };
    
        /**
         * 回调函数: return返回
         */
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: "
                        + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
    
    
       /*
        队列监听在消费者端配置,没有将会自动创建
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("exchange-1");
        }
    
        @Bean
        public Queue queue() {
            return new Queue("queue-1");
        }
    
    
        @Bean
        public Binding binding(Queue queue, TopicExchange topicExchange) {
            return BindingBuilder.bind(queue).to(topicExchange).with("springboot.#");
        }*/
    
         /**
         * 发送消息方法调用: 构建Message消息
         *
         * @param message    消息体
         * @param properties 消息属性
         */
        public void send(Object message, Map<String, Object> properties) {
    
            MessageProperties messageProperties = new MessageProperties();
            if (properties != null && properties.size() > 0) {
                Set<Map.Entry<String, Object>> entries = properties.entrySet();
                for (Map.Entry<String, Object> entry : entries) {
                    String key = entry.getKey();
                    Object value = entry.getValue();
                    messageProperties.setHeader(key, value);
                }
            }
    
            //org.springframework.amqp.core
            Message msg = MessageBuilder.withBody(message.toString().getBytes()).andProperties(messageProperties).build();
    
            //id + 时间戳 全局唯一
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
    
            //routingKey修改 为 spring.abc 消息将走 returnCallback
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        }
       
    }
    

    测试

    在rabbitmq控制台新建,Exchange名为exchange-1,新建队列queue-1,并建立两者之间的绑定,routingKey为springboot.#

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
    	@Test
    	public void contextLoads() {
    	}
    	
        @Autowired
        private RabbitMqConfig rabbitMqConfig ;
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
        @Test
        public void testSender1() throws Exception {
            Map<String, Object> properties = new HashMap<>();
            properties.put("number", "12345");
            properties.put("send_time", simpleDateFormat.format(new Date()));
            rabbitMqConfig.send("Hello RabbitMQ For Spring Boot!"+System.currentTimeMillis(), properties);
        }
    	
    }
    

    注意:进行单元测试,ack一直是false;改为url请求,ack就正常了

    SpringBoot整合配置详解(消费端)

    消费端核心配置

    #    NONE, MANUAL, AUTO;  手工消息消息确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #监听器调用程序线程的最小数量。
    spring.rabbitmq.listener.simple.concurrency=5
    #监听器调用程序线程的最大数量。
    spring.rabbitmq.listener.simple.max-concurrency=10
    # spring.rabbitmq.listener.type=simple 默认为 SimpleContainer  模式对应  spring.rabbitmq.listener.simple 前缀相关的
    

    注意点

    • 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理
    • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。

    @RabbitListener注解的使用

    • 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用。
    • @RabbitListener是一个组合注解,里面可以注解配置
    • @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。

    消费者端代码示例

    类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。

    properties

    #spring.rabbitmq.addresses=localhost:5672
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    #    NONE, MANUAL, AUTO;  手工消息消息确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #监听器调用程序线程的最小数量。
    spring.rabbitmq.listener.simple.concurrency=5
    #监听器调用程序线程的最大数量。
    spring.rabbitmq.listener.simple.max-concurrency=10
    # spring.rabbitmq.listener.type=simple 默认为 SimpleContainer  模式对应  spring.rabbitmq.listener.simple 前缀相关的
    spring.rabbitmq.listener.order.queue.name=queue-2
    spring.rabbitmq.listener.order.queue.durable=true
    spring.rabbitmq.listener.order.exchange.name=exchange-2
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
    spring.rabbitmq.listener.order.key=springboot.*
    

    配置

    public class Order implements Serializable {
    
    	private String id;
    	private String name;
    	
    	public Order() {
    	}
    	public Order(String id, String name) {
    		super();
    		this.id = id;
    		this.name = name;
    	}
    	public String getId() {
    		return id;
    	}
    	public void setId(String id) {
    		this.id = id;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	
    	
    }
    
    /**
     * 消费者类
     *
     * @author niugang
     */
    @Configuration
    public class RabbitMQReceiver {
    
    
        /**
         * 从1.5.0版开始,您可以在类级别指定@RabbitListener注释。
         * 与新的@RabbitHandler批注一起,这使单个侦听器可以根据传入消息的有效负载类型调用不同的方法。
         *
         * @RabbitListener(id="multi", queues = "someQueue")
         * @SendTo("my.reply.queue") public class MultiListenerBean {
         * @RabbitHandler public String thing2(Thing2 thing2) {
         * ...
         * }
         * @RabbitHandler public String cat(Cat cat) {
         * ...
         * }
         * @RabbitHandler public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
         * ...
         * }
         * @RabbitHandler(isDefault = true)
         * public String defaultMethod(Object object) {
         * ...
         * }
         * }
         * 在这种情况下,如果转换后的有效负载是Thing2,Cat或Hat,则会调用各个@RabbitHandler方法。
         * 您应该了解,系统必须能够根据有效负载类型识别唯一方法。
         * 检查该类型是否可分配给没有注释或带有@Payload注释的单个参数。
         * 请注意,如方法级别@RabbitListener(前面所述)中所述,应用了相同的方法签名。
         */
        //队列 exchange  绑定 没有 自动创建
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue-1",
                        durable = "true"),
                exchange = @Exchange(value = "exchange-1",
                        durable = "true",
                        type = ExchangeTypes.TOPIC,
                        ignoreDeclarationExceptions = "true"),
                key = "springboot.*" //routing key
        )
        )
        @RabbitHandler
        //@RabbitListener 提供了很多灵活的签名 如Message Channel  @Payload  @Header 等 具体可查看源码
        // org.springframework.amqp.core.Message
        // org.springframework.messaging.Message
        public void onMessage(Message message, Channel channel) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端Payload: " + new String(message.getBody()));
            System.err.println("消费端MessageProperties.: " + message.getMessageProperties());
            //AmqpHeaders header属性封装
            //手工ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    
        /**
         * spring.rabbitmq.listener.order.queue.name=queue-2
         * spring.rabbitmq.listener.order.queue.durable=true
         * spring.rabbitmq.listener.order.exchange.name=exchange-1
         * spring.rabbitmq.listener.order.exchange.durable=true
         * spring.rabbitmq.listener.order.exchange.type=topic
         * spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
         * spring.rabbitmq.listener.order.key=springboot.*
         *
         * @param order   order
         * @param channel channel
         * @param headers headers
         * @throws Exception Exception
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                        durable = "${spring.rabbitmq.listener.order.queue.durable}"),
                exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                        durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                        type = "${spring.rabbitmq.listener.order.exchange.type}",
                        ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
                key = "${spring.rabbitmq.listener.order.key}"
        )
        )
        @RabbitHandler
        //@Headers 必须通过Map接收
        //@Header("amqp_receivedRoutingKey") String rk 直接获取header中某一个key
        //默认前缀为amqp_
        /**
         * {amqp_receivedDeliveryMode=PERSISTENT,
         * amqp_receivedExchange=exchange-2,
         * amqp_deliveryTag=1,
         * amqp_consumerQueue=queue-2,
         * amqp_redelivered=false,
         amqp_receivedRoutingKey=springboot.def,
         spring_listener_return_correlation=175a21c4-ffd5-4a3e-ac3a-2f63d60c18a5,
         spring_returned_message_correlation=0987654321,
         id=53443ced-0b23-3079-71c2-09997897a553,
         amqp_consumerTag=amq.ctag-V0hqyVObrHXJeC60MwPSVQ,
         contentType=application/x-java-serialized-object,
         timestamp=1591240122842}
         */
        public void onOrderMessage(@Payload Order order,
                                   Channel channel,
                                   @Headers Map<String, Object> headers) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端order: " + order.getId());
            System.err.println("消费端headers: " + headers);
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            //手工ACK
            channel.basicAck(deliveryTag, false);
        }
    
    
    }
    
    

    在这里插入图片描述

  • 相关阅读:
    搜索引擎判断跳转
    NPOI 2.0 教程(二):编辑既存的EXCEL文件
    linux服务器性能状态查看
    买服务器网址
    最全的蜘蛛
    CentOS中配置lvm存储
    CentOS中对ext4文件系统做磁盘配额
    CentOS中配置SoftWareRaid磁盘冗余阵列
    CentOS添加磁盘分区
    CentOS源码包安装apache、nginx服务
  • 原文地址:https://www.cnblogs.com/niugang0920/p/13043708.html
Copyright © 2011-2022 走看看