zoukankan      html  css  js  c++  java
  • Python之RabbitMQ的使用

    今天总结一下Python关于Rabbitmq的使用

      RabbitMQ官网说明,其实也是一种队列,那和前面说的线程queue和进程queue有什么区别呢?

        线程queue只能在同一个进程下进行数据交互

        进程queue只能在父进程和子进程之间,或者同一父进程下的子进程之间做数据交互

        如果需要对不同进程(eg:微信和qq)两个独立的程序间通信

    方法1就是直接把数据写在硬盘(disk)上然后各自的进程读取数据就可以,但是由于硬盘的读写速度太慢,效率太低

    方法2自己写个socket,直接做数据交互,问题是如果改变程序,或者再加一个程序,需要对写好的socket进行修改,还要处理黏包什么的复杂的连接关系,维护成本太高。

    方法3,利用已有的中间商(代理)。这个broker其实就是封装好的socket,我们拿来直接用就好了。

       这里的broker,就有RabbitMQ,ZeroMQ,ActiveMQ等等等等。

    一.安装及环境配置

        windows的安装和配置方法较为简单,直接安装就好了

        Rabbit支持多种语言: Java, .NET, PHP, Python, JavaScript, Ruby, Go这些常用语言都支持


    如图所示,python操作RabbitMQ需要的模块有上述几种选择,我们用最简单的pika,用pip直接安装

    pip install pika

    二.RabbitMQ的使用

    这里所有的用法都是基于RabbitMQ是工作在‘localhost’上,并且端口号为15672,能在浏览器里访问http://localhost:15672这个地址。

    1.消息分发(基础版)

    这就是RabbitMQ最简单的工作模式,p为生产者(Producer),生产者发送message给queue,queue再把消息发送至消费者c(Customer)

    先看看生产者至队列(send)这个过程

    import pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()

    我们先建立了一个链接,然后就需要定义一个队列,队列的名字就暂时定位‘hello'

    channel.queue_declare(queue='hello')

    在RabbitMQ里消息并不能直接发送给队列,所有的信息发送都要通过一个exchange,但是这里我们先把这个exchange定义成一个空的字符串,后面在将他的具体用法

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='123')

    在发送确认完成后,可以将连接关闭

    connect.close()

    这就是send端的代码

    import pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='123')
    print("[x] Sent 'hello world!'")
    connect.close()
    RabbitMQ_basic_producer

     运行了send代码后我们可以在terminal里RabbitMQ安装目录下sbin文件夹里查看一下消息队列

    rabbitmqctl.bat list_queues 

    如果是Linux命令为

    sudo rabbitmqctl list_queues

    这里就说明了队列信息和消息状态。

    然后再看一下消费者这一端的代码是什么样的,同样,先要建立连接并定义好队列名

    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')

    这里可能有个疑问:我们不是在生产者里已经定义了队列名吗?为什么在消费者里还要定义呢?

    因为在实际工作中,我们并不能确定是生产者还是消费者先一步运行,如果队列名没有定义的话运行时候是会报错的。下面就是对消息的处理

    def callback(ch,method,properties,body):
        print("[x] Received %r"%body)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)

    当消息来临时,消费者会执行回调函数callback。这里的callback就是直接打印消息内容(body)。

    回调函数另外的几个参数:chconne.channel的内存对象地址,

    <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 62145, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>

    method是包含了发送的信息

    <Basic.Deliver(['consumer_tag=ctag1.9ae48c906b014a83a512413c0e6f9ef8', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>

     properties我们以后再了解。

    2.公平分发(workqueue)

      在这种结构里,我们要考虑到这样一种情况:有多个消费者,消费者在得到消息时需要对消息进行处理,并且有可能处理消息所消耗的时间是不同的

    。这里我们用的queue叫做workqueue

      为了模拟消费者对消息进行处理的过程,我们用time.sleep()做一个消耗时间的过程。消息的产生和接收是这样的

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")

    这里插播一条''.join(sys.argv[1:])的作用:就是把在shell里输入的指令后跟的代码加在message里。消费者得到消息后,数消息里有几个“.”,sleep相应的秒数。

     放出第一版的代码

    import pika,sys
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')
    message = ' '.join(sys.argv[1:]) or "Hello World."
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message)
    print('send %s'%message)
    RabbitMQ生产者
    import time,pika
    
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello')
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello')
    channel.start_consuming()
    RabbbitMQ消费者

     这时候,我们可以多启动几个消费者,再用生产者发送消息,看看效果!

     

    可以发现,消息是被公平的依次被分发给各个消费者的(Fair dispatch),这种分发的方式叫轮询。

    消息确认message acknowledgments  

    现在考虑下这种情形:消费者在处理消息时需要较长的时间,在这时把这个消费者kill掉,正在处理的消息和已经接收但未被处理的消息就丢失了。这应该是不允许的,我们可不希望有数据丢失,就需要将这些任务重新发送给其他正常工作的消费者。

    为了保证任务不丢失,RabbitMQ支持使用message acknowledgments,消费者在完成任务后会给RabbitMQ发送个消息,告诉他活已经干完了,RabbitMQ就会把这个任务给释放掉。而当出现消费者宕机、掉线等情况时,RabbitMQ会重新把这个任务发送给其他的消费者。

    往回看看上文说到的no_ack,这个值默认的是False,RabbitMQ是不主动销毁消息的所以我们一看看在这里把值置为True。

    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)

    这样只要消费者接收到消息,RabbitMQ就直接销毁掉这个消息,就成了手动确认。我们要想实现刚才说的消息不丢失,就要这样定义

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # time.sleep(body.count(b'.'))
        time.sleep(10)     #修改了一下,在延时的10s把消费者断掉
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    channel.basic_consume(callback,
                          queue='hello')#这里的no_ack默认为False

    这样,当一个消费者宕机了,RabbitMQ就会直接把任务拍个下一个消费者。

     消息持久化

    刚才通过了消息确认,我们保证了消费者在掉线的时候任务不丢失,可是还有一个问题,如果RabbitMQ如果断掉(或者服务重启)了,里面的任务(包括所有queue和exchange依旧会丢失)这时候我们可以用到——消息持久化Message durability

    channel.queue_declare(queue='hello',durable=True)#将队列持久化(只保存了队列)
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2))#保持消息持久化

    必须同时将队列和消息持久化,可以保证RabbitMQ服务在重启后任务还存在。

    注意几点:

    1.如果只持久化了消息,服务重启后消息丢失

    2.如果只持久化了队列,服务重启后队列还在,但消息丢失

    3.在持久化队列的时候要保持生产者和消费者的一致性

    最后一点,因为有可能每个消费者处理信息的能力不一样,如果按公平分发的化有可能导致负载不平衡,旱的旱死、涝的涝死。为避免这种情况发生还有一个知识点

    channel.basic_qos(prefetch_count=1)

    用这个语句限制了消费者待处理信息的个数

    workQueue的终极代码

    import pika,sys
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello',durable=True)#队列持久化
    message = ' '.join(sys.argv[1:]) or "Hello World2."
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
    print('send %s'%message)
    RabbitMQ_workqueue_procucer
    import time,pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.queue_declare(queue='hello',durable=True)#队列持久化
    print(' [*] Waiting for messages. To exit press CTRL+C')
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)  #消息回执
    channel.basic_qos(prefetch_count=1)  #限制消费者待处理任务个数
    channel.basic_consume(callback,
                          queue='hello'
                          )
    channel.start_consuming()
    RabbitMQ_workqueue_customer

     3.发布/订阅(publish/subscribe)

    我们在前面两部分将的都是将消息由生产者到消费者之间通过queue传递,现在将引入一个新的成员:exchange。

    其实生产者在发送的时候是不知道消息要发送给那个queue的,甚至他都不知道消息是由queue接收的。实际上生产者只是把message发送给了exchange。至于message后续的处理都是由exchange决定的。

    就像图上标示的,exchange在sender和queue之间起到了转呈的作用。

    按照工作方式,我们将exchange分成了fanoutdirecttopicheaders四种类型。

    fanout:所有绑定到这个exchange的队列都接收消息

    direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息

    topic:所有符合routingKey(可以是表达式)的queue可以接收消息

      表达式说明:#表示一个或多个字符

                                *表示任何字符

            使用RoutingKey为#时相当于fanout

    headers:通过headers来决定把消息发送给哪些queue。

    在这个part我们来看fanout的作用。

    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout') 

    我们定义一个exchange,名字随便起一个‘logs’,类型就声明为fanout。

    (在前面两节我们还没有引入exchange这个概念,就用了默认的exchange设置

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='123')

    ,exchange=''空的字符串表示了默认的exchange或名字是空的,那exchange就把消息发送给routing_key指定的queue里(前提是这个queue是存在的),在声明了exchange以后,我们就可以用这个exchange发送消息了

    channel.basic_publish(exchange='logs',                #使用的exchange名称
                          routing_key='',                 #使用的队列名称
                          body='123')                     #消息内容

    注意到了一点没有?这里并没有定义队列的名称?为什么?在广播的时候是不用固定具体的哪个queue的,我们

    result = channel.queue_declare() #生成随机queue

    我们在消费端声明queue的时候可以生成一个随机的queue,这里还要加个命令

    result = channel.queue_declare(exclusive=True) 

    这个exclusive表示在连接在关闭以后这个queue直接被销毁掉。

    然后把这个queue绑定在转发器上。所有进入这个exchange的消息被发送给所有和他绑定的队列里。

    随机的queue已经声明了,现在就把他跟exchange绑定

    channel.queue_bind(exchange='logs',
                       queue=result.method.queue)#注意queue名的获取方法

    这就是最终的代码:

    import pika
    import sys
    connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connect.channel()
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')   #logs 是随便起的名字,声明了exchange
    message = ' '.join(sys.argv[1:]) or 'info: Hello World!'
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    fanout_publish
    import pika
    connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connect.channel()
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    result = channel.queue_declare(exclusive=True)   #exclusive 唯一的,为True时不指定queue名的化随机生成一个queue,
                                                        # 在断开连接后把queue删除,相当于生成一个随机queue
    channel.queue_bind(exchange='logs',
                       queue=result.method.queue) #绑定的是exchange对应的queue
    print('waiting for logs.')
    def callback(ch,method,preproteries,body):
        print('get data:%r'%body)
    channel.basic_consume(callback,
                          queue=result.method.queue,
                          no_ack=True)
    channel.start_consuming()
    fanout_customer

     总体看一下,发送端的代码跟前面的差不太多,最重要的差别就是把routingKey给忽略掉了,但是明确了exchange的对象。

    而接收方是在建立连接后要声明exchange,并且要和队列绑定。如果没有队列和exchange绑定,消息就被销毁了。这就是整个发送的过程

    还有一点,这个订阅——发布的模型就像电台和收音机一样,如果customer下线了是收不到信息的,消息也是在线发送的,并不会保存。

    4.routing(exchange type:direct)

    在这个过程中,我们大致了解了发布——订阅模型。其实就是在发送端定义了一个exchange,在接收端定义了一个队列,然后把这两者绑定,就OK了。可是我们现在只想订阅一部分有用的信息,比如只获取错误信息写到日志文件里,但同时又能将所有的信息都显示在控制台(或者terminal上)。

    上一节所讲述的bind,也可以简单的理解为这个queue对这个exchange的内容“感兴趣”。

    在binding的时候,还可以加一个routingKey的参数,这个参数就描述了queue对哪些exchange感兴趣。

    channel.queue_bind(exchange='logs',         #被绑定的exchange名
                       queue='queue_name',      #被绑定的queue名
                       routing_key='black')     #queue的‘兴趣爱好’

    对queue和exchange进行bind时,bind的参数主要取决于exchange的类型,比如在fanout模式下是不能有这个routingKey的,运行时候会报错。

    我们使用了fanout的发布订阅模式,在这个模式下接收端不能对信息进行一定原则的过滤,一股脑的照单全收,已经不能满足我们的要求了,现在就要用direct模式。

    在上面的图里,有两个queue分别和exchange绑定,Q1的routingKey是orange,Q2则有两个分别是black和green。在这个模型中,发布的消息关键字是orange则被分发到Q1内,而包含有black或green的则发给Q2.剩余的消息就被discard了。

    而在上图中,同样的key同多个队列进行绑定的方法也是合法的。所有包含关键字black的消息会被同时发送 给Q1和Q2。

    了解了上面所说的方法,我们来按照本节一开始的目标来修改下代码

    首先要声明exchange

    channel.exchange_declare(exchange='logs',
                             exchange_type='direct')#声明exchange,注意接收端的代码是一样的

    在发送的时候对消息进行分类

    serverity = sys.argv[1] if len(sys.argv)>1 else 'info'

    然后发送消息

    channel.basic_publish(exchange='logs',
                          routing_key=serverity,  #消息的分类
                          body=message)

    在接收端,我们用一个循环把所有的routingKey和queue绑定(有可能出现多个关键字和一个queue同时绑定的情况)

    servrities = sys.argv[1:]    #获取所有的关键字
    if not servrities:
        sys.stderr.write('Usage: %s [info] [warning] [error]
    '%sys.argv[0])
        sys.exit(1)              #关键字不存在打印提示后退出
    print('recived:%s'%servrities)
    for servrity in servrities:  #循环绑定
        channel.queue_bind(exchange='logs',
                           queue=queue_name,
                           routing_key=servrity)

    整个方案就是这样的

    我们启动两个terminal,按这样的方式启动

    python direct_consumer.py info error warning
    python direct_consumer.py error

     在分别发送

    python direct_publisher.py info 123
    python direct_publisher.py error 456
    python direct_publisher.py warning 789

    看看是什么效果

    是不是达到了订阅的效果!

    4.更加细致的消息过滤(topic模式)

    在上一节我们利用了direct的模式实现了初步的消息过滤,在这一节里,我们要看看如何实现如何实现更加细致的消息过滤,比如我在获取info的前提下还要知道哪些message是RabbitMQ发来的,哪些是Redis发来的,那怎么区分呢?

    就想这个图里的一样,我们在定义RoutingKey的时候利用了表达式,就像模糊查询一样其中

    *表示任意一个字符

    #表示0个或多个字符

    topic模式的代码和上一节的基本一致,只是改变了exchange的模式

    channel.exchange_declare(exchange='logs',
                             exchange_type='topic')#声明exchange

     启动terminal,输入指令

    python topic_customer kern.*                 可以接收以kern.开头的所有消息 kern.123 abc  接收到abc
    python topic_customer.py *.kern.*            中间包含.kern.的消息 123.kern.345 abc  接收到abc

     同时绑定多个关键字

    接收端
    d:pythonweek11>python topic_customer.py kern.* pip.* ['kern.*', 'pip.*'] [*] Waiting for logs. To exit press CTRL+C [x] 'pip.11':b'duziele' [x] 'kern.11':b'duziele'
    发送端
    d:pythonweek11>python topic_publisher.py pip.11 duziele
     [x] Sent 'pip.11':'duziele'
    d:pythonweek11>python topic_publisher.py kern.11 duziele
     [x] Sent 'kern.11':'duziele'

     还可以用#获取所有消息

    d:pythonweek11>python topic_customer.py #

     ps:#的作用我一直不大明白,我试过了

    d:pythonweek11>python topic_customer.py kern.#

    效果和kern.*是一样的。

    6.Remote procedure call(RPC)

           我们在前面的章节将到了在多个消费者之间分发耗时任务的方法,可是现在要实现这样的功能:调用远程的设备上的一个函数,然后等执行完毕返回结果。这样的工作模式就叫远程过程调用——Remote Procedure Call(RPC)。

      利用RabbitMQ也可以实现RPC的功能,为了能模拟这个过程,我们在server端设立一个fun:给定一个整数n,然后返回n对应的斐波那契数列。

    callback queue

      通过RabbitMQ实现RPC的方法很简单——客户端发送请求,服务端对请求响应然后把消息发送至叫callback的queue,过程类似这样

    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    
    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                                reply_to = callback_queue,
                                ),
                          body=request)

    Correlation id

      我们刚才为每个请求对应的响应都声明了一个队列,但是在这里等待着结果的返回效率是不是太低了?还好有另外的一种方法:为每个客户端创建一个callback的队列。然而又引发了一个新问题:在这个队列里我不知道哪个响应是对应我这个请求的!这时候就到大神出马了——Correlation ID。对每个请求都设置一个唯一的ID,在callback的队列里通过查看属性来判断他对应哪个请求。如果出现没有对应的ID,安全起见我们还是把他忽略掉。

     

    总之我们的RPC的工作流程就是这样的:

    1.client启动,声明一个匿名的callback queue

    2.建立RPC请求,请求里除了消息还包含两个参数:a.replay_to(告诉server响应的结论callback的队列里)

                           b.correlation_id:每个请求都被赋予一个独一无二的值

    3.请求被发送给RPC_queue

    4.server等待queue里的消息,一旦出现请求,server响应请求并把结论发送给通过replay_to要求的queue里

    5.client在callback_queue里等待数据,一旦消息出现,他将correlation进行比对,如果相同就获取请求结果。

    import pika
    
    connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connect.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:return 1
        else:return fib(n-1)+fib(n-2)
    
    
    def on_request(ch,method,props,body):
        n = int(body)
        print('[.]fib(%s)'%n)
        response = fib(n)
        print(response)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=props.correlation_id),
                         body = str(response))
        print('send over')
        ch.basic_ack(delivery_tag=method.delivery_tag)   #确认client接收到消息
    
    channel.basic_qos(prefetch_count=1)   #限制消息处理个数
    channel.basic_consume(on_request,queue='rpc_queue')
    
    print('[x]Awaitiong RPC requests')
    channel.start_consuming()
    RPC_server
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue     #声明请求响应回执的queue
    
            self.channel.basic_consume(consumer_callback=self.on_response,
                                       queue=self.callback_queue,no_ack=True) #监听回执queue
    
        def on_response(self,ch,method,props,body):  #callback_queue的回调函数
            print(body)
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self,n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties   #发送回执消息的参数
                                       (reply_to=self.callback_queue,
                                        correlation_id=self.corr_id),
                                       body=str(n)
                                       )
            while self.response is None:
                self.connection.process_data_events()   #事件驱动,非阻塞版的start_consuming
            return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    n = input('>>>')
    print('[x] Requesting fib(%s)'%n)
    response = fibonacci_rpc.call(n)
    print(response)
    RPC_client

     以上就是RabbitMQ的常规用法。

  • 相关阅读:
    python模块--time模块
    python模块--如何相互调用自己写的模块
    Animating Views Using Scenes and Transitions
    fragment 切换
    android textview 设置text 字体
    android intent 5.1
    android EditView ime
    animation of android (4)
    animation of android (3)
    animation of android (2)
  • 原文地址:https://www.cnblogs.com/yinsedeyinse/p/10481056.html
Copyright © 2011-2022 走看看