zoukankan      html  css  js  c++  java
  • python并发编程之协程

    ---恢复内容开始---

    一、join方法

    (1)开一个主线程

    from threading import Thread,currentThread
    import time
    def walk():
        print('%s is running'%currentThread().getName())
        time.sleep(2)
        print('%s is done'%currentThread().getName())
    if __name__ == '__main__':
        # for i in range(10):
            p=Thread(target=walk)
            p.start()
            p.join()
            print('')  #主线程在等p.join执行
    开一个主线程

    (2)开多个主线程,并发运行

    from threading import Thread,currentThread
    import time
    def walk():
        print('%s is running'%currentThread().getName())
        time.sleep(2)
        print('%s is done'%currentThread().getName())
    if __name__ == '__main__':
        l=[]
        for i in range(10):
            p=Thread(target=walk)
            l.append(p)
            p.start()
        #   p.join()       #在p.start() 跟p.join()就会变为串行,一个一个的运行
        for p in l:
            p.join()
        print('')
    开多个主线程

    (3)并发运行 互斥锁之锁局部的 ,用所只锁住你对共享数据修改的部分

    加锁:

    from threading import Thread,currentThread,Lock
    import time
    n=100
    def walk():
      # 并发运行
        time.sleep(2)
        global n
        mutex.acquire()
          # 串行
        temp=n
        time.sleep(0.01)
        n=temp-1    #数据可能同是减1,可能数据会乱
        mutex.release()
    if __name__ == '__main__':
        mutex=Lock()
        l=[]
        start = time.time()
        for i in range(100):
            p=Thread(target=walk)
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop = time.time()
        print('n:%s run_time : %s' %(n,stop - start))
    View Code

    不加锁:

    from threading import Thread,currentThread,Lock
    import time
    n=100
    def walk():
        time.sleep(2)
        global n
        # mutex.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1    #数据可能同是减1,可能数据会乱
        # mutex.release()
    if __name__ == '__main__':
        mutex=Lock()
        start = time.time()
        for i in range(10):
            p=Thread(target=walk)
            p.start()
            p.join()
        stop = time.time()
        print('n:%s run_time : %s' %(n,stop - start))  #至少21秒
    并发运行,不加锁
    主线程运行完毕是在所有线程所在的进程内所有非守护线程运行完毕才运行

    二、GIL本质是一把互斥锁,将并发转成串行,以此来控制同一时间内共享数据只能被一个任务修改,

         进而保证数据的安全

    from threading import Thread,currentThread,Lock
    import time
    n=100
    def work():
        time.sleep(2)
        global n
        time.sleep(0.5)
        mutex.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        mutex.release()
    if __name__ == '__main__':
        mutex=Lock()
        t1=Thread(target=work)
        t2=Thread(target=work)
        t3=Thread(target=work)
        t1.start()
        t2.start()
        t3.start()
    View Code

    三、多线程性能测试

    (1)'''
    多进程
    优点:可以利用多核
    缺点:开销大

    多线程
    优点:开销小
    缺点:不可以利用多核
    '''
    from multiprocessing import Process
    from threading import Thread
    import time
    def work():
        res=0
        for i in range(10000000):
            res+=i
    
    if __name__ == '__main__':
        l=[]
        start=time.time()
        for i in range(4):
            # p=Process(target=work) #0.9260530471801758
            p=Thread(target=work) #0.9260530471801758
            l.append(p)
            p.start()
    
        for p in l:
            p.join()
    
        stop=time.time()
        print('%s' %(stop-start))
    计算机密集型--开启多进程
    from multiprocessing import Process
    from threading import Thread
    import time
    def work():
        time.sleep(2)
    
    if __name__ == '__main__':
        l=[]
        start=time.time()
        for i in range(400):
            p=Process(target=work)
            # p=Thread(target=work)
            l.append(p)
            p.start()
    
        for p in l:
            p.join()
    
        stop=time.time()
        print('%s' %(stop-start))
    I/O密集型---开启多线程

    (2)应用:

                 多线程用于IO密集型,如socket,爬虫,web
                多进程用于计算密集型,如金融分析

    四、死锁与递归锁

    死锁:

    from threading import Thread,RLock
    import time
    mutexA=RLock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire()
            print('%s 拿到A锁'%self.name)
            mutexA.acquire()
            print('%s 拿到B锁' % self.name)
            mutexA.release()
            mutexA.release()
        def f2(self):
            mutexA.acquire()
            print('%s 拿到A锁' % self.name)
            time.sleep(1)
            mutexA.acquire()
            print('%s 拿到B锁' % self.name)
            mutexA.release()
            mutexA.release()
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    View Code

    解决死锁的方法

    递归锁:用RLock代替Lock

     1 from threading import Lock,Thread,RLock
     2 import time
     3 # mutexA=Lock()
     4 # mutexB=Lock()
     5 mutexB=mutexA=RLock()
     6 class MyThread(Thread):
     7     def run(self):
     8         self.f1()
     9         self.f2()
    10     def f1(self):
    11         mutexA.acquire()
    12         print('33[32m%s 拿到A锁' %self.name)
    13         mutexB.acquire()
    14         print('33[45m%s 拿到B锁' %self.name)
    15         mutexB.release()
    16         mutexA.release()
    17     def f2(self):
    18         mutexB.acquire()
    19         print('33[32m%s 拿到B锁' %self.name)
    20         time.sleep(1)
    21         mutexA.acquire()
    22         print('33[45m%s 拿到A锁' %self.name)
    23         mutexA.release()
    24         mutexB.release()
    25 if __name__ == '__main__':
    26     for i in range(10):
    27         t=MyThread()
    28         t.start()
    递归锁

    五、信号量

    信号量和进程一样

    信号量就是一把锁,可以有多把钥匙

    from threading import Thread,Semaphore,currentThread
    import time,random
    sm=Semaphore(5)
    def task():
        sm.acquire()
        print('%s 上厕所' %currentThread().getName())
        time.sleep(random.randint(1,3))
        print('%s 走了' %currentThread().getName())
        sm.release()
    if __name__ == '__main__':
        for i in range(20):
            t=Thread(target=task)
            t.start()
    View Code

    六、事件

    Event

    vent.isSet():返回event的状态值;
    event.wait():如果 event.isSet()==False将阻塞线程;
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。
    from threading import Thread,currentThread,Event
    import time
    e=Event()
    def traffic_ligths():
        time.sleep(1)
        e.set()
    def car():
    
        print('33[45m%s等'%currentThread().getName())
        e.wait()
        print('33[43m%s开'%currentThread().getName())
    if __name__ == '__main__':
        print('绿灯')
        for i in range(10):
            p=Thread(target=car)
            p.start()
        # print('绿灯')
        time.sleep(5)
        print('红灯')
        traffic_ligth=Thread(target=traffic_ligths)
        traffic_ligth.start()
    红绿灯事列
    
    
    from threading import Thread, currentThread, Event
    import time
    e = Event()
    def conn_mysql():
        count = 1
        while not e.is_set():
            if count > 3:
                raise ConnectionError('尝试链接的次数太多了')
            print('33[45m%s 第%s次尝试' % (currentThread().getName(), count))
            e.wait(timeout=1)
            count += 1
        print('33[45m%s 开始链接' %currentThread().getName())
    def check_myql():
        print('33[45m%s 开始检测 my_sql....' %currentThread().getName())
        time.sleep(2)
        e.set()
    if __name__ == '__main__':
        for i in range(2):
            p = Thread(target=conn_mysql)
            p.start()
        p = Thread(target=check_myql)
        p.start()
    链接——sql

    七、定时器

    定时器,是n秒后执行操作

    rom threading import Timer
    
    
    def hello(n):
        print("hello, world",n)
    
    
    t = Timer(3, hello,args=(123,))
    t.start()  # after 1 seconds, "hello, world" will be printed
    定时器

    八、线程queue

    queue队列,用法与进程queue一样

    q=queue.queue()  先进先出

    q=queue.Queue(3) #先进先出
    q.put('first')
    q.put('second')
    q.put('third')
    # q.put('fourth')
    
    print(q.get())
    print(q.get())
    print(q.get())

    q=queue.LifoQueue()   先进后出

    q=queue.LifoQueue() #先进后出
    q.put('first')
    q.put('second')
    q.put('third')
    # q.put('fourth')
    
    print(q.get())
    print(q.get())
    print(q.get())
    put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    import queue
    q=queue.PriorityQueue()
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    print(q.get())
    print(q.get())
    print(q.get())

    ---恢复内容结束---

  • 相关阅读:
    MapReduce实例
    hadoop 分布式安装
    redis缓存
    Flink初始
    Flume初始
    大数据学习之路(持续更新中...)
    使用VisualVM分析性能
    JVM的理解
    Java日记
    UI笔记2
  • 原文地址:https://www.cnblogs.com/mengqingjian/p/7463527.html
Copyright © 2011-2022 走看看