zoukankan      html  css  js  c++  java
  • 进程/线程/协程

    概念:

    进程:运行中的程序    有生命周期 关掉程序就销毁了
    进程调度:1)先来先服务算法
         2)短作业优先调度算法
         3)时间片轮转法
         4)多级反馈队列
    并发:多个程序一起运行,轮流交替使用资源,独木桥 a先走一段 然后让给b走
    并行:指两者同时执行,赛跑,两个人同时往前跑
    阻塞:input sleep 文件的输入输出 网络请求 recv accept
    同步:一个任务的完成需要依赖另一个任务,等到被依赖的任务完成后,依赖的任务才算完成 等待的过程什么都不干
    异步:不需要等待被依赖的任务完成,当等待被依赖的任务完成时,立即执行 等待的过程可以干别的事
    同步阻塞:一直等待
    异步阻塞:可以干别的事
    同步非阻塞:来回切换观察
    异步非阻塞:可以干别的事。等待别人通知
    多进程 :同时开启多个运行程序

    进程:

    1.创建进程:

    from multiprocessing import Process
    import os
    
    def func(m,n):
        print(m,n)
        print('获取子进程的pid:', os.getpid())
        print('获取子进程的父进程pid:', os.getppid())
    
    if __name__ == '__main__':
        p = Process(target=func,args=('参数1','参数2'))     #创建一个对象   传递一个参数是需要后面加,
        p.start()                              #启动一个子进程    后面的print也同时执行  所以是异步的
        print('获取此进程的pid:',os.getpid())
        print('获取此进程的父进程pid:', os.getppid())
    方法一
    from multiprocessing import Process
    
    class MyProcess(Process):       #创建一个类,继承Process类
        def __init__(self,args1,args2):
            super().__init__()
            self.args1 = args1
            self.args2 = args2
    
        def run(self):              #必须实现一个run方法,run方法中是在子进程中的代码
            print(self.args1)
            print(self.args2)
    if __name__ == '__main__':
        p = MyProcess(1,2)
        p.start()
    View Code

    2.进程的几个方法:

    join
    from multiprocessing import Process
    import time
    
    def func(n):
        print(n)
        time.sleep(2)
    
    
    if __name__ == '__main__':
        p = Process(target=func,args=(1,))
        p.start()
        p.join()                   #join方法  感知子进程的结束,才执行之后的print
        print('hello')
    View Code

    3.开启多个子进程:

    from multiprocessing import Process
    import time
    
    def func(s):
        print('子进程%s'%s)
        time.sleep(2)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=func,args=(i,))
            p.start()
    View Code

    >>基于多进程实现scoket实验(可以重复开启多个client)

    server端

    import socket
    from multiprocessing import Process
    def serve(conn):
        ret = '你好'.encode('utf-8')
        conn.send(ret)
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.close()
    if __name__ == '__main__':
        sk = socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            p = Process(target=serve,args=(conn,))
            p.start()
    
        sk.close()

    client端 

    import socket
    
    sk=socket.socket()
    
    sk.connect(('127.0.0.1',8080))
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
    msg2= input('>>>').encode('utf-8')
    sk.send(msg2)
    
    
    sk.close()

    4. 多进程之间数据隔离问题

    from  multiprocessing import  Process
    
    n = 100
    def func():
        global n
        n = n-1
        return 111
    
    if __name__ == '__main__':
        n_l = []
        for i in range(100):
            p = Process(target=func)
            p.start()
            n_l.append(p)
        for p in n_l : p.join()
        print(n)
    
    #多进程之间的数据是隔离的不共享的
    View Code

    5.守护进程

    #守护进程随着主进程代码执行结束而结束
    #守护线程会在主线程结束之后等待子线程结束而结束
    from multiprocessing import Process
    import time
    def func():
        while True:
            time.sleep(0.2)
            print('我还活着')
    
    if __name__ == '__main__':
        p = Process(target=func)
        p.daemon = True             #设置子线程为守护进程
        p.start()
        time.sleep(2)
        print('主进程结束')
    #守护进程会随着 主进程的代码执行完毕而结束
    #在主进程结束一个子进程 p.terminate()   结束一个进程并不是执行方法后立即生效,需要一个操作系统效应的过程
    #p.name获取进程的名字
    View Code

    6.进程锁

    lock = Lock() # 创造了一把锁
    lock.acquire() # 获取了这把锁的钥匙
    lock.release() # 归还这把锁的钥匙

    import json
    import time
    from multiprocessing import Process
    from multiprocessing import Lock,Manager
    
    def show(i):
        with open('ticket') as f:
            dic=json.load(f)
        print("余票 %s"%dic['ticket'])
    
    def buy_ticket(i,lock):
        lock.acquire()
        with open('ticket') as f:
            dic = json.load(f)  #文件用load 字符串用loads
            time.sleep(0.1)
        if dic['ticket']>0:
            dic['ticket'] -= 1
            print("买到票了%s"%i)
        else:
            print("票卖完了%s"%i)
        time.sleep(0.1)
        with open('ticket','w') as f:
            json.dump(dic,f)
        lock.release()
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=show,args=(i,))
            p.start()
        lock = Lock()
        for i in range(10):
            p = Process(target=buy_ticket,args=(i,lock))
            p.start()
    ---------------------------------------------------
    ticket
    
    {"ticket": 1}
    View Code

    7.信号量

    from multiprocessing import Process,Semaphore
    import time
    
    def func(i,sem):
        sem.acquire()
        print('进程%s'%i)
        time.sleep(4)
        sem.release()
    
    if __name__ == '__main__':
        sem = Semaphore(4)     #允许开启四个进程每一次
        for i in range(10):
            p = Process(target=func,args=(i,sem))
            p.start()
    View Code

    8.事件(红绿灯案例)

    '''
    from multiprocessing import Event
    
    e = Event()   #创建一个事件
    print(e.is_set())  #查看一个事件
    e.set()   #更改事件的状态为true
    print(e.is_set())
    e.wait()     #依据e.is_set的值来决定是否阻塞
    print(123)
    e.clear()#将事件的状态更改为false
    
    '''
    
    from multiprocessing import Event,Process
    import time
    import random
    def cars(e,i):
        if not e.is_set():
            print('car %i在等待通行'%i)
            e.wait()    #阻塞 直到一个事件状态改变
    
        else:
            print('car %i可通行' % i)
    def light(e):
        while True:
            if e.is_set():
                print('绿灯亮了')
                time.sleep(2)
                e.clear()
            else:
                print('红灯亮了')
                time.sleep(2)
                e.set()
    
    if __name__ == '__main__':
        e = Event()
        p = Process(target=light,args=(e,))
        p.start()
        for i in range(20):
            car = Process(target=cars,args=(e,i))
            car.start()
            time.sleep(random.random())
    View Code

    9.队列

    from multiprocessing import Queue,Process
    def produce(q):
        q.put('hello')
    
    def consume(q):
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=produce,args=(q,))
        p.start()
        c = Process(target=consume, args=(q,))
        c.start()
    View Code

    10.管道

    from multiprocessing import Pipe,Process
    
    def func(conn1,conn2):
        conn2.close()
        while True:
            try:
                msg = conn1.recv()
                print(msg)
            except EOFError:
                conn1.close()
                break
    
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        Process(target=func,args=(conn1, conn2)).start()
        conn1.close()
        for i in range(20):
            conn2.send('吃了么')
        conn2.close()
    View Code

    11.进程池

    import os
    from multiprocessing import Pool
    import time
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s'%n,os.getpid())
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply(func,args=(i,))   #同步
    -------------------------------------------------------------
    import os
    from multiprocessing import Pool
    import time
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s'%n,os.getpid())
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply(func,args=(i,))   #异步
        p.close()   #结束进程池接受任务
        p.join()   #感知进程池中的任务执行结束
    View Code

    >>基于进程池实现scoket实验(可以重复开启多个client)

    server端

    import socket
    from multiprocessing import Pool
    
    def func(conn):
        conn.send(b'hello')
        print(conn.recv(1024).decode('utf-8'))
        conn.close()
    
    if __name__ == '__main__':
        p = Pool(5)
        sk= socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
    
        while True:
            conn, addr = sk.accept()
            p.apply_async(func,args=(conn,))
    
        sk.close()

    client端

    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    
    ret = sk.recv(1024).decode('utf-8')
    print(ret)
    
    msg = input('>>>').encode('utf-8')
    sk.send(msg)
    
    sk.close()

    12.进程池回调函数

    from multiprocessing import Pool
    import os
    
    def func1(n):
        print('in func1',os.getpid())
        return n*n                                          #子进程
    
    def func2(nn):
        print('in func2',os.getpid())
        print(nn)                                            #主进程
    
    if __name__ == '__main__':
        print('主进程:',os.getpid())
        p = Pool(5)
        for i in range(10):
            p.apply_async(func1,args=(10,),callback=func2)   #callback回调
        p.close()
        p.join()
    View Code

    13.生产者消费者模型

    import time
    import random
    from multiprocessing import Process,Queue
    
    def produce(q,name,food):
        for i in range(4):
            time.sleep(random.randint(1, 3))
            f = '%s生产%s%s' % (name, food, i)
            print(f)
            q.put(f)
    
    
    def consumer(q,name):
        while True:
            food = q.get()
            if food is None:
                print("%s获得一个空"%name)
                break
            print('%s吃了%s'%(name,food))
    
    if __name__ == '__main__':
    
        q = Queue(20)
        p = Process(target=produce,args=(q,'小白','肉包子'))
        p2 = Process(target=produce, args=(q, '小黑', '菜包子'))
        c1 = Process(target=consumer,args=(q,''))
        c2 = Process(target=consumer, args=(q,''))
        p.start()
        p2.start()
        c1.start()
        c2.start()
        p.join()
        p2.join()
        q.put(None)
        q.put(None)
    View Code

    线程:

    1.创建线程

    from threading import Thread
    import time
    
    def func(n):
        time.sleep(1)
        print(n)
    
    for i in range(10):
        t = Thread(target=func, args=(i,))
        t.start()
    方法一
    from threading import Thread
    import time
    
    class MyThread(Thread):
        def __init__(self,arg):
            super().__init__()
            self.arg = arg
    
        def run(self):
            time.sleep(1)
            print(self.arg)
    
    t=MyThread(10)
    t.start()
    方法二

    2.数据共享问题

    from threading import Thread
    import os
    
    def func(n):
        global g
        g = 0
        print(g,os.getpid())
    
    g=100
    t_list=[]
    for i in range(10):
        t = Thread(target=func, args=(i,))
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(g)
    #多线程的数据是共享的
    View Code

    3.>>基于多线程实现scoket实验(可以重复开启多个client)

    server端

    import socket
    from threading import Thread
    
    def chat(conn):
        conn.send(b'hello')
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.close()
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        Thread(target=chat,args=(conn,)).start()
    
    sk.close()

    client端

    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    
    
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
    info = input('>>>').encode('utf-8')
    sk.send(info)
    
    sk.close()

    4.守护线程

    #守护进程随着主进程代码执行结束而结束
    #守护线程会在主线程结束之后等待子线程结束而结束
    from threading import Thread
    import time
    
    def func1():
        print(666)
        time.sleep(2)
    def func2():
        print('in func2')
        time.sleep(2)
    
    t = Thread(target=func1,)
    t.daemon = True
    t.start()
    print('主线程')
    t2 = Thread(target=func2,)
    t2.start()
    View Code

    5.线程锁

    #死锁现象
    from threading import Thread,Lock
    import time
    noodle_lock = Lock()
    fork_lock = Lock()
    def func(name):
        noodle_lock.acquire()
        print('%s拿到面条'%name)
        fork_lock.acquire()
        print('%s拿到叉子'% name)
        print('%s吃面'%name)
        fork_lock.release()
        noodle_lock.release()
    
    def func1(name):
        fork_lock.acquire()
        print('%s拿到叉子' % name)
        time.sleep(5)
        noodle_lock.acquire()
        print('%s拿到面条' % name)
        print('%s吃面' % name)
        noodle_lock.release()
        fork_lock.release()
    Thread(target=func,args=('1号顾客',)).start()
    Thread(target=func1,args=('2号顾客',)).start()
    Thread(target=func,args=('3号顾客',)).start()
    Thread(target=func1,args=('4号顾客',)).start()
    死锁现象(互斥锁lock)
    from threading import Thread,RLock
    import time
    noodle_lock = fork_lock = RLock()
    
    def func(name):
        noodle_lock.acquire()
        print('%s拿到面条'%name)
        fork_lock.acquire()
        print('%s拿到叉子'% name)
        print('%s吃面'%name)
        time.sleep(2)
        fork_lock.release()
        noodle_lock.release()
    
    def func1(name):
        fork_lock.acquire()
        print('%s拿到叉子' % name)
        noodle_lock.acquire()
        print('%s拿到面条' % name)
        print('%s吃面' % name)
        time.sleep(2)
        noodle_lock.release()
        fork_lock.release()
    Thread(target=func,args=('1号顾客',)).start()
    Thread(target=func1,args=('2号顾客',)).start()
    Thread(target=func,args=('3号顾客',)).start()
    Thread(target=func1,args=('4号顾客',)).start()
    递归锁(rlock)

    6.信号量

    from  threading import Thread,Semaphore
    import time
    
    def func(sem,a,b):
        sem.acquire()
        time.sleep(1)
        print(a+b)
        sem.release()
    
    sem = Semaphore(4)
    for i in range(10):
        t = Thread(target=func, args=(sem,i, i + 5))
        t.start()
    View Code

    7.事件 

    #事件被创建的时候是false状态
    #flase状态 wait()阻塞
    #true状态 wait()非阻塞
    #set false状态设置为true
    #clear 设置状态为false

    #连接数据库
    #检测数据库的可连接情况
    
    from threading import Thread,Event
    import time
    import random
    
    def connect_db(e):
        count = 0
        while count<3:
            e.wait(0.5)   #状态为flase的时候 只等待0.5s就结束
            if e.is_set() == True:
                print('连接成功')
                break
            else:
                count += 1
                print('第%s次连接失败'%count)
        else:
            raise TimeoutError
    def check_web(e):
        time.sleep(random.randint(0,3))
        e.set()
    
    
    e = Event()
    t1 = Thread(target=connect_db,args=(e,))
    t2 = Thread(target=check_web,args=(e,))
    t1.start()
    t2.start()
    View Code

     8.条件

    #notify(int数据类型) 造钥匙
    from threading import Condition,Thread
    
    def func(con,i):
        con.acquire()
        con.wait()             #等钥匙
        print('在第%s个循环里'%i)
        con.release()
    con = Condition()
    for i in range(10):
        Thread(target=func,args=(con,i)).start()
    
    while True:
        num = int(input('>>>'))
        con.acquire()
        con.notify(num)          #造钥匙
        con.release()
    View Code

     9.定时器

    from  threading import Timer
    import time
    def func():
        print('时间同步')
    
    while True:
        Timer(5,func).start()
        time.sleep(5)
    View Code

    10.队列

    import queue
    q = queue.Queue() # 队列 先进先出
    q.put(1)
    q.put(2)
    print(q.get())
    print(q.get())
    # q.put_nowait()
    # q.get_nowait()
    
    q = queue.LifoQueue() #栈 先进后出
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())
    
    q = queue.PriorityQueue()  #优先级队列 数字越小优先级越高 相同优先级按ascii排
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((1,'c'))
    q.put((1,'q'))
    print(q.get())
    View Code

    11.线程池

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def func(n):
       time.sleep(2)
       print(n)
       return n*n
    
    tpool = ThreadPoolExecutor(max_workers=5)
    t_list = []
    for i in range(20):     #异步的
        t = tpool.submit(func,i)
        t_list.append(t)
    tpool.shutdown()   #close+join
    print('主线程')
    for t in t_list: print('***',t.result())
    View Code

    12.回调函数

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def func(n):
       time.sleep(2)
       print(n)
       return n*n
    def callback(m):
        print('结果是%s'%m.result())
    tpool = ThreadPoolExecutor(max_workers=5)
    for i in range(20):
        tpool.submit(func,i).add_done_callback(callback)
    View Code

    协程

    1.概念

    #进程 启动多个进程 进程之间是由操作系统负责调用
    #线程 启动多个线程 真正被cpu执行的最小单位是线程
    #开启一个线程 创建一个线程 寄存器 堆栈
    #关闭一个线程

    #协程
    #本质上是一个线程
    #能够在多个任务之间切换来节省一些io时间
    #协程中任务之间的切换时间消耗的时间 远远小于进程和线程之间的切换
    #实现并发的手段
    #进程和线程的任务切换由操作系统完成
    #协程任务之间的切换由代码完成,只有遇到协程模块可识别的io操作,程序才会进程任务切换实现并发效果
    
    from gevent import monkey;monkey.patch_all()
    import time
    import gevent
    
    def task():
        time.sleep(1)
        print(123)
    
    def sync():
        for i in range(10):
            task()
    
    def asynct():
        g_list = []
        for i in range(10):
            g = gevent.spawn(task)
            g_list.append(g)
        gevent.joinall(g_list)
    
    sync()
    asynct()
    协程的概念

    2.基于协程实现socket实验(可以重复开启多个client)

    server端

    import socket
    from gevent import monkey;monkey.patch_all()
    import gevent
    
    def func(conn):
        conn.send(b'hello')
        msg = conn.recv(1024).decode('utf-8')
        print(msg)
        conn.close()
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        gevent.spawn(func,conn)
    sk.close()

    client端

    import socket
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    
    msg = sk.recv(1024).decode('utf-8')
    print(msg)
    info = input('>>>').encode('utf-8')
    sk.send(info)
    sk.close()
  • 相关阅读:
    百练-2703:骑车与走路
    大数阶乘-ny-28
    大明A+B-第一次周赛(有小数)
    单例模式的五种写法
    Intent跳转传list集合
    listview中item 有checkbox多选防止滑动 listview页面 出现checkbox错位问题
    论自作音乐播放器涉及知识点总结
    Android横竖屏切换继续播放视频
    关于ScrollView中嵌套listview焦点滑动问题 解决
    Android上传头像代码,相机,相册,裁剪
  • 原文地址:https://www.cnblogs.com/yzcstart/p/10658641.html
Copyright © 2011-2022 走看看