zoukankan      html  css  js  c++  java
  • 网络编程

    网络编程

    进程

    进程指的是程序的运行过程。而负责执行任务则是cpu。
    一 并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)
    二 并行:同时运行,只有具备多个cpu才能实现并行
    单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)

    开启多进程的方法

    方法一:

    from multiprocessing import Process
    import os
    
    
    def func(name):
    
        print('name: %s pid: %s' % (name, os.getpid()))
    
    
    if __name__ == '__main__':
        for i in range(3):
    
            p = Process(target=func, args=('进程%s' % i,), kwargs={'key': 'value'})  # args的参数为元组,一个参数时要加逗号
            p.start()  执行一个子进程
    
        print('主线程')
    # 执行结果
    '''
    主线程
    name: 进程0 pid: 9604
    name: 进程1 pid: 9188
    name: 进程2 pid: 2864
    '''
    

    方法二:

    from multiprocessing import Process
    import os
    
    
    class MyProcess(Process):  # 必须继承Process类
    
        def run(self):  # 必须实现run方法
            print(' pid: %s' % (os.getpid()))
    
    
    if __name__ == '__main__':
        for i in range(3):
            p = MyProcess()
            p.start()  # 子进程执行MyProcess类中的run方法
    # 执行结果
    '''
    pid: 4864
    pid: 9036
    pid: 9368
    '''
    
    带参数
    
    from multiprocessing import Process
    import os
    
    
    class MyProcess(Process):  # 必须继承Process类
    
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):  # 必须实现run方法
            print('name: %s pid: %s' % (self.name, os.getpid()))
    
    
    if __name__ == '__main__':
        for i in range(3):
            p = MyProcess(str(i))
            p.start()  # 子进程执行MyProcess类中的run方法
            
    '''
    name: 0 pid: 9268
    name: 1 pid: 8536
    name: 2 pid: 4944
    '''
    
    

    多进程之间的数据是相互隔离的:

    
    from multiprocessing import Process
    import os
    
    n = 100
    def func():
        global n
        n = 0
        print('pid:%s' % os.getpid(), n)  # pid:4992 0
        
    if __name__ == '__main__':
        p=Process(target=func)
        p.start()
        p.join()
        print('主进程', n)  # 主进程 100
    

    多进程实现socket并发效果

    服务端代码
    
    import socket
    from multiprocessing import Process
    
    
    def task(conn):
        while True:
            try:
                data = conn.recv(1024).decode('utf-8')
                conn.send(bytes('hello ' + data, encoding='utf-8'))
            except:
                break
    
        conn.close()
    
    
    def server(ip, port):
        sk = socket.socket()
        sk.bind((ip,port))
        sk.listen(5)
        while True:
            conn, addr = sk.accept()
            p = Process(target=task, args=(conn,))
            p.start()
        sk.close()
    
    
    if __name__ == '__main__':
        server('127.0.0.1', 8089)
    
    
    ----------
    客户端代码
    
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8089))
    
    while True:
        cmd = input('>>:').strip()
        if not cmd:
            continue
        sk.send(cmd.encode('utf-8'))
        data = sk.recv(1024).decode('utf-8')
        print(data)
    
    sk.close()
    
    

    join的用法

    主进程的执行需要等待子进程的执行完成后再继续。

    守护进程

    1. 在进程执行之前设置p.daemon = True
    2. 主进程代码执行完,守护进程就结束。
    3. 主进程需要等待其他非守护进程执行完才结束。
    4. 守护进程内不能再开启子进程。
    
    实例一:
    
    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    if __name__ == '__main__':
        p1 = Process(target=foo)
        p2 = Process(target=bar)
    
        p1.daemon=True  把p1设置成守护进程
        p1.start()
        p2.start()  	p2不是守护进程
        print("main-------")
    
    # 执行结果
    '''
    main-------
    456
    end456
    
    '''
    
    
    实例二:
    
    
    from multiprocessing import Process
    import time,os
    
    
    def func(name):
        time.sleep(1)
        print('name: %s  pid: %s' % (name, os.getpid()))  # name: egon  pid: 7216
    
    
    if __name__ == '__main__':
        p1 = Process(target=func, args=('alex',))
        p2 = Process(target=func, args=('egon',))
    
        p1.daemon = True
        p1.start()
        p2.start()
        print(p1.is_alive())  # 判断进程是否存活  True
        p1.terminate()  # 结束一个进程,需要先向操作系统发指令,需要时间
        print(p1.name)  # 进程名字  Process-1
        print(p1.pid)  # 进程号  676
    
        print('主进程:%s' % os.getpid())  # 主进程:2968
    ----------
    一些方法
    p.join() 	  等待子进程执行完,再执行后面的程序
    p.terminate() 结束一个子进程
    p.is_alive()  判断一个进程是否存活,返回布尔值
    p.name      查看进程的名字
    p.pid      	查看进程号
    
    

    多进程间同步控制

    锁、信号量、事件

    • 同一时间内只能一个进程对数据进行修改,保证安全性
    • 只能在多进程中使用

    模拟抢票的例子

    from multiprocessing import Process,Lock  # 导入锁
    import json,time
    
    
    def check_ticket(name):
        data = json.load(open('db.txt', 'r', encoding='utf-8'))
        count = data['count']
        print('name: %s 正在查票,还剩 %s 张票' % (name, count))
    
    
    def get(name):
        time.sleep(0.5)
        data = json.load(open('db.txt', 'r', encoding='utf-8'))
        if data['count']:
            data['count'] -= 1
            json.dump(data, open('db.txt', 'w', encoding='utf-8'))
            print('name: %s 买票成功,还剩 %s 张票' % (name,data['count']))
    
    
    def task(name, mutex):
        check_ticket(name)
        mutex.acquire()  # 上锁
        get(name)
        mutex.release()  # 释放锁
    
    
    if __name__ == '__main__':
        mutex = Lock()  # 创建锁
        for i in range(3):
            p = Process(target=task, args=('egon%s'%i, mutex))
            p.start()
    
    

    join和互斥锁的异同

    • 都是将并发变成串行,保证数据安全。
    • join是将整个子进程进行同步串行操作
    • 互斥锁可以控制子进程的部分代码串行

    信号量

    • 互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的进程/线程更改数据 。
    • 实现:
      信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
      信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
    from multiprocessing import Process,Semaphore
    import time
    import random
    
    
    def ktv(name, sem):
        sem.acquire()
        print('%s 走进ktv...' % name)
        time.sleep(random.randint(1, 5))
        print('%s 走出ktv...' % name)
        sem.release()
    
    
    if __name__ == '__main__':
        sem = Semaphore(3)  # 指定同时执行ktv函数的子进程个数
        for i in range(10):
            p = Process(target=ktv, args=(i,sem))
            p.start()
    

    事件

    • 利用wait对事件状态同步控制,两种状态True、False。事件主要提供了三个方法 set、wait、clear。

      事件处理的机制:
      全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
      clear:将“Flag”设置为False
      set:将“Flag”设置为True

    通过一个信号 来控制 多个进程 同时 执行或者阻塞
    事件
    from multiprocessing import Event
    一个信号可以使所有的进程都进入阻塞状态
    也可以控制所有的进程解除阻塞
    一个事件被创建之后,默认是阻塞状态
    e = Event()  # 创建了一个事件
    print(e.is_set())   # 查看一个事件的状态,默认被设置成阻塞
    e.set()      # 将这个事件的状态改为True
    print(e.is_set())
    e.wait()     # 是依据e.is_set()的值来决定是否阻塞的
    print(123456)
    e.clear()    # 将这个事件的状态改为False
    print(e.is_set())
    e.wait()     # 等待 事件的信号被变成True
    print('*'*10)
    
    
    set 和 clear
         分别用来修改一个事件的状态 True或者False
    is_set 用来查看一个事件的状态
    wait 是依据事件的状态来决定自己是否在wait处阻塞
         False阻塞 True不阻塞
    

    红绿灯事件例子

    # 红绿灯事件
    import time
    import random
    from multiprocessing import Event,Process
    def cars(e,i):
        if not e.is_set():
            print('car%i在等待'%i)
            e.wait()    # 阻塞 直到得到一个 事件状态变成 True 的信号
        print('33[0;32;40mcar%i通过33[0m' % i)
    
    def light(e):
        while True:
            if e.is_set():
                e.clear()
                print('33[31m红灯亮了33[0m')
            else:
                e.set()
                print('33[32m绿灯亮了33[0m')
            time.sleep(2)
    
    if __name__ == '__main__':
        e = Event()
        traffic = Process(target=light,args=(e,))
        traffic.start()
        for i in range(20):
            car = Process(target=cars, args=(e,i))
            car.start()
            time.sleep(random.random())
    
    

    进程之间的通信

    队列和管道(multiprocess.Queue、multiprocess.Pipe)

    队列

    特点:先进先出
    队列 进程之间数据安全的

    Queue([maxsize])
    创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
    Queue的实例q具有以下方法:

    • q.get( [ block [ ,timeout ] ] )
      返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

    • q.get_nowait( )
      同q.get(False)方法。

    • q.put(item [, block [,timeout ] ] )
      将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

    • q.qsize()
      返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

    • q.empty()
      如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

    • q.full()
      如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。

    生产者消费者模型

    from multiprocessing import Process,Queue
    import time
    import random
    
    
    def consumer(name, q):
        while True:
            food = q.get()
            if food is None:
                break
            print('%s消费了%s' % (name, food))
            time.sleep(random.randint(1, 3))
    
    
    def producer(name, food, q):
        for i in range(1, 4):
            time.sleep(random.randint(1, 3))
            f = '%s生产了%s%s' % (name, food, i)
            print(f)
            q.put(f)
    
    
    if __name__ == '__main__':
        q = Queue(4)
        p1 = Process(target=producer, args=('egon', '包子', q))
        p2 = Process(target=producer, args=('wusir', '馒头', q))
        c1 = Process(target=consumer, args=('alex', q))
        c2 = Process(target=consumer, args=('pizza', q))
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        q.put(None)
        q.put(None)
    

    JoinableQueue

    import time
    import random
    from multiprocessing import Process,JoinableQueue
    def consumer(q,name):
        while True:
            food = q.get()
            print('33[31m%s消费了%s33[0m' % (name,food))
            time.sleep(random.randint(1,3))
            q.task_done()     # count - 1
    
    def producer(name,food,q):
        for i in range(4):
            time.sleep(random.randint(1,3))
            f = '%s生产了%s%s'%(name,food,i)
            print(f)
            q.put(f)
        q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕
    
    if __name__  == '__main__':
        q = JoinableQueue(20)
        p1 = Process(target=producer,args=('Egon','包子',q))
        p2 = Process(target=producer, args=('wusir','泔水', q))
        c1 = Process(target=consumer, args=(q,'alex'))
        c2 = Process(target=consumer, args=(q,'jinboss'))
        p1.start()
        p2.start()
        c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()      # 感知一个进程的结束
    在消费者这一端:
    每次获取一个数据
    处理一个数据
    发送一个记号 : 标志一个数据被处理成功
    在生产者这一端:
        每一次生产一个数据,
        且每一次生产的数据都放在队列中
        在队列中刻上一个记号
        当生产者全部生产完毕之后,
        join信号 : 已经停止生产数据了
                    且要等待之前被刻上的记号都被消费完
                    当数据都被处理完时,join阻塞结束
    consumer 中把所有的任务消耗完
    producer 端 的 join感知到,停止阻塞
    所有的producer进程结束
    主进程中的p.join结束
    主进程中代码结束
    守护进程(消费者的进程)结束
    

    管道

    进程间通信数据不安全,需要加锁。
    注意点:应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

    • #创建管道的类:
      Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
      #参数介绍:
      dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    • #主要方法:
      1.conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
      2.conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
      #其他方法:
    1. conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法

    2. conn1.fileno():返回连接使用的整数文件描述符

    3. conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。

    4. conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。

    5. conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

    6. conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

    from multiprocessing import Pipe,Process
    
    def func(conn1, conn2):
        conn2.close()
        while True:
            try:
                msg = conn1.recv()
                print(msg)
            except EOFError:
                conn1.close()
                break
    
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        p = Process(target=func, args=(conn1, conn2))
        p.start()
        conn1.close()
        for i in range(10):
            conn2.send('hello 你好')
        conn2.close()
    

    管道实现生产者消费者模型
    管道是在进程间数据通信不安全,会出现两个消费者抢占同一个资源的情况,解决这个问题就是加锁。

    from multiprocessing import Pipe,Process,Lock
    
    
    def consumer(name, conn1, conn2, mutex):
        conn2.close()
        while True:
            try:
                mutex.acquire()
                food = conn1.recv()
                mutex.release()
                if not food:break
                print('%s消费了%s' % (name, food))
            except EOFError:
                conn1.close()
                break
    
    
    def producer(name, food, conn1, conn2):
        conn1.close()
        for i in range(200):
            f = '%s生产了%s%s' % (name, food, i)
            print(f)
            conn2.send(f)
        conn2.send(None)
        conn2.send(None)
        conn2.send(None)
        conn2.close()
    
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe()
        mutex = Lock()
        p1 = Process(target=producer, args=('egon', '包子', conn1, conn2))
        c1 = Process(target=consumer, args=('alex', conn1,conn2, mutex))
        c2 = Process(target=consumer, args=('wusir', conn1,conn2,mutex))
        c3 = Process(target=consumer, args=('jin', conn1,conn2,mutex))
        c1.start()
        c2.start()
        c3.start()
        p1.start()
    

    进程之间的数据共享

    数据修改是不安全的,需要加锁解决。
    进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
    虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
            d['count']-=1
    
    if __name__ == '__main__':
        lock=Lock()
        with Manager() as m:
            dic=m.dict({'count':100})
            p_l=[]
            for i in range(100):
                p=Process(target=work,args=(dic,lock))
                p_l.append(p)
                p.start()
            for p in p_l:
                p.join()
            print(dic)
            
    加锁
    from multiprocessing import Manager,Process,Lock
    def main(dic,lock):
        lock.acquire()
        dic['count'] -= 1
        lock.release()
    
    if __name__ == '__main__':
        m = Manager()
        l = Lock()
        dic=m.dict({'count':100})
        p_lst = []
        for i in range(50):
            p = Process(target=main,args=(dic,l))
            p.start()
            p_lst.append(p)
        for i in p_lst: i.join()
        print('主进程',dic)
    

    进程池

    为什么要有进程池?进程池的概念。超过5个进程就用进程池。

    在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

    在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
    Pool([numprocess [,initializer [, initargs]]]):创建进程池

    1. numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    2. initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3. initargs:是要传给initializer的参数组

    主要方法:

    • p.apply(func [, args [, kwargs]]):在一个池工作进程中同步执行func(*args,**kwargs),然后返回结果。
      需要强调的是:此操作并不会在所有池工作进程中执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()

    • p.apply_async(func [, args [, kwargs]]):在一个池工作进程中异步执行,需要手动join和close。func(*args,**kwargs),然后返回结果。
      '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''

    • p.map(func, iterable,)默认是异步调用,自带join,自带close。

    • p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

    • P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

    其他方法:
    1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    3 obj.ready():如果调用完成,返回True
    4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    5 obj.wait([timeout]):等待结果变为可用。
    6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

    进程池与多进程效率对比:

    from multiprocessing import Process,Pool
    import time
    
    def func(n):
        for i in range(5):
            print(n + 1)
    
    
    if __name__ == '__main__':
        start_time = time.time()
        pool = Pool(5)
        pool.map(func, range(50))  自带join,close方法,后面的代码同步
        end_time = time.time()
        t1 = end_time-start_time 
    
        start = time.time()
        p_lst = []
        for i in range(100):
            p = Process(target=func, args=(i,))
            p.start()
            p_lst.append(p)
        for p in p_lst:
            p.join()
        end = time.time()
        t2 = end - start
        print(t1, t2)  # 0.3350791931152344 6.496629953384399
    

    进程池的同步调用

    from multiprocessing import Pool
    import time
    import os
    
    
    def func(n):
        print('start func%s' % n,os.getpid())
        time.sleep(0.3)
        print('end func%s' % n, os.getpid())
    
    
    if __name__ == '__main__':
        pool = Pool(5)  产生5个进程,后面的任务都用这5个进程执行
        for i in range(5):  5个任务
            pool.apply(func, args=(i,))  自带join
    # 同步调用
    '''
    start func0 1076
    end func0 1076
    start func1 7000
    end func1 7000
    start func2 2848
    end func2 2848
    start func3 1988
    end func3 1988
    start func4 4272
    end func4 4272
    '''
    

    进程池的异步调用

    from multiprocessing import Pool
    import time,os
    
    
    def func(n):
        print('start func%s' % n, os.getpid())
        time.sleep(0.3)
        print('end func%s' % n, os.getpid())
    
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply_async(func, args=(i,))
        p.close()  # 结束进程池接收任务
        p.join()  # 感知进程池中的任务执行结束
    # 异步调用
    '''
    start func0 3596
    start func1 8628
    start func2 3436
    start func3 5596
    start func4 2804
    end func0 3596
    start func5 3596
    end func1 8628
    start func6 8628
    end func2 3436
    start func7 3436
    end func3 5596
    start func8 5596
    end func4 2804
    start func9 2804
    end func5 3596
    end func6 8628
    end func7 3436
    end func8 5596
    end func9 2804
    '''
    

    进程池的返回值

    '''
    多进程中子进程的返回值
        主进程接收不到子进程中的返回值
    '''
    
    from multiprocessing import Process
    
    
    def func1(n):
        return n*2
    
    
    if __name__ == '__main__':
        for i in range(5):
            p = Process(target=func1, args=(i,))
            res = p.start()
            print(res)
    '''
    None
    None
    None
    None
    None
    '''
    
    '''
    进程池的返回值
        p.map  # [0, 1, 4, 9, 16]
        p.apply # 0,1,4,9,16
        p.apply_async  需要通过ret.get()取返回值			
    '''
    from multiprocessing import Pool
    
    
    def func(n):
        return n*n
    
    
    if __name__ == '__main__':
        p = Pool(5)
        ret = p.map(func, range(5))
        print(ret)  # [0, 1, 4, 9, 16]
    '''
    0
    1
    4
    9
    16
    '''
    
    from multiprocessing import Pool
    
    
    def func(n):
        return n*n
    
    
    if __name__ == '__main__':
        p = Pool(5)
        r_lst=[]
        for i in range(5):
            ret = p.apply_async(func, args=(i,))
            r_lst.append(ret)
        for res in r_lst:
            print(res.get())
    
        p.close()
        p.join()
    '''
    0
    1
    4
    9
    16
    '''
    

    注意:

    • 多进程当中,子进程返回值不能被父进程接收
    • 进程池中,子进程可以有返回值能被父进程接收

    进程池的回调函数

    import os
    from multiprocessing import Pool
    
    
    def func1(n1):
        print('in func1', os.getpid())
        return n1*n1
    
    
    def func2(n2):
        print('in func2', os.getpid())
        print(n2)
    
    
    if __name__ == '__main__':
        print('主进程:', os.getpid())
        p = Pool(5)
        for i in range(10):
            p.apply_async(func1, args=(i,), callback=func2)
        p.close()
        p.join()
    '''
    1.设置了回调函数,会将func1的返回值当作参数传给回调函数,并执行
    2.func2是在主进程中执行,并没有开启新的子进程去执行func2
    '''
    

    线程

    进程是资源分配的最小单位,线程是CPU调度的最小单位.
    每一个进程中至少有一个线程。 
    线程与进程的区别可以归纳为以下4点:
      - 1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
      - 2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
      - 3)调度和切换:线程上下文切换比进程上下文切换要快得多。
      - 4)在多线程操作系统中,进程不是一个可执行的实体。
    线程的特点:
    enter description here

    进程 是 最小的 内存分配单位
    线程 是 操作系统调度的最小单位
    线程直接被CPU执行,进程内至少含有一个线程,也可以开启多个线程
        开启一个线程所需要的时间要远远小于开启一个进程
        多个线程内部有自己的数据栈,数据不共享
        全局变量在多个线程之间是共享的
    GIL锁(即全局解释器锁)
    在Cpython解释器下的python程序 在同一时刻 多个线程中只能有一个线程被CPU执行
    高CPU : 计算类 --- 高CPU利用率
    高IO  : 爬取网页 200个网页
            qq聊天   send recv
            处理日志文件 读文件
            处理web请求
            读数据库 写数据库
    

    全局解释器锁GIL锁:
    由于cpython解释器有垃圾处理机制,间隔一定事件启动垃圾回收线程对数据进行回收,会造成python解释器的数据不安全

    1. 同一时刻只能有一个线程访问cpu
    2. 锁的是线程,控制线程对python解释器数据的访问
    3. 是Cpython解释器的特性
    4. 针对高CPU类型的运算,会降低效率
    5. 针对高IO的程序,并没有什么影响

    线程之间的数据是共享的:

    from threading import Thread
    
    n = 100
    
    
    def func():
        global n
        n = 0
    
    
    t = Thread(target=func)
    t.start()
    print(n)  # 0
    

    子进程中不能有input操作,子线程中可以有input操作。

    线程的一些方法:
    Thread实例对象的方法

    • isAlive(): 返回线程是否活动的。
    • getName(): 返回线程名。
    • setName(): 设置线程名。

    threading模块提供的一些方法:

    • threading.currentThread(): 返回当前的线程变量。
    • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    from threading import Thread
    import threading
    import time
    
    
    def work():
        time.sleep(1)
        print(threading.current_thread().getName())  # Thread-1
    
    
    if __name__ == '__main__':
        t = Thread(target=work)
        t.start()
        print(t.getName())  # Thread-1
        print(t.is_alive())  # True
        print(threading.current_thread().getName())  # MainThread
        print(threading.current_thread())  # <_MainThread(MainThread, started 7560)>
        print(threading.enumerate())  # [<_MainThread(MainThread, started 7560)>, <Thread(Thread-1, started 496)>]
        print(threading.active_count())  # 2
    

    守护线程

    守护进程:

    1. 一个进程被设置成守护进程,在主进程代码运行完,守护进程就结束。
    2. 主进程会等待其他非守护进程执行完成,回收资源再结束。

    守护线程:

    1. 一个线程被设置成守护线程,要在主线程结束后,守护线程才结束。
    2. 而主线程会在其他非守护线程运行完毕后才结束
    3. 一个进程内,主线程结束就意味着进程执行结束
    from threading import Thread
    import time
    
    
    def func1():
        time.sleep(5)
        print('in func1')
    
    
    def func2():
        time.sleep(10)
        print('in func2')
    
    
    t1 = Thread(target=func1)
    t2 = Thread(target=func2)
    t1.daemon = True  # 设置守护线程
    t1.start()
    t2.start()
    print('主线程')
    # 执行结果
    '''
    主线程
    in func1
    in func2
    '''
    

    死锁与递归锁

    控制同一时间内只允许一个线程对程序进行执行。

    • 死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
    • 递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
    死锁:互斥锁只能acquire一次
    
    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutexA.acquire()
            print('%s 拿到A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('%s 拿到B锁' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('%s 拿到A锁' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
            
        
    
    
    ----------
    
    
    递归锁:可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,才能被抢到acquire
    
    from threading import Thread,RLock
    
    mutex = RLock()
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutex.acquire()
            print('%s 拿到A锁' % self.name)
            mutex.acquire()
            print('%s 拿到B锁' % self.name)
            mutex.release()
            mutex.release()
    
        def func2(self):
            mutex.acquire()
            print('%s 拿到B锁' % self.name)
            mutex.acquire()
            print('%s 拿到A锁' % self.name)
            mutex.release()
            mutex.release()
    
    
    if __name__ == '__main__':
        for i in range(3):
            t = MyThread()
            t.start()
            
    
    

    信号量Semaphore

    控制同一时间内只有指定个数的线程能够执行代码

    from threading import Thread,Semaphore,currentThread
    import time,random
    
    sm=Semaphore(3)
    
    def task():
        # sm.acquire()
        # print('%s in' %currentThread().getName())
        # sm.release()
        with sm:  # 自动管理锁
            print('%s in' %currentThread().getName())
            time.sleep(random.randint(1,3))
    
    
    if __name__ == '__main__':
        for i in range(10):
            t=Thread(target=task)
            t.start()
    
    

    事件Event

    通过信号来控制线程之间的状态,阻塞和执行

    import time
    import random
    from threading import Thread,Event
    def connect_db(e):
        count = 0
        while count < 3:
            e.wait(0.5)   # 状态为False的时候,我只等待1s就结束
            if e.is_set() == True:
                print('连接数据库')
                break
            else:
                count += 1
                print('第%s次连接失败'%count)
        else:
            raise TimeoutError('数据库连接超时')
    
    def check_web(e):
        time.sleep(random.randint(0,3))
        e.set()
    
    e = Event()
    t1 = Thread(target=connect_db,args=(e,))
    t2 = Thread(target=check_web,args=(e,))
    t1.start()
    t2.start()
    
    

    条件Condition

    使得线程等待,只有满足某条件时,才释放n个线程

    • Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
    条件
    锁
    acquire release
    一个条件被创建之初 默认有一个False状态
    False状态 会影响wait一直处于等待状态
    notify(int数据类型)  造钥匙
    使用wait 和notify的前后都要acquire和release
    
    
    from threading import Thread,Condition
    def func(con,i):
        con.acquire()
        con.wait() # 等钥匙
        print('在第%s个循环里'%i)
        con.release()
    con = Condition()
    for i in range(10):
        Thread(target=func,args = (con,i)).start()
    while True:
        num = int(input('>>>'))
        con.acquire()
        con.notify(num)  # 造钥匙
        con.release()
    
    

    定时器Timer

    定时发起线程

    import time
    from threading import Timer
    def func():
        print('时间同步')   #1-3
    
    while True:
        t = Timer(5,func).start()   # 非阻塞的
        time.sleep(5)
    

    线程队列

    import queue
    
    q = queue.Queue(3)  # 先进先出,指定队列的个数
    
    q.put('alex')  # 把数据放入队列
    q.put('egon')
    q.put('peiqi')
    超过指定个数
    q.put('eva', block=True)  # 默认block=True,阻塞,不报错
    q.put('eva', block=True, timeout=3)  # timeout=3设置时间,3秒后就报错
    q.put('eva', block=False)  # block=False,直接报错,不阻塞
    q.put_nowait('eva')  # 不等待,直接报错,等效# q.put('eva', block=False)
    q.empty()  # 清空队列
    
    print(q.get())  # 'alex'
    print(q.get())  # 'egon'
    print(q.get())  # 'peiqi'
    超过指定个数
    q.get(block=True)  # q.get()默认block=True,阻塞,不报错
    q.get(block=True, timeout=2)  # timeout=2,2秒后报错
    q.get(block=False)  # block=False,直接报错,不阻塞
    q.get_nowait()  # 不等待,直接报错,等效# q.get(block=False)
    
    
    '''
    先进后出 堆栈
    '''
    q = queue.LifoQueue(3)  # 先进后出 堆栈
    
    q.put('alex')
    q.put('egon')
    q.put('peiqi')
    
    
    print(q.get())  # peiqi
    print(q.get())  # egon
    print(q.get())  # alex
    
    
    '''
    优先级队列
    '''
    q = queue.PriorityQueue(3)  # 优先级队列
    q.put((10, 'alex'))  # 放进队列的内容必须全部为同种数据类型,只能是列表或者元组,不能混合
    q.put((40, 'egon'))   # 第一个元素为优先级内容,数字越小优先级越高,
    q.put((5, 'peiqi'))
    
    print(q.get())  # (5, 'peiqi')  优先级越高的最先被取出来
    print(q.get())  # (10, 'alex')
    print(q.get())  # (40, 'egon')
    q.put([10, 'alex'])
    print(q.get())   # [10, 'alex']
    

    线程池

    1. 介绍
      concurrent.futures模块提供了高度封装的异步调用接口
      ThreadPoolExecutor:线程池,提供异步调用
      ProcessPoolExecutor: 进程池,提供异步调用
      Both implement the same interface, which is defined by the abstract Executor class.

    2. 基本方法

    • submit(fn, *args, **kwargs)
      异步提交任务

    • map(func, *iterables, timeout=None, chunksize=1)
      取代for循环submit的操作,map拿不到返回值

    • shutdown(wait=True)
      相当于进程池的pool.close()+pool.join()操作
      wait=True,等待池内所有任务执行完毕回收完资源后才继续
      wait=False,立即返回,并不会等待池内的任务执行完毕
      但不管wait参数为何值,整个程序都会等到所有任务执行完毕
      submit和map必须在shutdown之前

    • result(timeout=None)
      取得结果

    • add_done_callback(fn)
      回调函数

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def func(n):
        time.sleep(1)
        print(n)
        return n*n
    
    
    tpool = ThreadPoolExecutor(5)   默认 不要超过cpu个数*5
    t_lst = []
    for i in range(20):
        t = tpool.submit(func, i)  # 提交任务给线程
        t_lst.append(t)
    tpool.shutdown()  # close + join
    print('主线程')
    
    for t in t_lst:
        print('###', t.result())
    

    回调函数

    from concurrent.futures import ThreadPoolExecutor
    import requests
    import time
    
    def get(url):
        print('GET %s' %url)
        response=requests.get(url)
        time.sleep(3)
        return {'url':url,'content':response.text}
    
    
    def parse(res):
        res=res.result()
        print('%s parse res is %s' %(res['url'],len(res['content'])))
    
    
    if __name__ == '__main__':
        urls=[
            'http://www.cnblogs.com/linhaifeng',
            'https://www.python.org',
            'https://www.openstack.org',
        ]
    
        pool=ThreadPoolExecutor(2)
    
        for url in urls:
            pool.submit(get,url).add_done_callback(parse)
    

    协程

    本质单个线程下实现并发。
    协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
    基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态

    1. cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它

    总结协程特点:

    • 必须在只有一个单线程里实现并发
    • 修改共享数据不需加锁
    • 用户程序里自己保存多个控制流的上下文栈
    • 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

    yield

    
    yield
        无法实现遇到IO切换
    
    # 并发执行
    
    
    import time
    def producer():
        g = consumer()
        next(g)
        for i in range(10000000):
            g.send(i)
    
    
    def consumer():
        while True:
            res = yield
    
    
    start = time.time()
    producer()
    print(time.time() - start)
    

    greenlet模块

    greenlet
        无法实现线程遇到IO切换
        
    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' %name)
    
        g2.switch('egon')
        print('%s eat 2' %name)
        g2.switch()
    
    def play(name):
        print('%s play 1' %name )
        g1.switch()
        print('%s play 2' %name )
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('egon')
    

    gevent模块

    gevent模块
        可以在线程中检测IO进行切换
        monkey.patch_all()识别所有的IO操作,整个文件的开头
    
    from gevent import monkey;monkey.patch_all()
    import time
    import gevent
    
    
    def eat():
        print('eating start')
        time.sleep(1)
        print('eating end')
    
    
    def play():
        print('playing start')
        time.sleep(1)
        print('playing end')
    
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    g1.join()
    g2.join()  # gevent模块提交人物的方式是异步的,需要加join
    gevent.joinall([g1,g2])  # 也可以一起join
    
    

    IO模型

    1. 同步
    • 所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。程序仍然处于就绪态。
    1. 异步
    • 异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。
    1. 阻塞
    • 阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。程序处于阻塞态。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
      #举例:
      #1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
      #2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
    1. 非阻塞
    • 非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

    小结:

    1. 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。
    2. 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

    IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

    1. 等待数据准备 (Waiting for the data to be ready)
    2. 将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
    3. 记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

    补充:

    1、输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果会阻塞状态,则会经理wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常
    
    2、输出操作:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常
    
    3、接收外来链接:accept,与输入操作类似
    
    4、发起外出链接:connect,与输出操作类似
    

    阻塞IO(blocking IO)

    blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    非阻塞IO(blocking IO)

    非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
    但是非阻塞IO模型绝不被推荐
    缺点:
    #1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
    #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

    Server端
    
    '''
    非阻塞IO
        能够实现在wait data阶段执行其他
        accept recv send 都是IO
        send --- copy data 阶段
        recv --- wait data 、 copy data
    '''
    import socket
    
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8091))
    sk.listen(5)
    sk.setblocking(False)  # 设置非阻塞
    
    
    rlist = []
    wlist = []
    
    while True:
        try:
            conn, addr = sk.accept()
            rlist.append(conn)
        except BlockingIOError:  # 捕捉异常
    
            # 收消息
            del_rlist = []
            for conn in rlist:
                try:
                    # 收消息
                    data = conn.recv(1024)
                    if not data:
                        del_rlist.append(conn)
                        continue
                    wlist.append((conn, data.upper()))
                    conn.send(data.upper())
                except BlockingIOError:
                    continue
    
                except ConnectionResetError:
                    conn.close()
                    del_rlist.append(conn)
    
            # 发消息
            del_wlist = []
            for item in wlist:
                try:
                    conn = item[0]
                    data = item[1]
                    conn.send(data)
                    del_wlist.append(item)
                except BlockingIOError:  # 内存满了,重新发给系统缓存
                    pass
    
            for item in del_wlist:
                wlist.remove(item)
    
            for conn in del_rlist:
                rlist.remove(conn)
    
    sk.close()
    
    
    ----------
    Client端
    
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8091))
    
    
    while True:
        cmd = input('>>>:').strip()
        if not cmd:
            continue
        sk.send(bytes(cmd, encoding='utf-8'))
        data = sk.recv(1024).decode('utf-8')
        print(data)
    
    sk.close()
    
    
    

    多路复用IO(IO multiplexing)

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,
    当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),
    而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
    select监听fd变化的过程分析:

    1. 用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
    2. 用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
    Server端
    
    import socket
    import select
    
    sk = socket.socket()
    sk.bind(('127.0.0.1', 9000))
    sk.listen(5)
    sk.setblocking(False)
    
    
    read_list = [sk, ]  # 监测的IO对象
    write_list = []  # 发消息的IO对象
    wdata = {}
    while True:
        r_list, w_list, x_list = select.select(read_list, write_list, [], 0.5)
        '''
        参数是三个列表:
            传入的是需要监测的IO对象
        得到的是3个列表:
            是操作系统响应的IO对象,哪些有数据了就返回到r_list, w_list中
        '''
    
        for sock in r_list:
            if sock == sk:  # 当有人来链接的时候,就创建conn,加入到监测的列表中read_list
                conn, address = sock.accept()
                read_list.append(conn)
            else:
                try:  # conn接收消息
                    data = sock.recv(1024)
                    # if not data:
                    # 	sock.close()
                    # 	read_list.remove(sock)
                    # 	continue
                    write_list.append(sock)
                    wdata[sock] = 'hello' + data.decode('utf-8')
    
                except Exception:
                    sock.close()
                    read_list.remove(sock)
    
        for sock in w_list:
            sock.send(bytes(wdata[sock], encoding='utf-8'))
            write_list.remove(sock)
            wdata.pop(sock)
            
            
    
    
    ----------
    Client端
    
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1', 9000))
    
    
    while True:
        cmd = input('>>>:').strip()
        if not cmd:
            continue
        sk.send(bytes(cmd, encoding='utf-8'))
        data = sk.recv(1024).decode('utf-8')
        print(data)
    
    sk.close()
    
    
    

    异步IO(IO multiplexing)

    python本身代码无法实现,只能用到其他框架例如Tornado等

  • 相关阅读:
    Java语言
    包名规范
    带参数的方法
    成员变量和局部变量
    Java数据类型
    java反射机制
    JDK安装
    注释
    二维数组
    数组的经典排序
  • 原文地址:https://www.cnblogs.com/james201133002/p/9584077.html
Copyright © 2011-2022 走看看