zoukankan      html  css  js  c++  java
  • python第六十天-----RabbitMQ

    RabbitMQ消息队列:默认为消息轮循模式,按client端启动是顺序接收

    server端

     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3                'localhost'))
     4 channel = connection.channel()#管道
     5 
     6 #声明queue
     7 #channel.queue_declare(queue='hello')#队列名 hello
     8 channel.queue_declare(queue='hello',durable=True)#队列名 hello,持久化队列
     9 
    10 for i in range(10):
    11 
    12     channel.basic_publish(exchange='',
    13                           routing_key='hello',
    14                           body='Hello World!%s'%i,
    15                           properties=pika.BasicProperties(delivery_mode=2))
    16     print(" [x] Sent 'Hello World!'",i)
    17 connection.close()
    View Code

    client端

     1 import pika,time
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4                'localhost'))
     5 channel = connection.channel()
     6 #channel.queue_declare(queue='hello')
     7 channel.queue_declare(queue='hello',durable=True)#队列名 hello,持久化队列
     8 
     9 
    10 def callback(ch, method, properties, body):#回调函数
    11     print('接收消息中…………')
    12     time.sleep(1)
    13     print(" [x] Received %r" % body)
    14     ch.basic_ack(delivery_tag=method.delivery_tag)
    15 
    16 
    17 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
    18 channel.basic_consume(callback,#接收到消息调用回调函数 callback
    19                       queue='hello',
    20                       #no_ack=True
    21                        )
    22 
    23 print(' [*] 接收消息中. To exit press CTRL+C')
    24 channel.start_consuming()#启动消息接收
    View Code

    消息持久化

    server端

     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3                'localhost'))
     4 channel = connection.channel()#管道
     5 
     6 #声明queue
     7 channel.queue_declare(queue='hello2',durable=True)#队列名 hello
     8 #channel.queue_declare(queue='hello2',durable=True)#队列名 hello,持久化队列
     9 
    10 
    11 channel.basic_publish(exchange='',
    12                         routing_key='hello2',
    13                         body='Hello World!%s---->')
    14 print(" [x] Sent 'Hello World!'")
    15 connection.close()
    View Code

    client端

     1 import pika,time
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4                'localhost'))
     5 channel = connection.channel()
     6 channel.queue_declare(queue='hello2')#服务端与客户端的设置需一致,不然会报错
     7 #channel.queue_declare(queue='hello2',durable=True)#队列名 hello,持久化队列
     8 
     9 
    10 def callback(ch, method, properties, body):#回调函数
    11     print('接收消息中…………')
    12     time.sleep(5)
    13     print(" [x] Received %r" % body)
    14     ch.basic_ack(delivery_tag=method.delivery_tag)
    15 
    16 
    17 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
    18 channel.basic_consume(callback,#接收到消息调用回调函数 callback
    19                       queue='hello2',
    20                       #no_ack=True
    21                        )
    22 
    23 print(' [*] 接收消息中. To exit press CTRL+C')
    24 channel.start_consuming()#启动消息接收
    View Code

    fanout广播模式:实时发送,发送时,如果client端没启动将无法收到消息

    server端

     1 import pika,sys,time
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3                'localhost'))
     4 channel = connection.channel()#管道
     5 
     6 #声明queue 广播模式不用声明队列
     7 #channel.queue_declare(queue='hello')#队列名 hello
     8 #channel.queue_declare(queue='hello',durable=True)#队列名 hello,持久化队列
     9 
    10 argv=input('输入消息')
    11 msg=''.join(sys.argv[1:]) or  'info:消息默认发送………'
    12 for i in range(10):
    13     time.sleep(1)
    14     channel.basic_publish(exchange='logs',#绑定频道
    15                           #routing_key='hello',
    16                           routing_key='',
    17                           body=msg+str(i),
    18                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
    19                            )
    20     print(msg,i)
    21 #connection.close()
    View Code

    client端

     1 import pika,time
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4                'localhost'))
     5 channel = connection.channel()
     6 #channel.queue_declare(queue='hello2')#服务端与客户端的设置需一致,不然会报错
     7 #channel.queue_declare(queue='hello2',durable=True)#队列名 hello,持久化队列
     8 channel.exchange_declare(exchange='logs',#绑定频道
     9                          type='fanout')#接收类型
    10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
    11 queuename=reult.method.queue#队列名 自动生成
    12 channel.queue_bind(exchange='logs',#先要绑定频道
    13                    queue=queuename
    14                    )
    15 
    16 
    17 def callback(ch, method, properties, body):#回调函数
    18     print('接收消息中…………')
    19     #time.sleep(5)
    20     print(" [x] Received %r" % body.decode())
    21     ch.basic_ack(delivery_tag=method.delivery_tag)
    22 
    23 
    24 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
    25 channel.basic_consume(callback,#接收到消息调用回调函数 callback
    26                       queue=queuename,
    27                       #no_ack=True
    28                        )
    29 
    30 print(' [*] 接收消息中. To exit press CTRL+C')
    31 
    32 channel.start_consuming()#启动消息接收
    View Code

    direct广播模式:分级别发送消——客户端可以按级别接收

    server端

     1 import pika,sys,time
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3                'localhost'))
     4 channel = connection.channel()#管道
     5 
     6 
     7 
     8 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'#启动参数 默认无参数为 info 级别
     9 msg=''.join(sys.argv[2:]) or  'info:消息默认发送………'#启动参数 为空,发默认消息
    10 for i in range(10):
    11     time.sleep(1)
    12     channel.basic_publish(exchange='direct_logs',#绑定频道
    13                           routing_key=severity,#默认的消息队列级别
    14                           body=msg+str(i),
    15                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
    16                            )
    17     print(msg,severity)
    18 connection.close()
    View Code

    client端

     1 import pika,time,sys
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4                'localhost'))
     5 channel = connection.channel()
     6 
     7 channel.exchange_declare(exchange='direct_logs',#定义一个接收的频道
     8                          type='direct')
     9 
    10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
    11 queuename=reult.method.queue#队列名 自动生成
    12 
    13 
    14 severities = sys.argv[1:]
    15 if not severities:
    16     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])#启动接收的消息级别
    17     sys.exit(1)
    18 
    19 for severity in severities:#循环接收各级别的消息
    20     channel.queue_bind(exchange='direct_logs',
    21                        queue=queuename,
    22                        routing_key=severity)
    23 
    24 def callback(ch, method, properties, body):#回调函数
    25     print('接收消息中…………')
    26     #time.sleep(5)
    27     print(" [x] Received %r" % body.decode())
    28     ch.basic_ack(delivery_tag=method.delivery_tag)
    29 
    30 
    31 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
    32 channel.basic_consume(callback,#接收到消息调用回调函数 callback
    33                       queue=queuename,
    34                       #no_ack=True
    35                        )
    36 
    37 print(' [*] 接收消息中. To exit press CTRL+C')
    38 
    39 channel.start_consuming()#启动消息接收
    View Code

    topic细致消息过滤

    server端

     1 import pika,sys,time
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(
     3                'localhost'))
     4 channel = connection.channel()#管道
     5 
     6 
     7 
     8 #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'#启动参数 默认无参数为 info 级别
     9 routing_key= sys.argv[1] if len(sys.argv) > 1 else 'anorrymous.info'#启动参数 默认无参数为 info 级别
    10 msg=''.join(sys.argv[2:]) or  'info:消息默认发送………'#启动参数 为空,发默认消息
    11 for i in range(10):
    12     time.sleep(1)
    13     channel.basic_publish(exchange='direct_logs',#绑定频道
    14                           #routing_key=severity,#默认的消息队列级别
    15                           routing_key=routing_key,#默认的消息队列级别
    16                           body=msg+str(i),
    17                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
    18                            )
    19     #print(msg,severity)
    20     print(msg,routing_key)
    21 connection.close()
    View Code

    client端

     1 import pika,time,sys
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4                'localhost'))
     5 channel = connection.channel()
     6 
     7 channel.exchange_declare(exchange='direct_logs',#定义一个接收的频道
     8                          type='topic')
     9 
    10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
    11 queuename=reult.method.queue#队列名 自动生成
    12 
    13 
    14 #severities = sys.argv[1:]
    15 binding_key = sys.argv[1:]
    16 #if not severities:
    17 if not binding_key:
    18     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])#启动接收的消息级别
    19     sys.exit(1)
    20 
    21 #for severity in severities:#循环接收各级别的消息
    22 for severity in binding_key:#循环接收各级别的消息
    23     channel.queue_bind(exchange='direct_logs',
    24                        queue=queuename,
    25                        routing_key=severity)
    26 
    27 def callback(ch, method, properties, body):#回调函数
    28     print('接收消息中…………')
    29     #time.sleep(5)
    30     print(" [x] Received %r" % body.decode())
    31     ch.basic_ack(delivery_tag=method.delivery_tag)
    32 
    33 
    34 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
    35 channel.basic_consume(callback,#接收到消息调用回调函数 callback
    36                       queue=queuename,
    37                       #no_ack=True
    38                        )
    39 
    40 print(' [*] 接收消息中. To exit press CTRL+C')
    41 
    42 channel.start_consuming()#启动消息接收
    View Code

    RPC模式:结果返回

    server端

     1 import pika
     2 import time
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4         host='localhost'))#生成消息对队
     5 
     6 channel = connection.channel()#生成管道
     7 
     8 channel.queue_declare(queue='rpc_queue')#消息收接的模式
     9 
    10 def fib(n):#计算函数——非波那
    11     if n == 0:
    12         return 0
    13     elif n == 1:
    14         return 1
    15     else:
    16         return fib(n-1) + fib(n-2)
    17 
    18 def on_request(ch, method, props, body):#回调函数
    19     n = int(body)
    20 
    21     print(" [.] fib(%s)" % n)
    22     response = fib(n)#调用计算函数
    23 
    24     ch.basic_publish(exchange='',
    25                      routing_key=props.reply_to,#收消息的队列
    26                      properties=pika.BasicProperties(correlation_id =props.correlation_id),#返回消息的队列
    27                      body=str(response))#返回结果数据
    28     ch.basic_ack(delivery_tag = method.delivery_tag)##确保消息被 客户端接收
    29 
    30 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
    31 channel.basic_consume(on_request, queue='rpc_queue')#接收消息,自动调用回调函数
    32 
    33 print(" [x] Awaiting RPC requests")
    34 channel.start_consuming()#开始接收
    View Code

    client端

     1 import pika
     2 import uuid
     3 
     4 class FibonacciRpcClient(object):
     5     def __init__(self):
     6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
     7                 host='localhost'))#生成连接的服务端 ip
     8 
     9         self.channel = self.connection.channel()#创建一个管道
    10 
    11         result = self.channel.queue_declare(exclusive=True)#随机生成一个队列,收消息后自动删除
    12         self.callback_queue = result.method.queue#赋于管道 变量
    13 
    14         self.channel.basic_consume(self.on_response,#回调函数
    15                                    no_ack=True,#不用服务端确认本条消息
    16                                    queue=self.callback_queue)#随机生的队列
    17 
    18     def on_response(self, ch, method, props, body):#回调函数
    19         if self.corr_id == props.correlation_id:#判断服务端返回的队列名是否与当前所生成的队列名一致
    20             self.response = body#  将服务端的结果赋于返回来的结果变量
    21 
    22     def call(self, n):#发送消息的函数
    23         self.response = None#初始返回结果为空
    24         self.corr_id = str(uuid.uuid4())#生成一个服务端返回消息的队列名
    25         self.channel.basic_publish(exchange='',
    26                                    routing_key='rpc_queue',#确认为rpc模式 实时发送
    27                                    properties=pika.BasicProperties(
    28                                          reply_to = self.callback_queue,#发送的管道队列名
    29                                          correlation_id = self.corr_id,#发送给服务端,用于返回消息的队列名
    30                                          ),
    31                                    body=str(n))#发送的内容数据
    32         while self.response is None:
    33             self.connection.process_data_events()#非阻塞模式接收消息
    34             print('没有返回消息……')
    35         return int(self.response)#返回结果
    36 
    37 fibonacci_rpc = FibonacciRpcClient()#生成一个实例
    38 
    39 print(" [x] Requesting fib(10)")
    40 response = fibonacci_rpc.call(10)#调用发送消息的函数
    41 print(" [.] Got %r" % response)#打印结果
    View Code
  • 相关阅读:
    《ERP从内部集成起步》读书笔记——第2章 从优化业务流程谈信息集成的必要性 2.1从流程优化的需要理解信息化与管理的关系 2.1.1全局观念和全流程
    《ERP从内部集成起步》读书笔记——第一章 Garthner公司是如何提出ERP的 1.4 ERP内部集成与MRP II
    Reporting Service中Rdlc导出为pdf中文字乱码解决方法
    善用Wink将电脑操作录屏为Flash文件
    树本来就是疯的
    关于启动BIM工程硕士教育的思考
    AIRPAK3.0用户指导手册第一部分手册简介
    梦想
    How to Deal With Bullies 如何应对欺负你的人
    为什么说面试荒诞
  • 原文地址:https://www.cnblogs.com/uge3/p/7103231.html
Copyright © 2011-2022 走看看