zoukankan      html  css  js  c++  java
  • 并发编程

    一、线程

    1、线程的俩种调用方式

    1、直接调用

    import threading
    import time
    
    def speak(num):
        print("running on number is %s"%num)
        time.sleep(3)   #让其展现效果(同时打印,不受sleep的影响)
    
    if __name__ == '__main__':
    
        t1 = threading.Thread(target=speak,args=(1,))   #定义了两个线程实例
        t2 = threading.Thread(target=speak, args=(2,))
    
        t1.start()  #启动实例
        t2.start()
    
        print(t1.getName()) #获取线程名
        print(t2.getName())
    View Code

    2、继承式调用

    import threading
    import time
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self) #调用父类
            self.num = num
    
        def run(self):
            print("running on number is %s"%self.num)
            time.sleep(3)
    
    if __name__ == '__main__':
        t1 = MyThread(1)
        t2 = MyThread(2)
    
        t1.start()
        t2.start()
    View Code

    2、threading模块的方法

    1、join()

    import threading
    import time
    
    def speak(num):
        print("running on number is %s"%num)
        time.sleep(3)
        print("speak ending")
    
    def talk(num):
        print("running on number is %s" % num)
        time.sleep(5)
        print("talk ending")
    
    if __name__ == '__main__':
    
        t1 = threading.Thread(target=speak,args=(1,))
        t2 = threading.Thread(target=talk, args=(2,))
    
        t1.start()
        # t1.join() #之后的内容都属于主线程内容,未加载
        t2.start()
    
        t1.join()
    
        print("ending......")
    View Code

    join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

    2、setDaemon()

    def speak(num):
        print("running on number is %s"%num)
        time.sleep(3)
        print("speak ending")
    
    def talk(num):
        print("running on number is %s" % num)
        time.sleep(5)
        print("talk ending")
    
    if __name__ == '__main__':
    
        t1 = threading.Thread(target=speak,args=(1,))
        t2 = threading.Thread(target=talk, args=(2,))
    
        t1.setDaemon(True)
        # t2.setDaemon(True)
    
        t1.start()
        t2.start()
    
        print("ending......")
    View Code

    setDaemon():在start()之前设置,将线程设置为守护线程,随着主线程的退出而退出。

    mian线程:当所有的子线程退出时,才退出。

    所有,上述例子,先将t1设置为守护线程时,主线程并没有退出,而是等待t2结束,造成的结果。

    3、其他方法

    # run():  线程被cpu调度后自动执行线程对象的run方法
    # start():启动线程活动。
    # isAlive(): 返回线程是否活动的。
    # getName(): 返回线程名。
    # setName(): 设置线程名。
    
    threading模块提供的一些方法:
    # threading.currentThread(): 返回当前的线程变量。
    # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    3、同步锁

    import threading
    import time
    
    def Add():
        global num
        # num += 1
    
        temp = num  #增加计算过程
        time.sleep(5)
        # time.sleep(1)
        # time.sleep(0.1)
        num = temp + 1
    
    num = 0
    
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=Add)
        t.start()
        thread_list.append(t)
    
    for t in thread_list:
        t.join()
    
    print("final num:",num)
    操作公共资源

    利用多线程操作公共资源时,不要进行IO操作或者sleep,有可能会造成意想不到的结果。(第100个线程拿到的num也是等于0)

    同步锁

    R = threading.Lock()    #定义在全局内
    
    def Add()
       global num R.acquire() temp
    = num time.sleep(0.1) num = temp + 1 R.release()

    4、线程死锁和递归锁

    在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

    import threading
    import time
    
    class myThread(threading.Thread):
        def fork(self):
            lockA.acquire()         #只有一个线程能够进来,其他需要等待释放    当第二个线程进入到这里
            print(self.name,"I need fork")
            time.sleep(2)
            lockB.acquire()
            print(self.name,"I need knife")
            lockB.release()         #释放锁
            lockA.release()
    
        def knife(self):
            lockB.acquire()         #当第一个线程执行到这里时
            print(self.name,"I need knife")
            time.sleep(3)
            lockA.acquire()
            print(self.name,"I need fork")
            lockA.release()
            lockB.release()
    
        def run(self):
            self.fork()
            self.knife()
    
    if __name__=="__main__":
    
        lockA=threading.Lock()
        lockB=threading.Lock()
    
        thread_list=[]
        for i in range(5):
            thread_list.append(myThread())
        for t in thread_list:
            t.start()
        for t in thread_list:
            t.join()
    死锁
    mport threading
    import time
    
    class myThread(threading.Thread):
        def fork(self):
            r_lock.acquire()         #只有一个线程能够进来,其他需要等待释放    当第二个线程进入到这里
            print(self.name,"I need fork")
            time.sleep(2)
            r_lock.acquire()
            print(self.name,"I need knife")
            r_lock.release()         #释放锁
            r_lock.release()
    
        def knife(self):
            r_lock.acquire()         #当第一个线程执行到这里时
            print(self.name,"I need knife")
            time.sleep(3)
            r_lock.acquire()
            print(self.name,"I need fork")
            r_lock.release()
            r_lock.release()
    
        def run(self):
            self.fork()
            self.knife()
    
    if __name__=="__main__":
    
        r_lock = threading.RLock()    #采用递归锁
    
        thread_list=[]
        for i in range(5):
            thread_list.append(myThread())
        for t in thread_list:
            t.start()
        for t in thread_list:
            t.join()
    递归锁

    为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

    count = 0,获得一把锁,count+1,当count > 0时,其他线程进不了

    5、同步对象

    An event is a simple synchronization object;the event represents an internal flag,
    
    and threads can wait for the flag to be set, or set or clear the flag themselves.
    
    
    event = threading.Event()
    
    # a client thread can wait for the flag to be set
    event.wait()
    
    # a server thread can set or reset it
    event.set()
    event.clear()
    
    
    If the flag is set, the wait method doesn’t do anything.
    If the flag is cleared, wait will block until it becomes set again.
    Any number of threads may wait for the same event.
    import threading
    import time
    
    class Boss(threading.Thread):
        def run(self):
            print("BOSS:今晚大家都要加班到22:00。")
            print(event.isSet())
            event.set()         #设置后,wait状态才能执行
            time.sleep(5)
            print("BOSS:<22:00>可以下班了。")
            print(event.isSet())
            event.set()
    
    class Worker(threading.Thread):
        def run(self):
            event.wait()
            print("Worker:哎……命苦啊!")
            time.sleep(1)
            event.clear()           #清除设置
            event.wait()
            print("Worker:OhYeah!")
    
    if __name__=="__main__":
        event=threading.Event() #创建同步对象
    
        thread_list = []
        for i in range(5):
            thread_list.append(Worker())
        thread_list.append(Boss())
    
        for t in thread_list:
            t.start()
        for t in thread_list:
            t.join()
    同步对象

    set():进行设置,设置之后,wait才能执行,否则一直等待

    wait():等待状态,等待设置

    clear():清除设置

    6、信号量

    信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

    计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

    BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

    import threading,time
    class myThread(threading.Thread):
        def run(self):
            if semaphore.acquire():
                print(self.name)
                time.sleep(5)
                semaphore.release()
    
    if __name__=="__main__":
        semaphore=threading.Semaphore(5)    #设置最大并发数
    
        thread_list = []
        for i in range(100):
            thread_list.append(myThread())
        for t in thread_list:
            t.start()
    View Code

    7、队列

     queue的方法

    创建一个“队列”对象
    import Queue
    q = Queue.Queue(maxsize = 10)
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
    
    将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
    
    将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
    get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
    
    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。   class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。               class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。        class queue.PriorityQueue(maxsize)
    
    此包中的常用方法(q = Queue.Queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作
    View Code

    other mode

    import queue
    
    #先进后出
    
    q=queue.LifoQueue()
    
    q.put(34)
    q.put(56)
    q.put(12)
    
    #优先级
    # q=queue.PriorityQueue()
    # q.put([5,100])
    # q.put([7,200])
    # q.put([3,"hello"])
    # q.put([4,{"name":"alex"}])
    
    while 1:
    
      data=q.get()
      print(data)
    View Code

    二、线程与进程

    1、Threads share the address space of the process that created it; processes have their own address space.
    2、Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
    3、Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
    4、New threads are easily created; new processes require duplication of the parent process.
    5、Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
    6、hanges to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process 
    does not affect child processes. 1、一个程序至少有一个进程,一个进程至少有一个线程.(进程可以理解成线程的容器) 2、进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。 3、线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和 4、程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,
    只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 一个线程可以创建和撤销另一个线程;同一个进程中的多个线程之间可以并发执行.

    三、进程

    1、进程的调用

    调用方式1

    from multiprocessing import Process
    import time
    
    def speak(name):
        time.sleep(1)
        print("Hello",name)
    
    if __name__ == '__main__':
        p_list = []
    
        for i in range(5):
            p = Process(target=speak,args=("lilong",))            #创建进程对象
            p_list.append(p)
            p.start()
    
        for p in p_list:
            p.join()
    
        print("ending...")
    View Code

    调用方式2

    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self,name):
            super(MyProcess,self).__init__()          #继承父类构造方法
            self.name = name
    
        def run(self):          #必须重写该方法
            time.sleep(1)
            print("Hello",self.name)
    
    if __name__ == '__main__':
        p_list = []
    
        for i in range(5):
            p = MyProcess("lilong")         #实例化
            p_list.append(p)
            p.start()
    
        for p in p_list:
            p.join()
    
        print("ending...")
    View Code

    2、process类

    构造方法:

      Process([group [, target [, name [, args [, kwargs]]]]])

      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。

    实例方法:

      is_alive():返回进程是否在运行。

      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

      start():进程准备就绪,等待CPU调度

      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

      terminate():不管任务是否完成,立即停止工作进程

    属性:

      daemon:和线程的setDeamon功能一样

      name:进程名字

      pid:进程号

    3、进程间通信

    1、进程队列

    from multiprocessing import Process, Queue
    
    def foo(q,n):
        q.put(n**n)   #多个数据类型,用list
        print("son process")
    
    if __name__ == '__main__':
        q = Queue()         #创建进程队列
        print("main process")
    
        for i in range(3):
            p = Process(target=foo, args=(q,i))
            p.start()
    
        print(q.get())
        print(q.get())
        print(q.get())
    View Code

    2、管道

    from multiprocessing import Process, Pipe
    
    def foo(conn):
        conn.send("Hello parent")   #多个数据类型,用list
        response=conn.recv()
        print("parent:",response)
        conn.close()
        # print("son_ID2:",id(conn))
    
    if __name__ == '__main__':
    
        parent_conn, child_conn = Pipe()    #双向管道
        # print("parent_ID:",id(child_conn))
        # print("son_ID:", id(parent_conn))
    
        p = Process(target=foo, args=(child_conn,))
        p.start()
    
        print("son:",parent_conn.recv())
        parent_conn.send("Hello son")
        p.join()
    
        print("ending...")
    View Code

    3、Managers

    Queue和Pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array.

    from multiprocessing import Process, Manager
    
    def foo(d, l,n):
        d[n] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(n)
        # print(l)
    
        # print("son process:",id(d),id(l))
    
    if __name__ == '__main__':
    
        with Manager() as manager:
    
            d = manager.dict()    #创建dict
            l = manager.list(range(5))    #创建list
    
            # print("main process:",id(d),id(l))
    
            p_list = []
    
            for i in range(10):
                p = Process(target=foo, args=(d,l,i))
                p.start()
                p_list.append(p)
    
            for res in p_list:
                res.join()
    
            print(d)
            print(l)
    View Code

    4、进程同步

    from multiprocessing import Process, Lock
    
    def foo(l, i):
    
        l.acquire()
        print(i)
        l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=foo, args=(lock, num)).start()
    View Code

    5、进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async
    from  multiprocessing import Process,Pool
    import time,os
    
    def foo(i):
        time.sleep(1)
        print(i)
        return i+100
    
    def bar(arg):
    
        # print(os.getpid())
        # print(os.getppid())
        print('logger:',arg)
    
    
    if __name__ == '__main__':
    
        pool = Pool(5)  #创建进程池(同时并发)
    
        bar(1)
        print("----------------")
    
        for i in range(10):
            pool.apply(func=foo, args=(i,))         #阻塞,等待当前子进程执行完毕后,在执行下一个进程
            # pool.apply_async(func=foo, args=(i,))
            # pool.apply_async(func=foo, args=(i,),callback=bar)  #foo的返回值作为参数传给bar
    
        pool.close()
        pool.join()
        print("ending...")
    View Code

    四、上下文管理器

    1、如何使用上下文管理器

    如何打开一个文件,并写入"hello world"

    1 filename="my.txt"
    2 mode="w"
    3 f=open(filename,mode)
    4 f.write("hello world")
    5 f.close()

    当发生异常时(如磁盘写满),就没有机会执行第5行。当然,我们可以采用try-finally语句块进行包装:

    1 writer=open(filename,mode)
    2 try:
    3     writer.write("hello world")
    4 finally:
    5     writer.close()

    当我们进行复杂的操作时,try-finally语句就会变得丑陋,采用with语句重写:

    1 with open(filename,mode) as writer:
    2     writer.write("hello world")

    as指代了从open()函数返回的内容,并把它赋给了新值。with完成了try-finally的任务。

    2、自定义上下文管理器

    with语句的作用类似于try-finally,提供一种上下文机制。要应用with语句的类,其内部必须提供两个内置函数__enter__和__exit__。前者在主体代码执行前执行,后者在主体代码执行后执行。as后面的变量,是在__enter__函数中返回的。

    3、contextlib模块

    4、contextlib.nested:减少嵌套

    5、contextlib.closing()

    五、协程

    协程,又称微线程,纤程。英文名Coroutine。

    优点1: 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

    优点2: 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

    1、yield的简单实现

    import time
    
    def consumer(name):
        print("--->ready to eat baozi...")
        while True:
            new_baozi = yield   #暂停
            print("[%s] is eating baozi %s" % (name,new_baozi))
            #time.sleep(1)
    
    def producer():
    
        r = con1.__next__() #进入consumer
        r = con2.__next__()
        n = 1
        while 1:
            time.sleep(1)
            print("33[32;1m[producer]33[0m is making baozi %s and %s" %(n,n+1) )
            con1.send(n)
            con2.send(n+1)
    
            n += 2
    
    
    if __name__ == '__main__':
        con1 = consumer("c1")
        con2 = consumer("c2")
        producer()
    View Code

    2、Greenlet

    greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

    from greenlet import greenlet
    
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    
    
    def test2():
        print(56)
        gr1.switch()
        print(78)
    
    
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()
    View Code

    3、Gevent

    import gevent
    import requests,time
    
    
    start=time.time()
    
    def f(url):
        print('GET: %s' % url)
        resp =requests.get(url)
        data = resp.text
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
    
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://www.baidu.com/'),
            gevent.spawn(f, 'https://www.sina.com.cn/'),
    
    ])
    
    
    print("cost time:",time.time()-start)
    View Code

    本文参考:https://www.cnblogs.com/yuanchenqi/articles/6248025.html

  • 相关阅读:
    c++中string类中的函数
    二进制
    快速幂
    substring
    hdu 4678
    扩展欧几里得算法
    欧几里得算法
    Floyd_Warshall(任意两点之间的最短路)
    带结构体的优先队列
    php获得远程信息到本地使用的3个函数:file_get_contents和curl函数和stream_get_contents
  • 原文地址:https://www.cnblogs.com/lilong74/p/11294586.html
Copyright © 2011-2022 走看看