前言:闲时比较喜欢看斗鱼直播,刚好本周末有时间,于是边看直播边用 python 做了这个实时获取斗鱼弹幕脚本,并且实现语音播报弹幕信息,并将弹幕信息保存到数据库。
主要功能:
1、手动输入斗鱼主播房间号
2、获取弹幕信息(用户 uid,昵称,等级,弹幕)
3、弹幕信息存储到 mysql
4、语音播报弹幕消息
主要步骤:
一、手动输入斗鱼主播房间号
二、请求协议:格式消息头部
三、连接斗鱼官方提供的弹幕 API 接口
四、发送请求信息
五、处理返回的消息
1、判断消息类型,筛选到弹幕消息,其他过滤(例如弹幕、礼物等)
2、利用正则获取用户 uid,昵称,等级,弹幕
六、格式化打印弹幕信息
七、连接数据库 mysql
八、将弹幕信息写入数据库
九、语音播报弹幕消息
1、弹幕信息写入列表(字符串格式:用户昵称“说”弹幕)
2、语音播报列表第一位字符串
3、播报完成删除第一位
4、当列表内弹幕信息全部播放完成,则 sleep5 秒,继续循环。
十、持续向斗鱼弹幕 API 发送心跳包(斗鱼官方要求间隔 45 秒)
十一、多线程处理(弹幕获取、心跳包、语音播报)
一、手动输入斗鱼主播房间号
二、请求协议:格式消息头部
三、连接斗鱼官方提供的弹幕 API 接口
四、发送请求信息
五、处理返回的消息
1、判断消息类型,筛选到弹幕消息,其他过滤(例如弹幕、礼物等)
2、利用正则获取用户 uid,昵称,等级,弹幕
六、格式化打印弹幕信息
七、连接数据库 mysql
八、将弹幕信息写入数据库
九、语音播报弹幕消息
1、弹幕信息写入列表(字符串格式:用户昵称“说”弹幕)
2、语音播报列表第一位字符串
3、播报完成删除第一位
4、当列表内弹幕信息全部播放完成,则 sleep5 秒,继续循环。
十、持续向斗鱼弹幕 API 发送心跳包(斗鱼官方要求间隔 45 秒)
十一、多线程处理(弹幕获取、心跳包、语音播报)
主要调用模块:
import socket #
import re
from time import sleep
import datetime
import threading
import pymysql
import pyttsx3
其他:
1、接口协议:“斗鱼弹幕服务器第三方接入协议 v1.6.2”点击跳转到斗鱼官方接口协议论坛
2、对接问题:接口协议官方未进行更新,部分格式需要自己抓包测试。
源代码如下:
import socket
import re
from time import sleep
import datetime
import threading
import pymysql
import pyttsx3
class __danmu__(object):
speak_data = ['开始语音播报!',]
def speak(self,speak_data):
#语音识别
speaker = pyttsx3.init()
while True:
if speak_data:
speaker.say(speak_data[0])
speaker.runAndWait()
del speak_data[0]
else:
sleep(5)
pass
#输入斗鱼房间号
def room_id(self):
roomid = int(input("请输入斗鱼主播房间号:"))
return roomid
#连接斗鱼弹幕接口
def connect(self):
print("****** DouYu TV ******")
host = socket.gethostbyname("openbarrage.douyutv.com")
port = 8601
self.client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.client.connect((host,port))
def send_msg(self,msg):
# print('发送信息!%s'%msg)
msg = msg + '\0' #斗鱼要求数据以'\0'结尾
msg = msg.encode('utf-8')
#消息头部长度为 12,不知道为啥加 8?
data_length = len(msg) + 8
code = 689#消息类型
#消息头部,根据斗鱼官方要求的消息头部格式:消息长度+消息长度+消息类型+加密字段(默认为 0)+保留字段(默认为 0)
msgHead = int.to_bytes(data_length,4,'little')+int.to_bytes(data_length,4,'little')+int.to_bytes(code,4,'little')
self.client.send(msgHead)
# print(len(msgHead))
sent = 0
#循环发送消息,保证消息全部发完。
while sent < len(msg):
n = self.client.send(msg[sent:])#返回已发送消息的长度
sent = sent + n
#连接数据库
def mysql_connect(self):
self.db = pymysql.connect(host="localhost", port=3306, user='root', passwd='123456789lt', db='douyu_danmu',
charset='utf8'
)
self.cursor = self.db.cursor()
#信息写入数据库操作!
def save_mysql(self,danmu_data):
sql = "INSERT INTO danmu (uid,level,username,danmu,room,datetime) VALUES (%d,%d,'%s','%s',%d,now());"%danmu_data
try:
self.cursor.execute(sql)
self.db.commit()
except:
print('此条信息写入数据库失败!( ',danmu_data,' )' )
pass
def danmu(self,room_id):
login = 'type@=loginreq/roomid@=%s/'%room_id
self.send_msg(login)
joingroup = 'type@=joingroup/rid@=%s/gid@=-9999/'%room_id
self.send_msg(joingroup)
while True:
content = self.client.recv(1024)
if self.judge_chatmsg(content):
username = self.user_name(content)
chatmsg = self.chat_msg(content)
useruid = self.user_uid(content)
userlevel = self.user_level(content)
date_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
danmu_data = (int(useruid),int(userlevel),username,chatmsg,int(room_id))
self.save_mysql(danmu_data)
print('%s <%s>%s : %s'%(date_time,userlevel,username,chatmsg))
self.speak_data.append('%s 说:%s'%(username,chatmsg))
else:
pass
# 判断是否是弹幕消息
def judge_chatmsg(self, content):
pattern = rb'type@=(.*)/rid@='
data_type = re.findall(pattern,content)
try:
if data_type[0] == b'chatmsg':
return True
else:
return False
except Exception as e:
return False
#获取用户昵称
def user_name(self,content):
'''
弹幕消息:
type@=chatmsg/rid@=301712/gid@=-9999/uid@=123456/nn@=test /txt@=666/level@=1/
判断 type,弹幕消息为 chatmsg,txt 为弹幕内容,nn 为用户昵称
'''
pattern = rb'nn@=(.*)/txt@'
username = re.findall(pattern,content)[0].decode('utf-8','ignore')
return username
#获取弹幕消息
def chat_msg(self,content):
pattern = rb'txt@=(.*)/cid@'
chatmsg = re.findall(pattern,content)[0].decode('utf-8','ignore')
return chatmsg
#获取用户 uid
def user_uid(self,content):
pattern = rb'uid@=(.*)/nn@'
useruid = re.findall(pattern, content)[0].decode('utf-8', 'ignore')
return useruid
#获取用户等级
def user_level(self,content):
pattern = rb'level@=(.*)/sahf@'
userlevel = re.findall(pattern, content)[0].decode('utf-8', 'ignore')
# print(userlevel)
return userlevel
#持续发送心跳包,保持连接
def keep_alive(self):
#斗鱼要求每隔 45 秒发送一次心跳包到弹幕服务器
while True:
msg = 'type@=mrkl/'#斗鱼新版心跳消息
self.send_msg(msg)
print("**************************************此处是持续心跳包****************************************")
sleep(45)
if __name__ == "__main__":
dm = __danmu__()
roomid = dm.room_id()
dm.connect()
dm.mysql_connect()
t1 = threading.Thread(target=dm.danmu,args=(roomid,))
t2 = threading.Thread(target=dm.keep_alive)
t3 = threading.Thread(target=dm.speak,args=(dm.speak_data,))
t1.start()
t2.start()
t3.start()
弹幕信息打印

弹幕信息保存到数据库

完结后续:看看什么时候再有时间,准备再写个自动定时送礼物的!这样就可以自动续牌子了,哈哈!