zoukankan      html  css  js  c++  java
  • RabbitMQ总结

    RabbitMQ

    概念

    RabbitMQ(message queue)从字面意思既可以看出属于一个消息队列,既然属于一个队列即拥有队列的特点先进先出,只不过在队列中存放的属于消息,其主要用于跨进程通信,用于在上下游之间进行通信,在常见的架构中RabbitMQ属于一种常见的逻辑解藕+物理解藕消息通信服务,在消息的上游只需要负责产生消息即可,由RabbitMQ进行存储转发消息,无需依赖其它服务。

    MQ优点

    流量削峰

    在正常的订单系统中假设我们服务器正常情况最多一秒钟可以处理一万笔订单,正常情况下我们下单基本立马产生响应的结果,但是假如此时我们处于优惠活动中,此时一秒钟拥有两万笔订单,我们的服务器此时无法处理这么多的请求会面临宕机的危险,如果此时我们使用缓冲队列,把一秒钟的订单存入RabbitMQ中进行分散,分成一段时间来进行处理,此时用户虽然在十几秒之后才可以收到响应结果,但是我们成功的完成订单支付,服务器同时正常运行,保证了业务的可靠性。

    image-20210819204838653

    应用解藕

    在电商系统中,包含物流系统,库存系统,支付系统等,在耦合调用物流系统,库存系统,支付系统等,假如任意一个系统出现问题,都会导致用户无法下单进而导致下单失败,如果使用RabbitMQ消息队列下单的方式,当某个系统出现故障,需要通过几分钟的时间进行系统修复,此时物流请求被存放在RabbitMQ队列之中,用户可以完成下单操作,当系统恢复正常,从队列之中读取消息进一步进行处理即可,在整个下单流程之中,用户是无法感知系统出现故障,可以保证业务的可靠性。

    image-20210819212228157

    异步调用

    在某些服务调用中是进行异步处理,例如A调用B服务,但是B耗时较长,同时A又需要获取B的执行结果,此时A可以主动调用B的查询API进行主动查询,或者A提供一个回调API由B执行完毕之后调用回调API进行结果通知,但是上述两种方式增加了代码复杂度,如果使用RabbitMQ可以很好的解决上述执行结果问题,当B执行完毕的时候将执行结果发送给RabbitMQ进行存储,染红RabbitMQ将消息转发给A服务,此时A可以很轻松的获取B的执行结果。

    image-20210819231158613

    核心组件

    生产者

    产生数据发送消息的程序属于生产者

    消费者

    消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者

    交换机

    接收生产者产生的消息同时将消息转发到队列中,交换机必须准确的知如何处理其接收的消息,是推送到特定的队列还是推送到多个队列或者将数据丢弃,其有交换机的类型决定

    队列

    队列是RabbitMQ的一种数据结构,数据虽然由应用程序与RabbitMQ之间进行交互,但是数据的存储则是存储在RabbitMQ队列之中,队列受主机硬盘与内存大小限制,本质属于一个消息缓冲区,生产者可以将数据发送到队列,消费者可以从队列进行数据读取。

    工作原理

    图解

    mq

    名词解释

    Broker

    接收消息和分发消息的应用,一个RabbitMQ服务就是Broker在Broker中可以包含多个交换机与队列。

    Vhost

    出于安全因素考虑把AMQP基本组件划分到虚拟分组中类似于网络中的名称空间,当多个用户连接同一个RabbitMQ服务时候,可以划分多个Vhost,每个Vhost拥有自己独立的交换机,队列等。

    Connection

    生产者与消费者与Broker之间的TCP连接

    Channel

    如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。

    Exchange

    消息到达Broker的第一站,根据分发规则,匹配查找routing key转发到相对应的队列中

    Queue

    存放生产者产生的数据,等待消费者读取转发数据

    Binding

    exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    RabbitMQ安装配置

    软件下载

    wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.0.9/erlang-21.0.9-1.el7.centos.x86_64.rpm
    
    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm 
    

    安装

    rpm -ivh erlang-21.0.9-1.el7.centos.x86_64.rpm 
    
    yum install socat -y
    
    rpm -ivh rabbitmq-server-3.7.9-1.el7.noarch.rpm 
    

    启动

    # 配置开机自启动
    systemctl start rabbitmq-server.service && systemctl enable rabbitmq-server.service 
    
    systemctl status rabbitmq-server.service 
    

    image-20210901202451800

    WEB插件

    # 开启web插件
    rabbitmq-plugins enable rabbitmq_management
    
    # 查看监听端口
    netstat -aunpt | grep 15672
    

    image-20210901202730204

    工作模式

    hello world

    简介

    最简单的工作队列,其中一个消息生产者,一个消息处理者,一个工作队列,其中一个消费者对应一个生产者属于点对点模式,RabbitMQ就是消息代理,它接受并推动消息流动。你可以把它想象成一个邮局:当你把一封信塞进邮箱,你需要确保它能送到收信人的手里。而RabbitMQ就是一个邮箱,邮局,邮递员。不同于真实的邮局(处理信件),RabbitMQ处理接受、存储、推动消息。

    此处我们使用Python编写生产者以及消费者

    image-20210820002003294

    抽象公共代码

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :base.py
    # @Author  :SR
    # @Date    :2021/8/20 下午10:44
    import pika
    
    
    class Base:
        # 抽象公共代码 连接mq 创建信道
    
        def __init__(self, username='guest', password='guest', host='localhost', port=5672, virtual_host='/', ):
            '''
    
            :param username: rabbitmq连接用户
            :param password: rabbitmq连接密码
            :param host: rabbitmq连接地址
            :param port: rabbitmq连接端口
            :param virtual_host: 虚拟用户
            '''
            self.username = username
            self.password = password
            self.host = host
            self.port = port
            self.virtual_host = virtual_host
    
        def connection(self):
            credential = pika.PlainCredentials(self.username, self.password)
    
            parameters = pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
                                                   credentials=credential)
    
            connection = pika.BlockingConnection(parameters=parameters)
    
            return connection
    
        def channel(self):
            connection = self.connection()
    
            channel = connection.channel()
            
            return channel
    
    
    

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :produce1.py
    # @Author  :SR
    # @Date    :2021/8/18 下午5:30
    
    import pika
    
    from .base import Base
    
    
    class Produce(Base):
    
        def producer(self):
            # 生成一个通信信道
            channel = self.channel()
    
            # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
            channel.queue_declare("hello", durable=True)
            
            # 获取mq链接
            connection = self.connection()
    
    
            # 创建消息
            for i in range(10):
                message = "hello world_%s" % i
    
                # 产生数据发送给mq
                '''
                exchange:指定交换机为空
                routing_key:指定队列 需要与上述生成的队列一致
                body:需要发送的数据
                properties:设置消息持久化 因为消息存放在内存中 当mq重启时候数据会丢失 delivery_mode = 2表示消息持久化
                '''
                
                properties=pika.BasicProperties(delivery_mode=2)
                
                channel.basic_publish(exchange='', routing_key='hello', body=message.encode(),properties=properties)
             
             # 关闭链接
    		 connection.close()
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    消费者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consume1.py
    # @Author  :SR
    # @Date    :2021/8/18 下午5:30
    
    from .base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
            channel.queue_declare('hello', durable=True)
            
            # 使用auto_ack = False 当消息处理失败的时候会将消息返回给mq队列 防止消息丢失
            channel.basic_consume('hello', self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            '''
            ch:生产者的管道内存地址
            method:生产者向谁发送信息,给那个队列队列发送信息
            properties:生产者发给消费者的一些属性
            body:生产者发送的消息
            '''
            print(body.decode())
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        consume.consumer()
    

    image-20210820002853125

    work queue

    简介

    在简单队列模式之中一个生产者对应一个消费者进行消息处理,如果消费者处理比较耗时的任务,需要大量的时间来进行处理完毕,显然此种应用效率低下,无法满足高并发需求。

    在工作队列模式之中,如果处于资源密集型任务,有了队列我们将任务安排在队列之后执行,我们将任务封装成消息发送到队列之中,一个工作进程就可以从队列之中读取消息进行处理,如果启动了多个进程,多个进程轮询从队列之中读取消息进行处理。

    worker_queus

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import pika
    
    from .base import Base
    
    
    class Produce(Base):
    
        def producer(self):
            # 生成一个通信信道
            channel = self.channel()
            
            connection = self.connection()
    
    
            # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
            channel.queue_declare("hello", durable=True)
            
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(1, 11):
                message = "hello_world_%s" % i
    
                channel.basic_publish('', routing_key='hello', body=message, properties=properties)
                
            connection.close()  
                                                                                                             
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    消费者

    消费者开启两个worker用来进行轮询测试

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import time
    
    from .base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            # 声明队列 并且设置队列持久化 防止rabbitmq重启导致队列丢失
            channel.queue_declare('hello', durable=True)
    
            channel.basic_consume("hello", on_message_callback=self.callback)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
    
            print("当前消息为:" + body.decode())
    		
    
    if __name__ == '__main__':
        consume = Consumer()
    
        print("worker2开始等待接收消息。。。。。")
    
        consume.consumer()
    

    image-20210821001805323

    Publish Subscribe

    简介

    在上述的工作队列中虽然一个队列可以有多个消费者执行任务,但是该种队列只能消费同一队列的任务,无法消耗多个队列的任务,在我们工作环境中可能需要同时消费多个队列来完成任务,例如我们需要完成用户注册,注册的时候需要给用户发送邮箱与手机验证码,当消费者向注册队列完成注册任务的时候,消费者开始读取注册消息,我们需要进行邮箱发送与验证码发送,如果邮箱验证码两个业务逻辑如果放在一个队列进行处理显然解藕度不高,因此我们需要不同的业务放入不同的队列进行处理,因此最终我们需要达到的一个效果是消费者通过交换机可以往多个队列发送消息,每个队列的所对应的消费者,处理该队列的任务。

    简单来说就是消费者可以通过交换机将消息发布到多个队列之中,每个队列中的消息由自己队列所对应的消费者进行监听处理。

    publish

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    import pika
    from mq_one.base import Base
    
    
    class Produce(Base):
        def producer(self):
            channel = self.channel()
    
            # 声明交换机 指定交换机类型
            channel.exchange_declare(exchange="hello", exchange_type="fanout")
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(1, 11):
                message = "hello_world_%s" % i
    
                channel.basic_publish(
                    exchange="hello",
                    routing_key="",
                    body=message,
                    properties=properties
                )
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    消费者

    运行两份消费者代码

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    from .base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.exchange_declare('hello', exchange_type='fanout')
    
            # 不指定queue名字, rabbitmq会随机分配一个名字, 消息处理完成后queue会自动删除
            response = channel.queue_declare(queue='', exclusive=True)
    
            # 获取队列名称
            queue_name = response.method.queue
    
            # 队列与交换机绑定
            channel.queue_bind(queue=queue_name, exchange='hello')
    
            channel.basic_consume(
                queue=queue_name, on_message_callback=self.callback, auto_ack=False
            )
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
        print("worker2开始工作")
        consume.consumer()
    
    

    image-20210827212929538

    Routing

    简介

    在项目运行的时候我们可能需要收集项目运行的日志,假设我们需要收集告警(warning)与错误(error)信息,由于两个业务场景,因此为了降低耦合度需要通过不同的队列存放不同的消息来供消费者进行消费,但是我们如何保证告警信息将消息发送到告警队列错误信息将错误消息发送到错误队列呢。

    思考上述的业务场景,我们的需求是将指定的消息内容发送到指定的队列中,既然进行指定转发因此我们可以考虑路由,通过路由匹配消息进行转发,在RabbitMQ中便提供了路由模式,其回根据消息的路由键将消息转发到与路由键绑定的队列之中,相比与发布订阅此时我们不是广播发送消息,而是选择性的发送消息。

    routing

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    import pika
    from .base import Base
    
    logs = ['warning', 'error']
    
    
    class Produce(Base):
        def producer(self):
            channel = self.channel()
    
            channel.exchange_declare(exchange='logs', exchange_type='direct')
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(10):
                message = '{} server logs! {}'.format(i, logs[i % 2])
    
                channel.basic_publish(exchange='logs', routing_key=logs[i % 2], body=message, properties=properties)
    
                print(" [x] Sent: {}".format(message))
    
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    

    image-20210827224010713

    消费者(warning)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    from .base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.exchange_declare(exchange='logs', exchange_type='direct')
            response = channel.queue_declare(queue='', exclusive=True)
            queue_name = response.method.queue
    
            # 将队列与交换机绑定
            channel.queue_bind(queue=queue_name, exchange='logs', routing_key='warning')
    
            channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
        print("warning开始工作")
        consume.consumer()
    
    

    image-20210827224153496

    消费者(error)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    from .base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.exchange_declare(exchange='logs', exchange_type='direct')
            response = channel.queue_declare(queue='', exclusive=True)
            queue_name = response.method.queue
    
            # 将队列与交换机绑定
            channel.queue_bind(queue=queue_name, exchange='logs', routing_key='error')
    
            channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
        print("error开始工作")
        consume.consumer()
    
    

    image-20210827224239957

    Topic

    简介

    在上述日志系统中虽然可以根据日志的级别来进行选择性消费,但是我们此时要求更加进一步精确,我们不但要求根据日志级别来进行消费我们还想根据日志的来源来进行消费,例如我们想要接收定时任务警告消息和内核的严重消息如果使用路由模式不能支持多条件匹配对与复杂的业务场景支持度比较差。

    在RabbitMQ中提供了主题模式,在主题模式中可以根据模糊路由键进行消息匹配然后转发到对应的队列之中,在主题模式中不能含有具体的键,必须根据英文.来进行分割,我们将.分割开的每一个单独的字符串称为单词,在路由键中可以存在两种特殊单词 "*"匹配一个单词,"#"匹配零个或多个。

    topic

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :produce.py
    # @Author  :SR
    # @Date    :2021/8/18 下午5:30
    
    import pika
    
    from mq_one.base import Base
    
    routing_key = ['[warning].cron', '[error].kernel']
    
    
    class Produce(Base):
    
        def producer(self):
            # 生成一个通信信道
            channel = self.channel()
    
            channel.exchange_declare(exchange='logs', exchange_type='topic')
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            message = ["cron warning message", 'error kernel message']
    
            for i in range(len(routing_key)):
                channel.basic_publish(exchange='logs', routing_key=routing_key[i], body=message[i],
                                      properties=properties)
                print('[生产者] Send %s:%s' % (routing_key, message))
    
    
    if __name__ == '__main__':
        produce = Produce()
        print("生产者开始工作!")
        produce.producer()
    
    

    image-20210828235006968

    消费者(cron)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    from mq_one.base import Base
    
    binding_key = ['[warning].*', '[error].*']
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.exchange_declare(exchange='logs', exchange_type='topic')
    
            response = channel.queue_declare(queue='', exclusive=True)
            queue_name = response.method.queue
    
            channel.queue_bind(exchange='logs', queue=queue_name, routing_key=binding_key[0])
    
            channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
        print("cron消费者开始消费")
        consume.consumer()
    
    

    image-20210828235341817

    消费者(error)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/26 下午11:13
    
    from mq_one.base import Base
    
    binding_key = ['[warn].*', '[error].*']
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.exchange_declare(exchange='logs', exchange_type='topic')
    
            response = channel.queue_declare(queue='', exclusive=True)
            queue_name = response.method.queue
    
            channel.queue_bind(exchange='logs', queue=queue_name, routing_key=binding_key[1])
    
            channel.basic_consume(queue=queue_name, on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
        print("error消费者开始消费")
        consume.consumer()
    
    

    image-20210828235216926

    消息应答

    概念

    消费者处理生产者产生的消息需要一定的时间,当消费者在处理生产者产生的消息时候只完成部分功能的时候由于某些原因导致消费者突然挂掉导致消息没有被完全处理成功,然而RabbitMQ消息被处理的时候会立即将消息从队列中删除,但是此时我们的消息没有被完全处理同时又被删除,导致消息丢失处理失败。

    为了解决上述消息丢失的问题,在RabbitMQ中引入了消息应答概念,当消费者成功的处理生产者产生的消息请求的时候,会给RabbitMQ产生一个应答消息,告知MQ此时消费者的消息已经成功被处理,可以将消息从队列中删除。

    消息应答分类

    自动应答

    在自动应答方面当RabbitMQ将消息发送完毕之后会立即认为消息成功处理将该消息从队列之中删除,这种模式需要在大量数据与安全性之间做权衡,当有大量数据被发送的时候,此时消息还未发送给消费者由于某种原因导致信道断开此时消息无法传达给消费者但是MQ已经将消息给删除了一样会导致消息丢失,另一种情况当消息量过大且发送给消费者的时候,但是消费者的数量有限处理能力有限,导致大量的消息在内从之中堆积导致内存耗尽,最终导致消费者被操作系统杀死,此时生产者产生的消息依旧没有被处理成功,但是该消息已经被删除了导致消息丢失。

    手动应答

    不同于上述自动应答消息发送之后立马被删除的概念,在手动应答方面当我们成功的处理了消费者的消息,手动给RabbitMQ发送一个应答消息,告知RabbitMQ该消息已经成功处理,可以将该消息从队列之中删除。

    在手动应答时候又分为批量应答以及非批量应答,当选择批量应答的时候会将信道之内的所有消息都自动应答,显然此种方式不安全,因为不能确保信道内的所有消息都成功被处理,在非批量应答的时候只会应答当前被处理的消息。

    message

    消息重入队

    简介

    当消费者成功接收消息并且进行处理的时候,由于某些原因(通道已关闭,TCP连接丢失),导致消费者未能发送ACK确认消息给RabbitMQ,此时RabbitMQ会将消息重新加入队列,如果此时有其余的消费者可以进行消费,则RabbitMQ会将该消息转发给其余的消费者,确保我们即使偶尔某个消费者挂掉消息也不会丢失。

    如图所示,c1/c2两个消费者分别处理消息1与消息2,此时由于某些原因导致c1与队列连接断开未发送ACK消息,此时消息1重新加入了队列,但是由于c2是正常工作的,因此c2将可以处理消息1,这样保证消息1既不会丢失同时也被处理。

    image-20210821105823909

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import pika
    
    from mq_one.base import Base
    
    
    class Produce(Base):
        def producer(self):
            connection = self.connection()
    
            channel = self.channel()
    
            channel.queue_declare("hello", durable=True)
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(1, 11):
                message = "hello_world_%s" % i
    
                channel.basic_publish('', routing_key='hello', body=message, properties=properties)
    
            connection.close()
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    image-20210821112616481

    消费者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import time
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.queue_declare('hello', durable=True)
    
            channel.basic_consume('', on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            time.sleep(5)
    
            print("当前消息为:%s" % body.decode())
    
            # 手动应答 非批量处理
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        print("worker2开始等待接收消息。。。。。")
    
        consume.consumer()
    
    

    当worker1运行一段时间主动关闭该程序

    image-20210821114452114

    查看worker2是否能接受worker1中消息5并且处理

    image-20210821114622194

    RabbitMQ持久化

    概念

    为了保证队列与消息在RabbitMQ停止的的时候不会出现队列与消息丢失,默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

    队列持久化

    如果未设置队列持久化,则RabbitMQ重启的时候,该队列就会被删除,导致队列的丢失,因此我们需要在声明队列的时候将该 durable = True,以便告知RabbitMQ在创建队列的时候会将队列进行持久化处理

    # 声明一个队列并且设置为持久化队列 
    channel.queue_declare('hello', durable=True)
    

    image-20210821174020926

    如果在创建队列的时候未进行持久化处理,但是后期我们将队列修改为持久化,此时会导致代码报错,我们需要将队列手动删除在重新创建队列并且设置为持久化处理

    # 声明一个world队列 未设置持久化
    channel.queue_declare('world', durable=False)
    

    image-20210821174353231

    此时如果我们重启RabbitMQ服务查看非持久化队列是否存在

    # 重启MQ服务
    systemctl restart rabbitmq.service 
    

    image-20210821174703859

    如果我们此时手动将队列修改为持久化,则代码会报错,此时我们需要手动将队列删除在重新声明队列的时候设置持久化即可

    channel.queue_declare('world', durable=True)
    

    image-20210821174914871

    消息持久化

    上述虽然将队列设置为持久化,但是消息未被设置为持久化,当RabbitMQ服务停止的时候处于队列的消息依旧会丢失,因此我们需要进行消息持久化,即将消息从内存写入到磁盘之中,将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,如果需要更强力的策略,需要进行发布确认机制。

    # delivery_mode = 2表示消息持久化
    properties = pika.BasicProperties(delivery_mode=2)
    
    message = "hello_world"
    
    channel.basic_publish('', routing_key='hello', body=message, properties=properties)
    

    不公平分发

    在上述的轮询分发之中,虽然表面看上去很公平每个人依次读取消息进行处理,但是当我们部署多台服务器运行RabbitMQ的时候,由于每台服务器的性能有差异,导致消费者处理消息的能力是不一样的,例如服务器A一秒钟可以处理十条消息,但是服务器B十秒钟只能处理一条消息,在这样的情况下,我们服务器A有大量的空闲时间,而服务器B本就执行效率低却一直处于执行状态,但是RabbitMQ却不知道这样的场景,因此依旧很公平的在进行分发,这样的情况表面看起来很公平,实际上并不公平,真正的公平应该处于能者多劳情况。

    # 设置prefetch_count = 1 告知RabbitMQ我只有给你返回应答的时候 才给我分配任务
    channel.basic_qos(prefetch_count=1)
    

    p1

    意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新worker 或者改变其他存储任务的策略。

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import pika
    
    from mq_one.base import Base
    
    
    class Produce(Base):
        def producer(self):
            connection = self.connection()
    
            channel = self.channel()
    
            channel.queue_declare("world", durable=True)
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(1, 11):
                message = "hello_world_%s" % i
    
                channel.basic_publish('', routing_key='hello', body=message, properties=properties)
    
            connection.close()
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    消费者1

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import time
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.queue_declare('hello', durable=True)
    
            # 告知MQ只有工作完成之后才继续分配任务
            channel.basic_qos(prefetch_count=1)
    
            channel.basic_consume('', on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        print("worker1开始等待接收消息。。。。。")
    
        consume.consumer()
    
    

    image-20210821185002448

    消费者2

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import time
    
    from .base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.queue_declare('hello', durable=True)
    
            # 只有工作完成之后才会继续接受任务
            channel.basic_qos(prefetch_count=1)
    
            channel.basic_consume('', on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            # 沉睡5S表示处理消息很慢
            time.sleep(5)
    
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        print("worker2开始等待接收消息。。。。。")
    
        consume.consumer()
    

    image-20210821184924249

    预取值

    概念

    当生产者产生大量的消息的时候,如果RabbitMQ不做任何限制无节制的向消费者转发消息,由于消费者处理消息的能力有限,导致大量的消息堆积在信道造成缓冲区溢出等问题,如果设置预取值RabbitMQ会在信道之中设置一个最大的未确认消息数量,当信道存放的消息达到设置的最大值的时候,RabbitMQ就不会在向信道转发消息,除非信道之中未被确认的消息得到确认从而小于信道的最大值,RabbitMQ才会继续向信道转发消息。

    我们可以通过设置 basic.qos中的 prefetch_count来达到上述的效果,在RabbitMQ中每个信道会开启一个 rabbit_limiter当收到 basic.qos指令之后会查看 prefetch_count的值并在 rabbit_limiter记录该值,同时记录未 ACK的消息个数。

    p2

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import pika
    
    from mq_one.base import Base
    
    
    class Produce(Base):
        def producer(self):
            connection = self.connection()
    
            channel = self.channel()
    
            channel.queue_declare("world", durable=True)
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(1, 1001):
                message = "hello_world_%s" % i
    
                channel.basic_publish('', routing_key='hello', body=message, properties=properties)
    
            connection.close()
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    消费者1

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import time
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.queue_declare('hello', durable=True)
    
            # 设置信道最大存储未ACK消息数量为2
            channel.basic_qos(prefetch_count=2)
    
            channel.basic_consume('', on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        print("worker1开始等待接收消息。。。。。")
    
        consume.consumer()
    
    

    消费者2

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Consumer.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import time
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
        def consumer(self):
            channel = self.channel()
    
            channel.queue_declare('hello', durable=True)
    
            # 设置信道最大存储未ACK消息数量为5
            channel.basic_qos(prefetch_count=5)
    
            channel.basic_consume('', on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, channel, method, properties, body):
            time.sleep(5)
    
            print("当前消息为:%s" % body.decode())
    
            # 手动应答
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        print("worker2开始等待接收消息。。。。。")
    
        consume.consumer()
    
    

    image-20210821232239805

    发布确认

    简介

    在上述文章中讲述到了队列持久化与消息持久化,虽然表面上看似将数据成功的进行了持久化,但是依旧存在一些问题,假如生产者一秒钟发送十万条数据,然而队列进行持久化的操作一秒钟只能持久化一万条数据,此时还剩九万条消息未被持久化,然而此时RabbitMQ出现故障,导致还未来得及持久化的九万条数据丢失。

    为解决上述场景,我们可以开启RabbitMQ的发布确认机制,开启发布确认机制当生产者发送的消息经过信道的时候会被打上一个唯一的ID(从1开始),一旦生产者产生的消息到达队列,Broker会回传一个确认消息给生产者这样生产者就知道消息已经到达队列,如果队列开启了消息持久化与队列持久化且消息被写入磁盘之后,则Broker将回传一个确认信息(包含消息ID)告知生产者消息已经被持久化处理。

    c1

    单个确认发布

    这是一种同步的确认发布方式,当生产者发送消息只有该消息收到Broker的确认消息才会继续发布下一条消息,如果在指定的时间之内没有收到确认消息就会抛出异常,因为同步确认可想而知发布速度有多慢,因此该种方式适合吞吐量小的场景。

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :Produce.py
    # @Author  :SR
    # @Date    :2021/8/20 下午11:43
    import pika
    import time
    
    from mq_one.base import Base
    
    
    class Produce(Base):
        def producer(self):
            channel = self.channel()
    
            channel.queue_declare("confirm", durable=True)
    
            # 开启发布确认机制
            channel.confirm_delivery()
    
            # 消息持久化
            properties = pika.BasicProperties(delivery_mode=2)
    
            # 发送消息的数量
            message_count = 10000
    
            # 计算发布开始时间
            begin_time = time.time()
    
            for i in range(message_count):
                message = "hello_world_%s" % i
                channel.basic_publish('', routing_key='confirm', body=message, properties=properties)
    
            # 发布结束时间
            end_time = time.time()
    
            # 查看耗时
            confirm_time = end_time - begin_time
    		
            # 共发送消息数量为:10000--->总共耗时为:46.80592346191406
            print("共发送消息数量为:%s--->总共耗时为:%s" % (message_count, confirm_time))
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    

    image-20210822111226476

    批量发布确认

    上述单个发布确认虽然可以保证我们的消息不会丢失,但是由于是同步确认效率低下,显然高并发情况下此种确认方式不符合要求,在RabbitMQ中提供了批量确认机制当生产者发送一批消息之后,Broker会对这一批消息进行确认,此种方式虽然可以提高确认的效率,但是如果某个消息发送故障丢失我们不能明确的知道是哪个消息发生了故障,当然此种方式一样是同步的依旧会阻塞消息。

    PS:由于在python不支持rabbitmq批量发布确认,此代码省略

    异步确认发布

    上述两种方式都是同步确认在高并发情况下显然不合适,同时虽然批量确认效率相对于单个确认来说效率高点,但是存在消息丢失我们却不能准确的获取丢失的消息,显然上述两种确认机制都有一定的弊端。

    为了解决上述问题RabbitMQ提供啦异步确认机制,从名字便可以看出来该机制是异步的因此效率相比同步会更加高效,同时为了保证消息确认的可靠性,其通过回调函数来通知生产者消息是否被确认发送,最重要的是在发送消息的时候我们可以给消息编号,这样即使有数据丢失,我们也可以通过编号来确认哪个数据丢失。

    async

    发布缓存

    在某些生产环境下由于未知原因运行RabbitMQ的服务器宕机,导致RabbitMQ服务不能正确的对外提供服务,当此时生产者消息到达由于无法正确被处理导致消息丢失,需要手动处理和恢复消息,那么如何才能保证消息可靠的投递呢,特别是在这样极端情况下。

    对于上述情况我们可以考虑使用缓存,当RabbitMQ的交换机或者队列有任何一方不能正确的对外提供服务,此时我们将生产者的消息进行缓存处理,当RabbitMQ恢复正常的时候我们在从缓存中读取生产者的消息进而处理,正确处理完毕之后将消息从缓存之中删除。

    cf

    死信队列

    简介

    死信从字面意思就可以看出来属于死去的信息,当信息无法被正常的消费者进行消费即属于死信,既然有死信那么存放信息的队列则被称之为死信队列。

    应用场景:当订单业务发生异常的时候,为了保证订单消息不丢失,可以将发生异常的消息丢入到死信队列,之后在从死信队列取出异常信息再次进行处理,同时RabbitMQ还可以应用在定时任务,当订单在指定的时候未进行支付我们将取消该订单业务。

    死信来源

    1. TTL过期:在指定时间之内未对消息进行处理
    2. 队列饱合:即队列存储消息达到最大值,无法继续存储生产者的消息
    3. 消息被拒绝:basic.rejectbasic.nack并且 requeue=false.

    图解

    asdasd

    TTL过期

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :producer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午9:57
    
    
    from mq_one.base import Base
    
    import pika
    
    
    class Produce(Base):
    
        def producer(self):
            channel = self.channel()
    
            routing_key = 'active'
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(10):
                message = "hello_world_%s" % i
    
                channel.basic_publish(exchange='active_exchange', routing_key=routing_key, body=message,
                                      properties=properties)
    
                print(" [x] Sent: {}".format(message))
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    image-20210831223320948

    消费者(active)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consumer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午9:27
    
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            # 创建正常交换机与队列
            active_exchange = 'active_exchange'
            active_queue = 'active_queue'
    
            # 创建死信交换机与队列
            dead_exchange = 'dead_exchange'
            dead_queue = 'dead_queue'
    
            # 设置路由key
            active_key = 'active'
            dead_key = 'dead'
    
            # 对正常队列声明死信交换机与死信队列与过期时间 key是固定值
            arguments = {
    
                "x-dead-letter-exchange": dead_exchange,
                "x-dead-letter-routing-key": dead_key,
                'x-message-ttl': 10000
            }
    
            # 声明死信交换机
            channel.exchange_declare(exchange=dead_exchange, exchange_type='direct')
    
            # 声明死信队列
            channel.queue_declare(queue=dead_queue, durable=True)
    
            # 将死信交换机与队列绑定
            channel.queue_bind(queue=dead_queue, exchange=dead_exchange, routing_key=dead_key)
    
            # 声明正常交换机
            channel.exchange_declare(exchange=active_exchange, exchange_type='direct')
    
            # 声明正常队列
            channel.queue_declare(queue=active_queue, durable=True, arguments=arguments)
    
            # 将正常交换机与队列绑定
            channel.queue_bind(queue=active_queue, exchange=active_exchange, routing_key=active_key)
    
            channel.basic_consume(queue=active_queue, on_message_callback=self.callback, auto_ack=True)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            print(body.decode())
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        consume.consumer()
    
    

    此时查看RabbitMQ信息

    image-20210831225439386

    关闭Active消费者开启生产者

    image-20210831225544428

    查看死信队列是否有消息

    image-20210831225708409

    消费者(dead)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consumer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午10:57
    
    
    from mq_one.base import Base
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            channel.basic_consume(queue='dead_queue', on_message_callback=self.callback, auto_ack=True)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            print(body.decode())
    
    
    if __name__ == '__main__':
        consume = Consumer()
        
        consume.consumer()
    

    image-20210831230351439

    队列最大值

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :producer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午9:57
    
    
    from mq_one.base import Base
    
    import pika
    
    
    class Produce(Base):
    
        def producer(self):
            channel = self.channel()
    
            routing_key = 'active'
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(10):
                # 发送10条数据五个进入正常队列 五个进入死信队列
                message = "hello_world_%s" % i
    
                channel.basic_publish(exchange='active_exchange', routing_key=routing_key, body=message,
                                      properties=properties)
    
                print(" [x] Sent: {}".format(message))
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    image-20210901084716276

    消费者(Active)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consumer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午9:27
    
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            # 创建正常交换机与队列
            active_exchange = 'active_exchange'
            active_queue = 'active_queue'
    
            # 创建死信交换机与队列
            dead_exchange = 'dead_exchange'
            dead_queue = 'dead_queue'
    
            # 设置路由key
            active_key = 'active'
            dead_key = 'dead'
    
            # 对正常队列声明死信交换机与死信队列与队列最大值key是固定值
            # 此时设置x-max-length = 5表名队列只能存储五条数据 其余数据进入非正常数据
            arguments = {
    
                "x-dead-letter-exchange": dead_exchange,
                "x-dead-letter-routing-key": dead_key,
                'x-max-length': 5
            }
    
            # 声明死信交换机
            channel.exchange_declare(exchange=dead_exchange, exchange_type='direct')
    
            # 声明死信队列
            channel.queue_declare(queue=dead_queue, durable=False)
    
            # 将死信交换机与队列绑定
            channel.queue_bind(queue=dead_queue, exchange=dead_exchange, routing_key=dead_key)
    
            # 声明正常交换机
            channel.exchange_declare(exchange=active_exchange, exchange_type='direct')
    
            # 声明正常队列
            channel.queue_declare(queue=active_queue, durable=False, arguments=arguments)
    
            # 将正常交换机与队列绑定
            channel.queue_bind(queue=active_queue, exchange=active_exchange, routing_key=active_key)
    
            channel.basic_consume(queue=active_queue, on_message_callback=self.callback, auto_ack=True)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            print(body.decode())
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        consume.consumer()
    
    

    image-20210901084446756

    消费者(Dead)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consumer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午10:57
    
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            channel.basic_consume(queue='dead_queue', on_message_callback=self.callback, auto_ack=True)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            print(body.decode())
    
    
    if __name__ == '__main__':
        consume = Consumer()
        consume.consumer()
    

    image-20210901085238114

    消息拒绝

    生产者

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :producer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午9:57
    
    
    from mq_one.base import Base
    
    import pika
    
    
    class Produce(Base):
    
        def producer(self):
            channel = self.channel()
    
            routing_key = 'active'
    
            properties = pika.BasicProperties(delivery_mode=2)
    
            for i in range(10):
                # 发送10条数据五个进入正常队列 五个进入死信队列
                message = "hello_world_%s" % i
    
                channel.basic_publish(exchange='active_exchange', routing_key=routing_key, body=message,
                                      properties=properties)
    
                print(" [x] Sent: {}".format(message))
    
    
    if __name__ == '__main__':
        produce = Produce()
    
        produce.producer()
    
    

    image-20210901094423176

    消费者(Active)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consumer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午9:27
    
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            # 创建正常交换机与队列
            active_exchange = 'active_exchange'
            active_queue = 'active_queue'
    
            # 创建死信交换机与队列
            dead_exchange = 'dead_exchange'
            dead_queue = 'dead_queue'
    
            # 设置路由key
            active_key = 'active'
            dead_key = 'dead'
    
            arguments = {
    
                "x-dead-letter-exchange": dead_exchange,
                "x-dead-letter-routing-key": dead_key,
    
            }
    
            # 声明死信交换机
            channel.exchange_declare(exchange=dead_exchange, exchange_type='direct')
    
            # 声明死信队列
            channel.queue_declare(queue=dead_queue, durable=False)
    
            # 将死信交换机与队列绑定
            channel.queue_bind(queue=dead_queue, exchange=dead_exchange, routing_key=dead_key)
    
            # 声明正常交换机
            channel.exchange_declare(exchange=active_exchange, exchange_type='direct')
    
            # 声明正常队列
            channel.queue_declare(queue=active_queue, durable=False, arguments=arguments)
    
            # 将正常交换机与队列绑定
            channel.queue_bind(queue=active_queue, exchange=active_exchange, routing_key=active_key)
    
            channel.basic_consume(queue=active_queue, on_message_callback=self.callback, auto_ack=False)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            # 获取队列消息tag值
            delivery_tag = method.delivery_tag
            if delivery_tag == 10:
                # 将消息10给拒绝并且不设置重新加队列
                ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
    
            print(body.decode())
            ch.basic_ack(delivery_tag=delivery_tag, multiple=False)
    
    
    if __name__ == '__main__':
        consume = Consumer()
    
        consume.consumer()
    
    

    消费者(Dead)

    #!/usr/bin/env python
    # -*- coding: UTF-8 -*-
    
    # @File    :consumer_active_queue.py
    # @Author  :SR
    # @Date    :2021/8/31 下午10:57
    
    
    from mq_one.base import Base
    
    
    class Consumer(Base):
    
        def consumer(self):
            channel = self.channel()
    
            channel.basic_consume(queue='dead_queue', on_message_callback=self.callback, auto_ack=True)
    
            channel.start_consuming()
    
        def callback(self, ch, method, properties, body):
            print(body.decode())
    
    
    if __name__ == '__main__':
        consume = Consumer()
        consume.consumer()
    

    image-20210901094525594

    延迟队列

    简介

    延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

    使用场景

    1. 订单在十分钟之内未支付则自动取消
    2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
    3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
    4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
      这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。 |

    图解应用场景

    ycdl

    RabbitMQ集群部署

    简介

    最开始我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是解决实际问题的关键。

    环境规划

    名称 IP 角色
    master-node 10.1.1.2 master
    slave-node1 10.1.1.3 slave
    slave-node1 10.1.1.4 slave

    hosts映射

    [root@master-node ~]# vim /etc/hosts                                                                                                                                                                                                                                                                                                                                                             
      10.1.1.2 master-node                                                                                                                                                                                                                                                                                                                                                                             
      10.1.1.3 slave-node1                                                                                                                                                                                                                                                                                                                                                                              
      10.1.1.4 slave-node2                                                                                                                                                                                                                                                                                                                                                                             
    [root@master-node ~]# scp /etc/hosts root@10.1.1.3:/etc/                                                                                                                                                                                                                                                                                                                                         
    [root@master-node ~]# scp /etc/hosts root@10.1.1.4:/etc/                                                                                                                                                                                                                                                                                                                                         
    

    复制cookie

    [root@master-node ~]# scp /var/lib/rabbitmq/.erlang.cookie root@slave-node1:/var/lib/rabbitmq/                                                                                                                                                                                                                                                                                                   
    [root@master-node ~]# scp /var/lib/rabbitmq/.erlang.cookie root@slave-node2:/var/lib/rabbitmq/                                                                                                                                                                                                                                                                                                   
    

    加入集群

    # 重启mqfuwu 所有节点都开启                                                                                                                                                                                                                                                                                                                                                                               
    [root@slave-node1 ~]# systemctl restart rabbitmq-server.service                                                                                                                                                                                                                                                                                                                                  
    # 节点2与节点3执行如下命令                                                                                                                                                                                                                                                                                                                                                                                  
    # (rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)                                                                                                                                                                                                                                                                                                                        
    [root@slave-node1 ~]# rabbitmqctl stop_app                                                                                                                                                                                                                                                                                                                                                       
    [root@slave-node1 ~]# rabbitmqctl reset                                                                                                                                                                                                                                                                                                                                                          
    [root@slave-node1 ~]# rabbitmqctl join_cluster rabbit@master-node                                                                                                                                                                                                                                                                                                                                
    [root@slave-node1 ~]# rabbitmqctl start_app                                                                                                                                                                                                                                                                                                                                                      
    

    创建rabbitMQ用户

    # 创建账号root 密码root                                                                                                                                                                                                                                                                                                                                                                                
    [root@master-node ~]# rabbitmqctl add_user root root                                                                                                                                                                                                                                                                                                                                             
    # 设置用户角色                                                                                                                                                                                                                                                                                                                                                                                         
    [root@master-node ~]# rabbitmqctl set_user_tags root administrator                                                                                                                                                                                                                                                                                                                               
    # 设置用户权限                                                                                                                                                                                                                                                                                                                                                                                         
    [root@master-node ~]# rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"                                                                                                                                                                                                                                                                                                                     
    # 查看用户                                                                                                                                                                                                                                                                                                                                                                                           
    [root@master-node ~]# rabbitmqctl list_users                                                                                                                                                                                                                                                                                                                                                     
    

    image-20210901215029581

    查看集群

    [root@master-node ~]# rabbitmqctl cluster_status                                                                                                                                                                                                                                                                                                                                                 
    

    image-20210901215739807

    镜像队列

    如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
    引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

    '''                                                                                                                                                                                                                                                                                                                                                                                              
    Name:policy的名称                                                                                                                                                                                                                                                                                                                                                                                   
    Pattern: queue的匹配模式(正则表达式)                                                                                                                                                                                                                                                                                                                                                                       
    priority:可选参数,policy的优先级                                                                                                                                                                                                                                                                                                                                                                         
    Definition:镜像定义,包括三个部分ha-mode、ha-params、ha-sync-mode                                                                                                                                                                                                                                                                                                                                             
    ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes                                                                                                                                                                                                                                                                                                                                                         
    all:表示在集群中所有的节点上进行镜像                                                                                                                                                                                                                                                                                                                                                                             
    exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定                                                                                                                                                                                                                                                                                                                                                        
    nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定                                                                                                                                                                                                                                                                                                                                                            
    ha-params:ha-mode模式需要用到的参数                                                                                                                                                                                                                                                                                                                                                                       
    ha-sync-mode:进行队列中消息的同步方式,有效值为automatic(自动)和manual(手动)                                                                                                                                                                                                                                                                                                                                           
    '''                                                                                                                                                                                                                                                                                                                                                                                              
    # 对队列名称以"queue_"开头的所有队列进行镜像,并在集群的两个节点上完成进行                                                                                                                                                                                                                                                                                                                                                       
    [root@master-node ~]# rabbitmqctl set_policy ha-queue-two '^queue_' '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'                                                                                                                                                                                                                                                             
    

    image-20210902150324299

  • 相关阅读:
    leetcode 763. Partition Labels
    JS字符串格式化~欢迎来搂~~
    手把手教你在pycharm上上传项目至GitHub
    手把手教你用原始方式上传项目至GitHub
    python3.7环境下创建app、运行Django1.11版本项目报错Generator expression must be parenthesized
    在学习python的DjangoFlaskTornado前你需要知道的,what is web?
    python手撸桌面计算器
    jQuery之克隆事件--clone()与clone(true)区别
    前端之jQuery基础
    通过案例来剖析JQuery与原生JS
  • 原文地址:https://www.cnblogs.com/SR-Program/p/15219428.html
Copyright © 2011-2022 走看看