zoukankan      html  css  js  c++  java
  • 锁 队列 和池

    1.锁

    #线程中是不是会产生数据不安全
    #   共享内存
    a = 0
    def add_f():
       global a
       for i in range(200000):
           a += 1

    def sub_f():
       global a
       for i in range(200000):
           a -= 1

    from threading import Thread

    t1 = Thread(target=add_f)
    t1.start()
    t2 = Thread(target=sub_f)
    t2.start()
    t1.join()
    t2.join()
    print(a)
    a = 0
    def func():
       global a
       a -= 1
    import dis
    dis.dis(func)

    #即便是线程 即便有GIL 也会出现数据不安全的问题
    #   1.操作的是全局变量
    #   2.做一下操作
    #       += -= *= /+ 先计算再赋值才容易出现数据不安全的问题
    #       包括 lst[0] += 1 dic['key']-=1

    a = 0
    def func():
       global a
       a += 1

    import dis
    dis.dis(func)


    a = 0
    def add_f(lock):
       global a
       for i in range(200000):
           with lock:
               a += 1

    def sub_f(lock):
       global a
       for i in range(200000):
           with lock:
               a -= 1

    from threading import Thread,Lock
    lock = Lock()
    t1 = Thread(target=add_f,args=(lock,))
    t1.start()
    t2 = Thread(target=sub_f,args=(lock,))
    t2.start()
    t1.join()
    t2.join()
    print(a)
    #加锁会影响程序的执行效率,但是保证了数据的安全

    #互斥锁是锁中的一种:在同一个线程中,不能连续acquire多次
    from threading import Lock
    lock = Lock()
    lock.acquire()
    print('*'*20)
    lock.release()
    lock.acquire()
    print('-'*20)
    lock.release()

    2.单例模式

    import time
    from threading import Lock
    class A:
       __instance = None
       lock = Lock()
       def __new__(cls, *args, **kwargs):
           with cls.lock:
               if not cls.__instance:
                   time.sleep(0.1)
                   cls.__instance = super().__new__(cls)
           return cls.__instance
       def __init__(self,name,age):
           self.name = name
           self.age = age

    def func():
       a = A('alex', 84)
       print(a)

    from threading import Thread
    for i in range(10):
       t = Thread(target=func)
       t.start()

    3.死锁现象

    import time
    from threading import Thread,Lock
    noodle_lock = Lock()
    fork_lock = Lock()
    def eat1(name,noodle_lock,fork_lock):
       noodle_lock.acquire()
       print('%s抢到面了'%name)
       fork_lock.acquire()
       print('%s抢到叉子了' % name)
       print('%s吃了一口面'%name)
       time.sleep(0.1)
       fork_lock.release()
       print('%s放下叉子了' % name)
       noodle_lock.release()
       print('%s放下面了' % name)

    def eat2(name,noodle_lock,fork_lock):
       fork_lock.acquire()
       print('%s抢到叉子了' % name)
       noodle_lock.acquire()
       print('%s抢到面了'%name)
       print('%s吃了一口面'%name)
       time.sleep(0.1)
       noodle_lock.release()
       print('%s放下面了' % name)
       fork_lock.release()
       print('%s放下叉子了' % name)

    lst = ['alex','wusir','taibai','yuan']
    Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
    Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
    Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
    Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()





    # 锁
       # 互斥锁
           # 在一个线程中连续多次acquire会死锁
       # 递归锁
           # 在一个线程中连续多次acquire不会死锁
       # 死锁现象
           # 死锁现象是怎么发生的?
               # 1.有多把锁,一把以上
               # 2.多把锁交替使用
       # 怎么解决
           # 递归锁 —— 将多把互斥锁变成了一把递归锁
               # 快速解决问题
               # 效率差
           # ***递归锁也会发生死锁现象,多把锁交替使用的时候
           # 优化代码逻辑
               # 可以使用互斥锁 解决问题
               # 效率相对好
               # 解决问题的效率相对低

    4.递归锁

    from threading import RLock
    # rlock = RLock()
    # rlock.acquire()
    # print('*'*20)
    # rlock.acquire()
    # print('-'*20)
    # rlock.acquire()
    # print('*'*20)

    # 在同一个线程中,可以连续acuqire多次不会被锁住

    # 递归锁:
       # 好 :在同一个进程中多次acquire也不会发生阻塞
       # 不好 :占用了更多资源
    import time
    from threading import RLock,Thread
    # noodle_lock = RLock()
    # fork_lock = RLock()
    noodle_lock = fork_lock = RLock()
    print(noodle_lock,fork_lock)
    def eat1(name,noodle_lock,fork_lock):
       noodle_lock.acquire()
       print('%s抢到面了'%name)
       fork_lock.acquire()
       print('%s抢到叉子了' % name)
       print('%s吃了一口面'%name)
       time.sleep(0.1)
       fork_lock.release()
       print('%s放下叉子了' % name)
       noodle_lock.release()
       print('%s放下面了' % name)

    def eat2(name,noodle_lock,fork_lock):
       fork_lock.acquire()
       print('%s抢到叉子了' % name)
       noodle_lock.acquire()
       print('%s抢到面了'%name)
       print('%s吃了一口面'%name)
       time.sleep(0.1)
       noodle_lock.release()
       print('%s放下面了' % name)
       fork_lock.release()
       print('%s放下叉子了' % name)

    lst = ['alex','wusir','taibai','yuan']
    Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
    Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
    Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
    Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

    5.互斥锁解决死锁问题

    import time
    from threading import Lock,Thread
    lock = Lock()
    def eat1(name,noodle_lock,fork_lock):
       lock.acquire()
       print('%s抢到面了'%name)
       print('%s抢到叉子了' % name)
       print('%s吃了一口面'%name)
       time.sleep(0.1)
       print('%s放下叉子了' % name)
       print('%s放下面了' % name)
       lock.release()

    def eat2(name,noodle_lock,fork_lock):
       lock.acquire()
       print('%s抢到叉子了' % name)
       print('%s抢到面了'%name)
       print('%s吃了一口面'%name)
       time.sleep(0.1)
       print('%s放下面了' % name)
       print('%s放下叉子了' % name)
       lock.release()

    lst = ['alex','wusir','taibai','yuan']
    Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
    Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
    Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
    Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

    6.队列

    import queue
    # 线程之间的通信 线程安全
    from queue import Queue  # 先进先出队列
    q = Queue(5)
    q.put(0)
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    print('444444')


    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())

    # 使用多线程 实现一个请求网页 并且把网页写到文件中
    # 生产者消费者模型来完成

    # 5个线程负责请求网页 把结果放在队列里
    # 2个线程 负责从队列中获取网页代码 写入文件

    from queue import LifoQueue  # 后进先出队列
    last in first out
    lfq = LifoQueue(4)
    lfq.put(1)
    lfq.put(3)
    lfq.put(2)
    print(lfq.get())
    print(lfq.get())
    print(lfq.get())

    # 先进先出
       # 写一个server,所有的用户的请求放在队列里
           # 先来先服务的思想
    # 后进先出
       # 算法
    # 优先级队列
       # 自动的排序
       # 抢票的用户级别 100000 100001
       # 告警级别
    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put((10,'alex'))
    pq.put((6,'wusir'))
    pq.put((20,'yuan'))
    print(pq.get())
    print(pq.get())
    print(pq.get())

    7.池

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

    # 池
       # 进程池
       # 线程池
    # 为什么要有池?
    # 10000
    # 池
    # 预先的开启固定个数的进程数,当任务来临的时候,直接提交给已经开好的进程
    # 让这个进程去执行就可以了
    # 节省了进程,线程的开启 关闭 切换都需要时间
    # 并且减轻了操作系统调度的负担

    # concurrent.futures
    import os
    import time
    import random
    from concurrent.futures import ProcessPoolExecutor
    submit + shutdown
    def func():
       print('start',os.getpid())
       time.sleep(random.randint(1,3))
       print('end', os.getpid())
    if __name__ == '__main__':
       p = ProcessPoolExecutor(5)
       for i in range(10):
           p.submit(func)
       p.shutdown()   # 关闭池之后就不能继续提交任务,并且会阻塞,直到已经提交的任务完成
       print('main',os.getpid())

    # 任务的参数 + 返回值
    def func(i,name):
       print('start',os.getpid())
       time.sleep(random.randint(1,3))
       print('end', os.getpid())
       return '%s * %s'%(i,os.getpid())
    if __name__ == '__main__':
       p = ProcessPoolExecutor(5)
       ret_l = []
       for i in range(10):
           ret = p.submit(func,i,'alex')
           ret_l.append(ret)
       for ret in ret_l:
           print('ret-->',ret.result())  # ret.result() 同步阻塞
       print('main',os.getpid())

    # 开销大
    # 一个池中的任务个数限制了我们程序的并发个数

    # 线程池
    from concurrent.futures import ThreadPoolExecutor
    def func(i):
       print('start', os.getpid())
       time.sleep(random.randint(1,3))
       print('end', os.getpid())
       return '%s * %s'%(i,os.getpid())
    tp = ThreadPoolExecutor(20)
    ret_l = []
    for i in range(10):
       ret = tp.submit(func,i)
       ret_l.append(ret)
    tp.shutdown()
    print('main')
    for ret in ret_l:
       print('------>',ret.result())



    from concurrent.futures import ThreadPoolExecutor
    def func(i):
       print('start', os.getpid())
       time.sleep(random.randint(1,3))
       print('end', os.getpid())
       return '%s * %s'%(i,os.getpid())
    tp = ThreadPoolExecutor(20)
    ret = tp.map(func,range(20))
    for i in ret:
       print(i)
    ret_l = []
    for i in range(10):
       ret = tp.submit(func,i)
       ret_l.append(ret)
    tp.shutdown()
    print('main')

    # 回调函数
    import requests
    from concurrent.futures import ThreadPoolExecutor
    def get_page(url):
       res = requests.get(url)
       return {'url':url,'content':res.text}

    def parserpage(ret):
       dic = ret.result()
       print(dic['url'])
    tp = ThreadPoolExecutor(5)
    url_lst = [
       'http://www.baidu.com',   # 3
       'http://www.cnblogs.com', # 1
       'http://www.douban.com',  # 1
       'http://www.tencent.com',
       'http://www.cnblogs.com/Eva-J/articles/8306047.html',
       'http://www.cnblogs.com/Eva-J/articles/7206498.html',
    ]
    ret_l = []
    for url in url_lst:
       ret = tp.submit(get_page,url)
       ret_l.append(ret)
       ret.add_done_callback(parserpage)


    # ThreadPoolExcutor
    # ProcessPoolExcutor

    # 创建一个池子
    # tp = ThreadPoolExcutor(池中线程/进程的个数)
    # 异步提交任务
    # ret = tp.submit(函数,参数1,参数2....)
    # 获取返回值
    # ret.result()
    # 在异步的执行完所有任务之后,主线程/主进程才开始执行的代码
    # tp.shutdown() 阻塞 直到所有的任务都执行完毕
    # map方法
    # ret = tp.map(func,iterable) 迭代获取iterable中的内容,作为func的参数,让子线程来执行对应的任务
    # for i in ret: 每一个都是任务的返回值
    # 回调函数
    # ret.add_done_callback(函数名)
    # 要在ret对应的任务执行完毕之后,直接继续执行add_done_callback绑定的函数中的内容,并且ret的结果会作为参数返回给绑定的函数

    # 5个进程
    # 20个线程
    # 5*20 = 100个并发

     

  • 相关阅读:
    商贸通帐套隐藏方法
    固定资产打开提示:上年度数据未结转!
    ZOJ 2432 Greatest Common Increasing Subsequence
    POJ 1080 Human Gene Functions
    POJ 1088 滑雪
    POJ 1141 Brackets Sequence
    POJ 1050 To the Max
    HDOJ 1029 Ignatius and the Princess IV
    POJ 2247 Humble Numbers
    HDOJ 1181 变形课
  • 原文地址:https://www.cnblogs.com/usherwang/p/13039419.html
Copyright © 2011-2022 走看看