zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列

    一、RabbitMQ安装

    1、简介

    RabbitMQ是一个消息代理:它接受和转发消息。可以将其视为邮局,当你要发送的邮件放入邮局中时,你可以确认邮件是否安全的达到接收者的手中,这里RabbitMQ就相当于邮局的角色。

    2、RabbitMQ的安装

    • 安装erlang

    因为RabbitMQ是erlang语言开发的,所以需要先安装erlang。

    从EPEL源安装:

    [root@localhost ~]# yum install epel-release
    [root@localhost ~]# yum install erlang 
    • 安装RabbitMQ

    下载:

    [root@localhost ~]# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm

    安装:

    [root@localhost ~]# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm 

    启动服务:

    [root@localhost bin]# rabbitmq-server start

    查看用户以及权限:

    查看用户:rabbitmqctl list_users  
    
    查看用户权限:rabbitmqctl list_user_permissions guest
    
    新增用户: rabbitmqctl add_user admin 123
    
    赋予管理员权限:
    
    rabbitmqctl set_user_tags admin administrator 
    
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 

    启动web监控:

    [root@localhost ~]# rabbitmq-plugins enable rabbitmq_management

    访问默认端口15672

    另外,如果操作RabbitMQ,需要安装API,进行操作,在开发环境中安装pika模块

    pip install pika

    二、六种工作模式

    1、生产者消费者模式

     

    “P”是生产者,“C”是消费者。中间的框是一个队列 ,用于存放消息。

    生产者是将任务放入到队列中:

    import pika
    
    #创建用户名密码
    credentials = pika.PlainCredentials("admin","123")
    #创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    # 创建一个队列
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello', # 消息队列名称
                          body='helloworld')
    connection.close()

    消费者是取出任务队列并且进行处理:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    # 创建一个队列,如果已经存在就不会重新创建
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
    channel.basic_consume('hello',callback,auto_ack=True)
    
    channel.start_consuming()

    详情参考:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

    2、 竞争消费者模式

      (1)消息确认auto_ack

      如果消费者(上图中的C1和C2)处理从队列取出的任务,但是没有完成时就已经挂掉了,那么如果使用之前的代码auto_ack=True,一旦RabbitMQ向消费者传递任务,它立即将其标记为删除。在这种情况下,如果挂掉一个消费者,将丢失它刚刚处理的任务。

       为了确保任务永不丢失,RabbitMQ支持消息确认消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它。

      如果消费者挂掉(其通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将未完全处理的任务并重新排队。如果同时有其他在线消费者,则会迅速将其重新发送给其他消费者

    主要变动在于消费者的变动:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    # 创建一个队列,如果已经存在就不会重新创建
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
        ch.basic_ack(delivery_tag=method.delivery_tag)#防止消费者挂掉任务丢失
    channel.basic_consume('hello',callback,auto_ack=False)#默认情况auto_ack=False手动消息确认打开
    
    channel.start_consuming()

    生产者未变动:

    import pika
    
    #创建用户名密码
    credentials = pika.PlainCredentials("admin","123")
    #创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    # 创建一个队列
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello', # 消息队列名称
                          body='你好')
    connection.close()

      (2)消息持久性

    如果RabbitMQ服务器停止,队列中任务仍然会丢失。此时需要使用durable 参数,声明队列是持久的,但是此时以前的队列就不可以使用,持久的队列需要重新声明,也就是说需要改一下队列名称。

    主要变动在与生产者的变动:

    import pika
    
    #创建用户名密码
    credentials = pika.PlainCredentials("admin","123")
    #创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    # 创建一个持久化队列
    channel.queue_declare(queue = 'task_queue',durable = True)
    
    channel.basic_publish(exchange='',
                          routing_key='task_queue', # 消息队列名称
                          body='你好',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 将消息标记为持久性
                          ))
    
    connection.close()

    消费者未变动:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    # 声明一个队列,已经创建就不会再创建了
    channel.queue_declare(queue='task_queue')
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
    channel.basic_consume(‘task_queue’,callback,auto_ack=True)
    
    channel.start_consuming()

      (3)公平派遣

      可以看到上面的图,一个生产者,两个消费者,生产者将任务不断的放入到队列中,消费者不断的取出任务,那么这两个消费者是如何取任务的呢?

      RabbitMQ默认情况下是均匀地发送消息,也就是消费者一个接一个的取出任务。这样如果一个消费者处理任务的时间比较长,还是均匀的给任务,势必造成一个消费者将经常忙,而另一个会很闲。

      此时可以通过basic.qos方法和 prefetch_count = 1设置,将任务发送给空闲的消费者。

    变动主要在消费者:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    # 声明一个队列(创建一个队列)
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume('hello',callback,auto_ack=True)
    
    channel.start_consuming()

    生产者未变动:

    import pika
    # 无密码
    # connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104'))
    
    # 有密码
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    # 声明一个队列(创建一个队列)
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello', # 消息队列名称
                          body='jjfdk')
    connection.close()

    详情参考:https://www.rabbitmq.com/tutorials/tutorial-two-python.html

     3、发布/订阅

      在之前的RabbittMQ中主要用于将任务放入一个队列钟,然后消费者分别取出任务进行处理,而发布订阅是每一个消费者都将拥有自己的一个队列,从而获取消息。这样每一个消费者都将拥有相同的消息。

      在上述模型中,“P”是生产者,也就是消息制造者,“X”是exchange ,用于将生产出来的消息给每一个队列给一份,中间红色的就是队列,“C1”和“C2”是消息接受者。

    发布者:

    import pika
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='ex1',exchange_type='fanout')#fanout工作方式为ex1每一个队列添加消息
    
    channel.basic_publish(exchange='ex1',
                          routing_key='',
                          body='abcd')
    
    connection.close()

    订阅者:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    # exchange='ex1',exchange的名称
    # exchange_type='fanout' , 工作方式将消息发送给所有的队列
    channel.exchange_declare(exchange='ex1',exchange_type='fanout')
    
    # 随机生成一个队列
    result = channel.queue_declare(queue = '',exclusive = True)
    queue_name = result.method.queue
    # 让exchange和queque进行绑定.
    channel.queue_bind(exchange='ex1',queue=queue_name)

    详情参考:https://www.rabbitmq.com/tutorials/tutorial-three-python.html 

    4、关键字发布/订阅

     上述发布的工作方式是:

    exchange_type='fanout'

    将消息发送给所有队列,而选择性的发布/订阅,需要使用exchange_type='direct'和routing_key=""进行关键字发布订阅。

    发布者

    import pika
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='ex2',exchange_type='direct')
    
    channel.basic_publish(exchange='ex2',
                          routing_key='gh',
                          body='nhgjod')
    
    connection.close()

    exchange名称为“ex2”,工作方式为“direct”,然后将exchange与关键字routing_key关键字进行绑定。

    订阅者1

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='ex2',exchange_type='direct')
    
    # 随机生成一个队列
    result = channel.queue_declare(queue = '',exclusive=True)
    queue_name = result.method.queue
    
    # 让exchange和queque进行绑定.
    channel.queue_bind(exchange='ex2',queue=queue_name,routing_key='bright')
    channel.queue_bind(exchange='ex2',queue=queue_name,routing_key='gh')

    exchange与queue以及关键字进行绑定,routing_key=“gh”可以收到消息。

    订阅者2

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='ex2',exchange_type='direct')
    
    # 随机生成一个队列
    result = channel.queue_declare(queue = '',exclusive=True)
    queue_name = result.method.queue
    
    # 让exchange和queque进行绑定.
    channel.queue_bind(exchange='ex2',queue=queue_name,routing_key='bright')
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
    channel.basic_consume(queue_name,callback,auto_ack=True)
    
    channel.start_consuming()

    routing_key=“bright”不能接收到关键字为“gh”的发布者发布的消息。

    详情参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html

    5、关键字模糊匹配发布

     

    可以看到使用type=topic,以及使用“#”以及“*”:

    当队列绑定“#”绑定routing_key时,匹配任意字符
    当特殊字符“
    *”绑定routing_key时,匹配一个单词。

    消息发布:

    import pika
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='ex3',exchange_type='topic')
    
    channel.basic_publish(exchange='ex3',
                          routing_key='bright.gh.km',
                          body='abcdefd')
    
    connection.close()

    可以看到关键字routing_key='bright.gh.km',其余的与之前发布/订阅并没什么区别。

    消息订阅1:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    
    channel.exchange_declare(exchange='ex3',exchange_type='topic')
    
    # 随机生成一个队列
    result = channel.queue_declare(queue="",exclusive=True)
    queue_name = result.method.queue
    # 让exchange和queque进行绑定.
    channel.queue_bind(exchange='ex3',queue=queue_name,routing_key='bright.*')
    
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
    channel.basic_consume(queue_name,callback,auto_ack=True)
    
    channel.start_consuming()

    这时可以看到订阅者1的routing_key='bright.*',可以匹配发布消息以bright开头的后面跟一个单词,所以发布者的消息它接收不到。

    订阅者2:

    import pika
    
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    
    channel.exchange_declare(exchange='ex3',exchange_type='topic')
    
    # 随机生成一个队列
    result = channel.queue_declare(queue="",exclusive=True)
    queue_name = result.method.queue
    # 让exchange和queque进行绑定.
    channel.queue_bind(exchange='ex3',queue=queue_name,routing_key='bright.#')
    
    
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body)
    
    channel.basic_consume(queue_name,callback,auto_ack=True)
    
    channel.start_consuming()

    这时可以看到订阅者1的routing_key='bright.#',可以匹配发布消息以bright开头的后面多个单词、字符,所以发布者的消息它可以接收到,这也就说明“#”比“*”更强大。

    详情参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

    6、远程过程调用(RPC)

    RPC工作方式:

    • 当客户端启动时,它会创建一个随机回调队列。
    • 对于RPC请求,客户端发送带有两个属性的消息: reply_to,设置为回调队列,correlation_id,设置为每个请求的唯一值。
    • 请求被发送到rpc_queue队列。
    • Server正在等待rpc_queue上的请求。当出现请求时,它会执行任务并使用reply_to字段中的队列将结果返回给客户端
    • 客户端等待回调队列上的数据。出现消息时,它会检查correlation_id属性。如果它与请求中的值匹配,则将响应返回给应用程序。

    客户端:

    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            credentials = pika.PlainCredentials("admin", "123")
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104', credentials=credentials))
            self.channel = self.connection.channel()
    
            # 随机生成一个消息队列(用于接收结果)
            result = self.channel.queue_declare(queue="",exclusive=True)
            self.callback_queue = result.method.queue
    
            # 监听消息队列(返回结果的队列)中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
            self.channel.basic_consume(self.callback_queue,self.on_response, auto_ack=True)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
    
            # 发送一个任务包含的内容:  任务id = corr_id ;任务内容 = '10' ;用于接收结果的队列名称
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue', # 接收任务的队列名称
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue, # 用于接收结果的队列
                                             correlation_id = self.corr_id, # 任务ID
                                             ),
                                       body=str(n))
    
            while self.response is None:
                self.connection.process_data_events()
    
            return self.response
    
    fibonacci_rpc = FibonacciRpcClient()
    
    response = fibonacci_rpc.call(10)
    print('结果:',response)

    在客户端需要做以下的事情:

    • 建立连接
    • 随机生成用于接收返回结果的消息队列
    • 监听接收返回结果的消息队列,看是否有结果(是否执行on_response)
    • 定义主调用方法,执行rpc请求,包含请求的详细信息
    • 等待响应

    服务端:

    import pika
    credentials = pika.PlainCredentials("admin","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.0.104',credentials=credentials))
    channel = connection.channel()
    
    # 监听任务队列,是否有任务到来
    channel.queue_declare(queue='rpc_queue')
    
    def on_request(ch, method, props, body):
        n = int(body)
        response = n*100
        # props.reply_to  要放结果的队列.
        # props.correlation_id  任务id
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id= props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue = 'rpc_queue',on_message_callback = on_request)
    channel.start_consuming()

    在服务端需要做以下的事情:

    • 建立连接
    • 监听请求队列,看是否有请求到来。
    • basic_consume声明了一个回调,是RPC服务器的核心。它在收到请求时执行。处理请求并发回响应。
    • 想要运行多个服务器进程。为了在多个服务器上平均分配负载,需要设置prefetch_count

    详情参考:https://www.rabbitmq.com/tutorials/tutorial-six-python.html

  • 相关阅读:
    Dalvik虚拟机进程和线程的创建过程分析
    Dalvik虚拟机的运行过程分析
    Dalvik虚拟机JNI方法的注册过程分析
    Dalvik虚拟机简要介绍和学习计划
    Dalvik虚拟机的启动过程分析
    Android应用程序资源的查找过程分析
    Android应用程序资源管理器(Asset Manager)的创建过程分析
    Android应用程序资源的编译和打包过程分析
    Android视图SurfaceView的实现原理分析
    MySQL中CASE的使用
  • 原文地址:https://www.cnblogs.com/shenjianping/p/11017625.html
Copyright © 2011-2022 走看看