zoukankan      html  css  js  c++  java
  • 190428多线进程编程

    一、多线程编程

    • 示例
    import threading
    import time
    
    def Say_Hi(num):
        print("hello %s" %num)
        time.sleep(3)
        print("hello2 %s" %num)
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=Say_Hi, args=(111,))  #创建一个线程对象
        t1.start()
    
        t2 = threading.Thread(target=Say_Hi, args=(222,))  #创建一个线程对象
        t2.start()
    
        print("ending...")
    

    1、join方法

    import threading
    import time
    
    def Listen_Music(name):
        print("%s begin to listen %s" %(name,time.ctime()))
        time.sleep(3)
        print("%s end to listen %s" %(name,time.ctime()))
    
    def Play_Game(name):
        print("%s begin to play game %s" %(name,time.ctime()))
        time.sleep(5)
        print("%s end to play game %s" %(name,time.ctime()))
    
    if __name__ == '__main__':
        music = threading.Thread(target=Listen_Music,args=("dongfei",))
        game = threading.Thread(target=Play_Game,args=("dongfei",))
    
        music.start()
        music.join()  #将等待music执行完成后再继续下边代码
        game.start()
    
        # music.join()
        # game.join()
    
        print("ending....")
    

    2、setDaemon守护线程

    import threading
    import time
    
    def Listen_Music(name):
        print("%s begin to listen %s" %(name,time.ctime()))
        time.sleep(3)
        print("%s end to listen %s" %(name,time.ctime()))
    
    def Play_Game(name):
        print("%s begin to play game %s" %(name,time.ctime()))
        time.sleep(5)
        print("%s end to play game %s" %(name,time.ctime()))
    
    music = threading.Thread(target=Listen_Music,args=("dongfei",))
    game = threading.Thread(target=Play_Game,args=("dongfei",))
    
    l = []
    l.append(music)
    l.append(game)
    
    if __name__ == '__main__':
    
        for i in l:
            i.setDaemon(True)  #设置为守护线程,主线程结束就退出
            i.start()
    
        print("ending....")
    

    3、线程对象和threading模块的其他方法

    • run()

    • start() 将线程处于就绪状态

    • getName() 获取线程名

    • isAlive() 判断线程是否活动

    • setName() 设置线程名

    • threading.currentThread() 返回当前的线程变量

    • threading.enumerate() 返回一个正在运行的线程列表

    • threading.activeCount() 统计正在运行的线程数

    4、线程锁(同步锁,递归锁)

    • GIL:全局解释器锁,同一时刻,只有一个线程在执行

      • IO密集型任务:建议多线程执行
      • CPU密集型任务:建议多进程 + 协成执行,由于GIL锁的原因,不推荐使用线程
    • 同步锁(互斥锁)

    import threading
    import time
    lock = threading.Lock()  #创建锁
    
    def sub():
        global num
        # num -= 1
        lock.acquire()  #加锁
        temp = num
        time.sleep(0.0001)  #在此中间的过程只有一个线程执行
        num = temp - 1
        lock.release()  #释放锁
    
    num = 100
    
    l = []
    for i in range(100):
        t = threading.Thread(target=sub)
        t.start()
        l.append(t)
    
    for t in l:
        t.join()
    
    print(num)
    
    • 死锁
    import threading
    import time
    
    class MyThread(threading.Thread):
    
        def actionA(self):
            A.acquire()
            print(self.name, "gotA", time.ctime())
            time.sleep(2)
    
            B.acquire()
            print(self.name, "gotB", time.ctime())
            time.sleep(1)
    
            B.release()
            A.release()
    
        def actionB(self):
            B.acquire()
            print(self.name, "gotB", time.ctime())
            time.sleep(2)
    
            B.acquire()
            print(self.name, "gotA", time.ctime())
            time.sleep(1)
    
            A.release()
            B.release()
    
        def run(self):
            self.actionA()
            self.actionB()
    
    if __name__ == '__main__':
        A = threading.Lock()
        B = threading.Lock()
        L = []
    
        for i in range(5):
            t = MyThread()
            t.start()
            L.append(t)
    
        for i in L:
            i.join()
        print("ending...")
    
    • 递归锁
    import threading
    import time
    
    class MyThread(threading.Thread):
    
        def actionA(self):
            r_lock.acquire()
            print(self.name, "gotA", time.ctime())
            time.sleep(2)
    
            r_lock.acquire()
            print(self.name, "gotB", time.ctime())
            time.sleep(1)
    
            r_lock.release()
            r_lock.release()
    
        def actionB(self):
            r_lock.acquire()
            print(self.name, "gotB", time.ctime())
            time.sleep(2)
    
            r_lock.acquire()
            print(self.name, "gotA", time.ctime())
            time.sleep(1)
    
            r_lock.release()
            r_lock.release()
    
        def run(self):
            self.actionA()
            self.actionB()
    
    if __name__ == '__main__':
        # A = threading.Lock()
        # B = threading.Lock()
        r_lock = threading.RLock()  #递归锁
        L = []
    
        for i in range(5):
            t = MyThread()
            t.start()
            L.append(t)
    
        for i in L:
            i.join()
        print("ending...")
    

    5、同步条件对象(EVENT对象)

    • 设置flag,wait非阻塞;清除flag,wait阻塞
    event = threading.Event()
    event.set()
    print(event.isSet())  #False or Ture
    event.clear()
    event.wait()
    

    6、信号量

    • 限制同时可以开几个线程
    import threading, time
    
    class MyThread(threading.Thread):
        def run(self):
            if semaphore.acquire():  #锁定,每次只能放5个执行
                print(self.name)
                time.sleep(2)
                semaphore.release()  #释放
    
    if __name__ == '__main__':
        semaphore = threading.Semaphore(5)  #同时可以开5个线程
        thrs = []
        for i in range(100):
            thrs.append(MyThread())
        for t in thrs:
            t.start()
    

    7、线程队列(queue)

    import queue
    
    q = queue.Queue(3)  #FIFO,默认先进先出;3表示只能放3个元素
    q2 = queue.LifoQueue()  #LIFO,后进先出
    q3 = queue.PriorityQueue()  #按优先级出
    q3.put([2,"abc"])  #2为优先级,数字越大优先级越低
    
    q.put(111)
    q.put("hello")
    q.put({"name":"dongfei"})
    # q.put("abc",False)  #如果传不进去则报错
    
    while True:
        data = q.get(block=False)  #如果没有值则报错
        print(data)
        print("----")
    
    • 队列模块的常见使用方法
    print(q.qsize())  #队列的当前的值
    print(q.empty())  #队列是否为空
    print(q.full())  #队列是否已满
    q.get_nowait()  # == q.get(Flase)
    q.task_done()  #发信号
    q.join()  #收信号
    
    • 消费者生产者模型
    import time
    import random
    import queue
    import threading
    
    q = queue.Queue()
    
    def Producer(name)  #生产者
        count = 0
        while count < 10:
            print("making...")
            time.sleep(random.randrange(3))
            q.put(count)
            print("Producer %s has produced %s baozi." %(name,count))
            count += 1
            print("ok.")
    
    def Consumer(name):  #消费者
        count = 0
        while count < 10:
            time.sleep(random.randrange(4))
            if not q.empty():
                data = q.get()
                print("Consumer %s has eat %s baozi." %(name,data))
            else:
                print("no baozi anymore!")
            count += 1
    
    p1 = threading.Thread(target=Producer, args=("A",))
    c1 = threading.Thread(target=Consumer, args=("B",))
    
    p1.start()
    c1.start()
    

    二、多进程编程

    • 多进程示例1
    from multiprocessing import Process
    import time
    
    def show_time(name):
        time.sleep(1)
        print(name, time.ctime())
    
    if __name__ == '__main__':
        p_list = []
        for i in range(3):
            p = Process(target=show_time,args=('Tom',))
            p_list.append(p)
            p.start()
        for i in p_list:
            i.join()
        print("end.")
    
    • 多进程示例2
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def run(self):
            time.sleep(1)
            print(self.name, time.ctime())  #self.name 进程名
    
    if __name__ == '__main__':
        p_list = []
        for i in range(3):
            p = MyProcess()
            p_list.append(p)
            p.start()
        for i in p_list:
            i.join()
        print("end.")
    

    1、守护进程

    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def run(self):
            time.sleep(3)
            print(self.name, time.ctime())
    
    if __name__ == '__main__':
        p_list = []
        for i in range(3):
            p = MyProcess()
            p_list.append(p)
            p.daemon = True  #设置为守护进程
            p.start()
        # for i in p_list:
        #     i.join()
        print("end.")
    

    2、获取进程号

    from multiprocessing import Process
    import os
    import time
    
    def info(title):
        print("title:", title)
        print("parent process:", os.getppid())
        print("process id:", os.getpid())
    
    if __name__ == '__main__':
        info("main process line")
    
        time.sleep(1)
        print("===========")
        p = Process(target=info, args=("tom",))
        p.start()
        p.join()
    

    3、实例方法和属性

    • 实例方法
    is_alive(): 判断进程是否运行
    join([timeout]): 阻塞当前上下文,直到此实例完成
    start(): 进程就绪,等待CPU调度运行
    run(): start()调用run()方法
    terminate(): 立即停止工作进程
    
    • 属性
    daemon: 同线程的setDeamon功能,设置为守护进程
    name: 进程名
    pid: 进程号
    

    4、进程间通信

    • 进程队列
    from multiprocessing import Process
    from multiprocessing import Queue
    
    def foo(q):
        q.put("dongfei")
    
    if __name__ == '__main__':
    
        q = Queue()  #进程队列
        p = Process(target=foo,args=(q,))
        p.start()
        p.join()
    
        print(q.get())
    
    • 管道
    from multiprocessing import Process, Pipe
    
    def foo(conn):
        conn.send("dad hello")
        response = conn.recv()
        print("father say:", response)
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=foo, args=(child_conn,))
        p.start()
        print("son say:", parent_conn.recv())
        parent_conn.send("son hello")
        p.join()
    
    • 数据共享
    from multiprocessing import Process, Manager
    
    def foo(d,i):
        d.append(i)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.list()
            for i in range(10):
                p = Process(target=foo,args=(d,i))
                p.start()
                p.join()
            print(d)
    

    5、进程池

    线程池的作用:比如现在有100个任务,我们可以1个1个的运行,但是这样运行时间太长;我们也可以直接起100个进程去处理,但是这样消耗的资源庞大,反而会更慢;所有我们选择一个折中的方法,每次起4个进程去处理,这个时候就需要用到进程池了

    from multiprocessing import Process, Pool
    import time,os
    
    def foo(i):
        time.sleep(1)
        print(i)
        return i+100
    
    def bar(arg):  #被主进程调用
        print(arg)  #arg是foo函数的返回值
        print("is ok...")
    
    if __name__ == '__main__':
    
        pool = Pool(4)
    
        for i in range(100):
            # pool.apply(func=foo,args=(i,))  #同步
            # pool.apply_async(func=foo,args=(i,))  #异步
            pool.apply_async(func=foo,args=(i,), callback=bar)  #回调函数:foo函数执行成功后,再去执行bar函数
    
        pool.close()  #此处必须先close,再join
        pool.join()
        print("end")
    

    三、协程

    又称微线程,纤程,Coroutine

    概念:协作式,非抢占式

    协程的优势:

    1. 没有切换的消耗
    2. 没有锁的概念
    3. 可以使用多进程 + 协程利用多核工作
    • yield语法
    import time
    
    def consumer(name):
        print("--->ready to eat baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name,new_baozi))
            time.sleep(1)
    
    def producer():
        r = con.__next__()
        r2 = con2.__next__()
        n = 0
        while True:
            time.sleep(1)
            print("[producer is making baozi %s and %s]" % (n, n+1))
            con.send(n)
            con2.send(n+1)
    
    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()
    
    • greenlet模块:手动切换
    from greenlet import greenlet
    
    def t1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    
    def t2():
        print(56)
        gr1.switch()
        print(78)
    
    gr1 = greenlet(t1)
    gr2 = greenlet(t2)
    
    gr2.switch()
    
    • gevent模块:自动切换
    import gevent
    import requests,time
    
    start = time.time()
    
    def f(url):
        print('GET: %s' %url)
        resp = requests.get(url)
        data = resp.text
        f = open('new.html', 'w',encoding="utf8")
        f.write(data)
        print('%d bytes received from %s.' %(len(data), url))
    
    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org'),
        gevent.spawn(f, 'https://www.baidu.com'),
        gevent.spawn(f, 'https://www.aliyun.com'),
    ])
    
    end = time.time()
    print(end - start)
    
  • 相关阅读:
    如何在for循环中使用多线程
    解决ios10以上H5页面手势、双击缩放问题
    select标签默认选项
    vue三级联动
    手动安装composer详细教学
    密码校验:长度6位以上,至少包含一个数字,一个大写字母,一个小写字母,不包含空格
    tp5生成6位不重复验证码
    css漂亮的阴影边框
    圆形进度条css3样式
    jQuery倒计时组件(jquery.downCount.js)
  • 原文地址:https://www.cnblogs.com/L-dongf/p/11068434.html
Copyright © 2011-2022 走看看