zoukankan      html  css  js  c++  java
  • 消息队列的对比调研

    消息队列的对比调研

    2017.03.03 16:17* 字数 2593 阅读 6423评论 3

    我们发现Redis的作者出了一个新的消息队列系统Disque,我做了一点调研来决定我们使用哪种消息队列,主要对比了Disque、Kafka和RocketMQ。

    • Disque的特性
    • 消息发送可以选择至少一次或者最多一次。
    • 消息需要消费者确认。
    • 如果没有确认,会一直重发,直至到期。确认信息会广播给拥有消息副本的所有结点,然后消息会被垃圾收集或者删除。
    • 队列是持久的。(需要开启aof)
    • Disque默认只运行在内存里,持久性是通过同步备份实现的。
    • 队列为了保证最大吞吐量,不是全局一致的,但会尽力提供排序。
    • 在压力大的时候,消息不会丢弃,但会拒绝新的消息。(kafka在消息大量积压时,会直接丢弃新消息)
    • 消费者和生产者可以通过命令查看队列中的消息。
    • 队列尽力提供FIFO。(kafka可以保证顺序)
    • 一组master作为中介,客户端可以与任一结点通信。
    • 中介有命名的队列,无需消费者和生产者干预。
    • 消息发送是事务性的,保证集群中会有所需数量的副本。
    • 消息接收不是事务性的。
    • 消费者默认是接收时是阻塞的,但也可以选择查看新消息。
    • 生产者在队列满时发新消息可以得到错误信息,也可以让集群异步地复制消息。
    • 支持延迟作业,粒度是秒,最久可以长达数年。但需要消耗内存。(kafka不支持,这个功能用来做分布式定时任务系统很不错)
    • 消费者和生产者可以连接不同的结点。

    Disque的一些不足
    最近一次提交时间:Apr 29, 2016(调研时间2017-2-21)
    社区并不活跃,网上可以查到的资料较少
    C编写,我们的技术栈为go和c#,我花了一些时间阅读了资料,并做了大量测试,也阅读了一些源码,在一定程度上弥补了上述问题
    由于运行在内存,获得大吞吐量的同时,失去了保存大量消息的能力

    ** Disque的消息存储**
    一条消息称为一个job,job会根据配置的副本数量分布在多个节点的内存中,队列信息是每个节点单独存储的
    存储job使用的跳表

    • Kafka的特性和优势
    • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
    • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
    • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。(这里是个很讨巧的做法,topic下所有消息顺序传输几乎不能实现;相比之下Disque更加简单暴力,直接宣称不保证顺序)
    • 同时支持离线数据处理和实时数据处理。
    • Scale out:支持在线水平扩展。
    • 最近一次提交时间:Feb 20, 2017(调研时间2017-2-21)
    • 社区非常活跃,网上资料很多,众多公司在使用

    Kafka的消息存储

    partition简单图示.png

    物理存储简单图示.png

    RocketMQKafka的一些对比
    RocketMQ使用Java实现,kafka使用Scala实现
    性能上kafka略高,可用性和数据可靠性都差不多(因为存储模型差不多)
    RocketMQ支持事务消息(这里的事务消息存在缺陷,而且在分布式事务中)
    RocketMQKafka主要在这几个方面:消息的存储、Prdocuer端的服务发现、消费offset的存储、consumer负载均衡、Name Server和ZooKeeper
    总体来说,RocketMQKafka比较类似,RocketMQ数据安全性稍好,kafka性能稍好;RocketMQ不需要zookeeper,但是同样需要额外的机器来部署他的Name Server
    另外,RocketMQ有延时消费功能

    • RocketMQDisque的对比

    • 在消息量大时, RocketMQ在成本上具有优势;消息量小时,Disque具有优势

    • 性能: Disque > RocketMQ

    • 数据安全性: RocketMQ > Disque

    • 都有延时消费功能

    • RocketMQ运维更加复杂,需要额外部署Name Server集群

    • Disque客户端对go的支持更好

    • 一些汇总

    • 数据安全性: RocketMQ > Kafka > Disque

    • 吞吐量:Disque > Kafka > RocketMQ

    • 机器成本(数据量大时):Disque > RocketMQ ≈ Kafka

    • 机器成本(数据量小时): RocketMQ ≈ Kafka > Disque

    • Kafka和RocketMQ支持1对多广播,Disque不支持

    • Disque和RocketMQ支持延时消费, Kafka不支持

    • 我们的现状:

    • 数据量不大

    • 需要一个定时任务系统,但是并没有太多开发力量自己开发

    • 没有部署Zookeeper,使用的consul做服务注册和发现等

    我们的选择:Disque

    • 数据量不大,所以Disque消息积压能力弱,并不会引发问题

    • 超高的并发能力在大促时可以发挥力量

    • 有延时消费的功能,可以用来做定时任务

    • 运维比较简单

    • Disque集群时需要注意的点

    • 只要集群中有一个节点失效,则 ADDJOB topic message timeout 这种默认命令会返回失败,必须使用 ADDJOB topic message timeout REPLICATE n RETRY m 这种带有 REPLICATE 和 RETRY 的命令

    • 在使用 REPLICATE 参数时,必须同时有 RETRY 命令,否则 REPLICATE 参数将失效,并被固定为1

    • 如果生产者和消费者连接不同的节点,如果消费者在 GETJOB 后不立即 ACKJOB,则会产生重复消费

    • 如果生产者和消费者连接不同的节点,消费者会产生比较大的延迟,经常发生生产者发送消息几秒之后消费者才收到消息

    • 这些命令将可能会返回错误的值:QLEN、QPEEK、DEQUEUE

    测试数据

    • 测试用的我的开发机mac book air,处理器:1.6 GHz Intel Core i5,内存:4 GB 1600 MHz DDR3

    • Replicate=2,3节点集群(各自占用一个核),20个topic,每个topic 2w消息,每条消息12B(因为我的机器内存比较小...)
      读写同时进行读写比1:1,QPS:1.8w 对应单机时QPS:2.3w,每条消息1KB
      读写同时进行读写比1:1,QPS:2w (开启aof:1.7w) 对应单机时QPS:2.7w
      只发消息:QPS:1w3 (开启aof:1w) 对应单机是QPS:3.2w
      只收消息:QPS:0.8w 对应单机QPS:1.8w

    • EZMqClient

    • EZMqClient是一组接口,用于操作远程消息队列,目前完成了Disque的实现版本。这个实现版本在disque-go的基础上封装而成。解决了超时后error和message同时为nil的bug,解决了有时会panic的问题,封装了使用细节,增加消费监听接口。

    • 封装了Disque的配置细节,结合EZMqClient,可使Disque集群拥有高可用、可热扩展的能力

    • 同时提供消息广播和定时任务接口(这里在我有空时会再更新一篇博客来说明我的实现方式)

    下面是EZMqClient的使用示例:

    // EZMqClient
    mqClient = ezlib.NewDisqueClientWithDefaultConf(conf.MqServer, conf.MqClient)
    
    // 添加消息监听
    mqClient.AddConsumeListener("testTopic", func(job *ezlib.EZJob) {
        fmt.Printf("consume message: %v
    ", job)
    })
    // 发送消息
    mqClient.Push("testTopic", "bbbbbb")
    // 发送延时消息,发送后,消息队列会在delay之后将消息推送给消费者
    mqClient.PushDelay("testDelayTopic", "aaaaaaaaaaaaaaa", 15*time.Second)
    
    // 定时任务,你可以在多个节点启动相同的定时任务,系统会保证只有一个节点去执行任务,所以只需要有一个节点存活,这个定时任务就可以执行
    mqClient.AddCrontab("testCrontab", "0/10 * * * * *", func(taskTime time.Time) {
        fmt.Printf("crontab taskTime:%v now:%v
    ", taskTime, time.Now())
    })
    
    ///////////////   消息广播  /////////////////////////
    // 增加广播消息监听,同一个topic下的不同的group都会收到广播消息,同一个group中只有一个节点会收到消息
    mqClient.AddBroadcastListener("broadcastNameTest", "broadcastNameTestGroup1", func(msg string) { fmt.Printf("broadcast1:%v
    ", msg) })
    // 发送广播消息
    mqClient.PushBroadcast("broadcastNameTest", "hahahahaha")
    // 发送广播延时消息
    mqClient.PushBroadcastDelay("broadcastNameTest", "ooooooooooo", 10*time.Second)
    

    Disque热扩展
    热扩展:Disque在一个节点内存不足时,收到新的Job,会将这个Job转发给其他内存足够的节点;热扩展只需增加节点,并且通过Disque提供的客户端将新启动的节点加进集群即可

    Disque如何实现高可用
    高可用: EZMqClient 会为每个addjob操作增加参数指定副本数量,并且在getjob成功之后调用ackjob,以此保证“至少一次”的消息消费;当有节点失效时,只要一个job的多个副本不是都在那些失效节点上时,则job不会丢失,整个集群正常工作
    目前打算设置副本数量为2,集群物理机3台,由于Disque单线程,一台物理机可以启动多个Disque实例,但需要注意job的2个副本不可处于同一台物理机,否则这台物理机失效时将导致job丢失,考虑到Disque的吞吐量完全足够,而且Disque无法保证job的2个副本所在的节点一定会分布在不同的物理机上,所以单机启动一个Disque实例就可以了,可以容忍集群中1台物理机的挂机

    阅读Disque源码的一些建议
    Disque 大量重用了 Redis 的底层代码, 比如数据结构部分、事件部分、网络通信部分、服务器主循环部分等等。
    Job会根据命令指定的副本数量存放在多个节点中,Queue底层为跳表,客户端addjob时连接哪个节点则在哪个节点建立Queue(这个节点没有这个topic的Queue时),副本传播到的节点只会存储Job副本不会建立Queue
    只有同一个topic的所有job都在同一个节点的同一个Queue中时,才能保证顺序(但由于网络延迟,消费者也不一定会按顺序接收到消息),其他情况都无法保证顺序(这里比较复杂,想要详细了解的同学请自行阅读源码并实际操作尝试)
    如果对Redis源码有阅读过的同学,可以只需要阅读Disque的job、queue、cluster、disque* 这几部分的源码

  • 相关阅读:
    多线程中sleep方法,简单介绍。
    线程终止的四种方式,interrupt 方法使用的简单介绍。
    线程的生命周期 介绍
    线程池之 newSingleThreadExecutor 介绍
    python 中 *args he **kwargs的区别
    转载:创业者和工作谈的是一场永不分手的虐恋
    给自己一份勇气,勇敢的面对生活
    做一面锃亮的镜子吧
    与人交往时关注内容而不是表情
    最近比较需要正能量:经典励志人生感悟的句子
  • 原文地址:https://www.cnblogs.com/yanzi-meng/p/9492901.html
Copyright © 2011-2022 走看看