- 根据node个数,选出数组中node个数的数据进行验证
import time
from multiprocessing import Process
import os
from multiprocessing import Pool
import os, time, random
# Multi-process bisection: each worker probes only the middle element of its sub-list
def midd_search(check_list, check_function):
    """Probe only the middle element of ``check_list``.

    Unlike ``binary_search`` this does not narrow the range any further; it
    performs a single check and reports the outcome.

    Args:
        check_list: Sequence of candidates to probe.
        check_function: Callable taking one candidate; a truthy result means
            the candidate passed.

    Returns:
        ``False`` when ``check_list`` is empty, otherwise a tuple
        ``(passed, candidate)`` where ``passed`` is ``True``/``False`` and
        ``candidate`` is the middle element that was checked.
    """
    if not check_list:
        return False
    middle_item = check_list[len(check_list) // 2]
    # Normalise whatever the checker returns to a plain bool.
    passed = bool(check_function(middle_item))
    return passed, middle_item
# # 多线程二分法
# def get_issue_version1(version_list, check_function):
# return midd_search(version_list, check_function)
# 常规二分法
def binary_search(check_list, check_function):
if len(check_list) == 0:
return False
mid = len(check_list) // 2
if check_function(check_list[mid]):
if len(check_list[mid+1:]) == 0:
return {'last OK': check_list[mid]}
return binary_search(check_list[mid + 1:], check_function)
else:
if len(check_list[:mid]) == 0:
return {'first error': check_list[mid]}
return binary_search(check_list[:mid], check_function)
def get_issue_version(version_list, check_function, result_list=None):
    """Search ``version_list`` using the multi-process bisection.

    This is a thin wrapper around ``multi_process_search``. The single-process
    alternative is ``binary_search(version_list, check_function)``.

    Args:
        version_list: Candidate versions to search.
        check_function: Callable taking one version; a truthy result means the
            version is OK. It is handed to worker processes.
        result_list: Optional list that collects every probe result produced
            during the search. A fresh list is used when omitted, so the
            default no longer crashes on ``None.extend``.

    Returns:
        Whatever ``multi_process_search`` returns.
    """
    if result_list is None:
        # The search appends to this list; None would raise AttributeError.
        result_list = []
    return multi_process_search(version_list, check_function, result_list)
def multi_process_search(check_list, check_function, return_result_list):
    """Narrow the pass/fail boundary of ``check_list`` using worker processes.

    The list is cut into at most ``node_num`` contiguous chunks, the middle
    element of every chunk is probed in parallel via ``midd_search``, and the
    search recurses into the range between the largest passing probe and the
    smallest failing probe until those two are adjacent.

    Assumes ``check_list`` is sorted ascending with unique values and that
    ``check_function`` passes for a prefix of the list and fails for the rest;
    the ``max``/``min``/``index`` lookups below rely on that.

    Args:
        check_list: Candidates to search.
        check_function: Callable taking one candidate; a truthy result means
            the candidate passed. It is sent to worker processes.
        return_result_list: List extended with every ``(passed, candidate)``
            probe result from this call and all recursive calls. ``None`` is
            accepted and replaced by a throw-away list.

    Returns:
        A dict with ``'last OK'`` (largest candidate seen passing) and/or
        ``'first error'`` (smallest candidate seen failing). A key is absent
        when no such candidate was found; the dict is empty for an empty
        ``check_list``.
    """
    # Number of worker processes (nodes).
    node_num = 4

    if return_result_list is None:
        return_result_list = []
    if not check_list:
        # An empty list would otherwise produce a zero slice step below.
        return {}

    # Chunk length is ceil(len / node_num), so there are at most node_num chunks.
    chunk_len, remain = divmod(len(check_list), node_num)
    if remain > 0:
        chunk_len += 1
    binary_list = [check_list[i:i + chunk_len]
                   for i in range(0, len(check_list), chunk_len)]
    print('org_check_list %s' % str(check_list))
    print('binary_check_list %s' % str(binary_list))

    # The context manager releases the pool even if submitting a job raises.
    with Pool(node_num) as pool:
        pending = [pool.apply_async(midd_search, args=(chunk, check_function))
                   for chunk in binary_list]
        print('Waiting for all subprocesses done...')
        pool.close()
        pool.join()
        print('All subprocesses done.')
        # Fetch each result exactly once; get() re-raises worker exceptions.
        result_list = [job.get() for job in pending]
    print(result_list)
    return_result_list.extend(result_list)

    passed = [candidate for ok, candidate in result_list if ok]
    failed = [candidate for ok, candidate in result_list if not ok]

    boundary = {}
    if passed:
        max_true = check_list.index(max(passed))
        boundary['last OK'] = check_list[max_true]
    else:
        # Nothing passed: use -1 so that index 0 still counts as unverified.
        max_true = -1
    if failed:
        min_false = check_list.index(min(failed))
        boundary['first error'] = check_list[min_false]
    else:
        # Nothing failed: the boundary lies past the end of the list.
        min_false = len(check_list)

    if max_true + 1 < min_false:
        # Unverified candidates remain between the two bounds. The slice keeps
        # the known-good element (when there is one) as its first item.
        deeper = multi_process_search(check_list[max(max_true, 0):min_false],
                                      check_function, return_result_list)
        # Results from the narrower range are at least as tight as ours.
        boundary.update(deeper)
    return boundary
# 版本是否ok,True ok
def check_version(version):
    """Report whether ``version`` is OK (``True``) or not (``False``).

    Sleeps for 0.1 seconds, prints the id of the process doing the check and
    the verdict, and treats every version up to and including 69 as OK.
    """
    time.sleep(0.1)
    print('process %s checking..' % os.getpid())
    is_ok = bool(version <= 69)
    verdict = '%s OK!' if is_ok else '%s not OK!'
    print(verdict % version)
    return is_ok
if __name__ == '__main__':
    # NOTE: functions handed to the worker processes must not be defined
    # inside this guard, which is why the checker is a module-level function.

    # Version list to search.
    alist = [
        5, 10, 15, 18, 35, 55, 65, 75, 99, 100, 111,
        112, 113, 115, 116, 118, 119, 121, 122, 123, 124, 125,
    ]
    # Filled in by the search with the probe results it collects.
    result_list = []
    res = get_issue_version(alist, check_version, result_list)
    print(res)
    # Smallest collected entry.
    print(min(result_list))
- 直接按照node个数,分割数组
# 缺点
每个node会递归计算完自己的list,但是这个list如果全部failed/pass,就是多余计算
# 优点
实现简单
- 按照node个数,间隔划分
# 比如两个node,就按照奇数偶数划分list
看上去不错,不过貌似和方法2一样