zoukankan      html  css  js  c++  java
  • python检测音频中的静音

    #-*- coding: utf-8 -*-
    import os
    import wave
    from time import sleep
    import numpy as np
    
    SUCCESS = 0
    FAIL = 1
    
    # 需要添加录音互斥功能能,某些功能开启的时候录音暂时关闭
    def ZCR(curFrame):
        # 过零率
        tmp1 = curFrame[:-1]
        tmp2 = curFrame[1:]
        sings = (tmp1 * tmp2 <= 0)
        diffs = (tmp1 - tmp2) > 0.02
        zcr = np.sum(sings * diffs)
        return zcr
    
    
    def STE(curFrame):
        # 短时能量
        amp = np.sum(np.abs(curFrame))
        return amp
    
    
    class Vad(object):
        def __init__(self):
            # 初始短时能量高门限
            self.amp1 = 140
            # 初始短时能量低门限
            self.amp2 = 120
            # 初始短时过零率高门限
            self.zcr1 = 10
            # 初始短时过零率低门限
            self.zcr2 = 5
            # 允许最大静音长度
            self.maxsilence = 100
            # 语音的最短长度
            self.minlen = 40
            # 偏移值
            self.offsets = 40
            self.offsete = 40
            # 能量最大值
            self.max_en = 20000
            # 初始状态为静音
            self.status = 0
            self.count = 0
            self.silence = 0
            self.frame_len = 256
            self.frame_inc = 128
            self.cur_status = 0
            self.frames = []
            # 数据开始偏移
            self.frames_start = []
            self.frames_start_num = 0
            # 数据结束偏移
            self.frames_end = []
            self.frames_end_num = 0
            # 缓存数据
            self.cache_frames = []
            self.cache = ""
            # 最大缓存长度
            self.cache_frames_num = 0
            self.end_flag = False
            self.wait_flag = False
            self.on = True
            self.callback = None
            self.callback_res = []
            self.callback_kwargs = {}
    
        def clean(self):
            self.frames = []
            # 数据开始偏移
            self.frames_start = []
            self.frames_start_num = 0
            # 数据结束偏移
            self.frames_end = []
            self.frames_end_num = 0
            # 缓存数据
            self.cache_frames = []
            # 最大缓存长度
            self.cache_frames_num = 0
            self.end_flag = False
            self.wait_flag = False
    
        def go(self):
            self.wait_flag = False
    
        def wait(self):
            self.wait_flag = True
    
        def stop(self):
            self.on = False
    
        def add(self, frame, wait=True):
            if wait:
                print 'wait'
                frame = self.cache + frame
    
            while len(frame) > self.frame_len:
                frame_block = frame[:self.frame_len]
                self.cache_frames.append(frame_block)
                frame = frame[self.frame_len:]
            if wait:
                self.cache = frame
            else:
                self.cache = ""
                self.cache_frames.append(-1)
    
        def run(self,hasNum):
            print "开始执行音频端点检测"
            step = self.frame_len - self.frame_inc
            num = 0
            while 1:
                # 开始端点
                # 获得音频文件数字信号
                if self.wait_flag:
                    sleep(1)
                    continue
                if len(self.cache_frames) < 2:
                    sleep(0.05)
                    continue
    
                if self.cache_frames[1] == -1:
                    print '----------------没有声音--------------'
                    break
                # 从缓存中读取音频数据
                record_stream = "".join(self.cache_frames[:2])
                wave_data = np.fromstring(record_stream, dtype=np.int16)
                wave_data = wave_data * 1.0 / self.max_en
                data = wave_data[np.arange(0, self.frame_len)]
                speech_data = self.cache_frames.pop(0)
                # 获得音频过零率
                zcr = ZCR(data)
                # 获得音频的短时能量, 平方放大
                amp = STE(data) ** 2
                # 返回当前音频数据状态
                res = self.speech_status(amp, zcr)
    
                if res == 2:
                    hasNum += 1
    
                if hasNum > 10:
                    print '+++++++++++++++++++++++++有声音++++++++++++++++++++++++'
                    break
                num = num + 1
                # 一段一段进行检测
                self.frames_start.append(speech_data)
                self.frames_start_num += 1
                if self.frames_start_num == self.offsets:
                    # 开始音频开始的缓存部分
                    self.frames_start.pop(0)
                    self.frames_start_num -= 1
                if self.end_flag:
                    # 当音频结束后进行后部缓存
                    self.frames_end_num += 1
                    # 下一段语音开始,或达到缓存阀值
                    if res == 2 or self.frames_end_num == self.offsete:
                        speech_stream = b"".join(self.frames + self.frames_end)
                        self.callback_res.append(self.callback(speech_stream, **self.callback_kwargs))
    
                        # 数据环境初始化
                        # self.clean()
                        self.end_flag = False
    
                        self.frames = []
                        self.frames_end_num = 0
                        self.frames_end = []
    
                    self.frames_end.append(speech_data)
                if res == 2:
                    if self.cur_status in [0, 1]:
                        # 添加开始偏移数据到数据缓存
                        self.frames.append(b"".join(self.frames_start))
                    # 添加当前的语音数据
                    self.frames.append(speech_data)
                if res == 3:
                    print '检测音频结束'
                    self.frames.append(speech_data)
                    # 开启音频结束标志
                    self.end_flag = True
    
                self.cur_status = res
                # return self.callback_res
    
        def speech_status(self, amp, zcr):
            status = 0
            # 0= 静音, 1= 可能开始, 2=确定进入语音段
            if self.cur_status in [0, 1]:
                # 确定进入语音段
                if amp > self.amp1:
                    status = 2
                    self.silence = 0
                    self.count += 1
                # 可能处于语音段
                elif amp > self.amp2 or zcr > self.zcr2:
                    status = 1
                    self.count += 1
                # 静音状态
                else:
                    status = 0
                    self.count = 0
                    self.count = 0
            # 2 = 语音段
            elif self.cur_status == 2:
                # 保持在语音段
                if amp > self.amp2 or zcr > self.zcr2:
                    self.count += 1
                    status = 2
                # 语音将结束
                else:
                    # 静音还不够长,尚未结束
                    self.silence += 1
                    if self.silence < self.maxsilence:
                        self.count += 1
                        status = 2
                    # 语音长度太短认为是噪声
                    elif self.count < self.minlen:
                        status = 0
                        self.silence = 0
                        self.count = 0
                    # 语音结束
                    else:
                        status = 3
                        self.silence = 0
                        self.count = 0
            return status
    
    
    def read_file_data(filename):
        """
        输入:需要读取的文件名
        返回:(声道,量化位数,采样率,数据)
        """
        read_file = wave.open(filename, "r")
        params = read_file.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        data = read_file.readframes(nframes)
        return nchannels, sampwidth, framerate, data
    
    class FileParser(Vad):
        def __init__(self):
            self.block_size = 256
            Vad.__init__(self)
        def read_file(self, filename):
            if not os.path.isfile(filename):
                print "文件%s不存在" % filename
                return FAIL
            datas = read_file_data(filename)[-1]
            self.add(datas, False)
    
    if __name__ == "__main__":
        stream_test = FileParser()
    
        filename = 'test1566606924822.wav'
        result = stream_test.read_file(filename)
        if result != FAIL:
            stream_test.run(0)

    转载自网络,版权归原作者所有

  • 相关阅读:
    WPF/Sliverlight ScrollViewer与Panel(2)
    OpenGL学习笔记(7)多边形绘制
    OpenGL学习笔记(10)抗锯齿
    OpenGL学习笔记(9)颜色混合
    GLUT函数说明(转)
    OpenGL学习笔记(8)显示列表
    C#操作IIS的代码
    完整解决Flash载入中文FLASH乱码问题
    用C#的IIS上配置用户账号
    ASP.NET定时调用WebService 运行后台代码
  • 原文地址:https://www.cnblogs.com/passedbylove/p/11806548.html
Copyright © 2011-2022 走看看