zoukankan      html  css  js  c++  java
  • 基于内存共享的并行排序算法慢谈(下)

    题目再现:请用Python多线程对一个4G以上的文件, 进行外排序,尽量优化性能。假设系统内核数为8,Mem=512MB,关键字是字符串

    6.实验

    6.1python多线程

    关于python多线程可以参考:http://www.cnblogs.com/holbrook/archive/2012/03/02/2376940.html

    python的多线程机制和Java很像,这里就不多讲了。似乎没有涉及到共享变量,锁也不用了。

    6.2问题解决思路

      我也想过先划分数据再交给线程,但是没试过,感觉需要很多内存,搞不好还会浪费。

      第三个非常恶劣的问题就是,线程里面新建线程会产生很多问题,具体我说不清楚。

      第四个问题就是,pdb调试无法在线程里面 设置断点,线程的问题无法跟踪,可能是我太蠢了。

    由于以上问题,我最好只能换了一种方法,就是用前面提到的算法间并行。

    开始时主线程读取数据,每读到一定量时就开辟一个新的线程,把线程插入到一个队列里。

    主线程把读到的数据块交给线程排序。线程启动,线程排完序后会将结果输出到文件。

    主线程每次读新数据的时候,先判断下是否还有足够内存开瓶新的数据。若没有则等待线程队里的第一个线程结束。

    因为第一个线程是最先开辟的,它结束后就可以释放内存了。

    当主线程读完所有数据时,等待线程队列里的所有线程结束。

    然后主线程使用归并思想对子文件数据进行归并,并输出到最终文件。

    在归并的时候,若内存有多,尽量预取数据。

    6.3完整代码

    产生待排序数据
     1 import random
     2 
     3 def generatekey(num):
     4     str = []
     5     while num > 0:
     6         str.append(chr(random.randint(97, 122)))
     7         num = num - 1
     8     str.append('\n')
     9     return ''.join(str)
    10 
    11 print 'please enter the number'
    12 N = input()
    13 f=open('unsortdata','w')
    14 while N > 0:
    15     str = generatekey(30)
    16     f.write(str)
    17     N = N - 1
    18     
    19 f.close()
    多线程排序
      1 import threading
      2 import time
      3 import random
      4 
      5 class MyThread(threading.Thread): 
      6 
      7     def __init__(self, file, data):
      8         threading.Thread.__init__(self)
      9         self.file = file
     10         self.data = data
     11 
     12     def run(self):
     13         last = len(self.data)
     14         self.quickSort(0, last - 1)
     15         for j in range(last):
     16             self.file.write(self.data[j])
     17         self.file.close()
     18 
     19     def quickSort(self, first, last):
     20         if last - first > 7:
     21             mid = self.partition(first, last)
     22             if first < mid - 1:
     23                 self.quickSort(first, mid - 1)
     24             if mid + 1 < last:
     25                 self.quickSort(mid + 1, last)
     26         else:
     27             self.insertSort(first, last)
     28 
     29     def insertSort(self, first, last):
     30         for i in range(first + 1, last + 1):
     31             if self.data[i - 1] > self.data[i]:
     32                 comp = self.data[i]
     33                 low = first
     34                 high = i - 1
     35                 while low <= high:
     36                     mid = (low + high) / 2
     37                     if self.data[mid] < comp:
     38                         low = mid + 1
     39                     else:
     40                         high = mid - 1
     41                 j = i - 1
     42                 while j >= high + 1:
     43                     self.data[j + 1] = self.data[j]
     44                     j = j - 1
     45                 self.data[high + 1] = comp
     46 
     47     def partition(self, first, last): 
     48         mid = random.randint(first, last)
     49         comp = self.data[mid]
     50         #交换临界值和数组第一个值
     51         temp = self.data[first]
     52         self.data[first] = comp
     53         self.data[mid] = temp
     54         while first < last:
     55             while first < last and self.data[last] >= comp:
     56                 last = last - 1
     57             if first < last:
     58                 self.data[first] = self.data[last]
     59                 first = first + 1
     60             while first < last and self.data[first] <= comp:
     61                 first = first + 1
     62             if first < last:
     63                 self.data[last] = self.data[first]
     64                 last = last - 1
     65         self.data[first] = comp
     66         return first
     67 
     68 def getbuffer(flist, i, X):
     69     temp = []
     70     for j in range(X):
     71         str = flist[i].readline()
     72         if str == '':
     73             break
     74         temp.append(str)
     75     return temp
     76 
     77 def outerMergeSort(X, N, M):
     78     buffer = []
     79     Mlist = []
     80     flist = []
     81     num = 0
     82     for i in range(M):
     83         flist.append(open('sortdata' + str(i), 'r'))
     84         buffer.append(getbuffer(flist, i, X))
     85         Mlist.append([buffer[i][0], i])
     86         del buffer[i][0]
     87     f = open('sortdata', 'w')        
     88     while(num < N and len(Mlist) > 0):
     89         Mlist.sort()
     90 
     91         f.write(Mlist[0][0])
     92         num = num + 1
     93         fro = Mlist[0][1]
     94         del Mlist[0]
     95         #here has a bug if X == 1
     96         if len(buffer[fro]) > 0:
     97             Mlist.append([buffer[fro][0], fro])
     98         if len(buffer[fro]) <= 1:
     99             buffer[fro] = getbuffer(flist, fro, X)
    100         else:
    101             del buffer[fro][0]
    102 
    103     f.close()
    104     for i in range(M):
    105         flist[i].close()
    106 
    107 if __name__ == '__main__':
    108     #N为待排序的总数据量
    109     #M为线程数,Sum为每个线程处理的数据量
    110     #X为每个子文件的预取数组大小
    111     #L为内存限制的数据量
    112     print 'please enter the number N'
    113     N = input()
    114     print 'please enter the number M'
    115     M = input()
    116     Sum = N / M
    117     #print 'please enter the mem limit data'
    118     L = 8 * Sum 
    119     X = (L - M) / M
    120     ThreadL = L / Sum
    121     filename = 'unsortdata'
    122     f = open(filename, 'r')
    123     i = 0
    124     #tlist存放线程
    125     tlist = []
    126     data = []
    127     start = time.time()
    128     while i < M:
    129         #主线程从unsortdata中读取Sum行
    130         for j in range(Sum):
    131             data.append(f.readline())
    132         if len(tlist) == ThreadL:
    133             tlist[0].join()
    134         #新建子文件用于输出线程排序结果
    135         file = open('sortdata' + str(i), 'w+')
    136         #新建线程使用混合快速排序对data进行排序,将排序后的结果输出到flist[i]中
    137         t = MyThread(file,  data[:])
    138         tlist.append(t)
    139         t.start()
    140         if len(tlist) == ThreadL:
    141             del tlist[0]
    142         data = []
    143         i = i + 1
    144 
    145     f.close()
    146     for i in range(len(tlist)):
    147         tlist[i].join()
    148 
    149     #归并外排序
    150     outerMergeSort(X, N, M)
    151     finish = time.time()
    152     print 'total cost time:%s\n' % (finish - start)

    6.4后期实验

    可以做很多后期实验,但是时间有限

    N=200000

    M=1,T=12.854446888

    M=2,T=8.58908414841

    M=4,T=6.01675200462

    M=8,T=4.41931986809

    M=16,T=3.61740279198 内存限制8线程

    这些都是一次实验,没求平均。

    N=200000,M=8, 内排序消耗时间2.29,总共时间4.10

    N=200000,M=4,内排序消耗时间2.62,总共时间5.61

    ......

    全文完

    后记:感谢李兄台指出的问题,python很多版本不能实现并行。看样子要并行的话得换个语言。

    另外在本文中我们也发现了,虽然是并发,但是性能提升还是很显著的,这主要是因为算法的瓶颈在IO的缘故。

  • 相关阅读:
    java中的 equals 与 ==
    String类的内存分配
    SVN用命令行更换本地副本IP地址
    npoi 设置单元格格式
    net core 微服务框架 Viper 调用链路追踪
    打不死的小强 .net core 微服务 快速开发框架 Viper 限流
    net core 微服务 快速开发框架 Viper 初体验20201017
    Anno 框架 增加缓存、限流策略、事件总线、支持 thrift grpc 作为底层传输
    net core 微服务 快速开发框架
    Viper 微服务框架 编写一个hello world 插件02
  • 原文地址:https://www.cnblogs.com/2010Freeze/p/2563815.html
Copyright © 2011-2022 走看看