zoukankan      html  css  js  c++  java
  • 并发二分法查找算法【分布式】

    1. 利用【queue】【BaseManager】
    # master
    
    
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    import queue
    from multiprocessing.managers import BaseManager
    
    # Queue on which tasks are sent out to the workers:
    task_queue = queue.Queue()
    # Queue on which results are received back from the workers:
    result_queue = queue.Queue()
    
    
    # The following named functions were added to suit Windows (PicklingError).
    def task_queue_fun():
        """Return the module-level ``task_queue`` object."""
        return task_queue
    
    
    def result_queue_fun():
        """Return the module-level ``result_queue`` object."""
        return result_queue
    
    
    class QueueManager(BaseManager):
        """Manager subclass used to register and serve the task/result queues.

        It adds nothing to ``BaseManager``; it exists so that registrations are
        attached to this class rather than to ``BaseManager`` itself.
        """
    
    
    def in_version_result_list(version_num, version_result_list):
        """Tell whether ``version_num`` already has a recorded result.

        Each entry of ``version_result_list`` is indexed at position 1 to obtain
        its version; the function returns True when ``version_num`` is among
        those versions and False otherwise.
        """
        recorded_versions = tuple(entry[1] for entry in version_result_list)
        return version_num in recorded_versions
    
    
    def pre_check_search_version(check_list):
        """Return the middle element of ``check_list`` (index ``len // 2``).

        Raises:
            ValueError: if ``check_list`` has no elements.
        """
        size = len(check_list)
        if size == 0:
            raise ValueError('list is empty!')
        middle_index = size // 2
        return check_list[middle_index]
    
    
    def multi_process_search(check_list, version_result_list, task_queue, result_queue, worker_timeout, node_num=2):
        """Narrow down the first failing version by probing several chunks at once.

        ``check_list`` is split into at most ``node_num`` contiguous chunks. Each
        chunk whose middle version has no recorded result yet is put on
        ``task_queue``; one result per submitted chunk is then read from
        ``result_queue`` and appended to ``version_result_list``. The window
        between the highest passing and the lowest failing recorded version is
        searched again recursively until it cannot shrink any further.

        NOTE(review): the narrowing logic presumably relies on ``check_list``
        being sorted ascending and on every version after the first failure
        also failing -- confirm against the callers.

        Args:
            check_list: non-empty sequence of versions to search.
            version_result_list: list of ``(passed, version)`` results gathered
                so far; it is extended in place.
            task_queue: queue on which chunks are submitted (``put``).
            result_queue: queue from which ``(passed, version)`` results are
                read (``get`` with a timeout).
            worker_timeout: seconds to wait for each individual result.
            node_num: maximum number of chunks submitted per round.

        Returns:
            ``min(version_result_list)``. Because ``False`` sorts before
            ``True``, this is the lowest failing result when any failure was
            recorded, otherwise the lowest passing result.

        Raises:
            ValueError: if ``check_list`` is empty or ``node_num`` is below 1.
            queue.Empty: propagated from ``result_queue.get`` when a result
                does not arrive within ``worker_timeout``.
        """
        if node_num < 1:
            raise ValueError('node_num must be at least 1!')
        if len(check_list) == 0:
            # Previously this surfaced as an opaque "range() arg 3 must not be
            # zero" ValueError from the chunking expression below.
            raise ValueError('list is empty!')

        # Chunk length is the ceiling of len / node_num, so at most node_num
        # chunks are produced.
        chunk_len, remain = divmod(len(check_list), node_num)
        if remain > 0:
            chunk_len += 1
        binary_list = [check_list[i:i + chunk_len] for i in range(0, len(check_list), chunk_len)]

        # Show the state of the list for this round.
        print('待处理:')
        print('\t node_num %s' % node_num)
        print('\t org_check_list %s' % str(check_list))

        # Submit tasks, skipping chunks whose middle version was already checked.
        task_len = 0
        for chunk in binary_list:
            probe_version = pre_check_search_version(chunk)
            if in_version_result_list(probe_version, version_result_list):
                print('\t version %s is in %s' % (probe_version, version_result_list))
                continue
            print('\t Put task %s' % str(chunk))
            task_queue.put(chunk)
            task_len += 1
        # The list was too short to give every node a chunk.
        if len(binary_list) < node_num:
            print('only %s nodes needed!' % len(binary_list))

        # Collect exactly one result per submitted task.
        for _ in range(task_len):
            r = result_queue.get(timeout=worker_timeout)
            version_result_list.append(r)
            print('\t Result: %s' % str(r))
        return_bool_list = [entry[0] for entry in version_result_list]

        print('处理结果:')
        print('\t return_result_list %s' % str(version_result_list))

        if all(return_bool_list):
            # Every recorded version passed: search above the highest one.
            print('\t all True')
            max_true = check_list.index(max(version_result_list)[1])
            min_false = len(check_list)
        elif not any(return_bool_list):
            # Every recorded version failed. No index is known to pass, so use
            # -1 rather than 0: with 0 the untested first element was silently
            # treated as passing whenever the lowest failure sat at index 1.
            print('\t all False')
            max_true = -1
            min_false = check_list.index(min(version_result_list)[1])
        else:
            # Both passing and failing versions have been recorded.
            try:
                max_true = check_list.index(max(version_result_list)[1])
            except ValueError:
                max_true = 0
                print('\t all False ,max_true has been found in the upper recursion level!')
            try:
                min_false = check_list.index(min(version_result_list)[1])
            except ValueError:
                min_false = len(check_list)
                print('\t all True ,min_false has been found in the upper level!')

        if max_true + 1 < min_false:
            # Unchecked versions remain between the two bounds: recurse on them.
            return multi_process_search(check_list[max(max_true, 0):min_false], version_result_list,
                                        task_queue, result_queue, worker_timeout, node_num)

        return min(version_result_list)
    
    
    if __name__ == '__main__':

        alist = [5, 10, 15, 18, 35, 55, 65, 75, 99, 100, 111, 112, 113, 115, 116, 118, 119, 121, 122, 123, 124, 125]

        # Register both queues on the manager; the callable argument ties each
        # registered name to the function returning the corresponding Queue.
        QueueManager.register('get_task_queue', callable=task_queue_fun)
        QueueManager.register('get_result_queue', callable=result_queue_fun)

        # Bind to port 5001 with the auth key b'abc'.
        # To accept workers from other machines, use a reachable address
        # (e.g. the public IP) instead of 127.0.0.1.
        manager = QueueManager(address=('127.0.0.1', 5001), authkey=b'abc')

        # Start the manager's server process.
        manager.start()
        try:
            # Obtain the Queue objects through the manager (network-accessible).
            task_queue = manager.get_task_queue()
            result_queue = manager.get_result_queue()

            result_list = []
            res = multi_process_search(alist, result_list, task_queue, result_queue, worker_timeout=500)
            print(res)
        finally:
            # Always shut the manager down, even when the search raises
            # (for example queue.Empty after a worker timeout).
            manager.shutdown()
        print('master exit.')
    
    
    # worker
    
    
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    import time, sys,queue
    from multiprocessing.managers import BaseManager
    import os, time, random
    
    
    # Worker-side counterpart of the master's QueueManager:
    class QueueManager(BaseManager):
        """Manager subclass on which the worker registers the queue names.

        It adds nothing to ``BaseManager``; it exists so that registrations are
        attached to this class rather than to ``BaseManager`` itself.
        """
    
    
    def check_version(version, threshold=69, delay=10):
        """Check one version: it passes when ``version <= threshold``.

        Args:
            version: the version to check.
            threshold: highest version that still passes (default 69).
            delay: seconds to sleep before checking (default 10).

        Returns:
            True when the version passes, False otherwise. The outcome and the
            current process id are also printed.
        """
        time.sleep(delay)
        print('\t process %s checking..' % os.getpid())
        if version <= threshold:
            print('\t %s OK!' % version)
            return True
        print('\t %s not OK!' % version)
        return False
    
    
    # One bisection step: probe only the midpoint of the given list.
    def midd_search(check_list, check_function):
        """Run ``check_function`` on the middle element of ``check_list``.

        Returns:
            ``(True, element)`` when the check result is truthy and
            ``(False, element)`` otherwise, where ``element`` is the item at
            index ``len(check_list) // 2``. For an empty ``check_list`` the
            bare value ``False`` is returned instead of a tuple.
        """
        if not len(check_list):
            return False
        candidate = check_list[len(check_list) // 2]
        passed = True if check_function(candidate) else False
        return passed, candidate
    
    
    if __name__ == '__main__':

        # This QueueManager only fetches the queues over the network, so only
        # the names are registered (no callable).
        QueueManager.register('get_task_queue')
        QueueManager.register('get_result_queue')

        # Address of the machine running the master script.
        # Use the master's public IP here when it runs on another host.
        server_addr = '127.0.0.1'
        print('Connect to server %s...' % server_addr)
        # Port and auth key must match the master's settings exactly.
        m = QueueManager(address=(server_addr, 5001), authkey=b'abc')
        # Connect over the network.
        m.connect()

        # Obtain the queue objects.
        task = m.get_task_queue()
        result = m.get_result_queue()

        # Take tasks from the task queue and write results to the result queue.
        for _ in range(20):
            try:
                # Wait up to 10 seconds before queue.Empty is raised.
                check_version_list = task.get(timeout=10)
                print('checking version %s' % check_version_list)
                res = midd_search(check_version_list, check_version)
                time.sleep(1)
                result.put(res)
            except queue.Empty:
                print('task queue is empty.')
            except (ConnectionError, EOFError):
                # The master is gone; retrying on the dead connection would
                # only fail again, so stop instead of looping on.
                print('master done!')
                break

        # Finished.
        print('worker exit.')
    
  • 相关阅读:
    使用HttpModule实现权限系统
    Asp.net的HTTP请求处理过程
    Asp.net的HTTP请求处理过程
    IHttpModule
    HttpModule内部事件机制和生命周期
    java 窗口中的动态效果
    first
    判断素数
    螺旋矩阵
    JavaBean笔记
  • 原文地址:https://www.cnblogs.com/amize/p/15185096.html
Copyright © 2011-2022 走看看