zoukankan      html  css  js  c++  java
  • WeChat on Web 部分功能模拟实现

    Flask

    from flask import Flask,request,render_template,session,jsonify
    import time
    import requests
    import re
    from bs4 import BeautifulSoup
    import json
    
    # Module-level Flask application object; every route below registers on it.
    app = Flask(__name__)
    # Debug mode is switched on unconditionally.
    app.debug = True
    # Secret used by Flask to sign the session cookie.
    # NOTE(review): hard-coded, guessable value — load it from configuration
    # before using this anywhere but a local demo.
    app.secret_key = 'salt'
    
    
    
    def xml_parser(text):
        """Flatten the direct children of the first ``<error>`` element into a dict.

        :param text: markup to parse (the caller passes the body of the ticket reply).
        :return: ``{tag_name: tag_text}`` for every direct child tag of the first
            ``<error>`` element, or an empty dict when there is no such element.
        """
        soup = BeautifulSoup(text, 'html.parser')
        root = soup.find(name='error')
        if root is None:
            # No <error> element: the original dereferenced None here and raised
            # AttributeError; report "nothing parsed" instead.
            return {}
        return {child.name: child.text for child in root.find_all(recursive=False)}
    
    @app.route('/login',methods=['GET','POST'])
    def login():
        """Render the QR-code login page.

        On GET, asks the ``jslogin`` endpoint for a fresh login uuid, stores it in
        the session under ``qcode`` and renders ``login.html`` with it.

        :return: the rendered page on GET; a ``405`` reply for POST (the route
            accepts POST but nothing is implemented for it); a ``502`` reply when
            no uuid can be found in the upstream answer.
        """
        if request.method != 'GET':
            # The original fell through with ``pass`` and returned None, which
            # Flask rejects with a server error.
            return 'Method Not Allowed', 405

        ctime = str(int(time.time() * 1000))
        qcode_url = "https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={0}".format(ctime)

        ret = requests.get(qcode_url)
        found = re.search('uuid = "(.*)";', ret.text)
        if found is None:
            # Guard the lookup instead of letting ``[0]`` raise IndexError.
            return 'Unable to obtain a login uuid', 502

        qcode = found.group(1)
        session['qcode'] = qcode
        return render_template('login.html',qcode=qcode)
    
    @app.route('/check_login')
    def check_login():
        """
        Send a GET request to find out whether the QR code has been scanned or
        the login confirmed.

        Example upstream URL:
        https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid=QbeUOBatKw==&tip=0&r=-1036255891&_=1525749595604

        :return: JSON with ``code`` 408 by default, 201 (plus ``src`` when an
            avatar is present) after a scan, or 200 once the login is confirmed
            and the ticket data and cookies have been stored in the session.
        """
        response = {'code':408}
        qcode = session.get('qcode')
        ctime = str(int(time.time() * 1000))
        check_url = "https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={0}&tip=0&r=-1036255891&_={1}".format(qcode,ctime)
        ret = requests.get(check_url)
        if "code=201" in ret.text:
            # QR code scanned
            response['code'] = 201
            avatar = re.search("userAvatar = '(.*)';",ret.text)
            if avatar is not None:
                # Only attach the avatar when it is actually there; the original
                # raised IndexError (a 500 for the polling client) otherwise.
                response['src'] = avatar.group(1)
        elif 'code=200' in ret.text:
            # Login confirmed
            target = re.search('redirect_uri="(.*)";',ret.text)
            if target is not None:
                # Request the redirect_uri to obtain the credential data
                redirect_uri = target.group(1) + "&fun=new&version=v2"
                ticket_ret = requests.get(redirect_uri)
                session['ticket_dict'] = xml_parser(ticket_ret.text)
                session['ticket_cookie'] = ticket_ret.cookies.get_dict()

                response['code'] = 200
            # Without a redirect_uri the code stays 408 so the client keeps polling.
        return jsonify(response)
    
    @app.route('/index')
    def index():
        """
        Initialise the user's data and render the landing page.

        Example upstream URL:
        https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-1039465096&lang=zh_CN&pass_ticket=q9TOX4RI4VmNiHXW9dUUl1oMzoQK2X2f3H3kn0VYm5YGNwUMO2THYMznv8DSXqp0

        Posts the stored ticket values to ``webwxinit``, saves the ``User`` entry
        of the reply in the session as ``current_user`` and renders
        ``index.html`` with the whole reply.

        :return: the rendered page, or a redirect to ``/login`` when the session
            holds no ticket data.
        """
        ticket_dict = session.get('ticket_dict')
        if not ticket_dict:
            # No ticket in the session: the original crashed with AttributeError
            # on None. Send the visitor to the login page instead.
            from flask import redirect
            return redirect('/login')

        init_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-1039465096&lang=zh_CN&pass_ticket={0}".format(ticket_dict.get('pass_ticket'))

        data_dict = {
            "BaseRequest":{
                "DeviceID":"e750865687999321",
                "Sid":ticket_dict.get('wxsid'),
                "Uin":ticket_dict.get('wxuin'),
                "Skey":ticket_dict.get('skey'),
            }
        }

        init_ret = requests.post(
            url=init_url,
            json=data_dict,
            # Send the cookies captured with the ticket, as the other
            # authenticated calls in this file do.
            cookies=session.get('ticket_cookie'),
        )
        init_ret.encoding = 'utf-8'
        user_dict = init_ret.json()

        session['current_user'] = user_dict['User']

        return render_template('index.html',user_dict=user_dict)
    
    @app.route('/get_img')
    def get_img():
        """Proxy the current user's avatar image.

        Downloads ``HeadImgUrl`` of the session's ``current_user`` from
        ``wx.qq.com`` using the stored ticket cookies.

        :return: the image bytes with a ``Content-Type`` header, or a ``401``
            reply when no user is stored in the session.
        """
        current_user = session.get('current_user')
        if not current_user:
            # The original indexed the session directly and raised KeyError.
            return 'Not logged in', 401

        ticket_cookie = session.get('ticket_cookie')
        head_url = "https://wx.qq.com" + current_user['HeadImgUrl']
        img_ret = requests.get(head_url, cookies=ticket_cookie)

        # The original put 'Content-Type: image/jpeg' on the outgoing GET, where
        # it has no effect; the header belongs on the response to the browser.
        content_type = img_ret.headers.get('Content-Type', 'image/jpeg')
        return img_ret.content, 200, {'Content-Type': content_type}
    
    @app.route('/user_list')
    def user_list():
        """Fetch the contact list and render ``user_list.html`` with it.

        Reads the ticket data and cookies from the session, calls
        ``webwxgetcontact`` with the current millisecond timestamp and the
        stored ``skey``, and hands the decoded JSON reply to the template as
        ``wx_user_dict``.
        """
        tickets = session.get('ticket_dict')
        cookies = session.get('ticket_cookie')

        url_template = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?lang=zh_CN&r={0}&seq=0&skey={1}"
        contact_url = url_template.format(int(time.time() * 1000), tickets.get('skey'))

        reply = requests.get(contact_url, cookies=cookies)
        # Force UTF-8 before decoding the body
        reply.encoding = 'utf-8'

        return render_template('user_list.html',wx_user_dict=reply.json())
    
    @app.route('/send',methods=['GET','POST'])
    def send():
        """Simulate sending a text message.

        GET renders the ``send.html`` form. POST reads ``to`` and ``content``
        from the form and posts a type-1 message from the logged-in user to
        ``webwxsendmsg``.

        :return: the form page on GET; on POST the upstream reply body, or a
            ``401`` reply when the session holds no login data.
        """
        if request.method == "GET":
            return render_template('send.html')

        current_user = session.get('current_user')
        ticket_dict = session.get('ticket_dict')
        if not current_user or not ticket_dict:
            # The original indexed these directly and failed with KeyError/TypeError.
            return 'Not logged in', 401

        from_user = current_user['UserName']
        to = request.form.get('to')
        content = request.form.get('content')
        # Integer milliseconds, like every other timestamp in this file; the
        # original produced a float string such as "1525749595604.123".
        ctime = str(int(time.time() * 1000))
        msg_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?lang=zh_CN&pass_ticket={0}".format(ticket_dict['pass_ticket'])

        data_dict = {
            'BaseRequest':{
                "DeviceID": "e750865687999321",
                "Sid": ticket_dict.get('wxsid'),
                "Uin": ticket_dict.get('wxuin'),
                "Skey": ticket_dict.get('skey'),
            },
            'Msg':{
                'ClientMsgId':ctime,
                'LocalID':ctime,
                'FromUserName':from_user,
                'ToUserName':to,
                "Content":content,
                'Type':1
            },
            'Scene':0
        }

        # ensure_ascii=False keeps non-ASCII text as raw UTF-8 rather than \u
        # escapes, which is why the body is encoded by hand instead of
        # passing json=.
        payload = json.dumps(data_dict,ensure_ascii=False).encode('utf-8')
        ret = requests.post(
            url=msg_url,
            data=payload,
            # Cookies and content type match what the Django version of this
            # call sends.
            cookies=session.get('ticket_cookie'),
            headers={'Content-Type': 'application/json'},
        )

        return ret.text
    
    # Start the Flask server only when this file is executed directly.
    if __name__ == '__main__':
        app.run()
    manage.py
    <body>
    <div style="width: 200px;margin: 0 auto">
        <h1 style="text-align: center">微信登录</h1>
        <img id="img" style="height: 200px; width: 200px;" src="https://login.weixin.qq.com/qrcode/{{qcode}}" alt="">
    </div>
    
    <script src="/static/jquery-1.12.4.js"></script>
    <script>
        $(function () {
            checkLogin();
        })
    
        // Poll /check_login and react to the reported code. The function calls
        // itself again after every successful reply except code 200.
        function checkLogin() {
            $.ajax({
                url: '/check_login',
                type: 'GET',
                dataType: 'JSON',
                success: function (arg) {
                    if (arg.code === 201) {
                        // QR code scanned: show the image the server sent, then keep polling
                        $('#img').attr('src', arg.src);
                        checkLogin();
                    } else if (arg.code === 200) {
                        // Login confirmed: go to the index page
                        location.href = '/index'
                    } else {
                        // Any other code: poll again
                        checkLogin();
                    }
                }
            })
        }
    
    </script>
    
    </body>
    login.html
    <body>
        <h1>欢迎登录:{{user_dict.User.NickName}}</h1>
    
        <div>
            <img src="/get_img" alt="">
            <h2>{{user_dict.User.NickName}}</h2>
            <h2>{{user_dict.User.UserName}}</h2>
        </div>
    
    
        <h3>最近联系人</h3>
        <ul>
            {% for user in user_dict.ContactList%}
            <li>{{user.NickName}}{{user}}</li>
            {% endfor %}
        </ul>
        <a href="/user_list">查看所有联系人</a>
    </body>
    index.html
    <body>
    <div>
        <div style="width: 30%;float:left;">
            <h3>{{ wx_user_dict.MemberCount}}</h3>
    
            <ul>
                {% for item in wx_user_dict.MemberList %}
                <li>{{item.NickName}} ====> {{item.UserName}}</li>
                {% endfor %}
            </ul>
    
        </div>
        <div style="width: 70%;float:left;"></div>
    </div>
    </body>
    user_list.html
    <body>
    <form action="" method="post">
        <p><input type="text" name="to" placeholder="请在这里输入联系人ID" ></p>
        <input type="text" name="content" >
        <input type="submit" value="发送" >
    </form>
    </body>
    send.html

    Django

    import re
    import time
    import json
    import requests
    from django.shortcuts import render
    from django.shortcuts import HttpResponse
    
    # Module-level state shared by all views below.
    # NOTE(review): these are process-wide globals, so every client of this
    # process shares one login — fine for a demo, not for concurrent users.

    # Timestamp string taken when the login page was requested
    CURRENT_TIME = None
    # uuid of the current QR code
    QCODE = None
    
    # Cookies returned by the login-confirmation reply
    LOGIN_COOKIE_DICT = {}
    # Cookies returned by the ticket request
    TICKET_COOKIE_DICT = {}
    # Values parsed from the ticket reply (read below: pass_ticket, wxsid, skey, wxuin)
    TICKET_DICT = {}
    # Value of the "tip" query parameter for the polling URL
    TIPS = 1
    
    # Decoded reply of the webwxinit call
    USER_INIT_DATA = {}
    
    
    def login(request):
        """Request a new login uuid and render the QR-code page.

        Stores the request timestamp in ``CURRENT_TIME`` and the uuid in
        ``QCODE`` for later polling, and resets ``TIPS`` to its initial value.

        :param request: the incoming Django request.
        :return: the rendered ``login.html``, or a ``502`` response when no uuid
            can be found in the upstream reply.
        """
        global CURRENT_TIME, QCODE, TIPS
        base_qcode_url = 'https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={0}'
        # Integer milliseconds, matching the sample URLs quoted in this file;
        # the original sent float seconds such as "1525749595.604".
        CURRENT_TIME = str(int(time.time() * 1000))
        response = requests.get(base_qcode_url.format(CURRENT_TIME))

        # The uuid is the suffix of the QR-code image URL
        found = re.search('uuid = "(.*)";', response.text)
        if found is None:
            # Guard the lookup instead of letting ``[0]`` raise IndexError.
            return HttpResponse('Unable to obtain a login uuid', status=502)

        QCODE = found.group(1)
        # long_polling() drops TIPS to 0 after a scan; a fresh QR code starts
        # over from the module's initial value.
        TIPS = 1
        return render(request, 'login.html', {'code': QCODE})
    
    
    def long_polling(request):
        """Poll the login endpoint once and report the login state as JSON.

        Example upstream URL:
        https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid=IZpsHyzTNw==&tip=1&r=-897465901&_=1486956149964

        ``status`` in the reply is 408 by default, 201 after the QR code was
        scanned (``data`` then holds the avatar) and 200 once the login has been
        confirmed and the ticket values are stored in ``TICKET_DICT``. Any
        exception is printed and leaves the status set so far.

        :param request: the incoming Django request.
        :return: ``HttpResponse`` whose body is the JSON-encoded result.
        """
        global TIPS
        print('polling....')
        ret = {'status': 408, 'data': None}
        try:
            base_login_url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={0}&tip={1}&r=-897465901&_={2}'
            login_url = base_login_url.format(QCODE, TIPS, CURRENT_TIME)
            response_login = requests.get(login_url)
            if "window.code=201" in response_login.text:
                TIPS = 0
                avatar = re.findall("userAvatar = '(.*)';", response_login.text)[0]
                ret['data'] = avatar
                ret['status'] = 201
            elif 'window.code=200' in response_login.text:
                # Scan confirmed: keep the cookies of this reply
                LOGIN_COOKIE_DICT.update(response_login.cookies.get_dict())
                redirect_uri = re.findall('redirect_uri="(.*)";', response_login.text)[0]
                redirect_uri += '&fun=new&version=v2&lang=zh_CN'

                # Fetch the ticket: keep both its cookies and its body values
                response_ticket = requests.get(redirect_uri, cookies=LOGIN_COOKIE_DICT)
                TICKET_COOKIE_DICT.update(response_ticket.cookies.get_dict())
                # The ticket body is deliberately no longer printed: it carries
                # the credentials (skey, pass_ticket, ...).
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(response_ticket.text, 'html.parser')
                root = soup.find()
                if root is not None:
                    # Direct child *elements* only. Iterating the tag itself, as
                    # the original did, also yields text nodes, whose name is None.
                    for tag in root.find_all(recursive=False):
                        TICKET_DICT[tag.name] = tag.string
                    ret['status'] = 200
                # With no element in the reply the status stays 408, as before.
        except Exception as e:
            # Best-effort endpoint: report the problem and answer with the
            # status reached so far so the client can poll again.
            print(e)
        return HttpResponse(json.dumps(ret))
    
    
    def index(request):
        """Initialise the user's basic data and render the landing page.

        Example upstream URL:
        https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-909239606&lang=zh_CN&pass_ticket=Tpc2XEec%252BJ0q2qNRw6nqWzGSsQ3jM2LZtBCVJZfjvMTDxjiyJ9mO5eRtCNOveeXO

        Posts the ticket values to ``webwxinit`` with the login and ticket
        cookies, merges the decoded reply into ``USER_INIT_DATA`` and renders
        ``index.html`` with it as ``data``.

        :param request: the incoming Django request.
        :raises KeyError: when ``TICKET_DICT`` lacks a ticket value (no login yet).
        """
        # https, like every other endpoint in this module: the original used
        # plain http here and so sent pass_ticket/skey unencrypted.
        user_init_url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?pass_ticket=%s&r=%s' % (
            TICKET_DICT['pass_ticket'], int(time.time()))

        form_data = {
            'BaseRequest': {
                'DeviceID': 'e531777446530354',
                'Sid': TICKET_DICT['wxsid'],
                'Skey': TICKET_DICT['skey'],
                'Uin': TICKET_DICT['wxuin']
            }
        }
        all_cookie_dict = {}
        all_cookie_dict.update(LOGIN_COOKIE_DICT)
        all_cookie_dict.update(TICKET_COOKIE_DICT)

        response_init = requests.post(user_init_url, json=form_data, cookies=all_cookie_dict)
        response_init.encoding = 'utf-8'
        user_init_data = json.loads(response_init.text)
        # Keep the reply for the views that need the user name and SyncKey later
        USER_INIT_DATA.update(user_init_data)

        return render(request, 'index.html', {'data': user_init_data})
    
    
    def contact_list(request):
        """
        Fetch the contact list and render ``contact_list.html``.

        Endpoint pattern:
        https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?lang=zh_CN&pass_ticket={0}&r={1}&seq=0&skey={2}

        :param request: the incoming Django request.
        :return: the rendered template, with the decoded reply as ``obj``.
        """
        url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?lang=zh_CN&pass_ticket={0}&r={1}&seq=0&skey={2}".format(
            TICKET_DICT['pass_ticket'],
            str(time.time()),
            TICKET_DICT['skey'],
        )

        # Ticket cookies are applied last, so they win on a name clash
        cookies = dict(LOGIN_COOKIE_DICT)
        cookies.update(TICKET_COOKIE_DICT)

        reply = requests.get(url, cookies=cookies)
        reply.encoding = 'utf-8'
        contacts = json.loads(reply.text)
        return render(request, 'contact_list.html', {'obj': contacts})
    
    
    def send_msg(request):
        """Send a text message from the logged-in user.

        Reads ``user_id`` (recipient) and ``user_msg`` (text) from the POST data
        and posts a type-1 message to ``webwxsendmsg`` with the login and ticket
        cookies. The upstream reply is printed.

        :param request: the incoming Django request.
        :return: ``HttpResponse('ok')``.
        :raises KeyError: when the init data or ticket values are missing.
        """
        from_user_id = USER_INIT_DATA['User']['UserName']
        to_user_id = request.POST.get('user_id')
        msg = request.POST.get('user_msg')

        send_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?lang=zh_CN&pass_ticket=" + TICKET_DICT['pass_ticket']

        # One id for both fields; the original called time.time() twice and got
        # two slightly different values.
        msg_id = str(time.time())
        form_data = {
            'BaseRequest': {
                'DeviceID': 'e531777446530354',
                'Sid': TICKET_DICT['wxsid'],
                'Skey': TICKET_DICT['skey'],
                'Uin': TICKET_DICT['wxuin']
            },
            'Msg': {
                "ClientMsgId": msg_id,
                "Content": msg,
                "FromUserName": from_user_id,
                "LocalID": msg_id,
                "ToUserName": to_user_id,
                "Type": 1
            },
            'Scene': 0
        }
        # The original serialised a '%(content)s' placeholder and substituted
        # the text afterwards, so a quote or backslash in the message produced
        # invalid JSON and a '%' anywhere in the payload broke the formatting.
        # ensure_ascii=False gives the same raw (non-escaped) UTF-8 text while
        # letting json do the quoting.
        form_data_bytes = json.dumps(form_data, ensure_ascii=False).encode('utf-8')

        all_cookie_dict = {}
        all_cookie_dict.update(LOGIN_COOKIE_DICT)
        all_cookie_dict.update(TICKET_COOKIE_DICT)

        response = requests.post(send_url, data=form_data_bytes, cookies=all_cookie_dict, headers={
            'Content-Type': 'application/json'})
        print(response.text)

        return HttpResponse('ok')
    
    
    def get_msg(request):
        """Check for new messages and, if there are any, fetch and print them.

        Calls ``synccheck`` with the current SyncKey. When the reply contains
        ``selector:"2"``, posts to ``webwxsync``, replaces the stored SyncKey
        with the returned one and prints each entry of ``AddMsgList``.

        :param request: the incoming Django request.
        :return: ``HttpResponse('ok')``.
        :raises KeyError: when the init data or ticket values are missing.
        """
        # Check whether new messages have arrived
        sync_url = "https://webpush.weixin.qq.com/cgi-bin/mmwebwx-bin/synccheck"

        # synckey parameter: "Key_Val" pairs joined with "|"
        sync_data_str = "|".join(
            "%s_%s" % (item['Key'], item['Val'])
            for item in USER_INIT_DATA['SyncKey']['List']
        )
        sync_dict = {
            "r": int(time.time()),
            "skey": TICKET_DICT['skey'],
            "sid": TICKET_DICT['wxsid'],
            "uin": TICKET_DICT['wxuin'],
            "deviceid": "e531777446530354",
            "synckey": sync_data_str
        }
        all_cookie = {}
        all_cookie.update(LOGIN_COOKIE_DICT)
        all_cookie.update(TICKET_COOKIE_DICT)
        response_sync = requests.get(sync_url, params=sync_dict, cookies=all_cookie)
        print(response_sync.text)
        if 'selector:"2"' not in response_sync.text:
            return HttpResponse('ok')

        fetch_msg_url = "https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsync?sid=%s&skey=%s&lang=zh_CN&pass_ticket=%s" % (
            TICKET_DICT['wxsid'], TICKET_DICT['skey'], TICKET_DICT['pass_ticket'])

        form_data = {
            'BaseRequest': {
                'DeviceID': 'e531777446530354',
                'Sid': TICKET_DICT['wxsid'],
                'Skey': TICKET_DICT['skey'],
                'Uin': TICKET_DICT['wxuin']
            },
            'SyncKey': USER_INIT_DATA['SyncKey'],
            'rr': str(time.time())
        }
        # Send the same cookies as every other authenticated call in this
        # module; the original left them off for this request only.
        response_fetch_msg = requests.post(fetch_msg_url, json=form_data, cookies=all_cookie)
        response_fetch_msg.encoding = 'utf-8'
        res_fetch_msg_dict = json.loads(response_fetch_msg.text)
        # The next synccheck must use the key returned by this sync
        USER_INIT_DATA['SyncKey'] = res_fetch_msg_dict['SyncKey']
        # A reply without AddMsgList is treated as "no messages"
        for item in res_fetch_msg_dict.get('AddMsgList', []):
            print(item['Content'], ":::::", item['FromUserName'], "---->", item['ToUserName'], )
        return HttpResponse('ok')
    View.py
    <body>
        <div style="width: 300px;margin: 0 auto;">
            <!-- 二维码图片路径 -->
            <img id="qcode"  style="width: 300px;height: 300px;" src="https://login.weixin.qq.com/qrcode/{{ code }}">
        </div>
        <script src="/static/jquery-1.12.4.js"></script>
        <script>
            $(function () {
                polling();
            });
    
            // Poll /polling/ and react to the reported status. The function
            // calls itself again after a 408 or 201 reply.
            function polling(){
                $.ajax({
                    url: '/polling/',
                    type: "GET",
                    dataType: 'json',
                    success: function(arg){
                        if(arg.status == 408){
                            // Nothing happened yet: poll again
                            polling();
                        }else if(arg.status == 201){
                            // Scanned: show the returned image, then keep polling
                            $('#qcode').attr('src', arg.data);
                            polling();
                        }else {
                            // Any other status: go to the index page
                            window.location.href = '/index/'
                        }
                    }
                })
            }
        </script>
    </body>
    login.html
    <body>
        <h1>个人信息</h1>
        <div>
            <img src="https://wx.qq.com{{ data.User.HeadImgUrl }}">
        </div>
        <div>
            {{ data.User.NickName }} - {{ data.User.UserName }}
        </div>
        <h1>联系人列表</h1>
        <ul>
        {% for row in data.ContactList%}
            <li>{{ row.UserName }} - {{ row.NickName }}</li>
        {% endfor %}
        <li><a href="/contact_list/">获取更多联系人</a></li>
        </ul>
    
        <h1>公众号</h1>
        {% for row in data.MPSubscribeMsgList%}
            <div style="font-weight: bolder">{{ row.NickName }}</div>
            {% for i in row.MPArticleList %}
                <div>
                    <div><a href="{{ i.Url }}">{{ i.Title }}</a></div>
                    <div style="color: #dddddd">{{ i.Digest }}</div>
                </div>
    
            {% endfor %}
    
        {% endfor %}
    
    </body>
    index.html
    <body>
        <h1>发送消息</h1>
        <div>
            <p><input id="user_id" type="text" placeholder="请输入用户唯一ID" /></p>
            <p><input id='user_msg' type="text" placeholder="请输入内容" /></p>
            <input id="sendMsg" type="button" value="发送"  />
        </div>
        <ul>
            {% for row in obj.MemberList %}
                <li>{{ row.NickName }} - {{ row.UserName }} -{{ row.Province }}</li>
            {% endfor %}
        </ul>
        <script src="/static/jquery-1.12.4.js"></script>
        <script>
            $(function () {
                bindSendMessage();
                fetchMessage();
            });
            // Wire the send button: on click, POST the recipient id and the
            // message text from the two inputs to /send_msg/.
            function bindSendMessage() {
                $('#sendMsg').click(function () {
                    $.ajax({
                        url: '/send_msg/',
                        type: 'POST',
                        data: {'user_id': $('#user_id').val(), 'user_msg': $('#user_msg').val()},
                        // The reply is ignored
                        success:function () {
    
                        }
                    })
                });
            }
        
            // Request /get_msg/ and, after each successful reply, request it
            // again. The reply body (arg) is not used.
            function fetchMessage(){
                $.ajax({
                    url: '/get_msg/',
                    type: 'GET',
                    success:function (arg) {
                        fetchMessage();
                    }
                })
            }
        </script>
    </body>
    contact_list.html

    参考链接:

    1.http://www.cnblogs.com/yangxt90/articles/9009444.html

    2.https://blog.csdn.net/wonxxx/article/details/51787041

  • 相关阅读:
    linux系统 ssh免密码登录服务器主机
    QPS TPS 并发数 响应时间
    mysqlmtop开源系统监控mysql数据库性能
    jenkins 自动化构建 部署 回滚配置
    实用Linux命令记录
    关于高并发的几个基础问题
    tcp四次挥手
    时序数据异常检测相关的问题记录
    判断时序数据的周期性
    最短路径问题
  • 原文地址:https://www.cnblogs.com/iyouyue/p/9017320.html
Copyright © 2011-2022 走看看