zoukankan      html  css  js  c++  java
  • RabbitMQ消息中介之Python使用

    本文介绍RabbitMQ在python下的基本使用 

    1. RabbitMQ安装,安装RabbitMQ需要预安装erlang语言,Windows直接下载双击安装即可

      RabbitMQ下载地址:http://www.rabbitmq.com/download.html

      Erlang语言下载地址:http://www.erlang.org/downloads

    2. RabbitMQ的基本使用

      发送消息端

    # -*- coding:utf-8 -*-
    # Author:Wong Du
    
    import pika
    
    # 生成RabbitMQ连接对象
    connection = pika.BlockingConnection(
        pika.ConnectionParameters("localhost")
    )
    
    # 建立连接管道
    channel = connection.channel()
    
    # 进行队列声明,指定消息要发送到的队列
    # durable声明队列持久性,即RabbitMQ重启队列仍旧不丢失(不完全可靠)
    # 对现有队列进行durable声明更改无法生效,只有重新声明一个新的队列才生效
    channel.queue_declare(queue="hello4", durable=True)
    
    # 发送消息到RabbitMQ消息队列里
    channel.basic_publish(
        exchange='',    # 指定交换器
        routing_key="hello4",   # 要绑定的队列
        body="Nice to meet you22223333...",     # 要发送的消息
        properties=pika.BasicProperties(
            delivery_mode=2,        # 通过delivery_mode=2将队列内的消息持久化
        )
    )
    
    print("[x] sent 'Nice to meet you22233332...'")
    connection.close()

      接收消息端

    # -*- coding:utf-8 -*-
    # Author:Wong Du
    
    import pika
    import time
    
    # 生成RabbitMQ连接对象
    connection = pika.BlockingConnection(
        pika.ConnectionParameters("localhost")
    )
    
    # 建立连接管道
    channel = connection.channel()
    
    # 声明队列名
    channel.queue_declare(queue="hello4", durable=True)
    
    # 定义构建回调函数
    def callback(ch, method, properties, body):
        print(ch)
        # time.sleep(20)
        print("[x] received %r" % body)
        # ch.basic_ack(delivery_tag= method.delivery_tag)  # 接收端回复消息给rabbixmq,代表该消息处理已完成
    
    # 默认情况下,RabbitMQ循环把消息发送给consumer, # 通过basic_qos(prefetch_count=1)设置,可以在处理并确认完前一个消息之前,不再接收新信息 # 即实现“能者多劳”的效果 channel.basic_qos(prefetch_count=1)
    channel.basic_consume( callback, queue
    ="hello4", no_ack=True, # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息 ) print("[*] waiting for messages. To exit press CTRL+C...") channel.start_consuming()

    -----------------------拓展一吓,RabbitMQ消息转发类型:分类筛选,广播,正则匹配------------------------------

    3. 分类筛选:RabbitMQ通过消息分类进行对应发送和接收

      发送消息端

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import pika
     5 import sys
     6 
     7 # 生成RabbitMQ连接对象
     8 connection = pika.BlockingConnection(
     9     pika.ConnectionParameters('localhost')
    10 )
    11 
    12 # 建立连接管道
    13 channel = connection.channel()
    14 
    15 # 声明交换器和交换器转发消息类型;'direct'为分类筛选 型
    16 channel.exchange_declare(exchange='_direct',
    17                          exchange_type='direct')
    18 
    19 # 获取脚本命令:根据脚本命令,定义对应严重级别的消息(默认为info类消息)
    20 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    21 # 获取要发送给RabbitMQ的消息(默认为Hello world!)
    22 msg = "".join(sys.argv[2:]) or "Hello world!"
    23 '''例:python direct分类筛选_producer.py info 无关痛痒的消息
    24         表示给RabbitMQ发送一条info级别的消息,消息内容:无关痛痒的消息
    25 '''
    26 
    27 channel.basic_publish(exchange='_direct',   # 指定交换器
    28                       routing_key=severity, # 指定要筛选的字段
    29                       body=msg              # 要发送给RabbitMQ的消息
    30 )
    31 
    32 print("----exec done----")
    33 connection.close()
    direct分类筛选_producer

      接收消息端

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 import pika
     5 import sys
     6 
     7 # 生成RabbitMQ连接对象
     8 connection = pika.BlockingConnection(
     9     pika.ConnectionParameters('localhost')
    10 )
    11 
    12 # 建立连接管道
    13 channel = connection.channel()
    14 
    15 # 声明交换器和交换器消息转发类型;'direct'为分类筛选 型
    16 channel.exchange_declare(exchange='_direct',
    17                          exchange_type='direct',)
    18 
    19 # 声明随机独家管道,用以接收RabbitMQ的消息
    20 Random_Queue = channel.queue_declare(exclusive=True)
    21 # 获取管道名
    22 queue_name = Random_Queue.method.queue
    23 
    24 # 获取脚本命令;获取相应严重级别的消息,可多个严重级别
    25 severities = sys.argv[1:]
    26 if not severities:
    27     sys.stderr.write("33[31;1mUsage: %s [info] [warning] [error]33[0m" % sys.argv)
    28     exit()
    29 
    30 for severity in severities:
    31     # 将交换器、队列和筛选分类绑定起来
    32     # 可以简单理解为:从交换器中通过rongting_key筛选消息后发送给queue
    33     channel.queue_bind(exchange='_direct',
    34                        queue=queue_name,
    35                        routing_key=severity)
    36 
    37 # 定义消息回调函数
    38 def callback(ch, method, properties, body):
    39     print(body)
    40 
    41 channel.basic_consume(
    42     callback,   # 获取消息后交给回调函数进行处理
    43     queue=queue_name,   # 从指定queue中获取消息
    44     no_ack=True         # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息
    45 )
    46 
    47 print("[*] waiting for messages. To exit press CTRL+C...")
    48 # 运行管道,一直接收消息,如无消息则为阻塞状态
    49 channel.start_consuming()
    direct分类筛选_consumer

    4. 广播:RabbitMQ通过广播发送消息给多个consumer,广播模式下发送消息是实时的,没有即时接收,则丢失

      发送消息端

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 '''
     5 RabbitMQ交换器广播模式,
     6 当producer发送消息给RabbitMQ时,
     7 RabbitMQ会立即转发给所有当前在线的consumer,然后将消息删除
     8 如当前没有在线的consumer,消息则会丢失
     9 即RabbitMQ下交换器广播模式的消息是实时的,当时发送,当时接收
    10 '''
    11 
    12 import pika
    13 
    14 # 生成RabbitMQ连接对象
    15 connection = pika.BlockingConnection(
    16     pika.ConnectionParameters("localhost")
    17 )
    18 
    19 # 建立连接管道
    20 channel = connection.channel()
    21 
    22 # 声明交换器及交换器消息转发类型;'fanout'为广播类型
    23 channel.exchange_declare(exchange="dudu",
    24                          # type="fanout",
    25                          exchange_type="fanout",)
    26 
    27 msg = "Hello world! Nice to meet you..."
    28 channel.basic_publish(exchange="dudu",  # 指定交换器
    29                       routing_key='',   # routing_key必须配置
    30                       body=msg)         # 要发送给RabbitMQ的消息
    31 
    32 print("---exec done---")
    33 connection.close()
    fanout广播_producer

      接收消息端

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 '''
     5 RabbitMQ交换器广播模式,
     6 当producer发送消息给RabbitMQ时,
     7 RabbitMQ会立即转发给所有当前在线的consumer,然后将消息删除
     8 如当前没有在线的consumer,消息则会丢失
     9 即RabbitMQ下交换器广播模式的消息是实时的,当时发送,当时接收
    10 '''
    11 
    12 import pika
    13 
    14 # 生成RabbitMQ连接对象
    15 connection = pika.BlockingConnection(
    16     pika.ConnectionParameters("localhost")
    17 )
    18 
    19 # 建立连接管道
    20 channel = connection.channel()
    21 
    22 # 声明交换器和交换器消息转发类型;'fanout'为广播类型
    23 channel.exchange_declare(exchange="dudu",
    24                          exchange_type="fanout",)
    25 
    26 # 声明随机独家队列,用以接收RabbitMQ发来的消息
    27 Random_Queue = channel.queue_declare(exclusive=True)
    28 # 获取队列名
    29 queue_name = Random_Queue.method.queue
    30 
    31 # 将交换器和队列绑定在一起
    32 channel.queue_bind(exchange="dudu",
    33                    queue=queue_name,)
    34 
    35 # 定义消息处理回调函数
    36 def callback(ch, method, properties, body):
    37     print("[x] received %r" % body)
    38 
    39 channel.basic_consume(
    40     callback,   # 接收到消息后通过回调函数处理
    41     queue=queue_name,   # 配置获取消息的队列
    42     # no_ack=True,        # consumer接收到消息后RabbitMQ即删除掉该消息
    43 )
    44 
    45 print("[*] waiting for messages. To exit press CTRL+C...")
    46 # 运行管道,一直接收消息,如无消息则为阻塞状态
    47 channel.start_consuming()
    fanout广播_consumer

    5. 正则匹配:RabbitMQ通过关键字标记发送消息,通过匹配关键字接收消息

      发送消息端

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 '''
     5 RabbitMQ交换器的topic正则匹配模式,
     6 可理解为direct分类筛选的升级版,
     7 既能通过关键字标记消息内容,也能通过正则匹配标记关键字
     8 '''
     9 
    10 import pika
    11 import sys
    12 
    13 # 生成RabbitMQ连接对象
    14 connection = pika.BlockingConnection(
    15     pika.ConnectionParameters('localhost')
    16 )
    17 
    18 # 建立连接管道
    19 channel = connection.channel()
    20 
    21 # 声明交换器和交换器消息转发类型;'topic'为正则匹配类型
    22 channel.exchange_declare(exchange='_topic',
    23                          exchange_type='topic')
    24 
    25 # 获取脚本命令;word用来标记msg,msg为消息内容本身
    26 word = sys.argv[1] if len(sys.argv) > 1 else 'any.info'
    27 msg = " ".join(sys.argv[2:]) or "Hello world!"
    28 
    29 
    30 channel.basic_publish(
    31     exchange='_topic',      # 指定交换器
    32     routing_key=word,       # 配置标记关键字
    33     body=msg                # 要发送给RabbitMQ的消息内容
    34 )
    35 
    36 print("----exec done----")
    37 connection.close()
    topic匹配_producer

      接收消息端

     1 # -*- coding:utf-8 -*-
     2 # Author:Wong Du
     3 
     4 '''
     5 RabbitMQ交换器的topic正则匹配模式,
     6 可理解为direct分类筛选的升级版,
     7 既能通过关键字标记消息内容,也能通过正则匹配标记关键字
     8 '''
     9 
    10 import pika
    11 import sys
    12 
    13 # 生成RabbitMQ连接对象
    14 connection = pika.BlockingConnection(
    15     pika.ConnectionParameters('localhost')
    16 )
    17 
    18 # 建立连接管道
    19 chanel = connection.channel()
    20 
    21 # 声明交换器和交换器消息转发类型;'topic'为匹配类型
    22 chanel.exchange_declare(exchange='_topic',
    23                         exchange_type='topic')
    24 
    25 # 声明随机独家队列,用以接收RabbitMQ发来的消息
    26 Random_Queue = chanel.queue_declare(exclusive=True)
    27 # 获取随机队列队列名
    28 queue_name = Random_Queue.method.queue
    29 
    30 # 获取脚本命令;即匹配规则
    31 severities = sys.argv[1:]
    32 if not severities:
    33     print("Usage: %s [*.info] [mysql.*] [#]" % sys.argv)
    34     exit(1)
    35 
    36 for severity in severities:
    37     # 对交换器、队列、匹配规则进行绑定
    38     chanel.queue_bind(exchange='_topic',
    39                       queue=queue_name,
    40                       routing_key=severity,)
    41 
    42 # 构建消息处理回调函数
    43 def callback(ch, method, properties, body):
    44     print(body)
    45 
    46 chanel.basic_consume(
    47     callback,
    48     queue=queue_name,
    49     no_ack=True,
    50 )
    51 
    52 print("[*] waiting for messages. To exit press CTRL+C...")
    53 # 运行管道,一直接收消息,如无消息则为阻塞状态
    54 chanel.start_consuming()
    topic匹配_consumer
  • 相关阅读:
    聚合根、实体、值对象
    哀悼的CSS 把网站变成灰色
    Ubuntu13.04更换aptget源
    JS判断用户终端,跳转到不同的页面.
    分享一个使用的FireFox 截图插件小巧方便
    Linux 下面FireFox 看CCTV直播
    Ubuntu 11.10后 Guest账户禁用!
    修改Ubuntu的启动画面plymouth
    ubuntu开机自启动小键盘
    Linux 用cat做图片种子||Windows 用copy做图片种子
  • 原文地址:https://www.cnblogs.com/Caiyundo/p/9554448.html
Copyright © 2011-2022 走看看