麦克风实现语音转文字

使用pyaudio和火山引擎实现麦克风转语音

目前大模型盛行,大模型+机器人+agent也是成为了之后发展的大方向。

思路

硬件机器人想要实现对话,首先要能听到,这个可以使用外置麦克风实现,将麦克风收到的音频存储为wav等格式,然后将wav通过asr技术转换成文字,再对接文本大模型就能实现对话了。

本文就第一步进行探讨,题主平时是使用go语音进行开发的,所以在有这个想法后首先就想使用go实现,无奈发现,go想要实现麦克风录音,需要一系列复杂流程(关键最后也没有完成,呜呜呜)

so,转为python开发,python包pyaudio可以完全支持麦克风的数据存储,下载起来就是一行代码的事情, pip install pyaudio ,哎,专业的语言做专业的事。

这个包可以实现按找音频阈值决定是否对音频进行记录,并且可以通过该阈值实现音频分段,进而实现将分段的音频发送到转换接口。

asr接口选择的是字节的火山引擎(大公司,接口稳定且快),当然也可以使用本地部署whisper等,都是不错的方式。

具体代码实现:

asr_client

#coding=utf-8"""
requires Python 3.6 or laterpip install asyncio
pip install websockets
"""import asyncio
import base64
import gzip
import hmac
import json
import logging
import os
import uuid
import wave
from enum import Enum
from hashlib import sha256
from io import BytesIO
from typing import List
from urllib.parse import urlparse
import time
import websocketsappid = "xxxxxxxx"    # 项目的 appid
token = "xxxxxxxxxx"    # 项目的 token
cluster = "volcengine_streaming_common"  # 请求的集群
audio_path = "F:\huancun\python\\nova\wav\\recording_latest.wav"  # 本地音频路径
audio_format = "wav"   # wav 或者 mp3,根据实际音频格式设置PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001PROTOCOL_VERSION_BITS = 4
HEADER_BITS = 4
MESSAGE_TYPE_BITS = 4
MESSAGE_TYPE_SPECIFIC_FLAGS_BITS = 4
MESSAGE_SERIALIZATION_BITS = 4
MESSAGE_COMPRESSION_BITS = 4
RESERVED_BITS = 8# Message Type:
CLIENT_FULL_REQUEST = 0b0001
CLIENT_AUDIO_ONLY_REQUEST = 0b0010
SERVER_FULL_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111# Message Type Specific Flags
NO_SEQUENCE = 0b0000  # no check sequence
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_SEQUENCE_1 = 0b0011# Message Serialization
NO_SERIALIZATION = 0b0000
JSON = 0b0001
THRIFT = 0b0011
CUSTOM_TYPE = 0b1111# Message Compression
NO_COMPRESSION = 0b0000
GZIP = 0b0001
CUSTOM_COMPRESSION = 0b1111def generate_header(version=PROTOCOL_VERSION,message_type=CLIENT_FULL_REQUEST,message_type_specific_flags=NO_SEQUENCE,serial_method=JSON,compression_type=GZIP,reserved_data=0x00,extension_header=bytes()
):"""protocol_version(4 bits), header_size(4 bits),message_type(4 bits), message_type_specific_flags(4 bits)serialization_method(4 bits) message_compression(4 bits)reserved (8bits) 保留字段header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )"""header = bytearray()header_size = int(len(extension_header) / 4) + 1header.append((version << 4) | header_size)header.append((message_type << 4) | message_type_specific_flags)header.append((serial_method << 4) | compression_type)header.append(reserved_data)header.extend(extension_header)return headerdef generate_full_default_header():return generate_header()def generate_audio_default_header():return generate_header(message_type=CLIENT_AUDIO_ONLY_REQUEST)def generate_last_audio_default_header():return generate_header(message_type=CLIENT_AUDIO_ONLY_REQUEST,message_type_specific_flags=NEG_SEQUENCE)def parse_response(res):"""protocol_version(4 bits), header_size(4 bits),message_type(4 bits), message_type_specific_flags(4 bits)serialization_method(4 bits) message_compression(4 bits)reserved (8bits) 保留字段header_extensions 扩展头(大小等于 8 * 4 * (header_size - 1) )payload 类似与http 请求体"""protocol_version = res[0] >> 4header_size = res[0] & 0x0fmessage_type = res[1] >> 4message_type_specific_flags = res[1] & 0x0fserialization_method = res[2] >> 4message_compression = res[2] & 0x0freserved = res[3]header_extensions = res[4:header_size * 4]payload = res[header_size * 4:]result = {}payload_msg = Nonepayload_size = 0if message_type == SERVER_FULL_RESPONSE:payload_size = int.from_bytes(payload[:4], "big", signed=True)payload_msg = payload[4:]elif message_type == SERVER_ACK:seq = int.from_bytes(payload[:4], "big", signed=True)result['seq'] = seqif len(payload) >= 8:payload_size = int.from_bytes(payload[4:8], "big", signed=False)payload_msg = payload[8:]elif message_type == SERVER_ERROR_RESPONSE:code = int.from_bytes(payload[:4], "big", signed=False)result['code'] = codepayload_size = int.from_bytes(payload[4:8], "big", signed=False)payload_msg = payload[8:]if payload_msg is None:return resultif message_compression == GZIP:payload_msg = gzip.decompress(payload_msg)if serialization_method == JSON:payload_msg = json.loads(str(payload_msg, "utf-8"))elif serialization_method != NO_SERIALIZATION:payload_msg = str(payload_msg, "utf-8")result['payload_msg'] = payload_msgresult['payload_size'] = payload_sizereturn resultdef read_wav_info(data: bytes = None) -> (int, int, int, int, int): # type: ignorewith BytesIO(data) as _f:wave_fp = wave.open(_f, 'rb')nchannels, sampwidth, framerate, nframes = wave_fp.getparams()[:4]wave_bytes = wave_fp.readframes(nframes)return nchannels, sampwidth, framerate, nframes, len(wave_bytes)class AudioType(Enum):LOCAL = 1  # 使用本地音频文件class AsrWsClient:def __init__(self, audio_path, cluster, **kwargs):""":param config: config"""self.audio_path = audio_pathself.cluster = clusterself.success_code = 1000  # success code, default is 1000self.seg_duration = int(kwargs.get("seg_duration", 15000))self.nbest = int(kwargs.get("nbest", 1))self.appid = kwargs.get("appid", "")self.token = kwargs.get("token", "")self.ws_url = kwargs.get("ws_url", "wss://openspeech.bytedance.com/api/v2/asr")self.uid = kwargs.get("uid", "streaming_asr_demo")self.workflow = kwargs.get("workflow", "audio_in,resample,partition,vad,fe,decode,itn,nlu_punctuate")self.show_language = kwargs.get("show_language", False)self.show_utterances = kwargs.get("show_utterances", False)self.result_type = kwargs.get("result_type", "full")self.format = kwargs.get("format", "wav")self.rate = kwargs.get("sample_rate", 16000)self.language = kwargs.get("language", "zh-CN")self.bits = kwargs.get("bits", 16)self.channel = kwargs.get("channel", 1)self.codec = kwargs.get("codec", "raw")self.audio_type = kwargs.get("audio_type", AudioType.LOCAL)self.secret = kwargs.get("secret", "access_secret")self.auth_method = kwargs.get("auth_method", "token")self.mp3_seg_size = int(kwargs.get("mp3_seg_size", 10000))def construct_request(self, reqid):req = {'app': {'appid': self.appid,'cluster': self.cluster,'token': self.token,},'user': {'uid': self.uid},'request': {'reqid': reqid,'nbest': self.nbest,'workflow': self.workflow,'show_language': self.show_language,'show_utterances': self.show_utterances,'result_type': self.result_type,"sequence": 1},'audio': {'format': self.format,'rate': self.rate,'language': self.language,'bits': self.bits,'channel': self.channel,'codec': self.codec}}return req@staticmethoddef slice_data(data: bytes, chunk_size: int) -> (list, bool):"""slice data:param data: wav data:param chunk_size: the segment size in one request:return: segment data, last flag"""data_len = len(data)offset = 0while offset + chunk_size < data_len:yield data[offset: offset + chunk_size], Falseoffset += chunk_sizeelse:yield data[offset: data_len], Truedef _real_processor(self, request_params: dict) -> dict:passdef token_auth(self):return {'Authorization': 'Bearer; {}'.format(self.token)}def signature_auth(self, data):header_dicts = {'Custom': 'auth_custom',}url_parse = urlparse(self.ws_url)input_str = 'GET {} HTTP/1.1\n'.format(url_parse.path)auth_headers = 'Custom'for header in auth_headers.split(','):input_str += '{}\n'.format(header_dicts[header])input_data = bytearray(input_str, 'utf-8')input_data += datamac = base64.urlsafe_b64encode(hmac.new(self.secret.encode('utf-8'), input_data, digestmod=sha256).digest())header_dicts['Authorization'] = 'HMAC256; access_token="{}"; mac="{}"; h="{}"'.format(self.token,str(mac, 'utf-8'), auth_headers)return header_dictsasync def segment_data_processor(self, wav_data: bytes, segment_size: int):reqid = str(uuid.uuid4())# 构建 full client request,并序列化压缩request_params = self.construct_request(reqid)payload_bytes = str.encode(json.dumps(request_params))payload_bytes = gzip.compress(payload_bytes)full_client_request = bytearray(generate_full_default_header())full_client_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)full_client_request.extend(payload_bytes)  # payloadheader = Noneif self.auth_method == "token":header = self.token_auth()elif self.auth_method == "signature":header = self.signature_auth(full_client_request)async with websockets.connect(self.ws_url, extra_headers=header, max_size=1000000000) as ws:# 发送 full client requestawait ws.send(full_client_request)res = await ws.recv()result = parse_response(res)if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:return resultfor seq, (chunk, last) in enumerate(AsrWsClient.slice_data(wav_data, segment_size), 1):# if no compression, comment this linepayload_bytes = gzip.compress(chunk)audio_only_request = bytearray(generate_audio_default_header())if last:audio_only_request = bytearray(generate_last_audio_default_header())audio_only_request.extend((len(payload_bytes)).to_bytes(4, 'big'))  # payload size(4 bytes)audio_only_request.extend(payload_bytes)  # payload# 发送 audio-only client requestawait ws.send(audio_only_request)res = await ws.recv()result = parse_response(res)if 'payload_msg' in result and result['payload_msg']['code'] != self.success_code:return resultreturn resultasync def execute(self):with open(self.audio_path, mode="rb") as _f:data = _f.read()audio_data = bytes(data)if self.format == "mp3":segment_size = self.mp3_seg_sizereturn await self.segment_data_processor(audio_data, segment_size)if self.format != "wav":raise Exception("format should in wav or mp3")nchannels, sampwidth, framerate, nframes, wav_len = read_wav_info(audio_data)size_per_sec = nchannels * sampwidth * frameratesegment_size = int(size_per_sec * self.seg_duration / 1000)return await self.segment_data_processor(audio_data, segment_size)def execute_one(audio_item, cluster, **kwargs):""":param audio_item: {"id": xxx, "path": "xxx"}:param cluster:集群名称:return:"""assert 'id' in audio_itemassert 'path' in audio_itemaudio_id = audio_item['id']audio_path = audio_item['path']audio_type = AudioType.LOCALasr_http_client = AsrWsClient(audio_path=audio_path,cluster=cluster,audio_type=audio_type,**kwargs)result = asyncio.run(asr_http_client.execute())return {"id": audio_id, "path": audio_path, "result": result}def test_one():"""执行 test_one 逻辑并输出识别结果"""try:result = execute_one({'id': 1,'path': audio_path  # 确保这里的 audio_path 是你实际使用的路径},cluster=cluster,appid=appid,token=token,format=audio_format,)# 获取结果的 payload_msg 字段payload_msg = result.get('result', {}).get('payload_msg', {})# 检查返回的代码,1000 表示成功识别if payload_msg.get('code') == 1000:recognition_result = payload_msg.get('result', [])if recognition_result:text = recognition_result[0].get('text', '').strip()if text:# 只在识别成功时打印识别文本print(f"识别结果: {text}")else:print("未识别出文字,请重新说话")else:print("未识别出文字,请重新说话")else:# 错误码 1013 或其他情况print("未识别出文字,请重新说话")except Exception as e:# 捕获异常并打印错误信息print(f"执行 test_one 时发生错误: {e}")

main

import threading
import pyaudio
import wave
import time
import audioop
import os
from datetime import datetime
from asr_client import test_one  # 假设这个函数会调用语音识别服务并返回结果# 配置参数
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
THRESHOLD = 2500  # 音量阈值
SILENCE_DURATION = 2  # 静音时长
DIRECTORY_TO_WATCH = "F:\\huancun\\python\\nova\\wav"  # 文件夹路径
FILE_PROCESSING_INTERVAL = 1.0  # 每个文件的最小处理间隔,单位为秒# 初始化 PyAudio
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)processed_files = {}  # 存储文件的最后处理时间,用于避免重复处理def record_audio():"""监听麦克风并录音"""print("开始监听麦克风...按 Ctrl+C 停止")frames = []recording = Falsesilence_start_time = Nonetry:while True:data = stream.read(CHUNK)rms = audioop.rms(data, 2)if rms > THRESHOLD:frames.append(data)recording = Truesilence_start_time = Noneelif recording:if silence_start_time is None:silence_start_time = time.time()if time.time() - silence_start_time >= SILENCE_DURATION:save_audio(frames)frames = []recording = Falsesilence_start_time = Noneexcept KeyboardInterrupt:print("\n录音已终止")finally:stream.stop_stream()stream.close()p.terminate()def save_audio(frames):"""将音频数据保存到以时间戳命名的 wav 文件中"""filename = f"{DIRECTORY_TO_WATCH}\\recording_latest.wav"wf = wave.open(filename, 'wb')wf.setnchannels(CHANNELS)wf.setsampwidth(p.get_sample_size(FORMAT))wf.setframerate(RATE)wf.writeframes(b''.join(frames))wf.close()print(f"已保存音频文件: {filename}")def monitor_directory():"""监控目标目录,并在检测到新文件时调用 test_one"""existing_files = {}while True:time.sleep(0.05)  # 轮询间隔current_files = set(os.listdir(DIRECTORY_TO_WATCH))for file in current_files:filepath = os.path.join(DIRECTORY_TO_WATCH, file)file_mtime = os.path.getmtime(filepath)# 如果文件的修改时间不同,且超过设定的时间间隔if file not in existing_files or existing_files[file] != file_mtime:current_time = time.time()last_processed_time = processed_files.get(file, 0)# 只有在上次处理时间超过 FILE_PROCESSING_INTERVAL 时才进行处理if current_time - last_processed_time > FILE_PROCESSING_INTERVAL:print(f"检测到文件变化: {filepath}")# 调用 test_one 逻辑test_one()# 记录最后一次处理时间processed_files[file] = current_timeexisting_files[file] = file_mtime  # 更新文件的修改时间if __name__ == '__main__':# 创建线程audio_thread = threading.Thread(target=record_audio)monitor_thread = threading.Thread(target=monitor_directory)# 启动线程audio_thread.start()monitor_thread.start()# 等待线程完成audio_thread.join()monitor_thread.join()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/10263.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

探索 Python 的新边疆:sh 库的革命性功能

文章目录 **探索 Python 的新边疆&#xff1a;sh 库的革命性功能**第一部分&#xff1a;背景介绍第二部分&#xff1a;sh 库是什么&#xff1f;第三部分&#xff1a;如何安装 sh 库&#xff1f;第四部分&#xff1a;简单库函数使用方法1. 执行 ls 命令2. 使用 grep 搜索文件内容…

深度学习——前向传播与反向传播、神经网络(前馈神经网络与反馈神经网络)、常见算法概要汇总

文章目录 &#x1f33a;深度学习面试八股汇总&#x1f33a;前向传播与反向传播前向传播&#xff08;Forward Propagation&#xff09;反向传播&#xff08;Back Propagation&#xff09;总结 神经网络简介结构类型前馈神经网络&#xff08;Feedforward Neural Network, FFNN&am…

MySQL 中的索引下推功能

看到索引&#xff0c;应该大家都可以联想到这个是和查询效率有关系的&#xff0c;既然有这个功能&#xff0c;那么那句古话说的好啊&#xff1a;存在即合理。那么这个就是说有了这个功能&#xff0c;可以提升查询效率。 什么是索引下推 我们先有一个大概的理解&#xff1a;在…

#渗透测试#SRC漏洞挖掘# 操作系统-Linux系统之基本命令、资源耗尽脚本编写

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

软考中级 软件设计师 上午考试内容笔记(个人向)Part.1

软考上午考试内容 1. 计算机系统 计算机硬件通过高/低电平来模拟1/0信息&#xff1b;【p进制】&#xff1a; K n K n − 1 . . . K 2 K 1 K 0 K − 1 K − 2... K − m K n r n . . . K 1 r 1 K 0 r 0 K − 1 r − 1 . . . K − m r − m K_nK_{n-1}...K_2K_1K_0K…

IDA*算法 Power Calculus————poj 3134

目录 闲聊 前言 DFS算法的无效搜索 BFS算法的空间浪费 IDDFS A*算法 IDA* Power Calculus 问题描述 输入 输出 问题分析 代码 闲聊 前几周在忙着数学竞赛&#xff0c;所以就没时间更新&#xff0c;高等数学&#xff0c;一生之敌&#xff0c;真不知道报名的时候我是怎么想…

基于python深度学习技术矩阵分解的推荐系统,通过学习隐含特征,实现推荐

实现了一个基于矩阵分解的推荐系统&#xff0c;用于预测用户对电影的评分。具体来说&#xff0c;该程序通过TensorFlow构建和训练一个模型&#xff0c;来学习用户和电影之间的隐含特征&#xff0c;并根据这些特征预测评分。以下是代码的主要功能和步骤的详细描述&#xff1a; …

C++高级编程(8)

八、标准IO库 1.输入输出流类 1)非格式化输入输出 2)put #include <iostream> #include <string> ​ using namespace std; int main() {string str "123456789";for (int i str.length() - 1; i > 0; i--) {cout.put(str[i]); //从最后一个字符开…

EMC Plus:大电流注入传导抗扰度

大电流注入 &#xff08;BCI&#xff09; 是一种传导射频抗扰度测试&#xff0c;利用电流注入探头将调制信号引入电缆。其目的是复制设备运行环境中预期的电磁干扰 &#xff08;EMI&#xff09; 条件。在这里&#xff0c;我将为您提供一个使用 Ansys EMC Plus 进行大电流注入传…

《Java核心技术 卷I》JFrame组件中显示信息

组件中显示信息 JFrame结构复杂&#xff0c;由四层窗格&#xff0c;其中根窗格、层级窗格和玻璃窗格人们并不太关心&#xff0c;他们要用来组织菜单栏和内容窗格以及实现观感&#xff0c;Swing程序员最关心的是内容窗格(content pane)&#xff0c;添加到窗体的所有组件都会自动…

0x00基础算法 -- 0x01 位运算

资料来源&#xff1a;算法竞赛进阶指南活动 - AcWing 1、进制表示 二进制表示&#xff1a;m位二进制中&#xff0c;通常称最低位为第0位&#xff0c;从右到左以此类推&#xff0c;最高位为第m-1位。 常用十六进制表示的数字&#xff1a; 32位补码int&#xff08;十进制&#xf…

算法求解(C#)-- 寻找包含目标字符串的最短子串算法

1. 引言 在字符串处理中&#xff0c;我们经常需要从一个较长的字符串中找到包含特定目标字符串的最短子串。这个问题在文本搜索、基因序列分析等领域有着广泛的应用。本文将介绍一种高效的算法来解决这个问题。 2. 问题描述 给定一个源字符串 source 和一个目标字符串 targe…

Linux之Chronyd 时间服务器配置(Chronod Time Server Configuration in Linux)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 本人主要分享计算机核心技…

【Ant Design Pro】如何实现组件的状态保存umi-plugin-keep-alive插件的使用

都知道vuejs里面帮我们实现了一个内置的keep-alive组件&#xff0c;给我们缓存一些组件的状态带来了很大的便利。但是在react中没有自带的实现&#xff0c;可以借助社区的插件umi-plugin-keep-alive来实现这个功能。 实现效果对比 未使用插件&#xff0c;可以看到我们在页面跳…

【数据结构】二叉排序树和平衡二叉树

目录 1. 二叉搜索树&#xff08;BST&#xff09; 1.1 二叉搜索树的定义及特点 1.1.1 定义 1.1.2 特点 1.2 二叉排序树的构造&#xff08;创建&#xff09; 1.2.1 基本思想 1.2.2 算法 1.3 二叉排序树的删除 2. 平衡二叉树&#xff08;AVL&#xff09; 2.1 为什么要用…

C++四种类型转换

C语言提供了四种类型转换 const_cast: 可以去除掉常量属性的类型转换 //const_cast const int a 10; double* p1 (double*)&a;//类型和原来的类型可以不一致&#xff0c;但是不安全 int* p2 const_cast<int*>(&a);//类型和原本的类型必须匹配 //<>中必…

【SPIE出版,往届稳定EI检索】2024智能视觉与数据建模国际学术会议(ICIVD 2024,12月13-15日)

2024智能视觉与数据建模国际学术会议 2024 International Conference on Intelligent Vision and Data modeling (ICIVD 2024) 重要信息 会议官网&#xff1a;www.iccaid.net 2024 International Conference on Intelligent Vision and Data modeling (ICIVD 2024)www.iccaid…

大模型的思维链提示

文章目录 思维链提示的基本形式思维链提示的优化策略关于思维链的进一步讨论思维链提示是一种高级提示策略,旨在增强大语言模型在各类复杂推理任务上的表现。常见的推理任务包括算术推理、常识推理以及符号推理等多种任务。与上下文学习方法仅使用⟨输入,输出⟩二元组来构造提…

JavaScript day01 笔记

一、引入方式 JavaScript 程序不能独立运行&#xff0c;它需要被嵌入 HTML 中&#xff0c;然后浏览器才能执行 JavaScript 代码。通过 script 标签将 JavaScript 代码引入到 HTML 中 1️⃣内部 通过 script 标签包裹 JavaScript 代码&#xff08;一般就写在</script>的…

vue,uniapp,微信小程序解决字符串中出现数字则修改数字样式,以及获取字符串中的数字

简单记录一下&#xff0c;最近遇到的一个新需求&#xff1a;后端返回的是非富文本&#xff0c;只是一串字符串&#xff0c;其中包含了文字和数字&#xff0c;前端需要将出现数字的地方将其加粗或者修改颜色等需求 设计思路&#xff1a;&#xff08;简单做个记录方便以后理解&a…