Microphone Speech-to-Text with PyAudio and Volcano Engine
With large language models booming, LLM + robot + agent looks like one of the major directions going forward.
Approach
For a hardware robot to hold a conversation, it first has to be able to hear. An external microphone handles that: capture the mic audio, save it as a WAV (or similar) file, run the WAV through ASR to get text, then feed the text to an LLM; the loop is sketched below.
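In pseudocode the pipeline is just a loop. Here record_segment, asr_transcribe, and llm_reply are hypothetical names standing in for the recorder and ASR client built below and for whatever LLM you wire up:

while True:
    wav_path = record_segment()      # capture mic audio until silence (hypothetical helper)
    text = asr_transcribe(wav_path)  # WAV -> text via ASR (hypothetical helper)
    print(llm_reply(text))           # text -> answer from your LLM (hypothetical helper)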
This post covers the first step. I normally develop in Go, so my first instinct was to build this in Go, but it turns out that recording from a microphone in Go requires a long chain of setup (and in the end I never even got it working, sadly).
So I switched to Python. The pyaudio package fully supports capturing microphone data, and installing it is a one-liner: pip install pyaudio. The right tool for the job.
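Before recording, it can help to confirm PyAudio actually sees your microphone. A quick sanity check, assuming nothing beyond pyaudio itself:

import pyaudio

p = pyaudio.PyAudio()
for i in range(p.get_device_count()):
    info = p.get_device_info_by_index(i)
    if info.get("maxInputChannels", 0) > 0:  # input-capable devices only
        print(i, info["name"])
p.terminate()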
On top of it you can decide whether to record based on a volume threshold, use the same threshold to split the audio into segments, and then send each segment off to the recognition endpoint.
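A minimal sketch of that thresholding idea, using the same constants as main.py below (note that the audioop module is deprecated since Python 3.11 and removed in 3.13):

import audioop
import pyaudio

CHUNK, THRESHOLD = 1024, 2500
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000,
                input=True, frames_per_buffer=CHUNK)
data = stream.read(CHUNK)   # one 16-bit mono chunk from the mic
rms = audioop.rms(data, 2)  # 2 = bytes per sample for paInt16
print("speech" if rms > THRESHOLD else "silence", rms)
stream.stop_stream()
stream.close()
p.terminate()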
For the ASR endpoint I chose ByteDance's Volcano Engine (a big provider, so the API is stable and fast). A locally deployed Whisper model is also a perfectly good option.
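For reference, the local route might look like this. It assumes the openai-whisper package (pip install openai-whisper) and ffmpeg on the PATH, neither of which is used anywhere else in this post:

import whisper

model = whisper.load_model("base")  # small local checkpoint
result = model.transcribe("recording_latest.wav", language="zh")
print(result["text"])               # recognized text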
The full implementation follows:
asr_client.py
#coding=utf-8"""
requires Python 3.6 or laterpip install asyncio
pip install websockets
"""import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time
import websockets

appid = "xxxxxxxx"    # your project's appid
token = "xxxxxxxxxx"  # your project's token
cluster = "volcengine_streaming_common"  # cluster to request
audio_path = "F:\\huancun\\python\\nova\\wav\\recording_latest.wav"  # local audio path
audio_format = "wav"  # "wav" or "mp3", set to match the actual audio format

PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001

PROTOCOL_VERSION_BITS = 4
HEADER_BITS = 4
MESSAGE_TYPE_BITS = 4
MESSAGE_TYPE_SPECIFIC_FLAGS_BITS = 4
MESSAGE_SERIALIZATION_BITS = 4
MESSAGE_COMPRESSION_BITS = 4
RESERVED_BITS = 8

# Message Type:
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111

# Message Type Specific Flags
NO_SEQUENCE = 0b0000 # no check sequence
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_SEQUENCE_1 = 0b0011

# Message Serialization
NO_SERIALIZATION = 0b0000
JSON = 0b0001
THRIFT = 0b0011
CUSTOM_TYPE = 0b1111

# Message Compression
NO_COMPRESSION = 0b0000
GZIP = 0b0001
CUSTOM_COMPRESSION = 0b1111


def generate_header(
    version=PROTOCOL_VERSION,
    message_type=CLIENT_FULL_REQUEST,
    message_type_specific_flags=NO_SEQUENCE,
    serial_method=JSON,
    compression_type=GZIP,
    reserved_data=0x00,
    extension_header=bytes()
):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits),
    serialization_method(4 bits), message_compression(4 bits),
    reserved(8 bits): reserved field,
    header_extensions: extension header (size = 8 * 4 * (header_size - 1))
    """
    header = bytearray()
    header_size = int(len(extension_header) / 4) + 1
    header.append((version << 4) | header_size)
    header.append((message_type << 4) | message_type_specific_flags)
    header.append((serial_method << 4) | compression_type)
    header.append(reserved_data)
    header.extend(extension_header)
    return header


def generate_full_default_header():
    return generate_header()


def generate_audio_default_header():
    return generate_header(message_type=CLIENT_AUDIO_ONLY_REQUEST)


def generate_last_audio_default_header():
    return generate_header(
        message_type=CLIENT_AUDIO_ONLY_REQUEST,
        message_type_specific_flags=NEG_SEQUENCE
    )


def parse_response(res):
    """
    protocol_version(4 bits), header_size(4 bits),
    message_type(4 bits), message_type_specific_flags(4 bits),
    serialization_method(4 bits), message_compression(4 bits),
    reserved(8 bits): reserved field,
    header_extensions: extension header (size = 8 * 4 * (header_size - 1)),
    payload: the body, similar to an HTTP request body
    """
    protocol_version = res[0] >> 4
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    reserved = res[3]
    header_extensions = res[4:header_size * 4]
    payload = res[header_size * 4:]
    result = {}
    payload_msg = None
    payload_size = 0
    if message_type == SERVER_FULL_RESPONSE:
        payload_size = int.from_bytes(payload[:4], "big", signed=True)
        payload_msg = payload[4:]
    elif message_type == SERVER_ACK:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['seq'] = seq
        if len(payload) >= 8:
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]
    elif message_type == SERVER_ERROR_RESPONSE:
        code = int.from_bytes(payload[:4], "big", signed=False)
        result['code'] = code
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload_msg = payload[8:]
    if payload_msg is None:
        return result
    if message_compression == GZIP:
        payload_msg = gzip.decompress(payload_msg)
    if serialization_method == JSON:
        payload_msg = json.loads(str(payload_msg, "utf-8"))
    elif serialization_method != NO_SERIALIZATION:
        payload_msg = str(payload_msg, "utf-8")
    result['payload_msg'] = payload_msg
    result['payload_size'] = payload_size
    return result


def read_wav_info(data: bytes = None) -> (int, int, int, int, int):  # type: ignore
    with BytesIO(data) as _f:
        wave_fp = wave.open(_f, 'rb')
        nchannels, sampwidth, framerate, nframes = wave_fp.getparams()[:4]
        wave_bytes = wave_fp.readframes(nframes)
    return nchannels, sampwidth, framerate, nframes, len(wave_bytes)


class AudioType(Enum):
    LOCAL = 1  # use a local audio file


class AsrWsClient:
    def __init__(self, audio_path, cluster, **kwargs):
        """
        :param config: config
        """
        self.audio_path = audio_path
        self.cluster = cluster
        self.success_code = 1000  # success code, default is 1000
        self.seg_duration = int(kwargs.get("seg_duration", 15000))
        self.nbest = int(kwargs.get("nbest", 1))
        self.appid = kwargs.get("appid", "")
        self.token = kwargs.get("token", "")
        self.ws_url = kwargs.get("ws_url", "wss://openspeech.bytedance.com/api/v2/asr")
        self.uid = kwargs.get("uid", "streaming_asr_demo")
        self.workflow = kwargs.get("workflow", "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate")
        self.show_language = kwargs.get("show_language", False)
        self.show_utterances = kwargs.get("show_utterances", False)
        self.result_type = kwargs.get("result_type", "full")
        self.format = kwargs.get("format", "wav")
        self.rate = kwargs.get("sample_rate", 16000)
        self.language = kwargs.get("language", "zh-CN")
        self.bits = kwargs.get("bits", 16)
        self.channel = kwargs.get("channel", 1)
        self.codec = kwargs.get("codec", "raw")
        self.audio_type = kwargs.get("audio_type", AudioType.LOCAL)
        self.secret = kwargs.get("secret", "access_secret")
        self.auth_method = kwargs.get("auth_method", "token")
        self.mp3_seg_size = int(kwargs.get("mp3_seg_size", 10000))

    def construct_request(self, reqid):
        req = {
            'app': {
                'appid': self.appid,
                'cluster': self.cluster,
                'token': self.token,
            },
            'user': {
                'uid': self.uid
            },
            'request': {
                'reqid': reqid,
                'nbest': self.nbest,
                'workflow': self.workflow,
                'show_language': self.show_language,
                'show_utterances': self.show_utterances,
                'result_type': self.result_type,
                "sequence": 1
            },
            'audio': {
                'format': self.format,
                'rate': self.rate,
                'language': self.language,
                'bits': self.bits,
                'channel': self.channel,
                'codec': self.codec
            }
        }
        return req

    @staticmethod
    def slice_data(data: bytes, chunk_size: int) -> (list, bool):
        """
        slice data
        :param data: wav data
        :param chunk_size: the segment size in one request
        :return: segment data, last flag
        """
        data_len = len(data)
        offset = 0
        while offset + chunk_size < data_len:
            yield data[offset: offset + chunk_size], False
            offset += chunk_size
        else:
            yield data[offset: data_len], True

    def _real_processor(self, request_params: dict) -> dict:
        pass

    def token_auth(self):
        return {'Authorization': 'Bearer; {}'.format(self.token)}

    def signature_auth(self, data):
        header_dicts = {
            'Custom': 'auth_custom',
        }
        url_parse = urlparse(self.ws_url)
        input_str = 'GET {} HTTP/1.1\n'.format(url_parse.path)
        auth_headers = 'Custom'
        for header in auth_headers.split(','):
            input_str += '{}\n'.format(header_dicts[header])
        input_data = bytearray(input_str, 'utf-8')
        input_data += data
        mac = base64.urlsafe_b64encode(
            hmac.new(self.secret.encode('utf-8'), input_data, digestmod=sha256).digest())
        header_dicts['Authorization'] = 'HMAC256; access_token="{}"; mac="{}"; h="{}"'.format(
            self.token, str(mac, 'utf-8'), auth_headers)
        return header_dicts

    async def segment_data_processor(self, wav_data: bytes, segment_size: int):
        reqid = str(uuid.uuid4())
        # build the full client request, then serialize and compress it
        request_params = self.construct_request(reqid)
        payload_bytes = str.encode(json.dumps(request_params))
        payload_bytes = gzip.compress(payload_bytes)
        full_client_request = bytearray(generate_full_default_header())
        full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size (4 bytes)
        full_client_request.extend(payload_bytes)  # payload
        header = None
        if self.auth_method == "token":
            header = self.token_auth()
        elif self.auth_method == "signature":
            header = self.signature_auth(full_client_request)
        async with websockets.connect(self.ws_url, extra_headers=header, max_size=1000000000) as ws:
            # send the full client request
            await ws.send(full_client_request)
            res = await ws.recv()
            result = parse_response(res)
            if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                return result
            for seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1):
                # if no compression, comment this line
                payload_bytes = gzip.compress(chunk)
                audio_only_request = bytearray(generate_audio_default_header())
                if last:
                    audio_only_request = bytearray(generate_last_audio_default_header())
                audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size (4 bytes)
                audio_only_request.extend(payload_bytes)  # payload
                # send the audio-only client request
                await ws.send(audio_only_request)
                res = await ws.recv()
                result = parse_response(res)
                if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:
                    return result
        return result

    async def execute(self):
        with open(self.audio_path, mode="rb") as _f:
            data = _f.read()
        audio_data = bytes(data)
        if self.format == "mp3":
            segment_size = self.mp3_seg_size
            return await self.segment_data_processor(audio_data, segment_size)
        if self.format != "wav":
            raise Exception("format should be wav or mp3")
        nchannels, sampwidth, framerate, nframes, wav_len = read_wav_info(audio_data)
        size_per_sec = nchannels * sampwidth * framerate
        segment_size = int(size_per_sec * self.seg_duration / 1000)
        return await self.segment_data_processor(audio_data, segment_size)


def execute_one(audio_item, cluster, **kwargs):
    """
    :param audio_item: {"id": xxx, "path": "xxx"}
    :param cluster: cluster name
    :return:
    """
    assert 'id' in audio_item
    assert 'path' in audio_item
    audio_id = audio_item['id']
    audio_path = audio_item['path']
    audio_type = AudioType.LOCAL
    asr_http_client = AsrWsClient(
        audio_path=audio_path,
        cluster=cluster,
        audio_type=audio_type,
        **kwargs
    )
    result = asyncio.run(asr_http_client.execute())
    return {"id": audio_id, "path": audio_path, "result": result}


def test_one():
    """Run recognition once on audio_path and print the result."""
    try:
        result = execute_one(
            {
                'id': 1,
                'path': audio_path  # make sure audio_path is the path you actually use
            },
            cluster=cluster,
            appid=appid,
            token=token,
            format=audio_format,
        )
        # pull the payload_msg field out of the result
        payload_msg = result.get('result', {}).get('payload_msg', {})
        # code 1000 means recognition succeeded
        if payload_msg.get('code') == 1000:
            recognition_result = payload_msg.get('result', [])
            if recognition_result:
                text = recognition_result[0].get('text', '').strip()
                if text:
                    # only print when recognition produced text
                    print(f"Recognized: {text}")
                else:
                    print("No speech recognized, please try again")
            else:
                print("No speech recognized, please try again")
        else:
            # error code 1013 or other cases
            print("No speech recognized, please try again")
    except Exception as e:
        # catch any exception and print the error
        print(f"Error while running test_one: {e}")
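To smoke-test asr_client.py on its own (with appid, token, and audio_path at the top filled in), a minimal entry point can be appended to the file:

if __name__ == '__main__':
    test_one()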
main.py
import threading
import pyaudio
import wave
import time
import audioop
import os
from datetime import datetime
from asr_client import test_one  # calls the ASR service and prints the recognition result

# Configuration
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
THRESHOLD = 2500  # volume threshold
SILENCE_DURATION = 2  # silence duration (seconds) that ends a segment
DIRECTORY_TO_WATCH = "F:\\huancun\\python\\nova\\wav"  # folder to watch
FILE_PROCESSING_INTERVAL = 1.0  # minimum interval between processing the same file, in seconds

# Initialize PyAudio
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)

processed_files = {}  # last processed time per file, to avoid duplicate handling


def record_audio():
    """Listen on the microphone and record."""
    print("Listening on the microphone... press Ctrl+C to stop")
    frames = []
    recording = False
    silence_start_time = None
    try:
        while True:
            data = stream.read(CHUNK)
            rms = audioop.rms(data, 2)
            if rms > THRESHOLD:
                frames.append(data)
                recording = True
                silence_start_time = None
            elif recording:
                if silence_start_time is None:
                    silence_start_time = time.time()
                if time.time() - silence_start_time >= SILENCE_DURATION:
                    save_audio(frames)
                    frames = []
                    recording = False
                    silence_start_time = None
    except KeyboardInterrupt:
        print("\nRecording stopped")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()


def save_audio(frames):
    """Save the recorded frames to recording_latest.wav (overwritten each time)."""
    filename = f"{DIRECTORY_TO_WATCH}\\recording_latest.wav"
    wf = wave.open(filename, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    print(f"Saved audio file: {filename}")


def monitor_directory():
    """Watch the target directory and call test_one when a file change is detected."""
    existing_files = {}
    while True:
        time.sleep(0.05)  # polling interval
        current_files = set(os.listdir(DIRECTORY_TO_WATCH))
        for file in current_files:
            filepath = os.path.join(DIRECTORY_TO_WATCH, file)
            file_mtime = os.path.getmtime(filepath)
            # process if the file is new or its modification time changed
            if file not in existing_files or existing_files[file] != file_mtime:
                current_time = time.time()
                last_processed_time = processed_files.get(file, 0)
                # only process if more than FILE_PROCESSING_INTERVAL has passed since the last run
                if current_time - last_processed_time > FILE_PROCESSING_INTERVAL:
                    print(f"Detected file change: {filepath}")
                    # run the recognition
                    test_one()
                    # record the last processing time
                    processed_files[file] = current_time
                existing_files[file] = file_mtime  # update the stored modification time


if __name__ == '__main__':
    # create the threads
    audio_thread = threading.Thread(target=record_audio)
    monitor_thread = threading.Thread(target=monitor_directory)
    # start them
    audio_thread.start()
    monitor_thread.start()
    # wait for them to finish
    audio_thread.join()
    monitor_thread.join()