zoukankan      html  css  js  c++  java
  • Python使用RabbitMQ

     

    1. 基本用法流程

    • 生产者:
      • 建立socket连接上rabbitmq
        1 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        1 pika.ConnectionParameters(host=_DEFAULT, port=_DEFAULT, virtual_host=_DEFAULT, credentials=_DEFAULT, channel_max=_DEFAULT, frame_max=_DEFAULT, heartbeat=_DEFAULT, ssl=_DEFAULT, ssl_options=_DEFAULT, connection_attempts=_DEFAULT, retry_delay=_DEFAULT, socket_timeout=_DEFAULT, locale=_DEFAULT, backpressure_detection=_DEFAULT, blocked_connection_timeout=_DEFAULT, client_properties=_DEFAULT, tcp_options=_DEFAULT, **kwargs)
        • 常用参数
          • host,与rabbitmq服务器连接,设置为localhost表示与本机的rabbitmq建立连接,通常程序与rabbitmq服务需要在统一环境方可使用
        • port,默认5672
        • credentials:用户认证,使用如pika.credentials.PlainCredentials进行认证
      • 建立通信通道
        channel = connection.channel()
      • 建立队列/建立交换机
        • 简单模式建立队列
          result = channel.queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None)
          • 常用参数
            • queue,队列名
            • durable,消息是放在队列由消费者取走,该参数表示取走时是否备份一份
            • exclusive,队列是否本连接独占
          • 建立的队列
            • queue = result.method.queue
        • 其他模式建立交换机
          channel.exchange_declare(exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None)
          • 常用参数
            • exchange,交换机名称
            • exchange_type,重点,交换机的模式,有三种:fanout,direct,topic,详细见exchange模式
            • durable,该参数表示消费者取走时是否备份一份
      • 传输数据
        channel.basic_publish(exchange, routing_key, body, properties=None, mandatory=False, immediate=False)
        • 常用参数
          • exchange,传输到交换机的名,可为’‘,表示不传输到交换机
          • routing_key,传输到队列的名,可为’‘,表示不传输到队列
          • body,传输的数据properties,额外参数,必须是pika.BasicProperties的实例
            pika.BasicProperties(content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None)
          • delivery_mode,传输模式
              • 设置为2,表示对消息进行持久化
            • reply_to,传递队列,可用于RPC回调队列
            • correlation_id,关联ID,可用于RPC回调时识别身份
      • 结束
        • connection.close()
    • 消费者
      • 建立socket连接上rabbitmq
        • 同上生产者
      • 建立通信通道
        • channel = connection.channel()
      • 建立交换机[可选]
        • 同上生产者
      • 建立队列
        • 同上生产者
      • 定制消息处理规则
        • 定义回调函数
          def callback(ch, method, properties, body): pass
        • 定制规则
          • channel.basic_consume(consumer_callback, queue, no_ack=False, exclusive=False, consumer_tag=None, arguments=None)
          • consumer_callback,回调函数
          • queue,接收消息队列
          • no_ack,是否不应答,True表示不应答,False表示应答
            • 是否给rabbitmq返回,已收到并处理消息
      • 阻塞监听队列,当有消息,则按照规则处理
        • channel.start_consuming()
      a

    RabbitMQ工作模型

    • 工作模型指的是生产者和消费者之间使用不同规则通信使用的RabbitMQ的模式

    简单模式

    • 示例
      • 生产者

         1 import pika
         2         
         3   # 封装socket通信,建立连接
         4   connection = pika.BlockingConnection(
         5       pika.ConnectionParameters(
         6           host='192.168.40.128',
         7           port=5672,
         8           credentials=pika.credentials.PlainCredentials(
         9               username='admin',
        10               password='123456'
        11           )
        12       ))
        13         
        14   # 创建通道对象
        15   channel = connection.channel()
        16         
        17   # 创建一个队列,名字为hello
        18   channel.queue_declare(queue='hello')
        19         
        20   # 直接向队列推送消息
        21   msg = "兔子"
        22   num = 10
        23   for i in range(num):
        24       channel.basic_publish(
        25           exchange='',                # 值为空表示简单模式
        26           routing_key='hello',        # 队列名
        27           body=''.join([msg, str(i)])    # 发送数据
        28       )
        29         
        30   print("Sent '%s' * %s" % (msg, num))
        31   connection.close()
      • 消费者

         1 import pika
         2         
         3   # 封装socket通信,建立连接
         4   connection = pika.BlockingConnection(
         5       pika.ConnectionParameters(
         6           host='192.168.40.128',
         7           port=5672,
         8           credentials=pika.credentials.PlainCredentials(
         9               username='admin',
        10               password='123456'
        11           )
        12       ))
        13         
        14   # 创建通道对象
        15   channel = connection.channel()
        16         
        17   # 创建一个叫hello的队列,有则连接该队列
        18   channel.queue_declare(queue='hello')
        19         
        20   # 定义回调函数
        21   def callback(ch, method, properties, body):
        22       print(" [x] Received %r" % body.decode("utf-8"))
        23         
        24   # 设置执行命令
        25   channel.basic_consume(
        26       callback,
        27       queue='hello',
        28       no_ack=True
        29   )
        30         
        31   print(' [*] Waiting for messages. To exit press CTRL+C')
        32   # 执行命令
        33   channel.start_consuming()

    exchange模式(三种)

    fanout
    • 分发模式,报纸订阅模式,生产者生产一份数据,交换机为每个消费者绑定的队列发送一样的数据

    • 生产者

       1 import pika
       2   import sys
       3     
       4   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       5   channel = connection.channel()
       6     
       7   channel.exchange_declare(exchange='logs',
       8                            exchange_type='fanout')
       9     
      10   message = ' '.join(sys.argv[1:]) or "info: Hello World!"
      11   channel.basic_publish(exchange='logs',
      12                         routing_key='',
      13                         body=message)
      14   print(" [x] Sent %r" % message)
      15   connection.close()
    • 消费者

       1   import pika
       2     
       3   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       4   channel = connection.channel()
       5     
       6   channel.exchange_declare(exchange='logs',
       7                            exchange_type='fanout')
       8     
       9   result = channel.queue_declare(exclusive=True)
      10   queue_name = result.method.queue
      11     
      12   channel.queue_bind(exchange='logs',
      13                      queue=queue_name)
      14     
      15   print(' [*] Waiting for logs. To exit press CTRL+C')
      16     
      17   def callback(ch, method, properties, body):
      18       print(" [x] %r" % body)
      19     
      20   channel.basic_consume(callback,
      21                         queue=queue_name,
      22                         no_ack=True)
      23     
      24   channel.start_consuming()
    direct
    • 关键字指定发送,

    • 生产者

       1 import pika
       2   import sys
       3     
       4   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       5   channel = connection.channel()
       6     
       7   channel.exchange_declare(exchange='direct_logs',
       8                            exchange_type='direct')
       9     
      10   severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
      11   message = ' '.join(sys.argv[2:]) or 'Hello World!'
      12   channel.basic_publish(exchange='direct_logs',
      13                         routing_key=severity,
      14                         body=message)
      15   print(" [x] Sent %r:%r" % (severity, message))
      16   connection.close()
    • 消费者

       1 import pika
       2   import sys
       3     
       4   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       5   channel = connection.channel()
       6     
       7   channel.exchange_declare(exchange='direct_logs',
       8                            exchange_type='direct')
       9     
      10   result = channel.queue_declare(exclusive=True)
      11   queue_name = result.method.queue
      12     
      13   severities = sys.argv[1:]
      14   if not severities:
      15       sys.stderr.write("Usage: %s [info] [warning] [error]
      " % sys.argv[0])
      16       sys.exit(1)
      17     
      18   for severity in severities:
      19       channel.queue_bind(exchange='direct_logs',
      20                          queue=queue_name,
      21                          routing_key=severity)
      22     
      23   print(' [*] Waiting for logs. To exit press CTRL+C')
      24     
      25   def callback(ch, method, properties, body):
      26       print(" [x] %r:%r" % (method.routing_key, body))
      27     
      28   channel.basic_consume(callback,
      29                         queue=queue_name,
      30                         no_ack=True)
      31     
      32   channel.start_consuming()
      View Code
    • 测试

      • python producer.py warning error > logs_from_rabbit.log
      • python comsumer.py info warning error
    topic

    • 生产者

       1 import pika
       2   import sys
       3     
       4   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       5   channel = connection.channel()
       6     
       7   channel.exchange_declare(exchange='topic_logs',
       8                            exchange_type='topic')
       9     
      10   routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
      11   message = ' '.join(sys.argv[2:]) or 'Hello World!'
      12   channel.basic_publish(exchange='topic_logs',
      13                         routing_key=routing_key,
      14                         body=message)
      15   print(" [x] Sent %r:%r" % (routing_key, message))
      16   connection.close()
      View Code
    • 消费者

       1 import pika
       2   import sys
       3     
       4   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       5   channel = connection.channel()
       6     
       7   channel.exchange_declare(exchange='topic_logs',
       8                            exchange_type='topic')
       9     
      10   result = channel.queue_declare(exclusive=True)
      11   queue_name = result.method.queue
      12     
      13   binding_keys = sys.argv[1:]
      14   if not binding_keys:
      15       sys.stderr.write("Usage: %s [binding_key]...
      " % sys.argv[0])
      16       sys.exit(1)
      17     
      18   for binding_key in binding_keys:
      19       channel.queue_bind(exchange='topic_logs',
      20                          queue=queue_name,
      21                          routing_key=binding_key)
      22     
      23   print(' [*] Waiting for logs. To exit press CTRL+C')
      24     
      25   def callback(ch, method, properties, body):
      26       print(" [x] %r:%r" % (method.routing_key, body))
      27     
      28   channel.basic_consume(callback,
      29                         queue=queue_name,
      30                         no_ack=True)
      31     
      32   channel.start_consuming()
      View Code
    • 测试

      • 接收所有日志
        • python comsumer.py "#"
      • 接收包含kern开头的
        • python comsumer.py "kern.*"
      • 接收critical结尾的
        • python comsumer.py "*.critical"
      • 发送kern.critical、A critical kernel error
        • python producer.py "kern.critical" "A critical kernel error"

    基于RabbitMQ的RPC

    • 何为RPC
      • 客户端通过消息队列向服务端发送数据
      • 服务端处理完数据,将数据通过回调队列返回给客户端
    • 客户端可能回同时发起多个任务,在服务端回调后应当根据唯一标识为每一个任务返回值的指定数据

    • 服务端

       1 import pika
       2     
       3   connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       4     
       5   channel = connection.channel()
       6     
       7   channel.queue_declare(queue='rpc_queue')
       8     
       9   def fib(n):
      10       if n == 0:
      11           return 0
      12       elif n == 1:
      13           return 1
      14       else:
      15           return fib(n-1) + fib(n-2)
      16     
      17   def on_request(ch, method, props, body):
      18       n = int(body)
      19     
      20       print(" [.] fib(%s)" % n)
      21       response = fib(n)
      22     
      23       ch.basic_publish(exchange='',
      24                        routing_key=props.reply_to,
      25                        properties=pika.BasicProperties(correlation_id = 
      26                                                            props.correlation_id),
      27                        body=str(response))
      28       ch.basic_ack(delivery_tag = method.delivery_tag)
      29     
      30   channel.basic_qos(prefetch_count=1)
      31   channel.basic_consume(on_request, queue='rpc_queue')
      32     
      33   print(" [x] Awaiting RPC requests")
      34   channel.start_consuming()
      View Code
    • 客户端

       1 import pika
       2   import uuid
       3     
       4   class FibonacciRpcClient(object):
       5       def __init__(self):
       6           self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       7     
       8           self.channel = self.connection.channel()
       9     
      10           result = self.channel.queue_declare(exclusive=True)
      11           self.callback_queue = result.method.queue
      12     
      13           self.channel.basic_consume(self.on_response, no_ack=True,
      14                                      queue=self.callback_queue)
      15     
      16       def on_response(self, ch, method, props, body):
      17           if self.corr_id == props.correlation_id:
      18               self.response = body
      19     
      20       def call(self, n):
      21           self.response = None
      22           self.corr_id = str(uuid.uuid4())
      23           self.channel.basic_publish(exchange='',
      24                                      routing_key='rpc_queue',
      25                                      properties=pika.BasicProperties(
      26                                            reply_to = self.callback_queue,
      27                                            correlation_id = self.corr_id,
      28                                            ),
      29                                      body=str(n))
      30           while self.response is None:
      31               self.connection.process_data_events()
      32           return int(self.response)
      33     
      34   fibonacci_rpc = FibonacciRpcClient()
      35     
      36   print(" [x] Requesting fib(30)")
      37   response = fibonacci_rpc.call(30)
      38   print(" [.] Got %r" % response)
      View Code

     

  • 相关阅读:
    java中的 equals 与 ==
    String类的内存分配
    SVN用命令行更换本地副本IP地址
    npoi 设置单元格格式
    net core 微服务框架 Viper 调用链路追踪
    打不死的小强 .net core 微服务 快速开发框架 Viper 限流
    net core 微服务 快速开发框架 Viper 初体验20201017
    Anno 框架 增加缓存、限流策略、事件总线、支持 thrift grpc 作为底层传输
    net core 微服务 快速开发框架
    Viper 微服务框架 编写一个hello world 插件02
  • 原文地址:https://www.cnblogs.com/musl/p/12925471.html
Copyright © 2011-2022 走看看