zoukankan      html  css  js  c++  java
  • python-rabbitmq

    简单producer:

    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
                                        )
    channel = connection.channel()
    
    #declar
    channel.queue_declare(queue='hello')  #声明一个queue
    
    
    channel.basic_publish(exchange='',
                          routing_key='hello',   #routing_key就是声明的queue的名字
                          body='Hello World!')   #消息内容
    print("[x] Send 'hello World!'")
    connection.close()
    

     简单consumer:

    import pika
    import time
    
    connection = pika.BlockingConnection(
         pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    '''
    #如果确认这个queue已经存在, 可以不写下面语句,但是这里不声明,如果消费者先执行,就会出错。
    '''
    channel.queue_declare(queue='hello')
    
    def callback(ch,method,properties,body):
         '''
         :param ch: 管道的内存对象
         :param method:
         :param properties:
         :param body: 消息内容
         :return:
         '''
         print('---->',ch,method,properties,body)
         time.sleep(30)   
         print("[x] Recevied %r" % body)
    
    
    
    channel.basic_consume(callback,  #如果收到消息,就调用callback函数来处理消息
                          queue='hello', #从哪个队列收消息,收到消息执行callback
                          #no_ack=True   #True代表不确认,无论callback消息处理失败还是完成,都不会和生产端确认,默认是Flase,代表确认
                          )
    print('[*] Waiting for messages.To exit press CTRL+C')
    channel.start_consuming()  #start 执行命令
    

    队列以及消息持久化:

    #producer
    
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
                                        )
    channel = connection.channel()
    
    #declar
    channel.queue_declare(queue='hello3',durable=True)  #声明一个queue,durable参数设定持久化,注意持久化的只是队列,而不是消息内容
    
    channel.basic_publish(
                          exchange='',
                          routing_key='hello3',  #routing_key就是声明的queue的名字
                          body='Hello World!', #消息内容
                          properties=pika.BasicProperties(delivery_mode=2)   #消息持久化参数
                          )
    print("[x] Send 'hello World!'")
    connection.close()
    
    
    
    #consumer
    
    import pika
    import time
    
    connection = pika.BlockingConnection(
         pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    '''
    #如果确认这个queue已经存在, 可以不写下面语句,但是这里不声明,如果消费者先执行,就会出错。
    '''
    channel.queue_declare(queue='hello3',durable=True) #durable参数要与生产端保持一致
    #channel.queue_declare(queue='hello2')
    def callback(ch,method,properties,body):
         '''
         :param ch: 管道的内存对象
         :param method:
         :param properties:
         :param body: 消息内容
         :return:
         '''
         print('---->',ch,method,properties,body)
         time.sleep(5)
         print("[x] Recevied %r" % body)
         ch.basic_ack(delivery_tag=method.delivery_tag)  #配合no_ack参数,处理完成,返回生产端确认
    
    
    channel.basic_qos(prefetch_count=1) #表示处理完一条再给我发消息
    channel.basic_consume(callback,  #如果收到消息,就调用callback函数来处理消息
                          queue='hello3', #从哪个队列收消息,收到消息执行callback
                          #no_ack=True   #True代表不确认,无论callback消息处理失败还是完成,都不会和生产端确认,默认是Flase,代表确认
                          )
    print('[*] Waiting for messages.To exit press CTRL+C')
    channel.start_consuming()  #start 执行命令
    

    rabbitMQ广播之:fanout (订阅发布)

    #producer:
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = 'localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             type='fanout')  #fanout表示广播
    
    #message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print("[x] Sent %r" % message)
    connection.close()
    
    
    #consumer:
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             type='fanout'
    )
    
    result = channel.queue_declare(exclusive=True)   #exclusive排他的,唯一的,随机分配一个唯一的名字,消费者断开后自动删除
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print('[x] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
         print("[x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    

    广播之direct:

    #producer
    
    #!/usr/bin/python3
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare('direct_logs',
                             type='direct')
    
    severity = sys.argv[1] if len(sys.argv)>1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print("[x] Sent %r:%r" % (severity, message))
    connection.close()
    
    
    #consumer
    
    #!/usr/bin/python3
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
         sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
         sys.exit(1)
    
    for severity in severities:
         channel.queue_bind(exchange='direct_logs',
                            queue=queue_name,
                            routing_key=severity)
    print('[*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
         print("[x] %r:%r" % (method.routing_key,body))
    
    channel.basic_consume(callback,
                           queue=queue_name,
                           no_ack=True)
    channel.start_consuming()
    

    topic消息过滤广播:

    #producer
    
    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    
    #consumer
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
    
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    binding_keys = sys.argv[1:]
    
    if not binding_keys:
         sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
         sys.exit(1)
    
    for binding_key in binding_keys:
         channel.queue_bind(exchange='topic_logs',
                            queue=queue_name,
                            routing_key=binding_key)
         
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
         print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    

    rpc:

    #sev:
    
    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
         if n == 0:
              return 0
         elif n == 1:
              return 1
         else:
              return fib(n - 1) + fib(n - 2)
    
    
    def on_request(ch, method, props, body):
         n = int(body)
         print(" [.] fib(%s)" % n)
         response = fib(n)
         ch.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=pika.BasicProperties(correlation_id= props.correlation_id),
                          body=str(response))
         ch.basic_ack(delivery_tag=method.delivery_tag)   #返回对端确认处理完成
    
    
    #channel.basic_qos(prefetch_count=1)  #此参数设定了处理消息的最大数量
    channel.basic_consume(on_request,    #回调函数
                          queue='rpc_queue')    #指定队列
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    
    
    #cli:
    
    import pika
    import uuid
    import time
    
    
    class FibonacciRpcClient(object):
         def __init__(self):
              self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                   host='localhost'))
              self.channel = self.connection.channel()
              result = self.channel.queue_declare(exclusive=True)
              self.callback_queue = result.method.queue
              self.channel.basic_consume(self.on_response,
                                         no_ack=True,
                                         queue=self.callback_queue)
    
         def on_response(self, ch, method, props, body):
              if self.corr_id == props.correlation_id: #当接收到对端返回时先判断correlation_id是否与返回的id相同,
                                                      # 确保队列的一致性和唯一性
                   self.response = body
    
         def call(self, n):
              self.response = None
              self.corr_id = str(uuid.uuid4())   #产生一个随机数,赋给correlation_id传给对端,对端返回时再把这个随机数返回
              self.channel.basic_publish(exchange='',
                                         routing_key='rpc_queue',
                                         properties=pika.BasicProperties(
                                              reply_to=self.callback_queue,
                                              correlation_id=self.corr_id,
                                         ),
                                         body=str(n))
              while self.response is None:
                   self.connection.process_data_events() #非阻塞版的start_consumer()
                   #print("no msg...")
                   #time.sleep(0.5)
              return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    while True:
         num = input("input>:").strip()
         if num.isdigit() and int(num) > 0:
              response = fibonacci_rpc.call(str(num))
              print(" [.] Got %r" % response)
         else:
              print("请输入大于0的整数")
    
  • 相关阅读:
    怎么用JQUERY设置div背景图片?
    为什么导入本地jquery.js老是无效?(已解决)
    问题:AJAX的send参数里,空格以及它后面的数据在传递时消失(已解决)
    问题:win7下配置好服务器就是不能查询数据库。(已解决)
    问题:怎么把mysql的系统时间调整为电脑的时间?(已解决)
    jira安装说明
    HTTP 响应时发生错误。这可能是由于服务终结点绑定未使用 HTTP 协议造成的。这还可能是由于服务器中止了 HTTP 请求上下文(可能由于服务关闭)所致。
    C# 清空控制台屏幕内容
    NPOI导出Excel,添加图片和设置格式,添加条形码
    JS简写
  • 原文地址:https://www.cnblogs.com/guqing/p/6574345.html
Copyright © 2011-2022 走看看