# -*- coding:utf-8 -*-
from multiprocessing import JoinableQueue, Process, Value
import time
# Minute at which the (currently commented-out) fault-injection check inside
# download_process would raise.
DOWNLOAD_EXCEPTION_MINUTE = 3
# Minute at which the (currently commented-out) fault-injection check inside
# deal_process would raise.
DEAL_EXCEPTION_MINUTE = 2
class SharedObj(object):
    """Plain object that carries a single identifier in its ``id`` attribute."""

    def __init__(self, id):
        # NOTE: the parameter shadows the builtin ``id``; the name is kept
        # because callers pass it by keyword (``SharedObj(id=1)``).
        self.id = id
from collections import namedtuple
# Immutable record type named "shared_obj" with a single field, ``id``.
SHARED_OBJ = namedtuple("shared_obj", "id")
def download_process(produce_queue, minute_list, sharded_time_cost, shared_is_exception):
    """Producer: simulate downloading each minute and queue it for the consumer.

    Minutes are handled in the order given. Before each one the shared
    exception flag is checked and the loop stops early if it is set. After
    the loop (finished or stopped early) the elapsed whole seconds are stored
    in ``sharded_time_cost``. Any exception raised along the way is printed
    and sets ``shared_is_exception`` to 1; the elapsed time is not stored in
    that case.

    In every case a ``None`` sentinel is queued afterwards and the function
    then waits on ``produce_queue.join()``.

    :param produce_queue: queue object providing ``put`` and ``join``.
    :param minute_list: iterable of minute identifiers to download.
    :param sharded_time_cost: shared value (``.value`` / ``.get_lock()``)
        that receives the elapsed download time in seconds.
    :param shared_is_exception: shared flag (``.value`` / ``.get_lock()``);
        non-zero means some worker has failed.
    """
    try:
        started_at = time.time()
        for current in minute_list:
            if shared_is_exception.value:
                # Another worker reported a failure: stop producing.
                print("Some other error happened when begin download minute:%s." % current)
                break

            print("Begin download process minute:%s." % current)
            # Fault injection, disabled:
            # if current == DOWNLOAD_EXCEPTION_MINUTE:
            #     raise Exception("Exception in minute_name:%s when download process." % current)
            time.sleep(10)  # stands in for the real download work
            print("Success download progress minute:%s." % current)

            produce_queue.put(current)

        elapsed_seconds = int(time.time() - started_at)
        with sharded_time_cost.get_lock():
            sharded_time_cost.value = elapsed_seconds
    except Exception as err:
        print(err)
        flag_already_set = shared_is_exception.value
        if flag_already_set:
            print("Some other error happened.")
        with shared_is_exception.get_lock():
            shared_is_exception.value = 1

    # Tell the consumer there is nothing more to come, then wait on the queue.
    produce_queue.put(None)
    produce_queue.join()
def deal_process(produce_queue, shared_deal_time_cost, shared_done_minute_name, shared_done_count, shared_is_exception,
                 s):
    """Consumer: take minutes off the queue and "deal" with each one.

    Loops until a ``None`` sentinel is received. For every item (sentinel
    included) a snapshot of the shared state is printed first. Each real
    minute is processed unless ``shared_is_exception`` is set, in which case
    it is skipped. Every item taken from the queue is acknowledged with
    ``task_done()`` exactly once, whether it was processed, skipped or failed.

    Only minutes that were actually processed are recorded in the statistics
    (time cost, last done minute, done count). Skipped minutes are
    acknowledged on the queue but not counted as done.

    If processing raises, the exception is printed and
    ``shared_is_exception`` is set to 1.

    :param produce_queue: queue object providing ``get`` and ``task_done``.
    :param shared_deal_time_cost: shared value accumulating whole seconds
        spent processing.
    :param shared_done_minute_name: shared value holding the last minute
        that was processed successfully.
    :param shared_done_count: shared value counting processed minutes.
    :param shared_is_exception: shared flag; non-zero means some worker
        has failed.
    :param s: arbitrary object printed while each minute is processed.
    """

    def process(current_minute):
        # The minute is passed in explicitly rather than read from the
        # enclosing loop variable.
        print("Begin deal minute:%s." % current_minute)
        # Fault injection, disabled:
        # if current_minute == DEAL_EXCEPTION_MINUTE:
        #     raise Exception("Exception happened in minute:%s, when deal." % current_minute)
        print(s)
        time.sleep(5)  # stands in for the real work
        print("Done for deal minute:%s." % current_minute)

    while True:
        minute_name = produce_queue.get()

        # Diagnostic snapshot of the shared state.
        print("-----deal process----")
        print("shared_deal_time_cost:%s." % shared_deal_time_cost.value)
        print("shared_done_minute_name:%s." % shared_done_minute_name.value)
        print("shared_done_count:%s." % shared_done_count.value)
        print("shared_is_exception:%s." % shared_is_exception.value)
        print("---------------------")

        if minute_name is None:
            # Sentinel from the producer: acknowledge it and stop.
            print("Get empty minute name, done for deal.")
            produce_queue.task_done()
            break

        try:
            if shared_is_exception.value:
                # A failure was reported: skip the work and do not record the
                # minute as done. The ``finally`` below still acknowledges it.
                print("shared_is_exception is true, exception is happened; continue...")
                continue

            begin_time = time.time()
            process(minute_name)
            elapsed_seconds = int(time.time() - begin_time)

            with shared_deal_time_cost.get_lock():
                shared_deal_time_cost.value += elapsed_seconds
            with shared_done_minute_name.get_lock():
                shared_done_minute_name.value = minute_name
            with shared_done_count.get_lock():
                shared_done_count.value += 1
        except Exception as e:
            print(e)
            # Set unconditionally under the lock; writing 1 over 1 is harmless
            # and avoids an unlocked check-then-set.
            with shared_is_exception.get_lock():
                shared_is_exception.value = 1
        finally:
            # Always acknowledge the item so a join() on the queue can return.
            produce_queue.task_done()
class TestObj(object):
    """Owns the shared counters and runs the download/deal worker pair."""

    def __init__(self):
        # Every shared field is a multiprocessing Value of type "i"
        # (signed int) starting at 0.
        self.shared_done_minute = Value("i", 0)         # last minute dealt with
        self.shared_done_count = Value("i", 0)          # number of minutes dealt with
        self.shared_download_time_cost = Value("i", 0)  # seconds spent downloading
        self.shared_deal_time_cost = Value("i", 0)      # seconds spent dealing
        self.shared_is_exception = Value("i", 0)        # non-zero once a worker failed

    def main_process(self):
        """Run one producer and one consumer process over minutes 1..5.

        Creates a JoinableQueue, starts ``download_process`` and
        ``deal_process`` as daemon processes sharing it, waits for both to
        finish and prints the wall-clock time spent.
        """
        minutes = [1, 2, 3, 4, 5]
        channel = JoinableQueue()

        producer = Process(
            target=download_process,
            args=(channel, minutes, self.shared_download_time_cost, self.shared_is_exception),
        )

        payload = SharedObj(id=1)
        print(payload)
        consumer = Process(
            target=deal_process,
            args=(
                channel,
                self.shared_deal_time_cost,
                self.shared_done_minute,
                self.shared_done_count,
                self.shared_is_exception,
                payload,
            ),
        )

        started_at = time.time()
        workers = (producer, consumer)

        # The daemon flag has to be set before start().
        for worker in workers:
            worker.daemon = True
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        print("Main time cost:%s." % (time.time() - started_at))
def main():
    """Script entry point: build a TestObj and run its pipeline once."""
    TestObj().main_process()


# Only run the pipeline when executed as a script, not when imported.
if __name__ == "__main__":
    main()