zoukankan      html  css  js  c++  java
  • python学习-day11

    一、Rabbitmq

           RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。在 RabbitMQ 中,如下图结构:

    • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。生产者需要完成的任务:
    • 1 创建RabbitMQ连接
      2 获取信道
      3 声明交换器
      4 创建消息
      5 发布消息
      6 关闭信道
      7 关闭RabbitMQ连接 
    • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
    • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。消费者需要完成的任务:
      1 创建RabbitMQ连接
      2 获取信道
      3 声明交换器
      4 声明队列
      5 队列和交换器绑定
      6 消费信息
      7 关闭信道
      8 关闭RabbitMQ连接
    • Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
    • Binding: 连接Exchange和Queue,包含路由规则。
    • Queue: 消息队列,存储还未被消费的消息。
    • Message: Header+Body
    • Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源

    1. dircted exchange

         路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。rabbitmq内部默认有一个特殊的dircted exchange,该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

     生产者:

     1 import pika
     2 credentials = pika.PlainCredentials('admin', '123456')
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4     '192.168.170.134',5672,'/',credentials))
     5 channel = connection.channel()
     6 
     7 # 声明queue
     8 channel.queue_declare(queue='hello')
     9 
    10 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    11 channel.basic_publish(exchange='',
    12                       routing_key='hello',
    13                       body='Hello World!')
    14 print(" [x] Sent 'Hello World!'")
    15 connection.close()
    View Code

    消费者:

     1 import pika
     2 credentials = pika.PlainCredentials('admin', '123456')
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4     '192.168.170.134',5672,'/',credentials))
     5 channel = connection.channel()
     6 def callback(ch, method, properties, body):
     7     print(" [x] Received %r" % body)
     8 
     9 
    10 channel.basic_consume(callback,
    11                       queue='hello',
    12                       no_ack=True)
    13 
    14 print(' [*] Waiting for messages. To exit press CTRL+C')
    15 channel.start_consuming()
    View Code

     队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。

    生产者:

     1 import pika,sys
     2 
     3 credentials = pika.PlainCredentials('admin', '123456')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     '192.168.170.134',5672,'/',credentials))
     6 channel = connection.channel()
     7 channel.exchange_declare(exchange='direct_logs',
     8                          type='direct')
     9 
    10 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    11 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    12 channel.basic_publish(exchange='direct_logs',
    13                       routing_key=severity,
    14                       body=message)
    15 print(" [x] Sent %r:%r" % (severity, message))
    16 connection.close()
    View Code

    消费者:

     1 import pika,sys
     2 
     3 credentials = pika.PlainCredentials('admin', '123456')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     '192.168.170.134',5672,'/',credentials))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10 
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 severities = sys.argv[1:]
    15 if not severities:
    16     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    17     sys.exit(1)
    18 
    19 for severity in severities:
    20     channel.queue_bind(exchange='direct_logs',
    21                        queue=queue_name,
    22                        routing_key=severity)
    23 
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25 
    26 
    27 def callback(ch, method, properties, body):
    28     print(" [x] %r:%r" % (method.routing_key, body))
    29 
    30 
    31 channel.basic_consume(callback,
    32                       queue=queue_name,
    33                       no_ack=True)
    34 
    35 channel.start_consuming()
    View Code

     运行结果:

    2. fanout exchange

          发布/订阅exchange ,发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

    生产者:

     1 import pika,sys
     2 
     3 credentials = pika.PlainCredentials('admin', '123456')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     '192.168.170.134',5672,'/',credentials))
     6 channel = connection.channel()
     7 # 声明queue
     8 channel.exchange_declare(exchange='logs',
     9                          type='fanout')
    10 
    11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    12 channel.basic_publish(exchange='logs',
    13                       routing_key='',
    14                       body=message)
    15 print(" [x] Sent %r" % message)
    16 connection.close()
    View Code

    消费者:

     1 import pika
     2 
     3 credentials = pika.PlainCredentials('admin', '123456')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     '192.168.170.134',5672,'/',credentials))
     6 channel = connection.channel()
     7 
     8 result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
     9 queue_name = result.method.queue
    10 
    11 channel.queue_bind(exchange='logs',
    12                    queue=queue_name)
    13 
    14 print(' [*] Waiting for logs. To exit press CTRL+C')
    15 
    16 
    17 def callback(ch, method, properties, body):
    18     print(" [x] %r" % body)
    19 
    20 
    21 channel.basic_consume(callback,
    22                       queue=queue_name,
    23                       no_ack=True)
    24 
    25 channel.start_consuming()
    View Code

     3. topic exchange

          在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。

    • # :表示可以匹配0个或多个单词;

    • * :表示只能匹配一个单词。

    生产者:

     1 import pika,sys
     2 
     3 credentials = pika.PlainCredentials('admin', '123456')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     '192.168.170.134',5672,'/',credentials))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10 
    11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    13 channel.basic_publish(exchange='topic_logs',
    14                       routing_key=routing_key,
    15                       body=message)
    16 print(" [x] Sent %r:%r" % (routing_key, message))
    17 connection.close()
    View Code

    消费者:

     1 import pika,sys
     2 
     3 credentials = pika.PlainCredentials('admin', '123456')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     '192.168.170.134',5672,'/',credentials))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10 
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 binding_keys = sys.argv[1:]
    15 if not binding_keys:
    16     sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    17     sys.exit(1)
    18 
    19 for binding_key in binding_keys:
    20     channel.queue_bind(exchange='topic_logs',
    21                        queue=queue_name,
    22                        routing_key=binding_key)
    23 
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25 
    26 
    27 def callback(ch, method, properties, body):
    28     print(" [x] %r:%r" % (method.routing_key, body))
    29 
    30 
    31 channel.basic_consume(callback,
    32                       queue=queue_name,
    33                       no_ack=True)
    34 
    35 channel.start_consuming()
    View Code

     二、基于rabbitmq的RPC

          基于rabbitmq的rpc实现流程:

        (1)首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

        (2)服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

        (3)客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

      对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类。

    服务端:

     1 import pika
     2 
     3 cre_publiser = pika.PlainCredentials('admin', '123456')
     4 conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',cre_publiser)
     5 connection = pika.BlockingConnection(conn_para)
     6 
     7 # 建立会话
     8 channel = connection.channel()
     9 
    10 # 声明RPC请求队列
    11 channel.queue_declare(queue='rpc_queue')
    12 
    13 # 数据处理方法
    14 def fib(n):
    15     if n == 0:
    16         return 0
    17     elif n == 1:
    18         return 1
    19     else:
    20         return fib(n-1) + fib(n-2)
    21 
    22 # 对RPC请求队列中的请求进行处理
    23 def on_request(ch, method, props, body):
    24     n = int(body)
    25 
    26     print(" [.] fib(%s)" % n)
    27 
    28     # 调用数据处理方法
    29     response = fib(n)
    30 
    31     # 将处理结果(响应)发送到回调队列
    32     ch.basic_publish(exchange='',
    33                      routing_key=props.reply_to,
    34                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
    35                      body=str(response))
    36     ch.basic_ack(delivery_tag = method.delivery_tag)
    37 
    38 # 负载均衡,同一时刻发送给该服务器的请求不超过一个
    39 channel.basic_qos(prefetch_count=1)
    40 
    41 channel.basic_consume(on_request,
    42                       queue='rpc_queue')
    43 
    44 print(" [x] Awaiting RPC requests")
    45 channel.start_consuming()
    View Code

    客户端:

     1 import pika
     2 import uuid
     3 class FibonacciRpcClient(object):
     4     def __init__(self):
     5         self.cre_publiser = pika.PlainCredentials('admin', '123456')
     6         self.conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',self.cre_publiser)
     7         self.connection = pika.BlockingConnection(self.conn_para)
     8 
     9         self.channel = self.connection.channel()
    10 
    11         result = self.channel.queue_declare(exclusive=True)
    12         self.callback_queue = result.method.queue
    13 
    14         self.channel.basic_consume(self.on_response,
    15                                    no_ack=True,
    16                                    queue=self.callback_queue)
    17 
    18     def on_response(self, ch, method, props, body):
    19         if self.corr_id == props.correlation_id:
    20             self.response = body
    21 
    22     def call(self, n):
    23         self.response = None
    24         self.corr_id = str(uuid.uuid4())
    25         self.channel.basic_publish(exchange='',
    26                                    routing_key='rpc_queue',
    27                                    properties=pika.BasicProperties(
    28                                    reply_to=self.callback_queue,
    29                                    correlation_id=self.corr_id,
    30                                    ),
    31                                    body=str(n))
    32         while self.response is None:
    33             self.connection.process_data_events()
    34         return int(self.response)
    35 
    36 
    37 fibonacci_rpc = FibonacciRpcClient()
    38 
    39 print(" [x] Requesting fib(6)")
    40 response = fibonacci_rpc.call(6)
    41 print(" [.] Got %r" % response)
    View Code
  • 相关阅读:
    ArcGIS Pro获得一个要素图层一种方法
    ArcGIS Pro layout clone
    ActiveMapViewChanged和选择变化
    ArcGIS Pro 改变栅格的数据源
    ArcGIS Pro自定义图标
    Windows Server 2016 路由和远程访问
    IIS应用程序池_缓存回收
    asp.net RSA密钥之C#格式与Java格式转换(PEM格式)
    MD5和Hash
    C# list与数组的转换
  • 原文地址:https://www.cnblogs.com/iclq/p/5967777.html
Copyright © 2011-2022 走看看