zoukankan      html  css  js  c++  java
  • 消息队列&Celery&RabbitMQ&zeromq

    一、消息队列

    什么是消息队列?

    “消息队列”是在消息的传输过程中保存消息的容器。
    “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
    消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器是消息从它的源传输到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

    为什么使用消息队列?

    主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。

    总结:消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等

      

    消息队列特点:

    • 采用异步处理模式:消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道队列)上,消息接收者订阅监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出同步回应。整个过程都是异步的

    • 应用系统之间解耦合:

      • 发送者和接受者不必了解对方、只需要 确认消息
      • 发送者和接受者 不必同时在线

      比如在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里,通知订单系统修改订单支付状态。两个系统是通过消息中间件解耦的。

    应用场景: 

    • 异步处理,举个栗子:现有用户注册模块,需要同时完成写入注册数据至数据库、发送激活邮件、发送短信验证码。实现包括:串行方式、并行方式
      • 串行方式:先将注册信息写入数据库成功后,再发送激活邮件,最后发送短信验证码。以上三个任务依次全部完成后,返回给客户。
      • 并行方式:先将注册信息写入数据库成功后,发送注册邮件的同时,一起发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
    • 应用解耦,举个栗子:现有用户下单模块,当用户下单后,订单系统需要通知库存系统。传统的做法是,在订单系统调用库存系统的接口。假如库存系统无法访问,则订单系统减库存将失败,从而导致订单失败,订单系统与库存系统耦合度过高。
      • 订单系统:用户下单后,在订单系统中将调用库存系统接口的操作放入到消息队列,订单系统中不再阻塞等待库存系统的返回结果。并将订单下单成功返回给用户。

      • 库存系统:在消息队列中订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。

      • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

    • 流量削锋,一般在秒杀或团抢活动中使用广泛。举个栗子:现有一个秒杀活动,一般会因为流量过大、暴增而导致应用挂掉。为解决这个问题,一般需要将用户请求加入消息队列,达到控制活动的人数,可以缓解短时间内高流量压垮应用。消息通讯
      • 服务器接收用户请求后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
      • 秒杀业务根据消息队列中的请求信息,再做后续处理。

    • 消息通讯
      • 点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯;

    • 聊天室通讯:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
     

    消息队列的缺点:

    系统可用性降低:系统引入的外部依赖越多,越容易挂掉,假如BCD系统现在都要调用系统A,为了使应用之间解耦合,使用了消息队列MQ,但是有一个问题是如果MQ挂掉,整个系统就都不能使用了。

    系统复杂性提高:硬生生加个MQ进来,无法保证消息没有被重复消费,也无法自动解决消息丢失的情况,消息传递的顺序性也没办法保证。

    一致性问题:由于后台任务是定期对消息队列中的消息进行处理,因而触发的时机是不可预测的。生产者将任务发布于消息队列中,然后再由消费者订阅任务进行处理,但生产者将任务存放后不再关系其执行结果是否成功,而是直接返回成功,如果执消费者处理任务失败了,就造成了数据的不一致。

    消息队列的传输模型

    • 点对点模型:用于消息生产者消息消费者之间点到点的通信。消息生产者将消息发送到由某个名字标识的特定消费者。这个名字实际上对于消费服务中的一个队列Queue),在消息传递给消费者之前它被存储在这个队列中。队列消息可以放在内存中也可以持久化,以保证在消息服务出现故障时仍然能够传递消息。

      传统的点对点消息中间件通常由 消息队列服务消息传递服务消息队列消息应用程序接口 API 组成,其典型的结构如下图所示。



      • 特点:每个消息只用一个消费者;发送者和接受者没有时间依赖;接受者确认消息接受和处理成功。

    • 发布/订阅模型:支持向一个特定的消息主题生产消息。0多个订阅者 可能对接收来自 特定消息主题 的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方,就好比是匿名公告板。这种模式被概况为:多个消费者可以获得消息,发布者订阅者 之间存在 时间依赖性。发布者需要建立一个 订阅subscription),以便能够消费者订阅。订阅者 必须保持 持续的活动状态接收消息

      在这种情况下,在订阅者 未连接时,发布的消息将在订阅者 重新连接重新发布,如下图所示:


      • 特点:每个消息可以有多个订阅者;客户端只有订阅后才能接收到消息;持久订阅和非持久订阅。

    注意:

    1. 发布者和订阅者有时间依赖:接受者和发布者只有建立订阅关系才能收到消息;
    2. 持久订阅:订阅关系建立后,消息就不会消失,不管订阅者是否都在线;
    3. 非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式

    消息丢失的解决:

    生产者弄丢了数据(producer)生产者将数据发送到 RabbitMQ 的时候,可能数据在半路就丢失了,因为大并发写入队列导致消息丢失,网络问题啥的等等原因。 

    • RabbitMQ:1. 开启RabbitMQ提供的事务功能,在生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后再发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,再重试发送消息;如果收到了消息,就可以提交事务channel.txCommit但开启事务太耗费系统性能不推荐。
      # 开启事务
      channel.txSelect()
      try# 发送消息到消息队列中
      except Exception as e:
          # 事务回滚
          channel.txRollback()
      else:
          # 提交事务
          channel.txCommit()
    • RabbitMQ:2. 开启 confirm 模式,生产者设置开启 confirm 模式之后,当每次写入消息时都会分配一个唯一的 id,如果消息成功写入了 RabbitMQ 中,RabbitMQ 会回传一个 ack 消息,提示这个消息写入成功。如果 RabbitMQ 没能处理这个消息,会回调生产者的一个 nack 接口,提示这个消息写入失败,可以再次重试。而且可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

      :事务机制和 cnofirm 机制的区别:事务机制是同步的,提交一个事务之后会一直阻塞,但是 confirm 机制是异步的,发送完这个消息之后可以继续发送下一个消息,第一个写入的那个消息 RabbitMQ 接收之后,会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

    • Redis:RedisPUSH/POP机制,利用redis的列表数据结构。比较好的使用模式是,生产者lpush消息,消费者brpop消息,并设定超时时间,可以减少redis的压力。这种方案相对于第一种方案数据可靠性是提高了,只有在redis宕机且数据没有持久化的情况下丢失数据,可以根据业务通过AOF和缩短持久化时间间隔来保证很高的可靠性,而且也可以通过多个client来提高消费速度。但相对于专业的消息队列来说,这种方案消息的状态过于简单(没有状态),没有ack机制,消息取出后消费失败依赖于client记录日志或者重新push到队列里面。

      注:Redis相较于rabbitMQ没有ack机制,也不能保证消息的顺序性,不适应用作于消息队列来使用。可以考虑消息中间件:Redis作者开源的Disque、阿里开源RocketMQ,以及基于Golang的nsq等,Redis更适用于存储数据。

      • 每个消费者(单线程)都有自己的单独的队列,这个队列是用rpoplpush命令从公共队列生成。然后:如果消费成功了,用rpop命令把自己的队列销毁,进行下一次循环;如果消费失败,因为是消费者私有的队列,可以自由选择如何处理,不用担心冲突;如果消费者崩溃了,在消费者启动时会检查自己的队列,把自己队列里的元素放回公共队列。在不考虑redis挂掉的情况下,这种机制保证了任务至少被消费一次。


    消息队列(Queue)弄丢了数据:  消息队列自己弄丢了数据,这此时必须开启持久化功能,就是消息写入之后会持久化到磁盘,哪怕是消息队列自己宕机了,恢复之后会自动读取之前存储的数据,一般数据不会丢。

    RabbitMQ 设置持久化有两个步骤

    • 创建 queue 时设置为持久化:这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
    channel.queue_declare(queue='shuaigaogao', durable=True) # durable=True 持久化 
    • 发送消息时设置持久化,将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
    channel.basic_publish(exchange="",
                          routing_key="shuaigaogao",   # queue的名字
                          body="hello world",          # body是要发送的内容
                          properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性
                          )

      注意:必须要同时设置这两个持久化,这样 RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。另一种情况,已经给 RabbitMQ 开启了持久化机制,生产者发送了消息到消息队列中,并且也将其写入了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有当消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack你也是可以自己重发的。


    Redis设置持久化有两种方式:RDBAOF
     

    消费端弄丢了数据:消费者从消息队列中订阅消息时,可能刚消费到还没处理完,进程就挂了,比如宕机、死锁、网络抖动等等,但是消息队列认为你都消费了那么这数据就丢了。

    • RabbitMQ启用手动确认模式即可
      • ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。
      • ②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finallyack,也会一直重复发送消息(重试机制)。
      • ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
      • 消费者业务逻辑异常,但是未手动执行noack导致:通过noack方式来重新放入队列

         注 :消息队列通过判断consumer连接情况来判断消息是否被重新放入队列

    二、celery

    a、概念 celery是基于python实现的一个分布式任务队列框架,主要用于管理分布式任务队列、处理耗时的任务,支持使用任务队列的方式在分布的 机器/进程/线程上 执行任务调度。可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行,通常使用它实现异步任务&定时任务

    b、组件 

    • 任务(tasks)-- 用户定义的函数,用于实现应用功能,比如执行一个发送短信的耗时任务。
    • 消息中间件(Broker)-- 用于存放tasks的地方,代指任务队列本身,这个中间人需要解决的一个问题,就是可能需要存放非常非常多的tasks,而且要保证Worker能够从这里拿取,常见的有broker有Redis
    • 任务执行单元(Worker)-- 用于执行tasks,从broker中取出tasks,调用执行任务函数。

    c、具体功能 :

     任务模块 Task:包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

     消息中间件 Broker:Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

     任务执行单元 Worker:Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

     任务结果存储 Backend:Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

    d、底层原理

    celery架构由三个模块组成:消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    消息中间件(Broker): 消息中间人,是任务调度队列,是一个独立的服务,是一个生产者消费之模式,生产者把任务放入队列中,消费者(worker)从任务队列中取出任务执行,任务的执行可以按照顺序依次执行也可以按照计划时间进行。但是Broker本身不提供队列服务,所以要集成第三方队列,推荐使用RatbbitMQ或Redis.

    任务执行单元(worker):即执行任务的程序,可以有多个并发。它实时监控消息队列,获取队列中调度的任务,并执行它。

    任务执行结果存储(task result store):由于任务的执行同主程序分开,如果主程序想获取任务执行的结果,就必须通过中间件存储。同消息中间人一样,存储也可以使用RabbitMQ、Redis;另外,假如不需要保存执行的结果也可以不配置这个模块。

    e、实现步骤 :

    • 创建一个 Celery 实例 
    • 启动 Celery Worker 
    • 应用程序中调用异步任务

    三、RabbitMQ 

    a、概念

    RabbitMQ 是一个由 Erlang 语言开发的,并且基于 AMQPAdvanced Message Queuing Protocol)高级消息队列协议的消息队列服务。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。默认端口5672。

    AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

    b、特点: 

    • 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

    • 灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange

    • 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker

    • 高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

    • 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

    • 多语言客户端(Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

    • 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

    • 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

    • 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

     
    c、组件:

    RabbitMQ是一个消息代理:它接受和转发消息。可以将其视为邮局:服务器将(生产者)将要发布的邮件放在邮箱中,RabbitMQ最终会将邮件发送给对应API(消费者)。 在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制blob数据 - 消息。在RabbitMQ中邮箱就是一个队列,消息存储在队列当中,队列只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者可以发送到同一个消息队列,反之也允许很多消费者从同一个队列中接收数据。

    生产者是发送消息的程序,消费者是等待接收消息的程序。生产者,消费者和代理(消息队列)不必驻留在同一主机上;

    • Broker:也称Broker/RabbitMQ Server,一种传输服务,维护一条从ProducerConsumer的路线,保证数据能够按照指定的方式进行传输。

    • Producer:消息生产者,数据的发送方。一个Message有两个部分:payload(有效载荷)和label(标签)。payload:传输的数据。labelexchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个ConsumerAMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

    • Consumer:消息消费者,数据的接收方。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer, 也可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的,就是协议本身不支持。如果Producer发送的payload包含了Producer的信息就另当别论。
    • Exchange:消息交换器,用于接受、分配消息;它指定消息按什么规则、路由到哪个队列,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    • Queue:队列,用于存储生产者的消息;保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    • Binding:绑定,决定交换器的消息应该发送到那个队列。它的作用就是把交换器(exchange)和消息队列(queue)按照路由规则绑定起来,是基于路由键将交换器和消息队列连接起来的路由规则成为一个绑定,所以可以将交换器理解成一个由绑定构成的路由表。

    • Routing Key:路由键,用于把生成者的数据分配到交换器上;exchange根据这个关键字进行消息投递

    • BindingKey(绑定键):用于把交换器的消息绑定到队列上;
    • VHost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhostAMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

    • Connection: 网络连接,比如一个TCP连接。ProducerConsumer都是通过TCP连接到RabbitMQ Server的,程序的起始处就是建立这个TCP连接。
    • Channel(信道):消息通道,消息推送使用的通道;在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

    • ConnectionFactory(连接管理器)应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
    • Message:  消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

    Exchange、Queue、RoutingKey三个才能决定一个从ExchangeQueue的唯一的线路。

    为什么使用Channel,而不是直接使用TCP连接

    对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。有实验表明,1s的数据可以Publish10K的数据包。当然对于不同的硬件环境,不同的数据包大小这个数据肯定不一样,但是我只想说明,对于普通的Consumer或者Producer来说,这已经足够了。如果不够用,你考虑的应该是如何细化SPLIT你的设计。

    Exchange的分发策略:

    Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到。

    • 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
    • fanout每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
    • topic:topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。匹配0个或多个单词匹配不多不少一个单词。

    四、zeromq

    a、概念

    zeromq 是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库,提供一个消息队列, 但是与面向消息的中间件不同,ZeroMQ的运行不需要专门的消息代理(message broker)该库设计成常见的套接字风格的API。zeromq 并不是类似rabbitmq消息列队,它实际上只一个消息列队组件,一个库。

    请求响应模式(Request-Reply用于将来自ZMQ_REQ客户端的请求发送到一个或多个ZMQ_REP服务,并接收对发送的每个请求的后续回复,是一种远程过程调用和任务分发模式。客户端在请求后,服务端必须回响应

    发布/订阅模式Publish-Subscribe): 从单个发布者到多个订阅者的一对多数据分发。广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

    管道模式Parallel Pipeline)以 push/pull 模式连接节点,可以有多个步骤、循环。这是一种并行的任务分发和收集模式。由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。当连接被断开,数据不会丢失,重连后数据继续发送到对端。

    b、代码实现

    • 请求响应模式
      # server端:
      import zmq
      
      context = zmq.Context()
      socket = context.socket(zmq.REP)
      socket.bind("tcp://*:5555")
      
      while True:
          message = socket.recv()
          print("Received: %s" % message)
          socket.send("I am OK!")
      
      
      # client端:
      import zmq
      import sys
      
      context = zmq.Context()
      socket = context.socket(zmq.REQ)
      socket.connect("tcp://localhost:5555")
      
      socket.send('Are you OK?')
      response = socket.recv();
      print("response: %s" % response)
      
      
      # 输出:
      $ python app/server.py 
      Received: Are you OK?
      
      $ python app/client1.py 
      response: I am OK!
    • 发布订阅模式
      # server端:
      import zmq
      import time
      
      context = zmq.Context()
      socket = context.socket(zmq.PUB)
      socket.bind("tcp://*:5555")
      
      while True:
          print('发送消息')
          socket.send("消息群发")
          time.sleep(1)    
      
      
      # client端1:
      import zmq
      import sys
      
      context = zmq.Context()
      socket = context.socket(zmq.SUB)
      socket.connect("tcp://localhost:5555")
      socket.setsockopt(zmq.SUBSCRIBE,'')  # 消息过滤
      while True:
          response = socket.recv();
          print("response: %s" % response)
      
      
      #client端2:
      import zmq
      import sys
      
      context = zmq.Context()
      socket = context.socket(zmq.SUB)
      socket.connect("tcp://localhost:5555")
      socket.setsockopt(zmq.SUBSCRIBE,'') 
      while True:
          response = socket.recv();
          print("response: %s" % response)
      
      
      # 输出:
      $ python app/server.py 
      发送消息
      发送消息
      发送消息
      
      $ python app/client2.py 
      response: 消息群发
      response: 消息群发
      response: 消息群发
      
      $ python app/client1.py 
      response: 消息群发
      response: 消息群发
      response: 消息群发
    • 管道模式
      # server端:
      import zmq
      import time
      
      context = zmq.Context()
      socket = context.socket(zmq.PUSH)
      socket.bind("tcp://*:5557")
      
      while True:
          socket.send("测试消息")
          print "已发送"    
          time.sleep(1)    
      
      
      # work端:
      import zmq
      
      context = zmq.Context()
      
      recive = context.socket(zmq.PULL)
      recive.connect('tcp://127.0.0.1:5557')
      
      sender = context.socket(zmq.PUSH)
      sender.connect('tcp://127.0.0.1:5558')
      
      while True:
          data = recive.recv()
          print "正在转发..."
          sender.send(data)
      
      
      # client端:
      import zmq
      import sys
      
      context = zmq.Context()
      socket = context.socket(zmq.PULL)
      socket.bind("tcp://*:5558")
      
      while True:
          response = socket.recv();
          print("response: %s" % response)
      
      
      # 输出结果:
      $ python app/server.py 
      已发送
      已发送
      已发送
      
      $ python app/work.py 
      正在转发...
      正在转发...
      正在转发...
      
      $ python app/client1.py
      response: 测试消息
      response: 测试消息
      response: 测试消息

    特性

    ActiveMQ

    RabbitMQ

    RocketMQ

    Kafka

    单机吞吐量

    万级,吞吐量比RocketMQ和Kafka要低了一个数量级

    万级,吞吐量比RocketMQ和Kafka要低了一个数量级

    10万级,RocketMQ也是可以支撑高吞吐的一种MQ

    10万级别,这是kafka最大的优点,就是吞吐量高。

    一般配合大数据类的系统来进行实时数据计算、日志采集等场景

    topic数量对吞吐量的影响

    topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降

    这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic

    topic从几十个到几百个的时候,吞吐量会大幅度下降

    所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源

    时效性

    ms级

    微秒级,这是rabbitmq的一大特点,延迟是最低的

    ms级

    延迟在ms级以内

    可用性

    高,基于主从架构实现高可用性

    高,基于主从架构实现高可用性

    非常高,分布式架构

    非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用

    消息可靠性

    有较低的概率丢失数据

    经过参数优化配置,可以做到0丢失

    经过参数优化配置,消息可以做到0丢失

    功能支持

    MQ领域的功能极其完备

    基于erlang开发,所以并发能力很强,性能极其好,延时很低

    MQ功能较为完善,还是分布式的,扩展性好

    功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

    优劣势总结

    非常成熟,功能强大,在业内大量的公司以及项目中都有应用

    偶尔会有较低概率丢失消息

    而且现在社区以及国内应用都越来越少,官方社区现在对ActiveMQ 5.x维护越来越少,几个月才发布一个版本

    而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用

    erlang语言开发,性能极其好,延时很低;

    吞吐量到万级,MQ功能比较完备

    而且开源提供的管理界面非常棒,用起来很好用

    社区相对比较活跃,几乎每个月都发布几个版本分

    在国内一些互联网公司近几年用rabbitmq也比较多一些

    但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。

    而且erlang开发,国内有几个公司有实力做erlang源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。

    而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。

    接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障

    日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景

    而且一个很大的优势在于,阿里出品都是java系的,我们可以自己阅读源码,定制自己公司的MQ,可以掌控

    社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码

    还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ挺好的

    kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展

    同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量

    而且kafka唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略

    这个特性天然适合大数据实时计算以及日志收集

    RocketMQ,适用于可靠性要求高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。

    RabbitMQ,erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护,适用于数据量没有那么大的场景。

    Kafka,是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,有日志采集功能。

    ----------------------------------------------------

    举个栗子:“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 -  RabbitMQ代表消费者保留的消息缓冲区。

     

    生产者将消息发送到“hello”队列。使用者从该队列接收消息。

    -------------------------------------------------------------------------------

  • 相关阅读:
    MiniUI 在线示例(gridview)
    数据库的事务处理和并发控制
    数据库大并发操作要考虑死锁和锁的性能问题
    c# 财务报表数字转大写的方法
    js 实现打印功能1
    js 实现打印功能
    《C++ Primer 4th》读书笔记 序
    谈谈Vim中实用又好记的一些命令
    《Unix网络编程》卷2 读书笔记 第3章- System V IPC
    《Unix网络编程》卷2 读书笔记 第2章- Posix IPC
  • 原文地址:https://www.cnblogs.com/hsmwlyl/p/10575607.html
Copyright © 2011-2022 走看看