zoukankan      html  css  js  c++  java
  • python使用hbase

    #coding:utf-8
    __author__ = 'similarface'
    from multiprocessing import Process
    import happybase
    import os
    import re
    import hashlib
    import multiprocessing
    from multiprocessing import Queue
    basedir="/tmp/t8"
    filterpath="/Users/similarface/Documents/20170303Morgene999ProductFullSNP.txt"
    snpkey={}
    pattern_barcode= re.compile(r'[0-9]{3}[-][0-9]{4}[-][0-9]{4}')
    pattern_ls=re.compile(r's+')
    def func(filepath,snpkey):
        conn=happybase.Connection(host='192.168.30.250')
        table=conn.table('chipdata')
        barcodes=pattern_barcode.findall(filepath)
        barcode=barcodes[0]
        i=0
        all=0
        with open(filepath,'rb') as foper:
            for line in foper:
                try:
                    lines=pattern_ls.split(line.strip())
                    chr=lines[1]
                    pos=lines[2]
                    key=chr+":"+pos
                    #print key
                    if key in snpkey:
                        all=all+1
                        m = hashlib.md5()
                        m.update(pos.strip())
                        rowkey = m.hexdigest()+":"+chr.upper()
                        dictkey='d:'+barcode
                        columns=[dictkey]
                        rows_as_dict = dict(table.row(rowkey,columns))
                        if rows_as_dict[dictkey]==lines[3]:
                            i=i+1
                except Exception,e:
                    pass
        print barcode+":"+format((i+0.0)/all,'0.1%')+"match"+str(i)
            #q.put(barcode+":"+format((i+0.0)/all,'0.1%'))
        conn.close()
    
    def read(q):
        while True:
            value = q.get(True)
            print 'Get %s from queue.' % value
    
    
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes = 3)
        snpkey={}
        q = Queue()
        pattern_s=re.compile(r's+')
        with open(filterpath,'rb') as oper:
            for line in oper:
                if line.strip()!="":
                    lines=pattern_s.split(line.strip())
                    snpkey[':'.join(lines[0:2])]=""
    
        # pr = Process(target=read, args=(q,))
        # pr.start()
    
        for filename in os.listdir(basedir):
            if filename.endswith("snp"):
                filterpath=os.path.join(basedir,filename)
                pool.apply_async(func, args=(filterpath,snpkey))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    
        print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
        pool.close()
        pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print "Sub-process(es) done."
        #pr.terminate()
  • 相关阅读:
    模式的作用就是解耦,解耦,再解耦,让事情变简单、可控制。
    系统的同构性分析
    “以客观事物(object)的形式来组织程序”
    String的indexOf()的三种情况
    关于finally关键字
    openSession和getCurrentSession的区别?
    eclipse括号跳转
    final修饰的类能不能创建一个对象
    使用svn从恢复到某个版本的时候会报错
    关于TableModel的中获取表格数据的问题
  • 原文地址:https://www.cnblogs.com/similarface/p/7053282.html
Copyright © 2011-2022 走看看