zoukankan      html  css  js  c++  java
  • 微信公众号开发

    申请的免费二级域名用于测试

    api 接口

    微博移动端数据

    这个有点东西

    接入 微信公众号测试接口 (后台可以使用 java 或者 python 实现)

       虽然现在代码写得还很粗糙,但整个流程已经打通,剩下的就是优化代码结构、实现更多功能了。
    
    from app.app import create_app
    from flask import (request, jsonify)
    from app.libs.constants.token import TOKEN
    import hashlib
    import re
    import xml.etree.cElementTree as et
    
    # Application object built by the project's own factory (app.app.create_app);
    # the route decorator, add_url_rule() and app.run() below all act on it.
    app = create_app()
    
    @app.route('/', methods=['GET', 'POST'])
    def index():
        """Handle the WeChat official-account callback.

        GET with query arguments is the server-verification handshake:
          1) sort token, timestamp and nonce lexicographically,
          2) concatenate them and take the SHA-1 hex digest,
          3) compare the digest with ``signature``; on a match echo ``echostr``.
        GET without arguments returns a plain 'home page' marker.

        POST carries a message pushed as XML. Text messages are echoed back in a
        fixed sentence; every other message type gets a "not implemented yet"
        text reply.

        Returns:
            str: the response body (echostr, a status marker, or reply XML).
        """
        if request.method == 'GET':
            if not request.args:
                return 'home page'
            signature = request.args.get('signature')
            timestamp = request.args.get('timestamp')
            nonce = request.args.get('nonce')
            # Sorting a list that contains None raises TypeError, so reject
            # incomplete handshakes before building the digest.
            if signature is None or timestamp is None or nonce is None:
                return 'check failed!'
            my_signature = check_signature(''.join(sorted([TOKEN, timestamp, nonce])))
            if signature == my_signature:
                # A view must not return None; fall back to an empty body.
                return request.args.get('echostr') or ''
            return 'check failed!'

        # NOTE(review): POST requests are not signature-checked here and the XML
        # comes from the network; consider verifying signature/timestamp/nonce
        # and using a hardened XML parser before trusting the payload.
        if not request.data:
            # Nothing to parse. NOTE(review): 'success' is assumed to be the
            # acknowledgement WeChat accepts for "no reply" - confirm against
            # the platform documentation.
            return 'success'

        try:
            msg_type = et.fromstring(request.data.decode('utf-8')).findtext('MsgType')
        except (et.ParseError, UnicodeDecodeError):
            return 'success'

        if msg_type == 'text':
            received_data = parse_request(request)
            content = '你输入的是 {} 该回复还是固定回复'.format(received_data.get('Content'))
        else:
            received_data = parse_request_non_text(request)
            content = '该功能正在开发中'

        # The reply travels in the opposite direction: the original sender
        # becomes the recipient and this account becomes the sender.
        return _build_text_reply(
            to_user=received_data.get('FromUserName'),
            from_user=received_data.get('ToUserName'),
            content=content,
            msg_id=received_data.get('MsgId'),
        )


    def _build_text_reply(to_user, from_user, content, msg_id=None):
        """Render a passive text reply as one compact XML string.

        The XML is assembled without layout whitespace, so no post-processing
        regex (which previously mangled the payload) is needed.
        """
        from time import time

        # "]]>" inside the text would end the CDATA section early; split it
        # across two sections so the content survives unchanged.
        safe_content = str(content).replace(']]>', ']]]]><![CDATA[>')
        parts = [
            '<xml>',
            '<ToUserName><![CDATA[{}]]></ToUserName>'.format(to_user),
            '<FromUserName><![CDATA[{}]]></FromUserName>'.format(from_user),
            # CreateTime is sent as whole seconds rather than a float.
            '<CreateTime>{}</CreateTime>'.format(int(time())),
            '<MsgType><![CDATA[text]]></MsgType>',
            # The literal square brackets around the text are kept from the
            # original template.
            '<Content><![CDATA[[{}]]]></Content>'.format(safe_content),
        ]
        if msg_id is not None:
            parts.append('<MsgId>{}</MsgId>'.format(msg_id))
        parts.append('</xml>')
        return ''.join(parts)
    
    
    
    def check_signature(sorted_str):
        """Return the SHA-1 hex digest of ``sorted_str`` encoded as UTF-8."""
        payload = sorted_str.encode('utf-8')
        return hashlib.sha1(payload).hexdigest()
    
    def parse_request(request):
        """Parse a pushed text message into a plain dict.

        Args:
            request: object whose ``data`` attribute holds the UTF-8 encoded XML
                body, e.g.
                ``<xml><ToUserName>..</ToUserName><FromUserName>..</FromUserName>
                <CreateTime>..</CreateTime><MsgType>text</MsgType>
                <Content>..</Content><MsgId>..</MsgId></xml>``.

        Returns:
            dict: ToUserName, FromUserName, CreateTime, Content, MsgId and
            MsgType as strings; a tag that is absent maps to None instead of
            raising AttributeError.
        """
        xml_rec = et.fromstring(request.data.decode('utf-8'))
        fields = ('ToUserName', 'FromUserName', 'CreateTime', 'Content', 'MsgId', 'MsgType')
        return {name: xml_rec.findtext(name) for name in fields}
    
    def parse_request_non_text(request):
        """Parse the common envelope of a non-text message into a dict.

        Args:
            request: object whose ``data`` attribute holds the UTF-8 encoded XML
                body.

        Returns:
            dict: ToUserName, FromUserName, CreateTime, MsgId and MsgType. A tag
            that is absent maps to None; payloads without ``MsgId`` therefore no
            longer raise AttributeError.
        """
        xml_rec = et.fromstring(request.data.decode('utf-8'))
        fields = ('ToUserName', 'FromUserName', 'CreateTime', 'MsgId', 'MsgType')
        return {name: xml_rec.findtext(name) for name in fields}
    
    
    # Expose the same view under /index/ in addition to the decorated '/' route.
    app.add_url_rule('/index/',view_func=index)
    
    if __name__ == '__main__':
        # Listens on every interface (0.0.0.0) on port 8090; the debug flag is
        # taken from the app config.
        # NOTE(review): debug mode on a publicly reachable host is risky - confirm
        # DEBUG is off outside local testing.
        app.run(debug=app.config['DEBUG'],host='0.0.0.0', port=8090)
    

    工程化 可能还是 java 好写一点 , 使用 springboot 进行后台编写

    第一步, 申请注册好 微信公众号 测试接口,这一块儿可以网上找到

    第二步, 编写 微信签名验证 接口

    第三步, 代码编写

    验证 微信服务器 签名

    controller 层编写

    package com.ghc.wechat.wechat.controller;
    
    import com.ghc.wechat.wechat.service.WechatService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    
    /**
     * REST controller exposing the WeChat server-verification endpoint.
     *
     * @author Frank Li
     */
    @RestController
    public class WechatController {

        @Autowired
        private WechatService wechatService;

        /**
         * Answers GET /wechat: echoes {@code echostr} when the signature
         * validates, otherwise returns {@code null}.
         *
         * @param signature digest supplied by the caller
         * @param timestamp timestamp supplied by the caller
         * @param nonce     random value supplied by the caller
         * @param echostr   string to echo back on success
         * @return {@code echostr} on success, {@code null} on failure
         */
        @GetMapping(value = "/wechat")
        public String validate(@RequestParam(value = "signature") String signature,
                               @RequestParam(value = "timestamp") String timestamp,
                               @RequestParam(value = "nonce") String nonce,
                               @RequestParam(value = "echostr") String echostr) {
            boolean trusted = wechatService.validateSignature(timestamp, nonce, signature);
            if (trusted) {
                return echostr;
            }
            return null;
        }

    }
    
    

    service 层 , utils 等

    package com.ghc.wechat.wechat.service;
    
    import com.ghc.wechat.wechat.utils.WebChatUtils;
    import org.springframework.stereotype.Service;
    
    /**
     * Service layer for WeChat callbacks; delegates signature checks to
     * {@link WebChatUtils}.
     *
     * @author Frank Li
     */
    @Service
    public class WechatService {

        /**
         * Checks whether {@code signature} matches the digest derived from the
         * token, {@code timestamp} and {@code nonce}.
         *
         * @return the result reported by {@code WebChatUtils.validateSignature}
         */
        public boolean validateSignature(String timestamp, String nonce, String signature) {
            boolean valid = WebChatUtils.validateSignature(timestamp, nonce, signature);
            return valid;
        }
    }
    
    package com.ghc.wechat.wechat.utils;
    
    import com.ghc.wechat.wechat.constants.Token;
    import lombok.extern.slf4j.Slf4j;
    
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    
    /**
     * @author :Frank Li
     * @date :Created in 2019/8/26 10:13
     * @description:${description}
     * @modified By:
     * @version: $version$
     */
    @Slf4j
    public class WebChatUtils {
    
    
        public static boolean validateSignature(String timestamp,String nonce,String signature) {
    
    //        将token、timestamp、nonce三个参数进行字典序排序
            String [] strArray = {Token.TOKEN, timestamp, nonce};
            Arrays.sort(strArray);
    //        2)将三个参数字符串拼接成一个字符串进行sha1加密
            MessageDigest sha1Digest = null;
            try{
               sha1Digest = MessageDigest.getInstance("sha1");
            }catch(NoSuchAlgorithmException ne){
                log.error(ne.getMessage());
            }
            byte [] digest =  sha1Digest.digest((strArray[0]+strArray[1]+strArray[2]).getBytes());
            char[] hexArray = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
            StringBuilder sb = new StringBuilder(3);
            for(byte b:digest){
                char high8 = hexArray[(b>>4)&15];
                char low8 = hexArray[b&15];
                sb.append(high8)
                        .append(low8);
            }
    //        3)开发者获得加密后的字符串可与signature对比,标识该请求来源于微信
            log.info(sb.toString()+"
    "+signature);
            boolean flag = signature.equalsIgnoreCase(sb.toString());
            log.info(String.valueOf(flag));
            return flag;
    
        }
    }
    
    
    package com.ghc.wechat.wechat.constants;
    
    /**
     * Holds the token shared with the WeChat platform configuration.
     *
     * @author Frank Li
     */
    public interface Token {
        /** Token value used when computing the request signature. */
        public static final String TOKEN = "xxx";
    }
    
    
    

    修改 application.properties

    server.port=8090
    

    设置 IDEA 热部署 spring

    spring为开发者提供了一个名为spring-boot-devtools的模块来使Spring Boot应用支持热部署,提高开发者的开发效率,无需手动重启Spring Boot应用。
    
    devtools的原理
    
    深层原理是使用了两个ClassLoader,一个Classloader加载那些不会改变的类(第三方Jar包),另一个ClassLoader加载会更改的类,称为restart ClassLoader,这样在有代码更改的时候,原来的restart ClassLoader 被丢弃,重新创建一个restart ClassLoader,由于需要加载的类相比较少,所以实现了较快的重启时间。
    
    使用需要添加以下的配置:
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>
        </plugins>
    </build>
    说明:
    
    (1) devtools可以实现页面热部署(即页面修改后会立即生效,这个可以直接在application.properties文件中配置spring.thymeleaf.cache=false来实现),
    实现类文件热部署(类文件修改后不会立即生效),实现对属性文件的热部署。
    即devtools会监听classpath下的文件变动,并且会立即重启应用(发生在保存时机),注意:因为其采用的虚拟机机制,该项重启是很快的
    (2)配置了后在修改java文件后也就支持了热启动,不过这种方式是属于项目重启(速度比较快的项目重启),会清空session中的值,也就是如果有用户登陆的话,项目重启后需要重新登陆。
    
    默认情况下,/META-INF/maven,/META-INF/resources,/resources,/static,/templates,/public这些文件夹下的文件修改不会使应用重启,但是会重新加载(devtools内嵌了一个LiveReload server,当资源发生改变时,浏览器刷新)。
    
    devtools的配置
    
    在application.properties中配置spring.devtools.restart.enabled=false,此时restart类加载器还会初始化,但不会监视文件更新。
    在 SpringApplication.run 之前调用 System.setProperty("spring.devtools.restart.enabled", "false"); 可以完全关闭重启支持。配置内容:
    
    #热部署生效
    spring.devtools.restart.enabled: true
    #设置重启的目录
    #spring.devtools.restart.additional-paths: src/main/java
    #classpath目录下的WEB-INF文件夹内容修改不重启
    spring.devtools.restart.exclude: WEB-INF/**
    IDEA配置
    
    当我们修改了Java类后,IDEA默认是不自动编译的,而spring-boot-devtools又是监测classpath下的文件发生变化才会重启应用,所以需要设置IDEA的自动编译:
    
    (1)File-Settings-Compiler-Build Project automatically
    
    
    
    (2)按 ctrl + shift + alt + /,选择 Registry,勾选 compiler.automake.allow.when.app.running
    
    
    
    测试
    
    修改类–>保存:应用会重启
    修改配置文件–>保存:应用会重启
    修改页面–>保存:应用不会重启,但会重新加载,页面会刷新(原理是将spring.thymeleaf.cache设为false,参考:Spring Boot配置模板引擎)
    
    

    爬取 公众号文章

    # -*- coding: utf-8 -*-
    from selenium import webdriver
    from selenium.webdriver.support.wait import WebDriverWait
    import requests
    import json
    import time
    import re
    import random
    
    __author__ = 'Frank Li'
    
    def get_proxy_from_local():
        """Ask the local proxy-pool service for one proxy.

        Returns:
            The value stored under ``proxy`` in the service's JSON reply
            (None when the key is missing).
        """
        # The timeout stops an unresponsive pool from blocking forever; the
        # ``with`` block releases the connection even if JSON decoding fails.
        with requests.get('http://127.0.0.1:5010/get', timeout=10) as res:  # local IP proxy pool
            proxy = res.json().get('proxy')
        print(proxy)
        return proxy
    # Executed at import time: requires the local proxy pool to be reachable.
    proxy = get_proxy_from_local()
    # Landing page of the official-account admin site; the token is left empty.
    ORIGINAL_URL = r'https://mp.weixin.qq.com/?token=&lang=zh_CN'
    # NOTE(review): DRIVER, ACCOUNT and PASSWORD exist only as the commented-out
    # lines below, yet auto_login() reads them - they must be defined before
    # auto_login() is called.
    # chromeOptions = webdriver.ChromeOptions()
    # chromeOptions.add_argument("--proxy-server=http://{}".format(proxy))
    # DRIVER = webdriver.Chrome() #chrome_options=chromeOptions
    # ACCOUNT="5xxx@qq.com"
    # PASSWORD="xxx"
    
    # Endpoint that get_articles() posts its search form to.
    ARTICLE_URL = r'https://mp.weixin.qq.com/cgi-bin/operate_appmsg?sub=check_appmsg_copyright_stat'
    # Default output file for serilize_obj2json_file().
    ARTICAL_JSON_FILE = r'ARTICAL_JSON_FILE.json'
    
    def auto_login():
        """Log in through the browser and return the session cookies.

        Types the account and password into the login form, clicks the third
        form row, waits, and then collects the browser cookies.

        NOTE(review): relies on module-level DRIVER, ACCOUNT and PASSWORD, which
        are only present as commented-out lines in this file.

        Returns:
            dict: cookie name -> cookie value.
        """
        form = '//*[@id="header"]/div[2]/div/div/form'
        account_xpath = form + '/div[1]/div[1]/div/span/input'
        password_xpath = form + '/div[1]/div[2]/div/span/input'
        label_xpath = form + '/div[3]/label'

        DRIVER.get(ORIGINAL_URL)

        time.sleep(2)
        # find_elements('xpath', ...) is the generic locator call; the
        # find_elements_by_xpath shortcut no longer exists in Selenium 4.
        WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements('xpath', account_xpath)[0]).send_keys(ACCOUNT)
        time.sleep(2)
        WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements('xpath', password_xpath)[0]).send_keys(PASSWORD)
        time.sleep(2)
        WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements('xpath', label_xpath))[0].click()
        time.sleep(2)
        # NOTE(review): presumably leaves time for a manual confirmation step
        # (the submit click is disabled in the original) - confirm.
        time.sleep(15)
        # The result could be saved as wechatcookies.json for get_cookies().
        return {item.get('name'): item.get('value') for item in DRIVER.get_cookies()}
    
    def get_cookies():
        """Load the saved cookies from ``wechatcookies.json`` in the working directory."""
        with open('wechatcookies.json', 'r') as cookie_file:
            return json.load(cookie_file)
    
    def get_token(url=ORIGINAL_URL):
        """Return the saved cookies together with the session token.

        Requests ``url`` with the saved cookies and reads the numeric ``token``
        query value from the final (post-redirect) URL.

        Returns:
            tuple: (cookies dict, token string).

        Raises:
            IndexError: when the final URL carries no token. The exception type
                is kept from the original ``findall(...)[0]`` lookup.
        """
        cookies = get_cookies()
        print('cookies:\n', cookies)
        res = requests.get(url, cookies=cookies)
        match = re.search(r'token=(\d+)', str(res.url))
        if match is None:
            raise IndexError('no token found in {!r}; the saved cookies may be invalid'.format(str(res.url)))
        return cookies, match.group(1)
    
    
    def get_articles(url=ARTICLE_URL, query_words='英雄'):
        """Query the article endpoint and return article metadata.

        Args:
            url: endpoint to post the search form to.
            query_words: search text, sent in the form field named ``url``.

        Returns:
            dict: title, article_type, cover_url, head_img_url, nickname and url.
            Empty when the reply contains no ``list``.

        NOTE(review): every result is merged into the same six keys, so only the
        last article of the page survives. Returning a list would fix that but
        changes the return type callers see, so it is left for a follow-up.
        """
        cookies, token = get_token()

        # Content-Length is intentionally not set: requests computes it from the
        # encoded form body, so the former hard-coded '123' was misleading.
        headers = {"Accept": "application/json, text/javascript, */*; q=0.01",
                   "Accept-Encoding": "gzip, deflate, br",
                   "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
                   "Cache-Control": "no-cache",
                   "Connection": "keep-alive",
                   "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
                   "Host": "mp.weixin.qq.com",
                   "Origin": "https://mp.weixin.qq.com",
                   "Pragma": "no-cache",
                   "Referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=10&share=1&token="+token+"&lang=zh_CN",
                   "Sec-Fetch-Mode": "cors",
                   "Sec-Fetch-Site": "same-origin",
                   "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
                   "X-Requested-With": "XMLHttpRequest"
                   }
        data = {
            'token': token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'url': query_words,
            'allow_reprint': '0',
            'begin': '0',
            'count': '10'
        }
        res = requests.post(url, cookies=cookies, headers=headers, data=data)
        # A reply without 'list' would otherwise fail with
        # "TypeError: 'NoneType' object is not iterable".
        data_list = res.json().get('list') or []

        fields = ('title', 'article_type', 'cover_url', 'head_img_url', 'nickname', 'url')
        final_data = {}
        for item in data_list:
            final_data.update({field: item.get(field) for field in fields})
        return final_data
    
    def serilize_obj2json_file(obj,target=ARTICAL_JSON_FILE):
        """Write ``obj`` to ``target`` as indented UTF-8 JSON, replacing the file."""
        with open(target, mode='w', encoding='utf-8') as out_file:
            json.dump(obj, out_file, ensure_ascii=False, indent=2)
            out_file.flush()
    
    if __name__ == '__main__':
        # The login and token helpers are left disabled; the run relies on a
        # previously saved wechatcookies.json.
        # print(auto_login())
        # get_token()
        # Search for the query text and dump the result to the JSON file.
        data = get_articles(query_words="问苍茫大地")
        serilize_obj2json_file(data)
    

    输出结果

    增加 分页 爬取

    # -*- coding: utf-8 -*-
    from selenium import webdriver
    from selenium.webdriver.support.wait import WebDriverWait
    import requests
    import json
    import time
    import re
    import random
    
    __author__ = 'Frank Li'
    
    def get_proxy_from_local():
        """Ask the local proxy-pool service for one proxy.

        Returns:
            The value stored under ``proxy`` in the service's JSON reply
            (None when the key is missing).
        """
        # The timeout stops an unresponsive pool from blocking forever; the
        # ``with`` block releases the connection even if JSON decoding fails.
        with requests.get('http://127.0.0.1:5010/get', timeout=10) as res:
            proxy = res.json().get('proxy')
        print(proxy)
        return proxy
    # Executed at import time: requires the local proxy pool to be reachable.
    proxy = get_proxy_from_local()
    # Landing page of the official-account admin site; the token is left empty.
    ORIGINAL_URL = r'https://mp.weixin.qq.com/?token=&lang=zh_CN'
    # NOTE(review): DRIVER, ACCOUNT and PASSWORD exist only as the commented-out
    # lines below, yet auto_login() reads them - they must be defined before
    # auto_login() is called.
    # chromeOptions = webdriver.ChromeOptions()
    # chromeOptions.add_argument("--proxy-server=http://{}".format(proxy))
    # DRIVER = webdriver.Chrome() #chrome_options=chromeOptions
    # ACCOUNT="5xxx@qq.com"
    # PASSWORD="xxx"
    
    # Endpoint that get_articles() and iter_pages() post their search form to.
    ARTICLE_URL = r'https://mp.weixin.qq.com/cgi-bin/operate_appmsg?sub=check_appmsg_copyright_stat'
    # Default output file for serilize_obj2json_file().
    ARTICAL_JSON_FILE = r'ARTICAL_JSON_FILE.json'
    
    def auto_login():
        """Log in through the browser and return the session cookies.

        Types the account and password into the login form, clicks the third
        form row, waits, and then collects the browser cookies.

        NOTE(review): relies on module-level DRIVER, ACCOUNT and PASSWORD, which
        are only present as commented-out lines in this file.

        Returns:
            dict: cookie name -> cookie value.
        """
        form = '//*[@id="header"]/div[2]/div/div/form'
        account_xpath = form + '/div[1]/div[1]/div/span/input'
        password_xpath = form + '/div[1]/div[2]/div/span/input'
        label_xpath = form + '/div[3]/label'

        DRIVER.get(ORIGINAL_URL)

        time.sleep(2)
        # find_elements('xpath', ...) is the generic locator call; the
        # find_elements_by_xpath shortcut no longer exists in Selenium 4.
        WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements('xpath', account_xpath)[0]).send_keys(ACCOUNT)
        time.sleep(2)
        WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements('xpath', password_xpath)[0]).send_keys(PASSWORD)
        time.sleep(2)
        WebDriverWait(DRIVER, 10).until(
            lambda driver: driver.find_elements('xpath', label_xpath))[0].click()
        time.sleep(2)
        # NOTE(review): presumably leaves time for a manual confirmation step
        # (the submit click is disabled in the original) - confirm.
        time.sleep(15)
        # The result could be saved as wechatcookies.json for get_cookies().
        return {item.get('name'): item.get('value') for item in DRIVER.get_cookies()}
    
    def get_cookies():
        """Load the saved cookies from ``wechatcookies.json`` in the working directory."""
        with open('wechatcookies.json', 'r') as cookie_file:
            return json.load(cookie_file)
    
    def get_token(url=ORIGINAL_URL):
        """Return the saved cookies together with the session token.

        Requests ``url`` with the saved cookies and reads the numeric ``token``
        query value from the final (post-redirect) URL.

        Returns:
            tuple: (cookies dict, token string).

        Raises:
            IndexError: when the final URL carries no token. The exception type
                is kept from the original ``findall(...)[0]`` lookup.
        """
        cookies = get_cookies()
        res = requests.get(url, cookies=cookies)
        match = re.search(r'token=(\d+)', str(res.url))
        if match is None:
            raise IndexError('no token found in {!r}; the saved cookies may be invalid'.format(str(res.url)))
        return cookies, match.group(1)
    
    
    def get_articles(begin='0',count='10',url=ARTICLE_URL, query_words='英雄'):
        """Query one page of the article endpoint.

        Args:
            begin: offset of the first result, sent as the ``begin`` form field.
            count: page size, sent as the ``count`` form field.
            url: endpoint to post the search form to.
            query_words: search text, sent in the form field named ``url``.

        Returns:
            tuple: (article dict, total). The dict holds title, article_type,
            cover_url, head_img_url, nickname and url; ``total`` is the reply's
            ``total`` value (None when absent).

        NOTE(review): every result is merged into the same six keys, so only the
        last article of the page survives. Returning a list would fix that but
        changes the return type callers see, so it is left for a follow-up.
        """
        cookies, token = get_token()

        # Content-Length is intentionally not set: requests computes it from the
        # encoded form body, so the former hard-coded '123' was misleading.
        headers = {"Accept": "application/json, text/javascript, */*; q=0.01",
                   "Accept-Encoding": "gzip, deflate, br",
                   "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
                   "Cache-Control": "no-cache",
                   "Connection": "keep-alive",
                   "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
                   "Host": "mp.weixin.qq.com",
                   "Origin": "https://mp.weixin.qq.com",
                   "Pragma": "no-cache",
                   "Referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=10&share=1&token="+token+"&lang=zh_CN",
                   "Sec-Fetch-Mode": "cors",
                   "Sec-Fetch-Site": "same-origin",
                   "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
                   "X-Requested-With": "XMLHttpRequest"
                   }
        data = {
            'token': token,
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'url': query_words,
            'allow_reprint': '0',
            'begin': begin,
            'count': count
        }
        res = requests.post(url, cookies=cookies, headers=headers, data=data)
        # Decode the body once instead of once per field.
        payload = res.json()
        # A reply without 'list' would otherwise fail with
        # "TypeError: 'NoneType' object is not iterable".
        data_list = payload.get('list') or []
        total = payload.get('total')

        fields = ('title', 'article_type', 'cover_url', 'head_img_url', 'nickname', 'url')
        final_data = {}
        for item in data_list:
            final_data.update({field: item.get(field) for field in fields})
        return final_data, total
    
    def iter_pages(total, query_words='问苍茫大地',begin=0,count=10):
        """Fetch every result page and append each page to the JSON file.

        Args:
            total: number of matching articles (as reported by get_articles);
                None is treated as 0.
            query_words: search text, sent in the form field named ``url``.
            begin: offset of the first article to fetch.
            count: page size; must be positive.

        Raises:
            ValueError: if ``count`` is not positive.

        Each page is written as a list of article dicts through
        serilize_obj2json_file(); previously only the last article of a page
        was kept because every item overwrote the same keys.
        NOTE(review): pages are appended one after another, so the output file
        is a sequence of JSON documents rather than one JSON value - confirm
        this is what consumers expect.
        """
        total = int(total or 0)
        begin = int(begin)
        count = int(count)
        if count <= 0:
            raise ValueError('count must be a positive integer')

        cookies, token = get_token()
        # Built once: the headers do not change between pages. Content-Length
        # is left to requests, which computes it from the encoded form body.
        headers = {"Accept": "application/json, text/javascript, */*; q=0.01",
                   "Accept-Encoding": "gzip, deflate, br",
                   "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
                   "Cache-Control": "no-cache",
                   "Connection": "keep-alive",
                   "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
                   "Host": "mp.weixin.qq.com",
                   "Origin": "https://mp.weixin.qq.com",
                   "Pragma": "no-cache",
                   "Referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=10&share=1&token=" + token + "&lang=zh_CN",
                   "Sec-Fetch-Mode": "cors",
                   "Sec-Fetch-Site": "same-origin",
                   "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36",
                   "X-Requested-With": "XMLHttpRequest"
                   }
        fields = ('title', 'article_type', 'cover_url', 'head_img_url', 'nickname', 'url')

        # One offset per page. range() stops before ``total``, so an exact
        # multiple of ``count`` no longer triggers an extra, empty request.
        offsets = range(begin, total, count)
        print('开始爬取文章......')
        for offset in offsets:
            data = {
                'token': token,
                'lang': 'zh_CN',
                'f': 'json',
                'ajax': '1',
                'random': random.random(),
                'url': query_words,
                'allow_reprint': '0',
                'begin': str(offset),
                'count': str(count)
            }
            res = requests.post(ARTICLE_URL, cookies=cookies, headers=headers, data=data)
            data_list = res.json().get('list') or []
            page_articles = [{field: item.get(field) for field in fields} for item in data_list]
            serilize_obj2json_file(page_articles)
            # Pause between requests to avoid hammering the endpoint.
            time.sleep(2)

        # Report the pages actually fetched (the old counter had run down to -1).
        print('完成爬取, 共 爬取 {} 页...'.format(len(offsets)))
    
    
    
    
    
    def serilize_obj2json_file(obj,target=ARTICAL_JSON_FILE):
        """Append ``obj`` to ``target`` as indented UTF-8 JSON."""
        with open(target, mode='a', encoding='utf-8') as out_file:
            json.dump(obj, out_file, ensure_ascii=False, indent=2)
            out_file.flush()
    
    if __name__ == '__main__':
        # The login and token helpers are left disabled; the run relies on a
        # previously saved wechatcookies.json.
        # print(auto_login())
        # get_token()
        # The first request is only used for its ``total``; the page data is discarded.
        data,total = get_articles(query_words="苍茫大地")
        # serilize_obj2json_file(data)
        # NOTE(review): iter_pages() falls back to its own default query_words
        # ('问苍茫大地'), which differs from the query used above - confirm.
        iter_pages(total)
    

    使用 开源工具包

    pip install wechatsogou --upgrade
    
    from time import perf_counter
    from requests import get
    # https://github.com/tesseract-ocr/tessdata.git
    # https://codeload.github.com/tesseract-ocr/tessdata/zip/master
    
    
    class ProcessBar:
        """Render a 100-cell textual progress bar with an elapsed-time suffix."""

        # perf_counter() has an undefined reference point, so only differences
        # between two readings are meaningful. Elapsed time is therefore
        # measured from the moment this class is defined.
        _started_at = perf_counter()

        def __init__(self, started_char='[', ended_char=']', finished_char='-', next_finished_char='>',
                     unfinished_char=' '):
            """Store the characters used to draw the bar."""
            self.started_char = started_char
            self.ended_char = ended_char
            self.finished_char = finished_char
            self.next_finished_char = next_finished_char
            self.unfinished_char = unfinished_char

        def intermediate_state(self, finished):
            """Return the bar for ``finished`` percent.

            Args:
                finished: percentage done; values outside 0..100 are clamped so
                    the bar keeps a constant width.

            Returns:
                str: bar, percentage and elapsed time, e.g. ``[-->   ] 2% 0d 00:00:01``.
            """
            finished = max(0, min(100, int(finished)))
            if finished < 100:
                # Parentheses join the two f-string pieces into one expression.
                return (
                    f'{self.started_char}{self.finished_char*finished}{self.next_finished_char}'
                    f'{self.unfinished_char*(99-finished)}{self.ended_char} {finished}% {self.time_format()}'
                )
            return f'{self.started_char}{self.finished_char*finished}{self.ended_char} {finished}% {self.time_format()}'

        @staticmethod
        def time_format():
            """Return the elapsed time formatted as ``<days>d HH:MM:SS``."""
            elapsed = int(perf_counter() - ProcessBar._started_at)
            minute, second = divmod(elapsed, 60)
            hour, minute = divmod(minute, 60)
            day, hour = divmod(hour, 24)
            return f'{day}d {hour:02d}:{minute:02d}:{second:02d}'
    
    
    class GithubDownloader:
        """Stream a URL to a local file while printing progress."""

        def __init__(self, url, file_name, headers=None, proxies=None, download_location=''):
            """Remember what to download and where to store it.

            Args:
                url: address to fetch.
                file_name: name of the file to write.
                headers: optional request headers.
                proxies: optional proxies mapping handed to the HTTP call.
                download_location: target directory ('' means the working directory).
            """
            self.url = url
            self.headers = headers
            self.proxies = proxies
            self.file_name = file_name
            self.download_location = download_location

        @staticmethod
        def format_unit(byte):
            """Format a byte count with a 1024-based unit (B, KB, MB, GB, TB)."""
            size = float(byte)
            for unit in ('B', 'KB', 'MB', 'GB'):
                if size < 1024:
                    return '%7.2f%s' % (size, unit)
                size /= 1024
            return '%7.2fTB' % size

        def download(self):
            """Download ``self.url`` into ``download_location/file_name``.

            Shows a percentage bar when the server reports a non-zero
            Content-Length, otherwise the number of bytes received so far.

            Raises:
                An HTTP error from ``raise_for_status()`` when the server answers
                with an error status, instead of saving the error page as the file.
            """
            import os

            process_bar = ProcessBar()
            # os.path.join inserts the separator that plain concatenation dropped
            # when download_location had no trailing slash.
            target_path = os.path.join(self.download_location, self.file_name)
            with get(self.url, headers=self.headers, proxies=self.proxies, stream=True) as response:
                response.raise_for_status()
                # A missing header or "0" both mean "size unknown" and also
                # protect the percentage maths from a division by zero.
                file_size = int(response.headers.get('Content-Length') or 0)
                if file_size:
                    print('文件大小:', file_size)
                with open(target_path, 'wb') as file:
                    current_size = 0
                    print('开始下载……')
                    for chunk in response.iter_content(chunk_size=1024):
                        if not chunk:
                            continue
                        file.write(chunk)
                        # Count first so the display reflects this chunk.
                        current_size += len(chunk)
                        if file_size:
                            # Decoded content can be larger than Content-Length,
                            # hence the cap at 100.
                            finished = min(100, current_size * 100 // file_size)
                            # '\r' redraws the progress on the same console line.
                            print('下载进度:', process_bar.intermediate_state(finished), end='\r', flush=True)
                        else:
                            print(f'已下载:{self.format_unit(current_size)} {ProcessBar.time_format()}', end='\r',
                                  flush=True)
                    print('下载完成!', process_bar.intermediate_state(100), end='\n', flush=True)
    
    
    if __name__ == '__main__':
        # Download the tessdata archive as master.zip, sending browser-like
        # headers and routing HTTPS traffic through a local SOCKS5 proxy on
        # port 1080.
        github_downloader = GithubDownloader('https://codeload.github.com/tesseract-ocr/tessdata/zip/master', 'master.zip',
                                             headers={
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,applicatio'
                          'n/signed-exchange;v=b3', 'Accept-Encoding': 'gzip, deflate, br',
                'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Host': 'codeload.github.com',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0'
                              '.3770.100 Safari/537.36'},
                                             proxies={'https': 'socks5://localhost:1080'})
        github_downloader.download()
    
  • 相关阅读:
    MD5验签同一字符串得到不同的MD5签名值可能问题之一
    Git本地仓库与远程github同步的时候提示fatal: remote origin already exists 错误解决办法
    SVN Error: Unreadable path encountered; access denied;
    2018年终个人总结
    ant编译无法依赖rt.jar
    ORA-00980: 同义词转换不再有效
    二叉树的深度和广度优先遍历
    Missing HTTP Strict-Transport-Security Header (HSTS) 解决
    单例模式
    sql 替换字段中部分内容
  • 原文地址:https://www.cnblogs.com/Frank99/p/11407237.html
Copyright © 2011-2022 走看看