zoukankan      html  css  js  c++  java
  • rabbitmq 详解

    mq

    消息队列 先进先出

    1.为什么要使用mq?

    异步 削峰 解耦

    1.流量削峰

    使用消息队列做一个缓冲

    2.应用解耦

    可以解决系统之间的调用问题。如果物流系统出现故障,需要几分钟修复,通过消息队列作为中间件,在这几分钟内,物流系统要处理的内存被缓存在消息队列中,用户可以正常下单。

    缺点

    3.异步处理

    A调用B 只需要监听b处理完成的消息,B处理完成之后,会发送一条消息给MQ ,MQ会将这条消息转给A服务。

    mq的种类

    ActiveMQ

    单机吞吐量高 时效性ms级,可用性高,消息可靠性高

    官方社区对其维护越来越少,高吞吐量场景较少使用

    Kafka

    大数据领域内的消息传输 百万级别吞吐量 

    优点 吞吐量高 时效期ms级,分布式 少数机器宕机,不会导致不可用,消息有序,能保证所有消息被消费且只能消费一次 在日志领域比较成熟

    主要用于大数据领域的实时计算以及日志采集

    缺点:消息失败不支持重试 单机超过64个分区,load(CPU)会发生明显的飙高

    采用短轮询方式,实时性取决于轮询间隔时间

    一台代理宕机,会产生乱序 

     

    Rocketmq

    订单 交易 充值 日志流式处理

    优点:单机吞吐量十万级 可用性高 分布式  消息可以做到0丢失 扩展性好 支持大量数据的数据堆积

    缺点;支持语言少 支持java和c++

     

    Rabbitmq

    由于erlang的高并发性,吞吐量到万级,支持多种语言,开源,提供了管理页面,社区活跃性高

    缺点;商业版需要收费

    mq的选择

    Kafka 大量数据的互联网公司

    Rocketmq 金融互联网

    Rabbitmq 中小型公司

     

    Rabbitmq

    接收 存储 转发消息

    Rabbitmq

    接收 存储 转发消息

    生产者 交换机  队列  消费者

     

    六大模式

    简单模式 工作模式  发布订阅模式  路由模式  主题模式 发布确认模式

    Broker 接收和分发消息的应用 mq的服务器 

    -exchange

       -quenue

    Channel 信道

    连接里面多个信道 减少建立连接的开销

    Broker 里面有多个virtual host  每个用户在自己的vhost创建exchange/queue 

     

    简单模式

    一个消费者  mq  一个生产者 

    工作模式

    工作队列的主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后完成。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程,这些工作线程将一起处理这些任务。

    生产者大量发消息给队列,造成很多消息停留在队列中,无法进行及时处理。通过多个工作线程,采用轮询的方式来处理。

    消费者-》多个工作线程。轮询 竞争关系

    一个消息只能被处理一次 不可以处理多次

     

    消息应答

    问题:

    某个消费者处理一个长的工作任务并且仅完成了部分就突然挂掉了。rabbitmq一旦向消费者发送了某条消息,就立即将消息设置为删除。这种情况下,我们将会丢失正在处理的消息,以及后续发送给该消费者的消息,它将无法接收。

    消息应答:消费者接收到消息并处理完消息之后,告诉rabbitmq消息已经处理了,rabbitmq可以把消息删除了。

    自动应答

    高吞吐量和数据传输安全要有保证

    手动应答 

    手动应答的方法

    basicAck 肯定确认(如果批量应答 是否批量 true)

    basicNack 否定确认 比另一个多一个参数。是否批量

    basicReject  否定确认

    批量应答 最好别  multiple 

     

    消息的自动重新入队

    消息未发送ACK确认 会重新入队 rabbitmq会安排另一个消费者处理

     消息手动应答时是不丢失的 放回队列中重新消费

     

    //手动应答

    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

     

    rabbitmq持久化

    确保消息不丢失 队列和消息持久化 

    1.队列持久化 durable true 

     boolean durable=true;

    channel.queueDeclare(normal_queue,durable,false,false,arguments);

    队列不是持久化的 需要把原来的队列先删除掉 或者重新创建一个持久化的队列 不然会报错

    2.消息持久化

    生产者发消息时通知mq消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN

    channel.basicPublish("",task_queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(“UTF-8"));

     

    不公平分发

    channel.basicQos(1);

    消费者接收消息之前设置不公平分发

     

    预取值

    指定消费者分多少条消息   

    prefetchCount 5

    prefetchCount 2

    channel.basicQos(prefetchCount);

    如果超过7条 按照不公平分发

     

    发布确认原理

    生产者

    设置队列必须持久化

    设置要求队列中的消息必须持久化

    发布确认mq 把消息保存到磁盘上 ,保存成功后 通知生产者

     1) 单个确认发布

      发布速度特别慢 如果没有确认发布的消息就会阻塞后续所有消息的发布

    channel.confirmSelect();

    channel.waitForConfirms() //

     2) 批量确认发布

    当发生故障导致发布出现问题时,不知道是哪个消息出现问题了。

     3) 异步确认发布.  利用回调函数 保证是否投递成功

    如何处理异步确认中确认失败的消息?把未确认的消息放到一个基于内存的能被发布线程访问的队列  

    //异步确认
        public static void publishMessageAsync() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,true,false,false,null);
            //发布确认
            //线程安全有序的一个哈希表 适用于高并发的情况下
            /**
             * 1.轻松将序号和消息关联
             * 2。轻松批量删除条目 只要给到序号
             * 3。支持高并发
             */
            ConcurrentSkipListMap<Long,String> outstandingConfirms =new ConcurrentSkipListMap<>();
            //消息确认成功
            ConfirmCallback ackCallback=(deliveryTag, multiple) -> {
                //删除掉已经确认成功的消息
                if(multiple){
                    ConcurrentNavigableMap<Long, String> confimed= outstandingConfirms.headMap(deliveryTag);
                    confimed.clear();
                    //批量
                }else{
                    outstandingConfirms.remove(deliveryTag);
                    //单个
                }
    
                System.out.println("确认成功的消息"+deliveryTag);
            };
            //消失确认失败
            ConfirmCallback nackCallback=(deliveryTag, multiple) -> {
                String message = outstandingConfirms.get(deliveryTag);
    
                System.out.println("确认失败的消息"+message);
            };
    
            channel.confirmSelect();
    
            //准备消息的监听器
            channel.addConfirmListener(ackCallback,nackCallback);
            int batch =1000;
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message=i+"";
                channel.basicPublish("",queueName,null,message.getBytes());
                //记录下所有的消息
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
    
            }
            long end = System.currentTimeMillis();
            System.out.println("发布1000个单独确认需要时间:"+(end-start));
        }
    

     

    交换机

    发布订阅模式  一个消息想被多个消费者消费 

    生产者-消息- 交换机 -routingkey-队列- 消息只能被消费一次-消费者

                                  -routingkey-队列-消息只能被消费一次 -消费者

     

    生产者只能将消息发送到交换机,交换机一方面接收来自生产者的消息,另一方面将它们推入队列,

     

    exchange

    直接direct 主题topic 标题headers(不常用) 扇出fanout

    “”表示无名或者默认交换机

    routingkey 绑定key 指定交换机

     

    临时队列

    不带有持久化 名字是随机的 队列 一旦断开了消息者的队列,队列将被自动删除。

    String queueName=Channel.queueDeclare().getQueue();

     

    绑定 (交换机 queue 之间的桥梁)

    通过routingkey进行绑定

    通过routing key 区分不同的队列

     

    1)Fanout(广播) 发布订阅模式。 扇出

    将接收到的所有的消息广播到所有队列中

    绑定交换机和队列

     

    Channel.queueBind(queueName,Exchange_NAME,“”);//第三个参数 routingKey

    两个队列的Routingkey相同 将都接收到消息

     

    2)direct交换机 路由模式  routingkey模式

    声明队列的时候 指明交换机是direct类型

    生产者-消息- 交换机 -routingkey-队列- 消息只能被消费一次-消费者

                                  -routingkey-队列-消息只能被消费一次 -消费者

    routingkey相同是扇出交换机 不同是直接交换机

    direct_logs   交换机->console 队列 ->nfo                routingkey

                              - >console 队列->warming(多重绑定)

                                   ->disk 队列->error

    一个队列,拥有多个routing key 多重绑定

    谁能接收到消息 完全取决于rouingkey

    routingkey->info console接收

    routingkey-> warming console接收

    routingkey->error  disk接收

     

    3)topic交换机 

    Routingkey不同,直接交换机只能给一个队列发消息

    主题交换机的routing key 必须是一个单词列表 以点号分割

     “quick,orange.rabbit” 不能超过255个字节

    (*.orange.*)匹配三个单词中间是orange

    lazy.#)#匹配多个单词

     

    当一个队列绑定键是#,那么这个队列将接收所有的数据,类似于fanout

    当队列绑定键中没有#和*出现,那这个队列绑定类型类似于direct

     

    死信队列

    消息无法被消费

    某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信

    应用场景:为了保证订单业务的消息数据不丢失,需要用到死信队列机制。

    当消息发生异常,将消息投入死信队列。

    支付超时未付款订单会自动失效

     

    死信的来源

    消息ttl过期(存活时间)

    队列达到最大长度

    消息被拒绝

     

    生产者 -普通交换机 type=direct->普通队列—-》c1

                                            |

                                     消息TTL过期

                                     队列达到最大长度   成为死信

                                    消息被拒绝

                                          

                              死信交换机 type=direct

                                            |

                                 死信队列———》c2

     

     

    死信消息

    //通过参数转发消息到死信队列

    HashMap<String, Object> arguments = new HashMap<>();
    //过期时间. ms
    arguments.put("x-message-ttl",100000);
    //正常队列设置死信交换机
    arguments.put("x-dead-letter-exchange",dead_exchange);
    //死信routingkey
    arguments.put("x-dead-letter-routing-key","lisi");
    channel.queueDeclare(normal_queue,false,false,false,arguments);ggu p

    设置过期时间:

    一种在普通队列 设置过期时间

    另一种在生产方发消息时设置过期时间。(比较灵活 可以随意修改过期时间)

     

    发消息时设置过期时间:props 设置死信消息的过期时间

    AMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("1000").build();

     

    设置死信队列的长度

    arguments.put("x-max-length",6);

    超过部分会成为死信消息

     

    消息被拒绝 指定某条消息被拒绝

    需要开启手动应答

    if(message.equals(“info5")){

    channel.basicReject(message.getEnvelope().getDeliveryTag(),false);//(消息的标志,是否放回队列)

    }

     

    延迟队列(死信队列中ttl过期)

    队列内部是有序的 

    在某个事件发生之前或者之后的指定时间完成某一项任务

    订单十分钟内未支付则关闭

     

    整合springboot:跳过

     

    延迟队列:延迟指定时间消费消息

     

    优化

    每新增一个时间需求,就要新增一个队列

    QA QB指定了过期时间。【QC不指定过期时间 没设置ddl时间】

    解决方法:发送消息的时候设置过期时间

    rabbitTemplate.convertAndSend(“X”,”XC”,message,msg->{
    msg.getMessageProperties().setExpiration(ttlTime);
    return msg;
    });
    

    基于死信的延迟存在问题:

    发送多条信息会排队,rabbitmq只会检查第一个队列,如果第一个消息的延时时长很长,第二个消息的延时时长很短,第二条消息并不会得到优待。

     

    基于延迟消息插件的延迟队列:延迟交换机  x-delayed-message

     

    生产者 -》延迟交换机 -〉队列 -》消费者 

     

    声明一个延迟交换机 基于插件的延时队列

    public CustomExchange delayedExchange(){
        Map<String,Object> arguments =new HashMap<>();
         arguments.put("x-delayed_type","direct");
         return new CustomExchange(delayed_exchange_name,"x-delayed-message",true,false,arguments);
    
    }
    

    延时队列:使用rabbitmq实现延时队列可以很好的利用rabbitmq的特性,消息的可靠发送,可靠投递,利用死信队列保证消息至少被消费一次 以及未被正确处理的消息不会被丢弃。

    Rabbitmq集群的特性 可以解决单点故障的问题 不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

  • 相关阅读:
    【java框架】SpringBoot(3) -- SpringBoot集成Swagger2
    【java框架】SpringBoot(2) -- SpringBoot主要注解说明
    【java框架】SpringBoot2(1) -- SpringBoot2入门及基础配置
    【java框架】MyBatis-Plus(1)--MyBatis-Plus快速上手开发及核心功能体验
    UUID随机验证码
    MySQL汇总
    使用waitgroup在循环中开Goroutine处理并发任务
    使用Go处理SDK返回的嵌套层级数据并将所需字段存入数据库(一)
    Go时间相互转换的处理
    go常用操作
  • 原文地址:https://www.cnblogs.com/yxj808/p/15437725.html
Copyright © 2011-2022 走看看