  • Python Concurrency in Practice 03: Concurrency in Action, Part 1

    A QC pipeline for 16S data. A single lane off the sequencer contains many projects, each with its own contract number, and one project may include 16S and/or ITS data. A complete pipeline takes all of the upstream, already-demultiplexed data and turns it into data that is ready for analysis. This work used to be parallelised through SGE, i.e. parallelism at the job-scheduling level; here the parallelism is implemented at the program level, so the pipeline can run without an SGE cluster.
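    The script below is organised as a classic multiprocessing worker pool: a JoinableQueue holds the per-sample jobs, a fixed number of daemon worker processes pull from it, and the parent blocks on jobs.join() until every job has been marked done. Here is a minimal, self-contained sketch of that pattern (toy jobs only, not part of the pipeline) to make the structure of the real script easier to follow:

    import os
    from multiprocessing import Process, JoinableQueue, Queue, cpu_count

    def worker(jobs, results):
        while True:
            job = jobs.get()                       # block until a job is available
            try:
                results.put((os.getpid(), job * job))  # stand-in for the real work
            finally:
                jobs.task_done()                   # mark the job done even if it failed

    if __name__ == '__main__':
        jobs, results = JoinableQueue(), Queue()
        for n in range(20):
            jobs.put(n)
        for _ in range(cpu_count()):
            p = Process(target=worker, args=(jobs, results))
            p.daemon = True                        # daemon workers exit with the parent
            p.start()
        jobs.join()                                # returns once every job is task_done()
        while not results.empty():
            print(results.get_nowait())

    In the real script each job is a (compact, sample_name, lib_method, data_type) tuple and the "work" is pandaseq assembly plus quality filtering.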

    import os
    import sys
    import re
    import time
    import collections
    import threading
    from multiprocessing import Process,JoinableQueue,Queue,cpu_count
    from threading import Thread
    from settings import primer,pandaseq_soft
    from programs import *

    Result = collections.namedtuple("Result","compact sample_name HQ_fq")

    def parse_sam_barcode_file(sam_barcode_file):
        # each line: compact, sample_name, barcode_info, data_type (tab-separated)
        for line in open(sam_barcode_file):
            yield line.strip().split('\t')

    def proc(compact,sample_name,work_path,lib_method,data_type):
        split_path = '%s/Split'%work_path
        QC_path = '%s/QC'%work_path
        compact_path = '%s/%s'%(QC_path,compact)
        if not os.path.exists(compact_path):
            os.makedirs(compact_path)
        sample_path = '%s/%s'%(compact_path,sample_name)
        if not os.path.exists(sample_path):
            os.makedirs(sample_path)
        original_path = '%s/%s/%s'%(split_path,compact,sample_name)
        (read1,read2) = os.popen('ls %s/*'%original_path).read().strip().split('\n')
        # assemble the read pair with pandaseq, then keep only high-quality reads
        pandaseq_fq = '%s/pandaseq.fq'%sample_path
        pandaseq_log = '%s/pandaseq.log'%sample_path
        pandaseq(pandaseq_soft,read1,read2,pandaseq_fq,primer[lib_method][data_type]['forward'],primer[lib_method][data_type]['reverse'],pandaseq_log)
        high_quality_fq = '%s/high_quality.fq'%sample_path
        high_quality_log = '%s/high_quality.stat'%sample_path
        QC(pandaseq_fq,high_quality_fq,high_quality_log,data_type)
        return Result(compact,sample_name,high_quality_fq)

    def worker(work_path,jobs,results):
        while True:
            try:
                compact,sample_name,lib_method,data_type = jobs.get()
                try:
                    result = proc(compact,sample_name,work_path,lib_method,data_type)
                    sys.stderr.write('Process %s is finished doing with compact:%s sample_name:%s\n'%(os.getpid(),compact,sample_name))
                    results.put(result)
                except:
                    sys.stderr.write('Process %s FAILED !!! %s/%s may have some problem!\n'%(os.getpid(),compact,sample_name))
                    # push the failed job back so it gets retried by another worker
                    jobs.put((compact,sample_name,lib_method,data_type))
                    sys.stderr.write('The job is repushed into the queue, with compact:%s sample_name:%s\n'%(compact,sample_name))
            finally:
                jobs.task_done()

    def add_jobs(work_path,sam_barcode_file_list,jobs):
        job_num = 0
        data_type_hash = {}
        for todo,sam_barcode_file in enumerate(sam_barcode_file_list):
            sam_barcode_file = sam_barcode_file.strip()
            if not os.path.isfile(sam_barcode_file):
                continue
            lib_method = get_lib_method(sam_barcode_file)
            if lib_method is None:
                continue
            print 'sam_barcode_file loading: %s             ......  ok\n'%sam_barcode_file
            for compact,sample_name,barcode_info,data_type in parse_sam_barcode_file(sam_barcode_file):
                if not data_type_hash.has_key(compact):
                    data_type_hash[compact] = {}
                if not data_type_hash[compact].has_key(data_type):
                    data_type_hash[compact][data_type] = []
                data_type_hash[compact][data_type].append(sample_name)
                jobs.put((compact,sample_name,lib_method,data_type))
                job_num += 1
                sys.stderr.write('The job is pushed into the queue, with compact:%s sample_name:%s\n'%(compact,sample_name))
        sys.stderr.write('\n### All %s jobs have been pushed into the queue ###\n'%job_num)
        return data_type_hash

    def create_processes(concurrency,jobs,work_path,results):
        print '\nBegin create jobs with %s Process...\n'%concurrency
        for _ in range(concurrency):
            process = Process(target=worker,args=(work_path,jobs,results))
            process.daemon = True
            process.start()

    def main(work_path,sam_barcode_file_list):
        global concurrency
        split_path = '%s/Split'%work_path
        QC_path = '%s/QC'%work_path
        jobs = JoinableQueue()
        results = Queue()

        canceled = False
        data_type_hash = add_jobs(split_path,sam_barcode_file_list,jobs)
        create_processes(concurrency,jobs,work_path,results)
        try:
            jobs.join()
        except KeyboardInterrupt:
            sys.stderr.write('cancelling ...\n')
            canceled = True
        finally:
            # drain the result queue and log which samples finished
            job_num = 0
            finished_hash = {}
            while not results.empty():
                result = results.get_nowait()
                job_num += 1
                if not finished_hash.has_key(result.compact):
                    finished_hash[result.compact] = []
                finished_hash[result.compact].append(result.sample_name)
            sys.stderr.write('all %s work finished!\n\n'%job_num)
            log_out = open('%s/work.log'%QC_path,'w')
            for compact,sample_list in finished_hash.iteritems():
                for sample_name in sample_list:
                    log_out.write('%s\t%s has been finished\n'%(compact,sample_name))
            log_out.close()
        if canceled:
            return False

        for compact in os.listdir(QC_path):
            compact_dir = '%s/%s'%(QC_path,compact)
            if not os.path.isdir(compact_dir):
                continue
            sys.stderr.write('Begin stat compact: %s\n'%compact)
            reads_stat(compact_dir)
        sys.stderr.write('All compact stat finished!\n\n')

        reads_stat_all(QC_path,split_path)

        # merge the per-sample high-quality reads into one fasta per (compact, data_type),
        # keeping the number of live threads below the requested concurrency
        merge_threads = set()
        for compact,subitem in data_type_hash.iteritems():
            compact_dir = '%s/%s'%(QC_path,compact)
            for data_type,sample_list in subitem.iteritems():
                merged_file = '%s/%s/%s.together.fna'%(QC_path,compact,data_type)
                t = Thread(target=sampleMerge,args=(sample_list,data_type,compact_dir,merged_file))
                merge_threads.add(t)
                t.start()
                while True:
                    if threading.activeCount() < concurrency:
                        break
        for t in threading.enumerate():
            if t in merge_threads:
                t.join()

        sys.stderr.write('\n All pipeline is done ! \n')

    if __name__ == '__main__':
        sys.argv.pop(0)
        if len(sys.argv) < 1:
            sys.stderr.write('Usage: python run_pipeline.py work_path [process_num]\n process_num default is cpu_count\n')
            sys.exit()
        work_path = sys.argv.pop(0)
        work_path = os.path.abspath(work_path)
        sys.stderr.write('Workdir is %s, pipeline begin\n'%work_path)
        sam_barcode_file_list = os.popen('ls %s/Split/sam_barcode.*'%work_path).read().strip().split('\n')
        if len(sys.argv) != 0:
            concurrency = int(sys.argv.pop(0))
        else:
            concurrency = cpu_count()

        main(work_path,sam_barcode_file_list)
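    Going by the usage message in the __main__ block, the script (call it run_pipeline.py, the name used in its own usage string) is invoked as "python run_pipeline.py work_path [process_num]". The work directory must contain a Split/ subdirectory holding the demultiplexed data and the sam_barcode.* files; the optional second argument sets the number of worker processes and defaults to cpu_count().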

    Below are some of the helper routines (the programs module imported by the main script):

    from __future__ import division
    from threading import Thread,Lock
    from multiprocessing import cpu_count
    import threading
    import sys
    import os
    import re
    import types
    from Bio import SeqIO
    from Bio.SeqRecord import SeqRecord

    def fq_reads_num(fq_file):
        # a fastq record spans 4 lines, so read count = line count / 4
        wc_out = os.popen('wc -l %s'%fq_file).read().strip()
        result = int(re.search('^(\d+)',wc_out).group(1)) / 4
        return int(result)

    def Q_ave(self):
        # average phred quality of a record; monkey-patched onto SeqRecord in QC()
        Q_sum = 0
        for qlist in self.letter_annotations.itervalues():
            for q in qlist:
                Q_sum += q
            Q_ave = Q_sum / len(self)
            return Q_ave

    def QC(file,out_file,out_stat_file,data_type):
        # keep reads with average quality >= 20 and length between 220 and 500 bp
        SeqRecord.Q_ave = Q_ave
        out_stat = open(out_stat_file,'w')
        out = open(out_file,'w')

        count = 0
        high_count = 0
        for record in SeqIO.parse(open(file),'fastq'):
            count += 1
            if record.Q_ave() < 20:
                continue
            if len(record) < 220 or len(record) > 500:
                continue
            out.write(record.format('fastq'))
            high_count += 1
        high_ratio = high_count / count
        out_stat.write('%s\t%s\t%s\t%s\n'%(data_type,count,high_count,high_ratio))

    class MyList(list):
        def __str__(self):
            out_str = ''
            for item in self:
                out_str += item
                out_str += '\t'
            return out_str.strip()

    def parse_stat(stat_file):
        tabs = os.popen('cat %s'%stat_file).read().strip().split('\t')
        yield tabs

    def parse_stat_files(compact_path):
        for f in os.popen('ls %s/*/*.stat'%compact_path):
            stat_file = f.strip()
            sample_name = re.search('%s/(\S+)/high_quality.stat'%compact_path,stat_file).group(1)
            yield stat_file,sample_name

    def reads_stat(compact_path):
        out = open('%s/reads_stat.xls'%compact_path,'w')
        sample_reads = {}
        for stat_file,sample_name in parse_stat_files(compact_path):
            for tabs in parse_stat(stat_file):
                sample_reads[sample_name] = tabs

        out.write('sample_name\tsample_type\traw_reads\tHQ_reads\tHQ_ratio\n')
        for sample,tabs in sample_reads.iteritems():
            tabs = MyList(tabs)
            out.write('%s\t%s\n'%(sample,str(tabs)))
        out.close()

    def raw_stat_thread(fq_file,lock,compact,sample_name,tabs,out):
        global total_reads
    #    sys.stderr.write('thread %s stat with %s %s\n'%(threading.currentThread().ident,compact,sample_name))
        raw_reads = fq_reads_num(fq_file)
        # the lock serialises both the shared counter update and the file write
        lock.acquire()
        total_reads += raw_reads
        data_type = tabs.pop(0)
        ratio = int(tabs[1]) / raw_reads * 100
        tabs = str(MyList(tabs))
        out.write('%s\t%s\t%s\t%s\t%s\t%2.2f%%\n'%(compact,sample_name,data_type,raw_reads,tabs,ratio))
        lock.release()
    #    sys.stderr.write('thread %s finished doing with %s %s\n'%(threading.currentThread().ident,compact,sample_name))

    total_reads = 0
    def reads_stat_all(work_path,original_path):
        global total_reads
        sys.stderr.write('\nmerge stat is begin ... \n')
        out = open('%s/reads_stat.xls'%work_path,'w')
        compact_hash = {}
        for f in os.listdir(work_path):
            compact = f.strip()
            compact_path = '%s/%s'%(work_path,compact)
            if not os.path.isdir(compact_path):
                continue
            if not compact_hash.has_key(compact):
                compact_hash[compact] = {}
            for stat_file,sample_name in parse_stat_files(compact_path):
                for tabs in parse_stat(stat_file):
                    compact_hash[compact][sample_name] = tabs
        out.write('compact\tsample_name\tdata_type\traw_reads\tpandaseq_reads\tHQ_reads\tratio\n')
        lock = Lock()
        active_threads = set()
        for compact,sample in compact_hash.iteritems():
            sys.stderr.write('doing with %s stat\n'%compact)
            for sample_name,tabs in sample.iteritems():
                original_fq = os.popen('ls %s/%s/%s/*'%(original_path,compact,sample_name)).read().strip().split('\n').pop(0)
                t = Thread(target=raw_stat_thread,args=(original_fq,lock,compact,sample_name,tabs,out))
                active_threads.add(t)
                t.start()
                while True:
                    if threading.activeCount() < cpu_count():
                        break
        out.flush()
        for t in threading.enumerate():
            if t in active_threads:
                sys.stderr.write('thread %s is still alive, wait ...\n'%t.ident)
                t.join()
        sys.stderr.write('Unaligned stating ...\n')
        out.write('\n###\n')
        unalign_fq = os.popen('ls %s/Unalign/*'%original_path).read().strip().split('\n').pop(0)
        unalign_reads = fq_reads_num(unalign_fq)
        total_reads += unalign_reads
        ratio = unalign_reads / total_reads * 100
        out.write('Unalign\t%s\t%2.2f%%\n'%(unalign_reads,ratio))
        out.close()
        sys.stderr.write('merge stat is all finished!\n\n')

    def pandaseq(pandaseq_soft,read1,read2,fa_out,f_primer,r_primer,log_out):
        cmd = '%s -F -f %s -r %s -w %s -p %s -q %s -g %s -l 220 -L 500'%(pandaseq_soft,read1,read2,fa_out,f_primer,r_primer,log_out)
        os.system(cmd)

    def sampleMerge(sample_list,data_type,file_path,outfile):
        outhandle = open(outfile,'w')
    #    sys.stderr.write('Begin merge into %s\n'%file_path)
        reads_num = {}
        f_template = '%s/%s/high_quality.fq'
        for sample in sample_list:
            f = f_template%(file_path,sample)
            sample = re.sub('[-_]','.',sample)
            sample = '%s%s'%(data_type,sample)
            if not reads_num.has_key(sample):
                reads_num[sample] = 0
            # write each read as fasta with a "sampleID_readNumber" header
            for record in SeqIO.parse(open(f),'fastq'):
                reads_num[sample] += 1
                outhandle.write('>%s_%s\n%s\n'%(sample,reads_num[sample],str(record.seq)))
        outhandle.close()
        sys.stderr.write('merge file: %s is finished\n'%outfile)

    def get_lib_method(file):
        # the sam_barcode file name encodes the library construction method
        file = os.path.basename(file)
        if re.match('^sam_barcode.l$',file):
            lib_method = 'Self'
        elif re.match('^sam_barcode.s\d+$',file):
            lib_method = 'HXT'
        else:
            lib_method = None
        return lib_method

    settings.py contains the primer sequences for the different library construction methods.
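    The actual settings.py is not shown. Judging from how the main script uses it (it imports pandaseq_soft and indexes primer[lib_method][data_type]['forward'] / ['reverse']), its shape is presumably something like the following sketch; the path and the primer sequences here are placeholders, not the real values:

    # settings.py -- hypothetical sketch; the real path and primer sequences differ
    pandaseq_soft = '/opt/bio/pandaseq/pandaseq'      # placeholder path to the pandaseq binary

    # primer[lib_method][data_type] -> {'forward': ..., 'reverse': ...}
    primer = {
        'Self': {                                     # self-built libraries (sam_barcode.l)
            '16S': {'forward': 'NNNNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNNNNN'},
            'ITS': {'forward': 'NNNNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNNNNN'},
        },
        'HXT': {                                      # HXT libraries (sam_barcode.sN)
            '16S': {'forward': 'NNNNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNNNNN'},
            'ITS': {'forward': 'NNNNNNNNNNNNNNNNN', 'reverse': 'NNNNNNNNNNNNNNNNNN'},
        },
    }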

    This program is also something of a showcase of what I have learned over the past few days.

  • Original post: https://www.cnblogs.com/lyon2014/p/4599782.html