zoukankan      html  css  js  c++  java
  • 并发二分法查找算法【分布式】

    1. 利用【queue】【BaseManager】
    # master
    
    
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    import queue
    from multiprocessing.managers import BaseManager
    
    # 发送任务的队列:
    task_queue = queue.Queue()
    # 接收结果的队列:
    result_queue = queue.Queue()
    
    
    # 增加如下的函数以适应windows ,【PicklingError】
    def task_queue_fun():
        return task_queue
    
    
    def result_queue_fun():
        return result_queue
    
    
    class QueueManager(BaseManager):
        pass
    
    
    def in_version_result_list(version_num,version_result_list):
        return version_num in [version_res[1] for version_res in version_result_list]
    
    
    def pre_check_search_version(check_list):
        if len(check_list) == 0:
            raise ValueError('list is empty!')
        return check_list[len(check_list) // 2]
    
    
    def multi_process_search(check_list, version_result_list, task_queue, result_queue, worker_timeout):
        # node个数
        node_num = 2
        # 根据node数拆分的list
        new_check_list_len, remain = divmod(len(check_list), node_num)
        if remain > 0:
            new_check_list_len += 1
        binary_list = [check_list[i:i + new_check_list_len] for i in range(0, len(check_list), new_check_list_len)]
        # 任务数量
        task_len = len(binary_list)
    
        # 查看每次列表情况
        print('待处理:')
        print('	 node_num %s' % node_num)
        print('	 org_check_list %s' % str(check_list))
    
        # add task
        for i in range(node_num):
            try:
                # 预处理,跳过不必要的重复搜索
                if not in_version_result_list(pre_check_search_version(binary_list[i]),version_result_list):
                    print('	 Put task %s' % str(binary_list[i]))
                    task_queue.put(binary_list[i])
                else:
                    task_len -= 1
                    print('	 version %s is in %s'%(pre_check_search_version(binary_list[i]),version_result_list))
                    # pass
            # 拆分后的列表小于node数量
            except IndexError:
                print('only %s nodes needed!'%len(binary_list))
    
        # get result
        for _ in range(task_len):
            r = result_queue.get(timeout=worker_timeout)
            version_result_list.append(r)
            print('	 Result: %s' % str(r))
        return_bool_list = [i[0] for i in version_result_list]
    
        # check
        print('处理结果:')
        print('	 return_result_list %s'% str(version_result_list))
    
        # 全部版本pass
        if all(return_bool_list):
            print('	 all True')
            max_true = check_list.index(max(version_result_list)[1])
            min_false = len(check_list)
    
        # 全部版本failed
        if not any(return_bool_list):
            print('	 all False')
            max_true = 0
            min_false = check_list.index(min(version_result_list)[1])
    
        # 有failed的版本和pass的版本
        if any(return_bool_list) and not all(return_bool_list):
            try:
                max_true = check_list.index(max(version_result_list)[1])
            except ValueError:
                max_true = 0
                print('	 all False ,max_true has been found in the upper recursion level!')
            try:
                min_false = check_list.index(min(version_result_list)[1])
            except ValueError:
                min_false = len(check_list)
                print('	 all True ,min_false has been found in the upper level!')
                # print(max_true, min_false)
                # raise
        if max_true+1 < min_false:
            # 返回递归结果
            return multi_process_search(check_list[max_true:min_false],version_result_list,task_queue,result_queue, worker_timeout)
    
        return min(version_result_list)
    
    
    if __name__ == '__main__':
    
        alist = [5, 10, 15, 18, 35, 55, 65, 75, 99 ,100 ,111 ,112 ,113 ,115 ,116 ,118 ,119,121,122,123,124,125]
    
        # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
        QueueManager.register('get_task_queue', callable=task_queue_fun)
        QueueManager.register('get_result_queue', callable=result_queue_fun)
    
        # 绑定端口5000, 设置验证码'abc':
        manager = QueueManager(address=('127.0.0.1', 5001), authkey=b'abc')
        # manager = QueueManager(address=('public ip', 5001), authkey=b'abc')
        
        # 启动Queue:
        manager.start()
        # 获得通过网络访问的Queue对象:
        task_queue = manager.get_task_queue()
        result_queue = manager.get_result_queue()
    
        result_list = []
        res = multi_process_search(alist, result_list, task_queue, result_queue,worker_timeout=500)
        print(res)
    
        # 关闭:
        manager.shutdown()
        print('master exit.')
    
    
    # worker
    
    
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    import time, sys,queue
    from multiprocessing.managers import BaseManager
    import os, time, random
    
    
    # 创建类似的QueueManager:
    class QueueManager(BaseManager):
        pass
    
    
    def check_version(version):
        time.sleep(10)
        print('	 process %s checking..' % os.getpid())
        if version <= 69:
            print('	 %s OK!' % version)
            return True
        else:
            print('	 %s not OK!' % version)
            return False
    
    
    # 多线程二分法
    def midd_search(check_list, check_function):
        if len(check_list) == 0:
            return False
        mid = len(check_list) // 2
        if check_function(check_list[mid]):
            return True, check_list[mid]
        else:
            return False, check_list[mid]
    
    
    if __name__ == '__main__':
    
        # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
        QueueManager.register('get_task_queue')
        QueueManager.register('get_result_queue')
    
        # 连接到服务器,也就是运行task_master.py的机器:
        server_addr = '127.0.0.1'
        # server_addr = 'public ip'
        print('Connect to server %s...' % server_addr)
        # 端口和验证码注意保持与task_master.py设置的完全一致:
        m = QueueManager(address=(server_addr, 5001), authkey=b'abc')
        # 从网络连接:
        m.connect()
    
        # 获取Queue的对象:
        task = m.get_task_queue()
        result = m.get_result_queue()
    
        # 从task队列取任务,并把结果写入result队列:
        for i in range(20):
            try:
                check_version_list = task.get(timeout=10)  # queue.Empty前等待timeout
                print('checking version %s' % check_version_list)
                res = midd_search(check_version_list,check_version)
                time.sleep(1)
                result.put(res)
            except queue.Empty:
                print('task queue is empty.')
            except (ConnectionResetError, EOFError):
                print('master done!')
    
        # 处理结束:
        print('worker exit.')
    
  • 相关阅读:
    42. Trapping Rain Water
    223. Rectangle Area
    645. Set Mismatch
    541. Reverse String II
    675. Cut Off Trees for Golf Event
    安装 VsCode 插件安装以及配置
    向上取整 向下取整 四舍五入 产生100以内随机数
    JS 判断是否为数字 数字型特殊值
    移动端初始配置,兼容不同浏览器的渲染内核
    Flex移动布局中单行和双行布局的区别以及使用
  • 原文地址:https://www.cnblogs.com/amize/p/15185096.html
Copyright © 2011-2022 走看看