  • Threads

    What is a thread:

                 As the name suggests, a thread is one assembly line's flow of work. A process only gathers resources together (a process is a resource unit, or a collection of resources); the thread is the unit of execution on the CPU.

    Multithreading:

              Multiple threads of control can exist within a single process, and they all share that process's address space.

    The difference between threads and processes:

                    Threads are faster and cheaper to create, and multiple threads share their process's address space.

     Two ways to start a thread:

      I usually prefer the first one.

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        print('main thread')
    Method 1
    from threading import Thread
    import time
    class Sayhi(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    
    if __name__ == '__main__':
        t = Sayhi('egon')
        t.start()
        print('main thread')
    Method 2

    IPC (communication between processes) works in two ways: pipes and queues.
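A minimal sketch of queue-based IPC between processes (the function and message names here are illustrative):

```python
from multiprocessing import Process, Queue

def worker(q):
    # the child process puts a message on the shared queue
    q.put('hello from child')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # blocks until the child has put something
    p.join()
```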
    The difference between starting multiple threads and starting multiple child processes within one process:

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello')
    
    if __name__ == '__main__':
        #start a thread under the main process
        t=Thread(target=work)
        t.start()
        print('main thread/main process')
        '''
        output:
        hello
        main thread/main process
        '''
    
        #start a child process under the main process
        t=Process(target=work)
        t.start()
        print('main thread/main process')
        '''
        output:
        main thread/main process
        hello
        '''
    Startup speed
    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello',os.getpid())
    
    if __name__ == '__main__':
        #part1: start several threads under the main process; every thread shares the main process's pid
        t1=Thread(target=work)
        t2=Thread(target=work)
        t1.start()
        t2.start()
        print('main thread/main process pid',os.getpid())
    
        #part2: start several processes; each one has a different pid
        p1=Process(target=work)
        p2=Process(target=work)
        p1.start()
        p2.start()
        print('main thread/main process pid',os.getpid())
    pid
    from  threading import Thread
    from multiprocessing import Process
    import os
    def work():
        global n
        n=0
    
    if __name__ == '__main__':
        # n=100
        # p=Process(target=work)
        # p.start()
        # p.join()
        # print('',n) #the child process p has indeed set its global n to 0, but only its own copy; the parent's n is still 100
    
    
        n=1
        t=Thread(target=work)
        t.start()
        t.join()
        print('',n) #prints 0, because threads within the same process share that process's data
    Threads within one process share that process's data

    Exercise: three tasks — one reads user input, one converts the input to uppercase, and one writes the formatted result to a file.

    from threading import Thread
    msg_l=[]
    format_l=[]
    def talk():
        while True:
            msg=input('>>: ').strip()
            if not msg:continue
            msg_l.append(msg)
    
    def format_msg():
        while True:
            if msg_l:
                res=msg_l.pop()
                format_l.append(res.upper())
    
    def save():
        while True:
            if format_l:
                with open('db.txt','a',encoding='utf-8') as f:
                    res=format_l.pop()
                    f.write('%s\n' %res)
    
    if __name__ == '__main__':
        t1=Thread(target=talk)
        t2=Thread(target=format_msg)
        t3=Thread(target=save)
        t1.start()
        t2.start()
        t3.start()


    Code used for in-class practice

    from threading import Thread,currentThread,activeCount
    import os,time,threading
    def talk():
        time.sleep(2)
        print('%s is running' %currentThread().getName())
    
    if __name__ == '__main__':
        t=Thread(target=talk)
        t.start()
        t.join()
        print('')
    The main thread waits for the child thread to finish
    The GIL is not a feature of the Python language itself; it is an implementation detail of the CPython interpreter. With the GIL in place, only one thread in a given process executes Python bytecode at any moment.
    from threading import Thread,Lock
    import time
    n=100
    def work():
        # mutex.acquire()
        global n
        temp=n
        time.sleep(0.5)
        n=temp-1
        # mutex.release()
    
    if __name__ == '__main__':
        mutex=Lock()
        t_l=[]
        s=time.time()
        for i in range(100):
            t=Thread(target=work)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
        print('%s:%s' %(time.time()-s,n))
    Global Interpreter Lock (GIL)
    from threading import Thread
    input_l=[]
    format_l=[]
    def talk():
        while True:
            msg=input('>>: ')
            if not msg:continue
            input_l.append(msg)
    def format_msg():   # renamed from format to avoid shadowing the builtin
        while True:
            if input_l:
                res=input_l.pop()
                format_l.append(res.upper())
    def save():
        with open('db.txt','a') as f:
            while True:
                if format_l:
                    f.write('%s\n' %(format_l.pop()))
                    f.flush()
    
    
    if __name__ == '__main__':
        t1=Thread(target=talk)
        t2=Thread(target=format_msg)
        t3=Thread(target=save)
    
        t1.start()
        t2.start()
        t3.start()
    Multiple threads share the same process's address space
    For processes and threads alike, a daemon waits for the main process/thread to finish and is then destroyed. The main process counts as finished once the rest of its code has run, but it keeps waiting until all non-daemon child processes are done so it can reclaim their resources (otherwise zombie processes would result), and only then exits.
    The main thread counts as finished only after all other non-daemon threads have finished (daemon threads are reclaimed at that point). The end of the main thread means the end of the process, and all of the process's resources are reclaimed — that is why the main thread must wait for the remaining non-daemon threads before it can end.
    from threading import Thread, currentThread
    from multiprocessing import Process
    import os, time, threading
    
    
    def talk1():
        time.sleep(10)
        print('%s is running' % currentThread().getName())
    
    
    def talk2():
        time.sleep(2)
        print('%s is running' % currentThread().getName())
    
    
    if __name__ == '__main__':
        t1 = Thread(target=talk1)
        t2 = Thread(target=talk2)
        t1.daemon = True
        t1.start()
        t2.start()
        print('main thread', os.getpid())
    Daemon threads
    from threading import Thread,currentThread,activeCount
    import os,time,threading
    def talk():
        print('%s is running' %currentThread().getName())
    
    if __name__ == '__main__':
        # t=Thread(target=talk,name='egon')
        t=Thread(target=talk)
        t.start()
        # print(t.name)
        # print(t.getName())
        # print(t.is_alive())
        # print(currentThread().getName())
        print(threading.enumerate())
        time.sleep(3)
        print(t.is_alive())
        print('',activeCount())
    Other thread methods and attributes

     Synchronization locks

    Points to note:

     1. Threads compete for the GIL, which amounts to the right to execute. Only after acquiring the GIL can a thread try to acquire the mutex Lock. Other threads can still grab the GIL, but if they find the Lock has not been released they block, and even though they hold the GIL they must give it up immediately.

    2. join waits for everything, i.e. the whole run becomes serial, whereas a lock only serializes the part that modifies shared data, i.e. partial serialization. The fundamental way to keep data safe is to turn concurrency into serialization; both join and a mutex achieve that, but the mutex's partial serialization is clearly more efficient.

      GIL VS Lock

         The purpose of a lock is to protect shared data: only one thread may modify the shared data at a time.

        Conclusion: protect different data with different locks.

        The GIL is interpreter-level; it protects interpreter-level data, such as garbage-collection state.

         Lock protects the data of the user's own application.

    All threads compete for the GIL, that is, for the right to execute.

    Thread 1 grabs the GIL, gets the right to execute, starts running, and acquires a Lock that it has not yet released. Thread 2 may then grab the GIL and start running, discover the Lock is still held by thread 1, and block, losing the right to execute. Thread 1 may then get the GIL back and run normally until it releases the Lock... this produces the serialized effect.

    Deadlock and recursive locks

    Deadlock: two or more processes or threads wait on each other during execution because they are competing for resources; without outside intervention, none of them can make progress. The system is then said to be deadlocked, and the processes waiting on each other forever are called deadlocked processes. The following is a deadlock:

    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('\033[41m%s got lock A\033[0m' %self.name)
    
            mutexB.acquire()
            print('\033[42m%s got lock B\033[0m' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('\033[43m%s got lock B\033[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('\033[44m%s got lock A\033[0m' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    
    '''
    Thread-1 got lock A
    Thread-1 got lock B
    Thread-1 got lock B
    Thread-2 got lock A
    then it hangs -- deadlock
    '''
    Deadlock

    The solution is a recursive lock: to support requesting the same resource multiple times from the same thread, Python provides the reentrant lock RLock.

    An RLock maintains a Lock and a counter internally; the counter records the number of acquire calls, so the resource can be acquired repeatedly. Only when all of one thread's acquires have been released can other threads obtain the resource. If the example above uses RLock in place of Lock, no deadlock occurs:

    from threading import Lock,Thread,RLock
    import time
    # mutexA=Lock()
    # mutexB=Lock()
    mutexB=mutexA=RLock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('\033[32m%s got lock A' %self.name)
            mutexB.acquire()
            print('\033[45m%s got lock B' %self.name)
            mutexB.release()
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('\033[32m%s got lock B' %self.name)
            time.sleep(1)
            mutexA.acquire()
            print('\033[45m%s got lock A' %self.name)
            mutexA.release()
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    The fix: RLock

    Semaphores

    Same as for processes.

    A Semaphore manages a built-in counter:
    each call to acquire() decrements the counter by 1;
    each call to release() increments it by 1;
    the counter can never go below 0; when it reaches 0, acquire() blocks the thread until some other thread calls release().

    from threading import Thread,Semaphore,currentThread
    import time,random
    sm=Semaphore(5)
    def task():
        sm.acquire()
        print('%s enters the restroom' %currentThread().getName())
        time.sleep(random.randint(1,3))
        print('%s leaves' %currentThread().getName())
        sm.release()
    if __name__ == '__main__':
        for i in range(20):
            t=Thread(target=task)
            t.start()

    This is a completely different concept from a process pool. Pool(4) can produce at most 4 processes, and it is those same 4 from start to finish — no new ones are created — whereas a semaphore spawns a whole batch of threads/processes and only limits how many run at once.

    Event

    Same as for processes.

    A key property of threads is that each runs independently and its state is unpredictable. If other threads in a program need to determine their next step from the state of some thread, thread synchronization becomes very tricky. To solve this we use the Event object from the threading module. An Event holds a signal flag that threads can set, allowing threads to wait for some event to occur. Initially the flag is False. A thread waiting on an Event whose flag is False blocks until the flag becomes True. When a thread sets the flag to True, it wakes all threads waiting on that Event. A thread that waits on an Event whose flag is already True does not block and simply continues.

    event.is_set(): return the event's flag;
    
    event.wait(): block the thread if event.is_set() == False;
    
    event.set(): set the flag to True; all blocked threads wake up into the ready state and wait to be scheduled by the OS;
    
    event.clear(): reset the flag to False.
    from threading import Thread,Event,currentThread
    import time
    e=Event()
    def conn_mysql():
        count=1
        while not e.is_set():
            if count > 3:
                raise ConnectionError('too many connection attempts')
            print('\033[45m%s attempt %s' %(currentThread().getName(),count))
            e.wait(timeout=1)
            count+=1
        print('\033[45m%s starts connecting' %currentThread().getName())
    
    def check_mysql():
        print('\033[45m%s checking mysql...' %currentThread().getName())
        time.sleep(2)
        e.set()
    if __name__ == '__main__':
        for i in range(3):
            t=Thread(target=conn_mysql)
            t.start()
        t=Thread(target=check_mysql)
        t.start()

    Timer

    from threading import Timer
     
     
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 second, "hello, world" will be printed
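A Timer can also be cancelled before its interval elapses (a small sketch):

```python
from threading import Timer

def hello():
    print("hello, world")

t = Timer(30, hello)
t.start()
t.cancel()   # stop the timer; hello() will never run
```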

    Thread queues

    queue module: use import queue; usage is the same as the process Queue.

    class queue.Queue(maxsize=0) #first in, first out

    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    output (first in, first out):
    first
    second
    third
    '''
    First in, first out

    class queue.LifoQueue(maxsize=0) #last in, first out

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    output (last in, first out):
    third
    second
    first
    '''
    Last in, first out

    class queue.PriorityQueue(maxsize=0) #a queue whose entries carry a priority

    import queue
    
    q=queue.PriorityQueue()
    #put a tuple; its first element is the priority (usually a number, though any comparable values work); the smaller the value, the higher the priority
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
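One pitfall worth noting: when two entries have equal priority, the queue compares the payloads themselves, which raises TypeError if they are not comparable (e.g. dicts). A common workaround, sketched below, is to add an insertion counter as a tie-breaker:

```python
import itertools
import queue

q=queue.PriorityQueue()
counter=itertools.count()   # monotonically increasing tie-breaker

for priority,data in [(20,{'task':'a'}),(10,{'task':'b'}),(10,{'task':'c'})]:
    # without the counter, the two priority-10 entries would be
    # compared by their dict payloads and raise TypeError
    q.put((priority,next(counter),data))

while not q.empty():
    print(q.get()[2])   # {'task': 'b'}, then {'task': 'c'}, then {'task': 'a'}
```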

    Other

    Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
    
    The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
    
    exception queue.Empty
    Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
    
    exception queue.Full
    Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
    
    Queue.qsize()
    Queue.empty() #return True if empty  
    Queue.full() # return True if full 
    Queue.put(item, block=True, timeout=None)
    Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).
    
    Queue.put_nowait(item)
    Equivalent to put(item, False).
    
    Queue.get(block=True, timeout=None)
    Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
    
    Queue.get_nowait()
    Equivalent to get(False).
    
    Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
    
    Queue.task_done()
    Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
    
    If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
    
    Raises a ValueError if called more times than there were items placed in the queue.
    
    Queue.join() blocks until every item in the queue has been processed
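The task_done()/join() pair can be sketched with a single consumer thread (the names are illustrative):

```python
import queue
from threading import Thread

q = queue.Queue()
results = []

def consumer():
    while True:
        item = q.get()
        if item is None:        # sentinel: stop consuming
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()           # one task_done() per get()

for i in range(3):
    q.put(i)
q.put(None)

t = Thread(target=consumer)
t.start()
q.join()    # returns once task_done() was called for every put()
t.join()
print(results)   # [0, 2, 4]
```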

       

  • Original: https://www.cnblogs.com/1996-11-01-614lb/p/7448691.html