前言:公司做的智能分析系统,为了将消息推送给第三方,故使用 RabbitMQ 消息队列,简单方便。由于公司没有测试推送接口的程序,就用 python 写了个简单的测试接口脚本。
功能:1、连接 mq,获取 mq 消息,方便查看推送的消息表是否正确。
2、接收消息后,对消息进行格式化输出。
3、调用语音接口,播放报警消息。
功能:1、连接 mq,获取 mq 消息,方便查看推送的消息表是否正确。
2、接收消息后,对消息进行格式化输出。
3、调用语音接口,播放报警消息。
代码演示如下:
import pika import time import json import pyttsx3 auth = pika.PlainCredentials( username='admin', password='admin123#$', ) # 用户名 / 密码 connection = pika.BlockingConnection( pika.ConnectionParameters( '10.9.101.60', # RabbitMQ 地址 5672, # 端口号 'test-host', # 虚拟主机 auth, # 验证 ) ) # 链接 RabbitMQ channel = connection.channel() # 创建 RabbitMQ 通道 channel.queue_declare( queue='event.send', # 消费对列名 durable=True, # 持久化 ) def speak(camera_name,event_name): # 语音识别 speaker = pyttsx3.init() speaker.say('警告:'+camera_name+'发生'+event_name) speaker.runAndWait() num = 1 def callback(ch, method, properties, body): global num body = body.decode('utf-8', 'ignore') body = json.loads(body) check_body(num,body) num = num+1 #print(body) def check_body(num,body): print('-----------------------%s------------------------------'%num,'\n' "报警时间:" + time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(body[0]['alarm_time']/1000)),'\n' "报警事件:" + body[0]['event_name'],'\n' "报警等级:" + body[0]['event_level'],'\n' "摄像机名称:" + body[0]['camera_no'],'\n' "摄像机 ID:" + body[0]['camera_id'],'\n' "摄像机 IP:" + body[0]['camera_ip'],'\n' "用户名:" + body[0]['camera_username'],'\n' "密码:" + body[0]['camera_password'],'\n' "UUID:" + body[0]['uuid'],'\n' "报警截图:" + body[0]['pic_url'],'\n' "报警录像:" + body[0]['rec_path']) speak(body[0]['camera_no'], body[0]['event_name']) channel.basic_consume( queue='event.send', # 对列名 auto_ack=True, # 自动回应 on_message_callback=callback, # 回调消息 ) time.sleep(1) # 模拟消费时间 print("开始接收报警信息!") channel.start_consuming()