需求:
我们通过intrinio.com获取了多支股票csv数据文件,并将其转换为xml文件
由于全局解释器锁(GIL)的存在,多线程进行CPU密集型操作并不能提高执行效率,因此我们修改程序架构:
1、使用多个DownloadThread线程进行下载(I/O操作)
2、使用一个ConvertThread线程进行转换(CPU密集型操作)
3、下载线程把下载数据安全地传递给转换线程
思路:
使用标准库中的Queue.Queue(Python 3中为queue.Queue),它是一个线程安全的队列,内部实现了锁,帮助我们完成同步工作。Download线程把下载的数据放入队列,Convert线程从队列中提取数据。
代码:
import csv
from Queue import Queue
from StringIO import StringIO
from threading import Thread
from xml.etree.ElementTree import Element, ElementTree

import requests

from xml_pretty import pretty
class DownloadThread(Thread):
    """Producer thread: download one stock's CSV data and enqueue it.

    The result is put on the shared queue as a ``(sid, data)`` tuple, where
    ``data`` is a ``StringIO`` wrapping the response body, or ``None`` when
    the download failed.
    """

    def __init__(self, sid, queue):
        """Store the stock id, build its download URL and keep the queue.

        Args:
            sid: Stock id; it is converted to a string and left-padded with
                zeros to six characters when building the URL.
            queue: Queue shared with the converting thread.
        """
        Thread.__init__(self)
        self.sid = sid
        self.url = 'http://table.finance.yahoo.com/table.csv?s=%s.sz'
        self.url %= str(sid).rjust(6, '0')
        self.queue = queue

    def download(self, url):
        """Fetch ``url`` and return its body wrapped in ``StringIO``.

        Returns:
            A ``StringIO`` over the response content, or ``None`` when the
            request raised (timeout, connection error, ...) or the response
            status was not OK.
        """
        try:
            response = requests.get(url, timeout=3)
        except requests.RequestException:
            # A network failure must not kill the thread before it has
            # reported a result; treat it like a non-OK response.
            return None
        if response.ok:
            return StringIO(response.content)
        return None

    def run(self):
        """Download the data and hand ``(sid, data)`` to the consumer."""
        print('download', self.sid)
        data = self.download(self.url)
        # The queue does its own locking, so no explicit lock is needed here.
        self.queue.put((self.sid, data))
class ConvertThread(Thread):
    """Consumer thread: take downloaded CSV data off the queue and write XML.

    Items on the queue are ``(sid, data)`` tuples. An item whose ``sid`` is
    ``-1`` is the shutdown sentinel and ends the thread.
    """

    def __init__(self, queue):
        """Keep a reference to the queue shared with the download threads.

        Args:
            queue: Queue from which ``(sid, data)`` tuples are read.
        """
        Thread.__init__(self)
        self.queue = queue

    def csvToxml(self, scsv, fxml):
        """Convert CSV read from ``scsv`` into XML written to ``fxml``.

        The first CSV row supplies the element names (spaces removed so that
        they are usable as tag names); every following row becomes a ``Row``
        element under the ``Data`` root.

        Args:
            scsv: Iterable of CSV lines (e.g. a file-like object).
            fxml: File name or writable file object passed to
                ``ElementTree.write``.
        """
        reader = csv.reader(scsv)
        # Built as a list because it is re-used for every data row.
        headers = [h.replace(' ', '') for h in next(reader)]
        root = Element("Data")
        for row in reader:
            eRow = Element("Row")
            root.append(eRow)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                eRow.append(e)
        # NOTE(review): pretty comes from the project-local xml_pretty module;
        # presumably it indents the tree in place - confirm.
        pretty(root)
        et = ElementTree(root)
        et.write(fxml)

    def run(self):
        """Convert queued downloads until the ``-1`` sentinel arrives."""
        while True:
            sid, data = self.queue.get()
            if sid == -1:
                # Sentinel from the main thread: no more work will arrive.
                break
            print('Convert', sid)
            if data:
                fname = str(sid).rjust(6, '0') + '.xml'
                with open(fname, 'wb') as wf:
                    self.csvToxml(data, wf)
def handle(sid):
    """Download the CSV for stock ``sid`` and convert it to an XML file.

    The output file is named after the zero-padded six-character stock id
    with an ``.xml`` suffix. Nothing is written when the download yields
    ``None``.

    NOTE(review): this calls module-level ``download`` and ``csvToxml``,
    which are not defined at module level in the visible source - confirm
    whether this helper is still in use.
    """
    print('Download...(%d)' % sid)
    padded_sid = str(sid).rjust(6, '0')
    source = download(
        'http://table.finance.yahoo.com/table.csv?s=%s.sz' % padded_sid
    )
    if source is None:
        return

    print('convert to xml...(%d)' % sid)
    with open(padded_sid + '.xml', 'wb') as out:
        csvToxml(source, out)
# Wire up the pipeline: ten download threads feed one converter thread
# through a shared queue.
q = Queue()
dThreads = [DownloadThread(i, q) for i in range(1, 11)]
cThread = ConvertThread(q)
for t in dThreads:
    t.start()
cThread.start()
# Wait until every download has been queued before signalling shutdown.
for t in dThreads:
    t.join()
# (-1, None) is the sentinel that makes the converter leave its loop.
q.put((-1, None))
# Let the converter finish the remaining items before the script ends.
cThread.join()
==============================================================================================
import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement
from threading import Thread
# NOTE(review): the credential is hard-coded in source; consider loading it
# from the environment or from a config file kept out of version control.
apikey = 'OjZlY2MzYTQwNGVlMTI3Y2VkYjMyYTZiNzJiYzdlOTFk'


class DownloadThread(Thread):
    """Producer thread: download one page of price data as CSV.

    On success ``(page_number, csv_file)`` is put on the shared queue, where
    ``csv_file`` is a ``StringIO`` holding the response text. Failed attempts
    are retried.
    """

    # Seconds to wait for the server before an attempt counts as failed.
    REQUEST_TIMEOUT = 10
    # Pause between attempts so a failing server is not hit in a tight loop.
    RETRY_DELAY = 0.5

    def __init__(self, page_number, queue, max_retries=None):
        """Store the page to fetch and the queue that receives the result.

        Args:
            page_number: Page number sent to the API.
            queue: Queue shared with the converting thread.
            max_retries: Number of retries after the first failed attempt;
                ``None`` (the default) retries until a download succeeds.
        """
        super().__init__()
        self.page_number = page_number
        self.queue = queue
        self.max_retries = max_retries

    def run(self):
        """Download the page, retrying on failure, and enqueue the result.

        When ``max_retries`` is exhausted nothing is enqueued for this page.
        """
        # Local import: the file only imports time inside its __main__ guard.
        import time

        failures = 0
        while True:
            csv_file = self.download_csv(self.page_number)
            if csv_file is not None:
                self.queue.put((self.page_number, csv_file))
                return
            failures += 1
            if self.max_retries is not None and failures > self.max_retries:
                print('give up downloading csv data [page=%s]' % self.page_number)
                return
            time.sleep(self.RETRY_DELAY)

    def download_csv(self, page_number):
        """Request one page of CSV data.

        Args:
            page_number: Page number sent to the API.

        Returns:
            A ``StringIO`` over the response text, or ``None`` when the
            request raised or the response status was not OK.
        """
        print('download csv data [page=%s]' % page_number)
        # The key is sent as a query parameter, built from the module-level
        # constant instead of being repeated inside the URL.
        url = (
            "https://api.intrinio.com/prices.csv?api_key=%s&identifier=AAPL"
            "&page_size=20&page_number=%s"
            "&start_date=2017-09-28&end_date=2020-09-28"
        ) % (apikey, page_number)
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            # Timeouts and connection errors count as a failed attempt so
            # that run() can retry instead of the thread dying.
            return None
        if response.ok:
            return StringIO(response.text)
        return None
class ConvertThread(Thread):
    """Consumer thread: turn queued CSV downloads into XML files.

    Items on the queue are ``(page_number, csv_file)`` tuples. An item whose
    ``page_number`` is ``-1`` is the shutdown sentinel and ends the thread.
    """

    def __init__(self, queue):
        """Keep a reference to the queue shared with the download threads.

        Args:
            queue: Queue from which ``(page_number, csv_file)`` is read.
        """
        super().__init__()
        self.queue = queue

    def run(self):
        """Convert queued pages until the ``-1`` sentinel arrives.

        Each page is written to ``data`` + page number + ``.xml``.
        """
        while True:
            page_number, csv_file = self.queue.get()
            if page_number == -1:
                break
            if csv_file is None:
                # Nothing was downloaded for this page; skip it rather than
                # let csv.reader fail on None.
                continue
            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

    def csv_to_xml(self, csv_file, xml_path):
        """Write the CSV content of ``csv_file`` to ``xml_path`` as XML.

        The first CSV row supplies the element names; every following row
        becomes a ``Row`` element under the ``Data`` root, with one child per
        column. Line breaks are stored as text/tail so that each element
        starts on its own line in the output.

        Args:
            csv_file: Iterable of CSV lines (e.g. a ``StringIO``).
            xml_path: Destination passed to ``ElementTree.write``.
        """
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        # An empty input has no header row; fall back to no columns.
        headers = next(reader, [])
        root = Element('Data')
        root.text = root.tail = '\n'
        for row in reader:
            row_element = SubElement(root, 'Row')
            row_element.text = row_element.tail = '\n'
            for tag, text in zip(headers, row):
                cell = SubElement(row_element, tag)
                cell.text = text
                cell.tail = '\n'
        ElementTree(root).write(xml_path, encoding='utf8')
# def download_and_save(page_number, xml_path):
# # IO
# csv_file = None
# while not csv_file:
# csv_file = download_csv(page_number)
# # CPU
# csv_to_xml(csv_file, 'data%s.xml' % page_number)
# class MyThread(Thread):
# def __init__(self, page_number, xml_path):
# super().__init__()
# self.page_number = page_number
# self.xml_path = xml_path
#
# def run(self):
# download_and_save(self.page_number, self.xml_path)
from queue import Queue
if __name__ == '__main__':
    import time

    # Five download threads feed a single converter thread through a queue.
    queue = Queue()
    t0 = time.time()
    thread_list = []
    for i in range(1, 6):
        t = DownloadThread(i, queue)
        t.start()
        thread_list.append(t)
    convert_thread = ConvertThread(queue)
    convert_thread.start()
    for t in thread_list:
        t.join()
    # All producers are done: send the sentinel so the converter leaves its
    # otherwise endless loop, then wait for it to drain the queue.
    queue.put((-1, None))
    convert_thread.join()
    # Measured after the converter has finished, so the elapsed time covers
    # the whole download-and-convert pipeline.
    print(time.time() - t0)
    print('main thread end.')