zoukankan      html  css  js  c++  java
  • Python 语音识别

    调用科大讯飞语音听写,使用Python实现语音识别,将实时语音转换为文字。

    参考这篇博客实现的录音,首先在官网下载了关于语音听写的SDK,然后在文件夹内新建了两个.py文件,分别是get_audio.pyiat_demo.py,并且新建了一个存放录音的文件夹audios,文件夹内存放录音文件input.wav,我的整个文件目录如下:

    asr_SDK(文件名)
    ├─ Readme.html
    ├─ audios
    │    └─ input.wav(存放音频)
    ├─ bin
    │    ├─ gm_continuous_digit.abnf
    │    ├─ ise_cn
    │    ├─ ise_en
    │    ├─ msc
    │    ├─ msc.dll (因为我是32位的python,所以用的这个动态链接库)
    │    ├─ msc_x64.dll
    │    ├─ source.txt
    │    ├─ userwords.txt
    │    └─ wav
    ├─ doc
    ├─ get_audio.py
    ├─ iat_demo.py
    ├─ include
    ├─ libs
    ├─ release.txt
    └─ samples

    一、录音

    这里使用的是pyaudio进行录音,需要下载相关的轮子,具体可参考我的另一篇博客。然后根据自己的需要进行了修改,gt_audio.py全部代码如下:

    import pyaudio # 这个需要自己下载轮子
    import wave
    in_path = "./audios/input.wav" # 存放录音的路径
    
    def get_audio(filepath):
        aa = str(input("是否开始录音?   (y/n)"))
        if aa == str("y") :
            CHUNK = 1024
            FORMAT = pyaudio.paInt16
            CHANNELS = 1                # 声道数
            RATE = 11025                # 采样率
            RECORD_SECONDS = 5          # 录音时间
            WAVE_OUTPUT_FILENAME = filepath
            p = pyaudio.PyAudio()
    
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
    
            print("*"*5, "开始录音:请在5秒内输入语音", "*"*5)
            frames = []
            for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = stream.read(CHUNK)
                frames.append(data)
            print("*"*5, "录音结束
    ")
    
            stream.stop_stream()
            stream.close()
            p.terminate()
    
            wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(p.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(b''.join(frames))
            wf.close()
        elif aa == str(""):
            exit()
        else:
            print("语音录入失败,请重新开始")
            get_audio(in_path)

    录音的保持是可循环的,每当重新录音,都会覆盖前一次的音频。

    二、语音识别

    直接使用的是科大讯飞官网语音听写的web API关于Python的例子,在此基础上进行了相关的调整,自动识别录音转换为文字。iat_demo.py的全部代码如下:

    import websocket
    import requests
    import datetime
    import hashlib
    import base64
    import hmac
    import json
    import os, sys
    import re
    from urllib.parse import urlencode
    import logging
    import time
    import ssl
    import wave
    from wsgiref.handlers import format_date_time
    from datetime import datetime
    from time import mktime
    from pyaudio import PyAudio,paInt16
    from get_audio import get_audio # 导入录音.py文件
    
    input_filename = "input.wav"               # 麦克风采集的语音输入
    input_filepath = "./audios/"              # 输入文件的path
    in_path = input_filepath + input_filename
    
    
    
    type = sys.getfilesystemencoding()
    
    path_pwd = os.path.split(os.path.realpath(__file__))[0]
    os.chdir(path_pwd)
    
    try:
        import thread
    except ImportError:
        import _thread as thread
    
    logging.basicConfig()
    
    STATUS_FIRST_FRAME = 0  # 第一帧的标识
    STATUS_CONTINUE_FRAME = 1  # 中间帧标识
    STATUS_LAST_FRAME = 2  # 最后一帧的标识
    
    framerate = 8000
    NUM_SAMPLES = 2000
    channels = 1
    sampwidth = 2
    TIME = 2
    
    global wsParam
    
    
    class Ws_Param(object):
        # 初始化
        def __init__(self, host):
            self.Host = host
            self.HttpProto = "HTTP/1.1"
            self.HttpMethod = "GET"
            self.RequestUri = "/v2/iat"
            self.APPID = "5d312675" # 在控制台-我的应用-语音听写(流式版)获取APPID
            self.Algorithm = "hmac-sha256"
            self.url = "wss://" + self.Host + self.RequestUri
    
            # 采集音频 录音
            get_audio("./audios/input.wav")
    
            # 设置测试音频文件,流式听写一次最多支持60s,超过60s会引起超时等错误。
            self.AudioFile = r"./audios/input.wav"
    
            self.CommonArgs = {"app_id": self.APPID}
            self.BusinessArgs = {"domain":"iat", "language": "zh_cn","accent":"mandarin"}
    
        def create_url(self):
            url = 'wss://ws-api.xfyun.cn/v2/iat'
            now = datetime.now()
            date = format_date_time(mktime(now.timetuple()))
            APIKey = 'a6aabfcca4ae28f9b6a448f705b7e432' # 在控制台-我的应用-语音听写(流式版)获取APIKey
            APISecret = 'e649956e14eeb085d1b0dce77a671131' # 在控制台-我的应用-语音听写(流式版)获取APISecret
    
            signature_origin = "host: " + "ws-api.xfyun.cn" + "
    "
            signature_origin += "date: " + date + "
    "
            signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
            signature_sha = hmac.new(APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                     digestmod=hashlib.sha256).digest()
            signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
    
            authorization_origin = "api_key="%s", algorithm="%s", headers="%s", signature="%s"" % (
                APIKey, "hmac-sha256", "host date request-line", signature_sha)
            authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
            v = {
                "authorization": authorization,
                "date": date,
                "host": "ws-api.xfyun.cn"
            }
            url = url + '?' + urlencode(v)
            return url
    
    # 收到websocket消息的处理 这里我对json解析进行了一些更改 打印简短的一些信息
    def on_message(ws, message):
        msg = json.loads(message) # 将json对象转换为python对象 json格式转换为字典格式
        try:
            code = msg["code"]
            sid = msg["sid"]
            
            if code != 0:
                errMsg = msg["message"]
                print("sid:%s call error:%s code is:%s
    " % (sid, errMsg, code))
            else:
                result = msg["data"]["result"]["ws"]
                # 以json格式显示 
                data_result = json.dumps(result, ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ': ')) 
                print("sid:%s call success!" % (sid))
                print("result is:%s
    " % (data_result))
        except Exception as e:
            print("receive msg,but parse exception:", e)
    
    
    # 收到websocket错误的处理
    def on_error(ws, error):
        print("### error:", error)
    
    
    # 收到websocket关闭的处理
    def on_close(ws):
        print("### closed ###")
    
    
    # 收到websocket连接建立的处理
    def on_open(ws):
        def run(*args):
            frameSize = 1280  # 每一帧的音频大小
            intervel = 0.04  # 发送音频间隔(单位:s)
            status = STATUS_FIRST_FRAME  # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
            with open(wsParam.AudioFile, "rb") as fp:
                while True:
                    buf = fp.read(frameSize)
    
                    # 文件结束
                    if not buf:
                        status = STATUS_LAST_FRAME
                    # 第一帧处理
                    # 发送第一帧音频,带business 参数
                    # appid 必须带上,只需第一帧发送
                    if status == STATUS_FIRST_FRAME:
    
                        d = {"common": wsParam.CommonArgs,
                             "business": wsParam.BusinessArgs,
                             "data": {"status": 0, "format": "audio/L16;rate=16000",
                                       "audio": str(base64.b64encode(buf),'utf-8'),
                                      "encoding": "raw"}}
                        d = json.dumps(d)
                        ws.send(d)
                        status = STATUS_CONTINUE_FRAME
                    # 中间帧处理
                    elif status == STATUS_CONTINUE_FRAME:
                        d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
                                       "audio": str(base64.b64encode(buf),'utf-8'),
                                      "encoding": "raw"}}
                        ws.send(json.dumps(d))
                    # 最后一帧处理
                    elif status == STATUS_LAST_FRAME:
                        d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
                                      "audio": str(base64.b64encode(buf),'utf-8'),
                                      "encoding": "raw"}}
                        ws.send(json.dumps(d))
                        time.sleep(1)
                        break
                    # 模拟音频采样间隔
                    time.sleep(intervel)
            ws.close()
    
        thread.start_new_thread(run, ())
    
    
    if __name__ == "__main__":
        wsParam = Ws_Param("ws-api.xfyun.cn") #流式听写 域名
        websocket.enableTrace(False)
        wsUrl = wsParam.create_url()
        ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
        ws.on_open = on_open
        ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

    三、启动程序

    在程序文件夹内,右键点击iat_demo,选择Edit with IDLE->Edit with IDLE3.7(32 bit)打开,然后使用F5快速启动,启动的时候如果提示没有哪个第三方包,自行安装。启动成功后根据提示进行操作,这是我修改json解析之前操作的打印结果:
    在这里插入图片描述
    相关代码下载

  • 相关阅读:
    VSFTPD匿名用户上传文件
    shell随机数比较
    Verse For Santa ---- CodeForces
    计算机基础--整数加减运算
    Hello,Blog!
    题解 P2486 【[SDOI2011]染色】
    题解 P1047 【校门外的树】
    题解 P1339 【[USACO09OCT]热浪Heat Wave】——线段树做法
    题解 P1197 【[JSOI2008]星球大战】
    题解 P1886 【滑动窗口】
  • 原文地址:https://www.cnblogs.com/suRimn/p/11314956.html
Copyright © 2011-2022 走看看