zoukankan      html  css  js  c++  java
  • 线程

    0进程和线程之间的关系:

          进程是负责资源分配的,线程是实际上帮你执行代码的,线程不能独立存在,必须给他分配资源

    线程和进程的区别:

            进程
                 计算机中最小的资源分配单位
                 数据隔离
                 进程可以独立独立存在
                 创建与销毁 还有切换 都慢 给操作系统压力大
            线程
                           计算机中能被CPU调度的最小单位
                           同一个进程中的多个线程资源共享
                           线程必须依赖进程存在
                           创建与销毁 还有切换 都比进程快很多

      

    为什么要有线程

     线程本身创建出来就是为了解决并发问题的,并且它的整体效率比进程要高,但是线程实际上也有一些性能上的限制管理调度开销

    在整个程序界:
       
       如果你的程序需要数据隔离 : 多进程
          如果你的程序对并发的要求非常高 : 多线程

    socketserver 多线程的
    web的框架 django flask tornado 多线程来接收用户并发的请求

    在整个编程界:
          同一个进程中的多个线程可以同时使用多个cpu

    线程锁这件事儿是有Cpython解释器完成
    对于python来说 同一时刻只能有一个线程被cpu访问
    彻底的解决了多核环境下的安全问题


    线程锁 : 全局解释器锁 GIL:
             1.这个锁是锁线程的
              2.这个锁是解释器提供的

    threading模块

    1.初识线程模块

          from  threading  import  Thread

               process   进程类

               Thread     线程类

    2.开启多线程代码 

     import time
     from threading import Thread
    
     def func():
         print('start')
         time.sleep(1)
         print('end')
    
     if __name__ == '__main__':
         t = Thread(target=func)
         t.start()
         for i in range(5):
             print('主线程')
             time.sleep(0.3)

    说明线程和进程完全异步 说明线程开启速度更快一些

    3.开启线程的另一种方式

    from threading import Thread
    class Mythread(Thread): def __init__(self,arg): #如果想要传参数 super().__init__() self.arg = arg
    def run(self): print('in son',self.arg) t = Mythread(123) t.start()

    4.进程与线程的效率对比 

    #线程 
    import time from threading import Thread def func(): n = 1 + 2 + 3 n ** 2 if __name__ == '__main__': start = time.time() lst = [] for i in range(100): #启多个线程 t = Thread(target=func) t.start() lst.append(t) for t in lst: t.join() print(time.time() - start)

    #进程
    import time
    from multiprocessing import Process as Thread
    def func():
         n = 1 + 2 + 3
         n**2

    if __name__ == '__main__':
         start = time.time()
         lst = []
         for i in range(100):
             t = Thread(target=func)
             t.start()
             lst.append(t)
         for t in lst:
             t.join()
         print(time.time() - start)

    线程效率要比进程快的多

    5.线程的数据共享 

     from threading import Thread
     n = 100
     def func():     #子线程
         global n
         n -= 1
    
     t = Thread(target=func)  #主线程
     t.start()
     t.join()
     print(n)

    在子线程减1,在主线程打印结果,仍然是减完之后的结果,意思我的全局变量对于我的子线程和主线程数据是共享的

    6.threading模块中的其他功能

    第一种方法 

     from threading import Thread,currentThread
    
     class Mythread(Thread):
         def __init__(self,arg):
             super().__init__()
             self.arg = arg
    
         def run(self):
             print('in son',self.arg,currentThread())
    
     
     t = Mythread(123)
     t.start()
    print("主:",currentThread())
    我的ID不同就说明是在不同的线程里面 

    第二种方法

     import time
     from threading import Thread,currentThread,activeCount,enumerate
    class Mythread(Thread): def __init__(self,arg): super().__init__() self.arg = arg
    def run(self): time.sleep(1) print('in son',self.arg,currentThread())
    for i in range(10): t = Mythread(123) t.start() print(t.ident) print(activeCount()) print(enumerate())

    7.多线程实现socket_server  

    server端

    import socket
    from threading import Thread
    def talk(conn):
        while True:
            msg = conn.recv(1024).decode()
            conn.send(msg.upper().encode())
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',9000))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        Thread(target=talk,args = (conn,)).start()

    client端

    import socket
    sk = socket.socket()
    
    
    sk.connect(('127.0.0.1',9000))
    while True:
        sk.send(b'hello')
        print(sk.recv(1024)) 

    守护线程

    守护进程: 
        只守护主进程的代码,主进程代码结束了就结束守护,守护进程在主进程之前结束
    守护线程: 
        随着主线程的结束才结束,守护线程是怎么结束的

    进程 terminate 强制结束一个进程的
    线程 没有强制结束的方法
    线程结束 : 线程内部的代码执行完毕 那么就自动结束了
    import time
    from threading import Thread def func(): while True: print('in func') time.sleep(0.5) def func2(): print('start func2') time.sleep(10) print('end func2') Thread(target=func2).start() t = Thread(target=func) t.setDaemon(True) t.start() print('主线程') time.sleep(2) print('主线程结束')

     锁 

     锁是用来做什么的?

        保证数据的安全的

    GIL是干什么的?

        锁线程

    有了GIL还要锁干啥?

        有了GIL还是会出现数据不安全的现象,所以还是要用锁

    线程也会出现数据不安全的现象:

     import time
     from threading import Thread,Lock
     n = 100
     def func(lock):
         global n
             tmp = n-1  # n-=1
             time.sleep(0.1)
             n = tmp
    
     if __name__ == '__main__':
         l = []
         for i in range(100):
             t = Thread(target=func)
             t.start()
             l.append(t)
         for t in l:t.join()
         print(n)
    

      dis模块使用

    import dis
    n = 1
    def func():
        n = 100
        n -= 1
    
    dis.dis(func)
    
    

    会出现线程不安全的俩种条件:

         1.是全局变量

       2.出现+=  -= 这样的操作    l[0] +=1   d[k] +=1

    列表  字典的方法  l.append  l.pop   l.insert   dic.update  都是线程安全的    

     线程加锁:

     import time
     from threading import Thread,Lock
     n = 100
     def func(lock):
         global n
         with lock:
             tmp = n-1  # n-=1
             time.sleep(0.1)
             n = tmp
    
     if __name__ == '__main__':
         l = []
         lock = Lock()
         for i in range(100):
             t = Thread(target=func,args=(lock,))
             t.start()
             l.append(t)
         for t in l:t.join()
         print(n)
    

    死锁现象 

    死锁问题是怎么出现的:

    import time
    from threading import Thread,Lock
     noodle_lock = Lock()
     fork_lock = Lock()
     def eat1(name):
         noodle_lock.acquire()
         print('%s拿到面条了'%name)
         fork_lock.acquire()
         print('%s拿到叉子了'%name)
         print('%s开始吃面'%name)
         time.sleep(0.2)
         fork_lock.release()
         print('%s放下叉子了' % name)
         noodle_lock.release()
         print('%s放下面了' % name)
    
     def eat2(name):
         fork_lock.acquire()
         print('%s拿到叉子了' % name)
         noodle_lock.acquire()
         print('%s拿到面条了' % name)
         print('%s开始吃面' % name)
         time.sleep(0.2)
         noodle_lock.release()
         print('%s放下面了' % name)
         fork_lock.release()
         print('%s放下叉子了' % name)
    
     Thread(target=eat1,args=('alex',)).start()
     Thread(target=eat2,args=('wusir',)).start()
     Thread(target=eat1,args=('太白',)).start()
     Thread(target=eat2,args=('宝元',)).start()

    死锁不是时刻发生的,有偶然的情况整个程序崩溃了 

    每一个线程之中不止一把锁,并且套着使用容易出现死锁现象

    如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制而应看做一个资源

     解决死锁问题:

    import time
    from threading import Thread,Lock
     lock = Lock()
     def eat1(name):
         lock.acquire()
         print('%s拿到面条了'%name)
         print('%s拿到叉子了'%name)
         print('%s开始吃面'%name)
         time.sleep(0.2)
         lock.release()
         print('%s放下叉子了' % name)
         print('%s放下面了' % name)
    
     def eat2(name):
         lock.acquire()
         print('%s拿到叉子了' % name)
         print('%s拿到面条了' % name)
         print('%s开始吃面' % name)
         time.sleep(0.2)
         lock.release()
         print('%s放下面了' % name)
         print('%s放下叉子了' % name)
    
     Thread(target=eat1,args=('alex',)).start()
     Thread(target=eat2,args=('wusir',)).start()
     Thread(target=eat1,args=('太白',)).start()
     Thread(target=eat2,args=('宝元',)).start()
    

    递归锁

     互斥锁
           无论在相同的线程还是不同的线程,都只能连续acquire一次,要想再acquire,必须先release
     递归锁
           在同一个线程中,可以无限次的acquire
           但是要想在其他线程中也acquire,
           必须现在自己的线程中添加和acquire次数相同的release

    from threading import RLock,Lock
    1.递归锁 rlock = RLock() # RLock在线程中永远不会被锁住的 rlock.acquire() rlock.acquire() rlock.acquire() rlock.acquire() print('锁不住')
    2.互斥锁 lock = Lock() lock.acquire() print('1') lock.acquire() print('2')     #永远打印不出来

    递归锁在多个线程中是怎么回事

    from threading import RLock,Lock,Thread
     rlock = RLock()
     def func(num):
         rlock.acquire()
         print('aaaa',num)
         rlock.acquire()
         print('bbbb',num)
    rlock.release() #俩次release rlock.release() Thread(target=func,args=(1,)).start() Thread(target=func,args=(2,)).start() #阻塞 只有上面执行完代码后经历俩次release把钥匙还回来,才可以执行

    用递归锁解决死锁问题:

    import time
    noodle_lock = fork_lock = RLock()
    def eat1(name): noodle_lock.acquire() print('%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('宝元',)).start()

      信号量

    信号量= 锁+计数器 实现的功能

    import time
    from threading import Semaphore,Thread
    
     def func(name,sem):
         sem.acquire()
         print(name,'start')
         time.sleep(1)
         print(name,'stop')
         sem.release()
    
     sem = Semaphore(5)  #信号量给了你五把钥匙,五个线程可以来执行
     for i in range(20):
         Thread(target=func,args=(i,sem)).start()

    信号量和池
         进程池
               有1000个任务
               一个进程池中有5个进程
               所有的1000个任务会多次利用这五个进程来完成任务
         信号量
               有1000个任务
               有1000个进程/线程
               所有的1000个任务由于信号量的控制,只能5个5个的执行  

    事件

    连接数据库:

    from threading import Event      #Event 事件 wait() 阻塞 到事件内部标识为True就停止阻塞
    import time             #控制标识  set  clear   is_set
    import random
    from threading import Thread,Event
    def connect_sql(e): count = 0 while count < 3: e.wait(0.5) if e.is_set(): print('连接数据库成功') break else: print('数据库未连接成功') count += 1 def test(e): time.sleep(random.randint(0,3)) e.set() e = Event() Thread(target=test,args=(e,)).start() Thread(target=connect_sql,args=(e,)).start()
    
    

    定时器

    定时器,指定n秒后执行某个操作

    from threading import Timer
    
    def func():
        print('执行我啦')
    
    t = Timer(3,func)
    # 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
    t.start()
    print('主线程的逻辑')
    

     条件 

    wait      阻塞
    notify(n) 给信号
     假如现在有20个线程
     所有的线程都在wait这里阻塞
     notify(n) n传了多少
     那么wait这边就能获得多少个解除阻塞的通知

     

    import threading
    
    def run(n):
        con.acquire()
        con.wait()  #阻塞
        print("run the thread: %s" % n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire() 输入几就有几个线程往下执行
            con.notify(int(inp))
            con.release()
            print('****')
    

     线程队列

    queue : 线程队列,线程之间数据安全

    q = queue . Queue()

    #普通队列

    q.get()

    q.put()

    q.get()

    q.get_nowait()   如果有数据我就取,如果没数据不阻塞而是报错

    q.put_nowait()

    class queue.Queue(maxsize=0) #先进先出

    import queue

    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')

    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    '''

    class queue.LifoQueue(maxsize=0) #last in fisrt out

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''

    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''
    

     线程池 

    1.介绍

    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    Both implement the same interface, which is defined by the abstract Executor class.

    2.基本方法

    submit(fn, *args, **kwargs)      异步提交任务
    map(func, *iterables, timeout=None, chunksize=1)       取代for循环submit的操作
    shutdown(wait=True)      相当于进程池的pool.close()+pool.join()操作

    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前

    result(timeout=None)     取得结果
    add_done_callback(fn)   回调函数 

    1.获取结果的例子
    import time import random from threading import currentThread from concurrent.futures import ThreadPoolExecutor
    def func(num): print('in %s func'%num,currentThread()) time.sleep(random.random()) return num**2 tp = ThreadPoolExecutor(5) 创建五个线程 分别去执行func里的任务 ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) for ret in ret_l: print(ret.result())

    2.shutdown 的特点 相当于close + join
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor

    def func(num):
         print('in %s func'%num,currentThread())
         time.sleep(random.random())
         return num**2

      tp = ThreadPoolExecutor(5)
    ret_l = []
      for i in range(30):
             ret = tp.submit(func,i)
             ret_l.append(ret)
      tp.shutdown()  # close + join
      for ret in ret_l:
             print(ret.result())

    3.进程池和线程池只需要修改一个地方 (线程池和进程池平滑的切换)
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor,processPoolExecutor

    def func(num):
         print('in %s func'%num,currentThread())
         time.sleep(random.random())
         return num**2

    if__nmae__=="__main__":
      #tp = ThreadPoolExecutor(5) 注释掉改成processPoolExecutor
    tp =processPoolExecutor (5) 就变成进程池了不是线程池了
      ret_l = []
      for i in range(30):
             ret = tp.submit(func,i)
             ret_l.append(ret)
      tp.shutdown()  # close + join
      for ret in ret_l:
             print(ret.result())


    3.或者另一种引入模块
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor as Pool
    # from concurrent.futures import ProcessPoolExecutor as Pool
    import os def func(num): # print('in %s func'%num,currentThread()) print('in %s func'%num,os.getpid()) time.sleep(random.random()) return num**2 if __name__ == '__main__': # tp = ThreadPoolExecutor(5) tp = Pool(5) 直接用pool ret_l = [] for i in range(30): ret = tp.submit(func,i) ret_l.append(ret) tp.shutdown() # close + join for ret in ret_l: print(ret.result())

    # 创建一个池
    # 提交任务 submit
    # 阻塞直到任务完成(close + join) shutdown
    # 获取结果 result

    简便用法 map

     import os
    def func(num): print('in %s func'%num,os.getpid()) time.sleep(random.random()) return num**2
    if __name__ == '__main__': tp = Pool(5) ret = tp.map(func,range(30)) for i in ret: print(i)

    回调函数 add_done_callback

     def func1(num):
         print('in func1 ',num,currentThread())
         return num*'*'
    
     def func2(ret):
         print('--->',ret.result(),currentThread())
    tp = Pool(5) 创建线程池 print('主 : ',currentThread()) for i in range(10): tp.submit(func1,i).add_done_callback(func2) 回调函数收到的参数是需要使用result()获取的 回调函数是由谁执行的? 主线程

    线程爬虫的例子

    from urllib.request import urlopen
    def func(name,url): content = urlopen(url).read() 打开url获取网页内容 content是一个byts类型 return name,content def parserpage(ret): name,content = ret.result() with open(name,'wb') as f: f.write(content) urls = { 'baidu.html':'https://www.baidu.com', 'python.html':'https://www.python.org', 'openstack.html':'https://www.openstack.org', 'github.html':'https://help.github.com/', 'sina.html':'http://www.sina.com.cn/' } tp = Pool(2) for k in urls: tp.submit(func,k,urls[k]).add_done_callback(parserpage)

      

  • 相关阅读:
    async await promise 执行时序
    理解prototype
    X-Requested-With
    event事件传播规则
    【小文】Flask web Hello程序!
    【小文】php-gtk: Hello!
    【小文】HTTP 状态码
    【小文】Python环境安装配置
    C语言:趣味小问题 鸡兔同笼
    C语言:趣味小问题 百钱买百鸡
  • 原文地址:https://www.cnblogs.com/baoshuang0205/p/10060990.html
Copyright © 2011-2022 走看看