zoukankan      html  css  js  c++  java
  • 如何在线程间进行事件通知?

    需求:
    在上节课,我们从Intrinio.com下载多支股票的csv数据,并将其转换为xml文件
    额外需求:
    实现一个线程TarThread,将转换出的xml文件打包。比如转换线程每生产出100个xml文件,就通知打包线程将它们打包成一个xxx.tgz文件
    并删除xml文件,打包完成后,打包线程反过来通知转换线程,转换线程继续转换

    思路:
    线程间的事件通知,可以使用标准库中的threading.Event
    1、等待事件一端调用wait,等待事件
    2、通知事件一端用set,通知事件

    代码:

    import requests
    import base64
    from io import StringIO
    import csv
    from xml.etree.ElementTree import ElementTree, Element, SubElement
    from threading import Thread
    import tarfile
    import os
    
    # Intrinio API key.
    # NOTE(review): nothing in the visible code reads this name; the request URL
    # in DownloadThread.download_csv embeds the same literal directly.
    apikey = 'OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk'
    
    class DownloadThread(Thread):
        """Worker thread that downloads one page of price data as CSV.

        On success the response text is wrapped in a ``StringIO`` and put on
        ``queue`` as a ``(page_number, csv_file)`` tuple.
        """

        def __init__(self, page_number, queue, timeout=30):
            """Store the page to fetch and the queue that receives the result.

            Args:
                page_number: Page of the price listing to request.
                queue: Queue receiving the ``(page_number, csv_file)`` tuple.
                timeout: Seconds to wait for the HTTP server before one
                    attempt is treated as failed (and retried by ``run``).
            """
            super().__init__()
            self.page_number = page_number
            self.queue = queue
            self.timeout = timeout

        def run(self):
            """Retry the download until it succeeds, then queue the result."""
            csv_file = None
            # download_csv returns None for a failed attempt, so keep trying
            # until a file-like object comes back.
            while csv_file is None:
                csv_file = self.download_csv(self.page_number)
            self.queue.put((self.page_number, csv_file))

        def download_csv(self, page_number):
            """Fetch one page of CSV data.

            Args:
                page_number: Page of the price listing to request.

            Returns:
                A ``StringIO`` holding the response text, or ``None`` when the
                request failed or the server answered with an error status.
            """
            print('download csv data [page=%s]' % page_number)
            url = "https://api.intrinio.com/prices.csv?api_key=OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk&identifier=AAPL&page_size=20&page_number=%s&start_date=2017-09-28&end_date=2020-09-28" % page_number
            try:
                # Without a timeout a stalled connection would block this
                # worker (and the join() on it) forever.
                response = requests.get(url, timeout=self.timeout)
            except requests.RequestException as exc:
                # Report the failure as None so run() retries, instead of
                # letting the exception kill the thread before it queues
                # anything.
                print('download failed [page=%s]: %s' % (page_number, exc))
                return None

            if response.ok:
                return StringIO(response.text)
            return None
    
    class ConvertThread(Thread):
        """Worker thread that converts queued CSV pages into XML files.

        Items are read from ``queue`` as ``(page_number, csv_file)`` tuples and
        written to ``data<page_number>.xml``. After every ``batch_size``
        conversions the thread sets ``c_event`` and blocks on ``t_event``
        before continuing. A tuple whose page number is ``-1`` is the
        shutdown sentinel.
        """

        def __init__(self, queue, c_event, t_event, batch_size=3):
            """Store the work queue and the two hand-shake events.

            Args:
                queue: Queue yielding ``(page_number, csv_file)`` tuples.
                c_event: Event this thread sets when a batch has been converted.
                t_event: Event this thread waits on before converting more.
                batch_size: Number of converted files that make up one batch.
            """
            super().__init__()
            self.queue = queue
            self.c_event = c_event
            self.t_event = t_event
            self.batch_size = batch_size

        def run(self):
            """Convert queued pages until the ``-1`` sentinel is received."""
            count = 0
            while True:
                page_number, csv_file = self.queue.get()
                if page_number == -1:
                    # Sentinel: signal once more so a final, incomplete batch
                    # is handled too, and wait for that to finish.
                    self.c_event.set()
                    self.t_event.wait()
                    break
                count += 1
                self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

                if count >= self.batch_size:
                    count = 0
                    # Announce that a full batch has been converted ...
                    self.c_event.set()

                    # ... and pause until the other side reports completion.
                    self.t_event.wait()
                    # Reset so the next wait() blocks again.
                    self.t_event.clear()

        def csv_to_xml(self, csv_file, xml_path):
            """Write the rows of a CSV file as an indented XML document.

            The first CSV row supplies the element names; every following row
            becomes a ``<Row>`` element with one child per column.

            Args:
                csv_file: File-like object (or iterable of lines) with CSV text.
                xml_path: Destination path of the XML file.
            """
            print('Convert csv data to %s' % xml_path)
            reader = csv.reader(csv_file)
            # An empty input has no header row; fall back to no columns rather
            # than letting StopIteration escape.
            headers = next(reader, [])

            root = Element('Data')
            root.text = '\n\t'
            root.tail = '\n'

            for row in reader:
                book = SubElement(root, 'Row')
                book.text = '\n\t\t'
                book.tail = '\n\t'

                for tag, text in zip(headers, row):
                    e = SubElement(book, tag)
                    e.text = text
                    e.tail = '\n\t\t'
                # The last column closes the row, so it is indented one level
                # less. Blank CSV lines produce a row without children.
                if len(book):
                    book[-1].tail = '\n\t'

            ElementTree(root).write(xml_path, encoding='utf8')
    
    class TarThread(Thread):
        """Daemon thread that packs the XML files of the current directory.

        Each time ``c_event`` is set, every ``*.xml`` file in the working
        directory is added to a new ``data<N>.tgz`` archive and then deleted,
        after which ``t_event`` is set.
        """

        def __init__(self, c_event, t_event):
            """Store the two hand-shake events.

            Args:
                c_event: Event this thread waits on before packing.
                t_event: Event this thread sets once packing has finished.
            """
            # Daemon thread: it loops forever and is discarded when the
            # non-daemon threads have finished.
            super().__init__(daemon=True)
            self.count = 0
            self.c_event = c_event
            self.t_event = t_event

        def run(self):
            """Pack one archive per ``c_event`` notification, forever."""
            while True:
                # Wait until a batch is ready.
                self.c_event.wait()
                # Reset so the next wait() blocks until the next batch.
                self.c_event.clear()

                try:
                    self.tar_xml()
                except (OSError, tarfile.TarError) as exc:
                    # Keep the thread alive; the XML files are still on disk
                    # and will be picked up by the next archive.
                    print('tar failed: %s' % exc)
                finally:
                    # Always release the waiting side, otherwise a failed
                    # archive would leave it blocked forever.
                    self.t_event.set()

        def tar_xml(self):
            """Archive and delete all ``*.xml`` files in the working directory.

            The archive is named ``data<count>.tgz`` where ``count`` increases
            by one on every call. No archive is left behind when there is
            nothing to pack.
            """
            self.count += 1
            tfname = 'data%s.tgz' % self.count
            print('tar %s...' % tfname)

            xml_files = [fname for fname in os.listdir('.') if fname.endswith('.xml')]
            if not xml_files:
                return

            with tarfile.open(tfname, 'w:gz') as tf:
                for fname in xml_files:
                    tf.add(fname)

            # Delete the sources only after the archive has been closed, so a
            # failure while writing cannot lose data.
            for fname in xml_files:
                os.remove(fname)
    
    # def download_and_save(page_number, xml_path):
        # # IO
        # csv_file = None
        # while not csv_file:
            # csv_file = download_csv(page_number)
        # # CPU
        # csv_to_xml(csv_file, 'data%s.xml' % page_number)
    
    # class MyThread(Thread):
        # def __init__(self, page_number, xml_path):
            # super().__init__()
            # self.page_number = page_number
            # self.xml_path = xml_path
    # 
        # def run(self):
            # download_and_save(self.page_number, self.xml_path)
    
    from queue import Queue
    from threading import  Event
    
    if __name__ == '__main__':
        import time

        # Shared plumbing: the queue carries downloaded pages to the
        # converter; the two events form the converter <-> packer hand-shake.
        queue = Queue()
        c_event, t_event = Event(), Event()

        t0 = time.time()

        # One download thread per page, pages 1..14.
        thread_list = [DownloadThread(i, queue) for i in range(1, 15)]
        for t in thread_list:
            t.start()

        convert_thread = ConvertThread(queue, c_event, t_event)
        convert_thread.start()

        tar_thread = TarThread(c_event, t_event)
        tar_thread.start()

        # Block until every page has been downloaded and queued.
        for t in thread_list:
            t.join()

        print(time.time() - t0)
        print('main thread end.')

        # Sentinel telling the converter that no more pages will arrive.
        queue.put((-1, None))
    
    =========================================
    
    >>> from threading import Thread,Event
    >>> def f(event):
    ...     print('wait event...')
    ...     event.wait()
    ...     print('f end...')
    ... 
    >>> e = Event()
    >>> thread = Thread(target=f, args=(e,))
    >>> thread.start()
    wait event...
    >>> e.set()
    f end...
    >>> thread = Thread(target=f, args=(e,))
    >>> thread.start()
    wait event...
    f end...
    >>> e.clear()
    >>> thread = Thread(target=f, args=(e,))
    >>> thread.start()
    wait event...
    >>> 
    
  • 相关阅读:
    HTML编辑器(1)
    HTML轮播(2)
    HTML轮播(1)
    MVC联想查询绑定下拉框
    Flex布局
    css垂直水平居中问题
    rem,em,px的区别
    css中的定位、清除浮动
    css3的盒模型
    HTML的语义化
  • 原文地址:https://www.cnblogs.com/Richardo-M-Q/p/13948981.html
Copyright © 2011-2022 走看看