zoukankan      html  css  js  c++  java
  • 20.multi_case07

    # coding:utf-8
    
    import asyncio
    import csv
    import json
    import os
    import random
    import re
    import ssl
    import time
    from asyncio.queues import Queue

    import aiohttp
    import requests
    from aiosocksy import Socks5Auth
    from aiosocksy.connector import ProxyConnector, ProxyClientRequest
    from lxml import etree
    
    
    class Common:
        """Shared, process-wide crawler state, held as class attributes only."""

        # Total market cap; assigned by get_marketcap() before workers start.
        market_cap_all = 0
        # Exchange rate; assigned by get_currency_rate() before workers start.
        currency_rate = 0
        # Pending CSV rows (cid, url, ...) still to be downloaded.
        task_queue = Queue()
        # Parsed page records waiting to be pushed.
        result_queue = Queue()
    
    
    # 线上内网
    socks5_address_prod = [
        'socks5://10.1.100.253:1235',
        'socks5://10.1.100.51:1235',
        'socks5://10.1.100.70:1235',
        'socks5://10.1.100.205:1235',
        'socks5://10.1.100.73:1235'
    ]
    
    # 办公网
    socks5_address_dev = [
        'socks5://18.208.81.123:1235',
        'socks5://34.197.217.25:1235',
        'socks5://52.20.255.43:1235',
        'socks5://34.237.163.87:1235',
        'socks5://18.208.81.123:1235',
        'socks5://52.0.114.155:1235'
    ]
    
    DEPLOY_MODE = "dev"
    
    
    async def session_get(session, url, socks):
        """GET *url* through *session* and return ``(body_text, status)``.

        :param session: an ``aiohttp.ClientSession`` (built with aiosocksy's
            ``ProxyClientRequest`` by ``download``).
        :param url: absolute URL to fetch.
        :param socks: SOCKS5 proxy URL such as ``'socks5://host:port'``, or
            ``None`` for a direct connection.
        :returns: tuple of the decoded response body and the HTTP status code.
        """
        auth = Socks5Auth(login='...', password='...')
        headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
        timeout = aiohttp.ClientTimeout(total=20)
        # ``async with`` releases the connection even if reading or decoding
        # the body raises. ssl=False keeps the previous behaviour (no
        # certificate verification, which is what a bare ssl.SSLContext() did)
        # without constructing a deprecated SSLContext for every request.
        async with session.get(
                url,
                proxy=socks,
                proxy_auth=auth,
                timeout=timeout,
                headers=headers,
                ssl=False,
        ) as response:
            return await response.text(), response.status
    
    
    async def download(url):
        """Download *url* and return ``(html_text, status)``.

        ``DEPLOY_MODE`` selects the route, case-insensitively, so the
        historical ``"Prod"`` spelling still works:

        * ``"dev"`` connects directly.
        * ``"prod"`` uses a random entry of ``socks5_address_prod``.

        A short (< 1000 chars) JavaScript redirect page containing
        ``window.location.href = '...'`` is followed once, in the same
        session and through the same proxy.

        :raises ValueError: if ``DEPLOY_MODE`` is neither dev nor prod, or a
            short redirect page has no parseable target (this lets the caller
            retry, as the former IndexError did).
        """
        from urllib.parse import urljoin

        mode = str(DEPLOY_MODE).lower()
        if mode == "dev":
            socks = None
        elif mode == "prod":
            socks = random.choice(socks5_address_prod)
        else:
            # Previously an unknown mode left ``socks`` unbound and crashed
            # later with UnboundLocalError.
            raise ValueError('unknown DEPLOY_MODE: {!r}'.format(DEPLOY_MODE))

        # The connector is created only after the mode check, so a bad mode
        # cannot leak an unclosed connector.
        async with aiohttp.ClientSession(
                connector=ProxyConnector(),
                request_class=ProxyClientRequest
        ) as session:
            ret, status = await session_get(session, url, socks)
            if 'window.location.href' in ret and len(ret) < 1000:
                match = re.search(
                    r"window\.location\.href\s*=\s*['\"]([^'\"]+)['\"]", ret)
                if match is None:
                    raise ValueError('unparseable redirect page: {}'.format(url))
                # urljoin leaves absolute targets unchanged and resolves
                # relative ones against the page that issued the redirect.
                redirect_url = urljoin(url, match.group(1))
                ret, status = await session_get(session, redirect_url, socks)
            return ret, status
    
    
    def _xpath_text(tree, xpath, index=0, remove=()):
        """Return text result *index* of *xpath*, stripped, with each substring
        in *remove* deleted; ``None`` if the tree or that result is missing."""
        if tree is None:
            return None
        results = tree.xpath(xpath)
        if len(results) <= index:
            return None
        text = results[index].strip()
        for token in remove:
            text = text.replace(token, '')
        return text


    def _to_number(text, cast):
        """Return ``cast(text)``, or ``None`` if *text* is missing or malformed."""
        if text is None:
            return None
        try:
            return cast(text)
        except ValueError:
            return None


    async def parse_html(cid, url, response):
        """Parse a coin detail page into a result record.

        :param cid: coin id from the mapping CSV (first column).
        :param url: page URL, echoed into the record.
        :param response: HTML text of the page.
        :returns: ``{'url', 'cid', 'time', 'value'}``. ``time`` is the current
            Unix time in seconds. ``value`` holds only the fields that could be
            extracted; each field is best-effort and is omitted when the node
            is missing, shows a placeholder (``?`` / ``-``) or fails to parse.
        """
        coin_info = {'url': url, 'cid': cid, 'time': int(time.time())}
        coin_value = {}

        tree = etree.HTML(response)

        # Price in USD, e.g. "$6,543.21"; "?" marks an unknown value.
        price_usd = _xpath_text(
            tree, '//div[@class="priceInfo"]/div[@class="sub"]/span[1]/text()',
            remove=('$', ','))
        if price_usd is not None and '?' not in price_usd:
            price = _to_number(price_usd, float)
            if price is not None:
                coin_value['price'] = price

        # Percentage change, e.g. "-1.23%".
        updown = _to_number(_xpath_text(
            tree,
            '//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()',
            remove=('%',)), float)
        if updown is not None:
            coin_value['updown'] = updown

        # 24h volume is shown in CNY; convert it with the fetched rate.
        volume_24h_rmb = _to_number(_xpath_text(
            tree,
            '//div[@class="info"]/div[@class="charCell"][2]/div[2]/span/text()',
            remove=('¥', ',')), float)
        if volume_24h_rmb is not None:
            try:
                coin_value['volume_24h'] = int(
                    volume_24h_rmb / Common.currency_rate)
            except (TypeError, ValueError, ZeroDivisionError, OverflowError):
                pass  # rate not loaded / zero, or a non-finite volume

        # Circulating supply: presumably digits followed by a unit suffix
        # (e.g. "17000000BTC"). Keep the leading integer. The old pattern
        # r'^(d+)(w+)$' had lost its backslashes and never matched digits.
        supply_text = _xpath_text(
            tree,
            '//div[@class="info"]//div[@class="charCell"][1]/div[@class="val"]/text()',
            remove=(',',))
        if supply_text is not None and '?' not in supply_text:
            match = re.match(r'\d+', supply_text)
            if match:
                coin_value['circulating_supply'] = int(match.group())

        price = coin_value.get('price')
        supply = coin_value.get('circulating_supply')
        if price and supply:
            coin_value['market_cap'] = price * supply

        market_cap = coin_value.get('market_cap')
        if market_cap and Common.market_cap_all:
            global_share = market_cap / Common.market_cap_all
            if global_share < 0.001:
                coin_value['global_share'] = '<0.1%'
            else:
                coin_value['global_share'] = str(round(global_share * 100, 2)) + '%'

        # Circulation rate and turnover rate are the 1st and 2nd text nodes of
        # the same selector; both are kept as the raw strings shown.
        charbox_xpath = ('//div[@class="info"]//div[@class="charbox"][1]'
                         '/div[@class="val"]/text()')
        circulation_rate = _xpath_text(tree, charbox_xpath, index=0)
        if circulation_rate is not None and '?' not in circulation_rate:
            coin_value['circulation_rate'] = circulation_rate

        turnover_rate = _xpath_text(tree, charbox_xpath, index=1)
        if turnover_rate is not None and '?' not in turnover_rate:
            coin_value['turnover_rate'] = turnover_rate

        # Issue date as displayed; "-" means unknown.
        issue_time = _xpath_text(
            tree, '//div[@class="infoList"]/div[1]/div[1]/span[2]/text()')
        if issue_time is not None and issue_time != '-':
            coin_value['issue_time'] = issue_time

        # Number of listing exchanges, shown with a trailing "家" counter word.
        exchange_num = _to_number(_xpath_text(
            tree, '//div[@class="infoList"]/div[3]/div[1]/span[2]/text()',
            remove=('家',)), int)
        if exchange_num is not None:
            coin_value['exchange_num'] = exchange_num

        total_circulation = _to_number(_xpath_text(
            tree, '//div[@class="infoList"]/div[2]/div[2]/span[2]/text()',
            remove=(',',)), int)
        if total_circulation is not None:
            coin_value['total_circulation'] = total_circulation

        coin_info['value'] = coin_value
        return coin_info
    
    
    async def down_and_parse_task(queue):
        """Worker: drain *queue*, then download, parse and enqueue each page.

        Each queue item is a CSV row; only its first two columns (cid, url) are
        used. The worker returns once the queue is empty. Every URL gets up to
        three attempts, with 0.2 s between failed attempts. Parsed records go
        to ``Common.result_queue``.
        """
        while True:
            try:
                row = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            cid, url = row[:2]
            for _ in range(3):
                try:
                    html, status = await download(url)
                    # One immediate re-download after a non-200 status, and one
                    # after the site's access-denied page.
                    if status != 200:
                        html, status = await download(url)
                    if '访问控制拒绝了你的请求' in html:
                        html, status = await download(url)
                    html_parse_result = await parse_html(cid, url, html)
                    print(html_parse_result)
                    await Common.result_queue.put(html_parse_result)
                    break
                except asyncio.CancelledError:
                    # Let cancellation through; on Python 3.7 CancelledError
                    # is an Exception and would otherwise be retried.
                    raise
                except Exception:
                    await asyncio.sleep(0.2)
            else:
                print('failed after 3 attempts:', url)
    
    
    async def push(data):
        """POST *data*, JSON-encoded, to the local collector endpoint.

        Makes up to 3 attempts and returns after the first response whose
        status is below 400. Each failure is printed and followed by a 0.2 s
        pause. After the third failure the batch is dropped. HTTP and network
        errors are never raised to the caller.
        """
        url = 'http://127.0.0.1:8000/aaa'
        for _ in range(3):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                            url,
                            data=json.dumps(data)
                    ) as response:
                        response.raise_for_status()
                # Success. Previously the loop had no exit here, so every
                # batch was POSTed three times.
                return
            except asyncio.CancelledError:
                raise
            except Exception as e:
                await asyncio.sleep(0.2)
                print(e)
    
    
    async def speed_monitor():
        """Report how fast ``Common.task_queue`` drains, every 5 seconds.

        Prints a separator and the number of rows removed during the last
        window divided by the window length (rows per second). Stops once the
        queue is empty at the start of a window.
        """
        window = 5
        while Common.task_queue.qsize():
            remaining_before = Common.task_queue.qsize()
            await asyncio.sleep(window)
            remaining_after = Common.task_queue.qsize()
            print('=================')
            print('speed = ', (remaining_before - remaining_after) / window)
    
    
    async def monitor_finish():
        """Raise ``SystemExit`` once only a few long-lived tasks remain.

        Polls once per second until at most 3 unfinished tasks exist. That
        threshold presumably corresponds to one each of monitor_finish,
        push_results and time_limit, which never finish on their own (confirm
        against the tasks started by main()). It then waits 5 s, presumably
        so push_results (3 s cycle) can flush the last batch, and raises
        ``SystemExit`` to stop the event loop.
        """
        # asyncio.Task.all_tasks() was removed in Python 3.9. asyncio.all_tasks()
        # exists since 3.7; fall back for older interpreters. Done tasks are
        # filtered explicitly because the legacy API could include them.
        all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
        while sum(1 for task in all_tasks() if not task.done()) > 3:
            await asyncio.sleep(1)
        await asyncio.sleep(5)
        raise SystemExit()
    
    
    async def push_results():
        """Every 3 seconds, send all queued results to ``push`` as one batch.

        Runs until cancelled. Unexpected errors are printed with their
        traceback and the loop continues. Cancellation is re-raised so the
        task can be stopped; the old ``except BaseException`` swallowed it.
        """
        import traceback

        batch = []
        while True:
            try:
                await asyncio.sleep(3)
                while not Common.result_queue.empty():
                    batch.append(Common.result_queue.get_nowait())
                if batch:
                    await push(batch)
                    batch.clear()
            except asyncio.CancelledError:
                raise
            except Exception:
                print(traceback.format_exc())
    
    
    async def get_marketcap():
        """Store the total market cap in ``Common.market_cap_all``.

        Reads ``data.marketcapvol`` from the feixiaohao global-stats API.
        ``requests`` is blocking, so the call runs in the default executor to
        keep the event loop responsive. The 20 s timeout (same as
        ``session_get``) stops a stalled server from hanging start-up forever.

        :raises requests.HTTPError: on a 4xx/5xx response.
        """
        url = 'https://dncapi.feixiaohao.com/api/home/global?webp=0'
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None, lambda: requests.get(url, timeout=20))
        response.raise_for_status()
        response_json = json.loads(response.text)
        marketcap = response_json['data']['marketcapvol']
        Common.market_cap_all = int(marketcap)
    
    
    async def get_currency_rate():
        """Store an exchange rate in ``Common.currency_rate``.

        Reads the ``'cny'`` field of element 11 of the list returned by the
        feixiaohao web-rate API. The blocking ``requests`` call runs in the
        default executor with a 20 s timeout.

        :raises requests.HTTPError: on a 4xx/5xx response.
        """
        url_rate = 'https://dncapi.feixiaohao.com/api/coin/web-rate/'
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None, lambda: requests.get(url_rate, timeout=20))
        response.raise_for_status()
        # NOTE(review): 11 is a fixed position in the API's rate list. It is
        # presumably the USD entry, since parse_html() divides CNY volumes by
        # this value. This breaks silently if the API reorders its list.
        Common.currency_rate = json.loads(response.text)[11]['cny']
    
    # Hard cap on crawl time. The original note says the budget is 300 s; the
    # default of 280 s presumably leaves a safety margin before that limit
    # (TODO confirm against the scheduler configuration).
    async def time_limit(seconds=280):
        """Sleep *seconds* (default 280), then raise ``SystemExit``.

        :param seconds: how long the crawl may run before being stopped.
        """
        await asyncio.sleep(seconds)
        raise SystemExit()
    
    
    async def main(worker_count=10):
        """Load crawl tasks and reference data, then start all crawler tasks.

        Steps:

        1. Read ``feixiaohao_mapping_data.csv`` (rows ``cid, url, ...``).
        2. Enqueue every row whose URL starts with ``https``.
        3. Load the total market cap and the exchange rate.
        4. Schedule *worker_count* download workers plus exactly one each of
           the finish-monitor, speed-monitor, push and time-limit tasks.

        :param worker_count: number of concurrent download workers.
        """
        # ``with`` closes the file; newline='' is what the csv module expects.
        with open('feixiaohao_mapping_data.csv',
                  encoding='utf-8', newline='') as csv_file:
            for row in csv.reader(csv_file):
                # Skip blank/short rows and non-https URLs.
                if len(row) > 1 and row[1].startswith('https'):
                    await Common.task_queue.put(row)
        print(Common.task_queue)

        await get_marketcap()
        print('总市值', Common.market_cap_all)

        await get_currency_rate()
        print('汇率', Common.currency_rate)

        # Use the running loop instead of a module-level ``loop`` global, which
        # only exists when the file is executed as a script.
        loop = asyncio.get_event_loop()
        for _ in range(worker_count):
            loop.create_task(down_and_parse_task(Common.task_queue))
        # One of each helper, outside the worker loop. Previously 10 copies of
        # each were started, so monitor_finish()'s "at most 3 tasks left" exit
        # condition could never be met.
        loop.create_task(monitor_finish())
        loop.create_task(speed_monitor())
        loop.create_task(push_results())
        loop.create_task(time_limit())
    
    if __name__ == '__main__':
        # Script entry point: schedule main() on the default event loop and
        # run the loop indefinitely. In this file, monitor_finish() and
        # time_limit() raise SystemExit to stop it. ``loop`` is bound at module
        # level because main() may reference it as a global.
        loop = asyncio.get_event_loop()
        entry = main()
        loop.create_task(entry)
        loop.run_forever()
    
  • 相关阅读:
    mysql导出某张表的部分数据
    linux命令行实用快捷键
    mysql导出数据库某些表的数据
    二进制日志过期时间设置expire_logs_days
    进程信息
    安装mysql-workbench
    create user
    系统io统计
    mysqldump与mydumper
    系统负载监控
  • 原文地址:https://www.cnblogs.com/hankleo/p/10820766.html
Copyright © 2011-2022 走看看