zoukankan      html  css  js  c++  java
  • Python之RabbitMQ操作

    RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

    实现的协议:AMQP。
     
    术语(Jargon)
     
    P,Producing,制造和发送信息的一方。
    Queue,消息队列。
    C,Consuming,接收消息的一方。
     
    RabbitMQ安装
    1 安装配置epel源
    2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
    3  
    4 安装erlang
    5    $ yum -y install erlang
    6  
    7 安装RabbitMQ
    8    $ yum -y install rabbitmq-server

    安装rabbitmq API

    1 pip install pika
    2 or
    3 easy_install pika
    4 or
    5 源码
    6  
    7 https://pypi.python.org/pypi/pika

    使用API操作RabbitMQ

    基于Queue实现生产者消费者模型

     1 #!/usr/bin/env python
     2 import pika
     3  
     4 # ######################### 生产者 #########################
     5  
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(
     7         host='localhost'))
     8 channel = connection.channel()
     9  
    10 channel.queue_declare(queue='hello')
    11  
    12 channel.basic_publish(exchange='',
    13                       routing_key='hello',
    14                       body='Hello World!')
    15 print(" [x] Sent 'Hello World!'")
    16 connection.close()
     1 #!/usr/bin/env python
     2 import pika
     3  
     4 # ########################## 消费者 ##########################
     5  
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(
     7         host='localhost'))
     8 channel = connection.channel()
     9  
    10 channel.queue_declare(queue='hello')
    11  
    12 def callback(ch, method, properties, body):
    13     print(" [x] Received %r" % body)
    14  
    15 channel.basic_consume(callback,
    16                       queue='hello',
    17                       no_ack=True)
    18  
    19 print(' [*] Waiting for messages. To exit press CTRL+C')
    20 channel.start_consuming()

    1、acknowledgment 消息不丢失(订阅端消息不丢失)

    no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
     7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
     8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错
     9 
    10 def callback(ch,method,properties,body):
    11     print("[x] Received %r" %body)#打印获得消息的内容
    12     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
    13 
    14 channel.basic_consume(callback,queue='hai',no_ack=False)
    15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
    16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
    17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
    18 
    19 print('[*]Waiting for messages to exit press CTRL+C')
    20 channel.start_consuming()
    消费者
     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
     7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
     8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
     9 channel.basic_publish(exchange='',routing_key='hai',body='hello world')
    10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
    11 print("[x] Sent 'hello world' ")
    12 connection.close()
    生产者

    2、durable   消息不丢失(服务端消息不丢失)

     1 # time:
     2 # Auto:PANpan
     3 # func:
     4 import pika
     5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
     6 channel=connection.channel()#创建频道,通过频道操作rabbitmq
     7 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
     8 
     9 def callback(ch,method,properties,body):
    10     print("[x] Received %r" %body)#打印获得消息的内容
    11     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
    12 
    13 channel.basic_consume(callback,queue='hai',no_ack=True)
    14 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
    15 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
    16 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
    17 
    18 print('[*]Waiting for messages to exit press CTRL+C')
    19 channel.start_consuming()
    消费者
     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
     7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
     8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
     9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
    10                       properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
    11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
    12 print("[x] Sent 'hello world' ")
    13 connection.close()
    生产者

    3、消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列

     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
     7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
     8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
     9 
    10 def callback(ch,method,properties,body):
    11     print("[x] Received %r" %body)#打印获得消息的内容
    12     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
    13 channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数
    14 channel.basic_consume(callback,queue='hai',no_ack=True)
    15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
    16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
    17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
    18 
    19 print('[*]Waiting for messages to exit press CTRL+C')
    20 channel.start_consuming()
    消费者
     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
     7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
     8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
     9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
    10                       properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
    11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
    12 print("[x] Sent 'hello world' ")
    13 connection.close()
    生产者

    4、发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

     exchange type = fanout

     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 import sys
     7 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器
     8 channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作
     9 
    10 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略
    11 message=''.join(sys.argv[1:]) or "info: Hello Wrold"
    12 
    13 channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列
    14 print('[x] sent %r'%message)
    15 connection.close()
    发布者
     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器
     7 channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作
     8 
     9 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs
    10                          type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息
    11 
    12 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建
    13 queue_name=result.method.queue
    14 
    15 channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定
    16 print(' [*] Waiting for logs. To exit press CTRL+C')
    17 
    18 
    19 def callback(ch,method,properties,body):
    20     print('[x] %r' %body)
    21 
    22 channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息
    23 
    24 channel.start_consuming()
    订阅者

    5、关键字发送

     exchange type = direct

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 import sys
     7 
     8 connection = pika.BlockingConnection(pika.ConnectionParameters(
     9         host='192.168.11.138'))
    10 channel = connection.channel()
    11 
    12 channel.exchange_declare(exchange='direct_logs',
    13                          type='direct')
    14 
    15 #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    16 #message = ' '.join(sys.argv[2:]) or 'Hello World!'
    17 severity='info'
    18 message='test'
    19 channel.basic_publish(exchange='direct_logs',
    20                       routing_key=severity,
    21                       body=message)
    22 print(" [x] Sent %r:%r" % (severity, message))
    23 connection.close()
    发布者
     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 import pika
     6 import sys
     7 
     8 connection = pika.BlockingConnection(pika.ConnectionParameters(
     9         host='10.0.0.8'))
    10 channel = connection.channel()
    11 
    12 channel.exchange_declare(exchange='direct_logs',
    13                          type='direct')#设置exchange类型为direct
    14 
    15 result = channel.queue_declare(exclusive=True)  #创建随机队列
    16 queue_name = result.method.queue
    17 
    18 # severities = sys.argv[1:]
    19 # if not severities:
    20 #     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    21 #     sys.exit(1)
    22 severities=['error']
    23 for severity in severities:
    24     channel.queue_bind(exchange='direct_logs',
    25                        queue=queue_name,
    26                        routing_key=severity)#绑定关键字
    27 
    28 print(' [*] Waiting for logs. To exit press CTRL+C')
    29 
    30 def callback(ch, method, properties, body):
    31     print(" [x] %r:%r" % (method.routing_key, body))
    32 
    33 channel.basic_consume(callback,
    34                       queue=queue_name,
    35                       no_ack=True)
    36 
    37 channel.start_consuming()
    订阅在1
     1 #!/usr/bin/env python
     2 # time:
     3 # Auto:PANpan
     4 # func:
     5 #!/usr/bin/env python
     6 # time:
     7 # Auto:PANpan
     8 # func:
     9 import pika
    10 import sys
    11 
    12 connection = pika.BlockingConnection(pika.ConnectionParameters(
    13         host='10.0.0.8'))
    14 channel = connection.channel()
    15 
    16 channel.exchange_declare(exchange='direct_logs',
    17                          type='direct')
    18 
    19 result = channel.queue_declare( )
    20 #声明queue,确认要从中接收message的queue
    21 #queue_declare函数是幂等的,可运行多次,但只会创建一次
    22 #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
    23 #但在producer和consumer中重复声明queue是一个好的习惯
    24 #例如:  channel.queue_declare(queue='hello')
    25 queue_name = result.method.queue
    26 
    27 # severities = sys.argv[1:]
    28 # if not severities:
    29 #     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    30 #     sys.exit(1)
    31 severities=['error','info']
    32 for severity in severities:
    33     channel.queue_bind(exchange='direct_logs',
    34                        queue=queue_name,
    35                        routing_key=severity)
    36 
    37 print(' [*] Waiting for logs. To exit press CTRL+C')
    38 
    39 def callback(ch, method, properties, body):
    40     print(" [x] %r:%r" % (method.routing_key, body))
    41 
    42 channel.basic_consume(callback,
    43                       queue=queue_name,
    44                       no_ack=True)
    45 
    46 channel.start_consuming()
    订阅在2

    6、模糊匹配

     exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
     1 #!/usr/bin/env python
     2 import pika
     3 import sys
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6         host='localhost'))
     7 channel = connection.channel()
     8 
     9 channel.exchange_declare(exchange='topic_logs',
    10                          type='topic')
    11 
    12 result = channel.queue_declare(exclusive=True)
    13 queue_name = result.method.queue
    14 
    15 binding_keys = sys.argv[1:]
    16 if not binding_keys:
    17     sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    18     sys.exit(1)
    19 
    20 for binding_key in binding_keys:
    21     channel.queue_bind(exchange='topic_logs',
    22                        queue=queue_name,
    23                        routing_key=binding_key)
    24 
    25 print(' [*] Waiting for logs. To exit press CTRL+C')
    26 
    27 def callback(ch, method, properties, body):
    28     print(" [x] %r:%r" % (method.routing_key, body))
    29 
    30 channel.basic_consume(callback,
    31                       queue=queue_name,
    32                       no_ack=True)
    33 
    34 channel.start_consuming()
    订阅者
     1 #!/usr/bin/env python
     2 import pika
     3 import sys
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6         host='localhost'))
     7 channel = connection.channel()
     8 
     9 channel.exchange_declare(exchange='topic_logs',
    10                          type='topic')
    11 
    12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    13 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    14 channel.basic_publish(exchange='topic_logs',
    15                       routing_key=routing_key,
    16                       body=message)
    17 print(" [x] Sent %r:%r" % (routing_key, message))
    18 connection.close()
    发布者

    注:

    订阅/发布Demo
     
    发送消息给多个订阅者
    核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
     
    Exchanges
     
    在RabbitMQ完整的模型中,消息只能发送给一个exchange。
    exchange一方面接收消息,另一方面push给queues。
     
    exchange类型
    > rabbitmqctl list_exchanges
    direct
    topic
    headers
    fanout 广播消息给已知队列
     
  • 相关阅读:
    HashMap源码学习
    java线程池
    MySQL的MVCC
    volatile关键字学习
    ArrayList, Vector和CopyOnWriteArrayList对比学习
    曹工说Redis源码(3)-- redis server 启动过程完整解析(中)
    曹工说Redis源码(2)-- redis server 启动过程解析及简单c语言基础知识补充
    曹工杂谈:我们的应用,启动就要去其他服务拉数据,那其他服务挂了,我们就起不来了?
    程序员正确的提问方式(个人建议)
    曹工说Redis源码(1)-- redis debug环境搭建,使用clion,达到和调试java一样的效果
  • 原文地址:https://www.cnblogs.com/panwenbin-logs/p/5715669.html
Copyright © 2011-2022 走看看