zoukankan      html  css  js  c++  java
  • 异步非阻塞

    一、必备知识

    1.socket客户端:

    阻塞式客户端:

    import socket
    #实例化socket对象
    sock = socket.socket()
    #向服务器发起连接
    sock.connect(("43.226.160.17",80)) #连接会阻塞
    
    # 仿HTTP发送请求
    data = b"GET / HTTP/1.0
    host: dig.chouti.com
    
    "
    sock.sendall(data)
    #接收服务器响应信息
    response = sock.recv(8096) #等待数据回收时也会阻塞
    print(response)
    #关闭socket连接
    sock.close()

    伪非阻塞式:

    import socket
    #实例化socket对象
    sock = socket.socket()
    
    #设置非阻塞,不等待连接成功直接执行后面的代码,由于连接的操作不可能立即完成所以
    #会报一个BlockingIOError:无法立即完成一个非阻止性套接字操 的错误,因此需要处理异常
    sock.setblocking(False)
    
    #向服务器发起连接
    try:
        sock.connect(("43.226.160.17",80)) #连接会阻塞
    except BlockingIOError as e:
        print(e)
    import time
    #虽然会报错,但连接的请求已经发出去,所以连接请求依然生效,
    #我们可以使用这段时间进行其它的工作,达到资源的最大利用,这里用sleep代指
    time.sleep(5)
    
    # 仿HTTP发送请求
    data = b"GET / HTTP/1.0
    host: dig.chouti.com
    
    "
    
    #由于没有建立连接,所以这里还发不出消息
    try:
        sock.sendall(data)
        #接收服务器响应信息
        response = sock.recv(8096) #等待数据回收时也会阻塞
    except Exception as e:
        print(e)
    
    print(response)
    #关闭socket连接
    sock.close()
    

      

    2.IO多路复用加socket实现非阻塞

    伪代码:

    import socket
    import select
    socket_list = []
    for url in ["www.baidu.com","www.bing.com"]:
        client = socket.socket()
        client.setblocking(False)
        try:
            client.connect((url,80))
        except BlockingIOError as e:
            print(e)
    
        socket_list.append(client)
    
    #事件循环
    while True:
        r,w,e = select.select(socket_list,socket_list,[],0.05)
        #w代指每一个连接成功的client
        for obj in w:
            obj.send("GET / HTTP/1.0
    host: baidu.com
    
    ") #这里host写死,下面的代码会解决
        #r代指client有改变,可以收数据了
            for obj in r:
                response = obj.recv(1024)
                print(response)
    

      

    上边代码中,当对多个网站进行连接或爬取时,每一个单独的Url都不会阻塞,再通过select监测数据的变化,可以及时接收数据,又不会挡住后边Url的爬取工作,实现了简单的非阻塞的目标。

    但这只是一段伪代码,接下来根据这个思路实现一个真正的异步非阻塞模块。

    二、简单的异步非阻塞模块。

    import socket
    import select
    
    class Request(object):
        """
        封装socket对象,使每次循环时创建的socket对象能对应它的req_info字典,
        方便其利用字典拿到对应的host信息等
        """
        def __init__(self, sock, info):
            self.sock = sock
            self.info = info
    
        def fileno(self):
            return self.sock.fileno()
    
    
    class AsyncRequest(object):
        def __init__(self):
            self.sock_list = []
            self.conns = []
    
        def add_request(self, req_info):
            """
            创建请求
             req_info: {'host': 'www.baidu.com', 'port': 80, 'path': '/'},
            :return:
            """
            sock = socket.socket()
            sock.setblocking(False)
            try:
                sock.connect((req_info['host'], req_info['port']))
            except BlockingIOError as e:
                pass
    
            obj = Request(sock, req_info)
            self.sock_list.append(obj)
            self.conns.append(obj)
    
        def run(self):
            """
            开始事件循环,检测:连接成功?数据是否返回?
            :return:
            """
            while True:
                # select.select([socket对象,])
                # 可是任何对象,对象一定要有fileno方法,实际上执行的是对象.fileno()
                # select.select([request对象,])
                r, w, e = select.select(self.sock_list, self.conns, [], 0.05)
                # w,是否连接成功
                for obj in w:
                    # 检查obj:request对象
                    # socket, {'host': 'www.baidu.com', 'port': 80, 'path': '/'},
                    data = "GET %s http/1.1
    host:%s
    
    " % (obj.info['path'], obj.info['host'])
                    obj.sock.send(data.encode('utf-8'))
                    #连接成功后从列表中删除此obj对象,避免重复连接
                    self.conns.remove(obj)
                # 数据返回,接收到数据
                for obj in r:
                    response = obj.sock.recv(8096)
                    #函数名加括号运行对应的回调函数
                    obj.info['callback'](response)
                    #相应的为避免重复接收移除已经接收成功的对象
                    self.sock_list.remove(obj)
    
                # 所有请求已经返回
                if not self.sock_list:
                    break
    
    
    if __name__ == '__main__':
        #指定回调函数,可以在屏幕输出,也可以写入文件、数据库等
        def callback_fun1(response):
            print(response)
    
        def callback_fun2(response):
            pass
            # with open ......
    
        url_list = [
            {'host': 'www.baidu.com', 'port': 80, 'path': '/', 'callback': callback_fun1},
            {'host': 'www.cnblogs.com', 'port': 80, 'path': '/index.html', 'callback': callback_fun2},
            {'host': 'www.bing.com', 'port': 80, 'path': '/', 'callback': callback_fun1},
        ]
    
        obj = AsyncRequest()
        for item in url_list:
            obj.add_request(item)
    
        obj.run()
    

      

  • 相关阅读:
    Struts2SpringHibernate整合示例,一个HelloWorld版的在线书店(项目源码+详尽注释+单元测试)
    Java实现蓝桥杯勇者斗恶龙
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 224 基本计算器
    Java实现 LeetCode 224 基本计算器
  • 原文地址:https://www.cnblogs.com/mitsui/p/7459430.html
Copyright © 2011-2022 走看看