zoukankan      html  css  js  c++  java
  • 线程测试

    #开进程的方法一:
    # import time
    # import random
    # from multiprocessing import Process
    # def piao(name):
    # print('%s piaoing' %name)
    # time.sleep(random.randrange(1,5))
    # print('%s piao end' %name)
    #
    #
    #
    # p1=Process(target=piao,args=('egon',)) #必须加,号
    # p2=Process(target=piao,args=('alex',))
    # p3=Process(target=piao,args=('wupeqi',))
    # p4=Process(target=piao,args=('yuanhao',))
    #
    # p1.start()
    # p2.start()
    # p3.start()
    # p4.start()
    # print('主线程')


    #开进程的方法二:
    # import time
    # import random
    # from multiprocessing import Process
    #
    #
    # class Piao(Process):
    # def __init__(self,name):
    # super().__init__()
    # self.name=name
    # def run(self):
    # print('%s piaoing' %self.name)
    #
    # time.sleep(random.randrange(1,5))
    # print('%s piao end' %self.name)
    #
    # p1=Piao('egon')
    # p2=Piao('alex')
    # p3=Piao('wupeiqi')
    # p4=Piao('yuanhao')
    #
    # p1.start() #start会自动调用run
    # p2.start()
    # p3.start()
    # p4.start()
    # print('主线程')

    # import multiprocessing
    #
    # def worker(num):
    # """thread worker function"""
    # print('Worker:', num)
    # return
    #
    # if __name__ == '__main__':
    # jobs = []
    # for i in range(5):
    # p = multiprocessing.Process(target=worker, args=(i,))
    # jobs.append(p)
    # p.start()


    # 创建线程
    #
    # Python提供两个模块进行多线程的操作,分别是thread和threading, 前者是比较低级的模块,用于更底层的操作,一般应用级别的开发不常用。
    #
    # 第一种方法是创建threading.Thread的子类,重写run方法。
    # import time,threading
    # class MyThread(threading.Thread):
    # def run(self):
    # for i in range(5):
    # print('thread {},@number: {}'.format(self.name,i))
    # time.sleep(1)
    # def main():
    # print('Start main threading')
    # #创建三个线程
    # threads = [MyThread() for i in range(3)]
    # #启动三个线程
    # for t in threads:
    # t.start()
    # print('End Main threading')
    #
    # if __name__ =='__main__':
    # main()

    # 线程合并(join方法)
    #
    # 主线程结束后,子线程还在运行,join方法使得主线程等到子线程结束 时才退出。
    # import time,threading
    # class MyThread(threading.Thread):
    # def run(self):
    # for i in range(5):
    # print('thread {},@number: {}'.format(self.name,i))
    # time.sleep(1)
    # def main():
    # print('Start main threading')
    # #创建三个线程
    # threads = [MyThread() for i in range(3)]
    # #启动三个线程
    # for t in threads:
    # t.start()
    # for t in threads:
    # t.join()
    # print('End Main threading')
    # #一次让新创建的线程执行join
    #
    #
    # if __name__ =='__main__':
    # main()

    # 线程同步与互斥锁
    #
    # 为了避免线程不同步造成是数据不同步,可以对资源进行加锁。 也就是访问资源的线程需要获得锁,才能访问。 threading模块正好提供了一个Lock功能
    #
    # mutex = threading.Lock()
    # 在线程中获取锁
    #
    # mutex.acquire()
    # 使用完后,释放锁
    #
    # mutex.release()
    # 可重入锁
    #
    # 为了支持在同一线程中多次请求同一资源, python提供了可重入锁(RLock)。 RLock内部维护着一个Lock和一个counter变量, counter记录了acquire的次数,从而使得资源可以被多次require。 直到一个线程所有的acquire都被release,其他的线程才能获得资源。
    #

    #
    # import time,threading
    # # 创建RLock
    # mutex = threading.RLock()
    # class MyThread(threading.Thread):
    # #线程内多次进入锁和释放锁
    # def run(self):
    # if mutex.acquire(2):
    # print("thread {} get mutex".format(self.name))
    # time.sleep(1)
    # mutex.acquire()
    # mutex.release()
    # mutex.release()
    # # for i in range(5):
    # # print('thread {},@number: {}'.format(self.name,i))
    # # time.sleep(1)
    # def main():
    # print('Start main threading')
    # #创建三个线程
    # threads = [MyThread() for i in range(3)]
    # #启动三个线程
    # for t in threads:
    # t.start()
    # for t in threads:
    # t.join()
    # print('End Main threading')
    # #一次让新创建的线程执行join
    #
    #
    # if __name__ =='__main__':
    # main()
    #
    # 条件变量
    #
    # 实用锁可以达到线程同步,前面的互斥锁就是这种机制。更复杂的环境,需要针对锁进行一些条件判断。Python提供了Condition对象。它除了具有acquire和release方法之外,还提供了wait和notify方法。线程首先acquire一个条件变量锁。如果条件不足,则该线程wait,如果满足就执行线程,甚至可以notify其他线程。其他处于wait状态的线程接到通知后会重新判断条件。
    #
    # 条件变量可以看成不同的线程先后acquire获得锁,如果不满足条件,可以理解为被扔到一个(Lock或RLock)的waiting池。直达其他线程notify之后再重新判断条件。该模式常用于生成消费者模式:
    # import time,threading,random
    # queue = []
    # con = threading.Condition()
    #
    # class Producer(threading.Thread):
    # def run(self):
    # while True:
    # if con.acquire():
    # if len(queue) > 100:
    # con.wait()
    # else:
    # elem = random.randrange(100)
    # queue.append(elem)
    # print("Producer a elem {}, Now size is {}".format(elem,len(queue)))
    # time.sleep(random.random())
    # con.notify()
    # con.release()
    #
    # class Consumer(threading.Thread):
    # def run(self):
    # while True:
    # if con.acquire():
    # if len(queue) < 0:
    # con.wait()
    # else:
    # elem = queue.pop()
    # print("Consumer a elem {},Now size is {}".format(elem,len(queue)))
    # time.sleep(random.random())
    # con.notify()
    # con.release()
    # def main():
    # for i in range(3):
    # Producer().start()
    # for i in range(2):
    # Consumer().start()
    #
    # if __name__ =='__main__':
    # main()

    # 队列
    #
    # 带锁的队列Queue。 创建10个元素的队列

    # import time,threading,queue
    # queue=queue.Queue(10)
    #
    # class MyThread(threading.Thread):
    # def __init__(self,event):
    # super(MyThread,self).__init__()
    # self.event = event
    #
    # def run(self):
    # print("thread {} is ready".format(self.name))
    # self.event.wait()
    # print("thread {} run".format(self.name))
    # signal = threading.Event()
    #
    # def main():
    # start = time.time()
    # for i in range(3):
    # t = MyThread(signal)
    # t.start()
    # time.sleep(3)
    # print("after {}s".format(time.time()-start))
    # signal.set()
    #
    # if __name__=="__main__":
    # main()
    #
    # 后台线程
    #
    # 默认情况下,主线程退出之后,即使子线程没有join。那么主线程结束后, 子线程也依然会继续执行。如果希望主线程退出后, 其子线程也退出而不再执行,则需要设置子线程为后台线程。python提供了setDeamon方法。

    #进程
    # python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
    #
    # 类Process
    #
    # 创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]), target表示调用对象,args表示调用对象的位置参数元组。 kwargs表示调用对象的字典。name为别名。group实质上不使用。
    #
    # 方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
    #
    # 属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
    #
    # 例:创建函数并将其作为单个进程

    # import multiprocessing,time
    # def worker(interval):
    # n = 5
    # while n > 0:
    # print("The time is {0}".format(time.time()))
    # time.sleep(interval)
    # n -= 1
    # if __name__ == '__main__':
    # p = multiprocessing.Process(target= worker, args=(3,),name='马大锤')
    # p.start()
    # print("p.pid",p.pid)
    # print("p.name",p.name)
    # #该函数返回当前线程是否存活状态,比较重要
    # print("p.is_alive",p.is_alive())

    # import multiprocessing,time
    #
    # def worker_1(interval):
    # print("worker_1")
    # time.sleep(interval)
    # print("end worker_1")
    #
    # def worker_2(interval):
    # print("worker_2")
    # time.sleep(interval)
    # print("end worker_2")
    #
    # def worker_3(interval):
    # print("worker_3")
    # time.sleep(interval)
    # print("end worker_3")
    #
    # if __name__ == '__main__':
    # p1 = multiprocessing.Process(target=worker_1,args=(2,))
    # p2 = multiprocessing.Process(target=worker_2, args=(3,))
    # p3 = multiprocessing.Process(target=worker_3, args=(4,))
    #
    # p1.start()
    # p2.start()
    # p3.start()
    # #另外你还可以通过 cpu_count() 方法还有 active_children() 方法获取当前机器的 CPU 核心数量以及得到目前所有的运行的进程。
    # print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    # for p in multiprocessing.active_children():
    # print("child p.name:" + p.name + " p.id" + str(p.pid))
    # print("END!!!!!!!!!!!!!!!!!!")

    #例:将进程定义为类
    # import multiprocessing,time
    # class ClockProcess(multiprocessing.Process):
    # def __init__(self,interval):
    # multiprocessing.Process.__init__(self)
    # self.interval = interval
    #
    # def run(self):
    # n = 5
    # while n > 0:
    # print("the time is {0}".format(time.time()))
    # time.sleep(self.interval)
    # n -= 1
    #
    # if __name__ == '__main__':
    # p = ClockProcess(3)
    # p.start()
    # 注:进程p调用start()时,自动调用run()

    # import multiprocessing,time
    # def worker(interval):
    # print("work start:{0}".format(time.ctime()))
    # time.sleep(interval)
    # print('work end:{0}'.format(time.ctime()))
    #
    # if __name__ == '__main__':
    # p = multiprocessing.Process(target= worker,args=(3,))
    # p.daemon = True
    # p.start()
    # print('end!!!!!!!!1')
    #注:因子进程设置了daemon属性,主进程结束,它们就随着结束了。



    # #设置daemon执行完结束的方法,加join等待主进程执行结束再终止
    # import multiprocessing,time
    # def worker(interval):
    # print("work start:{0}".format(time.ctime()))
    # time.sleep(interval)
    # print('work end:{0}'.format(time.ctime()))
    #
    # if __name__ == '__main__':
    # p = multiprocessing.Process(target= worker,args=(3,))
    # daemon默认设置为False 是意思不设置守护进程的意思
    # p.daemon = True
    # p.start()
    # p.join()
    #
    # print('end!!!!!!!!')

    #Lock
    #当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
    # 一、多线程同步
    # 由于CPython的python解释器在单线程模式下执行,所以导致python的多线程在很多的时候并不能很好地发挥多核cpu的资源。大部分情况都推荐使用多进程。
    # python的多线程的同步与其他语言基本相同,主要包含:
    # Lock & RLock :用来确保多线程多共享资源的访问。
    # Semaphore : 用来确保一定资源多线程访问时的上限,例如资源池。
    # Event : 是最简单的线程间通信的方式,一个线程可以发送信号,其他的线程接收到信号后执行操作。
    # 二、实例
    # 1)Lock & RLock
    # Lock对象的状态可以为locked和unlocked
    # 使用acquire()设置为locked状态;
    # 使用release()设置为unlocked状态。
    # 如果当前的状态为unlocked,则acquire()会将状态改为locked然后立即返回。当状态为locked的时候,acquire()将被阻塞直到另一个线程中调用release()来将状态改为unlocked,然后acquire()才可以再次将状态置为locked。
    # Lock.acquire(blocking=True, timeout=-1),blocking参数表示是否阻塞当前线程等待,timeout表示阻塞时的等待时间 。如果成功地获得lock,则acquire()函数返回True,否则返回False,timeout超时时如果还没有获得lock仍然返回False。

    # import multiprocessing,sys
    #
    # def worker_with(lock,f):
    # with lock:
    # fs = open(f,'a+')
    # n = 10
    # while n > 1:
    # fs.write("Lockd acquired via with ")
    # n -= 1
    # fs.close()
    # def worker_no_with(lock,f):
    # lock.acquire()
    # try:
    # fs = open(f,'a+')
    # n = 10
    # while n > 1:
    # fs.write("Lockd acquired via with ")
    # n -= 1
    # fs.close()
    # finally:
    # lock.release()
    #
    # if __name__ == '__main__':
    # lock = multiprocessing.Lock()
    # f = "file.txt"
    # w = multiprocessing.Process(target=worker_with,args=(lock,f))
    # nw = multiprocessing.Process(target=worker_no_with,args=(lock,f))
    # w.start()
    # nw.start()
    # print("end!!!!!!!!!!!!!!!!!!!!!!!")


    #Semaphore

    #Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。
    # import multiprocessing,time
    # def worker(s,i):
    # s.acquire()
    # print(multiprocessing.current_process().name + "acquire")
    # time.sleep(i)
    # print(multiprocessing.current_process().name + "release ")
    # s.release()
    #
    # if __name__ == '__main__':
    # s = multiprocessing.Semaphore(2)
    # for i in range(5):
    # p = multiprocessing.Process(target=worker,args=(s,i*2))
    # p.start()

    #Event
    #Event用来实现进程间同步通信。
    #Python线程event
    # python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set
    #
    # 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为
    # False,那么当程序执行
    # event.wait
    # 方法时就会阻塞,如果“Flag”值为True,那么event.wait
    # 方法时便不再阻塞。
    #
    # clear:将“Flag”设置为False
    # set:将“Flag”设置为True
    #
    # 用
    # threading.Event
    # 实现线程间通信
    # 使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,
    # Event默认内置了一个标志,初始值为False。
    # 一旦该线程通过wait()
    # 方法进入等待状态,直到另一个线程调用该Event的set()
    # 方法将内置标志设置为True时,
    # 该Event会通知所有等待状态的线程恢复运行。


    # import multiprocessing,time
    # def wait_for_event(e):
    # print("wait_for_event: starting")
    # e.wait()
    # print("wait_for_event: e.is_set()->" + str(e.is_set()))
    # e.clear()
    # print("wait_for_event: ending")
    # print("wait_for_event: e.is_set()->" + str(e.is_set()))
    #
    # def wait_for_event_timeout(e,t):
    # print("wait_for_event_timeout:starting")
    # e.wait()
    # print("wait_for_event_timeout:e.is_set" + str(e.is_set()))
    # e.clear()
    # print("wait_for_event_timeout:ending")
    # print("wait_for_event_timeout:e.is_set" + str(e.is_set()))
    #
    # if __name__ == '__main__':
    # e = multiprocessing.Event()
    # w1 = multiprocessing.Process(name="block",
    # target=wait_for_event,
    # args=(e,))
    # w2 = multiprocessing.Process(name="non_block",
    # target=wait_for_event_timeout,
    # args=(e,2))
    #
    # w1.start()
    # w2.start()
    #
    # time.sleep(3)
    #
    # e.set()
    # print("manin: event is set")

    #Queue

    # Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    #
    # get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

    # import multiprocessing
    #
    # def writer_proc(q):
    # try:
    # q.put(9,block = False)
    # except:
    # pass
    # def reader_proc(q):
    # try:
    # print(q.get(block =False))
    # except:
    # pass
    #
    # if __name__ == '__main__':
    # q = multiprocessing.Queue()
    # writer = multiprocessing.Process(target=writer_proc,
    # args=(q,))
    # writer.start()
    # reader = multiprocessing.Process(target=reader_proc,
    # args=(q,))
    # reader.start()
    #
    # reader.join()
    # writer.join()

    # Pipe
    #
    # Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
    #
    # send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
    # import multiprocessing,time
    # def proc1(pipe):
    # while True:
    # for i in range(10):
    # print("send: %s" %(i))
    # pipe.send(i)
    # time.sleep(1)
    # def proc2(pipe):
    # while True:
    # print("proc2 rev",pipe.recv())
    # time.sleep(1)
    #
    # def proc3(pipe):
    # while True:
    # print("proc3 rev",pipe.recv())
    # time.sleep(1)
    #
    # if __name__ == '__main__':
    # pipe = multiprocessing.Pipe()
    # P1 = multiprocessing.Process(target=proc1,args=(pipe[0],))
    # P2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    # P3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
    #
    # P1.start()
    # P2.start()
    # P3.start()
    #
    # P1.join()
    # P2.join()
    # P3.join()

    # Pool
    #
    # 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。 Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它
    #例:使用进程池
    # import multiprocessing,time
    #
    # def func(msg):
    # print("msg:",msg)
    # time.sleep(3)
    # print("end")
    #
    # if __name__ == '__main__':
    # pool = multiprocessing.Pool(processes=10)
    # for i in range(4):
    # msg = "hello %d" %(i)
    # pool.apply_async(func,(msg,)) #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    # #pool.apply(func, (msg,)) # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去(阻塞模式)
    #
    # print("Mark~ Mark~ -------------------------")
    # pool.close()
    # pool.join()
    # print("Sub-process(es) done.")
  • 相关阅读:
    随笔—邀请赛前训— Codeforces Round #330 (Div. 2) B题
    随笔—邀请赛前训— Codeforces Round #330 (Div. 2) Vitaly and Night
    General Problem Solving Techniques [Examples]~C
    General Problem Solving Techniques [Examples]~A
    General Problem Solving Techniques [Beginner-1]~B
    General Problem Solving Techniques [Beginner-1]~A
    General Problem Solving Techniques [Beginner-1]~E
    General Problem Solving Techniques [Beginner-1]~F
    2015 HUAS Summer Contest#5~C
    2015 HUAS Summer Contest#5~B
  • 原文地址:https://www.cnblogs.com/Bruce-yin/p/7101769.html
Copyright © 2011-2022 走看看