zoukankan      html  css  js  c++  java
  • python基础-并发编程02

    并发编程

    子进程回收的两种方式

    • join()让主进程等待子进程结束,并回收子进程资源,主进程再结束并回收资源

      from multiprocessing import Process
      import time
      
      
      def task(name):
          print(f'子进程{name}:starting……')
          time.sleep(1)
          print(f'子进程{name}:end……')
      
      
      if __name__ == '__main__':
          print('进入主进程……')
          pro_list = []
          for i in range(3):
              pro_obj = Process(target=task, args=(i,))
              pro_list.append(pro_obj)
              pro_obj.start()
      
          for pro in pro_list:
              # 强制子进程结束后,主进程才可以结束,实现子进程资源回收
              pro.join()
      
          print('结束主进程……')
      
      
    • 主进程正常结束,子进程与主进程一并被回收资源

    了解知识

    僵尸进程:子进程结束后,主进程没有正常结束,子进程PID不会被回收。

    缺点:操作系统中的PID号是有限的,只用PID号也就是资源被占用,可能会导致无法创建新的进程

    孤儿进程:子进程未结束,主进程没有正常结束,子进程PID不会被回收,会被操作系统优化机制回收。

    操作系统优化机制:当主进程意外终止,操作系统会检测是否有正在运行的子进程,如果有,操作系统会将其放入优化机制中回收


    守护进程

    当主进程被结束时,子进程必须结束。守护进程必须在子进程开启之前设置

    from multiprocessing import Process
    import time
    
    
    # 进程任务
    def task():
        print('starting……')
        time.sleep(2)
        print('ending……')
    
    
    if __name__ == '__main__':
        print('进入主进程……')
        obj_list = []
        for i in range(2):
            # 创建进程
            pro_obj = Process(target=task)
            obj_list.append(pro_obj)
            # 开启守护进程
            pro_obj.daemon = True
            # 守护进程必须在进程开启之前设置
            pro_obj.start()
    
        for obj in obj_list:
            obj.join()
    
        print('主进程结束……')
    

    进程间数据是隔离的,代码论证

    from multiprocessing import Process
    
    count = 0
    
    
    def func1():
        global count
        count += 10
        print(f'func1:{count}')
    
    
    def func2(count):
        count += 100
        print(f'func2:{count}')
    
    
    if __name__ == '__main__':
        # 创建子进程1
        pro_obj1 = Process(target=func1)
        # 创建子进程2
        pro_obj2 = Process(target=func2, args=(count,))
        # 子进程1开启
        pro_obj1.start()
        pro_obj1.join()
        # 子进程2开启
        pro_obj2.start()
        pro_obj2.join()
    
        print(f'主进程:{count}')
    

    输出结果

    func1:10
    func2:100
    主进程:0
    

    线程

    参考: https://blog.csdn.net/daaikuaichuan/article/details/82951084

    一般会将进程和线程一起讲,做个区分

    进程:操作系统会以进程为单位,分配系统资源(CPU时间片、内存等资源),进程是资源分配的最小单位

    1575789286438

    线程:有时被称为轻量级进程,是操作系统调度(CPU调度)执行的最小单位

    1575789300900

    进程和线程的区别

    • 调度:线程作为调度和分配的基本单位,进程作为拥有资源的基本单位
    • 并发性:进程之间可以并发执行,同一个进程的多个线程之间也可以并发执行
    • 拥有资源:进程是拥有资源的独立单位,线程不拥有系统资源,但可以访问隶属于进程的资源。
    • 系统开销:在创建或撤销进程时,系统都要为之分配和回收资源。而线程只是一个进程中的不同执行路径。一个进程挂掉就等于所有的线程挂掉。因此,多进程的程序要比多线程的程序健壮,但在进程切换时,耗费资源较大,效率要差一些

    进程和线程的联系

    • 一个线程只能属于一个进程,而一个进程可以有多个线程,至少有一个线程
    • 资源分配给进程,同一进程的所有线程共享该进程的所有资源
    • 真正在处理机运行的是线程
    • 不同进程的线程间要利用消息通信的办法实现同步

    线程的实现

    # 创建子线程的方式一
    # 1、导入threading模块中的Thread类
    from threading import Thread
    import time
    
    number = 1000
    
    
    def task():
        global number
        number = 200
        print('子线程开始……')
        time.sleep(1)
        print('子线程结束……')
    
    
    if __name__ == '__main__':
        # 2、创建一个子线程对象
        thread_obj = Thread(target=task)
        # 3、开启子线程
        thread_obj.start()
        # 4、设置子线程结束,主线程才能结束
        thread_obj.join()
        print('主进程(主线程)……')
        print(number)			# 输出结果:200
    
    # 创建子线程的方式二
    # 1、导入threading模块中的Thread类
    from threading import Thread
    import time
    
    # 2、继承Thread类
    class MyThread(Thread):
        def run(self):
            global number
            number = 200
            print('子线程开始……')
            time.sleep(1)
            print('子线程结束……')
    
    
    if __name__ == '__main__':
        # 创建一个子线程对象
        t = MyThread()
        # 开启子线程
        t.start()
        t.join()
        print('主进程(主线程)……')
        print(number)			# 输出结果:200
    

    守护子线程:设置子线程对象的demon属性为True,即

    def task():
        global number
        number = 200
        print('子线程开始……')
        time.sleep(1)
        print('子线程结束……')
    
    
    if __name__ == '__main__':
        # 2、创建一个子线程对象
        thread_obj = Thread(target=task)
        # 3、开启子线程
        thread_obj.daemon = True
        # 4、开启守护线程
        thread_obj.start()
        # 5、设置子线程结束,主线程才能结束
        thread_obj.join()
        print('主进程(主线程)……')
        print(number)
    

    队列

    队列相当于一个第三方通道,可以存放数据,实现进程之间数据传递(也就是数据交互)。特点是先进先出

    可通过三种方式实现

    • from multiprocessing import Queue
    • from multiprocessing import JoinableQueue # 推荐使用这种方式
    • import queue # python内置队列

    队列存数据

    • put(obj):存数据,存放的数据个数超过队列设置的长度,进程进入阻塞状态
    • put_nowait(obj):存数据,当存放的数据个数超过队列设置的长度,报错

    队列取数据

    • get():取数据,队列中的记录被取完后,继续取,进程进入阻塞状态
    • get_nowait():取数据,队列中的记录被取完后,继续取,报错

    使用

    from multiprocessing import JoinableQueue
    # from multiprocessing import Queue
    # import queue
    from multiprocessing import Process
    
    
    # 往队列中存储数据
    def task_put(queue):
        number_list = [10, 20, 30, 40]
        for i in number_list:
            # put() 存数据,存放的数据个数超过队列设置的长度,进程进入阻塞状态
            queue.put(i)
            print(f'存入记录:{i}')
            # put_nowait() 存数据,当存放的数据个数超过队列设置的长度,报错
            # queue.put_nowait(i)
            # print(f'存入记录:{i}')
    
        queue.put(1000)
        print(f'存入记录:{1000}')
        # put_nowait() 存数据,当存放的数据超过队列设置的长度,报错
        # queue.put_nowait(1000)
        # print(f'存入记录:{1000}')
    
    
    # 从队列中取数据
    def task_get(queue):
        for i in range(5):
            # get() 取数据,队列中的记录被取完后,继续取,进程进入阻塞状态
            print(f'取出的第{i+1}个记录:{queue.get()}')
            # get_nowait() 取数据,队列中的记录被取完后,继续取,报错
            # print(f'取出的第{i+1}个记录:{queue.get_nowait()}')
    
    
    if __name__ == '__main__':
        # from multiprocessing import JoinableQueue 创建队列对象的方式
        queue_obj = JoinableQueue(3)  # 参数是int类型,表示队列中存放数据的个数
        # from multiprocessing import Queue 创建队列对象的方式
        # queue_obj = Queue(4)
        # import queue 创建队列对象的方式
        # queue_obj = queue.Queue(4)
    
        # 进程1 存数据
        pro_obj1 = Process(target=task_put, args=(queue_obj,))
        pro_obj1.start()
        pro_obj1.join()
    
        # 进程2 取数据
        pro_obj2 = Process(target=task_get, args=(queue_obj,))
        pro_obj2.start()
        pro_obj2.join()
    

    复习:

    通过列表和有序字典实现队列,先进先出

    # 通过列表实现队列
    # 定义一个空列表,当做队列
    queue = []
    # 向列表中插入元素
    queue.insert(0, 1)
    queue.insert(0, 2)
    queue.insert(0, "hello")
    print(queue)
    for index in range(len(queue)):
        print(f"第{index+1}个元素:", queue.pop())
    
    
    # 通过有序字典实现队列方式一
    from collections import OrderedDict
    
    # 向有序字典中插入元素
    ordered_dict = OrderedDict()
    ordered_dict[1] = 1
    ordered_dict[2] = 2
    ordered_dict[3] = 'hello'
    # 将先插入的元素移到最后
    ordered_dict.move_to_end(2)
    ordered_dict.move_to_end(1)
    print(ordered_dict)
    for index in range(3):
        print(ordered_dict.pop(index + 1))
        
    # 方式二
    # 通过有序字典实现队列
    from collections import OrderedDict
    
    ordered_dict = OrderedDict()
    ordered_dict['1'] = 1
    ordered_dict['2'] = 2
    ordered_dict['3'] = 'hello'
    ordered_dict.move_to_end('2')
    ordered_dict.move_to_end('1')
    print(ordered_dict)
    
    ordered_dict.move_to_end('1')
    ordered_dict.move_to_end('2')
    ordered_dict.move_to_end('3')
    index = 1
    for key in ordered_dict:
        print(f'第{index}个元素:{key}')
        index += 1
    
    

    IPC机制

    IPC(Inner-Process Communication,进程间通信)

    进程间的通信可通过队列实现,详情参见队列的示例


    互斥锁

    互斥:散布在不同任务之间的若干程序片段,当某个任务运行其中一个程序片段时,其它任务就不能运行他们之中的任一程序片段,只能等到该任务运行完这个程序片段才可以运行。最基本场景就是:一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源

    互斥锁:一种简单的加锁的方法来控制对共享资源的访问,互斥锁只有两种状态,即上锁[lock对象.acquire()]和解锁[lock对象.release()]

    作用:让并发变成了串行,牺牲了执行效率,保证了数据安全

    特点原子性、唯一性、非繁忙等待

    • 原子性:如果一个进程/线程锁定了一个互斥量,没有其他进程/线程在同一时间可以成功锁定这个互斥量
    • 唯一性:如果一个进程/线程锁定了一个互斥量,在它解锁之前,没有其他进程/线程可以锁定这个互斥量
    • 非繁忙等待:如果一个进程/线程已锁定了一个互斥量,第二个进程/线程又试图去锁定这个互斥量,则第二个进程/线程将被挂起(不占用任何cpu资源)直到第一个进程/线程解锁,第二个进程/线程则被唤醒并执行执行,同时锁定这个互斥量

    互斥锁操作流程:

    1. 通过模块[multiprocessing]中的[Lock]类创建互斥锁lock对象
    2. 共享资源的临界区域前,对互斥量进行加锁
    3. 在访问前进行上锁,使用lock对象.acquire(),在访问完成后解锁,使用lock对象.release()

    进程互斥锁: 购票小例子

    # data.json 文件中的内容:{"number": 1}
    # 购票小例子
    from multiprocessing import Process  # 进程
    from multiprocessing import Lock  # 进程互斥锁
    import datetime
    import json
    import random
    import time
    
    
    # 查看余票
    def check_ticket(name):
        with open('data.json', 'r', encoding='utf-8') as f:
            ticket_dic = json.load(f)
        print(f'[{datetime.datetime.now()}]用户{name}查看余票,'
              f'当前余票:{ticket_dic.get("number")}')
    
    
    # 购票
    def buy_ticket(name):
        # 获取当前票的数量
        with open('data.json', 'r', encoding='utf-8') as f:
            ticket_dic = json.load(f)
        number = ticket_dic.get('number')
        if number:
            number -= 1
            # 模拟购票的网络延迟
            time.sleep(random.random())
            ticket_dic['number'] = number
            # 购票成功
            with open('data.json', 'w', encoding='utf-8') as  f:
                json.dump(ticket_dic, f)
            print(f'[{datetime.datetime.now()}]{name}成功抢票!')
        else:
            # 购票失败
            print(f'[{datetime.datetime.now()}]{name}抢票失败!')
    
    
    def main(name, lock):
        # 查看余票
        check_ticket(name)
        # 对购票这个过程使用互斥锁
        # 上锁
        lock.acquire()
        buy_ticket(name)
        # 解锁
        lock.release()
    
    
    if __name__ == '__main__':
        pro_list = []
        # 创建互斥锁对象
        lock = Lock()
        # 创建10个进程
        for i in range(9):
            pro_obj = Process(target=main, args=(f'pro_obj{i+1}', lock))
            pro_obj.start()
    
        for pro in pro_list:
            pro.join()
    
    
    

    线程互斥锁示例

    """
    开启10个线程,对一个数据进行修改
    """
    from threading import Lock
    from threading import Thread
    import time
    
    # 创建线程互斥锁对象
    lock = Lock()
    # 要修改的记录
    number = 100
    
    
    # 线程任务
    def task():
        global number
        # 上锁
        # lock.acquire()
        # 修改值
        number2 = number
        time.sleep(1)
        number = number2 - 1
        # 解锁
        # lock.release()
    
    
    if __name__ == '__main__':
        # 创建10个线程
        list1 = []
        for line in range(10):
            t = Thread(target=task)
            t.start()
            list1.append(t)
    
        # 限制子线程结束后,主线程才能结束
        for t in list1:
            t.join()
    
        print(number)  # 加互斥锁,输出:90;不加互斥锁,输出:99
    
    
  • 相关阅读:
    gcc和g++的区别
    configure svn server on win
    FD_SET,FD_ISSET,FD_ZERO,select
    intel中的cr寄存器
    Linux系统环境下的Socket编程详细解析
    可重入函数与不可重入函数
    初步认识迭代服务器和并发服务器
    排序
    fd_set 用法
    MFC消息映射
  • 原文地址:https://www.cnblogs.com/xiaodan1040/p/12016381.html
Copyright © 2011-2022 走看看