zoukankan      html  css  js  c++  java
  • python之路 -- 并发编程之线程

    进程 是 最小的内存分配单位

    线程 是 操作系统调度的最小单位

    线程直接被CPU执行,进程内至少含有一个线程,也可以开启多个线程

    开启一个线程所需要的时间要远远小于开启一个进程

    GIL锁(即全局解释器锁) 锁的是线程

    在Cpython解释器下的python程序 在同一时刻 多个线程中只能有一个线程被CPU执行

    1.创建线程的两中方式:

    import time 
    from threading import Thread
    def func(args):
        time.sleep(1)
        print(args)
    
    t = Thread(target=func,args=(10,))
    # 传入的参数也必须以元组的形式传
    t.start()
    
    # 创建线程的另一种方式(使用面向对象创建线程)
    import time 
    from threading import Thread
    class MyTread(Thread):
        def __init__(self,arg):
            super().__init__()    # 调用父类的__init__
            self.arg = arg
        def run(self):        # 方法名必须是run
            time.sleep(1)
            print(self.arg)
    
    t = MyTread(10)
    t.start()

    2.线程与进程效率的比较

    import time
    from threading import Thread
    from multiprocessing import Process
    def func(n):
        n + 1
    
    if __name__ == '__main__':
        start1 = time.time()
        t_lst1 = []
        for i in range(100):
            t = Thread(target=func,args=(i,))
            t.start()
            t_lst1.append(t)
        for t in t_lst1:t.join()
        t1 = time.time() - start1
    
        start2 = time.time()
        t_lst2 = []
        for i in range(100):
            t = Process(target=func, args=(i,))
            t.start()
            t_lst2.append(t)
        for t in t_lst2: t.join()
        t2 = time.time() - start2
        print(t1,t2)
    
    输出的结果为:
    0.02698826789855957 10.160863876342773
    
    # 可见开启一个线程所需要的时间要远远小于开启一个进程

    多线程用于IO密集型,如socket,爬虫,处理web请求,读写数据库;

    多进程用于计算密集型,如金融分析。

    3.多进程与多线程数据共享的比较:

    多个线程内部有自己的数据栈,线程内部数据不共享

    全局变量在多个线程之间是共享的(线程之间资源共享)

    多进程修改全局变量
    import os,time
    from multiprocessing import Process
    from threading import Thread
    
    def func():
        global g
        g -= 10
        print(g,os.getpid())
    
    g = 100   # 全局变量g
    t_lst = []
    if __name__ == "__main__":
        for i in range(5):
            t = Process(target=func)
            t.start()
            t_lst.append(t)
        for t in t_lst: t.join()
        print(g)
    """输出结果为:
    90 143648
    90 160300
    90 143316
    90 160804
    90 159348
    100
    """
        
    多线程修改全局变量
    import os,time
    from multiprocessing import Process
    from threading import Thread
        
    for i in range(5):
        t = Thread(target=func)
        t.start()
        t_lst.append(t)
    for t in t_lst : t.join()
    print(g)
    """输出的结果为;
    90 161716
    80 161716
    70 161716
    60 161716
    50 161716
    50
    """

    4.线程模块中的其他方法:

    1、Thread实例对象的方法(t为实例化的一个线程对象)
      t.isAlive():返回线程是否存活。
      t.getName():返回线程名。
      t.setName():设置线程名。

    2、threading模块提供的一些方法:
      threading.active_count():查看程序中存在多少个活着的线程,与len(threading.enumerate())有相同的结果。
      threading.enumerate(): 查看程序中的所有线程的线程名和线程号(列表的形式列出每个线程)。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      threading.current_thread():查看当前线程的线程名和线程号

    3.实例:

    例子:
    import time
    import threading
    
    def func(n):
        time.sleep(0.5)
        print(n,t.getName(),threading.get_ident(),t.isAlive())
    # 打印线程名,线程号,判断线程是否在活动
    
    # 当一个线程执行完成func之后就结束了,t.isAlive()查看为False
    
    for i in  range(5):
        t = threading.Thread(target=func,args=(i,))
        t.start()
        t.join()
    
    print(threading.active_count())    # 查看程序中存在多少个活着的线程
    print(threading.current_thread())   # 查看当前线程的线程名和线程号
    print(threading.enumerate())        # 查看程序中的所有线程的线程名和线程号(列表的形式列出每个线程)
    
    """执行结果为:
    0 Thread-1 33164 True
    1 Thread-2 57836 True
    2 Thread-3 165280 True
    3 Thread-4 171196 True
    4 Thread-5 119008 True
    1
    <_MainThread(MainThread, started 165276)>
    [<_MainThread(MainThread, started 165276)>]
    "
    View Code

    5.守护线程

    守护进程与守护线程的区别:

      1.守护进程随着主进程代码的执行结束而结束
      2.守护线程会在主线程结束之后再等待其他子线程结束之后才结束

    from threading import Thread
    import time
    
    def func():
        print('我是守护线程')
        time.sleep(0.5)
        print('守护线程结束了')
    
    def func2():
        print('我是线程2,非守护线程')
        time.sleep(2)
        print('func2执行完了')
    
    t = Thread(target=func)
    t.daemon = True        # 仍是加在start()前    
    t.start()
    t2 = Thread(target=func2)
    t2.start()
    
    print('我是主线程')
    t2.join()
    print('主线程结束了。')
    
    """执行结果为:
    我是守护线程
    我是线程2,非守护线程
    我是主线程
    守护线程结束了
    func2执行完了
    主线程结束了。
    """
    
    # 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 再结束。然后回收子进程的资源
    实例

    6.线程锁

    1.不加锁

    from threading import Thread
    import time
    def func():
        global n
        time.sleep(0.5)
        n-=1
    
    n = 10
    t_list = []
    for i in range(10):
        t = Thread(target=func)
        t.start()
        t_list.append(t)
    
    [t.join() for i in t_list]
    print(n)
    # 输出结果n为9
    # 创建线程非常快,创建的10个线程都拿到了global的n,此时n为10,然后各线程都做-1操作,得到的结都为9
    不加锁

    2.加锁

    from threading import Thread
    from threading import Lock
    
    def func(lock):
        global n
        lock.acquire()
        n-=1
        lock.release()
    
    lock = Lock()
    n = 10
    t_list = []
    for i in range(10):
        t = Thread(target=func,args=(lock,))
        t.start()
        t_list.append(t)
    
    [t.join() for i in t_list]
    print(n)
    # 输出的结果为0
    通过加锁后保证了多线程数据安全,但损失了效率
    加锁

    互斥锁Lock:在同一个线程或者进程之间,当有两个acquire的时候,就会产生阻塞(死锁)

    递归锁RLock:在同一个线程或则进程之间,无论acquire多少次都不会产生阻塞(死锁)

    3.多人吃面事例

    多人吃面事例:
    # 有多个人在一个桌子上吃面,只有一份面和一个叉子,当一个人同时拿到了这两样才能完成一次吃面的动作。
    from threading import Thread,Lock
    import time
    
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        noodle_lock.release()
        fork_lock.release()
    
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        time.sleep(1)
        noodle_lock.acquire()
        print('%s拿到面条啦'%name)
        print('%s吃面'%name)
        noodle_lock.release()
        fork_lock.release()
    
    # 实例化2个锁,给面条和叉子加上锁
    noodle_lock = Lock()
    fork_lock = Lock()
    
    Thread(target=eat1,args=('peo1',)).start()
    Thread(target=eat2,args=('peo2',)).start()
    Thread(target=eat1,args=('peo3',)).start()
    Thread(target=eat2,args=('peo4',)).start()
    """执行结果为:
    peo1拿到面条了
    peo1拿到叉子了
    peo1吃面
    peo2拿到叉子了
    peo3拿到面条了
    “陷入阻塞”…………
    """
    多人吃面事例(使用互斥锁):
    # 加递归锁
    from threading import Thread,RLock
    import time
    
    def eat1(name):
        noodle_lock.acquire()       # 一把钥匙
        print('%s拿到面条啦'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        fork_lock.release()
        noodle_lock.release()
    
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        time.sleep(1)
        noodle_lock.acquire()
        print('%s拿到面条啦'%name)
        print('%s吃面'%name)
        noodle_lock.release()
        fork_lock.release()
    
    fork_lock = noodle_lock  = RLock()
    
    Thread(target=eat1,args=('peo1',)).start()
    Thread(target=eat2,args=('peo2',)).start()
    Thread(target=eat1,args=('peo3',)).start()
    Thread(target=eat2,args=('peo4',)).start()
    
    # 通过加递归锁实现了数据安全
    # 递归锁一般用于有多个数据需要加锁的时候,不会出现数据安全问题
    多人吃面事例(使用递归锁) 

    对于有多个数据需要加锁的时候,用互斥锁仍然会出现数据不安全问题

    当多线程是对一个数据处理的时候通过给这个数据加上互斥锁实现了数据安全。

    但是当有多个数据被多线程调用处理的时候,加互斥锁仍然会出现数据安全问题。这时候需要用到递归锁,给多个数据加锁。

    7.线程信号量

    # 线程信号量
    # 相当于加一个锁,但此锁可以根据自己需要设置有多个钥匙,此时可以允许有多个线程同时去做操作
    from threading import Thread,Semaphore
    import time
    
    def func(sem,n):
        sem.acquire()
        time.sleep(0.5)
        print(n,end=' ')
        sem.release()
    
    sem = Semaphore(4)
    for i in range(10):
        t = Thread(target=func,args=(sem,i))
        t.start()
    
    """输出的结果为:
    2 3 1 0 5 4 7 6 8 9     # (4个一组基本同时输出)
    """

    8.线程事件

    # 事件被创建的时候
    # False状态
        # wait() 阻塞
    # True状态
        # wait() 非阻塞
    # clear 设置状态为False
    # set  设置状态为True
    #  起两个线程
    #  第一个线程 : 连接数据库
            # 等待一个信号 告诉我我们之间的网络是通的
            # 连接数据库
    #  第二个线程 : 检测与数据库之间的网络是否连通
            # time.sleep(0,2) 2
            # 将事件的状态设置为True
    from threading import Thread,Event
    import time,random
    
    def connect_db(e):
        count = 0
        while count < 3:
            e.wait(0.5) # 状态为False的时候,只等待0.5秒就结束
            if e.is_set() == True:
                print('连接数据库')
                break
            else:
                count += 1
                print('第%s次连接失败' %count)
        else:
            raise TimeoutError('数据库连接超时')
    
    def check_web(e):
        time.sleep(random.randint(0, 3))
        e.set()
    
    e = Event()
    t1 = Thread(target=connect_db, args=(e,))
    t2 = Thread(target=check_web, args=(e,))
    t1.start()
    t2.start()

    9.线程条件

    # notify(int数据类型)  造钥匙
    
    from threading import Thread
    from threading import Condition
    def func(con,i):
        con.acquire()
        con.wait() # 等钥匙
        print('在第%s个循环里'%i)
        con.release()
    con = Condition()
    for i in range(10):
        Thread(target=func,args = (con,i)).start()
    while True:
        num = int(input('>>>'))
        con.acquire()
        con.notify(num)  # 造钥匙
        con.release()

    输出结果:

    10.线程定时器

    定时器,即指定n秒后执行某操作
    
    import time
    from threading import Timer
    def func():
        print('时间同步')   #1-3
    
    while True:
        time.sleep(1)
        t = Timer(5,func).start()   # 非阻塞的
        # 定时等待5秒之后就调用func
        # time.sleep(5)
    # 自动更新验证码
    from threading import Thread,Timer
    import random
    
    class Check_number:
        def __init__(self):
            self.cash_code() # 程序一开始便实例化一个验证码
        def make_code(self,n=4):
            res = ''
            for i in range(n):
                s1 = str(random.randint(0,9)) # 0到9间的任意自然数
                s2 = chr(random.randint(65,90)) # 24个小写字母
                res += random.choice([s1,s2]) # 字符和数字的任意组合
            return res
        def cash_code(self,interval=3):
            self.code = self.make_code()  # 实例化一个验证码
            print(self.code) # 打印验证码
            self.t = Timer(interval,self.make_code) # 定时器,等待指定时间再运行
            self.t.start()
    
        def check(self):
            while True:
                mes = input('输入验证码>>>:').strip()
                if self.code == mes.upper():
                    print('输入正确!')
                    self.t.cancel() # 关闭定时器
                    break
    
    obj = Check_number()
    obj.check()
    自动更新验证码

    11.线程队列

    import queue
    q1 = queue.Queue()  # 队列 先进先出
    q2 = queue.LifoQueue()  # 栈 先进后出
    q3 = queue.PriorityQueue()  # 优先级队列
    
    q.put()     # 往队列中放入值,当队列满的时候陷入等待状态,直到队列可以放入值的时候放值
    q.get()     # 从队列取值,当队列为空的时候陷入等待状态,直到队列中有值的时候取值
    q.put_nowait()  # 往队列中放入值,当队列满的时候,直接报错
    q.get_nowait()  # 从队列取值,当队列为空的时候,直接报错
    优先级队列事例:
    优先级队列事例:
    q = queue.PriorityQueue()  # 优先级队列
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((-5,'d'))
    q.put((1,'e'))
    q.put((1,'f'))
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    
    """执行结果为:
    (-5, 'd')
    (1, 'e')
    (1, 'f')
    (10, 'b')
    """
    
    # 此队列按照值的优先级的顺序来取值,优先级数越小越先取值,
    # 当优先级一样的时候,就比较值的ASCII值的大小,小的先取

    12.线程池

    import time
    from concurrent.futures import ThreadPoolExecutor
    def func(n):
        time.sleep(1)
        print(n,end=" ")
        return 2*n
    
    def call_back(m):   # m接收的为func中return的值
        print('结果是 %s'%m.result())  # 从对象中获取值的方法.result(),进程池中是.get()
    
    tpool = ThreadPoolExecutor(max_workers=5)   #  默认 不要超过cpu个数*5
    
    for i in  range(10):
        tpool.submit(func,i).add_done_callback(call_back)
    
    """执行结果为:
    4 结果是 8
    2 结果是 4
    3 结果是 6
    1 结果是 2
    0 结果是 0
    9 结果是 18
    8 结果是 16
    6 结果是 12
    5 结果是 10
    7 结果是 14
    """
    
    # tpool.map(func,range(10))  # 拿不到返回值
    t_lst = []
    for i in  range(10):
        t = tpool.submit(func,i)
        t_lst.append(t)
    tpool.shutdown()  # shutdown相当于close+join
    print('主线程')
    for t in t_lst:print('*',t.result(),end=' ')
    
    """执行结果为:
    4 2 3 1 0 9 8 7 6 5 主线程
    * 0 * 2 * 4 * 6 * 8 * 10 * 12 * 14 * 16 * 18
    """

    13.多线程实现socket聊天

      server 端

    import socket
    from threading import Thread
    
    def chat(sk):
        conn, addr = sk.accept()
        conn.send("hello,我是服务端".encode('utf-8'))
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.close()
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        t = Thread(target=chat,args=(sk,))
        t.start()
    sk.close
    server端

      client 端

    # client端
    import socket
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
    info = input('>>>').encode('utf-8')
    sk.send(info)
    sk.close()
    client端
  • 相关阅读:
    vue双向数据绑定原理
    vue-router原理
    CSS选择器优化
    静态资源的渲染阻塞
    什么是Base64?Base64的原理是什么?
    OAuth的解释
    AMD与CMD的区别
    AMD的实现
    组件通信的几种方式
    关于istream_iterator<int>(cin)和istream_iterator<int>()的一点分析
  • 原文地址:https://www.cnblogs.com/aberwang/p/9460121.html
Copyright © 2011-2022 走看看