zoukankan      html  css  js  c++  java
  • 如何在线程间通信?

    需求:
    我们通过intrinio.com获取了多支股票csv数据文件,并将其转换为xml文件
    由于全局解释器锁(GIL)的存在,多线程进行 CPU 密集型操作并不能提高执行效率,因此我们修改程序架构:
    1、使用多个DownloadThread线程进行下载(I/O操作)
    2、使用一个ConvertThread线程进行转换(CPU密集型操作)
    3、下载线程把下载数据安全地传递给转换线程

    思路:
    使用标准库中的 Queue.Queue(Python 3 中为 queue.Queue),它是一个线程安全的队列:Download 线程把下载的数据放入队列,Convert 线程从队列中取出数据。队列内部已经实现了锁,替我们完成了同步工作。

    代码:

    import csv
    from Queue import Queue
    from StringIO import StringIO
    from threading import Thread
    from xml.etree.ElementTree import Element, ElementTree

    import requests

    from xml_pretty import pretty
    
    class DownloadThread(Thread):
        """Producer thread: fetch one stock's CSV and hand it to the converter.

        The result is enqueued as ``(sid, data)``. ``data`` is a ``StringIO``
        over the response body, or ``None`` when the response status is not
        OK. Exceptions raised by ``requests`` (e.g. a timeout) propagate and
        end the thread without enqueuing anything.
        """

        def __init__(self, sid, queue):
            Thread.__init__(self)
            self.sid = sid
            self.queue = queue
            # Stock codes are zero-padded to six digits, e.g. 1 -> "000001".
            padded_code = str(sid).rjust(6, '0')
            self.url = 'http://table.finance.yahoo.com/table.csv?s=%s.sz' % padded_code

        def download(self, url):
            """Return the body at *url* wrapped in a StringIO, or None if not OK."""
            response = requests.get(url, timeout=3)
            if not response.ok:
                return None
            return StringIO(response.content)

        def run(self):
            print('download', self.sid)
            payload = self.download(self.url)
            # Queue.put does its own locking, so many producers may call it.
            self.queue.put((self.sid, payload))
    
            
    class ConvertThread(Thread):
        """Consumer thread: turn downloaded CSV buffers into XML files.

        Pulls ``(sid, data)`` tuples off *queue* until it receives the
        sentinel ``(-1, None)``. Entries whose ``data`` is falsy (a failed
        download) are skipped; every other entry is written to
        ``<sid zero-padded to 6 digits>.xml`` in the working directory.
        """

        def __init__(self, queue):
            Thread.__init__(self)
            self.queue = queue

        def csvToxml(self, scsv, fxml):
            """Convert CSV rows read from *scsv* into XML written to *fxml*.

            The first CSV row supplies the element name for each column;
            spaces are removed because XML tag names may not contain them
            (e.g. "Adj Close" -> "AdjClose"). Each following row becomes a
            ``<Row>`` element under a ``<Data>`` root.
            """
            reader = csv.reader(scsv)
            # An empty input has no header row; fall back to no columns.
            header = next(reader, [])
            # Build a real list (not a lazy ``map``) so the same tags can be
            # reused for every row on Python 3 as well as Python 2.
            headers = [h.replace(' ', '') for h in header]

            root = Element("Data")
            for row in reader:
                eRow = Element("Row")
                root.append(eRow)
                for tag, text in zip(headers, row):
                    e = Element(tag)
                    e.text = text
                    eRow.append(e)

            # ``pretty`` comes from the local xml_pretty module; presumably it
            # adds indentation whitespace in place -- confirm.
            pretty(root)
            et = ElementTree(root)
            et.write(fxml)

        def run(self):
            while True:
                sid, data = self.queue.get()
                # (-1, None) is the shutdown sentinel pushed by the main thread.
                if sid == -1:
                    break
                print('Convert', sid)
                if not data:
                    # download() returned None for a non-OK response; skip it.
                    continue
                fname = str(sid).rjust(6, '0') + '.xml'
                with open(fname, 'wb') as wf:
                    self.csvToxml(data, wf)
    
    def handle(sid):
        """Download one stock's CSV and convert it to XML on the calling thread.

        Sequential counterpart of the DownloadThread/ConvertThread pipeline;
        nothing in the visible script calls it.

        NOTE(review): ``download`` and ``csvToxml`` are not defined at module
        level in this script (they exist only as methods of DownloadThread and
        ConvertThread), so calling this as written raises NameError unless
        they are provided elsewhere -- TODO confirm, or remove this function.
        """
        print('Download...(%d)' % sid)
        # Stock codes are zero-padded to six digits, e.g. 1 -> "000001".
        url = 'http://table.finance.yahoo.com/table.csv?s=%s.sz'
        url %= str(sid).rjust(6,'0')
        rf = download(url)
        # presumably download() returns None on a failed request (as the
        # DownloadThread.download method does) -- nothing to convert then.
        if rf is None:return

        print('convert to xml...(%d)' % sid)
        fname = str(sid).rjust(6,'0') +'.xml'
        with open(fname,'wb') as wf:
            csvToxml(rf,wf)
    
    # Driver: ten I/O-bound download threads feed one CPU-bound converter.
    q = Queue()
    dThreads = [DownloadThread(i, q) for i in xrange(1, 11)]
    cThread = ConvertThread(q)
    for t in dThreads:
        t.start()
    cThread.start()

    for t in dThreads:
        t.join()

    # Every producer has finished, so the sentinel is queued after all real
    # data; it makes ConvertThread.run leave its loop.
    q.put((-1, None))
    cThread.join()
    
    ==============================================================================================
    import requests
    import base64
    from io import StringIO
    import csv
    from xml.etree.ElementTree import ElementTree, Element, SubElement
    from threading import Thread
    
    apikey = 'OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk'
    
    class DownloadThread(Thread):
        """Producer thread: download one page of Intrinio price data as CSV.

        On success the page is enqueued as ``(page_number, csv_file)``, where
        ``csv_file`` is a ``StringIO`` over the response text. A download is
        attempted up to *max_retries* times, sleeping *retry_delay* seconds
        between attempts. If every attempt fails, nothing is enqueued, so the
        thread always terminates and the main thread's ``join()`` returns.
        *timeout* (seconds) bounds each HTTP request.
        """

        def __init__(self, page_number, queue, max_retries=5, retry_delay=1.0,
                     timeout=10):
            super().__init__()
            self.page_number = page_number
            self.queue = queue
            self.max_retries = max_retries
            self.retry_delay = retry_delay
            self.timeout = timeout

        def run(self):
            import time

            for attempt in range(1, self.max_retries + 1):
                csv_file = self.download_csv(self.page_number)
                if csv_file:
                    self.queue.put((self.page_number, csv_file))
                    return
                if attempt < self.max_retries:
                    # Back off instead of hammering the API in a tight loop.
                    time.sleep(self.retry_delay)
            print('giving up on page %s after %d attempts'
                  % (self.page_number, self.max_retries))

        def download_csv(self, page_number):
            """Return page *page_number* as a StringIO, or None on any failure."""
            print('download csv data [page=%s]' % page_number)
            url = ("https://api.intrinio.com/prices.csv?api_key=%s"
                   "&identifier=AAPL&page_size=20&page_number=%s"
                   "&start_date=2017-09-28&end_date=2020-09-28"
                   % (apikey, page_number))
            try:
                response = requests.get(url, timeout=self.timeout)
            except requests.RequestException as err:
                # Network errors/timeouts count as a failed attempt so run()
                # can retry them, instead of killing the thread.
                print('download failed [page=%s]: %s' % (page_number, err))
                return None
            if response.ok:
                return StringIO(response.text)
            return None
    
    class ConvertThread(Thread):
        """Consumer thread: write each downloaded CSV page out as XML.

        Reads ``(page_number, csv_file)`` tuples from *queue* until the
        sentinel ``(-1, None)`` arrives, converting each page to
        ``data<page_number>.xml``. Entries whose ``csv_file`` is ``None`` are
        skipped.
        """

        def __init__(self, queue):
            super().__init__()
            self.queue = queue

        def run(self):
            while True:
                page_number, csv_file = self.queue.get()
                # (-1, None) is the shutdown sentinel pushed by the main thread.
                if page_number == -1:
                    break
                if csv_file is None:
                    # Nothing was downloaded for this page.
                    continue
                self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

        def csv_to_xml(self, csv_file, xml_path):
            """Convert CSV text from *csv_file* into an indented XML file.

            The first row supplies the column names (surrounding whitespace and
            inner spaces removed so they are usable as XML tag names); every
            following non-blank row becomes a ``<Row>`` under a ``<Data>``
            root. Indentation is produced by hand through ``text``/``tail``
            whitespace, one tab per nesting level. The file is written as
            UTF-8 with an XML declaration.
            """
            print('Convert csv data to %s' % xml_path)
            reader = csv.reader(csv_file)
            headers = [h.strip().replace(' ', '') for h in next(reader, [])]

            root = Element('Data')
            root.text = '\n\t'
            root.tail = '\n'

            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines; skip empty <Row>s.
                    continue
                row_elem = SubElement(root, 'Row')
                row_elem.text = '\n\t\t'
                row_elem.tail = '\n\t'
                for tag, text in zip(headers, row):
                    cell = SubElement(row_elem, tag)
                    cell.text = text
                    cell.tail = '\n\t\t'
                if len(row_elem):
                    # Dedent the closing </Row> back to the row's own level.
                    row_elem[-1].tail = '\n\t'

            if len(root):
                # Dedent the closing </Data> back to column 0.
                root[-1].tail = '\n'

            ElementTree(root).write(xml_path, encoding='utf-8',
                                    xml_declaration=True)
    
    # def download_and_save(page_number, xml_path):
        # # IO
        # csv_file = None
        # while not csv_file:
            # csv_file = download_csv(page_number)
        # # CPU
        # csv_to_xml(csv_file, 'data%s.xml' % page_number)
    
    # class MyThread(Thread):
        # def __init__(self, page_number, xml_path):
            # super().__init__()
            # self.page_number = page_number
            # self.xml_path = xml_path
    # 
        # def run(self):
            # download_and_save(self.page_number, self.xml_path)
    
    from queue import Queue
    
    if __name__ == '__main__':
        import time

        queue = Queue()
        t0 = time.time()

        # I/O-bound work: one download thread per page.
        thread_list = []
        for i in range(1, 6):
            t = DownloadThread(i, queue)
            t.start()
            thread_list.append(t)

        # CPU-bound work: a single converter drains the queue.
        convert_thread = ConvertThread(queue)
        convert_thread.start()

        for t in thread_list:
            t.join()
        # All producers are done, so the sentinel is queued after every real
        # item; it makes the converter leave its endless loop.
        queue.put((-1, None))
        convert_thread.join()

        # Elapsed time now covers both downloading and converting.
        print(time.time() - t0)
        print('main thread end.')
    
  • 相关阅读:
    FastDFS 安装与使用
    leecode刷题(18)-- 报数
    时间戳转换日期格式
    嵌入式Linux的FTP服务端软件(stupid-ftpd)
    iMx280A测试声纹
    Linux 版本查询
    Linux下的目录结构
    uboot主Makefile分析
    uboot配置过程详解1
    路由器设置
  • 原文地址:https://www.cnblogs.com/Richardo-M-Q/p/13885101.html
Copyright © 2011-2022 走看看