zoukankan      html  css  js  c++  java
  • 线程与进程(续)

    socket服务端实现并发

    # 服务端
    import socket
    from threading import Thread
    service = socket.socket()
    service.bind(('127.0.0.1',8080))
    service.listen(5) # 半连接池
    
    def communicate(conn):
        while True:
            try:
                data = conn.recv(1024) #阻塞
                if len(data) == 0:break
                print(data)
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    while True:
        conn,addr = service.accept()
        print(addr)
        print(conn)
        t = Thread(target=communicate,args=(conn,))
        t.start()
    
    #客户端
    import socket
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    
    while True:
        info  = input('>>>:').encode('utf-8')
        if len(info) == 0:continue
        client.send(info)
        data = client.recv(1024)
        print(data)

    # 注意在socker中listen中班连接池限制就失去作用了。可以使用线程池对连接数进行限制

    无论是开线程还是进程都消耗资源,开线程消耗的资源比开进程小,

    池:

      为了减缓计算机硬件压力,避免计算机硬件设备崩溃

      虽然减轻了计算机硬件的压力,但是一定程度上减低了持续的效率

    进程池线程:

      为了限制开设的进程和线程数,从而保证计算机硬件的安全。

    如何使用:

    使用进程池限制进程数

    异步提交:
    异步调用:提交完一个任务之后,不在原地等待,结果,而是直接执行下一个行代码,会导致任务是并发执行的

    from
    concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time import os def task(name): print('%s %s is running'% (name,os.getpid())) time.sleep(1) if __name__ == '__main__': p = ProcessPoolExecutor(4)  #默认情况下不指定使用时cpu的核心数 for i in range(20): p.submit(task,'进程%s的pid:'%i) print('') 输出结果: 进程12的pid: 11484 is running 进程13的pid: 11072 is running 进程14的pid: 2784 is running 进程15的pid: 12328 is running 进程16的pid: 11484 is running 进程17的pid: 11072 is running 进程18的pid: 2784 is running 进程19的pid: 12328 is running 根据结果可以看出一直是4个进程在进行运算,限制了进程数
    同步提交
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time
    import os
    def task(name):
        print('%s %s is running'% (name,os.getpid()))
        time.sleep(1)
        return 123  # 返回结果
    if __name__ == '__main__':
        p = ProcessPoolExecutor(4)  #默认不指定参数,值是cpu核心数的5倍
        for i in range(20):
            res = p.submit(task,'进程%s的pid:'%i).result()   #提交完任务原地等待结果
            print(res)
        print('')
    输出:
    进程0的pid: 13900 is running
    123
    进程1的pid: 8996 is running
    123
    进程2的pid: 13496 is running
    123
    进程3的pid: 7548 is running
    123
    进程4的pid: 13900 is running
    123
    进程5的pid: 8996 is running
    123
    进程6的pid: 13496 is running
    123

    线程池

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time
    import os
    def task(name,n):
        print('%s %s is running'% (name,os.getpid()))
        time.sleep(1)
        return n**2
    if __name__ == '__main__':
        p = ThreadPoolExecutor(4)
        # 提交任务的两种方式
        # 同步调用:提交完成一个任务后,就在原地等待,等待任务完完整整地运行完毕拿到结果后,再执行下一行代码,会导致任务是串行执行的。
        # 异步调用:提交完一个任务之后,不在原地等待,结果,而是直接执行下一个行代码,会导致任务是并发执行的
        l=[]
        for i in range(10):
            future = p.submit(task,'进程pid:',i)
            l.append(future)
        p.shutdown(wait=True)   #关闭进程池的入口,原地等待进程池内所有任务运行完毕
        for future in l:
            print(future.result())
    
        print('')
    输出结果:

    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    进程pid: 12932 is running
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81

    异步回调

    # 回调函数,异步提交之后一旦任务有返回结果,自动交给另外一个任务执行
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time
    import os
    pool = ProcessPoolExecutor(5)
    
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**2
    def call_back(n):
        print('我拿到了结果:%s'%n.result())
    
    if __name__ == '__main__':
        t_list=[]
        for i in range(10):
            future = pool.submit(task,i).add_done_callback(call_back)
            t_list.append(future)
        print('')
    输出:


    0 11468
    1 12216
    2 10784
    3 5564
    4 13252
    5 11468
    我拿到了结果:0
    6 12216
    我拿到了结果:1
    7 10784
    我拿到了结果:4
    8 5564
    我拿到了结果:9
    9 13252

    协程

    进程:资源单位(车间)

    线程:最小资源单位(流水线)

    协程:单线程下实现并发

    并发:看上去像是同时执行就可以称之为并发

    多道技术:

      空间上的复用

      时间上的复用

    核心:切换+保存状态

     

    协程:完全是技术人员自己定义的

      当任务是计算密集型:会降低效率

      当任务是io密集型:会提升效率

    from gevent import monkey;monkey.patch_all()   # 添加该模块会自动识别所有代码中的io操作
    from gevent import spawn
    
    # gevent本身不能识别time.sleep不属于该模块的io操作
    import time
    def heng(name):
        print('%s 哼'%name)
        time.sleep(2)
        print('%s 哼'% name)
    def ha(name):
        print('%s 哈'%name)
        time.sleep(3)
        print('%s 哈'% name)
    start = time.time()
    
    # heng('egon')
    # ha('yzn')
    # print('主',time.time()-start)    # 5.001242399215698
    
    s1=spawn(heng,'egon')
    s2=spawn(ha,'echo')
    s1.join()
    s2.join()
    print('',time.time()-start)    # 3.0121893882751465

    单线程sockert通信

    from gevent import monkey;monkey.patch_all()
    from gevent import spawn
    import socket


    def communicate(conn):
    while True:
    try:
    data = conn.recv(1024)
    if len(data) == 0:break
    print(data.decode('utf-8'))
    conn.send(data.upper())
    except ConnectionResetError:
    break
    conn.close()


    def server():
    server = socket.socket()
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    while True:
    conn,addr = server.accept()
    spawn(communicate,conn)

    if __name__ == '__main__':
    s1 = spawn(server)
    s1.join()

    客户端:
    from threading import Thread,current_thread
    import socket


    def client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 1
    while True:
    data = '%s %s'%(current_thread().name,n)
    n += 1
    client.send(data.encode('utf-8'))
    info = client.recv(1024)
    print(info)

    if __name__ == '__main__':
    for i in range(500):
    t = Thread(target=client)
    t.start()

    io模型:

    • 阻塞IO

    • 非阻塞IO(服务端通信针对accept用s.setblocking(False)加异常捕获,cpu占用率过高)

    • IO多路复用

      在只检测一个套接字的情况下,他的效率连阻塞IO都比不上。因为select这个中间人增加了环节。

      但是在检测多个套接字的情况下,就能省去wait for data过程

    • 异步IO

  • 相关阅读:
    Python爬虫_分布式爬虫
    Python爬虫_selenium
    Python爬虫_scrapy框架
    Python爬虫_高性能爬取
    Python爬虫_三种数据解析方式
    Python爬虫_requests模块
    Django
    Python爬虫相关基础概念
    MySQL 多表结构的创建与分析
    mysql
  • 原文地址:https://www.cnblogs.com/yangzhaon/p/10841737.html
Copyright © 2011-2022 走看看