zoukankan      html  css  js  c++  java
  • python并发编程之多线程二

    一,开启线程的两种方式

    方法一:

    from threading import Thread
    import random,time
    def eat(name):
        print('%s is eating......'%name)
        time.sleep(random.randint(1,5))
        print('%s had end....'%name)
    if __name__ == '__main__':
        t=Thread(target=eat,args=('xiaoming',))#创建线程
        t.start() #开启线程
        print('主线程')

    方法二:

    from threading import Thread
    import random,time
    class Eat(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            print('%s is eating......' % self.name)
            time.sleep(random.randint(6, 15))
            print('%s had end....' % self.name)
    if __name__ == '__main__':
        t=Eat('xiaoming')
        t.start()
        print('主线程')

    二,线程相关的其他方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果

    三,守护线程

    设置一个线程为守护线程,那么它会等本进程中所有非守护线程运行结束之后才会被主线程给回收,而守护进程会在父进程代码运行完毕

    的时候就会被回收。

    from threading import Thread,currentThread
    import time
    def talk1():
        print('%s is running'%currentThread().getName())
        time.sleep(3)
        print('%s is end'%currentThread().getName())
    def talk2():
        print('%s is running'%currentThread().getName())
        time.sleep(3)
        print('%s is end'%currentThread().getName())
    
    if __name__ == '__main__':
        t1=Thread(target=talk1)
        t2=Thread(target=talk2)
        t1.daemon=True #设置守护线程
        t1.start()
        t2.start()
        print('主线程')

    四,同步锁

    1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来
    
    2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
    

     

    # from threading import Thread,Lock
    # import time
    # n=100
    # def work():
    #     mutex.acquire()
    #     global n
    #     temp=n
    #     time.sleep(0.1)
    #     n=temp-1
    #     mutex.release()
    # if __name__ == '__main__':
    #     mutex=Lock()
    #     t_l=[]
    #     for i in range(100):
    #         t=Thread(target=work)
    #         t_l.append(t)
    #         t.start()
    #     for t in t_l:
    #         t.join()
    #     print(n)
    
    
    from threading import Thread,Lock
    import time
    n=100
    def work():
        time.sleep(3)
        global n
        mutex.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        mutex.release()
    if __name__ == '__main__':
        strat=time.time()
        mutex=Lock()
        t_l=[]
        for i in range(100):
            t=Thread(target=work)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
        stop=time.time()
        print(n)
        print('运行了:%s 秒'%(stop-strat))
    互斥锁与join锁

    五,死锁现象与递归锁

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('33[41m%s 拿到A锁33[0m' %self.name)
    
            mutexB.acquire()
            print('33[42m%s 拿到B锁33[0m' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('33[43m%s 拿到B锁33[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('33[44m%s 拿到A锁33[0m' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    
    '''
    Thread-1 拿到A锁
    Thread-1 拿到B锁
    Thread-1 拿到B锁
    Thread-2 拿到A锁
    然后就卡住,死锁了
    '''
    死锁

    解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

    from threading import Thread,RLock
    import time
    mutex=RLock()
    class MyThread(Thread):
    
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutex.acquire()
            time.sleep(1)
            print('%s 拿到锁'%self.name)
            mutex.acquire()
            print('%s 拿到锁' % self.name)
            mutex.release()
            print('%s 释放锁' % self.name)
            mutex.release()
            print('%s 释放锁' % self.name)
        def func2(self):
            mutex.acquire()
    
            print('%s 拿到锁'%self.name)
            mutex.acquire()
            print('%s 拿到锁' % self.name)
            mutex.release()
            print('%s 释放锁' % self.name)
            mutex.release()
            print('%s 释放锁' % self.name)
    
    if __name__ == '__main__':
    
        l=[]
        for i in range(4):
            t = MyThread()
            t.start()
            l.append(t)
        for t in l:
            t.join()
    递归锁

    六,信号量Semaphore

    同进程池一样

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

    from threading import Thread,Semaphore
    import threading
    import time
    # def func():
    #     if sm.acquire():
    #         print (threading.currentThread().getName() + ' get semaphore')
    #         time.sleep(2)
    #         sm.release()
    def func():
        sm.acquire()
        print('%s get sm' %threading.current_thread().getName())
        time.sleep(3)
        sm.release()
    if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(23):
            t=Thread(target=func)
            t.start()
    View Code

    七,Event

    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    from threading import Thread,Event,currentThread
    import time
    e=Event()
    def conn_my_sql():
        count=1
        while not e.is_set():
            if count>3:
                raise ConnectionError('链接超时')
            e.wait(timeout=1)
            print('%s 正在进行第%s次链接'%(currentThread().getName(),count))
            count+=1
        print('链接成功')
    def check_my_sql():
        print('准备链接.....')
        time.sleep(3)
        e.set()
    
    if __name__ == '__main__':
        for i in range(4):
            t=Thread(target=conn_my_sql)
            t.start()
        t=Thread(target=check_my_sql)
        t.start()
    View Code

    八,定时器

    定时器,指定n秒后执行某操作

    复制代码
    from threading import Timer
     
     
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed
    复制代码

    九,线程Queue

    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    '''
    先进先出,队列形式
    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''
    先进后出,堆栈形式
    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''
    指定优先级形式

    十,Python标准模块--concurrent.futures

    from concurrent.futures import ThreadPoolExecutor
    import time,os
    def task(n):
        print('%s is running'%os.getpid())
        time.sleep(2)
        return n**2
    if __name__ == '__main__':
        start=time.time()
        p=ThreadPoolExecutor()
        l=[]
        for i in range(20):
            obj=p.submit(task,i)
            l.append(obj)
        p.shutdown()
        print([obj.result() for obj in l])
        print(time.time()-start)
    ThreadPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    import time,random,os
    def task(n):
        print('%s is running'%os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor()
        l=[]
        for i in range(10):
            obj=p.submit(task,i)
            l.append(obj)
        p.shutdown()
        print([obj.result() for obj in l])
    ProcessPoolExecutor
  • 相关阅读:
    XE8下安装IntraWeb 14.0.40和D7下安装IntraWeb 11.0.63破解版的正确方法
    网易博客打不开怎么办
    SQL SERVER 导入EXCEL的存储过程
    TMemoryStream、String与OleVariant互转
    【转载】Delphi Idhttp的get和post方法
    sqlserver得到行号
    Delphi 中的 XMLDocument 类详解(5)
    10款免费且开源的项目管理工具
    iOS开发者必备:九大设计类工具
    15个步骤创立技术公司,并收获千万用户(完结)
  • 原文地址:https://www.cnblogs.com/wxp5257/p/7453897.html
Copyright © 2011-2022 走看看