zoukankan      html  css  js  c++  java
  • 011-Python-进程、线程于协程

    1.进程与线程

    1. 进程:
      一个程序要运行时所需的所有资源的集合;
      一个进程至少需要一个线程,这个线程称为主线程,一个进程里可以包含多个线程;
      cpu 核数越多,代表着你可以真正并发的线程越多2个进程之间的数据是完全独立的,默认情况下相互不能访问;

    2. 线程:
      工作最小单元的是线程,一个应用程序至少有一个线程;多个线程在涉及修改同一个数据时一定要加锁;

    3. 应用场景:
      IO密集型:线程(IO的读写)
      计算密集型:进程(涉及到CPU运算)

    4. GIL,全局解释器锁:
      保证同一个进程中只有一个线程同时被调用;

    2.多线程的格式:

    模块:
    threading # 线程模块
    t = threading.Thread(target=run, args=(1,)) # 创建线程
    t.start() # 启动线程

    2.1将两个线程执行后,发现并不会sleep4秒而执行,而是同时执行2秒结束;

    #导入模块threading
    import threading 
    import time
    
    def run(n):
        time.sleep(2)
        print("这是一个线程", n)
    #定义线程t:threading.Thread(target=函数名, args=(元组,))   # args 后面跟可迭代的列表,元组等
    t = threading.Thread(target=run, args=(1,))
    t2 = threading.Thread(target=run, args=(2,))
    
    #启动线程 t.start()
    t.start()
    t2.start()
    

    2.2循环的实现多线程:

    import threading
    import time
    
    
    def run(n):
        time.sleep(2)
        print("这是一个线程", n)
    
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
    

    2.3获取一共有多少线程数,以及线程名设置与获取;

    import threading
    import time
    
    
    def run(n):
        time.sleep(2)
        print("这是一个线程", n)
    
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
        t.setName("t-%s" % i)           # 修改线程名称
        print(t.getName())              # 打印线程名称
    print(threading.active_count())     # 统计一共有多少活跃(没有结束)的线程
    

    2.4通过类的方式调用线程;

    import threading
    import time
    
    
    class abcd(threading.Thread):
        def __init__(self, num):
            super().__init__()        # 继承父类
            self.num = num
    
        def run(self):
            print("运行这个数字:%s" % self.num)
            time.sleep(3)
    
    if __name__ == '__main__':
        t1 = abcd(1)
        t2 = abcd(2)
    
        t1.start()
        t2.start()
    

    2.5主线程等待子线程运行完毕后,在运行主线程;

    import time
    import threading
    
    t_list = []
    
    
    def run(n):
        time.sleep(1)
        print("运行这个数字", n)
    
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
    
        t_list.append(t)
    
    for t in t_list:
        t.join()
    
    print("====最后这是一个主线程=====")
    

    2.6检测,如果主线程结束了,子线程也跟着结束setDaemon(True);

    import time
    import threading
    
    
    def run(n):
        time.sleep(0.01)
        print("运行这个数字", n)
    
    for i in range(100):
        t = threading.Thread(target=run, args=(i,))
        t.setDaemon(True)           # 设置为守护线程,如果主线程结束,守护线程也会结束;
        t.start()
    
    print("====这是一个主线程=====")
    

    3.线程锁

    1.当涉及到对数据的同时修改操作时,就需要对数据进行加锁;以免数据的错乱;

    2.格式:
    lock = threading.Lock() # 创建锁
    lock.acquire() # 申请使用锁 其他线程等待
    lock.release() # 释放锁 下个进程继续

    3.1实例:

    3.1.1将进程1个进程涉及到修改数据的部分使用锁进行锁定(可用于修改)

    import threading
    import time
    
    v = 10
    lock = threading.Lock()   # 创建锁
    
    
    def task(arg):
        time.sleep(2)
        lock.acquire()     # 申请使用锁 其他线程等待
    
        global v
        v -= 1
        print(v)
        lock.release()      # 释放锁 下个进程继续
    
    for i in range(10):
        t = threading.Thread(target=task, args=(i, ))
        t.start()
    

    3.1.2分批次操作限定进程数量时,限定同时有3个进程对文件读的操作;

    import threading
    import time
    
    v = 10
    lock = threading.BoundedSemaphore(3)   # 创建锁,但同时有3个线程可以突破锁
    
    def task(arg):
    
        lock.acquire()      # 申请使用锁 其他线程等待
        time.sleep(2)
        global v
        v -= 1
        print(v)
        lock.release()      # 释放锁 下个进程继续
    
    for i in range(10):
        t = threading.Thread(target=task, args=(i, ))
        t.start()
    

    3.1.3事件锁,满足某种需求后释放所有的锁;

    import threading
    import time
    
    lock = threading.Event()   # 创建锁,但同时有3个线程可以突破锁
    
    
    def task(arg):
        time.sleep(1)
        lock.wait()             # wait锁住所有的线程等待
        print(arg)
    
    for i in range(10):
        t = threading.Thread(target=task, args=(i, ))
        t.start()
    
    while True:
        v = input(">>>")
        if v == "1":
            lock.set()          # 释放锁,将所有线程释放
    

    3.1.4事件锁,用户想释放几个释放几个;

    import threading
    import time
    
    lock = threading.Condition()    # 创建锁,控制锁
    
    
    def task(arg):
        time.sleep(1)
        lock.acquire()      # 申请使用锁 其他线程等待
    
        lock.wait()         # wait锁住所有的线程等待
    
        print("线程:", arg)
    
        lock.release()      # 释放锁 下个进程继续
    
    for i in range(10):
        t = threading.Thread(target=task, args=(i,))
        t.start()
    
    while True:
        v = input(">>>")
        lock.acquire()      # 申请使用锁 其他线程等待
        lock.notify(int(v))
        lock.release()        # 释放锁 下个进程继续
    

    4.线程池

    4.1通过线程池的方式,请求URL返回的状态;

    from concurrent.futures import ThreadPoolExecutor
    import time
    import requests
    
    
    def task(arg):
        data = requests.get(arg)
        print("返回结果:", arg, data.status_code)
    
    pool = ThreadPoolExecutor(2)     # 创建连接池 大小为 2
    url_list = [
        "http://www.baidu.com",
        "http://www.jd.com",
        "http://www.taobao.com",
    ]
    
    for i in url_list:
        print("开始请求:", i)
        pool.submit(task, i)         # 去连接池中获取一个连接,并执行
    

    4.2通过回调的方式,将获取的数据传送给txt函数;

    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    
    def txt(a):
        # print(a.result().url, a.result().status_code)
        download_future = a.result()
        print(download_future.url, download_future.status_code)
    
    def download(arg):
        data = requests.get(arg)
        # print("返回结果:", arg, data.status_code)
    
        return data                  # 包含下载的所有内容
    
    pool = ThreadPoolExecutor(2)     # 创建连接池 大小为 2
    
    url_list = [
        "http://www.baidu.com",
        "http://www.jd.com",
        "http://www.taobao.com",
    ]
    
    for i in url_list:
        print("开始请求:", i)
        future = pool.submit(download, i)         # 去连接池中获取一个连接,并执行
        future.add_done_callback(txt)
    

    5.进程

    5.1基本使用

    from multiprocessing import Process      # 导入进程模块
    import time
    
    
    def task(arg):
        time.sleep(1)
        print(arg)
    
    if __name__ == '__main__':              # 进程在windos上面需要使用if __name__ == '__main__'执行,mac和Linux无需
        for i in range(10):
            p = Process(target=task,args=(i,))        # 生成一个进程
            # p.daemon = True                         # 守护进程,如果主进程结束子进程也结束
            p.start()                                 # 启动进程
            # p.join(1)                               # 设置进程最多等待时间
        print("主进程到最后了。。。")
    

    5.2进程池:

    from concurrent.futures import ProcessPoolExecutor
    
    
    def call(arg):
        data = arg.result()
        print(data)
    
    
    def task(arg):
        # print(arg)
        return arg+100
    
    if __name__ == '__main__':
    
        pool = ProcessPoolExecutor(5)   # 创建进程池,大小为 5
    
        for i in range(10):
            obj = pool.submit(task, i)
            obj.add_done_callback(call)
    

    6.进程之间的数据共享

    6.1默认情况下,进程之间的数据是不可以共享的,每个线程向v里面添加数据输出结果并不是追加形式;

    from multiprocessing import Process
    
    
    def task(num, v):
        v.append(num)
        print(v)
    
    
    if __name__ == '__main__':
        v = []
        for i in range(10):
            p = Process(target=task, args=(i, v,))
            p.start()
    

    6.2通过#C语言中的Array进行实现数据共享:

    from multiprocessing import Process, Array
    
    
    def task(i, v):
        v[i] = 1                    # 第一次循环的时候v = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; v[0] = 1 ; v[1] = 1
        print(list(v))
    
    
    if __name__ == '__main__':
        # v = Array("数据类型", "长度")
        v = Array("i", 10)
        # print(list(v))            # Array的值为 v = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
        for i in range(10):
            p = Process(target=task, args=(i, v,))
            p.start()
    

    6.3通过Manager实现进程之间的通信(常用)执行报错可以忽略,由于断开socket连接导致;

    from multiprocessing import Process, Manager
    # Manager是基于socket实现的进程之间的通信
    
    
    def task(num, v):
        v.append(num)         # 每个进程向列表中追加值
        print(v)
    
    
    if __name__ == '__main__':
        obj = Manager()       # 创建了一个对象
        alist = obj.list()    # 创建一个列表,或字典dic = obj.dict()
        print(alist)          # alist 只是一个空列表 []
    
        for i in range(10):
            p = Process(target=task, args=(i, alist,))
            p.start()
    

    7.协程greenlet

    1. 协程永远是一个线程在执行;对线程的一个分片处理;在线程空闲是的时候干点其他的事情;
    2. 协程只有来回切换的功能,其他功能是没有的,需要实现二次加工;
    3. 单独使用协程没有任何意义;
    4. 使用模块greenlet实现协程

    7.1实现协程的二次加工有两种方式:

    1.使用已有模块 gevent模块

    • gevent是在协程的基础上,对IO请求又做了一次封装;

    **2.使用一个线程完成对数据的请求,以及接收(效果返回并不会安装顺序执行) **

    # 根据协程二次开发: 协程 + IO
    from gevent import monkey; monkey.patch_all()       # gevent补丁包,涉及到IO操作的都给改为异步模式 
    import gevent
    import requests
    
    
    def z(url):
        data = requests.get(url)
        print(data.url, data.status_code)
    
    gevent.joinall([
        gevent.spawn(z, "http://www.baidu.com"),
        gevent.spawn(z, "http://www.jd.com"),
        gevent.spawn(z, "http://www.taobao.com"),
    ])
    

    8.IO多路复用(自定义形式的协程)

    • 用于监听多个socket对象,是否有变化(可读,可写,发生错误)

    1.通过同时监听多个端口,实现客户端的同时请求,一个进程完成请求之间的来回调用;基于select实现的“伪”多并发

    # 服务端
    import socket
    import select
    
    s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 基于网络通信,是TCP协议的通讯
    s1.bind(("127.0.0.1", 8080))  # 开启 绑定IP以及端口
    s1.listen(5)  # 监听;进程池的大小5
    
    
    s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s2.bind(("127.0.0.1.", 8081))
    s2.listen(5)
    
    inputs = [s1, s2]
    while True:
        # IO多路复用
        #  - select,主动循环:内部进行的循环操作(监听的socket个数是有限制的:1024)
        #  - poll,   主动循环:内部进行循环的操作(没有监听个数的限制)
        #  - epoll, 被动告知:通过异步回调(没有监听个数的限制)
        r, w, e = select.select(inputs, [], [], 0.5)
        # 如果有访问 8080 列表r 的值为r = [s2]
        # 如果有访问 8081 列表r 的值为r = [s1]
        # 如果同时访问 8080 ,8081 列表r 的值为r = [s1, s2]
    
        for i in r:
            if i in [s1, s2]:
                # 判断如果i 是服务端的 8080、8081 就是新连接进来了
                print("新连接进来了!")
                conn, addr = i.accept()
                inputs.append(conn)
            else:
                # 否则有连接用户发送消息来了
                print("有数据过来了!")
                try:                        # 捕捉客户端如果突然断开的异常进行 处理
                    data = i.recv(1024)
                except Exception as e:
                    data = ""
                if data:                    # 如果data有数据,将输入返回给客户端
                    i.sendall(data.upper())
                else:                       # 没有数据将 这个socket连接关闭,并移除inputs
                    i.close()
                    inputs.remove(i)
    
    # 客户端
    import socket
    import select
    
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(("127.0.0.1", 8081,))
    
    
    while True:
        a = input(">>")
        client.sendall(a.encode())
    
        data = client.recv(1024)
        print("来自服务端的数据", data)
    
  • 相关阅读:
    Python多态
    python 元类与定制元类
    Python:类属性,实例属性,私有属性与静态方法,类方法,实例方法
    Python 类 --基础与要点
    服务器错误401
    H5中使用Web Storage来存储结构化数据
    Web Storage
    定义表单控件的id和name注意点
    Annotation(注解)代替配置文件
    jQuery 特殊选择器this
  • 原文地址:https://www.cnblogs.com/baolin2200/p/6610813.html
Copyright © 2011-2022 走看看