自动参加小电视抽奖
自动领取舰长、提督奖励
运行一天大概能获取1800包辣条
不过,每天凌晨B站后台都会检测账号是否使用脚本,一次抢太多很容易被封号一周。
可能是依据最大的连续领奖次数判断的吧
一、抽奖流程
1. 分析页面可知
每当有人赠送小电视之类的礼物时
系统就会发送广播弹幕(SYS_MSG)
2. 点击广播跳转到抽奖房间
F12清空网络请求
接着点击抽奖
就找到了抽奖请求
POST https://api.live.bilibili.com/gift/v3/smalltv/join
参数
响应
3. 分析post参数
roomid系统广播里就有
raffleId是抽奖关键参数
type 固定为 Gift
csrf_token 来自于cookie值bili_jct (可为空)
visit_id (可为空)
4. 刷新抽奖页面
F12过滤得到XHR 请求
逐个分析返回值和url请求
发现raffleId请求
GET https://api.live.bilibili.com/gift/v3/smalltv/check
data={
'roomid':roomid
}
响应
分析结果
- 小电视抽奖和月色真美、摩天大楼
获取raffleId的请求略有不同
https://api.live.bilibili.com/gift/v3/smalltv/check
https://api.live.bilibili.com/gift/v2/smalltv/check
参加抽奖的请求相同
二、代码实现
api.py 添加Gift类
class Gift():
    """Request builders for the Bilibili live gift-raffle and guard-lottery APIs.

    Every method only assembles a URL / payload and hands it to ``ajax``
    together with the caller-supplied session ``s``; the return value is
    whatever ``ajax`` returns.
    """

    def check_smalltv(self, s, roomid, ver='v2'):
        """List the gift raffles currently open in a room.

        Args:
            s: session object forwarded to ``ajax``.
            roomid: room to query.
            ver: API version segment inserted into the URL path.
        """
        url = 'https://api.live.bilibili.com/gift/' + ver + '/smalltv/check'
        data = {'roomid': roomid}
        return ajax(s, url, 'GET', data)

    # NOTE: ``type`` shadows the builtin; the name is kept because callers may
    # pass it by keyword.
    def join_smalltv(self, s, roomid, raffleId, csrf_token, visit_id='', type='Gift'):
        """Join one gift raffle.

        Args:
            s: session object forwarded to ``ajax``.
            roomid: room hosting the raffle.
            raffleId: raffle identifier obtained from ``check_smalltv``.
            csrf_token: CSRF token sent with the form.
            visit_id: sent as-is; defaults to an empty string.
            type: sent as-is; defaults to ``'Gift'``.
        """
        url = 'https://api.live.bilibili.com/gift/v3/smalltv/join'
        data = {
            'roomid': roomid,
            'raffleId': raffleId,
            'type': type,
            'csrf_token': csrf_token,
            'visit_id': visit_id,
        }
        return ajax(s, url, 'POST', data)

    def check_guard(self, s, roomid, _type='_guard'):
        """Query the lottery ``check`` endpoint of a room.

        ``_type`` is appended verbatim to the endpoint name, so the default
        hits ``.../Lottery/check_guard`` and ``''`` hits ``.../Lottery/check``.
        The room id is embedded in the query string.
        """
        url = ('https://api.live.bilibili.com/lottery/v1/Lottery/check'
               + _type + '?roomid=' + str(roomid))
        return ajax(s, url, 'GET')

    # NOTE: ``id`` / ``type`` shadow builtins; kept for keyword compatibility.
    def join_guard(self, s, roomid, id, csrf_token, type='guard', visit_id=None):
        """Join one guard lottery.

        Args:
            s: session object forwarded to ``ajax``.
            roomid: room hosting the lottery.
            id: lottery identifier obtained from ``check_guard``.
            csrf_token: CSRF token sent with the form.
            type: sent as-is; defaults to ``'guard'``.
            visit_id: sent as-is; defaults to ``None``.
        """
        url = 'https://api.live.bilibili.com/lottery/v2/Lottery/join'
        data = {
            'roomid': roomid,
            'id': id,
            'type': type,
            'csrf_token': csrf_token,
            'visit_id': visit_id,
        }
        return ajax(s, url, 'POST', data)
servers.py 添加抽奖类
class Join:
    """Joins gift raffles and guard lotteries on behalf of one account.

    State kept on the instance:
      * ``check_roomids`` - rooms whose guard lotteries get polled.
      * ``raffleId``      - highest raffle id joined so far; raffles with an
                            id less than or equal to it are skipped.
      * ``num``           - counter used to skip one raffle each time it
                            reaches 50.
    """

    def __init__(self, Info, s):
        """Store the session and account info.

        Args:
            Info: mapping; the keys ``roomid``, ``csrf_token`` and
                ``visit_id`` are read by this class.
            s: session object passed through to every API call.
        """
        self.Info = Info
        self.check_roomids = [Info['roomid']]
        self.raffleId = 0
        # NOTE(review): the timer is created but never started in this class,
        # and 7.2e3 would be two hours (if seconds) despite the "hour" name -
        # confirm against the Timer implementation and the caller.
        self.hour_run = Timer(7.2e3, self.each_hour_run)
        self.s = s
        self.num = 1

    def join_smalltv(self, obj):
        """Join the raffle described by ``obj`` and log the outcome.

        ``obj`` must provide ``roomid``, ``raffleId`` and ``title``.  When the
        counter has reached 50 the raffle is deliberately skipped (counter is
        reset, nothing is sent).
        """
        where = '(roomid=' + str(obj['roomid']) + ',raffleId=' + str(obj['raffleId']) + ')'
        if self.num == 50:
            self.num = 0
            print('[跳过抽奖][' + obj['title'] + ']已跳过抽奖' + where)
            return
        self.num += 1
        res = API.Gift.join_smalltv(self.s, obj['roomid'], obj['raffleId'],
                                    self.Info['csrf_token'], self.Info['visit_id'])
        code = res['code']
        if code == 0:
            Logger.info('[自动抽奖][' + obj['title'] + ']已参加抽奖' + where)
        elif code == 400:
            Logger.warning('[自动抽奖][礼物抽奖]访问被拒绝,您的帐号可能已经被封禁,已停止')
        elif code == 65531:
            # NOTE(review): logged with the same "joined" text as success -
            # confirm what code 65531 actually means.
            Logger.info('[自动抽奖][礼物抽奖]已参加抽奖' + where)
        else:
            # NOTE(review): error path still says "joined"; the server message
            # is appended so the real reason is at least visible.
            Logger.error('[自动抽奖][礼物抽奖]已参加抽奖' + where + res['msg'])

    def push_and_check_roomid(self, roomid):
        """Remember ``roomid`` and poll guard lotteries in every known room."""
        self.push_roomid(roomid)
        # Distinct loop name: the original reused ``roomid`` and shadowed the
        # parameter.
        for known_roomid in self.check_roomids:
            self.check_and_join_guard(known_roomid)

    def push_roomid(self, roomid):
        """Append ``roomid`` to ``check_roomids`` unless it is already there."""
        if roomid not in self.check_roomids:
            self.check_roomids.append(roomid)

    def each_hour_run(self):
        """Add every room from two ranking lists to ``check_roomids``."""
        rankings = (
            ("master_realtime_hour", 'areaid_realtime_hour'),
            ("week_star_master", 'week'),
        )
        for rank_args in rankings:
            res = API.Room.room_rank(self.s, *rank_args)
            for room in res['data']['list']:
                self.push_roomid(room['roomid'])

    def _join_new_raffles(self, roomid, raffles):
        """Join each raffle whose id is greater than the last one joined."""
        for raffle in raffles:
            if raffle['raffleId'] <= self.raffleId:
                continue
            self.raffleId = raffle['raffleId']
            self.join_smalltv({'roomid': roomid,
                               'raffleId': self.raffleId,
                               'title': raffle['title']})

    def check_and_join_smalltv(self, roomid):
        """Look up open gift raffles in ``roomid`` and join the new ones.

        The v2 check is tried first; only when it returns no raffles is the
        v3 check queried (its raffles live under ``data.list``).  Afterwards
        the room is registered and guard lotteries are polled.  A v2 response
        with code -400 aborts everything.
        """
        res = API.Gift.check_smalltv(self.s, roomid, 'v2')
        if res['code'] == -400:
            return
        raffles = res['data']
        self._join_new_raffles(roomid, raffles)
        if not raffles:
            res = API.Gift.check_smalltv(self.s, roomid, 'v3')
            self._join_new_raffles(roomid, res['data']['list'])
        self.push_and_check_roomid(roomid)

    def join_guard(self, obj):
        """Join the guard lottery ``obj['id']`` in ``obj['roomid']`` and log the reply."""
        res = API.Gift.join_guard(self.s, obj['roomid'], obj['id'], self.Info['csrf_token'])
        print(obj['roomid'])
        Logger.info(res['data']['message'])

    def check_and_join_guard(self, roomid):
        """Join every guard lottery reported for ``roomid``.

        Two endpoints are consulted in order.  NOTE(review): an empty result
        from the first one returns early, so the second endpoint is only
        queried when the first had entries - confirm this is intended.
        """
        res = API.Gift.check_guard(self.s, roomid)
        if not res['data']:
            return
        for guard in res['data']:
            self.join_guard({'id': guard['id'], 'roomid': roomid})
        res = API.Gift.check_guard(self.s, roomid, '')
        for guard in res['data']['guard']:
            self.join_guard({'id': guard['id'], 'roomid': roomid})
utils.py添加日志类
@singleton
class Logger:
    """Writes each log level to its own file under ``logs/``.

    One ``logging`` logger (named after the numeric level) is created per
    level, each with a single file handler.  The public methods forward a
    message to the logger of the matching level only; no method writes to the
    NOTSET logger.
    """

    def __init__(self):
        log_files = {
            logging.NOTSET: "logs/notset.logs",
            logging.DEBUG: "logs/debug.logs",
            logging.INFO: "logs/info.logs",
            logging.WARNING: "logs/warning.logs",
            logging.ERROR: "logs/error.logs",
            logging.CRITICAL: "logs/critical.logs"
        }
        self.__loggers = {}
        fmt = logging.Formatter('%(asctime)s [%(levelname)s]: %(message)s')
        for level, relative_path in log_files.items():
            # Create the logger for this level.
            logger = logging.getLogger(str(level))
            logger.setLevel(level)
            # Create the handler that writes the log file.  The directory is
            # created first because FileHandler opens the file immediately and
            # would fail on a missing ``logs/`` folder.
            log_path = os.path.abspath(relative_path)
            os.makedirs(os.path.dirname(log_path), exist_ok=True)
            # Explicit UTF-8: log messages contain Chinese text and must not
            # depend on the platform default encoding.
            fh = logging.FileHandler(log_path, encoding='utf-8')
            fh.setFormatter(fmt)
            fh.setLevel(level)
            logger.addHandler(fh)
            self.__loggers[level] = logger

    def info(self, message):
        """Write ``message`` to the INFO log file."""
        self.__loggers[logging.INFO].info(message)

    def error(self, message):
        """Write ``message`` to the ERROR log file."""
        self.__loggers[logging.ERROR].error(message)

    def warning(self, message):
        """Write ``message`` to the WARNING log file."""
        self.__loggers[logging.WARNING].warning(message)

    def debug(self, message):
        """Write ``message`` to the DEBUG log file."""
        self.__loggers[logging.DEBUG].debug(message)

    def critical(self, message):
        """Write ``message`` to the CRITICAL log file."""
        self.__loggers[logging.CRITICAL].critical(message)
utils.py添加 保存配置文件函数
def save_py(moudle):
    """Write a module's attributes back to ``<module name>.py`` as assignments.

    Each attribute whose name does not contain ``'__'`` becomes one
    ``name=<json>`` statement followed by a newline.  The file is written
    relative to the current working directory and overwritten if it exists.

    Args:
        moudle: module object to dump (parameter name kept as-is for callers).

    Raises:
        TypeError: from ``json.dumps`` if an attribute is not JSON
            serialisable (e.g. a function or an imported module).

    Note:
        Values are emitted as JSON, so ``True``/``False``/``None`` would be
        written as ``true``/``false``/``null`` and not load back as Python.
    """
    assignments = [
        key + '=' + json.dumps(getattr(moudle, key), indent=4, separators=(',', ':')) + '\n'
        for key in dir(moudle)
        if '__' not in key
    ]
    with open(moudle.__name__ + ".py", 'w') as f:
        f.write(''.join(assignments))
添加配置文件 config.py
# NOTE: the main script rewrites this file through save_py(config), which
# regenerates it from the module attributes - hand-written comments here are
# presumably lost on the next save.

# HTTP request headers; the main script passes them to session(...).
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0",
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br",
    "Referer": "https://live.bilibili.com/",
    "Origin": "https://live.bilibili.com",
    "Connection": "keep-alive",
}

# Danmaku command names, grouped by how the main script treats them.
msg_info = {
    # Commands the main script's oncmd handler drops immediately.
    "reject_msg": [
        "SEND_GIFT",
        "WELCOME",
        "WELCOME_GUARD",
        "COMBO_SEND",
        "ENTRY_EFFECT",
        "NOTICE_MSG",
        "DANMU_MSG",
        "ROOM_RANK",
        "WISH_BOTTLE",
        "COMBO_END",
    ],
    "receive_msg": [
        "SYS_MSG",
        "SPECIAL_GIFT",
    ],
}

# Account/request fields; the main script passes this mapping to Join(...).
post_info = {
    "csrf_token": "",
    "visit_id": "",
    "roomid": 392351,
    "uid": 380741418,
}

# Connection fields; the main script passes this mapping to DanmuWebSocket(...).
ws_info = {
    "uid": 380741418,
    "roomid": 392351,
    "protover": 1,
    "platform": "web",
    "clientver": "1.4.0",
}
主程序
import os
os.chdir('/data/jupyter/root/软件/Bilibili')
from jquery import http,session
from servers import Join,Login
from DanmuWS import DanmuWebSocket
from config import *
from utils import save_py
import config
%matplotlib inline
#save_py(config)
s=session(headers,'config/2685054765.txt')
login=Login(s)
join=Join(post_info,s)
ws=None
while not login.isLogin():
login.get_vdcode()
login.loop_vdcode()
ws_info['uid']=post_info['uid']=login.info['uid']
def oncmd(data):
    """Handle one danmaku command message.

    Commands listed in ``msg_info['reject_msg']`` are ignored.  A ``SYS_MSG``
    that carries ``real_roomid`` triggers a raffle check in that room; every
    other command is dropped.

    Args:
        data: decoded message; must contain the key ``cmd``.
    """
    cmd = data["cmd"]
    if cmd in msg_info['reject_msg']:
        return
    if cmd == "SYS_MSG" and "real_roomid" in data:
        join.check_and_join_smalltv(data["real_roomid"])
def onlogin(data):
    """Login callback: only reports success on stdout; ``data`` is unused."""
    print("login success")
def onreconnect(code, data=None):
    """Reconnect callback: adopt the replacement socket as the global ``ws``.

    Args:
        code: unused here.
        data: mapping whose ``dws`` entry is the new socket.  When omitted
            (the declared default) there is nothing to adopt, so the current
            ``ws`` is left untouched instead of failing on ``None['dws']``.
    """
    global ws
    if data is None:
        return
    ws = data['dws']
# Number of times the room list had to be refilled from the rankings.
room_num = 0


def onheartbeat(num):
    """Heartbeat callback: hop to the next queued room.

    For ``num`` up to 3 the current connection is closed, the next room id is
    popped from ``join.check_roomids`` and ``main()`` reconnects to it.  When
    the queue is empty it is refilled via ``join.each_hour_run()``; after more
    than three refills the timer is cancelled and no reconnect happens.

    NOTE(review): ``num > 3`` means only early heartbeats switch rooms -
    confirm the comparison is not inverted.
    NOTE(review): if the refill adds no rooms, ``pop()`` raises IndexError.
    """
    global room_num
    if num > 3:
        return
    close()
    if len(join.check_roomids) == 0:
        join.each_hour_run()
        room_num += 1
        if room_num > 3:
            return join.hour_run.cancel()
    ws_info['roomid'] = post_info['roomid'] = join.check_roomids.pop()
    main()
def main():
    """Open a danmaku websocket for ``ws_info['roomid']`` and bind callbacks.

    The new socket is stored in the global ``ws``.  If anything goes wrong
    while connecting, the error is printed and ``close()`` is called to
    persist the config and session.
    """
    global ws
    try:
        print(ws_info['roomid'])
        ws = DanmuWebSocket(ws_info, 'wss://broadcastlv.chat.bilibili.com/sub')
        ws.connect()
        ws.bind(onreconnect, onlogin, onheartbeat, oncmd)
        #ws.run_forever()
    except Exception:
        # A bare ``except:`` used to swallow everything silently, including
        # KeyboardInterrupt; report the failure and let interrupts propagate.
        import traceback
        traceback.print_exc()
        close()
def close():
    """Close the current websocket (if any) and persist config and session.

    ``ws`` is still ``None`` when the very first connection attempt fails;
    skipping the socket close in that case keeps the two save steps from
    being lost to an AttributeError.
    """
    if ws is not None:
        ws.close()
    save_py(config)
    s.save()
# Start the first connection, using the room currently set in ws_info.
main()