zoukankan      html  css  js  c++  java
  • RbbitMQ 的 python 实现方法

    RbbitMQ(消息队列)
    
    #简单模式
    服务端
    import pika
    #连接
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
    连接通道
    channel = connection.channel()
    声明队列
    channel.queue_declare(queue='hello')
    发送数据    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
                      
    print(" [x] Sent 'Hello World!'")
    结束连接
    connection.close()
    
    
    # ########################## 客户端 ##########################
     #获得连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    获得连接通道
    channel = connection.channel()
     #声明队列
    channel.queue_declare(queue='hello')
     回调函数
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
     #从通道取出数据执行回调函数
    channel.basic_consume( callback,        
                           queue='hello',   #队列名
                           no_ack=True)   
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    在通道等待数据传递过来
    channel.start_consuming()
    #############################防止掉线客户端########################################
    #no-ack = False,如果消费者遇到情况挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
        回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag)
        basic_comsume中的no_ack=False
    
    import pika
    #连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='10.211.55.4'))
    连接通道
    channel = connection.channel()
    声明队列
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        #执行这行代码之后,才把数据销毁
        ch.basic_ack(delivery_tag = method.delivery_tag)
    获得管道数据执行回调函数
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    等待
    channel.start_consuming()
    
    
    #########################durable :消息不丢失(服务端)########################################3
    import pika
    连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    连接通道
    channel = connection.channel()
    声明队列
    channel.queue_declare(queue='hello', durable=True)
    push数据
    channel.basic_publish(exchange='',    #交换
                          routing_key='hello',
                          body='Hello World!',
                          基础属性
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                              #让消息持久发送
                          ))
    print(" [x] Sent 'Hello World!'")
    connection.close()
    
    ##################################消息不丢失(客户端)#############################################)#############################################
    import pika
    连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    通道
    channel = connection.channel()
    
    生成队列
    channel.queue_declare(queue='hello', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        #确认
        ch.basic_ack(delivery_tag = method.delivery_tag)
    基础消耗方法,执行回调
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    开始消耗
    channel.start_consuming()
    ##################################
    (3) 消息获取顺序
    
    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
    
    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
    ################################客户端##################################
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        #确认,
        ch.basic_ack(delivery_tag = method.delivery_tag)
    #谁来谁取
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    
    
    #############exchange模型#############
    exchange type = fanout   #交换类型
    
    #############服务端########################
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    声明交流
    channel.exchange_declare(exchange='logs',
                             type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    push数据
    channel.basic_publish(exchange='logs',    #交流name
                          routing_key='',   
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    ########################客户端##################################################
    # 消费者
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    声明交流
    channel.exchange_declare(exchange='logs',
                             type='fanout')     #订阅
    声明队列
    result = channel.queue_declare(exclusive=True)
    #队列名
    queue_name = result.method.queue
    与队列捆绑
    channel.queue_bind(exchange='logs',    
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    基本消耗
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)        #保护数据
    #消耗通道
    channel.start_consuming()
    
    #######################关键字发送#################################
     exchange type = direct
     之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
    ###########################客户端########################################
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    连接通道
    channel = connection.channel()
    声明交流
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')    #直接
    声明队列
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)  就是这个
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    消耗,获取队列信息回调
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    开始消耗
    channel.start_consuming()
    #############################
    在 topic 类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
    
    # 表示可以匹配 0 个 或 多个 单词
    *  表示只能匹配 一个 单词
    ##############################模糊查找############################################
    import pika
    import sys
    连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    管道
    channel = connection.channel()
    交流
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')    话题
    声明队列
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        与队列捆绑
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)   #查找#
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    
    ################################ 基于RabbitMQ的RPC###############
    一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
    ###################################服务器####
    # 建立连接,服务器地址为localhost,可指定ip地址
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    
    # 建立会话
    channel = connection.channel()
    
    # 声明RPC请求队列
    channel.queue_declare(queue='rpc_queue')
    
    # 数据处理方法
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    # 对RPC请求队列中的请求进行处理
    def on_request(ch, method, props, body):
        n = int(body)
    
        print(" [.] fib(%s)" % n)
    
        # 调用数据处理方法
        response = fib(n)
    
        # 将处理结果(响应)发送到回调队列
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = 
                                                             props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    # 负载均衡,同一时刻发送给该服务器的请求不超过一个
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(on_request, queue='rpc_queue')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    ##################################################################
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            ”“”
            客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应
            
            “”“
            
            # 建立连接,指定服务器的ip地址
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))
                    
            # 建立一个会话,每个channel代表一个会话任务
            self.channel = self.connection.channel()
            
            # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
            result = self.channel.queue_declare(exclusive=True)
            # 将次队列指定为当前客户端的回调队列
            self.callback_queue = result.method.queue
            
            # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理; 
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
    
        # 对回调队列中的响应进行处理的函数
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
    
        # 发出RPC请求
        def call(self, n):
        
            # 初始化 response
            self.response = None
            
            #生成correlation_id 
            self.corr_id = str(uuid.uuid4())
            
            # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
                                       
            
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    # 建立客户端
    fibonacci_rpc = FibonacciRpcClient()
    
    # 发送RPC请求
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
  • 相关阅读:
    phpstrom中Terminal窗口打开
    window安装reidis完成之后,想要把数据存入redis,必须开扩展,不然报错,redis windows phpstudy 安装扩展
    Windows 安装 Anaconda3+PyCharm
    表单序列化+ajax跨域提交
    微信小程序无法获取到unionId(专业踩坑20年)
    支付宝的同步和异步的区别
    layui多图上传
    多图上传控制器及模型代码(2)thinkphp5+layui实现多图上传保存到数据库,可以实现图片自由排序,自由删除。
    【JZOJ4816】【NOIP2016提高A组五校联考4】label
    【JZOJ4815】【NOIP2016提高A组五校联考4】ksum
  • 原文地址:https://www.cnblogs.com/ldq1996/p/8512337.html
Copyright © 2011-2022 走看看