zoukankan      html  css  js  c++  java
  • python操作 rabbitMQ

    rabbitMQ介绍及基本使用

    官方文档看这里:

    http://www.rabbitmq.com/getstarted.html

    一、队列的介绍

    1、什么是rabbitMQ?

       RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

       本质:socket之间的通信。

    2、其他的消息队列?

      queue:内存中的,一般用于测试,将列表放入内存中。

      redis列表:非专业队列,可以做队列。

      rabbitMQ:专业队列,消息队列。可以做持久化,可以保证数据的安全。

      zeroMQ:基于内存的队列,比rabbitMQ更快,因为是 将数据放到内存。

     3、为什么要有消息队列?

      1 处理生产者消费者不对等的关系。

        可以根据生产者的多少(用户请求的多少),消费者动态的增加或减少。

      2 进行数据通信:

        - restfull  api 传送的通过http协议发送的json格式数据

        - webservice传送的通过http协议发送的xml格式数据(在rest api诞生之前做的web网站之间的进行数据交互是通过webservice做的,c#,java居多)

        - rpc(远程服务调用),基于socket并使用自己封装的协议进行数据传输,做数据交互。

          ‘处理者’在消息队列的一端等候任务到来,一旦有人发送请求就会被立即接收,并做处理。在消息队列旁再临时创建一个队列,发送请求之后就在这个队列一端进行等候,并通知处理者将消息处理完之后就放到这个队列中。任务处理完之后处理者将结果放到队列中,等发送者拿到消息之后就删除临时创建的队列。一般在公司内部做交互,通过rpc来进行通信,各个公司使用的协议不一定相同。升级版:给待处理信息写一个函数名,并传参数,让服务端找到这个函数,通过反射执行,获取结果,然后将结果放到队列中。

    二、rabbit MQ的安装

    服务端安装:

      

    安装配置epel源
       $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
     
    安装erlang
       $ yum -y install erlang
     
    安装RabbitMQ
       $ yum -y install rabbitmq-server
    

       运行:

        方式1:rabbitmq-server (hang住) ctrl+c停止

        方式2:systemctl start rabbitmq-server(在后台进程运行)

        默认无密码,如果有密码:

        1 sudo rabbitmqctl add_user wupeiqi 123
        # 设置用户为administrator角色
        2 sudo rabbitmqctl set_user_tags peiqi administrator
        # 设置权限
        3 sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"

        #重启:

        4 systemctl restart rabbitmq-server

      

    客户端:

      pip3 install pika 

    三、使用

    基于Queue实现生产者消费者模型

    import Queue
    import threading
    
    
    message = Queue.Queue(10)
    
    
    def producer(i):
        while True:
            message.put(i)
    
    
    def consumer(i):
        while True:
            msg = message.get()
    
    
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
    queue

    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    # 创建一个队列:s91
    channel.queue_declare(queue='s91')
    
    
    # 向队列s91中发送一个 Hello World!
    channel.basic_publish(exchange='',routing_key='s91',body='66')
    
    connection.close()
    生产者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    channel.queue_declare(queue='s91')
    
    def callback(ch, method, properties, body):
        print(body)
    
    
    channel.basic_consume(callback,queue='s91',no_ack=True)
    
    channel.start_consuming()
    消费者 获取队列中的值
    ################生产者#########
    import pika
    #无密码
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #创建队列
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
    
    ###############消费者#################
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    
    channel.queue_declare(queue='hello')
    #进行回调
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

      

    1、acknowledgment 消息不丢失

    no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    # 创建一个队列:s91
    channel.queue_declare(queue='s91')
    
    
    # 向队列s91中发送一个 Hello World!
    channel.basic_publish(exchange='',routing_key='s91',body='66')
    
    connection.close()
    生产者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    channel = connection.channel()
    
    # channel.queue_declare(queue='s91')
    
    def callback(ch, method, properties, body):
        print(body)
        ## 进行确认
        ch.basic_ack(delivery_tag=method.delivery_tag)   
    
    channel.basic_consume(callback,queue='s91',no_ack=False)
    
    channel.start_consuming()
    消费者 进行确认

     2、durable   消息不丢失(服务端持久化)

      就算服务端或客户端挂掉,也没有关系,RabbitMQ会重新将该任务添加到队列中。

    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='s92', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='s92',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          ))
    connection.close()
    生产者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='s92', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,queue='s92',no_ack=False)
    
    
    channel.start_consuming()
    消费者

    3、消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='s92', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='s92',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          ))
    connection.close()
    生产者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='s92', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,queue='s92',no_ack=False)
    
    
    channel.start_consuming()
    消费者

    4、发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    可有多个订阅者

     exchange type = fanout

    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e1',exchange_type='fanout')
    
    message = "Hello World!"
    
    channel.basic_publish(exchange='e1',routing_key='',body=message)
    
    connection.close()
    发布者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e1',exchange_type='fanout')
    
    # 随机生成对列 名
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    # 让队列和e1绑定
    channel.queue_bind(exchange='e1',queue=queue_name)
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    
    channel.start_consuming()
    订阅者

    5、关键字发送

     exchange type = direct

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e2',exchange_type='direct')
    
    message = "Hello World!"
    
    channel.basic_publish(exchange='e2',routing_key='error',body=message)
    
    connection.close()
    生产者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e2',exchange_type='direct')
    
    # 随机生成对列名
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    # 让队列和e1绑定
    channel.queue_bind(exchange='e2',queue=queue_name,routing_key='info')
    channel.queue_bind(exchange='e2',queue=queue_name,routing_key='error')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    
    channel.start_consuming()
    消费者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e2',exchange_type='direct')
    
    # 随机生成对列名
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    # 让队列和e1绑定
    channel.queue_bind(exchange='e2',queue=queue_name,routing_key='error')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    
    channel.start_consuming()
    消费者2

    6、模糊匹配

     exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
    1
    2
    3
    发送者路由值              队列中
    old.boy.python          old.*  -- 不匹配
    old.boy.python          old.#  -- 匹配
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e3',exchange_type='topic')
    
    message = "Hello World!"
    
    channel.basic_publish(exchange='e3',routing_key='info.xx.uu',body=message)
    
    connection.close()
    生产者
    import pika
    
    credentials = pika.PlainCredentials("root","123")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.13.92',credentials=credentials))
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='e3',exchange_type='topic')
    
    # 随机生成对列名
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    # 让队列和e1绑定
    channel.queue_bind(exchange='e3',queue=queue_name,routing_key='info.*')
    
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    
    channel.start_consuming()
    消费者 可有多个

    注意:

    sudo rabbitmqctl add_user wupeiqi 123
    # 设置用户为administrator角色
    sudo rabbitmqctl set_user_tags wupeiqi administrator
    # 设置权限
    sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"
    
    # 然后重启rabbiMQ服务
    sudo /etc/init.d/rabbitmq-server restart
     
    # 然后可以使用刚才的用户远程连接rabbitmq server了。
    
    
    ------------------------------
    credentials = pika.PlainCredentials("wupeiqi","123")
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
    复制代码
    设置链接密码
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    from pika.adapters.blocking_connection import BlockingChannel
    
    credentials = pika.PlainCredentials("root", "123")
    
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.20', credentials=credentials))
    # 超时时间
    conn.add_timeout(5, lambda: channel.stop_consuming())
    
    channel = conn.channel()
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        channel.stop_consuming()
    
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    设置超时时间

    终极归纳:

    使用:
    a. 普通消息队列

    b. 批量向多个队列中发送(订阅者发布者)

    c. 根据关键字匹配向队列中发送

    d. 模糊匹配向队列中发送
    1. exchange的作用?
    - exchange和队列进行绑定
    - 用户向队列发送数据时,无序再找队列,直接向exchange中发送即可。

    2. rabbitmq中有几种exchange?
    - fanout,只要绑定就发
    - dirct,确定关键字
    - topic,模糊匹配

    3. 消息持久化和ack
    - 服务端(durable)
    - 客户端(ack)

    4. 超时

    5. 消息顺序

    6. 其他消息队列?
    - queue
    - redis列表
    - kafka
    - zeromq

    7. 什么时候用过消息队列?
    - 防止消息堆积(消息提醒)
    - 订单处理(celery+消息队列)
     
  • 相关阅读:
    jQuery扩展extend一
    json对象的操作,json工具
    typeof操作符的返回值
    jacksonall的使用,解析json
    jQuery的扩展
    An Introduction to Computer Thinking
    lazy初始化和线程安全的单例模式
    Compiler Principle
    Lineare Algebra
    Mathematik
  • 原文地址:https://www.cnblogs.com/amyleell/p/9315196.html
Copyright © 2011-2022 走看看