zoukankan      html  css  js  c++  java
  • 并发、并行、同步、异步、全局解释锁GIL、同步锁Lock、死锁、递归锁、同步对象/条件、信号量、队列、生产者消费者、多进程模块、进程的调用、Process类、

    并发:是指系统具有处理多个任务/动作的能力。

    并行:是指系统具有同时处理多个任务/动作的能力。

    并行是并发的子集

    同步:当进程执行到一个IO(等待外部数据)的时候。

    异步:当进程执行到一个IO不等到数据接收成功后再回来处理。

    def add():
    sum = 0
    for i in range(1000000):
    sum += i
    print("sum",sum)
    def mul():
    sum2 = 1
    for i in range(1,100000):
    sum2 *= i
    print("sum2",sum2)
    import threading,time
    start = time.time()
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=mul)
    l = []
    l.append(t1)
    l.append(t2)
    for t in l:
    t.start()
    for i in l:
    t.join()
    print("cost time %s"%(time.time() - start))
    #GIL 全局解释锁,因为有GIL,所以,同一时刻,只有一个线程被CPU执行
    #任务:IO密集型 计算密集型
    #对于IO密集型任务,Python的多线程是有意义的。
    # 可以采用多进程+协程
    # 对于计算密集型任务,Python多线程不推荐。
    import threading
    import time
    def sub():
    global num
    #同步锁Lock
    lock.acquire()
    temp = num
    time.sleep(0.01)
    num = temp - 1
    lock.release()
    num = 100
    l = []
    lock = threading.Lock()
    for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)
    for t in l:
    t.join()
    print(num)
    死锁:
    在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。
    递归锁:
    import threading
    import time
    class MyThread(threading.Thread):
    def actionA(self):
    r_lock.acquire()
    print(self.name,"gotA",time.ctime())
    time.sleep(2)
    r_lock.acquire()
    print(self.name,"gotB",time.ctime())
    time.sleep(1)
    r_lock.release()
    r_lock.release()
    def actionB(self):
    r_lock.acquire()
    print(self.name,"gotB",time.ctime())
    time.sleep(2)
    r_lock.acquire()
    print(self.name,"gotA",time.ctime())
    time.sleep(1)
    r_lock.release()
    r_lock.release()
    def run(self):
    self.actionA()
    self.actionB()
    if __name__ == '__main__':
    r_lock = threading.RLock()
    L = []
    for i in range(5):
    t = MyThread()
    t.start()
    L.append(t)
    for i in L:
    i.join()
    同步对象/条件:
    import threading,time
    class Boss(threading.Thread):
    def run(self):
    print("BOSS:今晚大家都要加班到22:00。")
    print(event.isSet())
    event.set()
    time.sleep(5)
    print("BOSS:<22:00>可以下班了。")
    print(event.isSet())
    event.set()
    class Worker(threading.Thread):
    def run(self):
    event.wait()
    print("Worker:哎……命苦啊!")
    time.sleep(1)
    event.clear()
    event.wait()
    print("Worker:OhYeah!")
    if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
    threads.append(Worker())
    threads.append(Boss())
    for t in threads:
    t.start()
    for t in threads:
    t.join()
    print("ending.....")
    信号量:

    信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

    计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

    BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

    import threading,time
    class myThread(threading.Thread):
    def run(self):
    if semaphore.acquire():
    print(self.name)
    time.sleep(5)
    semaphore.release()
    if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
    thrs.append(myThread())
    for t in thrs:
    t.start()
    多线程利器队列:
    import queue
    q = queue.Queue()
    q.put(12)
    q.put("hello")
    q.put({"name":"yuan"})
    while 1:
    data = q.get()
    print(data)
    print("----------------")

    创建一个“队列”对象
    import Queue
    q = Queue.Queue(maxsize = 10)
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

    将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

    将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
    get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)

    此包中的常用方法(q = Queue.Queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作

    生产者消费者:

    生产者消费者模型:

    为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

    import time,random
    import queue,threading
    q = queue.Queue()
    def Producer(name):
    count = 0
    while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
    def Consumer(name):
    count = 0
    while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
    data = q.get()
    #q.task_done()
    #q.join()
    print(data)
    print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
    else:
    print("-----no baozi anymore----")
    count +=1
    p1 = threading.Thread(target=Producer, args=('A',))
    c1 = threading.Thread(target=Consumer, args=('B',))
    # c2 = threading.Thread(target=Consumer, args=('C',))
    # c3 = threading.Thread(target=Consumer, args=('D',))
    p1.start()
    c1.start()
    # c2.start()
    # c3.start()
    多进程模块:

    由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

    multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

    进程的调用:

    from multiprocessing import Process
    import time
    def f(name):
    time.sleep(1)
    print("hello",name,time.ctime())
    if __name__ == '__main__':
    p_list = []
    for i in range(3):
    p = Process(target= f ,args= ("alvin",))
    p_list.append(p)
    p.start()
    for i in p_list:
    i.join()
    print("end")
    Process类:

    构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])

      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。

    实例方法:

      is_alive():返回进程是否在运行。

      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

      start():进程准备就绪,等待CPU调度

      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

      terminate():不管任务是否完成,立即停止工作进程

    属性:

      daemon:和线程的setDeamon功能一样

      name:进程名字。

      pid:进程号。

  • 相关阅读:
    hive sql常用整理-hive引擎设置
    hdfs数据到hbase过程
    phoenix表操作
    HBase describe table 参数说明
    HBase 常用Shell命令
    sqoop的基本语法详解及可能遇到的错误
    Linux maven 下 jar包下载不下来的解决方法
    Linu 修改maven的setting保护文件
    Mybatis generator 自动生成代码
    Springmvc mvc:exclude-mapping不拦截 无效
  • 原文地址:https://www.cnblogs.com/zhang-da/p/11458129.html
Copyright © 2011-2022 走看看