"""Multi-process producer/consumer pipeline that turns text lines into CSV rows.

One producer process streams lines from an input text file into a bounded
``JoinableQueue``; several consumer processes take lines off the queue and
append a two-field row per line to a shared CSV file.
"""
import csv
import sys
import time
from multiprocessing import JoinableQueue, Lock, Process, cpu_count


def deal_line(line, writer, csv_file, lock=None):
    """Write one CSV row built from ``line`` and flush it immediately.

    The row is ``(line[3], line[1])``. For the text lines queued by
    ``producer`` these are single characters (the 4th and the 2nd).

    Args:
        line: Indexable record; must have at least 4 elements.
        writer: ``csv.writer`` bound to ``csv_file``.
        csv_file: The file object behind ``writer``; flushed after the row.
        lock: Optional lock (any context manager) held while writing and
            flushing, so rows from concurrent writers cannot interleave.

    Raises:
        IndexError: If ``line`` has fewer than 4 elements.
    """
    # Index before taking the lock so a bad line never holds it.
    row = (line[3], line[1])
    if lock is None:
        writer.writerow(row)
        # Flush right away: with several processes writing the same file,
        # data left in a per-process buffer may be lost or reordered.
        csv_file.flush()
        return
    with lock:
        writer.writerow(row)
        csv_file.flush()


def consumer(queue, writer, csv_file, lock=None):
    """Consume lines from ``queue`` until a ``None`` sentinel arrives.

    Every item taken from the queue is acknowledged with ``task_done()``,
    even when processing it fails, so ``queue.join()`` in the parent cannot
    hang on a bad line. Lines too short for ``deal_line`` are skipped and
    reported on stderr.

    Args:
        queue: Queue offering ``get()`` and ``task_done()``.
        writer: ``csv.writer`` bound to ``csv_file``.
        csv_file: Output file object, flushed after each row.
        lock: Optional lock forwarded to ``deal_line``.
    """
    while True:
        line = queue.get()
        if line is None:
            # Sentinel: acknowledge it and stop this worker.
            queue.task_done()
            return
        try:
            deal_line(line, writer, csv_file, lock)
        except IndexError:
            print(f"skipping line too short to process: {line!r}",
                  file=sys.stderr)
        finally:
            queue.task_done()


def _consumer_worker(queue, csv_path, lock):
    """Process entry point: open the CSV in append mode and run ``consumer``.

    Opening the file inside the child avoids sending file and ``csv.writer``
    objects through ``Process`` arguments; those cannot be pickled, which
    the ``spawn`` and ``forkserver`` start methods require.
    """
    with open(csv_path, 'a', newline='') as csv_file:
        consumer(queue, csv.writer(csv_file), csv_file, lock)


def producer(queue, path='test.txt'):
    """Put every line of the text file at ``path`` onto ``queue``."""
    with open(path, 'r') as f:
        for line in f:
            queue.put(line)


def main(input_path='test.txt', output_path='t1.csv', workers=None):
    """Run one producer and several consumers until all lines are written.

    Args:
        input_path: Text file read by the producer.
        output_path: CSV file to (re)create and fill.
        workers: Number of consumer processes; defaults to ``cpu_count()``.
    """
    if workers is None:
        workers = cpu_count()

    # Create/truncate the output once; the workers then append to it.
    with open(output_path, 'w', newline=''):
        pass

    queue = JoinableQueue(8)  # bounded, so the producer cannot run far ahead
    lock = Lock()

    pc = Process(target=producer, args=(queue, input_path))
    pc.start()

    consumers = []
    for _ in range(workers):
        worker = Process(target=_consumer_worker,
                         args=(queue, output_path, lock))
        # Daemon flag is a safety net: a stuck worker cannot keep the
        # program alive after the parent exits.
        worker.daemon = True
        worker.start()
        consumers.append(worker)

    pc.join()     # wait until the producer has queued every line
    queue.join()  # wait until every queued line has been acknowledged

    # Ask each worker to stop, then wait so their file handles are closed.
    for _ in consumers:
        queue.put(None)
    for worker in consumers:
        worker.join()


if __name__ == '__main__':
    start = time.perf_counter()
    main()
    print("Time:", time.perf_counter() - start)