zoukankan      html  css  js  c++  java
  • 线程

    一.线程

      定义:进程中某个单一顺序的执行流。进程可以看作工厂的一个车间,每个线程相当于车间中的流水线。

      多个线程就是,一个进程中存在多个线程,多个线程共享所在进程的所有资源。

      线程与进程的区别:

      1.线程共享创建它的进程的地址空间;进程有自己的地址空间。

      2.线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。

      3.线程可以与进程的其他线程直接通信;进程必须使用进程间通信来与同级进程通信。

      4.新线程很容易创建;新进程需要父进程的重复。

      5.线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制。

      6.对主线程的更改(取消、优先级变更等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。

      使用多线程的原因:

      1.多线程共享一个地址的内存空间

      2.线程比进程更轻量级

      3. 若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。

      threading 模块

        https://docs.python.org/3/library/threading.html?highlight=threading#

      开启线程的两种方式:

    #方式一
    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        print('主线程')
    #方式二
    from threading import Thread
    import time
    class Sayhi(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    if __name__ == '__main__':
        t = Sayhi('egon')
        t.start()
        print('主线程')

      多进程并发的socket服务端和客户端

    #服务端
    import multiprocessing
    import threading
    import socket
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    def action(conn):
        while True:
            data=conn.recv(1024)
            print(data)
            conn.send(data.upper())
    if __name__ == '__main__':
        while True:
            conn,addr=s.accept()
            p=threading.Thread(target=action,args=(conn,))
            p.start()
    #客户端
    import socket
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.connect(('127.0.0.1',8080))
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        s.send(msg.encode('utf-8'))
        data=s.recv(1024)
        print(data)

      线程常用的方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

      实例:

    from threading import Thread
    import threading
    from multiprocessing import Process
    import os
    
    def work():
        import time
        time.sleep(3)
        print(threading.current_thread().getName())
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
    
        print(threading.current_thread().getName())
        print(threading.current_thread()) #主线程
        print(threading.enumerate()) #连同主线程在内有两个运行的线程
        print(threading.active_count())
        print('主线程/主进程')

      主线程等待子线程结束

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        t.join()
        print('主线程')
        print(t.is_alive())

      守护线程

        守护进程与守护线程的区别:

      1.主进程在其代码结束后就已经算运算完毕了(守护进程在此时被收回)主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,

      2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。主线程的结束意味着进程的结束,进程整体的资源都被回收,因而主线程必须在其余非守护线程都运行完毕后才能结束

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.setDaemon(True) #必须在t.start()之前设置
        t.start()
    
        print('主线程')
        print(t.is_alive())

      python GIL锁

      定义:在CPython,全局解释器锁,或吉尔,是一个互斥体,防止多本地线程执行Python字节码一次。这种锁是必要的。因为Cpython的内存管理不是线程安全的。

      注意:GIL并不是Python的特性,Python完全可以不依赖于GIL

       多进程与多线程的应用:

        多线程用于IO密集型,如socket,爬虫,web
        多进程用于计算密集型,如金融分析  

      过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限

      线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock,这就导致了串行运行的效果

       死锁与递归锁

       死锁:两个或者两个以上的线程在执行的过程中,因争夺资源造成的互相等待的现象,如果外力不进行干预,将会一直等待下去。

    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('33[41m%s 拿到A锁33[0m' %self.name)
            mutexB.acquire()
            print('33[42m%s 拿到B锁33[0m' %self.name)
            mutexB.release()
            mutexA.release()
        def func2(self):
            mutexB.acquire()
            print('33[43m%s 拿到B锁33[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('33[44m%s 拿到A锁33[0m' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    '''
    Thread-1 拿到A锁
    Thread-1 拿到B锁
    Thread-1 拿到B锁
    Thread-2 拿到A锁
    然后就卡住,死锁了
    '''

      递归锁

       解决上面的问题用递归锁 Rlock

    from threading import Lock,Thread,RLock
    import time
    mutexB = mutexA=RLock()
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire()
            print('33[39m%s 拿到A锁'%self.name)
            mutexB.acquire()
            print('33[40m%s 拿到B锁'%self.name)
            mutexB.release()
            mutexA.release()
        def f2(self):
            mutexB.acquire()
            print('33[39m%s 拿到B锁'%self.name)
            time.sleep(1)
            mutexA.acquire()
            print('33[40m%s 拿到A锁' % self.name)
            mutexA.release()
            mutexB.release()
    if __name__ == '__main__':
        for i in range(10):
            t= MyThread()
            t.start()
    #
    mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
    
    

      信号量Semaphore  

      Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

      实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

    from threading import Thread,Semaphore
    import threading
    import time
    def func():
        sm.acquire()
        print('%s get sm' %threading.current_thread().getName())
        time.sleep(3)
        sm.release()
    if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(20):
            t=Thread(target=func)
            t.start()

      与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

       Event事件

       Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位真,则其他线程等待直到信号接触。

         Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等用于实现线程间的通信。

         1 设置信号

         使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了is_Set()方法来判断其内部信号标志的状态。当使用event对象的set()方法后,isSet()方法返回真

         2 清除信号

         使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假

         3 等待

         Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。

      实例:

    from threading import Thread,Event
    import threading
    import time,random
    def conn_mysql():
        count=1
        while not event.is_set():
            if count > 3:
                raise TimeoutError('链接超时')
            print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
            event.wait(0.5)
            count+=1
        print('<%s>链接成功' %threading.current_thread().getName())
    
    
    def check_mysql():
        print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
        time.sleep(random.randint(2,4))
        event.set()
    if __name__ == '__main__':
        event=Event()
        conn1=Thread(target=conn_mysql)
        conn2=Thread(target=conn_mysql)
        check=Thread(target=check_mysql)
    
        conn1.start()
        conn2.start()
        check.start()

      定时器

    from threading import Timer 
    def hello():
        print("hello, world")
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed

      线程queue

      queue队列 :使用import queue,用法与进程Queue一样

      class queue.Queue(maxsize=0) #先进先出

    import queue
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    '''

      class queue.LifoQueue(maxsize=0) #last in fisrt out 

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''

      class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''

       concurrent.futures模块

      Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。

      concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

      ThreadPoolExecutor:

       submitfn* args** kwargs ) 用来异步开启多个进程,并返回一个对象。

        shutdownwait = True ), 向执行人发出信号,当目前待定的期货完成执行时,它应该释放它正在使用的任何资源,相当于原先的close()+ join()的作用

    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import os,time,random
    # def task(n):
    #     print('%s is running'%os.getpid())
    #     time.sleep(random.randint(1,3))
    #     return n **2
    # if __name__ == '__main__':
    #     p = ProcessPoolExecutor()
    #     l = []
    #     for i in range(10):
    #         obj = p.submit(task,i)
    #         l.append(obj)
    #     p.shutdown()
    #     print('='*30)
    #     print([obj.result() for obj in l])

      ThreadPoolExecutor:

    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import threading
    # import os,time,random
    # def task(n):
    #     print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
    #     time.sleep(2)
    #     return n**2
    # if __name__ == '__main__':
    #     p=ThreadPoolExecutor()
    #     l=[]
    #     start=time.time()
    #     for i in range(10):
    #         obj=p.submit(task,i)
    #         l.append(obj)
    #     p.shutdown()
    #     print('='*30)
    #     print([obj.result() for obj in l])
    #     print(time.time()-start)
    #异步执行
    #from concurrent.futures import #ProcessPoolExecutor,ThreadPoolExecutor
    # import os,time,random
    # def task(n):
    #     print('%s is running' %os.getpid())
    #     time.sleep(2)
    #     return n**2
    # if __name__ == '__main__':
    #     p=ProcessPoolExecutor()
    #     start=time.time()
    #     for i in range(10):
    #         res=p.submit(task,i).result()
    #         print(res)
    #     print('='*30)
    #     print(time.time()-start)
    # 同步执行

     

     

      

  • 相关阅读:
    6-1
    4-9
    4-5
    4-4
    4-3
    3-10
    作业三2
    作业三1
    课堂练习二
    实验三
  • 原文地址:https://www.cnblogs.com/sxh-myblogs/p/7444325.html
Copyright © 2011-2022 走看看