zoukankan      html  css  js  c++  java
  • 并发编程之多线程篇之五

    主要内容:

      一、异步调用与回调机制

        二、多线程实现并发的套接字通信

      三、gevent实现并发的套接字通信

    1️⃣ 异步调用与回调机制

      提交任务的两种方式

      1、同步调动

       1.1 什么是同步调用?
        提交完任务后,在原地等待(并非阻塞)直到任务执行完毕,拿到结果,再执行下一行代码,导致程序串行执行。

      1.2 实例:
        
    import time,random
    from concurrent.futures import ThreadPoolExecutor
    
    def eating(name):
        print('%s is eating'%name)
        time.sleep(random.randint(2,5))
        res = random.randint(2,4)
        return {'name':name,'res':res}
    
    def weigh(account):
        name = account['name']
        weight = account['res']
        print('%s 吃了 <%s> kg!'%(name,weight))
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(3)
        eat1 = pool.submit(eating,'qw').result() # .result()表示拿到结果
        weigh(eat1)
        eat2 = pool.submit(eating,'hj').result()
        weigh(eat2)
        eat3 = pool.submit(eating,'sc').result()
        weigh(eat3)
    '''执行结果:
    qw is eating
    qw 吃了 <4> kg!
    hj is eating
    hj 吃了 <3> kg!
    sc is eating
    sc 吃了 <2> kg!
    '''

      2、异步调用

        2.1 什么是异步调用?

          提交完任务后,不在原地等待任务执行完毕。

        2.1 实例:

    import time,random
    from concurrent.futures import ThreadPoolExecutor
    
    def eating(name):
        print('%s is eating'%name)
        time.sleep(random.randint(2,5))
        res = random.randint(2,4)
        return {'name':name,'res':res}
    
    def weigh(account):
        account = account.result()
        name = account['name']
        weight = account['res']
        print('%s 吃了 <%s> kg!'%(name,weight))
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(3)
        pool.submit(eating,'qw').add_done_callback(weigh) # 对象后直接加回调函数(仅填函数名),将回调函数前的函数的返回值对象作为参数赋给weigh函数
        pool.submit(eating,'hj').add_done_callback(weigh)
        pool.submit(eating,'sc').add_done_callback(weigh)
    '''执行结果:
    qw is eating
    hj is eating
    sc is eating
    hj 吃了 <2> kg!
    qw 吃了 <2> kg!
    sc 吃了 <4> kg!
    '''

      3、回调函数

        3.1 含义:可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务

        执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数。

        3.2 实例 

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        p=ProcessPoolExecutor(3)
        for url in urls:
            p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    View Code

    2️⃣ 多线程实现并发的套接字通信

      分为两部分,客户端(client.py)和服务端(server.py)

      client.py
      
    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    client = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
    client.connect(('127.0.0.1',8808))
    while True:
        mes = input('>>>:').strip()
        if not mes:continue
        client.send(mes.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    View Code
    
    

       server.py(简单版,有缺陷)

    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    from threading import Thread
    # 不完善版,不应该无限制开线程
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                print(data.decode('utf-8'))
                mes = input('>>>:').strip()
                conn.send(mes.encode('utf-8'))
            except ConnectionResetError:
                break
        conn.close()
    
    def connect(ip,port):
        server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn, addres = server.accept()
            t = Thread(target=talk,args=(conn,))
            t.start()
        server.close()
    if __name__ == '__main__':
        connect('127.0.0.1',8808)
    View Code

       server.py(改进版,基于线程池实现控制线程数目)

    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    
    from concurrent.futures import ThreadPoolExecutor
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                print(data.decode('utf-8'))
                mes = input('>>>:').strip()
                conn.send(mes.encode('utf-8'))
            except ConnectionResetError:
                break
        conn.close()
    
    def connect(ip,port):
        server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn, addres = server.accept()
            pool.submit(talk,conn) # 异步提交
        server.close()
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(2) # 同时只接收两个线程
        connect('127.0.0.1',8808)
    View Code

      

    3️⃣ gevent实现并发的套接字通信

      同样分为客户端(client.py)和服务端(server.py)。

      client.py 

    #!/usr/bin/env python3
    #-*- coding:utf-8 -*-
    # write by congcong
    
    import socket
    from threading import Thread,currentThread
    def client():
        client = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
        client.connect(('127.0.0.1',8808))
        while True:
            client.send(('%s coming'%currentThread().getName()).encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
        client.close()
    if __name__ == '__main__':
        for i in range(300):
            t = Thread(target=client)
            t.start()
    View Code

      server.py

    # 基于gevent模块实现
    from gevent import monkey,spawn;monkey.patch_all()
    from socket import *
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data:break
                #print(data.decode('utf-8'))
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server(ip,port):
        server = socket(AF_INET,SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
        while True:
            conn,addres = server.accept()
            spawn(talk,conn) # 生成协程对象
        server.close()
    
    if __name__ =='__main__':
        g = spawn(server,'127.0.0.1',8808)
        g.join()
    View Code

      

     
  • 相关阅读:
    Populating Next Right Pointers in Each Node II
    Populating Next Right Pointers in Each Node
    Construct Binary Tree from Preorder and Inorder Traversal
    Construct Binary Tree from Inorder and Postorder Traversal
    Path Sum
    Symmetric Tree
    Solve Tree Problems Recursively
    632. Smallest Range(priority_queue)
    609. Find Duplicate File in System
    poj3159最短路spfa+邻接表
  • 原文地址:https://www.cnblogs.com/schut/p/9030236.html
Copyright © 2011-2022 走看看