前言:闲时比较喜欢看斗鱼直播,刚好本周末有时间,于是边看直播边用 python 做了这个实时获取斗鱼弹幕脚本,并且实现语音播报弹幕信息,并将弹幕信息保存到数据库。
主要功能:
1、手动输入斗鱼主播房间号
2、获取弹幕信息(用户 uid,昵称,等级,弹幕)
3、弹幕信息存储到 mysql
4、语音播报弹幕消息
主要步骤:
一、手动输入斗鱼主播房间号
二、请求协议:格式消息头部
三、连接斗鱼官方提供的弹幕 API 接口
四、发送请求信息
五、处理返回的消息
1、判断消息类型,筛选到弹幕消息,其他过滤(例如弹幕、礼物等)
2、利用正则获取用户 uid,昵称,等级,弹幕
六、格式化打印弹幕信息
七、连接数据库 mysql
八、将弹幕信息写入数据库
九、语音播报弹幕消息
1、弹幕信息写入列表(字符串格式:用户昵称“说”弹幕)
2、语音播报列表第一位字符串
3、播报完成删除第一位
4、当列表内弹幕信息全部播放完成,则 sleep5 秒,继续循环。
十、持续向斗鱼弹幕 API 发送心跳包(斗鱼官方要求间隔 45 秒)
十一、多线程处理(弹幕获取、心跳包、语音播报)
一、手动输入斗鱼主播房间号
二、请求协议:格式消息头部
三、连接斗鱼官方提供的弹幕 API 接口
四、发送请求信息
五、处理返回的消息
1、判断消息类型,筛选到弹幕消息,其他过滤(例如弹幕、礼物等)
2、利用正则获取用户 uid,昵称,等级,弹幕
六、格式化打印弹幕信息
七、连接数据库 mysql
八、将弹幕信息写入数据库
九、语音播报弹幕消息
1、弹幕信息写入列表(字符串格式:用户昵称“说”弹幕)
2、语音播报列表第一位字符串
3、播报完成删除第一位
4、当列表内弹幕信息全部播放完成,则 sleep5 秒,继续循环。
十、持续向斗鱼弹幕 API 发送心跳包(斗鱼官方要求间隔 45 秒)
十一、多线程处理(弹幕获取、心跳包、语音播报)
主要调用模块:
import socket #
import re
from time import sleep
import datetime
import threading
import pymysql
import pyttsx3
其他:
1、接口协议:“斗鱼弹幕服务器第三方接入协议 v1.6.2”点击跳转到斗鱼官方接口协议论坛
2、对接问题:接口协议官方未进行更新,部分格式需要自己抓包测试。
源代码如下:
import socket import re from time import sleep import datetime import threading import pymysql import pyttsx3 class __danmu__(object): speak_data = ['开始语音播报!',] def speak(self,speak_data): #语音识别 speaker = pyttsx3.init() while True: if speak_data: speaker.say(speak_data[0]) speaker.runAndWait() del speak_data[0] else: sleep(5) pass #输入斗鱼房间号 def room_id(self): roomid = int(input("请输入斗鱼主播房间号:")) return roomid #连接斗鱼弹幕接口 def connect(self): print("****** DouYu TV ******") host = socket.gethostbyname("openbarrage.douyutv.com") port = 8601 self.client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.client.connect((host,port)) def send_msg(self,msg): # print('发送信息!%s'%msg) msg = msg + '\0' #斗鱼要求数据以'\0'结尾 msg = msg.encode('utf-8') #消息头部长度为 12,不知道为啥加 8? data_length = len(msg) + 8 code = 689#消息类型 #消息头部,根据斗鱼官方要求的消息头部格式:消息长度+消息长度+消息类型+加密字段(默认为 0)+保留字段(默认为 0) msgHead = int.to_bytes(data_length,4,'little')+int.to_bytes(data_length,4,'little')+int.to_bytes(code,4,'little') self.client.send(msgHead) # print(len(msgHead)) sent = 0 #循环发送消息,保证消息全部发完。 while sent < len(msg): n = self.client.send(msg[sent:])#返回已发送消息的长度 sent = sent + n #连接数据库 def mysql_connect(self): self.db = pymysql.connect(host="localhost", port=3306, user='root', passwd='123456789lt', db='douyu_danmu', charset='utf8' ) self.cursor = self.db.cursor() #信息写入数据库操作! def save_mysql(self,danmu_data): sql = "INSERT INTO danmu (uid,level,username,danmu,room,datetime) VALUES (%d,%d,'%s','%s',%d,now());"%danmu_data try: self.cursor.execute(sql) self.db.commit() except: print('此条信息写入数据库失败!( ',danmu_data,' )' ) pass def danmu(self,room_id): login = 'type@=loginreq/roomid@=%s/'%room_id self.send_msg(login) joingroup = 'type@=joingroup/rid@=%s/gid@=-9999/'%room_id self.send_msg(joingroup) while True: content = self.client.recv(1024) if self.judge_chatmsg(content): username = self.user_name(content) chatmsg = self.chat_msg(content) useruid = self.user_uid(content) userlevel = self.user_level(content) date_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") danmu_data = (int(useruid),int(userlevel),username,chatmsg,int(room_id)) self.save_mysql(danmu_data) print('%s <%s>%s : %s'%(date_time,userlevel,username,chatmsg)) self.speak_data.append('%s 说:%s'%(username,chatmsg)) else: pass # 判断是否是弹幕消息 def judge_chatmsg(self, content): pattern = rb'type@=(.*)/rid@=' data_type = re.findall(pattern,content) try: if data_type[0] == b'chatmsg': return True else: return False except Exception as e: return False #获取用户昵称 def user_name(self,content): ''' 弹幕消息: type@=chatmsg/rid@=301712/gid@=-9999/uid@=123456/nn@=test /txt@=666/level@=1/ 判断 type,弹幕消息为 chatmsg,txt 为弹幕内容,nn 为用户昵称 ''' pattern = rb'nn@=(.*)/txt@' username = re.findall(pattern,content)[0].decode('utf-8','ignore') return username #获取弹幕消息 def chat_msg(self,content): pattern = rb'txt@=(.*)/cid@' chatmsg = re.findall(pattern,content)[0].decode('utf-8','ignore') return chatmsg #获取用户 uid def user_uid(self,content): pattern = rb'uid@=(.*)/nn@' useruid = re.findall(pattern, content)[0].decode('utf-8', 'ignore') return useruid #获取用户等级 def user_level(self,content): pattern = rb'level@=(.*)/sahf@' userlevel = re.findall(pattern, content)[0].decode('utf-8', 'ignore') # print(userlevel) return userlevel #持续发送心跳包,保持连接 def keep_alive(self): #斗鱼要求每隔 45 秒发送一次心跳包到弹幕服务器 while True: msg = 'type@=mrkl/'#斗鱼新版心跳消息 self.send_msg(msg) print("**************************************此处是持续心跳包****************************************") sleep(45) if __name__ == "__main__": dm = __danmu__() roomid = dm.room_id() dm.connect() dm.mysql_connect() t1 = threading.Thread(target=dm.danmu,args=(roomid,)) t2 = threading.Thread(target=dm.keep_alive) t3 = threading.Thread(target=dm.speak,args=(dm.speak_data,)) t1.start() t2.start() t3.start()
弹幕信息打印
弹幕信息保存到数据库
完结后续:看看什么时候再有时间,准备再写个自动定时送礼物的!这样就可以自动续牌子了,哈哈!