zoukankan      html  css  js  c++  java
  • 四、小电视自动抽奖

    自动参加小电视抽奖
    自动领取舰长、提督奖励
    运行一天大概能获取1800包辣条
    不过,每天凌晨B站后台都会检测使用脚本的账号,一次抢太多很容易被封号一周。
    可能是依据最大的连续领奖次数判断的吧

    图片.png
    一、抽奖流程
    1. 分析页面可知
    每当有人赠送小电视之类的礼物时
    系统就会发送广播弹幕(SYS_MSG)
    2. 点击广播跳转到抽奖房间
    F12清空网络请求
    接着点击抽奖
    就找到了抽奖请求

    POST https://api.live.bilibili.com/gift/v3/smalltv/join
    

    参数
    图片.png

    响应
    图片.png
    3. 分析post参数
    roomid系统广播里就有
    raffleId是抽奖关键参数
    type 固定为 Gift
    csrf_token 来自于cookie值bili_jct (可为空)
    visit_id (可为空)
    4. 刷新抽奖页面
    F12过滤得到XHR 请求
    逐个分析返回值和url请求
    发现raffleId请求

    GET  https://api.live.bilibili.com/gift/v3/smalltv/check
    data={
    'roomid':roomid
     }
    

    响应
    图片.png
    分析结果

    1. 小电视抽奖和月色真美、摩天大楼
      获取raffleId的请求略有不同
    https://api.live.bilibili.com/gift/v3/smalltv/check
    https://api.live.bilibili.com/gift/v2/smalltv/check
    

    参加抽奖的请求相同
    二、代码实现
    api.py 添加Gift类

    class Gift():
        """Thin wrappers around the gift-raffle and guard-lottery HTTP endpoints.

        Each method builds the URL (and, where needed, the parameter dict) and
        hands it to the module-level ``ajax`` helper together with the caller's
        session object ``s``. Whatever ``ajax`` returns is passed straight back.
        """

        def check_smalltv(self,s,roomid,ver='v2'):
            """Ask which raffles are open in ``roomid``.

            ``ver`` is the API version segment placed in the URL (callers in
            this project use 'v2' and 'v3').
            """
            endpoint=''.join(('https://api.live.bilibili.com/gift/',ver,'/smalltv/check'))
            query={'roomid': roomid}
            return ajax(s,endpoint,'GET',query)

        def join_smalltv(self,s,roomid,raffleId,csrf_token,visit_id='', type = 'Gift'):
            """POST a join request for raffle ``raffleId`` in ``roomid``."""
            payload=dict(
                roomid=roomid,
                raffleId=raffleId,
                type=type,
                csrf_token=csrf_token,
                visit_id=visit_id,
            )
            return ajax(s,'https://api.live.bilibili.com/gift/v3/smalltv/join','POST',payload)

        def check_guard(self,s,roomid,_type='_guard'):
            """Query the lottery check endpoint for ``roomid``.

            ``_type`` is appended verbatim to ``.../Lottery/check``; the default
            yields ``check_guard`` and an empty string yields plain ``check``.
            """
            endpoint=''.join((
                'https://api.live.bilibili.com/lottery/v1/Lottery/check',
                _type,
                '?roomid=',
                str(roomid),
            ))
            return ajax(s,endpoint,'GET')

        def join_guard(self,s,roomid,id,csrf_token,type='guard',visit_id=None):
            """POST a join request for guard lottery ``id`` in ``roomid``."""
            payload=dict(
                roomid=roomid,
                id=id,
                type=type,
                csrf_token=csrf_token,
                visit_id=visit_id,
            )
            return ajax(s,'https://api.live.bilibili.com/lottery/v2/Lottery/join','POST',payload)
    

    servers.py 添加抽奖类

    class Join:
        """Keeps a list of rooms to watch and joins the raffles found in them.

        Attributes (all set in ``__init__``):
            Info: dict of request parameters; ``roomid``, ``csrf_token`` and
                ``visit_id`` are read from it.
            check_roomids: list of room ids polled for guard lotteries,
                seeded with ``Info['roomid']``.
            raffleId: highest raffle id handled so far; raffles whose id is
                not greater than this are skipped.
            hour_run: ``Timer`` bound to ``each_hour_run`` (created here but
                not started by this class).
            s: session object forwarded to every ``API`` call.
            num: counter used to skip one raffle each time it reaches 50.
        """

        def __init__(self,Info,s):
            self.Info = Info
            self.check_roomids=[Info['roomid']]
            self.raffleId=0
            # 7.2e3 is the Timer interval (presumably seconds, i.e. two hours - confirm
            # against the Timer implementation imported by this module).
            self.hour_run=Timer(7.2e3,self.each_hour_run)
            self.s=s
            self.num=1

        @staticmethod
        def _raffle_tag(obj):
            """Return the '(roomid=..,raffleId=..)' suffix shared by the raffle log lines."""
            return '(roomid=' + str(obj['roomid'])+ ',raffleId=' + str(obj['raffleId'])+ ')'

        def join_smalltv(self,obj):
            """Join one gift raffle and log the outcome.

            ``obj`` must carry ``roomid``, ``raffleId`` and ``title``. When the
            internal counter reaches 50 the raffle is skipped (and the counter
            reset) instead of being joined.
            """
            tag=self._raffle_tag(obj)
            if self.num==50:
                self.num=0
                print('[跳过抽奖]['+obj['title']+']已跳过抽奖'+tag)
                return
            self.num+=1
            res=API.Gift.join_smalltv(self.s,obj['roomid'], obj['raffleId'],self.Info['csrf_token'],self.Info['visit_id'])
            code=res['code']
            if code==0:
                Logger.info('[自动抽奖]['+obj['title']+']已参加抽奖'+tag)
            elif code==400:
                Logger.warning('[自动抽奖][礼物抽奖]访问被拒绝,您的帐号可能已经被封禁,已停止')
            elif code==65531:
                Logger.info('[自动抽奖][礼物抽奖]已参加抽奖'+tag)
            else:
                # Any other code is a failure: say so instead of reusing the
                # "joined" wording, and append the server's message.
                Logger.error('[自动抽奖][礼物抽奖]参加抽奖失败'+tag+str(res['msg']))

        def push_and_check_roomid(self,roomid):
            """Remember ``roomid`` and then poll every remembered room for guard lotteries."""
            self.push_roomid(roomid)
            # Distinct loop name so the ``roomid`` argument is not shadowed.
            for known_roomid in self.check_roomids:
                self.check_and_join_guard(known_roomid)

        def push_roomid(self,roomid):
            """Append ``roomid`` to ``check_roomids`` unless it is already present."""
            if roomid in self.check_roomids:return
            self.check_roomids.append(roomid)

        def each_hour_run(self):
            """Add the rooms from two ranking lists to ``check_roomids``."""
            res=API.Room.room_rank(self.s,"master_realtime_hour",'areaid_realtime_hour')
            for room in res['data']['list']:self.push_roomid(room['roomid'])
            res=API.Room.room_rank(self.s,"week_star_master",'week')
            for room in res['data']['list']:self.push_roomid(room['roomid'])

        def _join_new_raffles(self,roomid,raffles):
            """Join every raffle in ``raffles`` whose id is above the last handled id."""
            for data in raffles:
                if data['raffleId']<=self.raffleId:continue
                self.raffleId=data['raffleId']
                self.join_smalltv({'roomid':roomid,'raffleId':self.raffleId,'title':data['title']})

        def check_and_join_smalltv(self,roomid):
            """Look up open gift raffles in ``roomid`` and join the new ones.

            The v2 check is tried first; only when it lists nothing is the v3
            check queried (its raffles sit under ``data['list']``). A v2 reply
            with code -400 aborts the whole call. Afterwards the room is
            remembered and all remembered rooms are polled for guard lotteries.
            """
            res=API.Gift.check_smalltv(self.s,roomid,'v2')
            if res['code']==-400:return
            self._join_new_raffles(roomid,res['data'])
            if not res['data']:
                res=API.Gift.check_smalltv(self.s,roomid,'v3')
                self._join_new_raffles(roomid,res['data']['list'])
            self.push_and_check_roomid(roomid)

        def join_guard(self,obj):
            """Join one guard lottery (``obj`` carries ``roomid`` and ``id``) and log the reply message."""
            res=API.Gift.join_guard(self.s,obj['roomid'],obj['id'],self.Info['csrf_token'])
            print(obj['roomid'])
            Logger.info(res['data']['message'])

        def check_and_join_guard(self,roomid):
            """Join the guard lotteries reported for ``roomid``.

            The ``check_guard`` endpoint is queried first; if it lists nothing
            the method stops. Otherwise its entries are joined and the plain
            ``check`` endpoint is queried as well, joining what it lists under
            ``data['guard']``.
            """
            res=API.Gift.check_guard(self.s,roomid)
            if len(res['data'])==0:return
            for guard in res['data']:self.join_guard({'id':guard['id'],'roomid':roomid})
            res=API.Gift.check_guard(self.s,roomid,'')
            if len(res['data']['guard'])==0:return
            for guard in res['data']['guard']:self.join_guard({'id':guard['id'],'roomid':roomid})
    

    utils.py添加日志类

    @singleton
    class Logger:
        """File-backed logging facade with one logger (and one file) per level.

        ``__init__`` creates a ``logging`` logger for each standard level,
        named after the numeric level, and attaches a ``FileHandler`` writing
        to the matching file under ``logs/``. Each public method forwards the
        message to the logger of its own level only.
        """

        def __init__(self):
            handlers = {
                logging.NOTSET: "logs/notset.logs",
                logging.DEBUG: "logs/debug.logs",
                logging.INFO: "logs/info.logs",
                logging.WARNING: "logs/warning.logs",
                logging.ERROR: "logs/error.logs",
                logging.CRITICAL: "logs/critical.logs"
            }
            self.__loggers = {}
            # Output format shared by every log file.
            fmt = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s')
            for level, relative_path in handlers.items():
                # One logger per level.
                logger = logging.getLogger(str(level))
                logger.setLevel(level)
                log_path = os.path.abspath(relative_path)
                # FileHandler opens the file right away and raises if the
                # directory is missing, so make sure it exists first.
                os.makedirs(os.path.dirname(log_path), exist_ok=True)
                # Handler that writes this level's log file.
                fh = logging.FileHandler(log_path)
                fh.setFormatter(fmt)
                fh.setLevel(level)
                logger.addHandler(fh)
                self.__loggers.update({level: logger})

        def info(self, message):
            """Write ``message`` to the INFO log."""
            self.__loggers[logging.INFO].info(message)

        def error(self, message):
            """Write ``message`` to the ERROR log."""
            self.__loggers[logging.ERROR].error(message)

        def warning(self, message):
            """Write ``message`` to the WARNING log."""
            self.__loggers[logging.WARNING].warning(message)

        def debug(self, message):
            """Write ``message`` to the DEBUG log."""
            self.__loggers[logging.DEBUG].debug(message)

        def critical(self, message):
            """Write ``message`` to the CRITICAL log."""
            self.__loggers[logging.CRITICAL].critical(message)
    

    utils.py添加 保存配置文件函数

    def save_py(moudle):
        """Write a module's attributes back to ``<module name>.py``.

        Every name returned by ``dir(moudle)`` that does not contain a double
        underscore is emitted as one ``name=<value>`` assignment, where the
        value is rendered with ``json.dumps`` (4-space indent, compact
        separators). The file is created in the current working directory and
        overwrites any existing file of that name.

        NOTE: values are serialised as JSON, so attributes that ``json.dumps``
        cannot encode raise ``TypeError``, and booleans/None would come out as
        ``true``/``false``/``null``.
        """
        assignments=[]
        for key in dir(moudle):
            if '__' in key:continue
            rendered=json.dumps(getattr(moudle,key),indent=4, separators=(',', ':'))
            # Each assignment ends with a real newline escape; the literal had
            # been broken across two source lines.
            assignments.append(key+'='+rendered+'\n')
        with open(moudle.__name__+".py",'w') as f:f.write(''.join(assignments))
    

    添加配置文件 config.py

    # HTTP request headers; the main program passes this dict to session(...).
    headers={
        "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0",
        "Accept":"application/json, text/plain, */*",
        "Accept-Language":"zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
        "Accept-Encoding":"gzip, deflate, br",
        "Referer":"https://live.bilibili.com/",
        "Origin":"https://live.bilibili.com",
        "Connection":"keep-alive"
    }
    # Websocket command names. oncmd() returns early for anything listed in
    # "reject_msg"; "receive_msg" is not referenced by the code shown alongside
    # this file.
    msg_info={
        "reject_msg":[
            "SEND_GIFT",
            "WELCOME",
            "WELCOME_GUARD",
            "COMBO_SEND",
            "ENTRY_EFFECT",
            "NOTICE_MSG",
            "DANMU_MSG",
            "ROOM_RANK",
            "WISH_BOTTLE",
            "COMBO_END"
        ],
        "receive_msg":[
            "SYS_MSG",
            "SPECIAL_GIFT"
        ]
    }
    # Parameters handed to Join(...): "roomid" seeds its room list, while
    # "csrf_token" and "visit_id" are sent with join requests. "uid" is
    # overwritten by the main program after login.
    post_info={
        "csrf_token":"",
        "visit_id":"",
        "roomid":392351,
        "uid":380741418
    }
    # Parameters handed to DanmuWebSocket(...); the main program overwrites
    # "uid" after login and "roomid" whenever it switches rooms.
    ws_info={
        "uid":380741418,
        "roomid":392351,
        "protover":1,
        "platform":"web",
        "clientver":"1.4.0"
    }
    
    

    主程序

    import os
    # Switch to the project folder before the project-local imports below
    # (presumably so they and the relative 'config/...' / 'logs/...' paths
    # resolve - adjust this path for your own machine).
    os.chdir('/data/jupyter/root/软件/Bilibili')
    from jquery import http,session
    from servers import Join,Login
    from DanmuWS import DanmuWebSocket
    # Brings headers, msg_info, post_info and ws_info into this namespace.
    from config import *
    from utils import save_py
    # The module object itself is needed as well: close() passes it to save_py().
    import config
    # IPython/Jupyter magic - this cell is not valid as a plain .py script.
    %matplotlib inline
    #save_py(config)
    # Session built from the configured headers and the given file path.
    s=session(headers,'config/2685054765.txt')
    login=Login(s)
    join=Join(post_info,s)
    # Current DanmuWebSocket; assigned later by main() and onreconnect().
    ws=None
    # Keep running the get_vdcode / loop_vdcode steps until isLogin() is true.
    while True:
        if login.isLogin():break
        login.get_vdcode()
        login.loop_vdcode()
    # Copy the logged-in uid into both parameter dicts.
    ws_info['uid']=login.info['uid']
    post_info['uid']=ws_info['uid']
    def oncmd(data):
        """Callback for a command message (registered through ``ws.bind``).

        Commands listed in ``msg_info['reject_msg']`` are ignored. For a
        ``SYS_MSG`` that carries ``real_roomid``, the raffles of that room are
        checked and joined. Everything else is a no-op.
        """
        command=data["cmd"]
        is_rejected=command in msg_info['reject_msg']
        if is_rejected or command!="SYS_MSG":
            return
        if "real_roomid" not in data:
            return
        join.check_and_join_smalltv(data["real_roomid"])
    def onlogin(data):
        """Callback registered through ``ws.bind``; prints a fixed notice and ignores ``data``."""
        notice="login success"
        print(notice)
    def onreconnect(code,data=None):
        """Callback registered through ``ws.bind``; adopts the replacement socket.

        ``data['dws']`` becomes the new global ``ws``. ``data`` is optional in
        the signature, so a call without it leaves ``ws`` untouched instead of
        failing on ``None['dws']``. ``code`` is accepted but not used.
        """
        global ws
        if data is None:
            return
        ws=data['dws']
    # Number of times the room list had to be refilled from the rankings.
    room_num=0
    def onheartbeat(num):
        """Callback registered through ``ws.bind``; moves on to the next room.

        ``num`` is supplied by the socket (presumably a heartbeat counter -
        confirm against DanmuWebSocket). Calls with ``num`` above 3 do
        nothing. Otherwise the current connection is closed, the room list is
        refilled from the rankings if it is empty (giving up and cancelling
        ``join.hour_run`` after more than 3 refills), the next room id is
        popped into both parameter dicts, and ``main()`` reconnects.
        """
        global room_num
        if num>3:
            return
        close()
        if not join.check_roomids:
            join.each_hour_run()
            room_num=room_num+1
            if room_num>3:
                return join.hour_run.cancel()
        next_roomid=join.check_roomids.pop()
        ws_info['roomid']=next_roomid
        post_info['roomid']=next_roomid
        main()
    def main():
        """Open a danmu websocket for ``ws_info['roomid']`` and register the callbacks.

        The new socket is stored in the global ``ws``. If creating, connecting
        or binding fails, the traceback is printed and ``close()`` is called.
        Only ``Exception`` is caught, so ``KeyboardInterrupt`` and
        ``SystemExit`` propagate instead of being swallowed.
        """
        import traceback
        global ws
        try:
            print(ws_info['roomid'])
            ws = DanmuWebSocket(ws_info,'wss://broadcastlv.chat.bilibili.com/sub')
            ws.connect()
            ws.bind(onreconnect,onlogin,onheartbeat,oncmd)
            #ws.run_forever()
        except Exception:
            # Show why the connection failed before cleaning up.
            traceback.print_exc()
            close()
    def close():
        """Close the current websocket (if any) and persist config and session.

        ``ws`` starts out as ``None`` and stays that way when the socket could
        not be created, so it is only closed when it exists. The config module
        and the session are saved in either case.
        """
        if ws is not None:
            ws.close()
        save_py(config)
        s.save()
    # Open the first connection, using the room id currently in ws_info.
    main()
    
  • 相关阅读:
    仿MSN小类别滑动效果
    pku1674 Sorting by Swapping
    pku1456 Supermarket
    pku1083 Moving Tables
    pku1125 Stockbroker Grapevine
    pku2232 New StoneForfexCloth Game
    如何低头前进
    和两年前一样
    股票亏了
    早上选举了
  • 原文地址:https://www.cnblogs.com/blogs-qq2685054765/p/9734366.html
Copyright © 2011-2022 走看看