zoukankan      html  css  js  c++  java
  • python_day10 多线程 协程 IO模型

    多线程
    协程
    IO模型

    多线程

    进程:是资源单位 进程之前是竞争关系
    线程:是资源CPU计算单位 线程之间是协作关系
    线程两个特点:1、线程的创建开销小,速度快。
        2、1个进程下的多个线程之间共享名称空间或资源。
    注:Linux系统下,子进程会复制父进程的状态。
    #线程的PID与主进程PID一致
    from threading import Thread
    from multiprocessing import Process
    import os
    def task():
        print('%s is running' %os.getpid())
    if __name__ == '__main__':
        t1=Thread(target=task,)
        t2=Thread(target=task,)
        # t1=Process(target=task,)
        # t2=Process(target=task,)
        t1.start()
        t2.start()
        print('',os.getpid())
    #多线程共享一个进程内的资源
    from threading import Thread
    from multiprocessing import Process
    n=100
    def work():
        global n
        n=0
    if __name__ == '__main__':
        # p=Process(target=work,)
        # p.start()
        # p.join()
        # print('主',n)
        t=Thread(target=work,)
        t.start()
        t.join()
        print('',n)
    #开启线程的两种方式
    #开启线程的方式一:使用替换threading模块提供的Thread
    from threading import Thread
    from multiprocessing import Process
    def task():
        print('is running')
    if __name__ == '__main__':
        t=Thread(target=task,)
        # t=Process(target=task,)
        t.start()
        print('')
    #开启线程的方式二:自定义类,继承Thread
    from threading import Thread
    from multiprocessing import Process
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            print('%s is running' %self.name)
    if __name__ == '__main__':
        t=MyThread('egon')
        # t=Process(target=task,)
        t.start()
        print('')
    #多线程共享同一进程内地址空间的练习
    #三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件
    from threading import Thread
    msg_l=[]
    format_l=[]
    def talk():
        while True:
            msg=input('>>: ').strip()
            msg_l.append(msg)
    def format():
        while True:
            if msg_l:
                data=msg_l.pop()
                format_l.append(data.upper())
    def save():
        while True:
            if format_l:
                data=format_l.pop()
                with open('db.txt','a') as f:
                    f.write('%s
    ' %data)
    if __name__ == '__main__':
        t1=Thread(target=talk)
        t2=Thread(target=format)
        t3=Thread(target=save)
        t1.start()
        t2.start()
        t3.start()
    #Thread对象其他相关的属性或方法
    from threading import Thread,activeCount,enumerate,current_thread
    import time
    def task():
        print('%s is running' %current_thread().getName())
        time.sleep(2)
    if __name__ == '__main__':
        t=Thread(target=task,)
        t.start()
        t.join()
        print(t.is_alive())  无join()返回True,有join()返回FALSE
        print(t.getName())    获取线程名
        print(enumerate())  当前活跃的线程信息/对象
        print('')
        print(activeCount())  返回活跃的线程个数
    #current_thread的用法 返回当前进程信息
    from threading import Thread,activeCount,enumerate,current_thread
    from multiprocessing import Process
    import time
    def task():
        print('%s is running' %current_thread().getName())
        time.sleep(2)
    if __name__ == '__main__':
        p=Process(target=task)
        p.start()
        print(current_thread())   #有MainThread主进程对象信息
    
    from threading import Thread,activeCount,enumerate,current_thread
    from multiprocessing import Process
    import time
    def task():
        print('%s is running' %current_thread().getName())
        time.sleep(2)
    if __name__ == '__main__':
        t1=Thread(target=task)
        t2=Thread(target=task)
        t3=Thread(target=task)
        t1.start()
        t2.start()
        t3.start()
        print(current_thread())
    #强调:主线程从执行层面上代表了其所在进程的执行过程
    #先看:守护进程
    from multiprocessing import Process
    import time
    def task1():
        print('123')
        time.sleep(1)
        print('123done')
    def task2():
        print('456')
        time.sleep(10)
        print('456done')
    if __name__ == '__main__':
        p1=Process(target=task1)
        p2=Process(target=task2)
        p1.daemon = True
        p1.start()
        p2.start()
        print('')
    ‘’‘主
    456
    456done’‘’
    
    #再看:守护线程
    from threading import Thread
    import time
    def task1():
        print('123')
        time.sleep(10)
        print('123done')
    def task2():
        print('456')
        time.sleep(1)
        print('456done')
    if __name__ == '__main__':
        t1=Thread(target=task1)
        t2=Thread(target=task2)
        t1.daemon=True
        t1.start()
        t2.start()
        print('')
    ‘’‘123
    456
    主
    456done’‘’
    #Python GIL(Global Interpreter Lock) 全局解释器锁
    #注意的点:
    #1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来 
    #2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高
    from threading import Thread
    n=100
    def task():
        print('is running')
    if __name__ == '__main__':
        t1=Thread(target=task,)
        t2=Thread(target=task,)
        t3=Thread(target=task,)
        # t=Process(target=task,)
        t1.start()
        t2.start()
        t3.start()
        print('')
    
    #线程的互斥锁
    from threading import Thread,Lock
    import time
    n=100
    def work():
        global n
        mutex.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        mutex.release()
    if __name__ == '__main__':
        mutex=Lock()
        l=[]
        start=time.time()
        for i in range(100):
            t=Thread(target=work)
            l.append(t)
            t.start()
        for t in l:
            t.join()
        print('run time:%s value:%s' %(time.time()-start,n))
    
    #互斥锁与join的区别
    #join
    from threading import Thread,Lock
    import time
    n=100
    def work():
        time.sleep(0.05)
        global n
        temp=n
        time.sleep(0.1)
        n=temp-1
    if __name__ == '__main__':
        start=time.time()
        for i in range(100):
            t=Thread(target=work)
            t.start()
            t.join()
        print('run time:%s value:%s' %(time.time()-start,n
    
    #多进程:
    #优点:可以利用多核优势
    #缺点:开销大
    
    #多线程:
    #优点:开销小
    #缺点:不能利用多核优势
    
    from threading import Thread
    from multiprocessing import Process
    import time
    #计算密集型
    def work():
        res=1
        for i in range(100000000):
            res+=i
    if __name__ == '__main__':
        p_l=[]
        start=time.time()
        for i in range(4):
            # p=Process(target=work) #6.7473859786987305
            p=Thread(target=work) #24.466399431228638
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(time.time()-start)
    
    
    from threading import Thread
    from multiprocessing import Process
    import time
    #IO密集型
    def work():
        time.sleep(2)
    if __name__ == '__main__':
        p_l=[]
        start=time.time()
        for i in range(400):
            # p=Process(target=work) #12.104692220687866
            p=Thread(target=work) #2.038116455078125
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(time.time()-start)
    
    #死锁现象
    from threading import Thread,Lock,RLock
    import time
    mutexA=Lock()
    mutexB=Lock()
    class Mythread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutexA.acquire()
            print('33[45m%s 抢到A锁33[0m' %self.name)
            mutexB.acquire()
            print('33[44m%s 抢到B锁33[0m' %self.name)
            mutexB.release()
            mutexA.release()
        def f2(self):
            mutexB.acquire()
            print('33[44m%s 抢到B锁33[0m' %self.name)
            time.sleep(1)
            mutexA.acquire()
            print('33[45m%s 抢到A锁33[0m' %self.name)
            mutexA.release()
            mutexB.release()
    if __name__ == '__main__':
        for i in range(20):
            t=Mythread()
            t.start()
    
    #递归锁
    #计算机制:Rlock加锁+1,释放锁-1.只有Rlock计数为0,其他才能抢。
    from threading import Thread,Lock,RLock
    import time
    mutex=RLock()
    class Mythread(Thread):
        def run(self):
            self.f1()
            self.f2()
        def f1(self):
            mutex.acquire()
            print('33[45m%s 抢到A锁33[0m' %self.name)
            mutex.acquire()
            print('33[44m%s 抢到B锁33[0m' %self.name)
            mutex.release()
            mutex.release()
        def f2(self):
            mutex.acquire()
            print('33[44m%s 抢到B锁33[0m' %self.name)
            time.sleep(1)
            mutex.acquire()
            print('33[45m%s 抢到A锁33[0m' %self.name)
            mutex.release()
            mutex.release()
    if __name__ == '__main__':
        for i in range(20):
            t=Mythread()
            t.start()
    
    #信号量semaphore
    from threading import Thread,current_thread,Semaphore
    import time,random
    sm=Semaphore(5)
    def work():
        sm.acquire()
        print('%s 上厕所' %current_thread().getName())
        time.sleep(random.randint(1,3))
        sm.release()
    if __name__ == '__main__':
        for i in range(20):
            t=Thread(target=work)
            t.start()
    
    #事件Event
    from threading import Thread,current_thread,Event
    import time
    event=Event()
    def conn_mysql():
        count=1
        while not event.is_set():  #判断event是否设置好
            if count > 3:
                raise ConnectionError('链接失败')
            print('%s 等待第%s次链接mysql' %(current_thread().getName(),count))
            event.wait(0.5)   #设置超时时间0.5s,超时后重新连接   #event.wait() wait即event为True时
            count+=1
        print('%s 链接ok' % current_thread().getName())
    def check_mysql():
        print('%s 正在检查mysql状态' %current_thread().getName())
        time.sleep(1)
        event.set()  #将event由FALSE改为True
    if __name__ == '__main__':
        t1=Thread(target=conn_mysql)
        t2=Thread(target=conn_mysql)
        check=Thread(target=check_mysql)
        t1.start()
        t2.start()
        check.start()
    
    #定时器Timer
    from threading import Timer
    def hello(n):
        print("hello, world",n)
    t = Timer(3, hello,args=(11,))
    t.start()  # after 1 seconds, "hello, world" will be printed
    
    #线程queue
    import queue
    q=queue.Queue(3) #队列:先进先出
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())
    
    q=queue.LifoQueue(3) #堆栈:后进先出
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())
    
    q=queue.PriorityQueue(3) #数字越小优先级越高
    q.put((10,'data1'))
    q.put((11,'data2'))
    q.put((9,'data3'))
    print(q.get())
    print(q.get())
    print(q.get())
    
    #进程池与线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time,random
    def work(n):
        print('%s is running' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    if __name__ == '__main__':
        p=ProcessPoolExecutor()
        # objs=[]
        # for i in range(10):
        #     obj=p.submit(work,i)
        #     objs.append(obj)
        # p.shutdown()
        # for obj in objs:
        #     print(obj.result())  #获取进程执行结果
        obj=p.map(work,range(10))
        p.shutdown()    #进程池中进程执行完毕
        print(list(obj))
    
    #线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import os,time,random
    def work(n):
        print('%s is running' %current_thread().getName())
        time.sleep(random.randint(1,3))
        return n**2
    if __name__ == '__main__':
        p=ThreadPoolExecutor()   #线程默认数为CPU个数*5.
        objs=[]
        for i in range(21):
            obj=p.submit(work,i)
            objs.append(obj)
        p.shutdown()
        for obj in objs:
            print(obj.result())
    
    #进程池
    import requests #pip3 install requests
    import os,time
    from multiprocessing import Pool
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    def get_page(url):
        print('<%s> get :%s' %(os.getpid(),url))
        respone = requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    def parse_page(obj):
        dic=obj.result()
        print('<%s> parse :%s' %(os.getpid(),dic['url']))
        time.sleep(0.5)
        res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
        with open('db.txt','a') as f:
            f.write(res)
    if __name__ == '__main__':
        # p=Pool(4)
        p=ProcessPoolExecutor()
        urls = [
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
        ]
        for url in urls:
            # p.apply_async(get_page,args=(url,),callback=parse_page)
            p.submit(get_page,url).add_done_callback(parse_page)
        p.shutdown()
        print('主进程pid:',os.getpid())
    
    #线程池
    import requests #pip3 install requests
    import os,time,threading
    from multiprocessing import Pool
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    def get_page(url):
        print('<%s> get :%s' %(threading.current_thread().getName(),url))
        respone = requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    def parse_page(obj):
        dic=obj.result()
        print('<%s> parse :%s' %(threading.current_thread().getName(),dic['url']))
        time.sleep(0.5)
        res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
        with open('db.txt','a') as f:
            f.write(res)
    if __name__ == '__main__':
        # p=Pool(4)
        p=ThreadPoolExecutor(3)
        urls = [
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
        ]
        for url in urls:
            # p.apply_async(get_page,args=(url,),callback=parse_page)
            p.submit(get_page,url).add_done_callback(parse_page)
        p.shutdown()
        print('主进程pid:',os.getpid())
    
    #并发编程总结
    1 生产者消费者模型
    2 进程池线程池
    3 回调函数
    4 GIL全局解释器锁

    l=[1,2,3]
    map res=map(lambda x:x**2,l) res是对象,list(res)出结果
    obj=p.map(work,range(10))等价于 for i in range(10):p.submit(work,i)

    单线程下并发

    #基于yield实现并发
    import time
    def consumer():
        while True:
            res=yield
    def producer():
        g=consumer()
        next(g)
        for i in range(100000000):
            g.send(i)
    start=time.time()
    producer()
    print(time.time()-start) #12.602720737457275
    
    import time
    def consumer(res):
        print('consumer')
    def producer():
        res=[]
        for i in range(100000000):
            res.append(i)
        return res
    start=time.time()
    res=producer()
    consumer(res)
    print(time.time()-start) #12.344706058502197
    
    import time
    def consumer():
        while True:
            res=yield
            print('consumer',res)
            time.sleep(10)
    def producer():
        g=consumer()
        next(g)
        for i in range(100000000):
            print('producer', i)
            g.send(i)
    start=time.time()
    producer()
    print(time.time()-start) #12.602720737457275
    
    #greenlet模块实现并发
    from greenlet import greenlet
    import time
    def eat(name):
        print('%s eat 1' %name)
        g2.switch('egon') #
        print('%s eat 2' %name)
        g2.switch()
    def play(name):
        print('%s play 1' %name)
        time.sleep(10)
        g1.switch()
        print('%s play 2' %name)
    g1=greenlet(eat)
    g2=greenlet(play)
    g1.switch('egon')
    
    #gevent模块服务端
    from gevent import monkey;monkey.patch_all()
    from socket import *
    import gevent
    #如果不想用money.patch_all()打补丁,可以用gevent自带的socket
    # from gevent import socket
    # s=socket.socket()
    def server(server_ip,port):
        s=socket(AF_INET,SOCK_STREAM)
        s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        s.bind((server_ip,port))
        s.listen(5)
        while True:
            conn,addr=s.accept()
            gevent.spawn(talk,conn,addr)
    def talk(conn,addr):
        try:
            while True:
                res=conn.recv(1024)
                print('client %s:%s msg: %s' %(addr[0],addr[1],res))
                conn.send(res.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()
    if __name__ == '__main__':
        server('127.0.0.1',8080)
    
    #gevent模块客户端
    #_*_coding:utf-8_*_
    from threading import Thread
    from socket import *
    def client():
        client=socket(AF_INET,SOCK_STREAM)
        client.connect(('127.0.0.1',8080))
        while True:
            client.send('hello'.encode('utf-8'))
            msg=client.recv(1024)
            print(msg.decode('utf-8'))
    if __name__ == '__main__':
        for i in range(500):
            t=Thread(target=client)
            t.start()
    #gevent模块实现生产者消费者模型
    from gevent import monkey;monkey.patch_all()
    import gevent
    import time
    import threading
    def eat(name):
        print(threading.current_thread().getName())
        print('%s eat 1' %name)
        time.sleep(1)
        print('%s eat 2' %name)
    def play(name):
        print(threading.current_thread().getName())
        print('%s play 1' %name)
        time.sleep(2)
        print('%s play 2' %name)
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,'egon')
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])

    IO模型

    #阻塞IO服务端
    from socket import *
    server=socket(AF_INET,SOCK_STREAM)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    while True:
        conn,addr=server.accept()
        print(addr)
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except Exception:
                break
        conn.close()
    server.close()
    
    #阻塞IO客户端
    from socket import *
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    #非阻塞IO服务端
    from socket import *
    import time
    server=socket(AF_INET,SOCK_STREAM)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    server.setblocking(False)
    conns=[]
    del_l=[]
    while True:
        try:
            print(conns)
            conn,addr=server.accept()
            conns.append(conn)
        except BlockingIOError:
            for conn in conns:
                try:
                    data=conn.recv(1024)
                    conn.send(data.upper())
                except BlockingIOError:
                    pass
                except ConnectionResetError:
                    conn.close()
                    del_l.append(conn)
    
            for conn in del_l:
                conns.remove(conn)
            del_l=[]
    
    #非阻塞IO客户端
    from socket import *
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    ##IO多路复用服务端
    from socket import *
    import select
    import time
    server=socket(AF_INET,SOCK_STREAM)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    server.setblocking(False)
    reads=[server,]
    while True:
        rl,_,_=select.select(reads,[],[])
        for obj in rl:
            if obj == server:
                conn,addr=obj.accept()
                reads.append(conn)
            else:
                try:
                    data=obj.recv(1024)
                    if not data:
                        obj.close()
                        reads.remove(obj)
                        continue
                    obj.send(data.upper())
                except Exception:
                    obj.close()
                    reads.remove(obj)
    
    #IO多路复用客户端
    from socket import *
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
  • 相关阅读:
    还原被删除的对象(域)
    Windows blue系列的安装
    转移active directry数据库文件
    使用指针让两个数交换
    针对被删除的AD DS对象执行授权还原
    这两天的总结
    小小问题
    程序2
    程序4
    程序1
  • 原文地址:https://www.cnblogs.com/liweijing/p/7476135.html
Copyright © 2011-2022 走看看