zoukankan      html  css  js  c++  java
  • day42 Pyhton 并发编程05

    一.内容回顾

    # 线程
    # 正常的编程界:
        # 进程
            # 计算机中最小的资源分配单位
            # 数据隔离
            # 进程可以独立存在
            # 创建与销毁 还有切换 都慢 给操作系统压力大
       # 线程
            # 计算机中能被CPU调度的最小单位
            # 同一个进程中的多个线程资源共享
            # 线程必须依赖进程存在
            # 创建与销毁 还有切换 都比进程快很多
    # Cpython解释器下
        # GIL 全局解释器锁
        # 保证了同一时刻下只有一个线程可以被CPU操作
    
    # threading模块
    # 创建子线程 Thread类
        # start 开启子线程
        # join  阻塞等待子线程结束
        # setDeamon 设置守护线程
            # 会等待主线程结束(包含所有非守护的子线程)之后守护线程才结束
    # currentthread,enumerate,activecount
    # 查看当前线程,所有的线程对象组成的列表,列表的长度
    # from threading import Thread,currentThread
    # def func():
    #     print(currentThread())
    # for i in range(10):
    #     Thread(target=func).start()

    今日内容

    # 1.锁   ****
        # 互斥锁
        # 死锁现象
        # 递归锁
    # 2.其他模型 **
        # 信号量
        # 事件
        # 条件
        # 定时器
    # 3.线程队列 ****
    # 4.线程池模块 ****
    # 锁是用来做什么的?
    # 保证数据的安全的
    # GIL是干什么的?
    # 全局解释器锁线程
    # 有了GIL还要锁干啥?
    # 有了GIL还是会出现数据不安全的现象,所以还是要用锁
    # import time
    # from threading import Thread,Lock
    # n = 100
    # def func(lock):
    #     global n
    #     # n -= 1
    #     with lock:
    #         tmp = n-1  # n-=1
    #         time.sleep(0.1)
    #         n = tmp
    #
    # if __name__ == '__main__':
    #     l = []
    #     lock = Lock()
    #     for i in range(100):
    #         t = Thread(target=func,args=(lock,))
    #         t.start()
    #         l.append(t)
    #     for t in l:t.join()
    #     print(n)
    dis模块使用
    import dis
    n = 1
    def func():
        n = 100
        n -= 1
    
    dis.dis(func)
    
    # 会出现线程不安全的两个条件
    # 1.是全局变量
    # 2.出现 += -=这样的操作
    
    
    # 列表 字典
    # 方法 l.append l.pop l.insert dic.update 都是线程安全的
    # l[0] += 1
    # d[k] += 1
    死锁现象
    # 科学家吃面问题
    import time
    from threading import Thread,Lock
    # noodle_lock = Lock()
    # fork_lock = Lock()
    # 死锁不是时刻发生的,有偶然的情况整个程序都崩了
    # 每一个线程之中不止一把锁,并且套着使用
    # 如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制
    # 而应看做一个资源
    import time
    from threading import Thread,Lock
    noodle_lock = Lock()
    fork_lock = Lock(
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s开始吃面'%name)
        time.sleep(0.2)
        fork_lock.release()
        print('%s放下叉子了' % name)
        noodle_lock.release()
        print('%s放下面了' % name)
    
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了' % name)
        noodle_lock.acquire()
        print('%s拿到面条了' % name)
        print('%s开始吃面' % name)
        time.sleep(0.2)
        noodle_lock.release()
        print('%s放下面了' % name)
        fork_lock.release()
        print('%s放下叉子了' % name)
    
    Thread(target=eat1,args=('alex',)).start()
    Thread(target=eat2,args=('wusir',)).start()
    Thread(target=eat1,args=('太白',)).start()
    Thread(target=eat2,args=('宝元',)).start()
    alex拿到面条了
    alex拿到叉子了
    alex开始吃面
    alex放下叉子了wusir拿到叉子了
    alex放下面了太白拿到面条了
    发生了死锁
    import time
    from threading import Thread,Lock
    lock = Lock()
    def eat1(name):
        lock.acquire()#只有一个锁,把资源都锁在一起
        print('%s拿到面条了'%name)
        print('%s拿到叉子了'%name)
        print('%s开始吃面'%name)
        time.sleep(0.2)
        lock.release()
        print('%s放下叉子了' % name)
        print('%s放下面了' % name)
    
    def eat2(name):
        lock.acquire()
        print('%s拿到叉子了' % name)
        print('%s拿到面条了' % name)
        print('%s开始吃面' % name)
        time.sleep(0.2)
        lock.release()
        print('%s放下面了' % name)
        print('%s放下叉子了' % name)
    
    Thread(target=eat1,args=('alex',)).start()
    Thread(target=eat2,args=('wusir',)).start()
    Thread(target=eat1,args=('太白',)).start()
    Thread(target=eat2,args=('宝元',)).start()
    # 先临时解决
    # 然后再找到死锁的原因,再去修改
    from threading import RLock,Lock,Thread
    # 互斥锁
        # 无论在相同的线程还是不同的线程,都只能连续acquire一次
        # 要想再acquire,必须先release
    # 递归锁
        # 在同一个线程中,可以无限次的acquire
        # 但是要想在其他线程中也acquire,
        # 必须现在自己的线程中添加和acquire次数相同的release
    rlock = RLock()
    rlock.acquire()
    rlock.acquire()
    rlock.acquire()
    rlock.acquire()
    print('锁不住')
    锁不住
    lock = Lock()
    lock.acquire()
    print('1')
    lock.acquire()
    print('2')#不能打印2
    rlock = RLock()
    def func(num):
        rlock.acquire()
        print('aaaa',num)
        rlock.acquire()
        print('bbbb',num)
        rlock.release()
        rlock.release()
    
    Thread(target=func,args=(1,)).start()
    Thread(target=func,args=(2,)).start()
    aaaa 1
    bbbb 1
    aaaa 2
    bbbb 2
    import time
    noodle_lock = fork_lock = RLock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s开始吃面'%name)
        time.sleep(0.2)
        fork_lock.release()
        print('%s放下叉子了' % name)
        noodle_lock.release()
        print('%s放下面了' % name)
    
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了' % name)
        noodle_lock.acquire()
        print('%s拿到面条了' % name)
        print('%s开始吃面' % name)
        time.sleep(0.2)
        noodle_lock.release()
        print('%s放下面了' % name)
        fork_lock.release()
        print('%s放下叉子了' % name)
    
    Thread(target=eat1,args=('alex',)).start()
    Thread(target=eat2,args=('wusir',)).start()
    Thread(target=eat1,args=('太白',)).start()
    Thread(target=eat2,args=('宝元',)).start()

    alex拿到面条了

    alex拿到叉子了

    alex开始吃面

    alex放下叉子了

    alex放下面了

    wusir拿到叉子了

    wusir拿到面条了

    wusir开始吃面

    wusir放下面了

    wusir放下叉子了

    太白拿到面条了

    太白拿到叉子了

    太白开始吃面

    太白放下叉子了

    太白放下面了

    宝元拿到叉子了

    宝元拿到面条了

    宝元开始吃面

    宝元放下面了

    宝元放下叉子了

    信号量

    import time
    from threading import Semaphore,Thread
    
    def func(name,sem):
        sem.acquire()
        print(name,'start')
        time.sleep(1)
        print(name,'stop')
        sem.release()
    
    sem = Semaphore(5)
    for i in range(20):
        Thread(target=func,args=(i,sem)).start()
    # 信号量和池
    # 进程池
        # 有1000个任务
        # 一个进程池中有5个进程
        # 所有的1000个任务会多次利用这五个进程来完成任务
    # 信号量
        # 有1000个任务
        # 有1000个进程/线程
        # 所有的1000个任务由于信号量的控制,只能5个5个的执行

    事件

    from threading import Event
    # 事件
    # wait() 阻塞 到事件内部标识为True就停止阻塞
    # 控制标识
        # set
        # clear
        # is_set
    
    # 连接数据库
    import time
    import random
    from threading import Thread,Event
    def connect_sql(e):
        count = 0
        while count < 3:
            e.wait(0.5)
            if e.is_set():
                print('连接数据库成功')
                break
            else:
                print('数据库未连接成功')
                count += 1
    
    def test(e):
        time.sleep(random.randint(0,3))
        e.set()
    
    e = Event()
    Thread(target=test,args=(e,)).start()
    Thread(target=connect_sql,args=(e,)).start()

    条件

    # wait      阻塞
    # notify(n) 给信号
    # 假如现在有20个线程
    # 所有的线程都在wait这里阻塞
    # notify(n) n传了多少
    # 那么wait这边就能获得多少个解除阻塞的通知
    
    # notifyall
    # acquire
    # release
    
    import threading
    
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" % n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire()
            con.notify(int(inp))
            con.release()
            print('****')
    
    # 设置某个条件
    # 如果满足这个条件 就可以释放线程
    # 监控测试我的网速
    # 20000个任务
    # 测试我的网速 /系统资源
    # 发现系统资源有空闲,我就放行一部分任务

    定时器

    from threading import Timer
    
    def func():
        print('执行我啦')
    
    t = Timer(3,func)
    # 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
    t.start()
    print('主线程的逻辑')

    队列

    import queue
    
    # 线程队列 线程之间数据安全
    q = queue.Queue(1)
    # 普通队列
    q.put(1)
    print(q.get())
    try:
        q.put_nowait(2)
    except queue.Full:
        print('您丢失了一个数据2')
    print(q.get_nowait()) # 如果有数据我就取,如果没数据不阻塞而是报错
    # 非阻塞的情况下
    q.put(10)
    print(q.get(timeout=2))
    1
    2
    10
    # 算法里 栈
    lfq = queue.LifoQueue()   #
    lfq.put(1)
    lfq.put(2)
    lfq.put(3)
    print(lfq.get())
    print(lfq.get())
    print(lfq.get())
    3
    2
    1
    # 优先级队列,是根据第一个值的大小来排定优先级的
    # ascii码越小,优先级越高
    q = queue.PriorityQueue()
    q.put((2,'a'))
    q.put((0,'c'))
    q.put((1,'b'))
    
    print(q.get())
    
    # 线程+队列 实现生产者消费者模型

    线程池

    def func(num):
        print('in %s func'%num,currentThread())
        time.sleep(random.random())
        return num**2
    
    tp = ThreadPoolExecutor(5)
    ret_l = []
    for i in range(30):
        ret = tp.submit(func,i)#运行时只有5个线程运行
        ret_l.append(ret)
    for ret in ret_l:
        print(ret.result())
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor  as Pool
    
    import os
    def func(num):
        print('in %s func'%num,currentThread())
        # print('in %s func'%num,os.getpid())
        time.sleep(random.random())
        return num**2
    if __name__ == '__main__':
        # tp = ThreadPoolExecutor(5)
        tp = Pool(5)
        ret_l = []
        for i in range(30):
            ret = tp.submit(func,i)
            ret_l.append(ret)
        tp.shutdown()  # close + join
        for ret in ret_l:
            print(ret.result())
    # 创建一个池
    # 提交任务 submit
    # 阻塞直到任务完成(close + join) shutdown
    # 获取结果 result
    # 简便用法 map
    # 回调函数 add_done_callback
    # 简便用法 map
    import os
    def func(num):
        print('in %s func'%num,currentThread())
        # print('in %s func'%num,os.getpid())
        time.sleep(random.random())
        return num**2
    if __name__ == '__main__':
    
        # tp = ThreadPoolExecutor(5)
        tp = Pool(5)
        ret = tp.map(func,range(30))
        for i in ret:
            print(i)
    # 回调函数 add_done_callback
    def func1(num):
        print('in func1 ',num,currentThread())
        return num*'*'
    
    def func2(ret):
        print('--->',ret.result(),currentThread())
    tp = Pool(5)
    print('主 : ',currentThread())
    for i in range(10):
        tp.submit(func1,i).add_done_callback(func2)
    # 回调函数收到的参数是需要使用result()获取的
    # 回调函数是由谁执行的? 主线程
    # 相关概念
        # 进程
        # 线程
            # GIL
    # 很多模型
    #     进程 锁(递归锁 互斥锁)池(cpu的1-2倍)队列
    #     线程 锁(递归锁 互斥锁)池(cpu个数的5倍)队列  其他模型
  • 相关阅读:
    什么是线程安全和线程不安全
    C# 实现Dictionary数据对象的深度拷贝
    数据库设计三大范式
    Socket 短连接、长连接
    第二篇:MongoDB高级查询
    如何在oracle中导入导出dmp数据库文件
    以太网中的UDP编程:udp分包问题
    CocosBuilder 值得关注的一个新项目
    [转载]iPhone程序到iPad程序的移植问题
    DWR使用总结
  • 原文地址:https://www.cnblogs.com/pythonz/p/10110715.html
Copyright © 2011-2022 走看看