zoukankan      html  css  js  c++  java
  • 工作机制.py

    rpc_server.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'
    """
    RPC/Server端
    """
    import pika
    # 添加认证信息
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))

    # 添加一个通道
    channel = connection.channel()
    # 添加一个队列,这个队列在Server就是我们监听请求的队列
    channel.queue_declare(queue='rpc_queue')


    # 斐波那契计算
    def fib(n):
    if n == 0:
    return 0
    elif n == 1:
    return 1
    else:
    return fib(n-1) + fib(n-2)


    # 应答函数,它是我们接受到消息后如处理的函数替代原来的callback
    def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
    # 返回的队列,从属性的reply_to取出来
    routing_key=props.reply_to,
    # 添加correlation_id,和Client进行一致性匹配使用的
    properties=pika.BasicProperties(correlation_id=props.correlation_id),
    # 相应消息/我们的处理结果
    body=str(response))
    # 增加消息回执
    ch.basic_ack(delivery_tag=method.delivery_tag)

    # 每次预处理的请求个数/这个请求我没有处理完不要给我发了
    channel.basic_qos(prefetch_count=1)
    # 定义接收通道的属性/定义了callback方法,接收的队列,返回的队列在哪里?on_request 的routing_key=props.reply_to
    channel.basic_consume(on_request, queue='rpc_queue')

    # 开始接收消息
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()

    rpc_client.py
    # !/usr/bin/env python3.5
    # -*- coding:utf-8 -*-
    # __author__ == 'LuoTianShuai'
    """
    RPC/Client端
    """
    import pika
    import uuid


    # 定义菲波那切数列RPC Client类调用RPC Server
    class FibonacciRpcClient(object):
    def __init__(self):
    # 添加认证信息
    credentials = pika.PlainCredentials("admin", "admin")
    self.connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))
    # 添加一个通道
    self.channel = self.connection.channel()
    # 生成一个随机队列-定义callback回调队列
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    # 定义回调的通道属性
    self.channel.basic_consume(self.on_response, # 回调结果执行完执行的Client端的callback方法
    no_ack=True,
    queue=self.callback_queue)
    # 这里注意我们并没有直接阻塞的开始接收消息了

    # Client端 Callback方法
    def on_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
    # 定义了self.response = body
    self.response = body

    def call(self, n):
    # 定义了一个普通字段
    self.response = None
    # 生成了一个uuid
    self.corr_id = str(uuid.uuid4())
    # 发送一个消息
    self.channel.basic_publish(exchange='', # 使用默认的Exchange,根据发送的routing_key来选择队列
    routing_key='rpc_queue', # 消息发送到rpc_queue队列中
    # 定义属性
    properties=pika.BasicProperties(
    reply_to=self.callback_queue, # Client端定义了回调消息的callback队列
    correlation_id=self.corr_id, # 唯一值用来做什么的?request和callback 匹配用
    ),
    body=str(n))

    # 开始循环 我们刚才定义self.response=None当不为空的时候停止`
    while self.response is None:
    # 非阻塞的接受消息
    self.connection.process_data_events(time_limit=3)
    return int(self.response)

    # 实例化对象
    fibonacci_rpc = FibonacciRpcClient()

    print(" [x] Requesting fib(30)")
    # 发送请求计算菲波那切数列 30
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)

  • 相关阅读:
    界面控件DevExpress WPF入门 表达式编辑器功能
    Telerik UI for WPF全新版本——拥有Office2019高对比度主题
    DevExpress报表控件v21.2 全新的Visual Studio报表设计器
    报告生成器FastReport .NET入门指南 在Linux中启动应用程序
    文档控件DevExpress Office File API v21.2 自定义字体加载引擎
    UI组件库Kendo UI for Angular入门 如何开始使用图表功能
    WPF界面工具Telerik UI for WPF入门级教程 设置一个主题(二)
    DevExtreme初级入门教程(React篇) TypeScript支持
    报表开发利器FastReport .NET v2022.1 添加关键对象和属性
    python项目打包(一) setup.py、Python源代码项目结构
  • 原文地址:https://www.cnblogs.com/luoyan01/p/9734123.html
Copyright © 2011-2022 走看看