zoukankan      html  css  js  c++  java
  • Day10 多线程理论 开启线程

    多线程:

    多线程和多进程的不同是他们占用的资源不一样,

    一个进程里边可以包含一个或多个进程,

    进程的开销大,线程的开销小。

    打个比方来说:创建一个进程,就是创建一个车间。创建一个线程,就是在一个车间创建一个流水线。


    怎么去开启一个线程:

    方法一(直接用默认的类):

     1 开启线程的方式一:使用替换threading模块提供的Thread
     2 from threading import Thread
     3 from multiprocessing import Process
     4 
     5 def task():
     6     print('is running')
     7 
     8 if __name__ == '__main__':
     9     t=Thread(target=task,)
    10     # t=Process(target=task,)
    11     t.start()
    12     print('')

    方法二(自己定义一个类,继承系统的类):

     1 #开启线程的方式二:自定义类,继承Thread
     2 from threading import Thread
     3 from multiprocessing import Process
     4 class MyThread(Thread):
     5     def __init__(self,name):
     6         super().__init__()    #不破坏原有的类
     7         self.name=name
     8     def run(self):
     9         print('%s is running' %self.name)
    10 
    11 if __name__ == '__main__':
    12     t=MyThread('egon')
    13     # t=Process(target=task,)
    14     t.start()
    15     print('')

    在同时开始多个线程和多个进程的时候,多个线程的pid是不一样的,多个线程的pid是一样的。

    可以利用这段代码测试下:

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def task():
        print('%s is running' %os.getpid())
    
    if __name__ == '__main__':
        # t1=Thread(target=task,)
        # t2=Thread(target=task,)
        t1=Process(target=task,)
        t2=Process(target=task,)
        t1.start()
        t2.start()
        print('',os.getpid())

    多个线程是共享同一个进程内的资源的。


    Thread的其他相关的属性或者方法:

    from threading import Thread,activeCount,enumerate,current_thread
    import time
    def task():
        print('%s is running' %current_thread().getName())
        time.sleep(2)
    
    if __name__ == '__main__':
        t=Thread(target=task,)
        t.start()
        t.join()
        print(t.is_alive())     #判断线程是否是活着的     
        print(t.getName())    #获取到线程名
        print(enumerate())    #显示当前活跃的进程对象
        print('主')
        print(activeCount())   #查看活着的线程数,一般是一个主进程(相当于也是一个线程)和开启的数量
    

      

    current_thread的用法:

    current_thread的用法
    from threading import Thread,activeCount,enumerate,current_thread
    from multiprocessing import Process
    import time
    
    def task():
        print('%s is running' %current_thread().getName())
        time.sleep(2)
    
    if __name__ == '__main__':
        p=Process(target=task)
        p.start()
        print(current_thread())
    

    主线程从执行层面上代表了其所在进程的执行过程。


    守护线程:

    无论是进程还是线程,都是遵循:守护XXX会等待主XXX运行完毕后被销毁。

    需要强调的是:运行完毕并非终止运行。

    主进程运行完了还需要等待子进程运行。

    先看:守护进程
    
    from multiprocessing import Process
    import time
    
    def task1():
        print('123')
        time.sleep(1)
        print('123done')
    
    def task2():
        print('456')
        time.sleep(10)
        print('456done')
    
    if __name__ == '__main__':
        p1=Process(target=task1)
        p2=Process(target=task2)
        p1.daemon = True           #p1加了守护进程
        p1.start()
        p2.start()
        print('主')
    

    p1加了守护进程后,在主进程运行结束后,p1的进程也死掉。

    p2没有加守护进程,在主进程运行结束后,p2会继续运行。

    #再看:守护线程
    
    from threading import Thread
    import time
    
    def task1():
        print('123')
        time.sleep(10)
        print('123done')
    
    def task2():
        print('456')
        time.sleep(1)
        print('456done')
    
    if __name__ == '__main__':
        t1=Thread(target=task1)
        t2=Thread(target=task2)
        t1.daemon=True
        t1.start()
        t2.start()
        print('主')
    

    主线程会等其他线程(相当于子线程)运行完毕,程序才会结束。

    如果(子)线程加上了守护线程,当主线程运行结束后,(子)线程没有运行结束的都会被干掉。

    需要特别注意的一点,这里如果还有其他线程没有运行完,主线程是不会结束的。


    GIL全局解释器锁:

    GIL本质就是一把互斥锁。

    这把锁的意义在于在代码执行的时候,防止多段代码抢占资源。

    在Cpython解释器中,同一个进程下开启多个多线程,同一时刻只能有一个线程执行,无法利用多核优势。

    from threading import Thread
    n=100
    def task():
        print('is running')
    
    if __name__ == '__main__':
        t1=Thread(target=task,)
        t2=Thread(target=task,)
        t3=Thread(target=task,)
        # t=Process(target=task,)
        t1.start()
        t2.start()
        t3.start()
        print('')

    比如说python执行这样一个程序:

    python解释器除了自己开的线程外,还有一些其他的线程,比如说内存回收机制。

    线程1,2,3,4都是同一段代码,这三段代码都是要交给python解释器来执行的。

    当python代码和内存回收机制同时进行的时候,可能会将数据紊乱。

     所以为了防止这种情况:

    可以在解释器的代码前后加入这些代码:

    GIL.acquire()
    解释器的代码()
    GIL.release()
    

     

    python多线程利用不了多核优势,多进程能够利用多核优势。

     1 #线程的互斥锁
     2 from threading import Thread,Lock
     3 import time
     4 n=100
     5 def work():
     6     global n
     7     mutex.acquire()    #加GIL锁
     8     temp=n
     9     time.sleep(0.1)
    10     n=temp-1
    11     mutex.release()    #解GIL锁
    12 
    13 if __name__ == '__main__':
    14     mutex=Lock()      #造出这把所
    15     l=[]
    16     start=time.time()
    17     for i in range(100):
    18         t=Thread(target=work)
    19         l.append(t)
    20         t.start()
    21 
    22     for t in l:
    23         t.join()
    24     print('run time:%s value:%s' %(time.time()-start,n))

    互斥锁和join的区别:

    上面的互斥锁的代码,下面是join的代码:

    from threading import Thread,Lock
    import time
    n=100
    def work():
        time.sleep(0.05)
        global n
        temp=n
        time.sleep(0.1)
        n=temp-1
    
    
    if __name__ == '__main__':
        start=time.time()
        for i in range(100):
            t=Thread(target=work)
            t.start()
            t.join()
    
        print('run time:%s value:%s' %(time.time()-start,n))
    

    join和互斥锁的代码的区别是:

    join是将每一个循环斗都执行了,相当于所有的代码串行了。

    而互斥锁是只将加锁的代码串行,其他的代码串行。

    总结一个结论:如果碰到的是纯IO的操作,利用不了多核优势,一般利用多线程。

    如果碰到的是计算的操作,能够利用多核的优势,一般利用多进程

    #多线程
    优点:开销小 缺点:不能利用多核优势 from threading import Thread from multiprocessing import Process import time #计算密集型 def work(): res=1 for i in range(100000000): res+=i if __name__ == '__main__': p_l=[] start=time.time() for i in range(4): # p=Process(target=work) #6.7473859786987305 p=Thread(target=work) #24.466399431228638 p_l.append(p) p.start() for p in p_l: p.join() print(time.time()-start) #从运算结果上得到结果可以看到,在有4核的机器上,纯计算的多线程程序能利用多核优势,多线程无法利用多核优势。
    from threading import Thread
    from multiprocessing import Process
    import time
    #IO密集型
    def work():
        time.sleep(2)
    
    if __name__ == '__main__':
        p_l=[]
        start=time.time()
        for i in range(400):
            # p=Process(target=work) #12.104692220687866
            p=Thread(target=work) #2.038116455078125
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
    
        print(time.time()-start)
    
    
    #IO密集型程序能够利用单核优势,开线程比开进程的优势在于线程的资源比进程的资源小。
    

    死锁与递归锁:

    #死锁现象
    #一个线程起来以后,运行函数F1内容(去抢A锁,然后去抢B锁,然后释放B锁,释放A锁),然后运行F2的程序(先抢B锁,然会抢A锁,等待一小下,然后释放A锁,释放B锁) #同时起了20个线程,当一个线程起来后,运行了A过程,其他线程抢不到A锁,当释放了A锁后,运行B过程,这时候抢到了B锁。其他线程抢到了A锁,第一个线程需要A锁,其他进程需要B锁,就会造成死锁现象。
    from threading import Thread,Lock,RLock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutexB.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) time.sleep(1) mutexA.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(20): t=Mythread() t.start()

    解决死锁的问题可以利用递归锁:

    #递归锁   RLock
    #RLock的运行原理:当锁一旦acquire的话,计数器+1,当锁release的话,计数器就减一。
    #其他锁想抢占锁的时候,就必须得等到锁的计数器减到0才能抢占,这样就避免了死锁现象的发生。2017-09-06 from threading import Thread,Lock,RLock import time mutex=RLock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutex.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutex.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) mutex.release() mutex.release() def f2(self): mutex.acquire() print('33[44m%s 抢到B锁33[0m' %self.name) time.sleep(1) mutex.acquire() print('33[45m%s 抢到A锁33[0m' %self.name) mutex.release() mutex.release() if __name__ == '__main__': for i in range(20): t=Mythread() t.start()

    信号量:

    (本质也是一把锁,就好像是一个公共厕所,可以进多个人)

    可以让有数量的进程一起运行。

    from threading import Thread,current_thread,Semaphore
    import time,random
    #current_thread    得到线程的名称
    #Semaphore    指定线程的最大运行数量函数
    
    
    #定义最多允许5个进程同时运行
    sm=Semaphore(5)
    def work():               
        sm.acquire()
        print('%s 上厕所' %current_thread().getName())
        time.sleep(random.randint(1,3))
        sm.release()
    
    if __name__ == '__main__':
        for i in range(20):
            t=Thread(target=work)
            t.start()
    

    信号量的执行是你先创造几把锁,这样就定义了同时能够运行的最大数量。

    当有一个线程(或进程)运行了以后,拿走一个锁,count+1.当线程(或线程)运行完毕以后,count-1.

    这时候其他的进程或者线程才能够再去抢这把锁。


    事件Event:

    一共两个进程(或者进程),第二个进程的执行需要得到第一个的执行结果才能执行。

    例如说连接数据库需要两个进程,一个检测数据库是否可以连接,一个连接。from threading import Thread,current_thread,Evenimport timeevent=Event()


    def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise ConnectionError('链接失败') print('%s 等待第%s次链接mysql' %(current_thread().getName(),count)) event.wait(0.5)
    #这里设置的wait是等下面check_mysql的event设置为True count
    +=1 print('%s 链接ok' % current_thread().getName()) def check_mysql(): print('%s 正在检查mysql状态' %current_thread().getName()) time.sleep(1)
    #这里是将event设置为True
    event.set() if __name__ == '__main__': t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) check=Thread(target=check_mysql) t1.start() t2.start() check.start()

    定时器:

    from threading import Timer
    
    def hello(n):
        print("hello, world",n)
    
    t = Timer(3, hello,args=(11,))
    t.start()  # after 1 seconds, "hello, world" will be printed
    #定义了以后在几秒以后执行

    线程queue:

    针对于线程,不针对于进程

    import queue
    #这个是针对于线程的

    # q=queue.Queue(3) #队列:先进先出 # q.put(1) # q.put(2) # q.put(3) # # print(q.get()) # print(q.get()) # print(q.get()) # q=queue.LifoQueue(3) #堆栈:后进先出 # q.put(1) # q.put(2) # q.put(3) # # print(q.get()) # print(q.get()) # print(q.get()) q=queue.PriorityQueue(3) #数字越小优先级越高
    #前面放的是优先级,数字越小优先级越高
    q.put((10,'data1')) q.put((11,'data2')) q.put((9,'data3')) print(q.get()) print(q.get()) print(q.get())

    进程池与线程池:

    这个模块提高了一个更高级的接口来异步调用。

    异步调用就是将一堆任务直接丢到进程池或者线程池,不用等待执行完成。

    同步调用是将任务丢到进程池或者线程池,还要等待他的执行结果。

    线程池:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    
    #引用线程池和进程池函数
    from threading import current_thread import os,time,random def work(n): print('%s is running' %current_thread().getName()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': p=ThreadPoolExecutor()
    #生成一个线程池 objs
    =[] for i in range(21): obj=p.submit(work,i)
    #将所有线程提交到线程池 objs.append(obj) p.shutdown()
    #等待所有线程结束

    #打印线程池内的线程
    for obj in objs: print(obj.result())

    进程池:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import os,time,random
    def work(n):
        print('%s is running' %current_thread().getName())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
        p=ThreadPoolExecutor()
        objs=[]
        for i in range(21):
            obj=p.submit(work,i)
            objs.append(obj)
        p.shutdown()
        for obj in objs:
            print(obj.result())

    模拟IO密集型操作,看看是进程池的程序好还是线程池的程序好:

    模拟下载网页的过程:(IO需要等待下载)

    进程池:

     1 进程池
     2 import requests #pip3 install requests
     3 import os,time
     4 from multiprocessing import Pool
     5 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
     6 def get_page(url):
     7     print('<%s> get :%s' %(os.getpid(),url))
     8     respone = requests.get(url)
     9     if respone.status_code == 200:
    10         return {'url':url,'text':respone.text}
    11 
    12 def parse_page(obj):
    13     dic=obj.result()
    14     print('<%s> parse :%s' %(os.getpid(),dic['url']))
    15     time.sleep(0.5)
    16     res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    17     with open('db.txt','a') as f:
    18         f.write(res)
    19 
    20 
    21 if __name__ == '__main__':
    22 
    23     # p=Pool(4)
    24     p=ProcessPoolExecutor()
    25     urls = [
    26         'http://www.baidu.com',
    27         'http://www.baidu.com',
    28         'http://www.baidu.com',
    29         'http://www.baidu.com',
    30         'http://www.baidu.com',
    31         'http://www.baidu.com',
    32         'http://www.baidu.com',
    33     ]
    34 
    35 
    36     for url in urls:
    37         # p.apply_async(get_page,args=(url,),callback=parse_page)
    38         p.submit(get_page,url).add_done_callback(parse_page)
    39 
    40     p.shutdown()
    41     print('主进程pid:',os.getpid())

    线程池:

     1 线程池
     2 import requests #pip3 install requests
     3 import os,time,threading
     4 from multiprocessing import Pool
     5 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
     6 def get_page(url):
     7     print('<%s> get :%s' %(threading.current_thread().getName(),url))
     8     respone = requests.get(url)
     9     if respone.status_code == 200:
    10         return {'url':url,'text':respone.text}
    11 
    12 def parse_page(obj):
    13     dic=obj.result()
    14     print('<%s> parse :%s' %(threading.current_thread().getName(),dic['url']))
    15     time.sleep(0.5)
    16     res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    17     with open('db.txt','a') as f:
    18         f.write(res)
    19 
    20 
    21 if __name__ == '__main__':
    22 
    23     # p=Pool(4)
    24     p=ThreadPoolExecutor(3)
    25     urls = [
    26         'http://www.baidu.com',
    27         'http://www.baidu.com',
    28         'http://www.baidu.com',
    29         'http://www.baidu.com',
    30         'http://www.baidu.com',
    31         'http://www.baidu.com',
    32         'http://www.baidu.com',
    33     ]
    34 
    35 
    36     for url in urls:
    37         # p.apply_async(get_page,args=(url,),callback=parse_page)
    38         p.submit(get_page,url).add_done_callback(parse_page)
    39 
    40     p.shutdown()
    41     print('主进程pid:',os.getpid())
    View Code
  • 相关阅读:
    信息安全系统设计基础第八周期中复习总结
    layui下各种富文本的冲突情况
    TP3.2+find_set_in 以及 find_set_in和like的区别
    tp5+linux+apache php7.1.30环境下,上传图片报错:mkdir():permission denied
    一次基于老古董thinkPHP3.1的修改尝试
    微信网页开发 thinkphp5.0的try-catch和重定向
    CentOS 7.2下服务器配置(linux+apache+php+mysql)
    微信小程序踩坑(不定时更新)
    PHP 定时自动执行代码
    PHP TP5 文章评论+积分+签到
  • 原文地址:https://www.cnblogs.com/sexiaoshuai/p/7472687.html
Copyright © 2011-2022 走看看