zoukankan      html  css  js  c++  java
  • python之线程

    什么是线程?

    线程是CPU上的执行单位。

    线程和进程的区别

    1、进程是资源的集合,是一个资源单位。线程是CPU上是执行单位。所以开进程开销与远大于开线程

    2、进程单独开辟内存空间。同一个进程内多线程共享同一个内存空间

    a = 100
    
    
    def task():
        global a
        a = 0
        
        
    if __name__ == '__main__':
        # p = Process(target=task, )
        # p.start()
        # p.join()
        # print(a)  # 100
        t = Thread(target=task, )
        t.start()
        t.join()
        print(a)  # 0
    

    3、开多个进程,每个进程有不同的pid。在主进程下开启多个线程,每个线程的pid和主进程的pid一样

    Thread对象的其他用法

    from threading import Thread, current_thread, active_count, enumerate
    import time
    
    
    def task():
        print("子进程", current_thread().getName(), current_thread().name)
        # current_thread().getName() == current_thread().name
        time.sleep(2)
        
    
    if __name__ == '__main__':
        t = Thread(target=task, name="sb")
        t.start()
        print("子线程", t.getName())
        t.setName("我是子线程")  # 设置进程名
        print("子线程", t.name)  # t.name == t.getName() 查看进程名
        # t.join()
        print(t.is_alive())  # 查看进程是否活动
        # t.is_alive() == t.isAlive
        print(active_count())  # 返回正在运行的线程数量,
        print(len(enumerate()))
        # 与len(threading.enumerate())有相同的结果。
    

    练习

    from threading import Thread
    import time
    
    
    def foo():
        print("123")
        time.sleep(1)
        print("end123")
        
    
    def bar():
        print(456)
        time.sleep(3)
        print("end 456")
        
        
    if __name__ == '__main__':
        t1 = Thread(target=foo,)
        t1.daemon = True
        t2 = Thread(target=bar)
        t1.start()
        t2.start()
        print("zhu")
        
    """
    123
    456
    zhu
    end123
    end 456
    """
    

    线程互斥锁

    不加锁

    from threading import Thread
    import time
    n = 100
    
    
    def task():
        global n
        temp = n
        time.sleep(0.1)
        n = temp - 1
    
       
    if __name__ == '__main__':
        t_l = []
        for i in range(10):
            t = Thread(target=task)
            t_l.append(t)
            t.start()
        
        for t in t_l:
            t.join()
        print("zhu", n) # 99
    

    加了锁

    from threading import Thread, Lock
    import time
    
    n = 100
    
    
    def task():
        global n
        mutex.acquire()
        temp = n
        time.sleep(0.1)
        n = temp - 1
        mutex.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()
        t_l = []
        for i in range(10):
            t = Thread(target=task)
            t_l.append(t)
            t.start()
        
        for t in t_l:
            t.join()
        print("zhu", n)  # 90
    

    GIL全局解释器锁

    注意:

    GIL并不是Python的特性,Python完全可以不依赖于GIL。它是在实现Python解析器(CPython)时所引入的一个概念。

    什么是GIL?

    全局解释器锁,GIL本质就是一把互斥锁。解释器级别的一把互斥锁。

    GIL和互斥锁的区别

    GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据。

    验证python test.py只会产生一个进程

    #test.py内容
    import os,time
    print(os.getpid())
    time.sleep(1000)
    
    #打开终端执行
    python3 test.py
    
    #在windows下查看
    tasklist |findstr python
    
    #在linux下下查看
    ps aux |grep python
    

    https://www.luffycity.com/python-book/73-duo-jin-cheng-shi-xian/746-gilquan-ju-jie-shi-qi-suo.html

    信号量

    互斥锁 同时只允许一个线程更改数据,而信号量(Semaphore)是同时允许一定数量的线程更改数据 任务拿到锁去执行。

    from threading import Thread,Semaphore
    import threading
    import time
    
    def func():
        sm.acquire()
        print('%s get sm' %threading.current_thread().getName())
        time.sleep(3)
        sm.release()
    
    if __name__ == '__main__':
        sm=Semaphore(5)
        for i in range(23):
            t=Thread(target=func)
            t.start()
    

    Event事件

    对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。

    from threading import Event
    
    event.is_set():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    
    

    代码实例

    from threading import Thread,Event
    import threading
    import time,random
    
    
    def conn_mysql():
        count=1
        while not event.is_set():
            if count > 3:
                raise TimeoutError('链接超时')
            print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
            event.wait(0.5)
            count += 1
        print('<%s>链接成功' % threading.current_thread().getName())
    
    
    def check_mysql():
        print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
        time.sleep(random.randint(2, 4))
        event.set()
        
        
    if __name__ == '__main__':
        event=Event()
        conn1=Thread(target=conn_mysql)
        conn2=Thread(target=conn_mysql)
        check=Thread(target=check_mysql)
    
        conn1.start()
        conn2.start()
        check.start()
    

    线程queue

    定时器

    from threading import Timer
    
    def hello():
        print("hello, world")
    
    t = Timer(1, hello)
    t.start()  
    

    多线程FTP

    服务端

    from threading import Thread
    import socket
    
    
    def communicate(conn):
        while True:
            try:
                data = conn.recv(1024).decode()
                if not data:
                    break
                print(data)
                conn.send(data.upper().encode())
            except ConnectionRefusedError as e:
                print(e)
                break
             
           
    def server(ip, port):
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind((ip, port))
        server.listen(5)
        while True:
            conn, addr = server.accept()
            t = Thread(target=communicate, args=(conn,))
            t.start()
        server.close()
    
    
    if __name__ == '__main__':
        server('127.0.0.1', 9999)
    

    线程池&进程池

    官网:https://docs.python.org/dev/library/concurrent.futures.html
    
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    

    基本方法

    、submit(fn, *args, **kwargs)
    异步提交任务
    
    2、map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    3、shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    4、result(timeout=None)
    取得结果
    
    5、add_done_callback(fn)
    回调函数
    

    用法

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    import os,time,random
    def task(n):
        print('%s is runing' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ProcessPoolExecutor(max_workers=3)
    
        futures=[]
        for i in range(11):
            future=executor.submit(task,i)
            futures.append(future)
        executor.shutdown(True)
        print('+++>')
        for future in futures:
            print(future.result())
    

    爬虫小练习

    from concurrent.futures import ThreadPoolExecutor
    import requests
    import time
    
    def get(url):
        print('GET %s' %url)
        response=requests.get(url)
        time.sleep(3)
        return {'url':url,'content':response.text}
    
    
    def parse(res):
        res=res.result()
        print('%s parse res is %s' %(res['url'],len(res['content'])))
    
    
    if __name__ == '__main__':
        urls=[
            'http://www.cnblogs.com/linhaifeng',
            'https://www.python.org',
            'https://www.openstack.org',
        ]
    
        pool=ThreadPoolExecutor(2)
    
        for url in urls:
            pool.submit(get,url).add_done_callback(parse) 
    

    基于进程池实现FTP

    服务端

    from socket import *
    from concurrent.futures import ThreadPoolExecutor
    
    def communicate(conn):
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
    
        conn.close()
    
    def server(ip,port):
        server = socket(AF_INET, SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
    
        while True:
            conn, addr = server.accept()
            pool.submit(communicate,conn)
    
        server.close()
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(2)
        server('127.0.0.1', 8081)
    

    同步与异步

    同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

    def la(name):
        print('%s is laing' % name)
        time.sleep(random.randint(3, 5))
        res=random.randint(7, 13)*'#'
        return {'name': name, 'res': res}
    
    
    def weigh(shit):
        name = shit['name']
        size = len(shit['res'])
        print('%s 拉了 《%s》kg' %(name,size))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(13)
    
        shit1 = pool.submit(la, 'alex').result()
        weigh(shit1)
    
        shit2 = pool.submit(la, 'wupeiqi').result()
        weigh(shit2)
    
        shit3 = pool.submit(la, 'yuanhao').result()
        weigh(shit3)
    
    

    异步调用:提交完任务后,不地等待任务执行完毕。

    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    def la(name):
        print('%s is laing' %name)
        time.sleep(random.randint(3,5))
        res=random.randint(7,13)*'#'
        return {'name':name,'res':res}
    
    
    def weigh(shit):
        shit=shit.result()
        name=shit['name']
        size=len(shit['res'])
        print('%s 拉了 《%s》kg' %(name,size))
    
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(13)
    
        pool.submit(la,'alex').add_done_callback(weigh)
    
        pool.submit(la,'wupeiqi').add_done_callback(weigh)
    
        pool.submit(la,'yuanhao').add_done_callback(weigh)
    
  • 相关阅读:
    django 笔记4 数据库操作
    html关于不换行代码
    之前搭建的jenkins的一些笔记
    pip报错
    ssh 免密及加密远程脚本实现
    今天了解了些redis和memcached的知识
    django 笔记3
    来选择一款适合你网站的CMS建站程序吧
    如何预防和检测网页挂马?
    网页挂马方式
  • 原文地址:https://www.cnblogs.com/Jason-lin/p/8486047.html
Copyright © 2011-2022 走看看