zoukankan      html  css  js  c++  java
  • RabbitMQ(二)核心组件介绍

    前言

    本文主要介绍AMQP核心组件:

    • RabbitAdmin
    • SpringAMQP 声明
    • RabbitTemplate
    • SimpleMessageListenerContainer
    • MessageListenerAdapte
    • MessageConverter

    RabbitAdmin

    RabbitAdmin类是针对RabbitMQ管理端进行封装操作,比如:Exchange操作、Queue操作,Binding绑定操作等,操作起来简单便捷!

    • 用于声明RabbitMQ相关配置、操作RabbitMQ
    • autoStartup设为true:表示Spring容器启动时自动初始化RabbitAdmin
    • 底层实现:从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
    • rabbitTemplate的execute方法执行对应的声明等操作
    @Configuration
    @ComponentScan({"com.orcas.spring"})
    public class RabbitMQConfig {
    
       /**
         * 创建 RabbitMQ 连接工厂
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("192.168.58.129:5672");
            connectionFactory.setUsername("orcas");
            connectionFactory.setPassword("1224");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
       /**
         * 创建 RabbitAdmin 类,这个类封装了对 RabbitMQ 管理端的操作! 比如:Exchange 操作,Queue 操作,Binding 绑定 等
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    } 
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        /**
         * 交换机操作
         */
        @Test
        public void testAdminExchange() {
            // 创建交换机, 类型为 direct,durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
    
            // 创建交换机,类型为 topic,durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    
            // 创建交换机,类型为 fanout,durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
        }
    
        /**
         * 队列操作
         */
        @Test
        public void testAdminQueue() {
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
        }
    
        /**
         * 绑定操作
         */
        @Test
        public void testAdminBinding() {
            /**
             * 两种写法都可以,都选择绑定 队列 或者 交换机
             */
    
            /**
             * destination 需要绑定队列的名字
             * DestinationType 绑定类型,
             *          Binding.DestinationType.QUEUE 表示是队列绑定
             *          Binding.DestinationType.EXCHANGE 表示交换机绑定
             *
             * exchange 交换机名称
             * routingKey 路由key
             * arguments 额外参数(比如绑定队列,可以设置 死信队列的参数)
             */
            rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "routing_direct", new HashMap<>()));
    
            /**
             * 链式写法
             */
            rabbitAdmin.declareBinding(
                    BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接创建队列
                            .to(new TopicExchange("test.topic", false, false)) // 直接创建交换机,并建立关联关系
                            .with("routing_topic") // 指定路由 key
            );
    
            /**
             * 链式写法
             *
             * FanoutExchange 交换机,和路由key没有绑定关系,因为他是给交换机内所有的 queue 都发送消息!
             */
            rabbitAdmin.declareBinding(
                    BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接创建队列
                            .to(new FanoutExchange("test.fanout", false, false)) // 直接创建交换机,并建立关联关系
            );
        }
    
        /**
         * 其他操作-清空队列
         */
        @Test
        public void testAdminOther() {// noWait 参数是否需要等待: true 表示需要,false 表示不需要
            //      也就是需要清空的时候,我需要等待一下,在清空(会自动等待几秒钟)
            rabbitAdmin.purgeQueue("test.fanout.queue", false);
        }
    } 

    Spring AMQP 声明

    • exchange
      • TopicExchange
      • FanoutExchange
      • DirectExchange
    • queue
    • binding
      • BindingBuilder

    spring使用@bean声明exchange queue binding 例子如下:

     @Bean  
     public TopicExchange exchange001() {  
         return new TopicExchange("topic001", true, false);  
     }  
    
     @Bean  
     public Queue queue001() {  
         return new Queue("queue001", true); //队列持久  
     }  
        
     @Bean  
     public Binding binding001() {  
         return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
     }  

    RabbitTemplate

    消息模板

    • 与SpringAMQP整合的时候进行发送消息的关键类
    • 该类提供了丰富的发送消息方法,包括可靠性投递方法,回调监听消息接口ConfirmCallback,返回值确认接口ReturnCallback等等。同样我们需要进行注入到spring容器中。然后进行使用。
    • 与Spring整合时需要实例化,但是与Springboot整合时不需要,在配置文件添加配置即可
    • rabbitTemplate.convertAndSend方法是主要的发送消息的方法
    • MessageProperties 用于构造消息体
    • MessagePostProcessor:消息发送之后对消息进行的设置

    以下是RabbitTemplate实例化的例子:

     @Bean
     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
         rabbitTemplate.setConfirmCallback(//);
         rabbitTemplate.setReturnCallback(//);
         return rabbitTemplate;
     }

    以下是发送消息例子:

        @Test
        public void testSendMessage() throws Exception {
            //1 创建消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("desc", "信息描述..");
            messageProperties.getHeaders().put("type", "自定义消息类型..");
            Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
            
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.err.println("------添加额外的设置---------");
                    message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                    message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                    return message;
                }
            });
        }

    SimpleMessageListenerContainer

    简单消息监听容器 功能:

    • 监听多个队列、自动启动、自动声明
    • 设置事务特性、事务管理器、事务属性、事务容器、是否开启事务、回滚消息
    • 设置消费者数量、最小最大数量、批量消费
    • 设置消息确认和自动确认模式、是否重回队列、异常捕获函数
    • 设置消费者标签生成策略、是否独占模式、消费者属性等
    • 设置具体的监听器、消息转换器 message convert
    • SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的大小、接收消息的模式等,可以基于此开发rabbitmq自定义后台管控平台
    属性:
    • queues: 消费队列
    • concurrentConsumers:当前消费者数
    • maxConcurrentConsumers:最大消费者并发
    • defaultRequeueRejected: 是否重回队列,默认: false
    • acknowledgeMode:消息确认模型,默认:AUTO
    • exposeListenerChannel:
    • messageListener: 消息监听
    • consumerTagStrategy:consumerTag生成策略
        /**
         * 简单消息监听容器
         * 配置完成后。可以在管控台看到消息者信息。 以及消费者标签信息
         *
         * @param connectionFactory 链接工厂
         * @return SimpleMessageListenerContainer
         */
        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            //设置要监听的队列
            simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            //初始化消费者数量
            simpleMessageListenerContainer.setConcurrentConsumers(1);
            //最大消费者数量
            simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
            //设置是否重回队列[一般为false]
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            //设置自动ack
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
            //设置channel 是否外露
             simpleMessageListenerContainer.setExposeListenerChannel(true);
            //设置消费端标签的策略
            simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
                @Override
                public String createConsumerTag(String queueName) {
                    return queueName + "_" + UUID.randomUUID().toString();
                }
            });
            //设置消息监听 ChannelAwareMessageListener
            simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String msg = new String(message.getBody());
                    System.out.println("----------消费者: " + msg);
                }
            });
    
            return simpleMessageListenerContainer;
        }

    MessageListenerAdaper

    消息监听适配器

    • extends AbstractAdaptableMessageListener 消息listener
    • queueOrTagToMethodName 队列标识与方法名称组成的集合
    • defaultListenerMethod 默认监听方法名称
    • Delegate 委托对象:实际真实的委托对象
    • 可以一一进行队列和方法名称的匹配
    • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理
    /**
     * 通过`simpleMessageListenerContainer` 配置消息监听适配器。 指向这个类
     */
    public class MessageDelegate {
        /**
         * MessageListenerAdapter 默认指定接收消息的方法的名字就是 handleMessage .当然也可以手动设置
         *
         * @param messageBody message信息
         */
        public void handleMessage(byte[] messageBody) {
            System.err.println("默认方法,消息内容: " + new String(messageBody));
        }
    
        public void consumeMessage(byte[] messageBody) {
            System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
        }
    
        public void consumeMessage(String messageBody) {
            System.err.println("字符串方法, 消息内容:" + messageBody);
        }
    }
    
    
    
    /**
     * spring amqp 消息转换器
     *
     * @author yangHX
     * createTime  2019/4/6 12:28
     */
    public class TextMessageConverter implements MessageConverter {
    
        /**
         * 将数据转化为 message 类
         *
         * @param o                 要发送的数据
         * @param messageProperties 消息头
         * @return Message
         * @throws MessageConversionException ex
         */
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(o.toString().getBytes(), messageProperties);
        }
    
        /**
         * 将message转换为想要的数据类型
         *
         * @param message message
         * @return Object
         * @throws MessageConversionException ex
         */
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
    
            String contentType = message.getMessageProperties().getContentType();
            if (null != contentType && contentType.contains("text")) {
                return new String(message.getBody());
            }
            return message.getBody();
        }
    }
    
    //消息监听适配器 只截取了一小段 /* * 适配器方式。 默认是有自己的方法名字。 handleMessage * 可以自己指定一个方法的名称。 consumerMessage * 也可以添加一个转换器: 从字节数组转换为String */ MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate); //设定queue使用哪个adapter方法处理 Map<String,String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001","method1"); queueOrTagToMethodName.put("queue002","method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); //设置默认处理方法,默认处理方法是handleMessage adapter.setDefaultListenerMethod("handleMessage"); //设置消息转换方式 adapter.setMessageConverter(new TextMessageConvert()); //消息监听 container.setMessageListener(adapter); /** * 发送消息。测试转换器和适配器 * <p> * 转换器判断contentType 将字节数组转化为字符串 * 适配器将数据交给 MessageDelegate 的 consumeMessage 方法进行处理 */ @Test public void testMessage4Text() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plan"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }

    MessageConverter

    • 我们在消息传输的时候,正常情况下消息体为二进制的数据方式进行传输。如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

    • 自定义常用转换器 MessageConverter 一般来讲都需要实现这个接口

    • 重写下面两个方法

      • toMessage : java 对象转换为Message
      • fromMessage : Message对象转换为java对象
    • 转换器类型
      • json转换器: jackson2JsonMessageConverter: 可以进行java对象的转换功能
      • DefaultJackson2JavaTypeMapper映射器: 可以进行java对象的映射关系
      • 自定义二进制转换器: 比如图片类型、PDF,PPT, 流媒体

    1、Json转换器:Jackson2JsonMessageConverter

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    container.setMessageListener(jackson2JsonMessageConverter);

    2、支持Java对象的转换:DefaultJackson2JavaTypeMapper&Jackson2JsonMessageConverter

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
            
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
    jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
            adapter.setMessageConverter(jackson2JsonMessageConverter);

    3、支持java对象多映射转换:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
            
    Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    // (标签, 类的全路径)将标签和类进行绑定
    idClassMapping.put("order", com.orcas.spring.entity.Order.class);
    idClassMapping.put("packaged", com.orcas.spring.entity.Packaged.class);
            
    javaTypeMapper.setIdClassMapping(idClassMapping);
            
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);

    4、全局的转换器:ContentTypeDelegatingMessageConverter

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
            
    //全局的转换器:
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
            
    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);
            
    Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConvert);
    convert.addDelegate("application/json", jsonConvert);
            
    ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/png", imageConverter);
    convert.addDelegate("image", imageConverter);
            
    PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);
                    
    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);

    5、图片转换器:

    public class ImageMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error ! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("-----------Image MessageConverter----------");
            // 获取消息扩展属性中的"extName"
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            // 若为空默认为png, 否则就获取该扩展名
            String extName = _extName == null ? "png" : _extName.toString();
            
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            // 该图的路径+图片名
            String path = "d:/010_test/" + fileName + "." + extName;
            File f = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    
    }

    六、PDF转换器:

    public class PDFMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error ! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("-----------PDF MessageConverter----------");
            
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "d:/010_test/" + fileName + ".pdf";
            File f = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    
    }

    七、测试代码:

        @Test
        public void testSendExtConverterMessage() throws Exception {
    //            byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
    //            MessageProperties messageProperties = new MessageProperties();
    //            messageProperties.setContentType("image/png");
    //            messageProperties.getHeaders().put("extName", "png");
    //            Message message = new Message(body, messageProperties);
    //            rabbitTemplate.send("", "image_queue", message);
            
                byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setContentType("application/pdf");
                Message message = new Message(body, messageProperties);
                rabbitTemplate.send("", "pdf_queue", message);
        }

    引用:

    https://juejin.im/post/5d0309596fb9a07ebf4b6ad2

    https://www.javatt.com/p/11158

  • 相关阅读:
    Kafka如何保证读写的跨分区与会话
    Kafka topic中的partition的leader选举
    Kafka为什么这么快
    sqoop导入导出
    为什么要用redis去重
    bypass SortShuffleManager的bypass运行机制
    大数据常用端口号
    vector基础
    【拓扑排序】
    【POJ】Crazy Search(hash)
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/12915407.html
Copyright © 2011-2022 走看看