zoukankan      html  css  js  c++  java
  • 并发编程之多线程篇之五

    主要内容:

      一、异步调用与回调机制

        二、多线程实现并发的套接字通信

      三、gevent实现并发的套接字通信

    1️⃣ 异步调用与回调机制

      提交任务的两种方式

      1、同步调动

       1.1 什么是同步调用?
        提交完任务后,在原地等待(并非阻塞)直到任务执行完毕,拿到结果,再执行下一行代码,导致程序串行执行。

      1.2 实例:
        
    import time,random
    from concurrent.futures import ThreadPoolExecutor
    
    def eating(name):
        print('%s is eating'%name)
        time.sleep(random.randint(2,5))
        res = random.randint(2,4)
        return {'name':name,'res':res}
    
    def weigh(account):
        name = account['name']
        weight = account['res']
        print('%s 吃了 <%s> kg!'%(name,weight))
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(3)
        eat1 = pool.submit(eating,'qw').result() # .result()表示拿到结果
        weigh(eat1)
        eat2 = pool.submit(eating,'hj').result()
        weigh(eat2)
        eat3 = pool.submit(eating,'sc').result()
        weigh(eat3)
    '''执行结果:
    qw is eating
    qw 吃了 <4> kg!
    hj is eating
    hj 吃了 <3> kg!
    sc is eating
    sc 吃了 <2> kg!
    '''

      2、异步调用

        2.1 什么是异步调用?

          提交完任务后,不在原地等待任务执行完毕。

        2.1 实例:

    import time,random
    from concurrent.futures import ThreadPoolExecutor
    
    def eating(name):
        print('%s is eating'%name)
        time.sleep(random.randint(2,5))
        res = random.randint(2,4)
        return {'name':name,'res':res}
    
    def weigh(account):
        account = account.result()
        name = account['name']
        weight = account['res']
        print('%s 吃了 <%s> kg!'%(name,weight))
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(3)
        pool.submit(eating,'qw').add_done_callback(weigh) # 对象后直接加回调函数(仅填函数名),将回调函数前的函数的返回值对象作为参数赋给weigh函数
        pool.submit(eating,'hj').add_done_callback(weigh)
        pool.submit(eating,'sc').add_done_callback(weigh)
    '''执行结果:
    qw is eating
    hj is eating
    sc is eating
    hj 吃了 <2> kg!
    qw 吃了 <2> kg!
    sc 吃了 <4> kg!
    '''

      3、回调函数

        3.1 含义:可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务

        执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数。

        3.2 实例 

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        p=ProcessPoolExecutor(3)
        for url in urls:
            p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    View Code

    2️⃣ 多线程实现并发的套接字通信

      分为两部分,客户端(client.py)和服务端(server.py)

      client.py
      
    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    client = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
    client.connect(('127.0.0.1',8808))
    while True:
        mes = input('>>>:').strip()
        if not mes:continue
        client.send(mes.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    View Code
    
    

       server.py(简单版,有缺陷)

    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    from threading import Thread
    # 不完善版,不应该无限制开线程
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                print(data.decode('utf-8'))
                mes = input('>>>:').strip()
                conn.send(mes.encode('utf-8'))
            except ConnectionResetError:
                break
        conn.close()
    
    def connect(ip,port):
        server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn, addres = server.accept()
            t = Thread(target=talk,args=(conn,))
            t.start()
        server.close()
    if __name__ == '__main__':
        connect('127.0.0.1',8808)
    View Code

       server.py(改进版,基于线程池实现控制线程数目)

    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    
    from concurrent.futures import ThreadPoolExecutor
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                print(data.decode('utf-8'))
                mes = input('>>>:').strip()
                conn.send(mes.encode('utf-8'))
            except ConnectionResetError:
                break
        conn.close()
    
    def connect(ip,port):
        server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn, addres = server.accept()
            pool.submit(talk,conn) # 异步提交
        server.close()
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(2) # 同时只接收两个线程
        connect('127.0.0.1',8808)
    View Code

      

    3️⃣ gevent实现并发的套接字通信

      同样分为客户端(client.py)和服务端(server.py)。

      client.py 

    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    from threading import Thread,currentThread
    def client():
        client = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
        client.connect(('127.0.0.1',8808))
        while True:
            client.send(('%s coming'%currentThread().getName()).encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
        client.close()
    if __name__ == '__main__':
        for i in range(300):
            t = Thread(target=client)
            t.start()
    View Code

      server.py

    # 基于gevent模块实现
    from gevent import monkey,spawn;monkey.patch_all()
    from socket import *
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data:break
                #print(data.decode('utf-8'))
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server(ip,port):
        server = socket(AF_INET,SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn,addres = server.accept()
            spawn(talk,conn) # 生成协程对象
        server.close()
    
    if __name__ =='__main__':
        g = spawn(server,'127.0.0.1',8808)
        g.join()
    View Code

      

     
  • 相关阅读:
    第四章:(2)原理之 Dubbo 框架设计
    大三学习进度29
    大三学习进度27
    大三学习进度31
    大三学习进度24
    大三学习进度29
    大三学习进度26
    大三学习进度28
    大三学习进度25
    大三学习进度32
  • 原文地址:https://www.cnblogs.com/schut/p/9030236.html
Copyright © 2011-2022 走看看