上次的并行计算是通过把大文件物理分割成若干小文件来实现的,涉及小文件的生成、合并与删除。其实更有效的方法是在内存中按字节区间对文件进行逻辑分块,各进程分别计算,
最后将返回结果直接写入目标文件,省去了分割小文件、合并小文件、删除小文件的过程。
代码如下:
import json
import math
from multiprocessing import Pool

"""
Process a large line-oriented file in parallel WITHOUT physically splitting it:
the file is divided into byte ranges, each worker process reads and transforms
its own range, and the finished chunks are appended to the target file from the
parent process.

Pool.apply_async delivers a worker's return value to `callback` and any raised
exception to `error_callback`.
"""

# NOTE(review): hard-coded API key and file paths — move to config/env before
# real use; the key is exposed to anyone who can read this file.
AMAP_KEY = '2f114ef951411f01c24a6384b59307a8'
GEO_URL = 'https://restapi.amap.com/v3/geocode/geo'
TARGET_FILE = '/opt/test/qiuxue/target2.txt'


# --- user business logic ----------------------------------------------------
def get_jw(addr_name):
    """Geocode one CSV line (``"id,address"``) via the AMap geocoding API.

    Returns ``"address,lng,lat\\n"`` on success, or ``"address,-,-\\n"`` when
    the address cannot be resolved.
    """
    # Imported lazily so the generic chunking machinery below stays importable
    # (and testable) even when the third-party `requests` package is absent.
    import requests

    addr_name = addr_name.split(',')[1]
    # `params` lets requests URL-encode the address; the original str.format
    # call produced a broken query for addresses containing '&' or spaces.
    result = requests.get(
        GEO_URL,
        params={'address': addr_name, 'output': 'JSON', 'key': AMAP_KEY},
        timeout=10,  # don't let a worker hang forever on a stuck connection
    )
    rj = json.loads(result.content.decode('utf-8'))
    # Error payloads may omit 'geocodes' entirely — .get avoids a KeyError.
    geocodes = rj.get('geocodes') or []
    if geocodes:
        jwd = geocodes[0]['location']
        print(jwd)
        return addr_name + ',' + jwd + '\n'
    print('-,-')
    return addr_name + ',' + '-,-' + '\n'


def my_callback(lines):
    """Append one worker's finished chunk to the target file.

    Runs in the parent process (Pool's single result-handler thread), so
    appends from different workers cannot interleave mid-write.
    """
    with open(TARGET_FILE, 'a', encoding='utf-8') as f:
        f.writelines(lines)


# --- read one byte-range of the file ---------------------------------------
class Reader(object):
    """Read the lines of ``file_name`` that lie in ``[start_pos, end_pos)``,
    apply ``business_func`` to each, and return the concatenated results.

    Boundary contract: a line straddling ``end_pos`` is read to completion by
    THIS reader; the next reader detects (via the byte just before its start)
    that it begins mid-line and skips forward to the next full line.
    """

    def __init__(self, file_name, start_pos, end_pos, business_func):
        self.file_name = file_name
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.business_func = business_func

    def execute(self):
        lines = []
        with open(self.file_name, 'rb') as f:
            pos = self.start_pos
            if pos != 0:
                f.seek(pos - 1)
                # File is binary, so compare against BYTES: the original
                # compared against the str ' ', which is never equal to a
                # bytes object in Python 3, defeating the check entirely.
                if f.read(1) != b'\n':
                    # We start mid-line; the previous block owns this line.
                    f.readline()
                pos = f.tell()
            while pos < self.end_pos:
                raw = f.readline()
                if not raw:  # defensive: end_pos beyond EOF
                    break
                line = raw.strip().decode('utf-8')
                # Bounded retry: the original seeked back and re-tried the
                # same line forever, looping infinitely on a persistent
                # failure. Retry a few times, then skip the line.
                for attempt in range(3):
                    try:
                        lines.append(self.business_func(line))
                        break
                    except Exception as e:
                        if attempt == 2:
                            print('giving up on line after 3 attempts:', e)
                pos = f.tell()
        return ''.join(lines)


# --- split the file into N byte ranges -------------------------------------
class FileBlock(object):
    """Compute ``block_num`` contiguous ``(start, end)`` byte ranges covering
    the whole file, returned as a list of tuples."""

    def __init__(self, file_name, block_num):
        self.file_name = file_name
        self.block_num = block_num

    def block_file(self):
        pos_list = []
        # Binary mode so tell() offsets match the byte offsets Reader uses.
        with open(self.file_name, 'rb') as f:
            f.seek(0, 2)
            file_size = f.tell()
        block_size = max(1, math.ceil(file_size / self.block_num))
        start_pos = 0
        # Ranges are contiguous: the original advanced by block_size + 1,
        # leaving a one-byte gap that could silently drop a line starting
        # exactly at a block boundary.
        while start_pos < file_size:
            end_pos = min(start_pos + block_size, file_size)
            pos_list.append((start_pos, end_pos))
            start_pos = end_pos
        return pos_list


if __name__ == '__main__':
    concurrency = 8
    pool = Pool(concurrency)
    input_file = '/opt/test/qiuxue/target.txt'
    fb = FileBlock(input_file, concurrency)
    for start, end in fb.block_file():
        reader = Reader(input_file, start, end, get_jw)
        pool.apply_async(reader.execute, callback=my_callback)
    pool.close()
    pool.join()