前言:之前写过一个斗鱼弹幕工具脚本(非客户端版本),最近基于之前的代码,用 PyQt5 写了一个 exe 可执行客户端,便于在其他电脑上使用。
说明:
1、之前的纯脚本代码链接:点击跳转
2、本版本删除了弹幕日志功能,不再写入到数据库。
3、本版本仍有部分 bug 未修复:a. 部分弹幕不显示;b. 客户端可能一段时间内不显示弹幕;c. 客户端可能会崩溃。
4、bug 不想修复。。。嘿嘿嘿。
源代码
import ctypes
import datetime
import inspect
import re
import socket
import sys
import threading
from time import sleep

import pymysql
import win32com.client
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtWidgets import QMessageBox

# Seconds to wait for the TCP connection before giving up (connect() runs on
# the GUI thread, so it must not block indefinitely).
_CONNECT_TIMEOUT = 10
# Interval between heartbeat packets; the original author notes Douyu asks for
# one every 45 seconds.
_HEARTBEAT_SECONDS = 45
# Poll interval of the speech thread while its queue is empty.
_SPEAK_IDLE_SECONDS = 5

# Field extractors for a single barrage message.  They are non-greedy so a
# match can never run past the first terminating marker.
_TYPE_RE = re.compile(rb'type@=(.*?)/rid@=')
_NAME_RE = re.compile(rb'nn@=(.*?)/txt@')
_TEXT_RE = re.compile(rb'txt@=(.*?)/cid@')
_UID_RE = re.compile(rb'uid@=(.*?)/nn@')
_LEVEL_RE = re.compile(rb'level@=(.*?)/sahf@')


def _first_group(pattern, content):
    """Return group 1 of the first match of *pattern* in *content*, or None."""
    try:
        match = pattern.search(content)
    except TypeError:
        # content was not a bytes-like object
        return None
    return match.group(1) if match else None


def _decoded_field(pattern, content):
    """Return the decoded field captured by *pattern*, or 'error' if absent."""
    raw = _first_group(pattern, content)
    if raw is None:
        return 'error'
    return raw.decode('utf-8', 'ignore')


def _make_font(family, point_size):
    """Build a QFont with the given family and point size."""
    font = QtGui.QFont()
    font.setFamily(family)
    font.setPointSize(point_size)
    return font


class _UiBridge(QtCore.QObject):
    """Signals used by worker threads to hand data over to the GUI thread.

    Qt widgets may only be touched from the GUI thread.  The worker threads
    therefore emit these signals instead of calling widget methods directly;
    the connected slots then run in the thread that owns the bridge.
    """

    danmu_received = QtCore.pyqtSignal(str)
    status_changed = QtCore.pyqtSignal(str)
    error_raised = QtCore.pyqtSignal(str)


class __danmu__(object):
    """Douyu barrage (danmu) viewer: builds the UI and runs the network threads."""

    # True while the service (socket + worker threads) is running.
    state = False
    # Queue of sentences waiting to be spoken (replaced per instance in __init__).
    speak_data = []

    def __init__(self):
        self.speak_data = []
        self.client = None
        self.bridge = None
        self.t1 = self.t2 = self.t3 = None
        # Set to ask the worker threads of the current session to exit.
        self._stop_event = threading.Event()

    def setupUi(self, mainWindow):
        """Create all widgets on *mainWindow* and wire signals to slots."""
        mainWindow.setObjectName("mainWindow")
        mainWindow.resize(800, 600)
        self.centralwidget = QtWidgets.QWidget(mainWindow)
        self.centralwidget.setObjectName("centralwidget")

        self.room_id_title = QtWidgets.QLabel(self.centralwidget)
        self.room_id_title.setGeometry(QtCore.QRect(480, 510, 81, 41))
        self.room_id_title.setFont(_make_font("微软雅黑", 12))
        self.room_id_title.setObjectName("room_id_title")

        self.uiroomid = QtWidgets.QLineEdit(self.centralwidget)
        self.uiroomid.setGeometry(QtCore.QRect(570, 510, 91, 41))
        self.uiroomid.setFont(_make_font("微软雅黑", 12))
        self.uiroomid.setObjectName("room_id")

        self.start_go = QtWidgets.QPushButton(self.centralwidget)
        self.start_go.setGeometry(QtCore.QRect(680, 510, 101, 41))
        self.start_go.setFont(_make_font("微软雅黑", 12))
        self.start_go.setObjectName("start_go")

        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setGeometry(QtCore.QRect(220, 550, 281, 31))
        self.label.setFont(_make_font("微软雅黑 Light", 10))
        self.label.setObjectName("label")

        self.note_label = QtWidgets.QLabel(self.centralwidget)
        self.note_label.setGeometry(QtCore.QRect(20, 510, 441, 41))
        self.note_label.setFont(_make_font("微软雅黑 Light", 12))
        self.note_label.setText("")
        self.note_label.setObjectName("note_label")

        self.msg_list = QtWidgets.QTextBrowser(self.centralwidget)
        self.msg_list.setGeometry(QtCore.QRect(10, 10, 781, 481))
        self.msg_list.setFont(_make_font("微软雅黑 Light", 10))
        self.msg_list.setObjectName("msg_list")

        # Created here (on the GUI thread) so its slots run on the GUI thread.
        self.bridge = _UiBridge()
        self.bridge.danmu_received.connect(self._show_danmu)
        self.bridge.status_changed.connect(self._show_status)
        self.bridge.error_raised.connect(self._show_error)

        self.retranslateUi(mainWindow)
        QtCore.QMetaObject.connectSlotsByName(mainWindow)
        self.start_go.clicked.connect(self.check_state)

    def retranslateUi(self, mainWindow):
        """Set every user-visible caption of the window."""
        _translate = QtCore.QCoreApplication.translate
        mainWindow.setWindowTitle(_translate("mainWindow", "斗鱼弹幕工具"))
        self.room_id_title.setText(_translate("mainWindow", "Room ID:"))
        self.uiroomid.setText(_translate("mainWindow", "984395"))
        self.start_go.setText(_translate("mainWindow", "连接服务"))
        self.label.setText(_translate("mainWindow", "Power By Taen & Designer By Taen weask.cc"))
        self.note_label.setText(_translate("mainWindow", "服务未开启!"))

    # ------------------------------------------------------------------
    # GUI-thread slots fed by the worker threads through the bridge
    # ------------------------------------------------------------------
    def _show_danmu(self, text):
        """Append one formatted barrage line and keep the view scrolled down."""
        if not self.state:
            # Late delivery after the service was stopped; keep the UI as is.
            return
        self.msg_list.append(text)
        self.note_label.setText(text)
        self.msg_list.moveCursor(QtGui.QTextCursor.End)

    def _show_status(self, text):
        """Show a status line (heartbeat counter) while the service runs."""
        if self.state:
            self.note_label.setText(text)

    def _show_error(self, text):
        """Show an error both in the status label and in a message box."""
        self.note_label.setText(text)
        self.notice(text)

    def _report_error(self, err):
        """Forward *err* from a worker thread to the GUI thread as text."""
        self.bridge.error_raised.emit(str(err))

    # ------------------------------------------------------------------
    # Text to speech
    # ------------------------------------------------------------------
    def speak(self, speak_data):
        """Speak queued sentences from *speak_data* until the service stops.

        Runs in a worker thread.  Entries are removed from the front of the
        list as they are spoken.
        """
        # Imported locally: pythoncom ships with pywin32, which the file
        # already depends on through win32com.  COM has to be initialised in
        # every thread that creates COM objects.
        import pythoncom

        stop = self._stop_event
        pythoncom.CoInitialize()
        try:
            speaker = win32com.client.Dispatch("SAPI.SpVoice")
            while not stop.is_set():
                if speak_data:
                    speaker.Speak(speak_data.pop(0))
                else:
                    stop.wait(_SPEAK_IDLE_SECONDS)
        finally:
            pythoncom.CoUninitialize()

    # ------------------------------------------------------------------
    # Thread control
    # ------------------------------------------------------------------
    def _async_raise(self, tid, exctype):
        """Asynchronously raise *exctype* in the thread whose id is *tid*.

        Raises:
            ValueError: no thread with that id exists.
            SystemError: the interpreter reported more than one affected thread.
        """
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        # The thread id must be passed as a C long; a bare Python int is
        # converted to a C int by ctypes.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(tid), ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # More than one thread state was modified: revert the effect.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
            raise SystemError("PyThreadState_SetAsyncExc failed")

    def stop_thread(self, thread):
        """Request SystemExit in *thread*; does nothing if it is not running."""
        if thread is None or not thread.is_alive():
            return
        self._async_raise(thread.ident, SystemExit)

    def _shutdown(self):
        """Ask the worker threads to exit and close the socket.

        An injected exception cannot interrupt a thread blocked in recv() or
        a timed wait, so the threads are stopped cooperatively: the stop
        event wakes the waiting ones and shutting the socket down unblocks
        recv().
        """
        self._stop_event.set()
        client = self.client
        if client is None:
            return
        try:
            client.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Already disconnected; closing below is still required.
            pass
        client.close()

    def check_state(self):
        """Slot of the start/stop button: toggle the service."""
        if self.state:
            self._shutdown()
            self.start_go.setText("开启服务")
            self.note_label.setText("服务已关闭!")
            self.state = False
        else:
            self.room_id()

    def room_id(self):
        """Read the room id from the line edit and start the service."""
        roomid = self.uiroomid.text()
        try:
            roomid = int(roomid)
            self.gogo(roomid)
        except Exception as notices:
            # Slot boundary: an exception escaping from here would abort the
            # application, so report it to the user instead.
            self.notice(notices)
            self.state = False
        else:
            self.start_go.setText('关闭服务')
            self.state = True

    def notice(self, notices):
        """Show *notices* in a modal information box."""
        QMessageBox.information(None, "消息提示", str(notices),
                                QMessageBox.Yes, QMessageBox.Yes)

    # ------------------------------------------------------------------
    # Network
    # ------------------------------------------------------------------
    def connect(self):
        """Open the TCP connection to the barrage server.

        Raises:
            OSError: name resolution or the connection failed.
        """
        host = socket.gethostbyname("openbarrage.douyutv.com")
        port = 8601
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client.settimeout(_CONNECT_TIMEOUT)
            client.connect((host, port))
            # Back to blocking mode for the receive loop.
            client.settimeout(None)
        except OSError:
            client.close()
            raise
        self.client = client

    def send_msg(self, msg):
        """Frame *msg* and send it to the server.

        The packet is: length, length again, then the 4-byte little-endian
        value 689 (the original notes describe this as message type followed
        by an encryption byte and a reserved byte, both 0), then the UTF-8
        body terminated by a NUL byte.
        """
        body = (msg + '\0').encode('utf-8')
        # The length field holds the body length plus 8 (presumably the second
        # length field and the type word, i.e. everything after the first
        # length field -- confirm against the protocol document).
        data_length = len(body) + 8
        code = 689
        head = (int.to_bytes(data_length, 4, 'little')
                + int.to_bytes(data_length, 4, 'little')
                + int.to_bytes(code, 4, 'little'))
        # One sendall() so the header is never sent partially and header and
        # body stay together when two threads send at the same time.
        self.client.sendall(head + body)

    def mysql_connect(self):
        """Open the MySQL connection used by save_mysql()."""
        # NOTE(review): credentials are hard-coded; move them to configuration.
        self.db = pymysql.connect(host="localhost",
                                  port=3306,
                                  user='root',
                                  passwd='123456789lt',
                                  db='douyu_danmu',
                                  charset='utf8'
                                  )
        self.cursor = self.db.cursor()

    def save_mysql(self, danmu_data):
        """Insert one barrage row.

        Args:
            danmu_data: tuple ``(uid, level, username, danmu, room)``.
        """
        # Parameterised query: user name and message text are untrusted input.
        sql = ("INSERT INTO danmu (uid,level,username,danmu,room,datetime) "
               "VALUES (%s,%s,%s,%s,%s,now());")
        try:
            self.cursor.execute(sql, danmu_data)
            self.db.commit()
        except Exception:
            # Best effort: a failed insert must not stop the barrage stream.
            print('此条信息写入数据库失败!( ', danmu_data, ' )')

    def save_danmu(self, msg):
        """Append *msg* as one line to ``danmu.log``."""
        with open('danmu.log', 'a', encoding='utf-8') as f:
            f.write(msg + '\n')

    def danmu(self, room_id):
        """Log in to *room_id* and forward chat messages until stopped.

        Runs in a worker thread.  Received data is buffered and split on the
        NUL terminator, because one recv() may return part of a message or
        several messages at once.
        """
        stop = self._stop_event
        client = self.client
        try:
            self.send_msg('type@=loginreq/roomid@=%s/' % room_id)
            self.send_msg('type@=joingroup/rid@=%s/gid@=-9999/' % room_id)
        except OSError as err:
            if not stop.is_set():
                self._report_error(err)
            stop.set()
            return

        buffer = b''
        while not stop.is_set():
            try:
                chunk = client.recv(4096)
            except OSError as err:
                # Expected when the socket is closed by _shutdown().
                if not stop.is_set():
                    self._report_error(err)
                break
            if not chunk:
                # The server closed the connection.
                if not stop.is_set():
                    self._report_error("与弹幕服务器的连接已断开!")
                break
            buffer += chunk
            # Everything before the last NUL is complete; keep the remainder.
            *packets, buffer = buffer.split(b'\0')
            for packet in packets:
                if self.judge_chatmsg(packet):
                    self._handle_chatmsg(packet)
        # Let the heartbeat and speech threads wind down as well.
        stop.set()

    def _handle_chatmsg(self, content):
        """Format one chat message and hand it to the GUI and the speaker."""
        username = self.user_name(content)
        chatmsg = self.chat_msg(content)
        userlevel = self.user_level(content)
        date_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        self.danmu_msg = '%s <%s>%s : %s' % (date_time, userlevel, username, chatmsg)
        self.bridge.danmu_received.emit(self.danmu_msg)
        self.speak_data.append('%s 说:%s' % (username, chatmsg))

    def judge_chatmsg(self, content):
        """Return True if *content* is a ``chatmsg`` message."""
        return _first_group(_TYPE_RE, content) == b'chatmsg'

    def user_name(self, content):
        """Return the sender's nickname (``nn`` field), or 'error'.

        Example message:
        type@=chatmsg/rid@=301712/gid@=-9999/uid@=123456/nn@=test
        /txt@=666/level@=1/
        ``type`` is ``chatmsg`` for barrage messages, ``txt`` is the message
        text and ``nn`` the nickname.
        """
        return _decoded_field(_NAME_RE, content)

    def chat_msg(self, content):
        """Return the message text (``txt`` field), or 'error'."""
        return _decoded_field(_TEXT_RE, content)

    def user_uid(self, content):
        """Return the sender's uid (``uid`` field), or 'error'."""
        return _decoded_field(_UID_RE, content)

    def user_level(self, content):
        """Return the sender's level (``level`` field), or 'error'."""
        return _decoded_field(_LEVEL_RE, content)

    def keep_alive(self):
        """Send a heartbeat packet periodically until stopped.

        Runs in a worker thread.
        """
        stop = self._stop_event
        n_heart = 1
        while not stop.is_set():
            try:
                self.send_msg('type@=mrkl/')
            except OSError as err:
                if not stop.is_set():
                    self._report_error(err)
                stop.set()
                break
            self.bridge.status_changed.emit("斗鱼持续心跳包(%s)" % n_heart)
            n_heart = n_heart + 1
            # Unlike sleep(), this returns at once when the service stops.
            stop.wait(_HEARTBEAT_SECONDS)

    def gogo(self, roomid):
        """Connect and start the receive, heartbeat and speech threads."""
        self.connect()
        # A fresh event per session, so threads of a previous session that are
        # still winding down keep seeing their own (set) event.
        self._stop_event = threading.Event()
        # Daemon threads: they must not keep the process alive after the
        # window has been closed.
        self.t1 = threading.Thread(target=self.danmu, args=(roomid,), daemon=True)
        self.t2 = threading.Thread(target=self.keep_alive, daemon=True)
        self.t3 = threading.Thread(target=self.speak, args=(self.speak_data,), daemon=True)
        self.t1.start()
        self.t2.start()
        self.t3.start()


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    mainWindow = QtWidgets.QWidget()
    dm = __danmu__()
    dm.setupUi(mainWindow)
    # Close the socket and stop the workers when the application quits.
    app.aboutToQuit.connect(dm._shutdown)
    mainWindow.show()
    sys.exit(app.exec_())