zoukankan      html  css  js  c++  java
  • 使用django+rpc进行服务内部交互

    一、为什么使用rpc。

    1)相比uwsgi,使用rpc的长连接可以不需要频繁创建连接,提高传输效率。

    2)rpc支持同步和异步,对于不需要等待返回的消息可以不等待返回继续运行,减少客户端等待时间。

    3)使用rpc入口是我们自己定义的,可以根据不同消息类型定制不同的策略。

    二、设计思路

    使用统一入口,采用django的url resolve匹配,然后完成调用,不改变django rest接口的开发模式。

    服务端处理采用同步异步分离,异步任务用单独的进程处理,并为异步任务制定处理策略:

    1)对于同步任务,仍然需要立即调用返回。

    2)对于异步任务,可以进行任务分级:

          一级是重要任务,属于系统能力不足时必须优先保障的;

          二级任务,在系统能力足够时仍然需要执行,一旦能力不足,优先保障一级任务;

    3)对异步任务,制定执行策略:

          一是必须执行的任务,这部分任务即使积压也有一条条全部执行完成;

          二是只需要执行最后一条的,常见于更新信息,对于积压多条的同一消息,丢弃前面的,保留最后一条;

          三是可丢弃的,遇到性能不足,这一类消息不执行,直接丢弃。

    三、 grpc的proto文件

    syntax = "proto3";
    package rpc;
    service RPCServer {
      rpc handel(Input) returns (Output){}
    }
    
    message Input {
      string params = 1;
    }
    
    message Output {
      string content = 1;
    }

    入参为Input,返回为Output,所有接口调用都走这边。

    四、客户端调用

    import grpc
    import time
    import json
    import traceback
    import threading
    import uuid
    from datetime import datetime
    
    from . import data_pb2, data_pb2_grpc
    
    _HOST = ''
    _PORT = ''
    CHANNEL = grpc.insecure_channel(_HOST + ':' + _PORT)
    
    
    class ManoEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return str(obj)
            if isinstance(obj, uuid.UUID):
                return str(obj)
            return json.JSONEncoder.default(self, obj)
    
    
    def mano_encode(data):
        return json.dumps(data, cls=ManoEncoder)
    
    
    def call_rpc(url, headers, resource, content, logger):
        try:
            params = json.dumps({
                'url': url,
                'headers': headers,
                'method': resource['method'],
                'content': content
            })
            timeout = resource.get('timeout', 5)
            client = data_pb2_grpc.RPCServerStub(CHANNEL)
            response = client.handel.future(data_pb2.Input(params=params), timeout)
            while not response.done():
                time.sleep(0.01)
            result = json.loads(response.result().content)
            print(result['status_code'])
            return result['status_code'], mano_encode(result['data'])
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (url, err.message))
            return '409', err.message

    入参params需包含:rest url,头信息headers,rest类型,以及request body;

    结果采用异步获取,不持续占用连接,对于不需要结果的,可以不等待,这边没写。

    五、服务端实现

    import os
    import django
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "*.settings")
    django.setup()
    
    import grpc
    import json
    import time
    import random
    import traceback
    import threading
    import uuid
    import logging
    from datetime import datetime
    
    from concurrent import futures
    from multiprocessing import Process, Queue, Value
    from Queue import Queue as ManoQueue
    
    from . import data_pb2, data_pb2_grpc
    from django.urls import get_resolver
    from django.utils.functional import cached_property
    
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    _HOST = '[::]'
    _PORT = '12330'
    _PROCESS_COUNT = 2
    RESOLVER = get_resolver()
    logger = logging.getLogger(__name__)
    
    message_queue = Queue()  # 异步任务队列,用于进程通信
    status_level2 = Value('I', 1)  # 二级队列状态,用于进程通信
    
    
    class ManoEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return str(obj)
            if isinstance(obj, uuid.UUID):
                return str(obj)
            return json.JSONEncoder.default(self, obj)
    
    
    def mano_encode(data):
        return json.dumps(data, cls=ManoEncoder)
    
    
    class RPCServer(data_pb2_grpc.RPCServerServicer):
        def handel(self, request, context):
            input_info = json.loads(request.params)
            if input_info.get('reply', True) is True:  # reply为True代表同步,否则异步
                res_url = input_info['url']
                headers = input_info['headers']
                method = input_info['method']
                content = input_info['content']
                status_code, data = self.call_sync(res_url, headers, method, content)
                return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))
            else:
                if input_info['queue_detail']['level'] == 2 and not status_level2:
                    data = 'queue of status level2 is not active'
                    status_code = '409'
                else:
                    message_queue.put(request.params)
                    data = 'success'
                    status_code = '201'
                return data_pb2.Output(content=mano_encode({'data': data, 'status_code': status_code}))
    
        @staticmethod
        def call_sync(res_url, headers, method, content):
            try:
                resp_status, resp_body = call_inner(res_url, headers, method, content, logger)
                return resp_status, resp_body
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('call url %s failed, msg is %s' % (res_url, err.message))
                return '409', err.message
    
    
    def main():  # rpc 服务主进程
        bind_address = '%s:%s' % (_HOST, _PORT)
        _run_server(bind_address)  # 启动rpc进程
        _run_queue_process()  # 启动异步任务处理进程
    
    
    def _run_server(bind_address):
        grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100, ))
        data_pb2_grpc.add_RPCServerServicer_to_server(RPCServer(), grpc_server)
        grpc_server.add_insecure_port(bind_address)
        grpc_server.start()
    
    
    def _run_queue_process():
        worker = Process(target=_handle_no_wait_request, args=(message_queue, status_level2,))
        worker.start()
        worker.join()
    
    
    def _handle_no_wait_request(q, status_2):  # 异步任务分类
        first_order_queue = [ManoQueue(maxsize=0), list()]
        second_order_queue = [ManoQueue(maxsize=1000), list()]
        mano_queue = [first_order_queue, second_order_queue]
        thread_pool = futures.ThreadPoolExecutor(max_workers=50)
        threading.Thread(target=_start_message_monitor, args=(q, mano_queue, status_2,)).start()  # 根据策略进行异步任务分类
        while True:
            num_threads = len(thread_pool._threads)
            if num_threads < 50:
                input_info = _get_request(mano_queue)  # 获取本次需执行的任务,每个队列机会均等
                res_url = input_info['url']
                headers = input_info['headers']
                method = input_info['method']
                content = input_info['content']
                thread_pool.submit(RPCServer.call_sync, res_url, headers, method, content)  # 交给工作线程
                logger.info('handle success')
            else:
                logger.info('process busy')
                time.sleep(0.1)
    
    
    def _start_message_monitor(q, mano_queue, status_2):
        while True:
            data = q.get()
            _handel_by_queue(data, mano_queue, status_2)
    
    
    def _get_request(mano_queue):
        active_index = _get_active_queue(mano_queue)
        if active_index:
            index = random.choice(active_index)
            i, k = int(index.split('_')[0]), int(index.split('_')[1])
            q = mano_queue[i][k]
            if isinstance(q, ManoQueue):
                request_info = json.loads(q.get())
            else:
                request_info = json.loads(q.pop(0))
        else:
            request_info = {}
        return request_info
    
    
    def _get_active_queue(mano_queue):
        active_index = []
        if not mano_queue[0][0].empty():
            active_index.append('0_0')
        if not mano_queue[1][0].empty():
            active_index.append('1_0')
        if len(mano_queue[0][1]) != 0:
            active_index.append('0_1')
        if len(mano_queue[1][1]) != 0:
            active_index.append('1_1')
        return active_index
    
    
    def _handel_by_queue(data, mano_queue, status_2):  # 根据请求级别进行消息分类
        input_info = json.loads(data)
        level = input_info['queue_detail']['level']
        policy = input_info['queue_detail']['limit_policy']
        if level == 1:
            _handel_by_policy(mano_queue[0], policy, data)
        elif level == 2:
            request_queue = mano_queue[1]
            _handel_by_policy(mano_queue[1], policy, data)
            if request_queue[0].qsize() > 0.8 * request_queue[0].maxsize:
                status_2.value = 0
            elif request_queue[0].qsize() < 0.6 * request_queue[0].maxsize:
                status_2.value = 1
    
    
    def _handel_by_policy(request_queue, policy, data):  # 根据请求策略进行消息分类
        if policy == 'execute':  # 必须执行的异步任务
            request_queue[0].put(data)
        elif policy == 'last':  # 阻塞时可以只执行最后一次的异步任务
            try:
                while True:
                    request_queue[1].remove(data)
            except ValueError:
                request_queue[1].append(data)
        else:  # 阻塞时可以丢弃的异步任务
            if request_queue[0].qsize < request_queue[0].maxsize * 0.6:
                request_queue[0].put(data)  # 先丢弃前面的
    
    
    def call_inner(res_url, headers, method, content, logger):
        logger.info('[call_inner] url is %s' % res_url)
        url, params = get_url_and_params(res_url)
        meta = get_meta(headers)
        request = Request(url=url, full_url=res_url, params=params, content=content, meta=meta, method=method)
        resolver_match = RESOLVER.resolve(url)  # URL 匹配
        callback, callback_args, callback_kwargs = resolver_match
        call_method = getattr(callback.view_class(), method.lower())
        if not method:
            return '404', 'not support this operate'
        try:
            if callback_kwargs:
                result = call_method(request, '', **callback_kwargs)
            else:
                result = call_method(request)
        except BaseException as err:
            logger.error(traceback.format_exc())
            logger.error('call url %s failed, msg is %s' % (res_url, err.message))
            return '409', err.message
        return str(result.status_code), result.data
    
    
    def get_url_and_params(full_url):
        params = {}
        if '?' in full_url:
            url, params_str = full_url.split('?')[0], full_url.split('?')[1]
            for key_value in params_str.split('&'):
                key, value = key_value.split('=')[0], key_value.split('=')[1]
                params[key] = value
        else:
            url = full_url
        return url, params
    
    
    def get_meta(headers):
        meta = {}
        # custom
        return meta
    
    
    class Request(object):
        def __init__(self, **kwargs):
            self.data = self.get_content(kwargs['content'])
            self.query_params = kwargs['params']
            self.path = kwargs['url']
            self.full_path = kwargs['full_url']
            self.FILES = {}
            self.META = kwargs['meta']
            self.COOKIES = {}
            self._request = InnerOBJ(kwargs['method'])
    
        @staticmethod
        def get_content(content):
            if not content:
                req_data = {}
            else:
                req_data = content if isinstance(content, dict) else json.loads(content)
            return req_data
    
        def __str__(self):
            return '<Request> %s' % self.path
    
        @cached_property
        def GET(self):
            return self.query_params
    
        def get_full_path(self):
            return self.full_path
    
    
    class InnerOBJ(object):
        def __init__(self, method):
            self.method = method.upper()
    
    
    if __name__ == '__main__':
        main()
  • 相关阅读:
    图-拓扑排序
    图-最短路径-Dijkstra及其变种
    【链表问题】打卡7:将单向链表按某值划分成左边小,中间相等,右边大的形式
    【链表问题】打卡5:环形单链表约瑟夫问题
    【链表问题】打卡6:三种方法带你优雅判断回文链表
    【链表问题】打卡4:如何优雅着反转单链表
    【链表问题】打卡3:删除单链表的中间节点
    【链表问题】打卡2:删除单链表的第 K个节点
    史上最全面试题汇总,没有之一,不接受反驳
    一些可以让你装逼的算法技巧总结
  • 原文地址:https://www.cnblogs.com/small-office/p/10998504.html
Copyright © 2011-2022 走看看