zoukankan      html  css  js  c++  java
  • python语言磁力搜索引擎源码公开,基于DHT协议

           之前我在写百度网盘爬虫、百度图片爬虫的时候答应过网友,要抽时间把ok搜搜的源码公开,如今是时候兑现诺言了。下面就是爬虫的所有代码,完全、彻底地公开,不管你会不会写程序都可以使用,不过请先装个linux系统,并具备公网条件,然后运行:

    python startCrawler.py

    有必要提醒你,数据库字段代码中都有,请你自己建张表格,这个太简单了,就不多说了。同时我也提供一下下载地址,源码都在:下载地址1 下载地址2

    #!/usr/bin/env python
    # encoding: utf-8
    """
    author:haoning
    create time:2015.8.1
    """
    import hashlib
    import os
    import time
    import datetime
    import traceback
    import sys
    import random
    import json
    import socket
    import threading
    from hashlib import sha1 #进行hash加密
    from random import randint
    from struct import unpack
    from socket import inet_ntoa
    from threading import Timer, Thread
    from time import sleep
    from collections import deque
    from Queue import Queue
     
    import MySQLdb as mdb  #数据库连接器
     
    import metautils
    import downloadTorrent
    from bencode import bencode, bdecode
    import pygeoip
     
    # MySQL connection settings used by Master.__init__
    DB_HOST = '127.0.0.1'
    DB_USER = 'root'
    DB_PASS = 'root'
     
    # (host, port) pairs that join_DHT() sends its initial find_node queries to
    BOOTSTRAP_NODES = (
        ("67.215.246.10", 6881),
        ("82.221.103.244", 6881),
        ("23.21.224.150", 6881)
    )
    RATE = 1 # throttle: on_message handles only every RATE-th incoming query
    TID_LENGTH = 2 # length of the random transaction id put in outgoing queries
    RE_JOIN_DHT_INTERVAL = 3 # delay (Timer seconds) between re_join_DHT checks
    TOKEN_LENGTH = 2 # tokens are the first TOKEN_LENGTH characters of the info_hash
    INFO_HASH_LEN = 500000 # cap on queued info_hashes (500k entries is small) so memory use stays bounded
    CACHE_LEN = 100 # database-update cache size; the cache is flushed once it exceeds CACHE_LEN/2
    WAIT_DOWNLOAD = 80 # download threads are started once more than this many hashes are waiting
     
     
    # GeoIP database used by is_ip_allowed(); loaded from the working directory
    geoip = pygeoip.GeoIP('GeoIP.dat')
     
    def is_ip_allowed(ip):
        """Return True when *ip* resolves to one of the accepted country codes.

        The country code is looked up in the module-level ``geoip`` database;
        only CN, TW, JP, HK and KR are accepted.
        """
        accepted_countries = ('CN','TW','JP','HK', 'KR')
        return geoip.country_code_by_addr(ip) in accepted_countries
     
    def entropy(length):
        """Return a string of *length* random characters with codes 0..255."""
        chars = []
        for _ in xrange(length):
            chars.append(chr(randint(0, 255)))
        return "".join(chars)
     
    def random_id():
        """Return a fresh node id: the SHA-1 digest of 20 random characters."""
        return sha1(entropy(20)).digest()
     
     
    def decode_nodes(nodes):
        """Split a compact node list into ``(nid, ip, port)`` tuples.

        Each entry is 26 bytes: a 20-byte node id, a 4-byte IPv4 address and
        a 2-byte big-endian port. When the input length is not a multiple of
        26 an empty list is returned.
        """
        entry_size = 26
        if len(nodes) % entry_size:
            return []

        decoded = []
        for offset in range(0, len(nodes), entry_size):
            entry = nodes[offset:offset + entry_size]
            ip = inet_ntoa(entry[20:24])
            port, = unpack("!H", entry[24:26])
            decoded.append((entry[:20], ip, port))
        return decoded
     
     
    def timer(t, f):
        """Schedule *f* to be called once, *t* seconds from now, on a Timer thread."""
        pending = Timer(t, f)
        pending.start()
     
     
    def get_neighbor(target, nid, end=10):
        """Return an id made of the first *end* items of *target* followed by
        the remainder of *nid* from position *end* onward."""
        shared_prefix = target[:end]
        own_suffix = nid[end:]
        return shared_prefix + own_suffix
     
     
    class KNode(object):
        """Plain record for a discovered DHT node: node id, IP address and port."""

        def __init__(self, nid, ip, port):
            self.nid, self.ip, self.port = nid, ip, port
     
     
    class DHTClient(Thread):
        """Outgoing half of the DHT node: sends find_node queries and collects
        the nodes returned in the responses.

        ``self.ufd`` (the UDP socket) and ``self.bind_ip`` are used here but
        not assigned by this class; DHTServer.__init__ sets them.
        """

        def __init__(self, max_node_qsize):
            """Create a daemon thread with a random node id and a bounded node queue."""
            Thread.__init__(self)
            self.setDaemon(True)
            self.max_node_qsize = max_node_qsize
            self.nid = random_id()
            # Bounded deque: once full, appending drops the oldest entry.
            self.nodes = deque(maxlen=max_node_qsize)

        def send_krpc(self, msg, address):
            """Bencode *msg* and send it to *address*; any failure is ignored."""
            try:
                payload = bencode(msg)
                self.ufd.sendto(payload, address)
            except Exception:
                pass

        def send_find_node(self, address, nid=None):
            """Send a find_node query with a random target to *address*.

            When *nid* is given, the sender id is derived from it with
            get_neighbor(); otherwise our own node id is used.
            """
            if nid:
                sender_id = get_neighbor(nid, self.nid)
            else:
                sender_id = self.nid
            query = {
                "t": entropy(TID_LENGTH),
                "y": "q",
                "q": "find_node",
                "a": {
                    "id": sender_id,
                    "target": random_id()
                }
            }
            self.send_krpc(query, address)

        def join_DHT(self):
            """Query every bootstrap node with find_node."""
            for bootstrap_address in BOOTSTRAP_NODES:
                self.send_find_node(bootstrap_address)

        def re_join_DHT(self):
            """Re-bootstrap when no nodes are queued, then reschedule this check."""
            if not self.nodes:
                self.join_DHT()
            timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

        def auto_send_find_node(self):
            """Loop forever: pop one queued node, query it, then pause.

            The pause is ``1.0 / max_node_qsize`` seconds. A KeyboardInterrupt
            during the pause terminates the whole process.
            """
            pause = 1.0 / self.max_node_qsize
            while True:
                try:
                    peer = self.nodes.popleft()
                    self.send_find_node((peer.ip, peer.port), peer.nid)
                except IndexError:
                    # Nothing queued right now.
                    pass
                try:
                    sleep(pause)
                except KeyboardInterrupt:
                    os._exit(0)

        def process_find_node_response(self, msg, address):
            """Queue the nodes carried by a find_node response.

            Entries whose id is not 20 long, or whose IP equals our bind IP,
            are skipped.
            """
            for nid, ip, port in decode_nodes(msg["r"]["nodes"]):
                if len(nid) != 20 or ip == self.bind_ip:
                    continue
                self.nodes.append(KNode(nid, ip, port))
     
     
    class DHTServer(DHTClient):
        """Incoming half of the DHT node: receives KRPC messages on a UDP
        socket and harvests info_hashes from get_peers / announce_peer queries.

        Harvested hashes are handed to *master* through ``master.log`` and
        ``master.log_announce``.
        """

        def __init__(self, master, bind_ip, bind_port, max_node_qsize):
            """Bind the UDP socket and schedule the first DHT (re)join check.

            master         -- object exposing log() and log_announce()
            bind_ip        -- local address to bind the UDP socket to
            bind_port      -- local UDP port
            max_node_qsize -- capacity of the node queue (see DHTClient)
            """
            DHTClient.__init__(self, max_node_qsize)

            self.master = master
            self.bind_ip = bind_ip
            self.bind_port = bind_port
            self.speed = 0  # running count of incoming queries, used for throttling

            # Query name -> handler; any other query gets an error reply.
            self.process_request_actions = {
                "get_peers": self.on_get_peers_request,
                "announce_peer": self.on_announce_peer_request,
            }

            self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            self.ufd.bind((self.bind_ip, self.bind_port))

            # NOTE: run() also calls re_join_DHT(), so once the thread is
            # started two rescheduling chains are active.
            timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

        def run(self):
            """Thread body: join the DHT, then receive and dispatch datagrams forever.

            Any exception raised while receiving, decoding or handling a
            message is swallowed so one bad packet cannot stop the loop.
            """
            self.re_join_DHT()
            while True:
                try:
                    (data, address) = self.ufd.recvfrom(65536)
                    msg = bdecode(data)
                    self.on_message(msg, address)
                except Exception:
                    pass

        def on_message(self, msg, address):
            """Dispatch one decoded KRPC message.

            Responses carrying "nodes" feed the node queue. Queries are
            throttled: only every RATE-th query is handled, and RATE is
            re-rolled every 10000 queries (1 with probability 2/3, else 10)
            to limit CPU use. Unsupported queries get an error reply.
            """
            global RATE  # module-level throttle, shared across calls
            try:
                if msg["y"] == "r":
                    if "nodes" in msg["r"]:
                        self.process_find_node_response(msg, address)
                elif msg["y"] == "q":
                    try:
                        self.speed += 1
                        if self.speed % 10000 == 0:
                            RATE = 10 if random.randint(1, 3) == 3 else 1
                            if self.speed > 100000:
                                self.speed = 0
                        if self.speed % RATE == 0:
                            # Handling the query is what yields an info_hash.
                            self.process_request_actions[msg["q"]](msg, address)
                    except KeyError:
                        self.play_dead(msg, address)
            except KeyError:
                pass

        def on_get_peers_request(self, msg, address):
            """Record the requested info_hash and reply with an empty node list.

            The token handed back is the first TOKEN_LENGTH characters of the
            info_hash. Messages missing a required field are ignored.
            """
            try:
                infohash = msg["a"]["info_hash"]
                tid = msg["t"]
                msg["a"]["id"]  # required field: a missing id drops the message
                token = infohash[:TOKEN_LENGTH]
                reply = {
                    "t": tid,
                    "y": "r",
                    "r": {
                        "id": get_neighbor(infohash, self.nid),
                        "nodes": "",
                        "token": token
                    }
                }
                self.master.log(infohash, address)
                self.send_krpc(reply, address)
            except KeyError:
                pass

        def on_announce_peer_request(self, msg, address):
            """Record an announced info_hash with the peer's address, then acknowledge.

            The announce is accepted only when its token matches the one
            issued by on_get_peers_request. When "implied_port" is present and
            non-zero, the UDP source port is used instead of the "port"
            argument. The acknowledgement is sent whether or not the announce
            was accepted.
            """
            try:
                infohash = msg["a"]["info_hash"]
                token = msg["a"]["token"]
                msg["a"]["id"]  # required fields: a missing one aborts the announce
                msg["t"]

                if infohash[:TOKEN_LENGTH] == token:
                    # Fixed: the key used to be "implied_port " (trailing
                    # space), so this branch could never be taken.
                    if "implied_port" in msg["a"] and msg["a"]["implied_port"] != 0:
                        port = address[1]
                    else:
                        port = msg["a"]["port"]
                    self.master.log_announce(infohash, (address[0], port))
            except Exception:
                print 'error'
            finally:
                self.ok(msg, address)

        def play_dead(self, msg, address):
            """Reply to an unsupported query with a 202 "Server Error" message."""
            try:
                tid = msg["t"]
                reply = {
                    "t": tid,
                    "y": "e",
                    "e": [202, "Server Error"]
                }
                self.send_krpc(reply, address)
            except KeyError:
                pass

        def ok(self, msg, address):
            """Send a bare acknowledgement carrying an id derived from the sender's id."""
            try:
                tid = msg["t"]
                nid = msg["a"]["id"]
                reply = {
                    "t": tid,
                    "y": "r",
                    "r": {
                        "id": get_neighbor(nid, self.nid)
                    }
                }
                self.send_krpc(reply, address)
            except KeyError:
                pass
     
     
    class Master(Thread): #解析info_hash
     
        def __init__(self):
            Thread.__init__(self)
            self.setDaemon(True)
            self.queue = Queue()
            self.cache = Queue()
            self.count=0
            self.mutex = threading.RLock() #可重入锁,使单线程可以再次获得已经获得的?
            self.waitDownload = Queue()
            self.metadata_queue = Queue()
            self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8')
            self.dbconn.autocommit(False)
            self.dbcurr = self.dbconn.cursor()
            self.dbcurr.execute('SET NAMES utf8')
            self.visited = set()
                     
        def lock(self): #加锁
            self.mutex.acquire()
     
        def unlock(self): #解锁
            self.mutex.release()
             
        def work(self,item):
     
            print "start thread",item
            while True:
                self.prepare_download_metadata()
                self.lock()
                self.download_metadata()
                self.unlock()
     
                self.lock()
                self.got_torrent()
                self.unlock()
                         
        def start_work(self,max):
         
            for item in xrange(max):
                t = threading.Thread(target=self.work, args=(item,))
                t.setDaemon(True)
                t.start()
             
        #入队的种子效率更高
        def log_announce(self, binhash, address=None):
            if self.queue.qsize() < INFO_HASH_LEN : #大于INFO_HASH_LEN就不要入队,否则后面来不及处理
                if is_ip_allowed(address[0]):
                    self.queue.put([address, binhash]) #获得info_hash
             
        def log(self, infohash, address=None):
            if self.queue.qsize() < INFO_HASH_LEN: #大于INFO_HASH_LEN/2就不要入队,否则后面来不及处理
                if is_ip_allowed(address[0]):
                    self.queue.put([address, infohash])
         
        def prepare_download_metadata(self):
             
            if self.queue.qsize() == 0:
                sleep(2)
            #从queue中获得info_hash用来下载
            address, binhash= self.queue.get() 
            if binhash in self.visited:
                return
            if len(self.visited) > 100000: #大于100000重置队列,认为已经访问过了
                self.visited = set()
            self.visited.add(binhash)
            #跟新已经访问过的info_hash
            info_hash = binhash.encode('hex')
            utcnow = datetime.datetime.utcnow()
             
            self.cache.put((address,binhash,utcnow)) #装入缓存队列
         
        def download_metadata(self):
         
            if self.cache.qsize() > CACHE_LEN/2: #出队更新下载
                while self.cache.qsize() > 0: #排空队列
                    address,binhash,utcnow = self.cache.get()
                    info_hash = binhash.encode('hex')
                    self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
                    y = self.dbcurr.fetchone()
                    if y:
                    # 更新最近发现时间,请求数
                        self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
                    else: 
                        self.waitDownload.put((address, binhash))
                self.dbconn.commit()
                if self.waitDownload.qsize() > WAIT_DOWNLOAD:
                    while self.waitDownload.qsize() > 0:
                        address,binhash = self.waitDownload.get()
                        t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
                        t.setDaemon(True)
                        t.start()
     
        def decode(self, s):
            if type(s) is list:
                s = ';'.join(s)
            u = s
            for x in (self.encoding, 'utf8', 'gbk', 'big5'):
                try:
                    u = s.decode(x)
                    return u
                except:
                    pass
            return s.decode(self.encoding, 'ignore')
     
        def decode_utf8(self, d, i):
            if i+'.utf-8' in d:
                return d[i+'.utf-8'].decode('utf8')
            return self.decode(d[i])
         
        def parse_metadata(self, data): #解析种子
            info = {}
            self.encoding = 'utf8'
            try:
                torrent = bdecode(data) #编码后解析
                if not torrent.get('name'):
                    return None
            except:
                return None
            detail = torrent
            info['name'] = self.decode_utf8(detail, 'name')
            if 'files' in detail:
                info['files'] = []
                for x in detail['files']:
                    if 'path.utf-8' in x:
                        v = {'path': self.decode('/'.join(x['path.utf-8'])), 'length': x['length']}
                    else:
                        v = {'path': self.decode('/'.join(x['path'])), 'length': x['length']}
                    if 'filehash' in x:
                        v['filehash'] = x['filehash'].encode('hex')
                    info['files'].append(v)
                info['length'] = sum([x['length'] for x in info['files']])
            else:
                info['length'] = detail['length']
            info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest()
            return info
     
        def got_torrent(self):
            if self.metadata_queue.qsize() == 0:
                return
            binhash, address, data,start_time = self.metadata_queue.get()
            if not data:
                return
            try:
                info = self.parse_metadata(data)
                if not info:
                    return
            except:
                traceback.print_exc()
                return
     
            temp = time.time()
            x = time.localtime(float(temp))
            utcnow = time.strftime("%Y-%m-%d %H:%M:%S",x) # get time now
             
            info_hash = binhash.encode('hex') #磁力
            info['info_hash'] = info_hash
            # need to build tags
            info['tagged'] = False
            info['classified'] = False
            info['requests'] = 1
            info['last_seen'] = utcnow
            info['create_time'] = utcnow
            info['source_ip'] = address[0]
             
            if info.get('files'):
                files = [z for z in info['files'] if not z['path'].startswith('_')]
                if not files:
                    files = info['files']
            else:
                files = [{'path': info['name'], 'length': info['length']}]
            files.sort(key=lambda z:z['length'], reverse=True)
            bigfname = files[0]['path']
            info['extension'] = metautils.get_extension(bigfname).lower()
            info['category'] = metautils.get_category(info['extension'])
     
            try:
                try:
                    print '
    ', 'Saved', info['info_hash'], info['name'], (time.time()-start_time), 's', address[0]
                except:
                    print '
    ', 'Saved', info['info_hash']
                ret = self.dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' + 
                    'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
                    (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'],
                    info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests']))
                if self.count %50 ==0:
                    self.dbconn.commit()
                    if self.count>100000:
                        self.count=0
            except:
                print self.name, 'save error', self.name, info
                traceback.print_exc()
                return
     
    if __name__ == "__main__":

        # Start the client side: the Master and its pool of worker threads.
        worker_count = 150
        crawler = Master()
        crawler.start_work(worker_count)

        # Start the server side: the DHT node thread, then keep sending
        # find_node queries from the main thread.
        node = DHTServer(crawler, "0.0.0.0", 6881, max_node_qsize=200)
        node.start()
        node.auto_send_find_node()

    注意,上面的代码中有一处需要下载种子(即 import downloadTorrent),所以下面的这段代码(保存为 downloadTorrent.py)十分重要:

    #!/usr/bin/env python
    # encoding: utf-8
    """
    author:haoning
    create time:2015.8.1
    """
    from hashlib import sha1
    import math
    from socket import inet_ntoa
    import socket
    from struct import pack, unpack
    from threading import Timer, Thread
    from time import sleep, time
     
    from bencode import bencode, bdecode 
    from startCrawler import entropy
     
     
    # Protocol name sent, length-prefixed, at the start of the peer handshake
    BT_PROTOCOL = "BitTorrent protocol"
    # Message id byte that prefixes every extension message built in this module
    BT_MSG_ID = 20
    # Sub-id byte used for the extension handshake message
    EXT_HANDSHAKE_ID = 0
     
    def random_id():
        """Return a random 20-byte id: the SHA-1 digest of 20 random characters."""
        return sha1(entropy(20)).digest()
     
    def send_packet(the_socket, msg):
        """Write *msg* to *the_socket* with one ``send`` call.

        The byte count returned by ``send`` is discarded.
        """
        the_socket.send(msg)
        return None
     
    def send_message(the_socket, msg):
        """Send *msg* preceded by its length as a 4-byte big-endian unsigned integer."""
        framed = pack(">I", len(msg)) + msg
        send_packet(the_socket, framed)
     
    def send_handshake(the_socket, infohash):
        """Send the BitTorrent handshake for *infohash* over *the_socket*.

        The packet is: length-prefixed protocol name, 8 reserved bytes, the
        info_hash and a random peer id.
        """
        bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL
        # Fixed: the escapes had lost their backslashes ("x00x00..."), which
        # produced 24 literal characters instead of 8 reserved bytes.
        # The 0x10 in the sixth byte advertises extension-protocol support.
        ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
        peer_id = random_id()
        packet = bt_header + ext_bytes + infohash + peer_id

        send_packet(the_socket, packet)
     
    def check_handshake(packet, self_infohash):
        """Return True when *packet* is a handshake reply for *self_infohash*.

        Checks, in order: the length prefix, the protocol name, and the
        20-byte info_hash that follows the 8 reserved bytes.
        """
        try:
            name_len = ord(packet[:1])
        except TypeError:
            # Empty (or non-sliceable) packet: no length prefix to read.
            return False
        if name_len != len(BT_PROTOCOL):
            return False

        body = packet[1:]
        if body[:name_len] != BT_PROTOCOL:
            return False

        hash_start = name_len + 8  # skip the reserved bytes
        return body[hash_start:hash_start + 20] == self_infohash
     
    def send_ext_handshake(the_socket):
        """Send the extension handshake, mapping ``ut_metadata`` to id 1."""
        header = chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID)
        payload = bencode({"m":{"ut_metadata": 1}})
        send_message(the_socket, header + payload)
     
    def request_metadata(the_socket, ut_metadata, piece):
        """Request metadata piece number *piece* (bep_0009).

        *ut_metadata* is the extension message id to address the request to.
        """
        header = chr(BT_MSG_ID) + chr(ut_metadata)
        payload = bencode({"msg_type": 0, "piece": piece})
        send_message(the_socket, header + payload)
     
    def get_ut_metadata(data):
        """Extract the integer that follows the ``ut_metadata`` key in *data*.

        *data* is the raw extension-handshake reply. The value is read up to
        its closing ``e``, so ids of more than one digit are handled (only the
        first digit used to be read).

        Raises ValueError when the key or a well-formed integer is missing.
        """
        marker = "_metadata"
        # +1 skips the "i" that opens the bencoded integer.
        start = data.index(marker) + len(marker) + 1
        end = data.index("e", start)
        return int(data[start:end])
     
    def get_metadata_size(data):
        """Extract the integer that follows the ``metadata_size`` key in *data*.

        Raises ValueError when the key or the closing ``e`` is missing.
        """
        key = "metadata_size"
        # +1 skips the "i" that opens the bencoded integer.
        value_start = data.index(key) + len(key) + 1
        value_end = data.index("e", value_start)
        return int(data[value_start:value_end])
     
    def recvall(the_socket, timeout=5):
        """Collect data from *the_socket* until it has been quiet long enough.

        The socket is switched to non-blocking mode and polled every 0.05 s.
        Reading stops once *timeout* seconds pass without data after at least
        one chunk arrived, or after ``2 * timeout`` seconds when nothing
        arrived at all. Returns the chunks joined into one string.
        """
        the_socket.setblocking(0)
        chunks = []
        last_activity = time()

        while True:
            sleep(0.05)
            if chunks and time() - last_activity > timeout:
                break
            if time() - last_activity > timeout * 2:
                break
            try:
                chunk = the_socket.recv(1024)
            except Exception:
                # Nothing available yet (or a socket error): keep polling.
                continue
            if chunk:
                chunks.append(chunk)
                last_activity = time()
        return "".join(chunks)
     
    def download_metadata(address, infohash, metadata_queue, timeout=5):
        """Fetch the metadata for *infohash* from the peer at *address*.

        Performs the handshake and the extension handshake, then requests
        every 16 KiB metadata piece. On success the tuple
        ``(infohash, address, metadata, start_time)`` is put on
        *metadata_queue*. On any failure nothing is queued.

        address        -- (ip, port) of the peer
        infohash       -- 20-byte binary info_hash
        metadata_queue -- queue that receives the result
        timeout        -- socket timeout in seconds
        """
        metadata = None
        start_time = time()
        the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            the_socket.settimeout(timeout)
            the_socket.connect(address)

            # handshake
            send_handshake(the_socket, infohash)
            packet = the_socket.recv(4096)

            # handshake error
            if not check_handshake(packet, infohash):
                return

            # ext handshake
            send_ext_handshake(the_socket)
            packet = the_socket.recv(4096)

            # get ut_metadata and metadata_size
            ut_metadata, metadata_size = get_ut_metadata(packet), get_metadata_size(packet)

            # Collect pieces in a local list and publish only a complete
            # result, so a failure mid-transfer no longer queues a partial list.
            pieces = []
            piece_count = int(math.ceil(metadata_size/(16.0*1024)))
            for piece in range(piece_count):
                request_metadata(the_socket, ut_metadata, piece)
                packet = recvall(the_socket, timeout)
                # The payload follows the bencoded piece header, which ends in "ee".
                pieces.append(packet[packet.index("ee")+2:])
            metadata = "".join(pieces)

        except Exception:
            # Best effort: unreachable or misbehaving peers are expected.
            pass
        finally:
            the_socket.close()  # always close the socket
            if metadata is not None:
                metadata_queue.put((infohash, address, metadata,start_time))

    其实下载种子还有一种方式,就是借助libtorrent,但这个太耗费cpu了,所以我一般不用它,代码如下:

    #coding: utf8
    import threading
    import traceback
    import random
    import time
    import os
    import socket
     
    import libtorrent as lt
     
    # Use a 200 KiB stack for threads created from here on
    threading.stack_size(200*1024)
    # Default timeout, in seconds, for sockets created from here on
    socket.setdefaulttimeout(30)
     
    def fetch_torrent(session, ih, timeout):
        """Fetch the metadata for hex info_hash *ih* through *session*.

        Adds a magnet link to the libtorrent session and polls once per second,
        up to *timeout* times, until the metadata is available. Anything
        created under /tmp/downloads/ for the torrent is removed afterwards.

        Returns the metadata, or None when the magnet could not be added or
        the metadata did not arrive in time.
        """
        import shutil  # local import: only this cleanup needs it

        name = ih.upper()
        url = 'magnet:?xt=urn:btih:%s' % (name,)
        params = {
            'save_path': '/tmp/downloads/',
            'storage_mode': lt.storage_mode_t(2),
            'paused': False,
            'auto_managed': False,
            'duplicate_is_error': True}
        try:
            handle = lt.add_magnet_uri(session, url, params)
        except Exception:
            return None
        handle.set_sequential_download(1)
        meta = None
        down_path = None
        for _ in xrange(0, timeout):
            if handle.has_metadata():
                info = handle.get_torrent_info()
                down_path = '/tmp/downloads/%s' % info.name()
                meta = info.metadata()
                break
            time.sleep(1)
        if down_path and os.path.exists(down_path):
            # The torrent name comes from the peer, so it must never reach a
            # shell (this used to be os.system('rm -rf "%s"')).
            if os.path.isdir(down_path) and not os.path.islink(down_path):
                shutil.rmtree(down_path, ignore_errors=True)
            else:
                try:
                    os.remove(down_path)
                except OSError:
                    pass
        session.remove_torrent(handle)
        return meta
     
     
    def download_metadata(address, binhash, metadata_queue, timeout=20):
        """Fetch metadata for *binhash* with a throwaway libtorrent session.

        A session listening on a random port range is created, the DHT
        routers are registered, and fetch_torrent() does the work. The tuple
        ``(binhash, address, metadata, start_time)`` is always put on
        *metadata_queue*; ``metadata`` is None when the fetch failed.
        """
        metadata = None
        start_time = time.time()
        dht_routers = (
            'router.bittorrent.com',
            'router.utorrent.com',
            'dht.transmission.com',
            '127.0.0.1',
        )
        try:
            session = lt.session()
            first_port = random.randrange(10000, 50000)
            session.listen_on(first_port, first_port+10)
            for router_host in dht_routers:
                session.add_dht_router(router_host,6881)
            session.start_dht()
            metadata = fetch_torrent(session, binhash.encode('hex'), timeout)
            session = None  # drop the reference to the session
        except:
            traceback.print_exc()
        finally:
            metadata_queue.put((binhash, address, metadata,start_time))

    这个爬虫还是耗费了本人和其他网上高手的很多时间的,请看到这篇博客的朋友保持钻研精神,开源精神,多多交流,秉承分享。本人建立个qq群作为去转盘网的官方群,人数现在也不多,如果有兴趣的话来逛逛吧,多个粉丝去转盘多一份热闹,qq群号:512245829

  • 相关阅读:
    Java基础面试题总结-编程题总结
    Linux下mysql的安装与卸载并且连接navicat详解(亲测可用)
    linux 下安装redis
    linux 下安装tomcat
    Linux系统下安装jdk及环境配置(两种方法)
    Servlet概述
    多线程之volatile关键字
    多线程之ThreadLocal
    多线程之synchronized实现原理
    线程池2
  • 原文地址:https://www.cnblogs.com/huangxie/p/5550680.html
Copyright © 2011-2022 走看看