zoukankan      html  css  js  c++  java
  • RabbitMQ队列

    RabbitMQ是什么?

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    RabbitMQ的安装

    首先说明,RabbitMQ在win上安装是一件颇为麻烦的事情。试了很长时间都没有成功,后来就转战linux了。在linux的安装中也可能会出现一点问题,下面会贴出一个网址有安装中出现问题的解决办法。

    linux上都是直接install rabbitmq-server

    当然可能会在安装中和后来的使用上出现这样或者是那样的问题,解决办法参见这篇博客http://www.cnblogs.com/kaituorensheng/p/4985767.html

    RabbitMQ的语法以及实例

    1.基本实例

    基于Queue实现生产者消费者模型

     1 import Queue
     2 import threading
     3 
     4 
     5 message = Queue.Queue(10)
     6 
     7 
     8 def producer(i):
     9     while True:
    10         message.put(i)
    11 
    12 
    13 def consumer(i):
    14     while True:
    15         msg = message.get()
    16 
    17 
    18 for i in range(12):
    19     t = threading.Thread(target=producer, args=(i,))
    20     t.start()
    21 
    22 for i in range(10):
    23     t = threading.Thread(target=consumer, args=(i,))
    24     t.start()
    View Code

    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()#开通一个管道
    
    #声明queue
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',#queue名字
                          body='Hello World!')#消息内容
    print(" [x] Sent 'Hello World!'")
    connection.close()
    import pika
    #建立连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    
    channel.basic_consume(#消费消息
                          callback,#如果收到消息就调用callback函数处理消息
                          queue='hello',
                          no_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    2.消息发布轮询

    • 上面的只是一个生产者、一个消费者,能不能一个生产者多个消费者呢? 
      可以上面的例子,多启动几个消费者consumer,看一下消息的接收情况。 
      采用轮询机制;把消息依次分发
    • 假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理? 
      (可以模拟消费端断了,分别注释和不注释 no_ack=True 看一下) 
      你没给我回复确认,就代表消息没处理完。
    • 上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢? 
      因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。
    • 上面的模式只是依次分发,实际情况是机器配置不一样。怎么设置类似权重的操作?
      RabbitMQ怎么办呢,RabbitMQ做了简单的处理就能实现公平的分发。 
      就是RabbitMQ给消费者发消息的时候检测下消费者里的消息数量,如果超过指定值(比如1条),就不给你发了。 
      只需要在消费者端,channel.basic_consume前加上就可以了。
    channel.basic_qos(prefetch_count=1)  # 类似权重,按能力分发,如果有一个消息,就不在给你发

    3. acknowledgment 消息持久化

     no-ack = False

    如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print ('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    生产者

    durable  

    如果队列里还有消息,RabbitMQ 服务端宕机了呢?消息还在不在? 
    把RabbitMQ服务重启,看一下消息在不在。 
    上面的情况下,宕机了,消息就久了,下面看看如何把消息持久化。 
    每次声明队列的时候,都加上durable,注意每个队列都得写,客户端、服务端声明的时候都得写。

    # 在管道里声明queue
    channel.queue_declare(queue='hello2', durable=True)

    durable的作用只是把队列持久化。离消息持久话还差一步: 
    发送端发送消息时,加上properties

    properties=pika.BasicProperties(
        delivery_mode=2,  # 消息持久化
        )
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    
    消费者
    生产者
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print( 'ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    消费者

    4.消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

     1 #Auther: Xiaoliuer Li
     2 
     3 import pika
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
     6 channel = connection.channel()
     7 
     8 # make message persistent
     9 channel.queue_declare(queue='hello')
    10 
    11 
    12 def callback(ch, method, properties, body):
    13     print(" [x] Received %r" % body)
    14     import time
    15     time.sleep(10)
    16     print ('ok')
    17     ch.basic_ack(delivery_tag = method.delivery_tag)
    18 
    19 channel.basic_qos(prefetch_count=1)
    20 
    21 channel.basic_consume(callback,
    22                       queue='hello',
    23                       no_ack=False)
    24 
    25 print(' [*] Waiting for messages. To exit press CTRL+C')
    26 channel.start_consuming()
    消费者

    5.发布订阅(广播模式)

    前面的效果都是一对一发,如果做一个广播效果可不可以,这时候就要用到exchange了 
    exchange必须精确的知道收到的消息要发给谁。exchange的类型决定了怎么处理, 
    类型有以下几种:

    • fanout: 所有绑定到此exchange的queue都可以接收消息
    • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    • topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    fanout 纯广播、all

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    发布者
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             type='fanout')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    订阅者

    注意:广播,是实时的,收不到就没了,消息不会存下来,类似收音机。

    direct 有选择的接收消息

     

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    发送者
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    # 获取运行脚本所有的参数
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    # 循环列表去绑定
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    接收者

    运行接收端,指定接收级别的参数,例:

    python direct_sonsumer.py info warning 
    python direct_sonsumer.py warning error

    topic 更细致的过滤

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
    发送者路由值              队列中
    old.boy.python          old.*  -- 不匹配
    old.boy.python          old.#  -- 匹配
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    生产者
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    消费者

    注意:

    sudo rabbitmqctl add_user alex 123
    # 设置用户为administrator角色
    sudo rabbitmqctl set_user_tags alex administrator
    # 设置权限
    sudo rabbitmqctl set_permissions -p "/" alex '.''.''.'
    
    # 然后重启rabbiMQ服务
    sudo /etc/init.d/rabbitmq-server restart
     
    # 然后可以使用刚才的用户远程连接rabbitmq server了。
    
    
    ------------------------------
    credentials = pika.PlainCredentials("alex","123")
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
    View Code

    6.RabbitMQ RPC 实现(Remote procedure call)

    不知道你有没有发现,上面的流都是单向的,如果远程的机器执行完返回结果,就实现不了了。 
    如果返回,这种模式叫什么呢,RPC(远程过程调用),snmp就是典型的RPC 
    RabbitMQ能不能返回呢,怎么返回呢?既是发送端又是接收端。 
    但是接收端返回消息怎么返回?可以发送到发过来的queue里么?不可以。 
    返回时,再建立一个queue,把结果发送新的queue里 
    为了服务端返回的queue不写死,在客户端给服务端发指令的的时候,同时带一条消息说,你结果返回给哪个queue

    import pika
    import uuid
    import time
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response,  # 只要一收到消息就调用on_response
                                       no_ack=True,
                                       queue=self.callback_queue)  # 收这个queue的消息
    
        def on_response(self, ch, method, props, body):  # 必须四个参数
            # 如果收到的ID和本机生成的相同,则返回的结果就是我想要的指令返回的结果
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None  # 初始self.response为None
            self.corr_id = str(uuid.uuid4())  # 随机唯一字符串
            self.channel.basic_publish(
                    exchange='',
                    routing_key='rpc_queue',  # 发消息到rpc_queue
                    properties=pika.BasicProperties(  # 消息持久化
                        reply_to = self.callback_queue,  # 让服务端命令结果返回到callback_queue
                        correlation_id = self.corr_id,  # 把随机uuid同时发给服务器
                    ),
                    body=str(n)
            )
            while self.response is None:  # 当没有数据,就一直循环
                # 启动后,on_response函数接到消息,self.response 值就不为空了
                self.connection.process_data_events()  # 非阻塞版的start_consuming()
                # print("no msg……")
                # time.sleep(0.5)
            # 收到消息就调用on_response
            return int(self.response)
    
    if __name__ == '__main__':
        fibonacci_rpc = FibonacciRpcClient()
        print(" [x] Requesting fib(7)")
        response = fibonacci_rpc.call(7)
        print(" [.] Got %r" % response)
    RPC client
    import pika
    import time
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    def on_request(ch, method, props, body):
        n = int(body)
        print(" [.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(
                exchange='',  # 把执行结果发回给客户端
                routing_key=props.reply_to,  # 客户端要求返回想用的queue
                # 返回客户端发过来的correction_id 为了让客户端验证消息一致性
                properties=pika.BasicProperties(correlation_id = props.correlation_id),
                body=str(response)
        )
        ch.basic_ack(delivery_tag = method.delivery_tag)  # 任务完成,告诉客户端
    
    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='rpc_queue')  # 声明一个rpc_queue ,
    
        channel.basic_qos(prefetch_count=1)
        # 在rpc_queue里收消息,收到消息就调用on_request
        channel.basic_consume(on_request, queue='rpc_queue')
        print(" [x] Awaiting RPC requests")
        channel.start_consuming()
    RPC server
  • 相关阅读:
    第6课:datetime模块、操作数据库、__name__、redis、mock接口
    第5课:内置函数、处理json、常用模块
    第4课:函数、模块、集合
    第3课:列表和字典的增删改查、字符串常用方法、文件操作
    第2课:jmeter总结、Charles抓包
    第1课:接口测试和jmeter总结
    Centos下找不到eth0设备的解决方法
    Python安装、配置
    改进的SMO算法
    拉普拉斯特征图降维及其python实现
  • 原文地址:https://www.cnblogs.com/lixiaoliuer/p/6846063.html
Copyright © 2011-2022 走看看