zoukankan      html  css  js  c++  java
  • 20.multi_case07

    # coding:utf-8
    
    import re
    import ssl
    import csv
    import json
    import time
    import random
    import asyncio
    import aiohttp
    import requests
    from lxml import etree
    from asyncio.queues import Queue
    from aiosocksy import Socks5Auth
    from aiosocksy.connector import ProxyConnector, ProxyClientRequest
    
    
    class Common():
        task_queue = Queue()
        result_queue = Queue()
        market_cap_all = 0
        currency_rate = 0
    
    
    # 线上内网
    socks5_address_prod = [
        'socks5://10.1.100.253:1235',
        'socks5://10.1.100.51:1235',
        'socks5://10.1.100.70:1235',
        'socks5://10.1.100.205:1235',
        'socks5://10.1.100.73:1235'
    ]
    
    # 办公网
    socks5_address_dev = [
        'socks5://18.208.81.123:1235',
        'socks5://34.197.217.25:1235',
        'socks5://52.20.255.43:1235',
        'socks5://34.237.163.87:1235',
        'socks5://18.208.81.123:1235',
        'socks5://52.0.114.155:1235'
    ]
    
    DEPLOY_MODE = "dev"
    
    
    async def session_get(session, url, socks):
        auth = Socks5Auth(login='...', password='...')
        headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
        timeout = aiohttp.ClientTimeout(total=20)
        response = await session.get(
            url,
            proxy=socks,
            proxy_auth=auth,
            timeout=timeout,
            headers=headers,
            ssl=ssl.SSLContext()
        )
        return await response.text(), response.status
    
    
    async def download(url):
        connector = ProxyConnector()
        if DEPLOY_MODE == "dev":
            socks = None
        elif DEPLOY_MODE == "Prod":
            socks = random.choice(socks5_address_prod)
        async with aiohttp.ClientSession(
                connector=connector,
                request_class=ProxyClientRequest
        ) as session:
            ret, status = await session_get(session, url, socks)
            if 'window.location.href' in ret and len(ret) < 1000:
                url = ret.split("window.location.href='")[1].split("'")[0]
                ret, status = await session_get(session, url, socks)
            return ret, status
    
    
    async def parse_html(cid, url, response):
        coin_info = {}
        coin_value = {}
    
        coin_info['url'] = url
        coin_info['cid'] = cid
        coin_info['time'] = int(time.time())
    
        tree = etree.HTML(response)
    
        try:
            price_usd = tree.xpath(
                '//div[@class="priceInfo"]/div[@class="sub"]/span[1]/text()'
            )[0].strip().replace('$', '')
            if '?' not in price_usd:
                coin_value['price'] = float(price_usd)
        except BaseException:
            pass
    
        try:
            updown = tree.xpath(
                '//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()'
            )[0].strip().replace('%', '')
            coin_value['updown'] = float(updown)
        except BaseException:
            pass
    
        try:
            volume_24h_rmb = tree.xpath(
                '//div[@class="info"]/div[@class="charCell"][2]/div[2]/span/text()'
            )[0].strip().replace('¥', '').replace(',', '')
            coin_value['volume_24h'] = int(
                float(volume_24h_rmb) / Common.currency_rate)
        except BaseException:
            pass
    
        try:
            circulating_supply = tree.xpath(
                '//div[@class="info"]//div[@class="charCell"][1]/div[@class="val"]/text()'
            )[0].strip().replace(',', '')
            if '?' not in circulating_supply:
                circulating_supply = re.match(
                    r'^(d+)(w+)$', circulating_supply).group(1)
                coin_value['circulating_supply'] = int(circulating_supply)
        except BaseException:
            pass
    
        try:
            if coin_value['price'] and coin_value['circulating_supply']:
                market_cap = coin_value['price'] * coin_value['circulating_supply']
                coin_value['market_cap'] = market_cap
        except BaseException:
            pass
    
        try:
            if coin_value['market_cap']:
                global_share = coin_value['market_cap'] / Common.market_cap_all
                if global_share < 0.001:
                    coin_value['global_share'] = '<0.1%'
                else:
                    coin_value['global_share'] = str(
                        (global_share * 100).__round__(2)) + '%'
        except BaseException:
            pass
    
        try:
            circulation_rate = tree.xpath(
                '//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()'
            )[0].strip()
            if '?' not in circulation_rate:
                coin_value['circulation_rate'] = circulation_rate
        except BaseException:
            pass
    
        try:
            turnover_rate = tree.xpath(
                '//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()'
            )[1].strip()
            if '?' not in turnover_rate:
                coin_value['turnover_rate'] = turnover_rate
        except BaseException:
            pass
    
        try:
            issue_time = tree.xpath(
                '//div[@class="infoList"]/div[1]/div[1]/span[2]/text()'
            )[0].strip()
            if issue_time != '-':
                coin_value['issue_time'] = issue_time
        except BaseException:
            pass
    
        try:
            exchange_num = tree.xpath(
                '//div[@class="infoList"]/div[3]/div[1]/span[2]/text()'
            )[0].strip().replace('家', '')
            coin_value['exchange_num'] = int(exchange_num)
        except BaseException:
            pass
    
        try:
            total_circulation = tree.xpath(
                '//div[@class="infoList"]/div[2]/div[2]/span[2]/text()'
            )[0].strip().replace(',', '')
            coin_value['total_circulation'] = int(total_circulation)
        except BaseException:
            pass
    
        coin_info['value'] = coin_value
        return coin_info
    
    
    async def down_and_parse_task(queue):
        while True:
            try:
                cid, url = queue.get_nowait()[:2]
            except BaseException:
                return
            for retry_cnt in range(3):
                try:
                    html, status = await download(url)
                    if status != 200:
                        html, status = await download(url)
                    if '访问控制拒绝了你的请求' in html:
                        html, status = await download(url)
                    html_parse_result = await parse_html(cid, url, html)
                    print(html_parse_result)
                    await Common.result_queue.put(html_parse_result)
                    break
                except BaseException:
                    await asyncio.sleep(0.2)
                    continue
    
    
    async def push(data):
        url = 'http://127.0.0.1:8000/aaa'
        error = None
        for retry_cnt in range(3):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                            url,
                            data=json.dumps(data)
                    ) as response:
                        pass
                    response.raise_for_status()
            except Exception as e:
                await asyncio.sleep(0.2)
                print(e)
    
    
    async def speed_monitor():
        while Common.task_queue.qsize() != 0:
            old_queue_len = Common.task_queue.qsize()
            await asyncio.sleep(5)
            new_queue_count = Common.task_queue.qsize()
            print('=================')
            print('speed = ', (old_queue_len - new_queue_count) / 5)
    
    
    async def monitor_finish():
        while len(asyncio.Task.all_tasks()) > 3:
            await asyncio.sleep(1)
        await asyncio.sleep(5)
        raise SystemExit()
    
    
    async def push_results():
        temp_q = []
        while True:
            try:
                await asyncio.sleep(3)
                for _ in range(Common.result_queue.qsize()):
                    temp_q.append(await Common.result_queue.get())
                if len(temp_q) > 0:
                    await push(temp_q)
                    temp_q.clear()
            except BaseException:
                import traceback
                print(traceback.format_exc())
    
    
    async def get_marketcap():
        url = 'https://dncapi.feixiaohao.com/api/home/global?webp=0'
        response = requests.get(url)
        response_json = json.loads(response.text)
        marketcap = response_json['data']['marketcapvol']
        Common.market_cap_all = int(marketcap)
    
    
    async def get_currency_rate():
        url_rate = 'https://dncapi.feixiaohao.com/api/coin/web-rate/'
        response = requests.get(url_rate)
        currency_rate = json.loads(response.text)[11]['cny']
        Common.currency_rate = currency_rate
    
    # 300秒抓取时间上限
    async def time_limit():
        await asyncio.sleep(280)
        raise SystemExit()
    
    
    async def main():
        # loop = asyncio.get_event_loop()
        csv_reader = csv.reader(
            open(
                'feixiaohao_mapping_data.csv',
                encoding='utf-8'))
        for row in csv_reader:
            try:
                if row[1].startswith('https'):
                    await Common.task_queue.put(row)
            except BaseException:
                pass
        print(Common.task_queue)
    
        await get_marketcap()
        print('总市值', Common.market_cap_all)
    
        await get_currency_rate()
        print('汇率', Common.currency_rate)
    
        for _ in range(10):
            loop.create_task(down_and_parse_task(Common.task_queue))
            loop.create_task(monitor_finish())
            loop.create_task(speed_monitor())
            loop.create_task(push_results())
            loop.create_task(time_limit())
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.create_task(main())
        loop.run_forever()
    
  • 相关阅读:
    SPSS时间序列:频谱分析
    PureBasic—数控编辑框与调节块和进度条
    DELPHI2007 安装ACTIVEX插件的方法
    C++ builder的文件操作
    C++动态数组
    excel快速复制大量公式的方法
    Delphi XE5 如何与其他版本共存
    PureBasic 集成Form设计器的使用
    VS C++ 从一个窗口创建另一个窗口
    ENVI 5.0 Beta 体验——影像数据的显示
  • 原文地址:https://www.cnblogs.com/hankleo/p/10820766.html
Copyright © 2011-2022 走看看