zoukankan      html  css  js  c++  java
  • RPC通信

    @version: 
    @author: morgana
    @license: Apache Licence 
    @contact: vipmorgana@gmail.com
    @site: 
    @software: PyCharm
    @file: rpcclient.py
    @time: 2018/4/15 上午12:28
    """
    
    # 1.声明一个队列,作为reply_to返回消息结果的队列
    # 2.发消息到队列,消息里带一个唯一标识符uid,reply_to
    # 3.监听reply_to 的队列,直到有结果
    import queue
    
    import pika
    import uuid
    
    class CMDRpcClient(object):
        def __init__(self):
            credentials = pika.PlainCredentials('morgana', '123456')
            parameters = pika.ConnectionParameters(host='127.0.0.1', credentials=credentials)
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
    
            #声明一个队列,作为reply_to返回消息结果的队列
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue #命令的执行结果的queue
    
            #声明要监听callback_queue
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            """
            收到服务器端命令结果后执行这个函数
            :param ch:
            :param method:
            :param props:
            :param body:
            :return:
            """
            if self.corr_id == props.correlation_id:#服务端props.correlation_id
                self.response = body.decode("gbk") #把执行结果赋值给Response
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4()) #唯一标识符号 自己生成的
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue2',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
    
    
            while self.response is None:
                self.connection.process_data_events()
                #检测监听的队列里有没有新消息,如果有,收,如果没有,返回None
                #检测有没有要发送的新指令
            return self.response
    
    cmd_rpc = CMDRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = cmd_rpc.call('ifconfig')
    
    print(response)
    
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    @version: 
    @author: morgana
    @license: Apache Licence 
    @contact: vipmorgana@gmail.com
    @site: 
    @software: PyCharm
    @file: rpcserver.py
    @time: 2018/4/15 上午12:29
    """
    #1.定义fib函数
    #2. 声明接收指令的队列名rpc_queue
    #3. 开始监听队列,收到消息后 调用fib函数
    #4 把fib执行结果,发送回客户端指定的reply_to 队列
    import subprocess
    import pika
    
    credentials = pika.PlainCredentials('morgana', '123456')
    parameters = pika.ConnectionParameters(host='127.0.0.1', credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    
    channel = connection.channel() #队列连接通道
    
    channel.queue_declare(queue='rpc_queue2')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    
    def run_cmd(cmd):
        cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        result = cmd_obj.stdout.read() + cmd_obj.stderr.read()
    
        return result
    
    
    def on_request(ch, method, props, body):
        cmd = body.decode("utf-8")
    
        print(" [.] run (%s)" % cmd)
        response = run_cmd(cmd)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to, #队列
                         properties=pika.BasicProperties(correlation_id = 
                                                             props.correlation_id),
                         body=response)
    
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(on_request, queue='rpc_queue2')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    

      

      

  • 相关阅读:
    javascript中this使用规律
    call和apply的作用和不同
    SVN的标准目录结构:trunk、branches、tags
    SVN 多人修改,如何管理 关于版本的问题
    公司考勤系统项目设计
    CDI Features
    Java Design Pattern
    公司考勤系统设计文件
    spring( history Design Philosophy )
    JSON/xml
  • 原文地址:https://www.cnblogs.com/morgana/p/8851742.html
Copyright © 2011-2022 走看看