zoukankan      html  css  js  c++  java
  • 第五十六篇 并发之协程

    一、引子

    1.线程队列

    from queue import Queue, LifoQueue, PriorityQueue
    

    1.普通线程队列

    和JoinableQueue队列相似

    # 普通线程队列
    q = Queue(2)  # 设置可以最多放入的元素个数
    
    q.put('a')
    q.put('b') 
    # q.put('c')  # 如果要放入的元素或要取出的元素大于设定的个数,则会卡死
    
    print(q.get())
    q.task_done()   # 调用task_done的个数和存入的个数相同时,join函数就不再阻塞程序
    print(q.get())
    q.task_done()   # 可以将多个task_done放在一起,也有效,只要和存入的元素个数相同即可
    
    q.join()   # 阻塞函数,阻塞将一直持续到task_done的调用次数和存入线程队列的元素个数相等为止
    print('over')
    
    '''
    a
    b
    over
    '''
    

    2.LifoQueue

    1.lifo:last in first out 后进先出队列,用于模拟栈

    2.和普通线程队列只有取元素的顺序不同,其他一样

    lifoq = LifoQueue()
    
    lifoq.put('a') 
    lifoq.put('b')
    
    print(lifoq.get())
    print(lifoq.get())
    
    '''
    b
    a
    '''
    

    3.PriorityQueue

    1.具备优先级的队列,取出数据时,会比较大小,越小的数据优先级越高

    # 对于数据则直接比较大小,对于字符串则比较首字母在字母表中的先后顺序(数字的优先级高于字母的优先级)
    pq1 = PriorityQueue()
    
    pq1.put(20)
    pq1.put(2)
    
    print(pq1.get())
    print(pq1.get())
    '''
    2
    20
    '''
    
    pq2 = PriorityQueue()
    
    pq2.put('sir')
    pq2.put('ace')
    
    print(pq2.get())
    print(pq2.get())
    '''
    ace
    sir
    '''
    
    # 对于有序的容器对象,会先比较第一个元素的大小,如果相同再比较后面的相同位置上的元素大小,如果为空,则优先级最高
    pq3 = PriorityQueue()
    
    pq3.put([2,4])
    pq3.put([1,5])
    pq3.put([1])
    
    print(pq3.get())
    print(pq3.get())
    '''
    [1]
    [1, 5]
    [2, 4]
    '''
    

    2.如果存入的是一个自定义对象,我们可以通过运算符重载来规定比较规则,使得对象可以被比较

    class Person:
        def __init__(self, name, age):
            self.age = age
            self.name = name
            
       	# 覆盖比较运算符:当在两个对象之间使用比较运算符时,会自动执行该方法
    	def __lt__(self, other):
    		# 先比较年龄,如果相同,则比较姓名中字母的有序顺序
    		# 返回的是bool值,当我们使用优先级队列时,它会根据bool值,谁小就返回谁
    		if self.age == other.age:
    			return self.name < other.name
    		return self.age < other.age
    
    q = PriorityQueue()
    
    p1 = Person('king', 20)
    p2 = Person('tom', 18)
    
    q.put(p1)
    q.put(p2)
    
    print(q.get().name)
    print(q.get().name)
    '''
    tom
    king
    '''
    

    2.背景

    1.上节课中我们知道GIL锁将导致CPython中的多线程无法并行执行,只能并发的执行,而实现并发的原理是切换+保存,那就意味着使用多线程实现并发,就需要为每一个任务创建一个线程,必然增加了线程创建/销毁/切换/保存所带来的开销

    2.高并发下,由于任务数量太多导致无法开启新的线程,会存在既没有实际任务要执行,也无法创建新线程来处理新任务的情况

    3.既要保证并发效果,也要避免创建线程带来的开销问题,在这个背景下,协程出现了,协程的原理是使用单线程来实现多任务并发

    二、单线程实现并发

    1.可行性

    1.并发:多个任务同时处理,其实是切换加保存,由于CPU运行速度极快,所以看上去是同时进行

    2.并行:利用多核CPU,真正实现多个任务同时处理

    3.早期的计算机只有一个CPU,通过CPU切换线程来实现并发,所以线程内实现并发理论上是可行的

    2.如何实现

    并发 = 切换任务 + 保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,就可以实现单线程并发

    1.用yield实现

    1.python中的生成器就具备这样一个特点,每次调用next方法,都会回到生成器函数中执行代码,意味着任务之间可以切换,并且是基于上一次运行的结果,也即是生成器会自动保存执行状态

    def task1():
    	while True:
    		yield  # 不会终止函数,且每次返回一个值,并保留当前状态以供下次调用
    		print('task1 start')
    		
    def task2():
    	t = task1()  # 运行生成器函数
    	while True:
    		next(t)  # 利用next方法,每次循环都会去生成器中运行代码
    		print('task2 start')
    
    task2()
    
    import time
    
    # 将注释取消就是协程
    def task1():
    	a = 0
    	for i in range(10000):
    		a +=1
    		# yield
    		
    def task2():
    	# t = task1()
    	a = 0 
    	for i in range(10000):
    		a += 1
    		# next(t)
    		
    start_t = time.time()
    task2()
    print(time.time() - start_t)
    
    # 单线程下串行两个任务,效率反而比线程内并发高,因为并发要切换加保存
    

    2.对于纯计算任务,单线程并发效率比串行还低,所以我们需要用在io操作多的任务中,但是yield生成器方案无法解决阻塞问题,而且如果任务比较多时,代码将非常复杂

    3.greenlet模块

    1.greenlet模块简化了yield复杂的代码结构,实现了单线程多任务并发

    2.但是无论直接使用yield还是greenlet都不能检测IO操作,遇到IO时都会进入阻塞状态,都对纯计算任务而言效率没有提升

    3.通过greenlet模块导入greenlet类,实例化对象之后,需要将切换执行的两个任务通过switch方法放入对方的函数内,来不断实现切换加保存,并通过两个任务中的一个实例化对象使用switch方法来开启协程

    from greenlet import greenlet
    
    def task1():
    	a = 0
    	for i in range(10000):
    		a +=1
    		t2.switch()   # 用于切换加保存
    		
    def task2():
    	a = 0 
    	for i in range(10000):
    		a += 1
    		t1.switch()
    		
    start_t = time.time()
    t1 = greenlet(task1)
    t2 = greenlet(task2)
    t1.switch()   # 开启任意一个任务即可
    print(time.time() - start_t)
    

    三、协程

    1.协程是什么

    1.单线程下的并发,又称为微线程、纤程,英文名Coroutine。是一种用户态的轻量级线程,即协程是由用户程序自身控制调度的

    2.对比操作系统控制线程的切换,用户在单线程内控制协程的切换

    3.详解

    # 1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
    # 2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
    

    4.优点和缺点

    优点:
    # 1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    # 2. 单线程内就可以实现并发的效果,最大限度地利用cpu
    
    缺点:
    # 1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
    # 2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
    

    2.gevent模块

    1.gevent简介

    1.由于greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时,如果遇到IO,就会原地阻塞,任然实现不了遇到IO自动切换以提高效率的并发效果

    2.任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1遇到阻塞时,就利用阻塞的时间去执行任务2,如此,才能提高效率,这里就需要Gevent模块

    3.Gevent 是一个第三方库,可以轻松通过gevent实现并发编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度

    2.gevent如何使用

    1.直接导入gevent类,并通过gevent.spawn实例化一个协程对象,spawn里面第一个参数是目标函数,后面的参数可以是目标函数所需要的参数(位置实参、关键字实参皆可)

    2.必须至少有一个join函数来控制协程的运行,否则将只会执行主线程

    # 执行没有加join函数的代码时不会输出任何消息
    # 这是因为协程任务都是以异步方式提交,所以主线程会继续往下执行,而一旦执行完最后一行主线程也就结束了,
    # 导致了协程任务没有来的及执行,所以这时候必须join来让主线程等待协程任务执行完毕,也就是让主线程保持存活
    # 后续在使用协程时也需要保证主线程一直存活,如果主线程不会结束也就意味着不需要调用join
    

    3.gevent.joinall()函数里面的参数是一个容器,可以将多个协程对象放进去

    4.仍然无法解决阻塞时自动切换的问题

    import gevent, time
    
    # 会一直等到task1执行完,才会执行task2中的代码
    def task1(name):
    	print('%s start'%name)
    	time.sleep(2)   # 模拟阻塞
    	print('%s end'%name)
    	
    def task2():
    	print('task2 start')
    	print('task2 end')
    	
    g1 = gevent.spawn(task1, 'king')
    g2 = gevent.spawn(task2)
    
    # g1.join()
    gevent.joinall([g1, g2])
    print('over')
    

    3.monkey补丁

    1.monkey补丁的原理是把原始的阻塞方法替换为修改后的非阻塞方法,偷梁换柱,来实现IO自动切换
    monkey补丁实现原理(举例):

    # myjson.py文件中
    
    def dump():
        print("一个被替换的 dump函数")
    
    def load():
        print("一个被替换的 load函数")
    
    # test.py 文件中
    
    import myjson
    import json
    
    # 补丁函数示例
    def monkey_pacth_json():
        json.dump = myjson.dump
        json.load = myjson.load
        
    # 打补丁
    monkey_pacth_json()
    
    # 测试是否替换 
    json.dump()
    json.load()
    
    '''
    一个被替换的 dump函数
    一个被替换的 load函数
    '''
    

    2.通过monkey中的patch_all方法可以实现,遇到IO阻塞自动切换任务的并发效果

    3.必须在打补丁后再使用相应的功能,避免忘记,建议写在最上方

    from gevent import monkey  # 通过gevent导入monkey补丁
    monkey.patch_all()  # 打补丁
    import gevent, time
    
    def task1(name):
    	print('%s start'%name)
    	time.sleep(2)   # 模拟阻塞
    	print('%s end'%name)
    	
    def task2():
    	print('task2 start')
    	time.sleep(3)
    	print('task2 end')
    	
    g1 = gevent.spawn(task1, 'king')
    g2 = gevent.spawn(task2)
    
    gevent.joinall([g1, g2])
    print('over')
    

    3.案例

    1.爬虫

    from gevent import monkey
    monkey.patch_all()
    import gevent, requests, time
    
    def get_page(url):
    	print('get %s'%url)
    	response = requests.get(url)
    	if response.status_code == 200:   # 
    		print('%d bytes received from %s'%(len(response.text), url))
    		
    start_time = time.time()
    
    gevent.joinall([
    	gevent.spawn(get_page, 'https://www.python.org/'),
    	gevent.spawn(get_page,'https://github.com/'),
    	gevent.spawn(get_page,'https://baidu.com/'),
    ])
    
    print(time.time() - start_time)
    

    2.TCP通讯

    # 服务器
    from gevent import monkey
    monkey.patch_all()
    
    #如果不想用money.patch_all()打补丁,可以用gevent自带的socket
    # from gevent import socket
    # s=socket.socket()
    
    import gevent
    from socket import *
    
    def server_task(server_ip, port):
        s = socket.socket()
        s.bind((server_ip, port))
        s.listen()
    
        while True:
            c, addr = s.accept()
            g = gevent.spawn(talk_task, c)
    
    def talk_task(c):
        while True:
            msg = c.recv(1024).decode('utf-8')
            c.send((msg.upper()).encode('utf-8'))
    
    server_task()	
    
    # 客户端
    
    from socket import *
    import os
    from threading import Thread, current_thread
    
    c = socket()
    c.connect(('127.0.0.1', 8000))
    
    def client_task():
        while True:
            msg = '%s : %s'%(os.getpid(), current_thread().name)
    
            c.send(msg.encode('utf-8'))
            data = c.recv(1024).decode('utf-8')
            print(data)
    
    for i in range (100):
        t = Thread(target=client_task)
        t.start()
    
  • 相关阅读:
    Angular 学习笔记 ( CDK
    Angular 学习笔记 ( PWA + App Shell )
    Angular 学习笔记 ( CDK
    Angular 学习笔记 ( CDK
    Angular 学习笔记 ( CDK
    zw版【转发·台湾nvp系列Delphi例程】HALCON TestSubsetRegio
    zw版【转发·台湾nvp系列Delphi例程】HALCON ClipRegion
    zw版【转发·台湾nvp系列Delphi例程】HALCON SetMshape
    zw版【转发·台湾nvp系列Delphi例程】HALCON SmoothImage
    zw版【转发·台湾nvp系列Delphi例程】HALCON SelectObj
  • 原文地址:https://www.cnblogs.com/itboy-newking/p/11185358.html
Copyright © 2011-2022 走看看