zoukankan      html  css  js  c++  java
  • Python之RabbitMQ的使用

    今天总结一下Python关于Rabbitmq的使用

      RabbitMQ官网说明,其实也是一种队列,那和前面说的线程queue和进程queue有什么区别呢?

        线程queue只能在同一个进程下进行数据交互

        进程queue只能在父进程和子进程之间,或者同一父进程下的子进程之间做数据交互

        如果需要对不同进程(eg:微信和qq)两个独立的程序间通信

    方法1就是直接把数据写在硬盘(disk)上然后各自的进程读取数据就可以,但是由于硬盘的读写速度太慢,效率太低

    方法2自己写个socket,直接做数据交互,问题是如果改变程序,或者再加一个程序,需要对写好的socket进行修改,还要处理黏包什么的复杂的连接关系,维护成本太高。

    方法3,利用已有的中间商(代理)。这个broker其实就是封装好的socket,我们拿来直接用就好了。

       这里的broker,就有RabbitMQ,ZeroMQ,ActiveMQ等等等等。

    一.安装及环境配置

        windows的安装和配置方法较为简单,直接安装就好了

        Rabbit支持多种语言: Java, .NET, PHP, Python, JavaScript, Ruby, Go这些常用语言都支持


    如图所示,python操作RabbitMQ需要的模块有上述几种选择,我们用最简单的pika,用pip直接安装

    pip install pika

    二.RabbitMQ的使用

    这里所有的用法都是基于RabbitMQ是工作在‘localhost’上,并且端口号为15672,能在浏览器里访问http://localhost:15672这个地址。

    1.消息分发(基础版)

    这就是RabbitMQ最简单的工作模式,p为生产者(Producer),生产者发送message给queue,queue再把消息发送至消费者c(Customer)

    先看看生产者至队列(send)这个过程

    import pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()

    我们先建立了一个链接,然后就需要定义一个队列,队列的名字就暂时定位‘hello'

    channel.queue_declare(queue='hello')

    在RabbitMQ里消息并不能直接发送给队列,所有的信息发送都要通过一个exchange,但是这里我们先把这个exchange定义成一个空的字符串,后面在将他的具体用法

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='123')

    在发送确认完成后,可以将连接关闭

    connect.close()

    这就是send端的代码

    import pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='123')
    print("[x] Sent 'hello world!'")
    connect.close()
    RabbitMQ_basic_producer

     运行了send代码后我们可以在terminal里RabbitMQ安装目录下sbin文件夹里查看一下消息队列

    rabbitmqctl.bat list_queues 

    如果是Linux命令为

    sudo rabbitmqctl list_queues

    这里就说明了队列信息和消息状态。

    然后再看一下消费者这一端的代码是什么样的,同样,先要建立连接并定义好队列名

    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')

    这里可能有个疑问:我们不是在生产者里已经定义了队列名吗?为什么在消费者里还要定义呢?

    因为在实际工作中,我们并不能确定是生产者还是消费者先一步运行,如果队列名没有定义的话运行时候是会报错的。下面就是对消息的处理

    def callback(ch,method,properties,body):
        print("[x] Received %r"%body)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)

    当消息来临时,消费者会执行回调函数callback。这里的callback就是直接打印消息内容(body)。

    回调函数另外的几个参数:chconne.channel的内存对象地址,

    <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 62145, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>

    method是包含了发送的信息

    <Basic.Deliver(['consumer_tag=ctag1.9ae48c906b014a83a512413c0e6f9ef8', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>

     properties我们以后再了解。

    2.公平分发(workqueue)

      在这种结构里,我们要考虑到这样一种情况:有多个消费者,消费者在得到消息时需要对消息进行处理,并且有可能处理消息所消耗的时间是不同的

    。这里我们用的queue叫做workqueue

      为了模拟消费者对消息进行处理的过程,我们用time.sleep()做一个消耗时间的过程。消息的产生和接收是这样的

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")

    这里插播一条''.join(sys.argv[1:])的作用:就是把在shell里输入的指令后跟的代码加在message里。消费者得到消息后,数消息里有几个“.”,sleep相应的秒数。

     放出第一版的代码

    import pika,sys
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')
    message = ' '.join(sys.argv[1:]) or "Hello World."
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)
    print('send %s'%message)
    RabbitMQ生产者
    import time,pika
    
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello')
    channel.start_consuming()
    RabbbitMQ消费者

     这时候,我们可以多启动几个消费者,再用生产者发送消息,看看效果!

     

    可以发现,消息是被公平的依次被分发给各个消费者的(Fair dispatch),这种分发的方式叫轮询。

    消息确认message acknowledgments  

    现在考虑下这种情形:消费者在处理消息时需要较长的时间,在这时把这个消费者kill掉,正在处理的消息和已经接收但未被处理的消息就丢失了。这应该是不允许的,我们可不希望有数据丢失,就需要将这些任务重新发送给其他正常工作的消费者。

    为了保证任务不丢失,RabbitMQ支持使用message acknowledgments,消费者在完成任务后会给RabbitMQ发送个消息,告诉他活已经干完了,RabbitMQ就会把这个任务给释放掉。而当出现消费者宕机、掉线等情况时,RabbitMQ会重新把这个任务发送给其他的消费者。

    往回看看上文说到的no_ack,这个值默认的是False,RabbitMQ是不主动销毁消息的所以我们一看看在这里把值置为True。

    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)

    这样只要消费者接收到消息,RabbitMQ就直接销毁掉这个消息,就成了手动确认。我们要想实现刚才说的消息不丢失,就要这样定义

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # time.sleep(body.count(b'.'))
        time.sleep(10)     #修改了一下,在延时的10s把消费者断掉
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    channel.basic_consume(callback,
                          queue='hello')#这里的no_ack默认为False

    这样,当一个消费者宕机了,RabbitMQ就会直接把任务拍个下一个消费者。

     消息持久化

    刚才通过了消息确认,我们保证了消费者在掉线的时候任务不丢失,可是还有一个问题,如果RabbitMQ如果断掉(或者服务重启)了,里面的任务(包括所有queue和exchange依旧会丢失)这时候我们可以用到——消息持久化Message durability

    channel.queue_declare(queue='hello',durable=True)#将队列持久化(只保存了队列)
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2))#保持消息持久化

    必须同时将队列和消息持久化,可以保证RabbitMQ服务在重启后任务还存在。

    注意几点:

    1.如果只持久化了消息,服务重启后消息丢失

    2.如果只持久化了队列,服务重启后队列还在,但消息丢失

    3.在持久化队列的时候要保持生产者和消费者的一致性

    最后一点,因为有可能每个消费者处理信息的能力不一样,如果按公平分发的化有可能导致负载不平衡,旱的旱死、涝的涝死。为避免这种情况发生还有一个知识点

    channel.basic_qos(prefetch_count=1)

    用这个语句限制了消费者待处理信息的个数

    workQueue的终极代码

    import pika,sys
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello',durable=True)#队列持久化
    message = ' '.join(sys.argv[1:]) or "Hello World2."
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
    print('send %s'%message)
    RabbitMQ_workqueue_procucer
    import time,pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello',durable=True)#队列持久化
    print(' [*] Waiting for messages. To exit press CTRL+C')
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)  #消息回执
    channel.basic_qos(prefetch_count=1)  #限制消费者待处理任务个数
    channel.basic_consume(callback,
                          queue='hello'
                          )
    channel.start_consuming()
    RabbitMQ_workqueue_customer

     3.发布/订阅(publish/subscribe)

    我们在前面两部分将的都是将消息由生产者到消费者之间通过queue传递,现在将引入一个新的成员:exchange。

    其实生产者在发送的时候是不知道消息要发送给那个queue的,甚至他都不知道消息是由queue接收的。实际上生产者只是把message发送给了exchange。至于message后续的处理都是由exchange决定的。

    就像图上标示的,exchange在sender和queue之间起到了转呈的作用。

    按照工作方式,我们将exchange分成了fanoutdirecttopicheaders四种类型。

    fanout:所有绑定到这个exchange的队列都接收消息

    direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息

    topic:所有符合routingKey(可以是表达式)的queue可以接收消息

      表达式说明:#表示一个或多个字符

                                *表示任何字符

            使用RoutingKey为#时相当于fanout

    headers:通过headers来决定把消息发送给哪些queue。

    在这个part我们来看fanout的作用。

    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout') 

    我们定义一个exchange,名字随便起一个‘logs’,类型就声明为fanout。

    (在前面两节我们还没有引入exchange这个概念,就用了默认的exchange设置

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='123')

    ,exchange=''空的字符串表示了默认的exchange或名字是空的,那exchange就把消息发送给routing_key指定的queue里(前提是这个queue是存在的),在声明了exchange以后,我们就可以用这个exchange发送消息了

    channel.basic_publish(exchange='logs',                #使用的exchange名称
                          routing_key='',                 #使用的队列名称
                          body='123')                     #消息内容

    注意到了一点没有?这里并没有定义队列的名称?为什么?在广播的时候是不用固定具体的哪个queue的,我们

    result = channel.queue_declare() #生成随机queue

    我们在消费端声明queue的时候可以生成一个随机的queue,这里还要加个命令

    result = channel.queue_declare(exclusive=True) 

    这个exclusive表示在连接在关闭以后这个queue直接被销毁掉。

    然后把这个queue绑定在转发器上。所有进入这个exchange的消息被发送给所有和他绑定的队列里。

    随机的queue已经声明了,现在就把他跟exchange绑定

    channel.queue_bind(exchange='logs',
                       queue=result.method.queue)#注意queue名的获取方法

    这就是最终的代码:

    import pika
    import sys
    connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connect.channel()
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')   #logs 是随便起的名字,声明了exchange
    message = ' '.join(sys.argv[1:]) or 'info: Hello World!'
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    fanout_publish
    import pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    result = channel.queue_declare(exclusive=True)   #exclusive 唯一的,为True时不指定queue名的化随机生成一个queue,
                                                        # 在断开连接后把queue删除,相当于生成一个随机queue
    channel.queue_bind(exchange='logs',
                       queue=result.method.queue) #绑定的是exchange对应的queue
    print('waiting for logs.')
    def callback(ch,method,preproteries,body):
        print('get data:%r'%body)
    channel.basic_consume(callback,
                          queue=result.method.queue,
                          no_ack=True)
    channel.start_consuming()
    fanout_customer

     总体看一下,发送端的代码跟前面的差不太多,最重要的差别就是把routingKey给忽略掉了,但是明确了exchange的对象。

    而接收方是在建立连接后要声明exchange,并且要和队列绑定。如果没有队列和exchange绑定,消息就被销毁了。这就是整个发送的过程

    还有一点,这个订阅——发布的模型就像电台和收音机一样,如果customer下线了是收不到信息的,消息也是在线发送的,并不会保存。

    4.routing(exchange type:direct)

    在这个过程中,我们大致了解了发布——订阅模型。其实就是在发送端定义了一个exchange,在接收端定义了一个队列,然后把这两者绑定,就OK了。可是我们现在只想订阅一部分有用的信息,比如只获取错误信息写到日志文件里,但同时又能将所有的信息都显示在控制台(或者terminal上)。

    上一节所讲述的bind,也可以简单的理解为这个queue对这个exchange的内容“感兴趣”。

    在binding的时候,还可以加一个routingKey的参数,这个参数就描述了queue对哪些exchange感兴趣。

    channel.queue_bind(exchange='logs',         #被绑定的exchange名
                       queue='queue_name',      #被绑定的queue名
                       routing_key='black')     #queue的‘兴趣爱好’

    对queue和exchange进行bind时,bind的参数主要取决于exchange的类型,比如在fanout模式下是不能有这个routingKey的,运行时候会报错。

    我们使用了fanout的发布订阅模式,在这个模式下接收端不能对信息进行一定原则的过滤,一股脑的照单全收,已经不能满足我们的要求了,现在就要用direct模式。

    在上面的图里,有两个queue分别和exchange绑定,Q1的routingKey是orange,Q2则有两个分别是black和green。在这个模型中,发布的消息关键字是orange则被分发到Q1内,而包含有black或green的则发给Q2.剩余的消息就被discard了。

    而在上图中,同样的key同多个队列进行绑定的方法也是合法的。所有包含关键字black的消息会被同时发送 给Q1和Q2。

    了解了上面所说的方法,我们来按照本节一开始的目标来修改下代码

    首先要声明exchange

    channel.exchange_declare(exchange='logs',
                             exchange_type='direct')#声明exchange,注意接收端的代码是一样的

    在发送的时候对消息进行分类

    serverity = sys.argv[1] if len(sys.argv)>1 else 'info'

    然后发送消息

    channel.basic_publish(exchange='logs',
                          routing_key=serverity,  #消息的分类
                          body=message)

    在接收端,我们用一个循环把所有的routingKey和queue绑定(有可能出现多个关键字和一个queue同时绑定的情况)

    servrities = sys.argv[1:]    #获取所有的关键字
    if not servrities:
        sys.stderr.write('Usage: %s [info] [warning] [error]
    '%sys.argv[0])
        sys.exit(1)              #关键字不存在打印提示后退出
    print('recived:%s'%servrities)
    for servrity in servrities:  #循环绑定
        channel.queue_bind(exchange='logs',
                           queue=queue_name,
                           routing_key=servrity)

    整个方案就是这样的

    我们启动两个terminal,按这样的方式启动

    python direct_consumer.py info error warning
    python direct_consumer.py error

     在分别发送

    python direct_publisher.py info 123
    python direct_publisher.py error 456
    python direct_publisher.py warning 789

    看看是什么效果

    是不是达到了订阅的效果!

    4.更加细致的消息过滤(topic模式)

    在上一节我们利用了direct的模式实现了初步的消息过滤,在这一节里,我们要看看如何实现如何实现更加细致的消息过滤,比如我在获取info的前提下还要知道哪些message是RabbitMQ发来的,哪些是Redis发来的,那怎么区分呢?

    就想这个图里的一样,我们在定义RoutingKey的时候利用了表达式,就像模糊查询一样其中

    *表示任意一个字符

    #表示0个或多个字符

    topic模式的代码和上一节的基本一致,只是改变了exchange的模式

    channel.exchange_declare(exchange='logs',
                             exchange_type='topic')#声明exchange

     启动terminal,输入指令

    python topic_customer kern.*                 可以接收以kern.开头的所有消息 kern.123 abc  接收到abc
    python topic_customer.py *.kern.*            中间包含.kern.的消息 123.kern.345 abc  接收到abc

     同时绑定多个关键字

    接收端
    d:pythonweek11>python topic_customer.py kern.* pip.* ['kern.*', 'pip.*'] [*] Waiting for logs. To exit press CTRL+C [x] 'pip.11':b'duziele' [x] 'kern.11':b'duziele'
    发送端
    d:pythonweek11>python topic_publisher.py pip.11 duziele
     [x] Sent 'pip.11':'duziele'
    d:pythonweek11>python topic_publisher.py kern.11 duziele
     [x] Sent 'kern.11':'duziele'

     还可以用#获取所有消息

    d:pythonweek11>python topic_customer.py #

     ps:#的作用我一直不大明白,我试过了

    d:pythonweek11>python topic_customer.py kern.#

    效果和kern.*是一样的。

    6.Remote procedure call(RPC)

           我们在前面的章节将到了在多个消费者之间分发耗时任务的方法,可是现在要实现这样的功能:调用远程的设备上的一个函数,然后等执行完毕返回结果。这样的工作模式就叫远程过程调用——Remote Procedure Call(RPC)。

      利用RabbitMQ也可以实现RPC的功能,为了能模拟这个过程,我们在server端设立一个fun:给定一个整数n,然后返回n对应的斐波那契数列。

    callback queue

      通过RabbitMQ实现RPC的方法很简单——客户端发送请求,服务端对请求响应然后把消息发送至叫callback的queue,过程类似这样

    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    
    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                                reply_to = callback_queue,
                                ),
                          body=request)

    Correlation id

      我们刚才为每个请求对应的响应都声明了一个队列,但是在这里等待着结果的返回效率是不是太低了?还好有另外的一种方法:为每个客户端创建一个callback的队列。然而又引发了一个新问题:在这个队列里我不知道哪个响应是对应我这个请求的!这时候就到大神出马了——Correlation ID。对每个请求都设置一个唯一的ID,在callback的队列里通过查看属性来判断他对应哪个请求。如果出现没有对应的ID,安全起见我们还是把他忽略掉。

     

    总之我们的RPC的工作流程就是这样的:

    1.client启动,声明一个匿名的callback queue

    2.建立RPC请求,请求里除了消息还包含两个参数:a.replay_to(告诉server响应的结论callback的队列里)

                           b.correlation_id:每个请求都被赋予一个独一无二的值

    3.请求被发送给RPC_queue

    4.server等待queue里的消息,一旦出现请求,server响应请求并把结论发送给通过replay_to要求的queue里

    5.client在callback_queue里等待数据,一旦消息出现,他将correlation进行比对,如果相同就获取请求结果。

    import pika
    
    connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connect.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:return 1
        else:return fib(n-1)+fib(n-2)
    
    
    def on_request(ch,method,props,body):
        n = int(body)
        print('[.]fib(%s)'%n)
        response = fib(n)
        print(response)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=props.correlation_id),
                         body = str(response))
        print('send over')
        ch.basic_ack(delivery_tag=method.delivery_tag)   #确认client接收到消息
    
    channel.basic_qos(prefetch_count=1)   #限制消息处理个数
    channel.basic_consume(on_request,queue='rpc_queue')
    
    print('[x]Awaitiong RPC requests')
    channel.start_consuming()
    RPC_server
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue     #声明请求响应回执的queue
    
            self.channel.basic_consume(consumer_callback=self.on_response,
                                       queue=self.callback_queue,no_ack=True) #监听回执queue
    
        def on_response(self,ch,method,props,body):  #callback_queue的回调函数
            print(body)
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self,n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties   #发送回执消息的参数
                                       (reply_to=self.callback_queue,
                                        correlation_id=self.corr_id),
                                       body=str(n)
                                       )
            while self.response is None:
                self.connection.process_data_events()   #事件驱动,非阻塞版的start_consuming
            return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    n = input('>>>')
    print('[x] Requesting fib(%s)'%n)
    response = fibonacci_rpc.call(n)
    print(response)
    RPC_client

     以上就是RabbitMQ的常规用法。

  • 相关阅读:
    【转】Scala基础知识
    Python知识之 方法与函数、偏函数、轮询和长轮询、流量削峰、乐观锁与悲观锁
    jQuery Ajax async=>false异步改为同步时,导致浏览器假死的处理方法
    Django框架之DRF APIView Serializer
    Python之虚拟环境virtualenv、pipreqs生成项目依赖第三方包
    celery 分布式异步任务框架(celery简单使用、celery多任务结构、celery定时任务、celery计划任务、celery在Django项目中使用Python脚本调用Django环境)
    微信推送功能实现
    Haystack搜索框架
    支付宝支付
    Redis初识01 (简介、安装、使用)
  • 原文地址:https://www.cnblogs.com/yinsedeyinse/p/10481056.html
Copyright © 2011-2022 走看看