zoukankan      html  css  js  c++  java
  • 并发编程

    #        ======   信号量
    #信号量Semaphore是同时允许一定数量的线程更改数据
    # 多进程中的组件
    # ktv
    # 4个
    # 一套资源 同一时间 只能被n个人访问
    # 某一段代码 同一时间 只能被n个进程执行
    # import time
    # import random
    # from multiprocessing import Process
    # from multiprocessing import Semaphore

    # sem = Semaphore(4)
    # sem.acquire()
    # print('拿到第一把钥匙')
    # sem.acquire()
    # print('拿到第二把钥匙')
    # sem.acquire()
    # print('拿到第三把钥匙')
    # sem.acquire()
    # print('拿到第四把钥匙')
    # sem.acquire()
    # print('拿到第五把钥匙')
    # def ktv(i,sem):
    # sem.acquire() #获取钥匙
    # print('%s走进ktv'%i)
    # time.sleep(random.randint(1,5))
    # print('%s走出ktv'%i)
    # sem.release()
    #
    #
    # if __name__ == '__main__' :
    # sem = Semaphore(4)
    # for i in range(20):
    # p = Process(target=ktv,args=(i,sem))
    # p.start()



    # 事件
    #python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    # 通过一个信号 来控制 多个进程 同时 执行或者阻塞
    # 事件
    # 一个信号可以使所有的进程都进入阻塞状态
    # 也可以控制所有的进程解除阻塞
    # 一个事件被创建之后,默认是阻塞状态
    # e = Event() # 创建了一个事件
    # print(e.is_set()) # 查看一个事件的状态,默认被设置成阻塞
    # e.set() # 将这个事件的状态改为True
    # print(e.is_set())
    # e.wait() # 是依据e.is_set()的值来决定是否阻塞的
    # print(123456)
    # e.clear() # 将这个事件的状态改为False
    # print(e.is_set())
    # e.wait() # 等待 事件的信号被变成True
    # print('*'*10)

    # set 和 clear
    # 分别用来修改一个事件的状态 True或者False
    # is_set 用来查看一个事件的状态
    # wait 是依据事件的状态来决定自己是否在wait处阻塞
    # False阻塞 True不阻塞

    # 红绿灯事件
    # import time
    # import random
    # from multiprocessing import Event,Process
    # def cars(e,i):
    # if not e.is_set():
    # print('car%i在等待'%i)
    # e.wait() # 阻塞 直到得到一个 事件状态变成 True 的信号
    # print('33[0;32;40mcar%i通过33[0m' % i)
    #
    # def light(e):
    # while True:
    # if e.is_set():
    # e.clear()
    # print('33[31m红灯亮了33[0m')
    # else:
    # e.set()
    # print('33[32m绿灯亮了33[0m')
    # time.sleep(2)
    #
    # if __name__ == '__main__':
    # e = Event()
    # traffic = Process(target=light,args=(e,))
    # traffic.start()
    # for i in range(20):
    # car = Process(target=cars, args=(e,i))
    # car.start()
    # time.sleep(random.random())


    # ========= IPC 进程之间的通信
    # 队列
    #创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
    #队列的方法
    # Queue([maxsize])
    # 创建共享的进程队列。
    # 参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
    # 底层队列使用管道和锁定实现。
    # Queue([maxsize])
    # 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
    # 底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
    # Queue的实例q具有以下方法:
    # q.get( [ block [ ,timeout ] ] )
    # 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,
    # 默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,
    # 用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    # q.get_nowait( )
    # 同q.get(False)方法。 有的话输出,没有的话报错
    # q.put(item [, block [,timeout ] ] )
    # 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。
    # 如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。
    # timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    # q.qsize()
    # 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,
    # 队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    # q.empty()
    # 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。
    # 也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    # q.full()
    # 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    # q.close()
    # 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,
    # 但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。
    # 关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。
    # 例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    # q.cancel_join_thread()
    # 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    # q.join_thread()
    # 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。
    # 默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

    # 队列 先进先出
    # IPC'
    # import time
    # from multiprocessing import Queue
    # q = Queue(5)
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # q.put(4)
    # q.put(5)
    # print(q.full()) # 队列是否满了
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.empty())
    # while True:
    # try:
    # q.get_nowait()
    # except:
    # print('队列已空')
    # time.sleep(0.5)
    # for i in range(6):
    # q.put(i)

    # from multiprocessing import Queue,Process
    # def produce(q):
    # q.put('hello')
    # def consume(q):
    # print(q.get())
    # if __name__ == '__main__':
    # q = Queue()
    # p = Process(target=produce,args=(q,))
    # p.start()
    # c = Process(target=consume, args=(q,))
    # c.start()


    # 生产者消费者模型
    # 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。
    # 该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    # 什么是生产者消费者模式
    # 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,
    # 而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,
    # 消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    # 为什么要使用生产者和消费者模式
    # 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,
    # 如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
    # 同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    # 通过队列实现生产者消费者模式
    # 队列
    # 生产者消费者模型

    # 生产者 进程
    # 消费者 进程
    # import time
    # import random
    # from multiprocessing import Process,Queue
    # def consumer(q,name):
    # while True:
    # food = q.get()
    # if food is None:
    # print('%s获取到了一个空'%name)
    # break
    # print('33[31m%s消费了%s33[0m' % (name,food))
    # time.sleep(random.randint(1,3))
    #
    # def producer(name,food,q):
    # for i in range(4):
    # time.sleep(random.randint(1,3))
    # f = '%s生产了%s%s'%(name,food,i)
    # print(f)
    # q.put(f)
    #
    # if __name__ == '__main__':
    # q = Queue(20)
    # p1 = Process(target=producer,args=('Egon','包子',q))
    # p2 = Process(target=producer, args=('wusir','泔水', q))
    # c1 = Process(target=consumer, args=(q,'alex'))
    # c2 = Process(target=consumer, args=(q,'jinboss'))
    # p1.start()
    # p2.start()
    # c1.start()
    # c2.start()
    # p1.join()
    # p2.join()
    # q.put(None)
    # q.put(None)

    # 第二种 JoinableQueue 模块
    import time
    import random
    from multiprocessing import Process,JoinableQueue
    # def consumer(q,name):
    # while True:
    # food = q.get()
    # print('33[31m%s消费了%s33[0m' % (name,food))
    # time.sleep(random.randint(1,3))
    # q.task_done() # count - 1
    # def producer(name,food,q):
    # for i in range(4):
    # time.sleep(random.randint(1,3))
    # f = '%s生产了%s%s'%(name,food,i)
    # print(f)
    # q.put(f)
    # q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕
    #
    # if __name__ == '__main__':
    # q = JoinableQueue(20)
    # p1 = Process(target=producer,args=('Egon','包子',q))
    # p2 = Process(target=producer, args=('wusir','泔水', q))
    # c1 = Process(target=consumer, args=(q,'alex'))
    # c2 = Process(target=consumer, args=(q,'jinboss'))
    # p1.start()
    # p2.start()
    # c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    # c2.daemon = True
    # c1.start()
    # c2.start()
    # p1.join()
    # p2.join() # 感知一个进程的结束


    # 在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功

    # 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
    # 且要等待之前被刻上的记号都被消费完
    # 当数据都被处理完时,join阻塞结束

    # consumer 中把所有的任务消耗完
    # producer 端 的 join感知到,停止阻塞
    # 所有的producer进程结束
    # 主进程中的p.join结束
    # 主进程中代码结束
    # 守护进程(消费者的进程)结束
  • 相关阅读:
    数据结构:数组、链表、栈、队列的理解
    JVM学习记录-JVM的内存结构管理和运行时数据区理解
    线程池ThreadPoolExecutor的一种扩展办法
    四级地址插件升级改造(京东商城地址选择插件)city-picker
    java8在Collection中新增加的方法removeIf
    使用lambda编程之延迟执行
    JSP的内置对象以及作用域。
    Netty 异步的、事件驱动的网络应用程序框架和工具
    Zookeeper 服务注册和发现
    Jetty 发布web服务
  • 原文地址:https://www.cnblogs.com/xuerh/p/8663841.html
Copyright © 2011-2022 走看看