前言:之前写过一个斗鱼弹幕工具脚本(纯脚本,非客户端版本)。最近基于之前的代码,用 PyQt5 写了一个可打包为 exe 的桌面客户端,便于在其他电脑上使用。
说明:
1、之前的纯脚本代码链接:点击跳转
2、本版本删除了弹幕日志功能,不再写入到数据库。
3、本版本仍有部分 bug 未修复:a. 部分弹幕不显示;b. 客户端可能在一段时间内不显示弹幕;c. 客户端可能会崩溃。
4、bug 不想修复。。。嘿嘿嘿。


源代码
import ctypes
import datetime
import inspect
import queue
import re
import socket
import sys
import threading
from time import sleep

import pymysql
#import pyttsx3
import win32com.client
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtWidgets import QMessageBox
class __danmu__(object):
    """Douyu live-chat ("danmu") viewer: Qt widgets plus socket worker threads.

    The object builds a fixed-layout UI on a plain ``QWidget`` and, when the
    start button is pressed, opens a TCP connection to the Douyu barrage
    server and runs three daemon threads:

    * ``danmu``      - logs in, joins the room and receives chat packets;
    * ``keep_alive`` - sends a heartbeat every 45 seconds;
    * ``speak``      - reads queued chat lines aloud through Windows SAPI.

    Worker threads never touch widgets.  They put ``(kind, text)`` tuples on
    a thread-safe queue that a ``QTimer`` drains on the GUI thread, because
    Qt widgets may only be used from the thread that owns them.
    """

    # True while a session is running; toggled by check_state()/room_id().
    state = False

    # Wire format, as implied by send_msg(): a 12-byte header (length,
    # length, 4-byte type/flags word) followed by a NUL-terminated body.
    _LENGTH_FIELD_SIZE = 4
    _HEADER_SIZE = 12
    _CLIENT_MSG_TYPE = 689
    # Anything larger than this is treated as a corrupt length field.
    _MAX_PACKET_SIZE = 1 << 20

    _HEARTBEAT_SECONDS = 45
    _SPEAK_IDLE_SECONDS = 5
    _CONNECT_TIMEOUT_SECONDS = 10.0
    # recv() wakes up this often so the receiver can notice a stop request.
    _SOCKET_POLL_SECONDS = 1.0
    _UI_POLL_MS = 100

    def __init__(self):
        """Create the per-instance state shared between GUI and workers."""
        # Chat lines waiting to be spoken (per instance, not shared by the
        # class, so two instances never mix their backlogs).
        self.speak_data = []
        # (kind, text) events produced by workers, consumed on the GUI thread.
        self._ui_events = queue.Queue()
        # Serialises writes: login and heartbeat come from different threads.
        self._send_lock = threading.Lock()
        # Set to ask the workers of the current session to exit.
        self._stop_event = threading.Event()
        self.client = None

    # ------------------------------------------------------------------ UI

    @staticmethod
    def _place(widget, object_name, geometry, font_family, point_size):
        """Apply geometry ``(x, y, w, h)``, font and object name to a widget."""
        widget.setGeometry(QtCore.QRect(*geometry))
        font = QtGui.QFont()
        font.setFamily(font_family)
        font.setPointSize(point_size)
        widget.setFont(font)
        widget.setObjectName(object_name)

    def setupUi(self, mainWindow):
        """Build all widgets inside ``mainWindow`` and wire up the signals.

        Args:
            mainWindow: the top-level ``QWidget`` that hosts the UI.
        """
        mainWindow.setObjectName("mainWindow")
        mainWindow.resize(800, 600)
        self.centralwidget = QtWidgets.QWidget(mainWindow)
        self.centralwidget.setObjectName("centralwidget")
        # NOTE(review): the container is not managed by a layout (the host is
        # a QWidget, not a QMainWindow), so it is sized explicitly to the
        # window's initial 800x600 that the child coordinates were laid out
        # for.
        self.centralwidget.resize(800, 600)

        self.room_id_title = QtWidgets.QLabel(self.centralwidget)
        self._place(self.room_id_title, "room_id_title",
                    (480, 510, 81, 41), "微软雅黑", 12)

        self.uiroomid = QtWidgets.QLineEdit(self.centralwidget)
        self._place(self.uiroomid, "room_id",
                    (570, 510, 91, 41), "微软雅黑", 12)

        self.start_go = QtWidgets.QPushButton(self.centralwidget)
        self._place(self.start_go, "start_go",
                    (680, 510, 101, 41), "微软雅黑", 12)

        self.label = QtWidgets.QLabel(self.centralwidget)
        self._place(self.label, "label",
                    (220, 550, 281, 31), "微软雅黑 Light", 10)

        self.note_label = QtWidgets.QLabel(self.centralwidget)
        self.note_label.setText("")
        self._place(self.note_label, "note_label",
                    (20, 510, 441, 41), "微软雅黑 Light", 12)

        self.msg_list = QtWidgets.QTextBrowser(self.centralwidget)
        self._place(self.msg_list, "msg_list",
                    (10, 10, 781, 481), "微软雅黑 Light", 10)

        # Poll the worker -> GUI queue from the GUI thread.
        self._ui_timer = QtCore.QTimer(mainWindow)
        self._ui_timer.setInterval(self._UI_POLL_MS)
        self._ui_timer.timeout.connect(self._drain_ui_events)
        self._ui_timer.start()

        self.retranslateUi(mainWindow)
        QtCore.QMetaObject.connectSlotsByName(mainWindow)
        self.start_go.clicked.connect(self.check_state)

    def retranslateUi(self, mainWindow):
        """Set every user-visible caption on the widgets."""
        _translate = QtCore.QCoreApplication.translate
        mainWindow.setWindowTitle(_translate("mainWindow", "斗鱼弹幕工具"))
        self.room_id_title.setText(_translate("mainWindow", "Room ID:"))
        self.uiroomid.setText(_translate("mainWindow", "984395"))
        self.start_go.setText(_translate("mainWindow", "连接服务"))
        self.label.setText(_translate("mainWindow", "Power By Taen & Designer By Taen weask.cc"))
        self.note_label.setText(_translate("mainWindow", "服务未开启!"))

    def _post_ui_event(self, kind, text):
        """Queue a UI update from any thread; ``kind`` is chat/status/error."""
        self._ui_events.put((kind, text))

    def _drain_ui_events(self):
        """Apply all queued worker events to the widgets (GUI thread only)."""
        while True:
            try:
                kind, text = self._ui_events.get_nowait()
            except queue.Empty:
                return
            if kind == "chat":
                self.msg_list.append(text)
                self.note_label.setText(text)
                # Keep the view scrolled to the newest message.
                self.msg_list.moveCursor(QtGui.QTextCursor.End)
            elif kind == "status":
                self.note_label.setText(text)
            elif kind == "error":
                self._handle_worker_error(text)

    def _handle_worker_error(self, text):
        """Tear the session down after a worker failure and tell the user."""
        if self.state:
            self._end_session()
            self.start_go.setText("开启服务")
            self.state = False
        self.note_label.setText(text)
        self.notice(text)

    def notice(self, notices):
        """Show ``notices`` (any object; converted with ``str``) in a dialog.

        Must be called on the GUI thread.
        """
        QMessageBox.information(None, "消息提示", str(notices),
                                QMessageBox.Yes, QMessageBox.Yes)

    # -------------------------------------------------------------- speech

    def speak(self, speak_data):
        """Thread body: read queued lines aloud via the SAPI voice.

        Args:
            speak_data: list of strings; items are removed from the front as
                they are spoken.

        Runs until the stop event of the session that started it is set.
        """
        # Local import: pythoncom ships with pywin32, which the file already
        # depends on through win32com.  COM has to be initialised in every
        # thread that uses it.
        import pythoncom

        stop = self._stop_event
        pythoncom.CoInitialize()
        try:
            speaker = win32com.client.Dispatch("SAPI.SpVoice")
            while not stop.is_set():
                if speak_data:
                    speaker.Speak(speak_data.pop(0))
                else:
                    # Idle wait that ends early when a stop is requested.
                    stop.wait(self._SPEAK_IDLE_SECONDS)
        finally:
            pythoncom.CoUninitialize()

    # ------------------------------------------------------ thread control

    def _async_raise(self, tid, exctype):
        """Raise ``exctype`` asynchronously in the thread with id ``tid``.

        Kept for callers that still use it; check_state() no longer does,
        because the exception is only delivered once the target thread runs
        Python bytecode again, not while it is blocked in recv() or sleep().

        Raises:
            ValueError: no thread has the given id.
            SystemError: more than one thread state was modified.
        """
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        # The C API takes an unsigned long; a bare Python int would be passed
        # as a C int and could be truncated.
        thread_id = ctypes.c_ulong(tid)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # More than one thread was affected: undo the request.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def stop_thread(self, thread):
        """Ask ``thread`` to exit by raising ``SystemExit`` inside it."""
        if thread.ident is None:
            raise ValueError("invalid thread id")
        self._async_raise(thread.ident, SystemExit)

    def _end_session(self):
        """Signal the workers to stop and close the socket to unblock them."""
        self._stop_event.set()
        client = self.client
        if client is None:
            return
        try:
            client.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Best effort: the peer may already have dropped the connection.
            pass
        client.close()

    def check_state(self):
        """Slot for the start/stop button: toggle the service."""
        if self.state is True:
            self._end_session()
            self.start_go.setText("开启服务")
            self.note_label.setText("服务已关闭!")
            self.state = False
        else:
            self.room_id()

    def room_id(self):
        """Read the room id from the input box and start a session.

        Any failure (non-numeric id, DNS or connection error) is shown in a
        dialog and leaves the service stopped.
        """
        try:
            roomid = int(self.uiroomid.text())
            self.gogo(roomid)
        except Exception as notices:  # UI boundary: report, do not crash
            self.notice(notices)
            self.state = False
        else:
            self.start_go.setText('关闭服务')
            self.state = True

    # ------------------------------------------------------------- network

    def connect(self):
        """Open the TCP connection to the Douyu barrage server.

        Raises:
            OSError: name resolution or connection failed; the socket is
                closed before the error propagates.
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client.settimeout(self._CONNECT_TIMEOUT_SECONDS)
            host = socket.gethostbyname("openbarrage.douyutv.com")
            port = 8601
            client.connect((host, port))
            # Short timeout afterwards so recv() polls the stop event.
            client.settimeout(self._SOCKET_POLL_SECONDS)
        except OSError:
            client.close()
            raise
        self.client = client

    def _build_packet(self, msg):
        """Return the full wire packet (header + body) for text ``msg``.

        The body is ``msg`` plus a trailing NUL, UTF-8 encoded.  The length
        value is body + 8: it counts the second length field and the 4-byte
        type word, but not the first length field itself, which is why the
        12-byte header only contributes 8.
        """
        body = (msg + '\0').encode('utf-8')
        data_length = len(body) + 8
        length_field = data_length.to_bytes(4, 'little')
        type_field = self._CLIENT_MSG_TYPE.to_bytes(4, 'little')
        return length_field + length_field + type_field + body

    def send_msg(self, msg):
        """Send one protocol message; safe to call from several threads."""
        packet = self._build_packet(msg)
        # One locked sendall() so header and body of concurrent messages
        # cannot interleave on the wire.
        with self._send_lock:
            self.client.sendall(packet)

    def _split_packets(self, buffer):
        """Remove every complete packet from ``buffer`` and return the bodies.

        Args:
            buffer: ``bytearray`` of received bytes; consumed in place, any
                trailing partial packet is left for the next call.

        Returns:
            list of ``bytes`` bodies with header and trailing NUL stripped.

        NOTE(review): assumes the server frames packets the same way
        send_msg() does - confirm against the Douyu protocol document.
        """
        bodies = []
        prefix = self._LENGTH_FIELD_SIZE
        while len(buffer) >= prefix:
            declared = int.from_bytes(buffer[:prefix], 'little')
            total = declared + prefix
            if total < self._HEADER_SIZE or total > self._MAX_PACKET_SIZE:
                # Implausible length: stream is out of sync, start over.
                del buffer[:]
                break
            if len(buffer) < total:
                break
            bodies.append(bytes(buffer[self._HEADER_SIZE:total]).rstrip(b'\0'))
            del buffer[:total]
        return bodies

    def _report_failure(self, stop_event, err):
        """Stop the session and queue one error message for the GUI."""
        if stop_event.is_set():
            # Either a normal shutdown or another worker already reported.
            return
        stop_event.set()
        self._post_ui_event("error", str(err))

    # ------------------------------------------------------------ database

    def mysql_connect(self):
        """Open the MySQL connection used by save_mysql().

        NOTE(review): credentials are hard-coded; move them to configuration
        before re-enabling database logging.
        """
        self.db = pymysql.connect(host="localhost", port=3306, user='root',
                                  passwd='123456789lt', db='douyu_danmu',
                                  charset='utf8')
        self.cursor = self.db.cursor()

    def save_mysql(self, danmu_data):
        """Insert one chat record.

        Args:
            danmu_data: ``(uid, level, username, danmu, room)``.

        The values are passed as query parameters rather than formatted into
        the SQL text, because user names and chat text are untrusted input.
        """
        sql = ("INSERT INTO danmu (uid,level,username,danmu,room,datetime) "
               "VALUES (%s,%s,%s,%s,%s,now());")
        try:
            self.cursor.execute(sql, danmu_data)
            self.db.commit()
        except pymysql.MySQLError:
            self.db.rollback()
            print('此条信息写入数据库失败!( ',danmu_data,' )' )

    def save_danmu(self, msg):
        """Append one text line to ``danmu.log`` (UTF-8)."""
        with open('danmu.log', 'a', encoding='utf-8') as log_file:
            log_file.write(msg + '\n')

    # ------------------------------------------------------------ receiver

    def danmu(self, room_id):
        """Thread body: log in, join the room and publish chat messages.

        Args:
            room_id: Douyu room number.

        Exits when the session's stop event is set, when the server closes
        the connection, or on the first error (reported once to the GUI).
        """
        stop = self._stop_event
        client = self.client
        buffer = bytearray()
        try:
            self.send_msg('type@=loginreq/roomid@=%s/' % room_id)
            self.send_msg('type@=joingroup/rid@=%s/gid@=-9999/' % room_id)
            while not stop.is_set():
                try:
                    chunk = client.recv(4096)
                except socket.timeout:
                    continue
                if not chunk:
                    # recv() returning b'' means the peer closed the socket.
                    raise ConnectionError("网络故障或其他错误!")
                buffer.extend(chunk)
                for body in self._split_packets(buffer):
                    if self.judge_chatmsg(body):
                        self._publish_chat(body)
        except Exception as err:  # thread boundary: report to the GUI
            self._report_failure(stop, err)

    def _publish_chat(self, content):
        """Format one chat packet and queue it for display and speech.

        The uid is not extracted here: it was only needed for database
        logging, which is disabled in this version.
        """
        username = self.user_name(content)
        chatmsg = self.chat_msg(content)
        try:
            userlevel = self.user_level(content)
        except IndexError:
            userlevel = 'error'
        date_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        self.danmu_msg = str('%s <%s>%s : %s'%(date_time,userlevel,username,chatmsg))
        self._post_ui_event("chat", self.danmu_msg)
        self.speak_data.append('%s 说:%s'%(username,chatmsg))

    # -------------------------------------------------------------- parsing

    def _field(self, content, key):
        """Return the decoded value of ``key`` in a message, or ``None``.

        A message is a run of ``key@=value/`` pairs, e.g.
        ``type@=chatmsg/rid@=301712/uid@=123456/nn@=test/txt@=666/level@=1/``.
        The value is everything up to the next ``/``.

        NOTE(review): ``@S`` and ``@A`` are unescaped to ``/`` and ``@`` per
        the Douyu serialization rules - confirm against the protocol document.
        """
        if not isinstance(content, (bytes, bytearray)):
            return None
        # The look-behind stops e.g. b'level' from matching inside a longer
        # key, while still allowing header bytes directly before the key.
        pattern = rb'(?<![0-9A-Za-z_])' + re.escape(key) + rb'@=([^/]*)/'
        match = re.search(pattern, content)
        if match is None:
            return None
        text = match.group(1).decode('utf-8', 'ignore')
        # Order matters: '@AS' must decode to '@S', not to '/'.
        return text.replace('@S', '/').replace('@A', '@')

    def judge_chatmsg(self, content):
        """Return True when ``content`` is a chat message (type ``chatmsg``)."""
        return self._field(content, b'type') == 'chatmsg'

    def user_name(self, content):
        """Return the sender's nickname (``nn``), or ``'error'`` if absent."""
        username = self._field(content, b'nn')
        return 'error' if username is None else username

    def chat_msg(self, content):
        """Return the chat text (``txt``), or ``'error'`` if absent."""
        chatmsg = self._field(content, b'txt')
        return 'error' if chatmsg is None else chatmsg

    def user_uid(self, content):
        """Return the sender's uid as a string.

        Raises:
            IndexError: the message has no ``uid`` field.
        """
        useruid = self._field(content, b'uid')
        if useruid is None:
            raise IndexError("uid field not found")
        return useruid

    def user_level(self, content):
        """Return the sender's level as a string.

        Raises:
            IndexError: the message has no ``level`` field.
        """
        userlevel = self._field(content, b'level')
        if userlevel is None:
            raise IndexError("level field not found")
        return userlevel

    # ------------------------------------------------------------ heartbeat

    def keep_alive(self):
        """Thread body: send a heartbeat every 45 seconds.

        The interval follows the original author's note that Douyu expects a
        heartbeat every 45 seconds.  Exits when the session's stop event is
        set or on the first send error.
        """
        stop = self._stop_event
        n_heart = 1
        try:
            while not stop.is_set():
                self.send_msg('type@=mrkl/')
                self._post_ui_event("status", "斗鱼持续心跳包(%s)"%n_heart)
                n_heart = n_heart + 1
                # Sleeps, but returns at once when a stop is requested.
                stop.wait(self._HEARTBEAT_SECONDS)
        except Exception as err:  # thread boundary: report to the GUI
            self._report_failure(stop, err)

    # --------------------------------------------------------------- entry

    def gogo(self, roomid):
        """Connect and start the receiver, heartbeat and speech threads.

        Args:
            roomid: Douyu room number.

        Raises:
            OSError: the connection could not be established.
        """
        self.connect()
        # Fresh event per session; each worker captures it on entry, so a
        # later session cannot revive workers of an earlier one.
        self._stop_event = threading.Event()
        # Daemon threads: closing the window must be able to end the process.
        self.t1 = threading.Thread(target=self.danmu, args=(roomid,), daemon=True)
        self.t2 = threading.Thread(target=self.keep_alive, daemon=True)
        self.t3 = threading.Thread(target=self.speak, args=(self.speak_data,), daemon=True)
        for worker in (self.t1, self.t2, self.t3):
            worker.start()
if __name__ == "__main__":
    # Script entry point: create the Qt application, build the danmu UI on a
    # plain top-level widget, show it and run the event loop until it exits.
    application = QtWidgets.QApplication(sys.argv)
    window = QtWidgets.QWidget()
    # Keep the instance in the module-level name ``dm`` so it stays
    # reachable for the whole lifetime of the event loop.
    dm = __danmu__()
    dm.setupUi(window)
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)