zoukankan      html  css  js  c++  java
  • RabbitMq 概述

    RabbitMQ是实现了高级消息队列协议(Advanced Message Queueing Protocol , AMQP)的开源消息代理软件(亦称面向消息的中间件)。

    1、AMQP协议

      RocketMQ基于JMS(Java Messaging Service , java 消息服务)协议。

      重点:协议的作用:两者的交互基础。

      三个主要的功能模块:

        1.1、“exchange”接收发布应用程序发送的消息,并根据一定的规则将这些消息路由到“消息队列”。(交换器类型:direct, fanout, topic, header)

        1.2、“message queue”存储消息,直到这些消息被消费者安全处理完为止。

        1.3、“binding”定义了exchange和message queue之间的关联,提供路由规则。

      AMQP协议是一个二进制协议。

       协议模型图:

      

    2、RabbitMQ

      2.1、交换器类型:

        2.1.1、direct (默认):每个队列都会使用它的队列名字作为路由关键字(routing key)去自动地绑定到默认交换器上。

        2.1.2、fanout : 该类型的交换器会将消息转发给所有与之绑定的队列上。(类似广播机制)

        2.1.3、topic : 该类型的交换器会视消息路由关键字和绑定路由关键字之间的匹配情况,进行消息的路由转发。解析:一个消息过来后,将根据路由key转发给符合要求的所有队列。

        2.1.4、headers : 用于路由的属性是取自于消息header属性的,当消息header的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。

      2.2、虚拟机(virtual hosts)

        AMQP使用了虚拟机的概念,在一个broker上面划分出多个隔离的环境(各环境下的用户、交换器以及队列等互不影响)。这样一来,AMQP客户端们在进行连接的时候,需要协商指定同一个vhost才能进行正常的往来业务。

    3、spring-boot-starter-amqp 的使用

      1、BeanConfig

    @Configuration
    public class AmqpConfig {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Bean
        public AmqpTemplate amqpTemplate() {
            Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setEncoding("UTF-8");
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setUsePublisherConnection(true);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息发送到exchange成功,id: {}", correlationData.getId());
                } else {
                    log.info("消息发送到exchange失败,原因: {}", cause);
                }
            });
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                String correlationId = message.getMessageProperties().getCorrelationId();
                log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
            });
            return rabbitTemplate;
        }
    
        /**
         * 修改好友备注交换机
         */
        @Bean(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE)
        public Exchange modifyFriendRemakeNameDirectExchange() {
            return ExchangeBuilder
                    .directExchange(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE)
                    .durable(true)
                    .build();
        }
    
        /**
         * 队列
         */
        @Bean(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE)
        public Queue modifyFriendRemakeNameQueue() {
            return QueueBuilder.durable(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE).build();
        }
    
        /**
         * 队列 关联 路由key 关联 交换机
         * @return
         */
        @Bean
        public Binding modifyFriendRemakeNameBinding(Queue modifyFriendRemakeNameQueue, Exchange modifyFriendRemakeNameDirectExchange) {
            return BindingBuilder
                    .bind(modifyFriendRemakeNameQueue)
                    .to(modifyFriendRemakeNameDirectExchange)
                    .with(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY)
                    .noargs();
        }
    
    
    }

      2、AmqpConstant

    public interface AmqpConstant {
    
        /**
         * 修改好友备注交换机
         */
        String MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE = "modifyFriendRemakeNameDirectExchange";
    
        /**
         * 修改好友备注队列
         */
        String MODIFY_FRIEND_REMAKE_NAME_QUEUE = "modifyFriendRemakeNameQueue";
    
        /**
         * 修改好友备注路由键
         */
        String MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY = "modifyFriendRemakeNameRoutingKey";
    
    
    }

     3、RabbitUtils 

    public class RabbitUtils {
    
        private static final Logger logger = LoggerFactory.getLogger(RabbitUtils.class);
    
       
        public static void sendModifyRemakeNameMsg(RabbitTemplate rabbitTemplate, RabbitFriend rabbitFriend){
            try{
                if(rabbitFriend.getLists().size() < 1){
                    return;
                }
                logger.info("sendModifyRemakeNameMsg request:"+JSON.toJSONString(rabbitFriend));
                CorrelationData correlationDataId = new CorrelationData(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend(
                        AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE,
                        AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY,
                        rabbitFriend,
                        correlationDataId
                );
            }catch (Exception e){
                logger.info("mq消息发送错误",e);
            }
        }
    }

       4、生产者 

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitTest {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
    
        @Test
        public void test1(){
    
    
            RabbitFriend rabbitFriend = new RabbitFriend();
            rabbitFriend.setUserId(100000022);
            rabbitFriend.setLists(new ArrayList<>());
            rabbitFriend.getLists().add(new RabbitNickName("2637845647931","2637845647931123哈哈"));
            rabbitFriend.getLists().add(new RabbitNickName("2637","2637哈哈"));
    
            RabbitUtils.sendModifyRemakeNameMsg(rabbitTemplate,rabbitFriend);
    
            try {
                int read = System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("end!");
        }
    }

       5、消费者

    @Component
    public class RemakeNameMessageListener {
    
        private static final Logger log = LoggerFactory.getLogger(RemakeNameMessageListener.class);
    
        
        @RabbitListener(queues = AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE)
        public void remakeNameQueueListener(Message message, Channel channel) throws IOException {
            try {
                RabbitFriend rabbitFriend = RabbitUtils.buildMessage(message, RabbitFriend.class);
                log.info("RemakeNameMessageListener|remakeNameQueueListener,correlationDataId:{},RabbitFriend:{}", message.getMessageProperties().getCorrelationId(), rabbitFriend);
                
                //do you things
        
    
                log.info("RemakeNameMessageListener|remakeNameQueueListener,Consumption of success");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            } catch (Exception e) {
                log.error("RemakeNameMessageListener|remakeNameQueueListener,Consumption of failed,cause:{}", e.getMessage());
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
            }
        }
    
    
    }        

       6、springboot 配置文件

    spring:
      rabbitmq:
        host: 192.168.4.21
        port: 5672
        username: sasa
        password: sasa
        publisher-confirms: true  #手动确认
        publisher-returns: true
        virtualHost: /        #虚拟机路径
        template:
          mandatory: true
          retry:
            enabled: true       #重试
            multiplier: 2       #重试次数
        
  • 相关阅读:
    狡猾的商人
    差分约束系统
    【模板】负环
    关于Java8的精心总结
    rabbitmq+sleuth+zinkip 分布式链路追踪
    Linux下一只五颜六色的「猫」
    整理 Linux下列出目录内容的命令
    从封装变化的角度看设计模式——组件协作
    从封装变化的角度看设计模式——接口隔离
    从封装变化的角度看设计模式——对象创建
  • 原文地址:https://www.cnblogs.com/chen--biao/p/11606575.html
Copyright © 2011-2022 走看看