zoukankan      html  css  js  c++  java
  • python RabbitMQ队列使用(入门篇)

    ---恢复内容开始---

    python RabbitMQ队列使用

    关于python的queue介绍

    关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换,这时候我们就需要一个中间件,来实现程序之间的通讯。

    RabbitMQ

    MQ并不是python内置的模块,而是一个需要你额外安装(ubunto可直接apt-get其余请自行百度。)的程序,安装完毕后可通过python中内置的pika模块来调用MQ发送或接收队列请求。接下来我们就看几种python调用MQ的模式(作者自定义中文形象的模式名称)与方法。

     RabbitMQ设置远程链接账号密码

    启动rabbitmq web服务:
    2.远程访问rabbitmq:自己增加一个用户,步骤如下:
    l1. 创建一个admin用户:sudo rabbitmqctl add_user admin 123123
    l2. 设置该用户为administrator角色:sudo rabbitmqctl set_user_tags admin administrator
    l3. 设置权限:sudo rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
    l4. 重启rabbitmq服务:sudo service rabbitmq-server restart
    之后就能用admin用户远程连接rabbitmq server了。

    轮询消费模式

    此模式下,发送队列的一方把消息存入mq的指定队列后,若有消费者端联入相应队列,即会获取到消息,并且队列中的消息会被消费掉。

    若有多个消费端同时连接着队列,则会已轮询的方式将队列中的消息消费掉。

    接下来是代码实例:

    producer生产者

    # !/usr/bin/env python
    import pika
    credentials = pika.PlainCredentials('admin','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.56.19',5672,'/',credentials))
    channel = connection.channel()
    
    # 声明queue
    channel.queue_declare(queue='balance')
    
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(exchange='',
                          routing_key='balance',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()

    发送过队列后,可在MQ服务器中查看队列状态

    [root@localhost ~]# rabbitmqctl list_queues
    Listing queues ...
    hello    1

    consumer消费者

    # _*_coding:utf-8_*_
    __author__ = 'Alex Li'
    import pika
    
    credentials = pika.PlainCredentials('admin','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.56.19',5672,'/',credentials))
    channel = connection.channel()
    
    # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    # was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    channel.queue_declare(queue='balance')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    
    channel.basic_consume(callback,
                          queue='balance',
                          no_ack=True)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    接收队列后,查看一下队列状态

    [root@localhost ~]#  rabbitmqctl list_queues
    Listing queues ...
    hello    0

    队列持久化

    当rabbitMQ意外宕机时,可能会有持久化保存队列的需求(队列中的消息不消失)。

    producer

    # Cheng
    # !/usr/bin/env python
    import pika
    
    credentials = pika.PlainCredentials('admin','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.56.19',5672,'/',credentials))
    channel = connection.channel()
    
    # 声明queue
    channel.queue_declare(queue='durable',durable=True)
    
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(exchange='',
                          routing_key='durable',
                          body='Hello cheng!',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          )
                          )
    print(" [x] Sent 'Hello cheng!'")
    connection.close()

    执行后查看队列,记下队列名字与队列中所含消息的数量

    [root@localhost ~]# rabbitmqctl list_queues
    Listing queues ...
    durable    1
    #重启rabbitmq
    [root@localhost
    ~]# systemctl restart rabbitmq-server
    #重启完毕后再次查看 [root@localhost
    ~]# rabbitmqctl list_queues Listing queues ... durable #队列以及消息并未消失

    执行消费者代码

    cunsumer

    # Cheng
    # _*_coding:utf-8_*_
    __author__ = 'Alex Li'
    import pika
    
    credentials = pika.PlainCredentials('admin','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.56.19',5672,'/',credentials))
    channel = connection.channel()
    
    # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    # was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    channel.queue_declare(queue='durable',durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='durable',
                          #no_ack=True
                          )
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    可正确接收到信息。

    再次查看队列的情况。

    [root@localhost ~]# rabbitmqctl list_queues
    Listing queues ...
    durable    0

    广播模式

    当producer发送消息到队列后,所有的consumer都会收到消息,需要注意的是,此模式下producer与concerned之间的关系类似与广播电台与收音机,如果广播后收音机没有接受到,那么消息就会丢失。

    建议先执行concerned

    concerned

    # _*_coding:utf-8_*_
    __author__ = 'Alex Li'
    import pika
    
    credentials = pika.PlainCredentials('admin','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.56.19',5672,'/',credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='Clogs',
                             type='fanout')
    
    result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='Clogs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

    producer

    import pika
    import sys
    
    credentials = pika.PlainCredentials('admin','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.56.19',5672,'/',credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='Clogs',
                             type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='Clogs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()

    谢土豪

    如果有帮到你的话,请赞赏我吧!

  • 相关阅读:
    科学家质疑当今商用量子计算机的性能
    科学家研制出可模拟大脑信息处理的微芯片
    2014年电子科技市场衰退
    号外!CentOS 宣布加入红帽公司!
    hadoop,高富帅的玩具?
    成为Linux内核高手的四个方法
    分阶段事件驱动架构【SEDA】
    原型程式设计【原型语言】
    IOS7.1 企业应用 证书无效 已解决
    iOS7.1企业应用"无法安装应用程序 因为证书无效"的解决方案
  • 原文地址:https://www.cnblogs.com/kerwinC/p/5967584.html
Copyright © 2011-2022 走看看