zoukankan      html  css  js  c++  java
  • python--分布式爬虫

    # server
    import socket, select, re, queue, redis
    from multiprocessing import Pool, cpu_count
    from pymongo import MongoClient
    
    # Address of the interface the crawl server binds its listening socket to.
    host = '192.168.1.107'
    # Size argument (in bytes) passed to every recv() call made by the server.
    Recv_buffer = 4096 * 1000
    # Sockets handed to select(): the listening socket plus each connected client.
    ConnectionList = list()
    # client socket -> number of URLs that client can still accept; seeded with
    # the core count the client reports when it connects.
    Client_Status = dict()
    # client socket -> sequence number used when printing log lines.
    Client_Num = dict()
    # Declared ``global`` by the server loop; nothing visible in this file
    # reads or reassigns it afterwards.
    Num = 0
    # Redis handle; every discovered URL is stored as a key so it is queued once.
    redis1 = redis.Redis(db=0, port=6379, host='localhost')
    
    
    class Distributed_Web_Crawler:
        """Coordinator of the distributed crawler.

        Accepts crawler clients, hands each of them URLs from an in-memory
        queue according to the number of free slots the client has, stores the
        pages the clients send back in MongoDB and queues every new link found
        in those pages (de-duplicated through Redis).

        Wire protocol, as used by the code in this file:
          * right after connecting, a client sends its core count as text;
          * the server sends URLs separated by spaces, each batch ending
            with a space;
          * the client answers every URL with the page text followed by
            ``PAGE_DELIMITER``.

        Constructing an instance binds the socket and then enters the serving
        loop, so the constructor does not return.
        """

        # Marker that terminates every page sent by a client.
        PAGE_DELIMITER = b'Page_ContentPPPPPP///////'
        # Prefix prepended to every captured link path.
        BASE_URL = 'https://movie.douban.com/'
        # Captures the path of every quoted link that starts with BASE_URL.
        URL_PATTERN = re.compile(r'https://movie\.douban\.com/(.*?)"')

        def __init__(self, port):
            """Bind to ``(host, port)``, seed the queue and serve forever.

            Args:
                port: TCP port the server listens on.
            """
            self.url_num = 1
            self.queue = queue.Queue()
            # One client for the whole process instead of one per stored page.
            mongo = MongoClient()
            self.db = mongo.CrawSpider.content
            self._page_collection = mongo.Web.data
            # Per-client bookkeeping: unparsed bytes and the peer address.
            self._recv_buffers = {}
            self._peers = {}
            self._client_seq = 0
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.bind((host, port))
            self.server_socket.listen(10)
            ConnectionList.append(self.server_socket)
            print("服务器运行在端口:" + str(port))
            address = 'https://movie.douban.com/'
            self.queue.put(address)
            redis1.set(address, 0)
            self.main()

        def main(self):
            """Run the select() loop: dispatch URLs, accept clients, read pages.

            Work is done inline in this process on purpose: the methods are
            bound to an object holding sockets, which cannot be pickled and
            shipped to a process pool.
            """
            while True:
                # Only dispatch when there is work and at least one client.
                if not self.queue.empty() and len(ConnectionList) > 1:
                    self.task_manage()
                readable, _, _ = select.select(ConnectionList, [], [])
                for sock in readable:
                    if sock is self.server_socket:
                        self._accept_client()
                    else:
                        self._handle_client_data(sock)

        def _accept_client(self):
            """Accept one connection and register the core count it reports."""
            conn, addr = self.server_socket.accept()
            core_num = conn.recv(Recv_buffer).decode('utf8', errors='replace').strip()
            try:
                slots = int(core_num)
            except ValueError:
                # Not a crawler client speaking our protocol; refuse it.
                conn.close()
                return
            ConnectionList.append(conn)
            Client_Status[conn] = slots
            # Monotonic counter so numbers are not reused after a disconnect.
            self._client_seq += 1
            Client_Num[conn] = self._client_seq
            self._peers[conn] = addr
            self._recv_buffers[conn] = b''
            print('客户端 ' + addr[0] + ':' + str(addr[1]) + '已连接,核心数: ' + core_num
                  + '\n编号为' + str(Client_Num[conn]))

        def _handle_client_data(self, sock):
            """Read from ``sock`` and process every page that is now complete."""
            try:
                data = sock.recv(Recv_buffer)
            except OSError:
                # A reset connection is handled like an orderly shutdown.
                data = b''
            if not data:
                self._drop_client(sock)
                return
            # TCP is a byte stream: keep the unterminated tail for the next read
            # and only treat delimiter-terminated chunks as finished pages.
            buffered = self._recv_buffers.get(sock, b'') + data
            *pages, remainder = buffered.split(self.PAGE_DELIMITER)
            self._recv_buffers[sock] = remainder
            if not pages:
                return
            # Every finished page frees exactly one slot on that client.
            Client_Status[sock] = int(Client_Status[sock]) + len(pages)
            print('编号' + str(Client_Num[sock]) + '可用核心' + str(Client_Status[sock]))
            for raw_page in pages:
                if raw_page:
                    self.web_page_resolution(raw_page.decode('utf8', errors='replace'))

        def _drop_client(self, sock):
            """Forget a disconnected client and close its socket."""
            addr = self._peers.pop(sock, None)
            if addr is not None:
                print('客户端 ' + addr[0] + ':' + str(addr[1]) + '断开连接')
            sock.close()
            Client_Status.pop(sock, None)
            Client_Num.pop(sock, None)
            self._recv_buffers.pop(sock, None)
            if sock in ConnectionList:
                ConnectionList.remove(sock)

        def web_page_resolution(self, content):
            """Store one page and queue every link in it that is not yet known.

            Args:
                content: decoded text of a crawled page.
            """
            self._page_collection.insert_one({'page_content': content})
            for path in self.URL_PATTERN.findall(content):
                url = self.BASE_URL + path
                # URLs travel space-separated, so one containing whitespace
                # would be torn apart on the client side.
                if len(url.split()) != 1:
                    continue
                if redis1.get(url) is None:
                    redis1.set(url, self.url_num)
                    self.queue.put(url)
                    self.url_num += 1

        def task_manage(self):
            """Send queued URLs to every client that still has free slots."""
            # Iterate over a copy: a failed send removes the client from the list.
            for client in list(ConnectionList):
                if client is self.server_socket:
                    continue
                # Fresh batch per client so no URL is sent to two clients.
                batch = []
                while not self.queue.empty() and int(Client_Status[client]) > 0:
                    batch.append(self.queue.get())
                    Client_Status[client] = int(Client_Status[client]) - 1
                if not batch:
                    continue
                # Trailing space keeps consecutive batches from merging URLs.
                payload = (' '.join(batch) + ' ').encode('utf8')
                try:
                    client.sendall(payload)
                except OSError:
                    # Client is gone: give the URLs back and forget the client.
                    for url in batch:
                        self.queue.put(url)
                    self._drop_client(client)
    
    
    if __name__ == '__main__':
        # Server entry point: constructing the crawler binds the socket and
        # runs its serving loop.
        port = 8888
        Distributed_Web_Crawler(port)
    # Client
    import socket, sys, select
    from multiprocessing import cpu_count
    from requests import get
    from multiprocessing import Pool
    
    # Worker pool sized to leave one core free. Clamped to one worker because
    # Pool() rejects a process count below 1, which is what cpu_count() - 1
    # yields on a single-core machine.
    p = Pool(max(1, cpu_count() - 1))
    # Not read by the client code in this file, which connects to the address
    # written in its __main__ section instead.
    host = '192.168.0.103'
    # Not read or written by the client code in this file.
    Page_contents = []
    
    
    def crawler_page(url, timeout=10):
        """Download ``url`` and send the page text to the server.

        The page text is followed by the delimiter the server splits on. When
        the download fails, only the delimiter is sent so the server still
        receives one reply for the URL it handed out.

        Args:
            url: address of the page to fetch.
            timeout: seconds to wait for the web server before giving up.
        """
        # Local import: the file only imports ``get`` from requests.
        from requests import RequestException

        delimiter = 'Page_ContentPPPPPP///////'
        print("正在爬取网页" + url)
        try:
            # errors='replace' keeps a page that is not valid UTF-8 from
            # aborting the client.
            page = get(url, timeout=timeout).content.decode('utf8', errors='replace')
        except RequestException as exc:
            print(url + "爬取失败: " + str(exc))
            page = ''
        else:
            print(url + "爬取完成,正在向服务器发送数据")
        # sendall: a plain send() may transmit only part of a large page.
        s.sendall((page + delimiter).encode('utf8'))
    
    
    def listing():
        """Receive URL batches from the server and crawl each URL.

        URLs arrive separated by whitespace. A URL that is cut off at the end
        of a read is kept and completed by the next read. The function exits
        the process when the server sends ``quit`` or closes the connection.

        Pages are crawled inline: ``crawler_page`` uses the module-level
        socket ``s``, which is created after the pool ``p``.
        """
        pending = b''
        while True:
            # Only the server socket is watched; a readable stdin that is never
            # consumed would make select() return immediately forever.
            readable, _, _ = select.select([s], [], [])
            if s not in readable:
                continue
            try:
                chunk = s.recv(4096)
            except OSError:
                # A reset connection is handled like a closed one.
                chunk = b''
            if not chunk:
                print('服务器连接失败,正在退出')
                sys.exit()
            pending += chunk
            if pending.strip() == b'quit':
                print('接收到服务器关闭指令,客户端正在退出')
                sys.exit()
            tokens = pending.split()
            if pending[-1:].isspace():
                pending = b''
            else:
                # The last token has no terminating whitespace yet, so it may
                # be the first half of a URL.
                pending = tokens.pop()
            # Iterate without mutating the list so that no URL is skipped.
            for token in tokens:
                crawler_page(token.decode('utf8', errors='replace'))
    
    
    if __name__ == "__main__":
        # Client entry point: connect to the server, report the local core
        # count, then wait for URLs to crawl.
        port = 8888
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(3)
        try:
            s.connect(('192.168.1.107', port))
        except OSError:
            # Covers refused connections, unreachable hosts and the timeout;
            # unrelated errors such as KeyboardInterrupt are not swallowed.
            print("无法连接至服务器,请检查地址后重试")
            sys.exit()
        print("已连接至服务器,开始发送机器信息\n核心数:" + str(cpu_count()))
        s.send(str(cpu_count()).encode('utf8'))
        listing()
  • 相关阅读:
    PHP基本的语法以及和Java的差别
    Linux 性能測试工具
    【Oracle 集群】Linux下Oracle RAC集群搭建之Oracle DataBase安装(八)
    【Oracle 集群】Oracle 11G RAC教程之集群安装(七)
    【Oracle 集群】11G RAC 知识图文详细教程之RAC在LINUX上使用NFS安装前准备(六)
    【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之RAC 特殊问题和实战经验(五)
    【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之缓存融合技术和主要后台进程(四)
    【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之RAC 工作原理和相关组件(三)
    【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之ORACLE集群概念和原理(二)
    【Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之集群概念介绍(一)
  • 原文地址:https://www.cnblogs.com/INnoVationv2/p/6072211.html
Copyright © 2011-2022 走看看