Calling iFlytek (科大讯飞) speech dictation from Python to do speech recognition and convert live speech into text.
The recording part follows this blog post. I first downloaded the speech-dictation SDK from the official website, then created two .py files inside that folder, get_audio.py and iat_demo.py, and added a folder named audios to hold the recordings; the recording itself is saved there as input.wav. My full directory layout looks like this:
asr_SDK (folder name)
├─ Readme.html
├─ audios
│ └─ input.wav (recorded audio goes here)
├─ bin
│ ├─ gm_continuous_digit.abnf
│ ├─ ise_cn
│ ├─ ise_en
│ ├─ msc
│ ├─ msc.dll (I use 32-bit Python, so this is the DLL that applies to me; a quick bitness check is shown after the tree)
│ ├─ msc_x64.dll
│ ├─ source.txt
│ ├─ userwords.txt
│ └─ wav
├─ doc
├─ get_audio.py
├─ iat_demo.py
├─ include
├─ libs
├─ release.txt
└─ samples
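Not part of the original post: if you are unsure whether your Python interpreter is 32-bit or 64-bit (and therefore whether msc.dll or msc_x64.dll is the one that matches it), a quick check like this sketch will tell you.

```python
import platform
import struct

# Report the interpreter's bitness so you know which MSC DLL matches it
# (32-bit Python -> msc.dll, 64-bit Python -> msc_x64.dll).
print(platform.architecture()[0])       # e.g. '32bit' or '64bit'
print(struct.calcsize("P") * 8, "bit")  # pointer size in bits, same information
```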
Recording is done with pyaudio, which you need to install as a wheel; see my other blog post for details. I then adapted the code to my needs. The full code of get_audio.py is as follows:
```python
import pyaudio  # needs to be installed separately (see the other blog post for the wheel)
import wave

in_path = "./audios/input.wav"  # path where the recording is saved


def get_audio(filepath):
    aa = input("Start recording? (y/n) ")
    if aa == "y":
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1          # number of channels
        RATE = 11025          # sample rate
        RECORD_SECONDS = 5    # recording length in seconds
        WAVE_OUTPUT_FILENAME = filepath
        p = pyaudio.PyAudio()
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        print("*" * 5, "Recording started: speak within 5 seconds", "*" * 5)
        frames = []
        for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK)
            frames.append(data)
        print("*" * 5, "Recording finished")
        stream.stop_stream()
        stream.close()
        p.terminate()
        # write the captured frames to a wav file
        wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()
    elif aa == "n":
        exit()
    else:
        print("Recording failed, please start again")


if __name__ == "__main__":
    # guard the module-level call so importing get_audio does not start a recording
    get_audio(in_path)
```
Saving the recording can be repeated: every time you record again, the new audio overwrites the previous file.
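One small addition, not in the original code: if the audios folder has not been created yet, wave.open fails when it tries to write input.wav. A guard like the sketch below (which assumes the get_audio.py above) creates the folder before recording.

```python
import os
from get_audio import get_audio, in_path

# Make sure the output folder exists before recording;
# otherwise wave.open() raises FileNotFoundError when saving input.wav.
os.makedirs(os.path.dirname(in_path), exist_ok=True)
get_audio(in_path)
```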
The recognition part uses the Python example for the speech-dictation Web API from the iFlytek website directly, with some adjustments so that the recording is recognised and converted to text automatically. The full code of iat_demo.py is as follows:
```python
import websocket
import requests
import datetime
import hashlib
import base64
import hmac
import json
import os, sys
import re
from urllib.parse import urlencode
import logging
import time
import ssl
import wave
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
from pyaudio import PyAudio, paInt16
from get_audio import get_audio  # import the recording module

input_filename = "input.wav"   # audio captured from the microphone
input_filepath = "./audios/"   # path of the input file
in_path = input_filepath + input_filename

type = sys.getfilesystemencoding()
path_pwd = os.path.split(os.path.realpath(__file__))[0]
os.chdir(path_pwd)

try:
    import thread
except ImportError:
    import _thread as thread

logging.basicConfig()

STATUS_FIRST_FRAME = 0     # marks the first audio frame
STATUS_CONTINUE_FRAME = 1  # marks an intermediate frame
STATUS_LAST_FRAME = 2      # marks the last frame

framerate = 8000
NUM_SAMPLES = 2000
channels = 1
sampwidth = 2
TIME = 2

global wsParam


class Ws_Param(object):
    # initialisation
    def __init__(self, host):
        self.Host = host
        self.HttpProto = "HTTP/1.1"
        self.HttpMethod = "GET"
        self.RequestUri = "/v2/iat"
        self.APPID = "5d312675"  # APPID from Console -> My Apps -> Speech Dictation (streaming)
        self.Algorithm = "hmac-sha256"
        self.url = "wss://" + self.Host + self.RequestUri
        # capture the audio (record from the microphone)
        get_audio("./audios/input.wav")
        # test audio file; streaming dictation supports at most 60 s per request,
        # longer audio causes timeouts and other errors
        self.AudioFile = r"./audios/input.wav"
        self.CommonArgs = {"app_id": self.APPID}
        self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin"}

    def create_url(self):
        url = 'wss://ws-api.xfyun.cn/v2/iat'
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        APIKey = 'a6aabfcca4ae28f9b6a448f705b7e432'     # APIKey from Console -> My Apps -> Speech Dictation (streaming)
        APISecret = 'e649956e14eeb085d1b0dce77a671131'  # APISecret from Console -> My Apps -> Speech Dictation (streaming)
        # the signature origin must be joined with "\n"
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
        signature_sha = hmac.new(APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
        authorization_origin = 'api_key="%s", algorithm="%s", headers="%s", signature="%s"' % (
            APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        url = url + '?' + urlencode(v)
        return url


# handle an incoming websocket message;
# I changed the json parsing here so that it prints shorter, more readable output
def on_message(ws, message):
    msg = json.loads(message)  # parse the json message into a python dict
    try:
        code = msg["code"]
        sid = msg["sid"]
        if code != 0:
            errMsg = msg["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:
            result = msg["data"]["result"]["ws"]
            # pretty-print the result as json
            data_result = json.dumps(result, ensure_ascii=False, sort_keys=True,
                                     indent=4, separators=(',', ': '))
            print("sid:%s call success!" % (sid))
            print("result is:%s" % (data_result))
    except Exception as e:
        print("receive msg, but parse exception:", e)


# handle a websocket error
def on_error(ws, error):
    print("### error:", error)


# handle the websocket being closed
def on_close(ws):
    print("### closed ###")


# handle the websocket connection being opened
def on_open(ws):
    def run(*args):
        frameSize = 1280             # audio size per frame
        intervel = 0.04              # interval between sends (seconds)
        status = STATUS_FIRST_FRAME  # whether this is the first, an intermediate, or the last frame
        with open(wsParam.AudioFile, "rb") as fp:
            while True:
                buf = fp.read(frameSize)
                # end of file
                if not buf:
                    status = STATUS_LAST_FRAME
                # first frame: send the business parameters;
                # appid is required and only sent with the first frame
                if status == STATUS_FIRST_FRAME:
                    d = {"common": wsParam.CommonArgs,
                         "business": wsParam.BusinessArgs,
                         "data": {"status": 0, "format": "audio/L16;rate=16000",
                                  "audio": str(base64.b64encode(buf), 'utf-8'),
                                  "encoding": "raw"}}
                    d = json.dumps(d)
                    ws.send(d)
                    status = STATUS_CONTINUE_FRAME
                # intermediate frames
                elif status == STATUS_CONTINUE_FRAME:
                    d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
                                  "audio": str(base64.b64encode(buf), 'utf-8'),
                                  "encoding": "raw"}}
                    ws.send(json.dumps(d))
                # last frame
                elif status == STATUS_LAST_FRAME:
                    d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
                                  "audio": str(base64.b64encode(buf), 'utf-8'),
                                  "encoding": "raw"}}
                    ws.send(json.dumps(d))
                    time.sleep(1)
                    break
                # simulate the audio sampling interval
                time.sleep(intervel)
        ws.close()

    thread.start_new_thread(run, ())


if __name__ == "__main__":
    wsParam = Ws_Param("ws-api.xfyun.cn")  # streaming dictation host
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
    ws.on_open = on_open
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
```
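A small caveat, not from the original post: newer versions of the websocket-client package (1.x and later) pass two extra arguments to the close callback, so if you see a TypeError when the connection closes, adjust the handler roughly as in this sketch.

```python
# With websocket-client >= 1.0 the close callback also receives the status
# code and close message; this signature replaces the simpler on_close above.
def on_close(ws, close_status_code, close_msg):
    print("### closed ###", close_status_code, close_msg)
```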
In the project folder, right-click iat_demo.py and open it via Edit with IDLE -> Edit with IDLE 3.7 (32 bit), then press F5 to run it. If it complains at startup that some third-party package is missing, install that package. Once it starts, follow the prompts. This is the printed output from a run before I changed the json parsing:
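If you want plain text instead of the full json dump, a minimal sketch along these lines works. It assumes the usual iat response layout, where each element of `ws` holds candidate words under `cw` with the text in `w`; adapt it to the messages you actually receive.

```python
import json

def extract_text(message):
    """Concatenate the recognised words from one iat websocket message."""
    msg = json.loads(message)
    if msg.get("code") != 0:
        return ""
    words = []
    for item in msg["data"]["result"]["ws"]:
        for candidate in item["cw"]:
            words.append(candidate["w"])
    return "".join(words)
```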
Related code download