zoukankan      html  css  js  c++  java
  • 基于内存共享的并行排序算法慢谈(下)

    题目再现:请用Python多线程对一个4G以上的文件, 进行外排序,尽量优化性能。假设系统内核数为8,Mem=512MB,关键字是字符串

    6.实验

    6.1python多线程

    关于python多线程可以参考:http://www.cnblogs.com/holbrook/archive/2012/03/02/2376940.html

    python的多线程机制和Java很像,这里就不多讲了。似乎没有涉及到共享变量,锁也不用了。

    6.2问题解决思路

      我也想过先划分数据再交给线程,但是没试过,感觉需要很多内存,搞不好还会浪费。

      第三个非常恶劣的问题就是,线程里面新建线程会产生很多问题,具体我说不清楚。

      第四个问题就是,pdb调试无法在线程里面 设置断点,线程的问题无法跟踪,可能是我太蠢了。

    由于以上问题,我最好只能换了一种方法,就是用前面提到的算法间并行。

    开始时主线程读取数据,每读到一定量时就开辟一个新的线程,把线程插入到一个队列里。

    主线程把读到的数据块交给线程排序。线程启动,线程排完序后会将结果输出到文件。

    主线程每次读新数据的时候,先判断下是否还有足够内存开瓶新的数据。若没有则等待线程队里的第一个线程结束。

    因为第一个线程是最先开辟的,它结束后就可以释放内存了。

    当主线程读完所有数据时,等待线程队列里的所有线程结束。

    然后主线程使用归并思想对子文件数据进行归并,并输出到最终文件。

    在归并的时候,若内存有多,尽量预取数据。

    6.3完整代码

    产生待排序数据
     1 import random
     2 
     3 def generatekey(num):
     4     str = []
     5     while num > 0:
     6         str.append(chr(random.randint(97, 122)))
     7         num = num - 1
     8     str.append('\n')
     9     return ''.join(str)
    10 
    11 print 'please enter the number'
    12 N = input()
    13 f=open('unsortdata','w')
    14 while N > 0:
    15     str = generatekey(30)
    16     f.write(str)
    17     N = N - 1
    18     
    19 f.close()
    多线程排序
      1 import threading
      2 import time
      3 import random
      4 
      5 class MyThread(threading.Thread): 
      6 
      7     def __init__(self, file, data):
      8         threading.Thread.__init__(self)
      9         self.file = file
     10         self.data = data
     11 
     12     def run(self):
     13         last = len(self.data)
     14         self.quickSort(0, last - 1)
     15         for j in range(last):
     16             self.file.write(self.data[j])
     17         self.file.close()
     18 
     19     def quickSort(self, first, last):
     20         if last - first > 7:
     21             mid = self.partition(first, last)
     22             if first < mid - 1:
     23                 self.quickSort(first, mid - 1)
     24             if mid + 1 < last:
     25                 self.quickSort(mid + 1, last)
     26         else:
     27             self.insertSort(first, last)
     28 
     29     def insertSort(self, first, last):
     30         for i in range(first + 1, last + 1):
     31             if self.data[i - 1] > self.data[i]:
     32                 comp = self.data[i]
     33                 low = first
     34                 high = i - 1
     35                 while low <= high:
     36                     mid = (low + high) / 2
     37                     if self.data[mid] < comp:
     38                         low = mid + 1
     39                     else:
     40                         high = mid - 1
     41                 j = i - 1
     42                 while j >= high + 1:
     43                     self.data[j + 1] = self.data[j]
     44                     j = j - 1
     45                 self.data[high + 1] = comp
     46 
     47     def partition(self, first, last): 
     48         mid = random.randint(first, last)
     49         comp = self.data[mid]
     50         #交换临界值和数组第一个值
     51         temp = self.data[first]
     52         self.data[first] = comp
     53         self.data[mid] = temp
     54         while first < last:
     55             while first < last and self.data[last] >= comp:
     56                 last = last - 1
     57             if first < last:
     58                 self.data[first] = self.data[last]
     59                 first = first + 1
     60             while first < last and self.data[first] <= comp:
     61                 first = first + 1
     62             if first < last:
     63                 self.data[last] = self.data[first]
     64                 last = last - 1
     65         self.data[first] = comp
     66         return first
     67 
     68 def getbuffer(flist, i, X):
     69     temp = []
     70     for j in range(X):
     71         str = flist[i].readline()
     72         if str == '':
     73             break
     74         temp.append(str)
     75     return temp
     76 
     77 def outerMergeSort(X, N, M):
     78     buffer = []
     79     Mlist = []
     80     flist = []
     81     num = 0
     82     for i in range(M):
     83         flist.append(open('sortdata' + str(i), 'r'))
     84         buffer.append(getbuffer(flist, i, X))
     85         Mlist.append([buffer[i][0], i])
     86         del buffer[i][0]
     87     f = open('sortdata', 'w')        
     88     while(num < N and len(Mlist) > 0):
     89         Mlist.sort()
     90 
     91         f.write(Mlist[0][0])
     92         num = num + 1
     93         fro = Mlist[0][1]
     94         del Mlist[0]
     95         #here has a bug if X == 1
     96         if len(buffer[fro]) > 0:
     97             Mlist.append([buffer[fro][0], fro])
     98         if len(buffer[fro]) <= 1:
     99             buffer[fro] = getbuffer(flist, fro, X)
    100         else:
    101             del buffer[fro][0]
    102 
    103     f.close()
    104     for i in range(M):
    105         flist[i].close()
    106 
    107 if __name__ == '__main__':
    108     #N为待排序的总数据量
    109     #M为线程数,Sum为每个线程处理的数据量
    110     #X为每个子文件的预取数组大小
    111     #L为内存限制的数据量
    112     print 'please enter the number N'
    113     N = input()
    114     print 'please enter the number M'
    115     M = input()
    116     Sum = N / M
    117     #print 'please enter the mem limit data'
    118     L = 8 * Sum 
    119     X = (L - M) / M
    120     ThreadL = L / Sum
    121     filename = 'unsortdata'
    122     f = open(filename, 'r')
    123     i = 0
    124     #tlist存放线程
    125     tlist = []
    126     data = []
    127     start = time.time()
    128     while i < M:
    129         #主线程从unsortdata中读取Sum行
    130         for j in range(Sum):
    131             data.append(f.readline())
    132         if len(tlist) == ThreadL:
    133             tlist[0].join()
    134         #新建子文件用于输出线程排序结果
    135         file = open('sortdata' + str(i), 'w+')
    136         #新建线程使用混合快速排序对data进行排序,将排序后的结果输出到flist[i]中
    137         t = MyThread(file,  data[:])
    138         tlist.append(t)
    139         t.start()
    140         if len(tlist) == ThreadL:
    141             del tlist[0]
    142         data = []
    143         i = i + 1
    144 
    145     f.close()
    146     for i in range(len(tlist)):
    147         tlist[i].join()
    148 
    149     #归并外排序
    150     outerMergeSort(X, N, M)
    151     finish = time.time()
    152     print 'total cost time:%s\n' % (finish - start)

    6.4后期实验

    可以做很多后期实验,但是时间有限

    N=200000

    M=1,T=12.854446888

    M=2,T=8.58908414841

    M=4,T=6.01675200462

    M=8,T=4.41931986809

    M=16,T=3.61740279198 内存限制8线程

    这些都是一次实验,没求平均。

    N=200000,M=8, 内排序消耗时间2.29,总共时间4.10

    N=200000,M=4,内排序消耗时间2.62,总共时间5.61

    ......

    全文完

    后记:感谢李兄台指出的问题,python很多版本不能实现并行。看样子要并行的话得换个语言。

    另外在本文中我们也发现了,虽然是并发,但是性能提升还是很显著的,这主要是因为算法的瓶颈在IO的缘故。

  • 相关阅读:
    iOS新建项目基础设置
    Burp安装及配置(修改参数测试)
    打开他人代码,如何运行起来
    四、python用户交互程序
    三、python_字符编码与二进制
    二、python_变量要求
    一、python_(入门)
    linux 常用命令之一
    Atcoder Regular Contest 093 C
    「HNOI2016」序列
  • 原文地址:https://www.cnblogs.com/2010Freeze/p/2563815.html
Copyright © 2011-2022 走看看