import threading
import time
class BlockingTestThread(threading.Thread):
    """Demo worker thread that prints diagnostics until it is asked to stop.

    The thread repeatedly waits on the ``stop`` event for ``poll_interval``
    seconds. Each time that wait times out it prints ``i * 2`` and ``b + 10``
    together with information about the live threads, then blocks on ``stop``
    again for up to ``hold_time`` seconds. Calling :meth:`terminate` sets
    ``stop``, which wakes whichever wait is in progress and ends the loop.

    Args:
        i: Value whose double is printed on every iteration.
        b: Value that is printed with 10 added on every iteration.
        poll_interval: Seconds to wait on ``stop`` before each iteration.
        hold_time: Maximum seconds to block on ``stop`` inside an iteration.
    """

    def __init__(self, i, b, *, poll_interval=1, hold_time=50):
        super().__init__(target=self.test_method)
        # True only while the loop body in test_method has run at least once
        # and test_method has not yet returned.
        self._running_flag = False
        # Set by terminate() to request shutdown.
        self.stop = threading.Event()
        self.i = i
        self.b = b
        self.poll_interval = poll_interval
        self.hold_time = hold_time

    def test_method(self):
        """Run the print/wait loop until ``stop`` is set.

        ``_running_flag`` is reset to False on the way out, even if the loop
        body raises.
        """
        try:
            # Event.wait() returns True as soon as the event is set, so the
            # loop exits promptly after terminate() is called.
            while not self.stop.wait(self.poll_interval):
                self._running_flag = True
                # Use the instance's own value; a bare ``i`` would resolve to
                # an unrelated module-level name (or fail with NameError).
                print(self.i * 2, "wwww", self.b + 10, threading.current_thread().name)
                print("Start wait")
                print(threading.active_count())
                print(threading.current_thread().name)
                print(threading.enumerate())
                # The return value is deliberately ignored: the next loop
                # condition check observes the event state.
                self.stop.wait(self.hold_time)
                print("Done waiting")
        finally:
            self._running_flag = False

    def terminate(self):
        """Request the thread to stop by setting the ``stop`` event."""
        self.stop.set()
if __name__ == "__main__":
    # Demo driver: create five workers, start them, let them run for a few
    # seconds, then stop and join each one while printing thread snapshots.
    list_thread = []
    for i in range(5):
        thread = BlockingTestThread(i, i * 3)
        print(thread)
        list_thread.append(thread)
        print(threading.active_count(), i, "before end")
        print(threading.current_thread().name)
        print(threading.enumerate())

    for item in list_thread:
        print(item)
        item.start()
        print(item.name, "name")

    # Give the workers time to go through a few poll iterations.
    time.sleep(5)

    # Shut down only the workers created above. Walking threading.enumerate()
    # would also pick up threads this script does not own, which have no
    # terminate() method. join() waits until the worker has really exited,
    # so no sleep-based polling of active_count() is needed.
    for item in list_thread:
        print(threading.active_count(), "before end")
        print(threading.current_thread().name)
        print(threading.enumerate())
        print(item, "end")
        item.terminate()
        print("Joining thread")
        item.join()
        print("Done Joining thread")
        print(threading.active_count(), "after end")
        print(threading.current_thread().name)
        print(threading.enumerate())

    print("Time sleep 2")