zoukankan      html  css  js  c++  java
  • 并发编程

    操作系统

    系统是一个有程序员写出来的软件, 该软件用于控制计算机的硬件,让他们之间进行相互配合

    并发与并行

    并发:伪,比如吃饭时,电话响了,停下吃饭动作去接电话就是并发,相当于可以处理多个任务,但不能同时处理

    并行:真,若是一遍吃饭一遍接电话就是并行,相当于可以处理多个任务,且可以同时进行

    线程与进程

    一个程序至少有一个进程和一个线程,一个CPU一次只能执行一个进程,而一个进程中可以有一个线程,也可以有多个线程,线程是程序执行的最小单位

    1.因为线程是CPU工作的最小单元,创建线程可以利用CPU多核的优势实现并行操作(不适用于Python)

    2.进程与进程之间是数据隔离的(jave/c#)进程是为线程创造工作环境

    3.Python中存在一个GIL锁,因此只能一个进程对应一个线程,就不能利用多核的优势,只能通过多线程的方式(但会造成资源的浪费)

    4.当python使用多线程处理I/O密集型时,可以提高效率,当计算密集型时只能通过多进程

    线程

    线程的创建

    第一种函数式

    def func(arg):
        print(arg)
    
    
    t1 = threading.Thread(target=func, args=(11, ))
    t1.start()
    View Code

    第二种类方式

    class MyThread(threading.Thread):
        def run(self):
            print(self._args)
    
    
    t1 = MyThread(args=(11, ))
    t1.start()
    View Code2

    主线程默认等子线程执行完后再执行

    def func(arg):
        print(arg)
    
    
    t1 = threading.Thread(target=func, args=(11, ))
    t1.start()
    
    t2 = threading.Thread(target=func, args=(22, ))
    t2.start()
    print("主线程")

    setDaemon

    默认值为False,当为True时,主线程不再等子线程,当主线程结束时终止所有的子线程

    def func(arg):
        print(arg)
    
    
    t1 = threading.Thread(target=func, args=(11, ))
    t1.setDaemon(True)
    t1.start()
    
    t2 = threading.Thread(target=func, args=(22, ))
    t2.setDaemon(True)
    t2.start()
    print("主线程")

    join

    开发者可以使用join()来控制主线程等子线程的时间

    def func(arg):
        print(arg)
    
    
    t1 = threading.Thread(target=func, args=(11, ))
    t1.start()
    t1.join(2)
    
    t2 = threading.Thread(target=func, args=(22, ))
    t2.start()
    t2.join(2)
    print("主线程")

    线程名称

    def func(arg):
        t = threading.current_thread()
        name = t.getName()
        print(arg, name)
    
    
    t1 = threading.Thread(target=func, args=(11, ))
    t1.setName("线程一")
    t1.start()
    
    t2 = threading.Thread(target=func, args=(22, ))
    t2.setName("线程二")
    t2.start()
    print("主线程")

    start

    start不是代表开始运行线程,而是向cpu发送信息表明自己准备就绪,可是被CPUdcdu调度了,但是CPU是否会立即调度要取决于CPU的运算

     锁

    1.当线程安全时(列表和字典),锁会在内部让多个线程排队操作

    2.当线程不安全时,此时的排队处理还可以起到线程安全的作用

    3.一次只放一个线程

    lock

    import threading
    
    lst = []
    lock = threading.Lock()
    
    
    def func_lock(args):
        lock.acquire()  # 加锁 
        lst.append(args)
        m = lst[-1]
        print(args, m)
        lock.release()  # 解锁
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func_lock, args=(i,))
        t.start()
    lock

    rlock

    import threading
    import time
    
    lst = []
    lock = threading.RLock()
    
    
    def func_lock(args):
        lock.acquire()  # 加锁
        lock.acquire()  # 再次加锁
        lst.append(args)
        time.sleep(1)
        m = lst[-1]
        print(args, m)
        lock.release()  # 解锁
        lock.release()  # 不会锁死
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func_lock, args=(i,))
        t.start()
    rlock

    信号量

    Semaphore一次放n个

    import threading
    import time
    
    
    lock = threading.BoundedSemaphore(3)
    
    
    def func_lock(args):
        lock.acquire()  # 加锁
        time.sleep(1)
        print(args)
        lock.release()  # 解锁
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func_lock, args=(i,))
        t.start()
    Semphore

    条件

    condition通过一次方法放入指定个数

    import threading
    import time
    
    
    lock = threading.Condition()
    
    
    def func_lock(args):
        lock.acquire()
        lock.wait()  # 加锁
        time.sleep(0.1)
        print("
    "+str(args))
        lock.release()  # 解锁
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func_lock, args=(i,))
        t.start()
    
    while True:
        inp = int(input("放入的数量>>>>"))
        lock.acquire()
        lock.notify(inp)
        lock.release()
    condition
    import threading
    import time
    
    
    lock = threading.Condition()
    
    
    def func_lock():
        input("放入数量>>>>")
        return True
    
    
    def func(args):
        lock.wait_for(func_lock)
        print(args)
        time.sleep(1)
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func, args=(i,))
        t.start()
    2

    事件

    控制放开与禁止,一旦放开就是放开全部线程

    import threading
    import time
    
    
    lock = threading.Event()
    
    
    def func_lock(args):
        lock.wait()  # 加锁
        time.sleep(1)
        print(args)
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func_lock, args=(i,))
        t.start()
    
    input("放行>>>>")
    lock.set()
    
    input("停止放行>>>>")
    lock.clear()
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func_lock, args=(i,))
        t.start()
    
    input("放行>>>>")
    lock.set()
    Event

    threading.local

    内部自动为每个线程维护一个空间(字典),用于当前存储属于自己的值,以保证线程之间的数据隔离

    import threading
    import time
    
    
    lock = threading.local()
    
    
    def func(name, age):
        # 内部会为当前线程创建一个空间用于储存
        lock.name = name
        lock.age = name
        time.sleep(1)
        print(lock.name, name)
        print(lock.age, age)
    
    
    for i in range(10):  # 创建十个线程
        t = threading.Thread(target=func, args=(i, i+10))
        t.start()
    threading.local
    import time
    import threading
    
    DATA_DICT = {}
    
    
    def func(arg):
        ident = threading.get_ident()  # 获取代表此线程线程的值
        DATA_DICT[ident] = arg  # 线程的值作为字典的key, arg作为values
        time.sleep(1)
        print(DATA_DICT[ident], arg)
    
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    loacl实现原理
    import time
    import threading
    INFO = {}
    
    
    class Local(object):
    
        def __getattr__(self, item):
            ident = threading.get_ident()
            return INFO[ident][item]
    
        def __setattr__(self, key, value):
            ident = threading.get_ident()
            if ident in INFO:
                INFO[ident][key] = value  # INFO[ident]是字典indent键对应的值,此值是一个字典
            else:
                INFO[ident] = {key: value}
    
    
    obj = Local()
    
    
    def func(arg):
        obj.phone = arg  # 调用对象的 __setattr__方法(“phone”,1)
        time.sleep(2)
        print(obj.phone, arg)
    
    
    for i in range(10):
        t = threading.Thread(target=func, args=(i,))
        t.start()
    类方式实现原理

    线程池

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def task(x, y):
        time.sleep(2)
        print(x, y)
    
    
    # 创建5个线程池
    pool = ThreadPoolExecutor(5)
    
    for i in range(10):
        # 去线程池申请一个线程执行task函数
        pool.submit(task, i, i + 10)
    pool

    生产者消费者模型

    三部件:消费者

          队列:先入先出

          栈:先入后出

        生产者

    import time
    import queue
    import threading
    
    q = queue.Queue()  # 线程安全
    
    
    def producer(id):
        """
        生产者
        :return:
        """
        while True:
            time.sleep(2)
            q.put('包子')
            print('厨师%s 生产了一个包子' % id)
    
    
    for i in range(1, 4):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    
    
    def consumer(id):
        """
        消费者
        :return:
        """
        while True:
            time.sleep(1)
            v1 = q.get()
            print('顾客 %s 吃了一个包子' % id)
    
    
    for i in range(1, 3):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
    demo

     进程

    进程创建

    import multiprocessing
    
    
    def func(args):
        print(args)
    
    
    def run():
        for i in range(10):
            t = multiprocessing.Process(target=func, args=(i,))
            t.start()
    
    
    if __name__ == '__main__':
        run()
    function
    class MyProcess(multiprocessing):
        def run(self):
            print("当前进程", multiprocessing.current_process())
    
    
    def run():
        t = MyProcess()
        t.start()
    
    
    if __name__ == '__main__':
        run()
    class

    进程的常用功能

    进程的常用功能与线程类似,只是使用时把线程的方式换为进程就可以

    join 等子进程的最多时间,不填则等完
    deamon 是否等子进程, 默认为False,不等
    name 查看当前进程名字
    multiprocessing.current_process() 查看当前进程名字
    multiprocessing.current_process().ident/pid 查看当前进程id

    进程间的数据共享

    进程之间的数据是不可以共享的,但是为了某些需求,有进程间数据共享的需要时,可以通过下面的方式进行

    Queue

    import multiprocessing
    
    
    def func(args, q):
        q.put(args)
    
    
    if __name__ == '__main__':
        q = multiprocessing.Queue()
        for i in range(10):
            t = multiprocessing.Process(target=func, args=(i, q, ))
            t.start()
        while True:
            v = q.get()
    queue

    Manager

    import multiprocessing
    
    
    def func(args, dic):
        dic[args] = 123
    
    
    if __name__ == '__main__':
        m = multiprocessing.Manager()
        dic = m.dict()
        lst = []
        for i in range(10):
            t = multiprocessing.Process(target=func, args=(i, dic, ))
            t.start()
            lst.append(t)
        while True:
            n = 0
            for el in lst:
                if not el.is_alive():  # 进程是否存活
                    n += 1
            if n == len(lst):
                break
        print(dic)
    manager

    进程锁

    与线程相似,参考线程

    进程池

    import time
    from concurrent.futures import ProcessPoolExecutor
    
    
    def task(arg):
        time.sleep(2)
        print(arg)
    
    
    if __name__ == '__main__':
    
        pool = ProcessPoolExecutor(5)
        for i in range(10):
            pool.submit(task, i)
    pool

    单线程实现并发

    - 协程+IO切换:gevent
    - 基于事件循环的异步非阻塞框架:Twisted
                    

     IO多路复用

     IO多路复用是为了检测多个socket是否已经发生变化,(是否连接成功,是否已经获取数据,可读可写)

    操作系统检测socket是否发生变化

    select:最多1024个socket;循环去检测。
    poll:不限制监听socket个数;循环去检测(水平触发)。
    epoll:不限制监听socket个数;回调方式(边缘触发)。

    python模块检测socket是否发生变化

    import selcet
    select.select 
    select.epoll 

    异步非阻塞

     socket非阻塞

    非阻塞就是不等待,socket是默认阻塞的,当然可是改变它是否阻塞,它的阻塞体现在connect与recv

    client.setblocking(False) # 默认为True
    # 会报BlockingIOError的错误,只要捕获即可。

     异步

    指通知,当执行完之后,自动执行回调函数或者自动执行某些操作(通知)

    from twisted.web.client import getPage, defer
    from twisted.internet import reactor
    
    
    def all_done(arg):
        reactor.stop()
    
    
    def callback(contents):
        print(contents)
    
    
    deferred_list = []
    url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf8'))
        deferred.addCallback(callback)
        deferred_list.append(deferred)
    
    dlist = defer.DeferredList(deferred_list)
    dlist.addBoth(all_done)
    
    reactor.run()
    twisted

    同步阻塞

    对比非阻塞,阻塞当然指的就是等了,等一步完成后才进行下一步,就是严格按顺序来执行

    协程

    协程是程序员创造出来了的,而不是一个真正存在的东西,单纯的协程没有实际的用处,一般都是配合IO操作使用的

    协程就是微线程,是对一个线程进程的分片,使得线程可以在代码块之间可以来回的切换执行,而不是逐行的执行

    协程与IO操作

    from gevent import monkey
    monkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换
    import requests
    import gevent
    
    
    def get_page1(url):
        ret = requests.get(url)
        print(url,ret.content)
    
    def get_page2(url):
        ret = requests.get(url)
        print(url,ret.content)
    
    def get_page3(url):
        ret = requests.get(url)
        print(url,ret.content)
    
    gevent.joinall([
        gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1
        gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 协程2
        gevent.spawn(get_page3, 'https://github.com/'),     # 协程3
    ])

     进程,线程,协程的区别

           进程是计算机资源分配的最小单元,主要用来做数据隔离,线程是工作的最小单元,真正进行工作的其实就是线程.一个进程里可以有多个线程,一个应用程序里可以有多个进程,对于其他语言来说几乎用不到进程,他们使用的都是线程,而对于Python来说,对于IO密集型操作使用线程,对于计算密集型操作使用进程.因为python有GIL锁,它的作用是使一个进程中同一时刻只能有一个线程被CPU调度,所以想使用CPU的多核优势只能使用多个进程,而IO操作占用很少的CPU,所以使用多线程

           协程是程序员创造出来的,它本身是不存在的,它是用来可以让程序员可以控制代码的执行顺序,它本身存在没有什么意义,但是一旦它与IO切换放在一起,它的价值就大了,它可以人为的控制使程序遇到IO就切换到别的任务,IO操作回来时继续执行,这样就可使使线程的工作不会停,让线程一直工作,python在使用协程时主要是通过greenlet的模块,使用协程+IO操作时使用的是gevent模块

  • 相关阅读:
    归并排序
    堆排序
    数组数据生成器
    冒泡排序
    快速排序
    希尔排序
    排序接口与抽象类(java)
    Pycharm下HTMLTestRunner不生成测试报告
    抓包工具使用记录
    接口学习笔记
  • 原文地址:https://www.cnblogs.com/jiaqi-666/p/9620853.html
Copyright © 2011-2022 走看看