  • Concurrent Programming in Python

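    Processes

    A process is one dynamic execution of a program over a data set. It generally consists of three parts: the program, the data set, and the process control block (PCB).

    The program describes what the process is to do and how to do it.

    The data set is the set of resources the program needs while it runs.

    The process control block records the external characteristics of the process and describes how its execution state changes. The operating system uses it to control and manage the process, and it is the only marker by which the system knows the process exists.

    A single CPU core can run only one program at any instant. To give the effect of concurrency, the system has to keep switching between programs so that each one gets to run within a given period. Processes were introduced so that the state of each running program can be saved and restored across these switches.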

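    Threads

    A program may be split across many processes, but processes do not share memory, so communication between them is expensive. Threads were introduced to reduce the cost of context switching, to improve the concurrency of the system, and to get past the limitation that a process can only do one thing at a time.

    A thread is also called a lightweight process. It is the basic unit of CPU execution and the smallest unit of execution in a running program. It has no memory space of its own; instead it shares the memory of its process with the other threads in that process. A thread essentially consists of a thread ID, a program counter, a register set, and a stack. Threads reduce the overhead of running a program concurrently and improve the concurrency of the operating system. Apart from these few items, a thread owns no system resources of its own.

    A process is the smallest unit of resource allocation; a thread is the smallest unit of execution.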
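
To make the point about shared memory concrete, here is a minimal sketch in which several threads write to the same list object. The names `shared` and `worker` are illustrative only and are not part of the original article.

```python
import threading

shared = []                      # one list object, visible to every thread

def worker(tag):
    # Each thread appends to the same list. list.append is thread-safe in
    # CPython, so this small demo does not need a lock.
    shared.append(tag)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))            # every thread's write is visible here
```

Because the threads live in one process, no copying or message passing is needed for the main thread to see their writes.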

    GIL(Global Interpreter Lock)

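    In a standard build of CPython (the default Python implementation), the GIL lets only one thread execute Python bytecode at a time within a process, however many CPU cores or threads there are. Threads therefore take turns rather than running Python code in parallel. The lock is released while a thread sleeps or waits on blocking I/O.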
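
One rough way to observe the GIL is to time the same CPU-bound work run sequentially and run in threads. The sketch below assumes a standard CPython build; the helper names `count_down`, `run_sequential`, and `run_threaded` are hypothetical. On such a build the two timings are typically similar, but the exact numbers depend on the machine and the Python version, so none are shown here.

```python
import threading
import time

def count_down(n):
    # Pure-Python, CPU-bound loop: it holds the GIL while it runs.
    while n > 0:
        n -= 1

def run_sequential(n, jobs):
    # Run the jobs one after another and return the elapsed seconds.
    start = time.perf_counter()
    for _ in range(jobs):
        count_down(n)
    return time.perf_counter() - start

def run_threaded(n, jobs):
    # Run the same jobs in separate threads and return the elapsed seconds.
    start = time.perf_counter()
    threads = [threading.Thread(target=count_down, args=(n,)) for _ in range(jobs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    n, jobs = 2_000_000, 2
    print("sequential: %.3fs" % run_sequential(n, jobs))
    print("threaded:   %.3fs" % run_threaded(n, jobs))
```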

    Ways to start a thread

    Direct call

    import threading, time  ## import the threading module
    
    def test(num):   ## the function each thread will run
    	print("It's %s" %num)
    	time.sleep(3)
    	print("%s end" %num)
    
    if __name__ == "__main__":
    	t1 = threading.Thread(target = test, args = (1,))  ## create a Thread, passing the function and its arguments
    	t2 = threading.Thread(target = test, args = (2,))
    
    	t1.start()  ## start the threads
    	t2.start()
    
    	
    	print("End")
        
       
    It's 1
    It's 2
    End
    1 end
    2 end
    

    Calling via subclassing

    import threading, time
    
    class MyTread(threading.Thread):
    	def __init__(self, num):
    		threading.Thread.__init__(self)
    		self.num = num
    
    	def run(self):
    		print("The value I print is %s" %self.num)
    		time.sleep(3)
    		print("%s has finished printing" %self.name)
    
    if __name__ == "__main__":
    	s1 = MyTread(1)
    	s2 = MyTread(2)
    
    	s1.start()
    	s2.start()
    	print("End……")
        
    The value I print is 1
    The value I print is 2
    End……
    ##(waits 3 seconds)
    Thread-1 has finished printing
    Thread-2 has finished printing
    

    The join and daemon methods

    join(): the thread that calls join() on a child thread blocks until that child thread finishes.

    import threading, time
    
    class ListenMusic(threading.Thread):
    	def __init__(self, name):
    		threading.Thread.__init__(self)
    		self.name = name
    
    	def run(self):
    		print("%s is listening music! %s" %(self.name,time.ctime()))
    		time.sleep(4)
    		print("OK! %s" %time.ctime())
    
    class DoHomework(threading.Thread):
    	def __init__(self, name):
    		threading.Thread.__init__(self)
    		self.name = name
    
    	def run(self):
    		print("%s is doing homework! %s" %(self.name, time.ctime()))
    		time.sleep(2)
    		print("Homework has been done! %s" %time.ctime())
    
    if __name__ == "__main__":
    	s1 = ListenMusic("Hermaeus")
    	s2 = DoHomework("YuanMing")
    
    	s1.start()
    	s2.start()
    	# s1.join()
    	s2.join()
    	print("End…… %s" %time.ctime())
        
    Hermaeus is listening music! Wed Aug 29 17:49:54 2018
    YuanMing is doing homework! Wed Aug 29 17:49:54 2018
    Homework has been done! Wed Aug 29 17:49:56 2018
    End…… Wed Aug 29 17:49:56 2018
    OK! Wed Aug 29 17:49:58 2018
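
join() also accepts a timeout. The sketch below, with a hypothetical `slow_task` function, shows that join(timeout) simply returns when the time is up, so the caller has to ask is_alive() to find out whether the thread really finished.

```python
import threading
import time

def slow_task():
    time.sleep(1.0)              # stands in for a long-running job

t = threading.Thread(target=slow_task)
t.start()

t.join(timeout=0.1)              # give up waiting after 0.1 s
still_running = t.is_alive()     # join() returns None, so ask the thread

t.join()                         # now wait without a limit
finished = not t.is_alive()

print(still_running, finished)
```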
    

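    Daemon threads

    Declaring a thread as a daemon thread means the program does not wait for it: once the main thread and every other non-daemon thread have finished, the program exits and any daemon threads still running are stopped with it.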

    import threading, time
    
    class ListenMusic(threading.Thread):
    	def __init__(self, name):
    		threading.Thread.__init__(self)
    		self.name = name
    
    	def run(self):
    		print("%s is listening music! %s" %(self.name,time.ctime()))
    		time.sleep(4)
    		print("OK! %s" %time.ctime())
    
    class DoHomework(threading.Thread):
    	def __init__(self, name):
    		threading.Thread.__init__(self)
    		self.name = name
    
    	def run(self):
    		print("%s is doing homework! %s" %(self.name, time.ctime()))
    		time.sleep(2)
    		print("Homework has been done! %s" %time.ctime())
    
    if __name__ == "__main__":
    	s1 = ListenMusic("Hermaeus")
    	s2 = DoHomework("YuanMing")
    
    	s1.daemon = True  ## must be set before start(); setDaemon(True) is the older, deprecated spelling
    
    	s1.start()
    	s2.start()
    
    	print("End…… %s" %time.ctime())
        
        
    Hermaeus is listening music! Wed Aug 29 18:32:52 2018
    YuanMing is doing homework! Wed Aug 29 18:32:52 2018
    End…… Wed Aug 29 18:32:52 2018
    Homework has been done! Wed Aug 29 18:32:54 2018
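
The rule that the daemon flag has to be set before start() can be checked directly: changing it on a started thread raises RuntimeError. This is a small sketch with a hypothetical `background` function.

```python
import threading
import time

def background():
    time.sleep(0.2)

t = threading.Thread(target=background)
t.daemon = True                  # allowed: the thread has not started yet
t.start()

try:
    t.daemon = False             # too late: the thread has already been started
    error = None
except RuntimeError as exc:
    error = exc

t.join()
print(t.daemon, type(error).__name__)
```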
    

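    Synchronization locks

    Running the code below gives a different result each time. Each thread reads num, then sleeps (standing in for an I/O operation) before writing the result back. While it sleeps, other threads read the same stale value, and some pick up values that have already been modified, so updates overwrite one another. The main thread also prints num without waiting for the workers to finish.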

    import time
    import threading
    
    def addNum():
        global num 
        temp=num
        time.sleep(0.0012)
        num =temp-1
    
    num = 100  
    
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
    
    print('final num:', num )
    

    A synchronization lock keeps the threads' updates in step:

    import time, threading
    
    def test():
    
    	global num
    	lock.acquire() ## acquire() and release() guard the critical section
    	temp = num
    	time.sleep(0.00001)
    	num = temp - 1
    	lock.release()
    
    num = 100
    lock = threading.Lock()  ## create a synchronization lock
    l = []
    
    for i in range(100):
    	s = threading.Thread(target = test)
    	s.start()
    	l.append(s)
    
    for ready in l:
    	ready.join()  ## every thread must be joined before num is read
    
    print(num)
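
The same protection can be written with the lock as a context manager, which releases the lock even if the guarded code raises. This is a sketch only; the names `counter` and `decrement` are illustrative.

```python
import threading

counter = 100
lock = threading.Lock()

def decrement():
    global counter
    with lock:                   # acquired here, released on exit, even on error
        temp = counter
        counter = temp - 1

threads = [threading.Thread(target=decrement) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)                   # → 0
```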
    

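    Deadlock

    When several threads each hold some resources while waiting for the others to release theirs, none of them can proceed. This is a deadlock.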

    import threading,time
    
    class myThread(threading.Thread):
        def doA(self):
            lockA.acquire()
            print(self.name,"gotlockA",time.ctime())
            time.sleep(3)
            lockB.acquire()
            print(self.name,"gotlockB",time.ctime())
            lockB.release()
            lockA.release()
    
        def doB(self):
            lockB.acquire()
            print(self.name,"gotlockB",time.ctime())
            time.sleep(2)
            lockA.acquire()
            print(self.name,"gotlockA",time.ctime())
            lockA.release()
            lockB.release()
    
        def run(self):
            self.doA()
            self.doB()
    if __name__=="__main__":
    
        lockA=threading.Lock()
        lockB=threading.Lock()
        threads=[]
        for i in range(5):
            threads.append(myThread())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
            
    Thread-1 gotlockA Wed Aug 29 19:16:53 2018
    Thread-1 gotlockB Wed Aug 29 19:16:56 2018
    Thread-1 gotlockB Wed Aug 29 19:16:56 2018
    Thread-2 gotlockA Wed Aug 29 19:16:56 2018
    ... (the program hangs here)
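
One common way to avoid this kind of deadlock is to make every thread take the locks in the same order. The sketch below is a different technique from the article's next section, not the author's code; `lock_a`, `lock_b`, and `transfer` are hypothetical names.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
log = []

def transfer(tag):
    # Every thread takes lock_a first and lock_b second, so no thread can
    # hold lock_b while waiting for lock_a.
    with lock_a:
        with lock_b:
            log.append(tag)

threads = [threading.Thread(target=transfer, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=5)            # a hang would show up as a live thread

print(len(log), any(t.is_alive() for t in threads))
```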
    

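    Reentrant locks

    To let one thread request the same resource several times, Python provides a reentrant lock, threading.RLock. An RLock keeps track of an underlying lock, the thread that owns it, and a counter of how many times that thread has called acquire(), so the owner can acquire it repeatedly. Other threads can obtain the lock only after the owner has called release() once for every acquire().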

    import threading,time
    
    class myThread(threading.Thread):
        def doA(self):
            lock.acquire()
            print(self.name,"gotlockA",time.ctime())
            time.sleep(3)
            lock.acquire()
            print(self.name,"gotlockB",time.ctime())
            lock.release()
            lock.release()
    
        def doB(self):
            lock.acquire()
            print(self.name,"gotlockB",time.ctime())
            time.sleep(2)
            lock.acquire()
            print(self.name,"gotlockA",time.ctime())
            lock.release()
            lock.release()
    
        def run(self):
            self.doA()
            self.doB()
    
    if __name__=="__main__":
    	lock = threading.RLock()  ## create a reentrant lock instance
    	threads = []
    	for i in range(2):
    		threads.append(myThread())
    	for t in threads:
    		t.start()
    	for t in threads:
    		t.join()
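
The difference between Lock and RLock can be seen in a single thread by trying a second, non-blocking acquire. This is a minimal sketch; the variable names are illustrative.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# A plain Lock cannot be taken twice, even by the thread that holds it.
plain_again = plain.acquire(blocking=False)
plain.release()

reentrant.acquire()
# An RLock can: the owning thread just increments the internal counter.
reentrant_again = reentrant.acquire(blocking=False)
reentrant.release()              # one release per acquire
reentrant.release()

print(plain_again, reentrant_again)   # → False True
```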
    

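    Events (synchronization conditions)

    An event is a simple synchronization object that acts as a shared flag: threads wait until the flag is set.

    While the flag is set, wait() returns at once and has no effect. Once the flag has been cleared, wait() blocks the calling thread until the flag is set again.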

    import threading,time
    class Boss(threading.Thread):
        def run(self):
            print("BOSS: Everyone works overtime until 22:00 tonight.")
            print(event.is_set())
            event.set()
            time.sleep(3)
            print("BOSS: <22:00> You can go home now.")
            print(event.is_set())
            event.set()
    class Worker(threading.Thread):
        def run(self):
            event.wait()
            print("Worker: Sigh... what a hard life!")
            time.sleep(1)
            event.clear()
            event.wait()
            print("Worker: OhYeah!")
    if __name__=="__main__":
        event=threading.Event()
        threads=[]
        for i in range(5):
            threads.append(Worker())
        threads.append(Boss())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
            
    BOSS: Everyone works overtime until 22:00 tonight.
    False
    Worker: Sigh... what a hard life!
    Worker: Sigh... what a hard life!
    Worker: Sigh... what a hard life!
    Worker: Sigh... what a hard life!
    Worker: Sigh... what a hard life!
    BOSS: <22:00> You can go home now.
    False
    Worker: OhYeah!
    Worker: OhYeah!
    Worker: OhYeah!
    Worker: OhYeah!
    Worker: OhYeah!
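
wait() also takes a timeout and reports whether the flag was set. The following sketch uses hypothetical names (`ready`, `waiter`, `seen`) to show both outcomes.

```python
import threading

ready = threading.Event()
seen = []

def waiter():
    # wait() returns True once the flag is set, False if the timeout expires.
    seen.append(ready.wait(timeout=5))

t = threading.Thread(target=waiter)
t.start()

timed_out = ready.wait(timeout=0.01)   # nobody has set the flag yet
ready.set()                            # wake the waiting thread
t.join()

print(timed_out, seen, ready.is_set())   # → False [True] True
```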
    

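    Semaphores

    A semaphore limits how many threads run a section concurrently. Semaphore and BoundedSemaphore both manage an internal counter: each acquire() decrements it and each release() increments it. When the counter is 0, acquire() blocks until another thread calls release(). BoundedSemaphore additionally checks on every release() that the counter does not exceed its initial value, and raises ValueError if it would.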

    import threading, time
    
    class MyThread(threading.Thread):
    	def run(self):
    		if semaphore.acquire():  ## blocks until a slot is free, then returns True
    			print("%s  start   %s" %(self.name, time.ctime()))
    			time.sleep(2.5)
    			print("%s  end    %s" %(self.name, time.ctime()))
    			semaphore.release()
    
    
    if __name__ == "__main__":
    	semaphore = threading.Semaphore(5)  ## create a semaphore that admits 5 threads at a time
    	threads = []
    	for i in range(20):
    		threads.append(MyThread())
    
    	for i in threads:
    		i.start()
    
    	for i in threads:
    		i.join()
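
The sketch below checks two of the claims above: that the semaphore caps the number of threads inside the guarded section, and that BoundedSemaphore rejects an extra release(). The names `LIMIT`, `slots`, `active`, and `peak` are illustrative assumptions.

```python
import threading
import time

LIMIT = 2
slots = threading.BoundedSemaphore(LIMIT)
state_lock = threading.Lock()
active = 0
peak = 0

def worker():
    global active, peak
    with slots:                  # at most LIMIT threads get past this line
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

try:
    slots.release()              # one release more than there were acquires
    over_release = None
except ValueError as exc:
    over_release = exc

print(peak <= LIMIT, type(over_release).__name__)   # → True ValueError
```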
    

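    Queues

    Queues are a key tool in multithreaded programs: they let threads exchange data safely, so any thread can put items into the queue or take items out of it.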

    import time, threading, queue, random
    
    q_list = queue.Queue()  ## create a queue; maxsize defaults to 0 (unbounded) and can be changed
    ## Queue is first in, first out; LifoQueue is last in, first out; PriorityQueue returns the entry with the lowest priority value first
    def add_num():
        for i in range(10):
            q_list.put(i)    ## put() adds an item
            time.sleep(random.randint(1,2)) 
        print("add_num run over!")
    
    def minus_num():
        while True:
            if not q_list.empty():  ## empty() reports whether the queue is empty
                print("Got item %s" %q_list.get())
                time.sleep(random.randint(2,4))
            else:
                print("minus_num run over!")
                break
    
    s1 = threading.Thread(target=add_num)
    s2 = threading.Thread(target=minus_num)
    s1.start()
    s2.start()
    
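    q_list.qsize()  ## returns the approximate number of items in the queue
    q_list.full()   ## reports whether the queue is full
    q_list.put(item[, block[, timeout]])  ## adds an item. block is optional and defaults to True. If the queue is full and block is True, put() suspends the calling thread until a slot is free; if block is False, put() raises the Full exception.
    q_list.get([block[, timeout]])  ## removes and returns an item. block is optional and defaults to True. If the queue is empty and block is True, get() suspends the calling thread until an item is available; if the queue is empty and block is False, it raises the Empty exception. timeout is how long to wait.
    q_list.get_nowait()    ## equivalent to q_list.get(False)
    q_list.put_nowait(item)    ## equivalent to q_list.put(item, False)
    q_list.task_done()     ## after finishing work on an item obtained with get(), signals the queue that the task is complete
    q_list.join()    ## blocks until every item in the queue has been fetched and marked with task_done()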
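
The task_done() and join() pair is easiest to follow in a small producer and consumer. This is a sketch under stated assumptions: `STOP` is a hypothetical sentinel object used to end the worker, and `tasks`, `results`, and `consumer` are illustrative names.

```python
import queue
import threading

tasks = queue.Queue()
results = []
STOP = object()                  # hypothetical sentinel that tells the worker to exit

def consumer():
    while True:
        item = tasks.get()       # blocks until an item is available
        if item is STOP:
            tasks.task_done()
            break
        results.append(item * 2)
        tasks.task_done()        # one task_done() per get()

worker = threading.Thread(target=consumer)
worker.start()

for i in range(5):
    tasks.put(i)
tasks.join()                     # returns once all five items are processed

tasks.put(STOP)
worker.join()
print(results)                   # → [0, 2, 4, 6, 8]
```

Blocking on get() instead of polling empty() avoids the case where the consumer sees an empty queue and quits while the producer is still working.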
    

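    The multiprocessing module

    The module creates a process from a multiprocessing.Process object, which is used the same way as threading.Thread. A large part of multiprocessing mirrors the threading API, applied to multiple processes instead.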

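    ##### Constructor
    Process([group [, target [, name [, args [, kwargs]]]]])  
    # group: reserved; it is not implemented and must always be None
    # target: the function to run
    # name: the process name
    # args/kwargs: the arguments to pass to target
    ##### Instance methods
    is_alive():# returns whether the process is running
    join([timeout]):# blocks the caller until the process whose join() was called terminates or timeout elapses
    start():# starts the process
    run():# must be overridden when subclassing Process
    terminate():# stops the process immediately, whether or not its work is done
    ##### Attributes
    daemon:# same role as the daemon flag on threads  # p.daemon=True
    name:# the process name
    pid:# the process ID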
    
    ##### Direct instantiation
    from multiprocessing import Process
    import time
    def f(name):
    	time.sleep(1)
    	print("Hello %s %s" %(name, time.ctime()))
    
    if __name__ == "__main__":
    	p_list = []
    	for i in range(3):
    		p = Process(target = f, args = ("Hermaeus",))
    		p_list.append(p)
    		p.start()
    
    ##### Subclassing
    from multiprocessing import Process
    import time
    class MyProcess(Process):
    	def __init__(self, name):
    		super(MyProcess, self).__init__()
    		self.name = name
    		
    	def run(self):
    		time.sleep(1)
    		print("Hello %s %s" %(self.name, time.ctime()))
    
    if __name__ == "__main__":
    	p_list = []
    	for i in range(3):
    		p = MyProcess("Hermaeus")
    		p.start()
    		p_list.append(p)
    

    Inter-process communication

    Queue

    from multiprocessing import Process, Queue
    
    def test(q,i):  ## the queue and the value must both be passed to the child process
    	q.put(i)
    
    if __name__ == "__main__":
    	
    	q = Queue()
    	for i in range(3):
    		p = Process(target = test, args = (q,i,))
    		p.start()
    
    	print(q.get())
    	print(q.get())
    	print(q.get())
    

    Pipe

    from multiprocessing import Process, Pipe
    
    def f(conn):
    	conn.send("OK!")
    	response = conn.recv()
    	print("response: %s" %response)
    	conn.close()
    
    if __name__ == "__main__":
    	parent_conn, child_conn = Pipe()
    	p = Process(target = f, args = (child_conn,))
    	p.start()
    	print(parent_conn.recv())
    	parent_conn.send("From parent_conn!")
    	p.join()
    

    Managers

    from multiprocessing import Process, Manager
    
    def test(d,l,n):
        d[n] = n
        l.append(n)
    
    if __name__ == "__main__":
        with Manager() as manager:
            d = manager.dict() 
            l = manager.list()
            p_list = []
            for i in range(10):
                p = Process(target=test, args=(d,l,i,))
                p.start()
                p_list.append(p)
                
            for p in p_list:  ## join is required: wait for the children before reading the results
                p.join()
    
            print(d)
            print(l)
    

    Process synchronization

    from multiprocessing import Process, Lock
    import time
    
    def test(l,i):
        l.acquire()
        time.sleep(3)
        print("Hi, %s" %i)
        l.release()
    
    if __name__ == "__main__":
        lock = Lock()
        for i in range(3):
            p = Process(target=test, args=(lock, i))
            p.start()
    

    Process pools

    from multiprocessing import Process, Pool
    import time, os
    
    def test(i):
        time.sleep(1)
        print("From test %s" %os.getpid())
        return "Hello, %s" %i
    
    def Fd(arg):
        print(arg)
    
    if __name__ == "__main__":
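        pool = Pool(2) ## create a pool of 2 worker processes
        for i in range(10):
            pool.apply(func=test, args=(i,)) ## synchronous call: blocks until the result is ready
            pool.apply_async(func=test, args=(i,), callback = Fd) ## asynchronous call: Fd is called with the result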
        pool.close()
        pool.join()
        print("END......")
    

    Coroutines (micro-threads)

    yield

    import time
    import queue
    
    def consumer(name):
        print("--->ready to eat baozi...")
        while True:
            new_baozi = yield  ## suspends here until the caller resumes it with send()
            print("[%s] is eating baozi %s" % (name,new_baozi))
    
    def producer():
    
        r = con.__next__()  ## advance each consumer to its first yield
        r = con2.__next__()
        n = 0
        while 1:
            time.sleep(1)
            print("producer is making baozi %s and %s" %(n,n+1) )
            con.send(n)  ## send() passes a value into the generator
            con2.send(n+1)
            n +=2
    
    
    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()
    

    Greenlet

    from greenlet import greenlet
    
    def quest_foo():
        print("Hello!")
        answ_foo.switch()  ## switch() transfers control to the other greenlet
        print("Goodbye!")
        answ_foo.switch()
    
    def answ_foo():
        print("Hello!")
        quest_foo.switch()
        print("Goodbye!")
    
    if __name__ == "__main__":
        quest_foo = greenlet(quest_foo)
        answ_foo = greenlet(answ_foo)
        answ_foo.switch()
    
  • Original article: https://www.cnblogs.com/MingleYuan/p/10732308.html