zoukankan      html  css  js  c++  java
  • pika和kombu实现rpc代码

    pika不支持多线程的rpc,虽然可以用多进程做到,但进程比线程更要耗费资源,而且多进程支持的也并不是很好,会出现偶发的异常。

    https://stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame

    根据文章的指引,最快的方式就是放弃pika模块,换成kombu。

    示例

    文档
    Kombu模块的文档地址:https://docs.celeryproject.org/projects/kombu/en/stable/

    安装

    pip install kombu
    

    代码
    服务端任然采用旧的pika模块的代码,因为服务端不需要多线程去执行命令,单线程即可,客户端代码更换为kombu模块。

    • 服务端代码
    #!/usr/bin/python3
    # _*_ coding: utf-8 _*_
    
    """
    @Software: PyCharm
    @File: RPCCmd.py
    @Author: 高留柱
    @E-mail: liuzhu.gao@foxmail.com
    @Time: 2020/7/6 15:49
    """
    import pika
    import uuid
    import json
    
    
    class CMD:
        def __init__(self):
            """
            初始化函数的时候就建立管道,接收服务端的任务结果
            """
            credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        "it.sucheon.com", credentials=credentials, virtual_host="qpm"))
    
            self.channel = self.connection.channel()
    
            # 建立随机管道,用于告诉服务端,任务的结果放在这个随机管道中
            result = self.channel.queue_declare('', exclusive=True)
    
            self.callback_queue = result.method.queue
    
            # 从随机管道中取任务
            self.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self.on_response,  # 回调函数
                auto_ack=True,
            )
    
        # 收到任务结果的回调函数
        def on_response(self, ch, method, props, body):
            # 如果客户端的随机字符串和服务端发送过来的随机字符串相等,就代表着该结果属于该任务
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, cmd, port):
            """
            :param cmd:
            :return:
            exchange: 交换器
            routing_key: 是管道的名字
            reply_to: 告诉服务端执行完命令把结果丢到哪个管道中
            """
            # TODO: 根据port查询UUID
            self.response = None
            self.corr_id = str(uuid.uuid4())  # 唯一标识符, 用于标识服务端的结果和客户端命令之间的联系,防止服务端和客户端命令和结果不对等
            self.channel.basic_publish(exchange="",
                                       routing_key=str(port),
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,
                                       ),
                                       body=str(cmd))
    
            # 最多等待10秒,10秒内有值立刻返回
            self.connection.process_data_events(time_limit=30)  # 检查队列中有没有新消息,没加time_limit代表不会阻塞,加了之后会进入阻塞态
            if self.response is None:
                # 如果服务端没有返回值的话,将删除任务管道,以免积累消息,但会导致服务端脚本停止堵塞态,结束运行
                # 如果不删除任务管道的话,也没啥大问题,就是当服务端重新连接rabbitMQ的时候,会把之前没接收到的命令全部执行一遍,但接收结果的管道并不会积压
                self.channel.queue_delete(queue=str(port))
                return {"status": 1, "stdout": "", "stderr": "连接超时,请重试!".encode('utf-8')}
            return json.loads(self.response)
    
    • 客户端代码
      pika替换为kombu,实现多线程执行命令,且限制命令超时时间为10秒,无响应后终止命令执行。
    #!/usr/bin/python3
    # _*_ coding: utf-8 _*_
    
    """
    @Software: PyCharm
    @File: rpc_kombu.py
    @Author: 高留柱
    @E-mail: liuzhu.gao@foxmail.com
    @Time: 2020/8/6 15:35
    @Notes:
    """
    
    import subprocess
    import json
    import os
    import time
    from kombu import Producer, Queue, Connection
    from kombu.mixins import ConsumerProducerMixin
    from concurrent.futures import ThreadPoolExecutor
    
    # 开启线程池,提供多用户同时操作
    pool = ThreadPoolExecutor(5)
    
    
    def execute_cmd(cmd):
        timeout = 10
        try:
            p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, preexec_fn=os.setsid, shell=True,
                                 close_fds=True)
            p.wait(timeout)
            out = p.stdout.read().decode("utf-8").strip("	
     ")
            err = p.stderr.read().decode("utf-8").strip("	
     ")
            return json.dumps({'status': int(p.returncode), 'stdout': out, 'stderr': err})
        except Exception as e:
            print(e)
            return json.dumps({'status': 1, 'stdout': '', 'stderr': str(e)})
        finally:
            try:
                p.stdout.flush()
                p.stderr.flush()
                p.stdout.close()
                p.stderr.close()
                p.kill()
                os.killpg(p.pid, subprocess.signal.SIGKILL)
            except:
                pass
    
    def getPort():
        port = json.loads(execute_cmd("/bin/echo $PORT")).get('stdout')
        if not port:
            port = json.loads(execute_cmd('/bin/hostname')).get('stdout').split('-')[-1]
        return port
    
    
    # 队列的名字
    port = getPort()
    rpc_queue = Queue(port)
    
    
    class Worker(ConsumerProducerMixin):
    
        def __init__(self, connection):
            self.connection = connection
    
        def task(self, message):
            cmd = message.payload  # 获取消息内容
            result = execute_cmd(cmd)
            self.producer.publish(
                body=result,
                exchange='',
                routing_key=message.properties['reply_to'],
                correlation_id=message.properties['correlation_id'],
                # serializer='json',  # 序列化器
                retry=False,
                expiration=3,  # 设置消息到期时间,单位是秒
            )
            message.ack()
    
        def get_consumers(self, Consumer, channel):
            return [Consumer(
                queues=[rpc_queue],
                on_message=self.on_request,
                accept={'application/json'},
                # prefetch_count=1,
                no_ack=True,  # 自动确认消息,为True自动确认收到消息,为False或不写的话,不会自动确认消息,消息执行失败,下次重启还能收到该消息
            )]
    
        def on_request(self, message):
            try:
                pool.submit(self.task, message)
                # t_obj = Thread(target=self.task, args=(message,))
                # t_obj.start()
            except Exception as e:
                print(e)
    
    
    def Main():
        try:
            with Connection('amqp://qpm:cljslrl0620@it.sucheon.com/qpm') as conn:
                Worker(conn).run()
        except Exception as e:
            print(e)
    
    
    if __name__ == '__main__':
        Main()
    
    
  • 相关阅读:
    PHP chgrp() 函数
    PHP basename() 函数
    PHP user_error() 函数
    PHP trigger_error() 函数
    my.cnf需要改的参数
    WPF 使用 Direct2D1 画图入门
    win10 uwp 如何开始写 uwp 程序
    win10 uwp 如何开始写 uwp 程序
    C# 快速释放内存的大数组
    C# 快速释放内存的大数组
  • 原文地址:https://www.cnblogs.com/cnhyk/p/13474448.html
Copyright © 2011-2022 走看看