基于讯飞API增加TTS能力,总配置文件样例

This commit is contained in:
lzybetter
2024-05-09 07:58:36 +08:00
parent 667d48f5f1
commit 807142da86
5 changed files with 225 additions and 11 deletions

View File

@@ -1,11 +1,12 @@
import asyncio
import importlib.util
import os
import threading
from datetime import datetime, timedelta
from datetime import timedelta, datetime
from util.base_plugin import BasePlugin
import telegram
from util.tts import TTSWebSocket, TTS
from playsound import playsound
import os
class PluginManager:
PLUGIN_DIR = "custom_plugins"
@@ -74,26 +75,54 @@ class PluginManager:
async def execute_plugin_async(self, plugin_name, schedule, *args, **kwargs):
telegram_need = False
tts_need = False
result = {'result':""}
APPID = ""
APIKey = ""
APISecret = ""
try:
token = self.config.get_plugin_config('TELEGRAM', self.plugins_path[plugin_name])['BOT_TOKEN']
chat_id = self.config.get_plugin_config('TELEGRAM', self.plugins_path[plugin_name])['CHAT_ID']
telegram_need = True
except:
telegram_need = False
try:
if self.config.get_plugin_config('TTS', self.plugins_path[plugin_name]) == "NEED":
tts_need = True
else:
tts_need = False
except Exception as e:
print(e)
tts_need = False
try:
APPID = self.config.get_config("TTS")["APPID"]
APIKey = self.config.get_config("TTS")["APIKey"]
APISecret = self.config.get_config("TTS")["APISecret"]
except Exception as e:
print(e)
sleep_time = 30
while True:
if plugin_name in self.plugins:
print(f"Executing plugin '{plugin_name}':")
print(schedule)
# print(f"Executing plugin '{plugin_name}':")
if schedule['mode'] == "INTERVAL" or schedule['mode'] == "CONTIENUOUS":
result = await self.execute_plugin(plugin_name, *args, **kwargs)
sleep_time = schedule['INTERVAL_TIME']
if telegram_need:
if result['result'] != "":
print(result)
await self.send_message_async(token, chat_id, result['result'])
if tts_need:
try:
if result['result'] != "":
tts = TTS(API_KEY=APIKey,
API_SECRET=APISecret, APP_ID=APPID)
sever = TTSWebSocket(tts_obj=tts, msg=(0, result['result']))
au_result = sever.run()
# speakpath = os.path.join(self.config.BASE_PATH, au_result)
print(au_result)
playsound(au_result)
except Exception as e:
print(e)
elif schedule['mode'] == "FIX":
now = datetime.now()
today = now.date()
@@ -103,23 +132,34 @@ class PluginManager:
next_day = today + timedelta(days=1)
next_time = datetime(year=next_day.year, month=next_day.month, day=next_day.day,
hour=schedule['HOUR'], minute=schedule['MINUTE'])
print(next_time)
sleep_time = (next_time - now).seconds
if now.hour == schedule['HOUR'] and now.minute == schedule["MINUTE"]:
result = await self.execute_plugin(plugin_name, *args, **kwargs)
if telegram_need:
if result['result'] != "":
await self.send_message_async(token, chat_id, result['result'])
if tts_need:
try:
if result['result'] != "":
tts = TTS(API_KEY=APIKey,
API_SECRET=APISecret, APP_ID=APPID)
sever = TTSWebSocket(tts_obj=tts, msg=(0, result['result']))
au_result = sever.run()
# speakpath = os.path.join(self.config.BASE_PATH, au_result)
print(au_result)
playsound(au_result)
except Exception as e:
print(e)
else:
print(f"Plugin '{plugin_name}' not found.")
print(sleep_time)
await asyncio.sleep(sleep_time)
async def send_message_async(self, bot_token, chat_id, text):
proxy_url = "http://" + self.config.get_proxy()['http']
print(proxy_url)
proxy = telegram.request.HTTPXRequest(proxy_url=proxy_url)
bot = telegram.Bot(token=bot_token, request=proxy)
await bot.send_message(chat_id=chat_id, text=text)

View File

@@ -1,7 +1,8 @@
import yaml
import os
from util.singleton import singleton
@singleton
class Config:
__BASE_PATH = os.getcwd()

161
util/tts.py Normal file
View File

@@ -0,0 +1,161 @@
import base64
import hashlib
import hmac
import json
import ssl
import threading
import time
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
import os
from util.config import Config
STATUS_FIRST_FRAME = 0 # 第一帧的标识
STATUS_CONTINUE_FRAME = 1 # 中间帧标识
STATUS_LAST_FRAME = 2 # 最后一帧的标识
class TTS(object):
def __init__(self, APP_ID, API_KEY, API_SECRET):
self.APP_ID = APP_ID
self.API_KEY = API_KEY
self.API_SECRET = API_SECRET
self.config = Config()
# 公共参数(common)
self.common_args = {"app_id": self.APP_ID}
# 业务参数(business)
self.business_args = {
"aue": "lame", "sfl":1, "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"
}
# 生成业务数据流参数(data)
@staticmethod
def gen_data(text):
data = {
"status": 2, # 数据状态固定为2 注由于流式合成的文本只能一次性传输不支持多次分段传输此处status必须为2。
"text": str(base64.b64encode(text.encode('utf-8')), "UTF8")
}
return data
# 生成url
def create_url(self):
url = 'wss://tts-api.xfyun.cn/v2/tts'
# 生成RFC1123格式的时间戳
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))
# 拼接字符串
signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + "/v2/tts " + "HTTP/1.1"
# 进行hmac-sha256进行加密
signature_sha = hmac.new(self.API_SECRET.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
self.API_KEY, "hmac-sha256", "host date request-line", signature_sha)
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# 将请求的鉴权参数组合为字典
v = {
"authorization": authorization,
"date": date,
"host": "ws-api.xfyun.cn"
}
# 拼接鉴权参数生成url
url = url + '?' + urlencode(v)
# print("date: ",date)
# print("v: ",v)
# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释比对相同参数时生成的url与自己代码生成的url是否一致
# print('websocket url :', url)
return url
class TTSWebSocket(object):
def __init__(self, msg, tts_obj):
self.msg = msg
self.tts = tts_obj
self.url = tts_obj.create_url()
self.data = []
self.flag = False
self.audio_dir = "audio/"
self.ws_listener = None
self.config = Config()
websocket.enableTrace(False)
self.ws = websocket.WebSocketApp(self.url, on_message=self.on_message,
on_error=self.on_error, on_close=self.on_close)
def on_message(self, ws, msg):
try:
message = json.loads(msg)
print(message)
code = message["code"]
sid = message["sid"]
audio = message["data"]["audio"]
status = message["data"]["status"]
if code == 0:
self.data.append(audio)
else:
err_msg = message["message"]
print("sid:%s call error:%s code is:%s" % (sid, err_msg, code))
if status == 2:
print("------>数据接受完毕")
self.flag = True
self.ws.close()
except Exception as e:
print("receive msg,but parse exception:", e)
# print(sys.exc_info()[0])
def on_error(self, ws, error):
print("### error:", error)
def on_close(self, ws, *args):
print("### closed ###")
def on_open(self, ws):
d = {"common": self.tts.common_args,
"business": self.tts.business_args,
"data": self.tts.gen_data(self.msg[1]),
}
d = json.dumps(d)
print("------>开始发送文本数据: {}".format(self.msg))
self.ws.send(d)
def get_result(self):
self.flag = False
if self.data:
audio_path = os.path.join(self.config.BASE_PATH, self.config.get_config("TTS")["AUDIO_PATH"])
print(self.config.get_config("TTS"))
audio_file = os.path.join(audio_path, "result.mp3")
print(audio_path)
with open(audio_file, 'wb') as f:
for _r in self.data:
f.write(base64.b64decode(_r))
return audio_file
return "error未收到任何信息"
def run(self):
self.ws.on_open = self.on_open
self.ws_listener = threading.Thread(target=self.ws.run_forever, kwargs={"sslopt": {"cert_reqs": ssl.CERT_NONE}})
self.ws_listener.daemon = True
self.ws_listener.start()
timeout = 15
end_time = time.time() + timeout
while True:
if time.time() > end_time:
raise websocket.WebSocketTimeoutException
if self.flag:
result = self.get_result()
return result