DB_GPT excel研究
- 摘要
- 视频简介
- 源码分析
- excel文档上传预处理
- 对话
摘要
DB_GPT集成了很多对话方式,其中呢就有关于excel对话的模块,我搜集各大web好像都没有关于这个模块的研究,于是乎就自行研究了对于excel对话的的功能
如果是想看与数据库对话的可以看我写的另一篇博客DB_GPT DB对话
视频简介
dbgpt
源码分析
excel文档上传预处理
- 刚进excel对话界面需要上传excel文件
- 选好文件之后,点击上传
- 前端抓包,获得请求路径
\DB-GPT-main\dbgpt\app\openapi\api_v1\api_v1.py(方法具体位置)
@router.post("/v1/chat/mode/params/file/load")
async def params_load(conv_uid: str,chat_mode: str,model_name: str,user_name: Optional[str] = None,sys_code: Optional[str] = None,doc_file: UploadFile = File(...),
):print(f"params_load: {conv_uid},{chat_mode},{model_name}")try:if doc_file:# Save the uploaded fileupload_dir = os.path.join(KNOWLEDGE_UPLOAD_ROOT_PATH, chat_mode)os.makedirs(upload_dir, exist_ok=True)upload_path = os.path.join(upload_dir, doc_file.filename)async with aiofiles.open(upload_path, "wb") as f:await f.write(await doc_file.read())# Prepare the chatdialogue = ConversationVo(conv_uid=conv_uid,chat_mode=chat_mode,select_param=doc_file.filename,model_name=model_name,user_name=user_name,sys_code=sys_code,)# 有excel_reader参数,读取了excel的内容chat: BaseChat = await get_chat_instance(dialogue)resp = await chat.prepare()print('-------------------------')rres = Result.succ(get_hist_messages(conv_uid))print(rres)# Refresh messagesreturn rresexcept Exception as e:logger.error("excel load error!", e)return Result.failed(code="E000X", msg=f"File Load Error {str(e)}")
- debug发现chat: BaseChat = await get_chat_instance(dialogue)读取了excel的内容
一路debug找到了读取excel文件的关键代码
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_reader.py
class ExcelReader:def __init__(self, file_path):file_name = os.path.basename(file_path)self.file_name_without_extension = os.path.splitext(file_name)[0]encoding, confidence = detect_encoding(file_path)logger.info(f"Detected Encoding: {encoding} (Confidence: {confidence})")self.excel_file_name = file_nameself.extension = os.path.splitext(file_name)[1]# read excel fileif file_path.endswith(".xlsx") or file_path.endswith(".xls"):#初步读取的 DataFrame,用于检查和处理列名df_tmp = pd.read_excel(file_path, index_col=False)#使用 converters 参数格式化每列的数据格式self.df = pd.read_excel(file_path,index_col=False,converters={i: csv_colunm_foramt for i in range(df_tmp.shape[1])},)elif file_path.endswith(".csv"):df_tmp = pd.read_csv(file_path, index_col=False, encoding=encoding)self.df = pd.read_csv(file_path,index_col=False,encoding=encoding,# csv_colunm_foramt 可以修改更多,只是针对美元人民币符号,假如是“你好¥¥¥”则会报错!converters={i: csv_colunm_foramt for i in range(df_tmp.shape[1])},)else:raise ValueError("Unsupported file format.")self.df.replace("", np.nan, inplace=True)#进行清洗和处理unnamed_columns_tmp = [colfor col in df_tmp.columnsif col.startswith("Unnamed") and df_tmp[col].isnull().all()]df_tmp.drop(columns=unnamed_columns_tmp, inplace=True)self.df = self.df[df_tmp.columns.values]#self.columns_map = {}for column_name in df_tmp.columns:self.df[column_name] = self.df[column_name].astype(str)self.columns_map.update({column_name: excel_colunm_format(column_name)})try:self.df[column_name] = pd.to_datetime(self.df[column_name]).dt.strftime("%Y-%m-%d")except ValueError:try:self.df[column_name] = pd.to_numeric(self.df[column_name])except ValueError:try:self.df[column_name] = self.df[column_name].astype(str)except Exception:print("Can't transform column: " + column_name)self.df = self.df.rename(columns=lambda x: x.strip().replace(" ", "_"))# 连接 DuckDB 数据库self.db = duckdb.connect(database=":memory:", read_only=False)self.table_name = "excel_data"# write data in duckdbself.db.register(self.table_name, self.df)# 获取结果并打印表结构信息result = self.db.execute(f"DESCRIBE {self.table_name}")columns = result.fetchall()for column in columns:print(column)def run(self, sql):try:if f'"{self.table_name}"' in sql:sql = sql.replace(f'"{self.table_name}"', self.table_name)sql = add_quotes_to_chinese_columns(sql)print(f"excute sql:{sql}")results = self.db.execute(sql)colunms = []for descrip in results.description:colunms.append(descrip[0])return colunms, results.fetchall()except Exception as e:logger.error(f"excel sql run error!, {str(e)}")raise ValueError(f"Data Query Exception!\\nSQL[{sql}].\\nError:{str(e)}")#通过 SQL 查询将结果转换为 DataFrame 返回def get_df_by_sql_ex(self, sql):colunms, values = self.run(sql)return pd.DataFrame(values, columns=colunms)def get_sample_data(self):return self.run(f"SELECT * FROM {self.table_name} LIMIT 5;")
- chat: BaseChat = await get_chat_instance(dialogue)获取了一个对话实例
在上传文档的时候是用的下面的提示词模板
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_analyze\prompt.py
_PROMPT_SCENE_DEFINE_ZH = """你是一个数据分析专家!"""
_DEFAULT_TEMPLATE_ZH = """
请使用历史对话中的数据结构信息,在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:1.请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,sql请输出在对应的sql参数中2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,可用数据展示方式如下: {display_type}3.SQL中需要使用的表名是: {table_name},请检查你生成的sql,不要使用没在数据结构中的列名4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求请一步一步思考,给出回答,并确保你的回答内容格式如下:对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>用户问题:{user_input}
"""
还有一个提示词是,在读取了文件内容之后,用另外一个提示词
里面data_example的内容是
def get_sample_data(self):
return self.run(f"SELECT * FROM {self.table_name} LIMIT 5;")
从duckdb取的样例数据
prompt
_DEFAULT_TEMPLATE_ZH = """
下面是用户文件{file_name}的一部分数据,请学习理解该数据的结构和内容,按要求输出解析结果:{data_example}
分析各列数据的含义和作用,并对专业术语进行简单明了的解释, 如果是时间类型请给出时间格式类似:yyyy-MM-dd HH:MM:ss.
将列名作为属性名,分析解释作为属性值,组成json数组,并输出在返回json内容的ColumnAnalysis属性中.
请不要修改或者翻译列名,确保和给出数据列名一致.
针对数据从不同维度提供一些有用的分析思路给用户。请一步一步思考,确保只以JSON格式回答,具体格式如下:{response}
"""_RESPONSE_FORMAT_SIMPLE_ZH = {"DataAnalysis": "数据内容分析总结","ColumnAnalysis": [{"column name": "字段1介绍,专业术语解释(请尽量简单明了)"}],"AnalysisProgram": ["1.分析方案1", "2.分析方案2"],
}[ModelMessage(role='system', content='你是一个数据分析专家. \n下面是用户文件故障记录表2.csv的一部分数据,请学习理解该数据的结构和内容,按要求输出解析结果:\n [["\\u65f6\\u95f4", "\\u673a\\u7ec4", "\\u8bbe\\u5907", "\\u6545\\u969c\\u539f\\u56e0"], ["2024-09-23", "A\\u7ec4", "\\u53d1\\u7535\\u673a1", "\\u77ed\\u8def "], ["2024-09-23", "B\\u7ec4", "\\u53d8\\u538b\\u56682", "\\u8fc7\\u70ed "], ["2024-09-23", "C\\u7ec4", "\\u51b7\\u5374\\u7cfb\\u7edf", "\\u6f0f\\u6c34 "], ["2024-09-23", "A\\u7ec4", "\\u63a7\\u5236\\u677f", "\\u8f6f\\u4ef6\\u6545\\u969c "], ["2024-09-23", "D\\u7ec4", "\\u7535\\u52a8\\u673a3", "\\u8f74\\u627f\\u635f\\u574f "]]\n分析各列数据的含义和作用,并对专业术语进行简单明了的解释, 如果是时间类型请给出时间格式类似:yyyy-MM-dd HH:MM:ss.\n将列名作为属性名,分析解释作为属性值,组成json数组,并输出在返回json内容的ColumnAnalysis属性中.\n请不要修改或者翻译列名,确保和给出数据列名一致.\n针对数据从不同维度提供一些有用的分析思路给用户。\n\n请一步一步思考,确保只以JSON格式回答,具体格式如下:\n "{\\n \\"DataAnalysis\\": \\"数据内容分析总结\\",\\n \\"ColumnAnalysis\\": [\\n {\\n \\"column name\\": \\"字段1介绍,专业术语解释(请尽量简单明了)\\"\\n }\\n ],\\n \\"AnalysisProgram\\": [\\n \\"1.分析方案1\\",\\n \\"2.分析方案2\\"\\n ]\\n}"\n', round_index=0), ModelMessage(role='human', content='故障记录表2.csv', round_index=0)]
然后交给模型处理
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\base_chat.py
async def _no_streaming_call_with_retry(self, payload):with root_tracer.start_span("BaseChat.invoke_worker_manager.generate"):model_output = await self.call_llm_operator(payload)ai_response_text = self.prompt_template.output_parser.parse_model_nostream_resp(model_output, self.prompt_template.sep)prompt_define_response = (self.prompt_template.output_parser.parse_prompt_response(ai_response_text))metadata = {"model_output": model_output.to_dict(),"ai_response_text": ai_response_text,"prompt_define_response": self._parse_prompt_define_response(prompt_define_response),}with root_tracer.start_span("BaseChat.do_action", metadata=metadata):result = await blocking_func_to_async(self._executor, self.do_action, prompt_define_response)speak_to_user = self.get_llm_speak(prompt_define_response)view_message = await blocking_func_to_async(self._executor,self.prompt_template.output_parser.parse_view_response,speak_to_user,result,prompt_define_response,)return ai_response_text, view_message.replace("\n", "\\n")
在view_message中又做了格式处理
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_learning\out_parser.py
def parse_view_response(self, speak, data, prompt_response) -> str:if data and not isinstance(data, str):### tool out data to table viewhtml_title = f"### **Data Summary**\n{data.desciption} "html_colunms = self.__build_colunms_html(data.clounms)html_plans = self.__build_plans_html(data.plans)html = f"""{html_title}\n{html_colunms}\n{html_plans}"""return htmlelse:return speak解析完这个html的结果是
html解析结果
### **Data Summary**
这是一个关于用户设备故障的记录表,包含了故障发生的时间、涉及的机群、具体的设备以及故障的原由。
### **Data Structure**
- **1.[column name]** _时间(Time)_
- **1.[description]** _记录了故障发生的日期,格式为年-月-日(yyyy-MM-dd)_
- **2.[column name]** _机群(Cluster)_
- **2.[description]** _故障发生的设备所属的机群,例如A组、B组等_
- **3.[column name]** _设备(Equipment)_
- **3.[description]** _发生故障的具体设备,比如发电机组编号、变压器编号等_
- **4.[column name]** _故障原因(Cause)_
- **4.[description]** _导致设备故障的具体原因,例如短路、过热、漏水、软件故障、机械故障等_### **Analysis plans**
1. 分析故障趋势:通过时间(Time)查看故障发生的季节性、周期性规律,以预测可能的故障高发时段。
2. 机群故障分析:统计各机群(Cluster)的故障频率,找出故障率较高的机群,进行重点维护。
3. 设备故障分析:按设备(Equipment)类型分析故障频率,识别出故障频发的设备,评估设备可靠性。
4. 故障原因分类:对故障原因(Cause)进行统计,识别主要故障类型,优化设备设计或改进维护流程。
5. 故障预防模型:结合历史数据,建立机器学习模型,预测未来可能出现的故障,提前进行预防措施。
6. 故障响应时间分析:计算从故障发生到记录的时间,评估故障报告的及时性,优化故障响应流程
最后呈现在页面上的内容
对话
使用的提示词模板
_DEFAULT_TEMPLATE_ZH = """
请使用历史对话中的数据结构信息,在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:1.请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,sql请输出在对应的sql参数中2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,可用数据展示方式如下: {display_type}3.SQL中需要使用的表名是: {table_name},请检查你生成的sql,不要使用没在数据结构中的列名4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求请一步一步思考,给出回答,并确保你的回答内容格式如下:对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>用户问题:{user_input}
"""role='system' content="""你是一个数据分析专家!
请使用历史对话中的数据结构信息,
在满足下面约束条件下通过duckdb sql数据分析回答用户的问题。
约束条件:\n\t1.
请充分理解用户的问题,使用duckdb sql的方式进行分析, 分析内容按下面要求的输出格式返回,
sql请输出在对应的sql参数中
\t2.请从如下给出的展示方式种选择最优的一种用以进行数据渲染,
将类型名称放入返回要求格式的name参数值种,如果找不到最合适的则使用'Table'作为展示方式,
可用数据展示方式如下: response_line_chart:used to display comparative trend analysis data
response_pie_chart:suitable for scenarios such as proportion and distribution tatistics
response_table:
suitable for display with many display columns or non-numeric columns
response_scatter_plot:Suitable for exploring relationships between variables, detecting outliers, etc.
response_bubble_chart:Suitable for relationships between multiple variables,
highlighting outliers or special situations, etc.
response_donut_chart:Suitable for hierarchical structure representation, category proportion display and highlighting key categories, etc.
response_area_chart:Suitable for visualization of time series data, comparison of multiple groups of data, analysis of data change trends, etc.
response_heatmap:Suitable for visual analysis of time series data, large-scale data sets, distribution of classified data, etc.
\t3.SQL中需要使用的表名是: excel_data,请检查你生成的sql,不要使用没在数据结构中的列名
\t4.优先使用数据分析的方式回答,如果用户问题不涉及数据分析内容,你可以按你的理解进行回答
\t5.输出内容中sql部分转换为:<api-call><name>[数据显示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api- call> 这样的格式,参考返回格式要求请一步一步思考,给出回答,并确保你的回答内容格式如下:
对用户说的想法摘要.<api-call><name>[数据展示方式]</name><args><sql>[正确的duckdb数据分析sql]</sql></args></api-call>\n\n用户问题:帮我查询断路器发生了什么故障\n""" round_index=0
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\openapi\api_v1\api_v1.py
直接进对话框
@router.post("/v1/chat/completions")
async def chat_completions(dialogue: ConversationVo = Body(),flow_service: FlowService = Depends(get_chat_flow),
):#这里是选择的进行流式输出
return StreamingResponse(stream_generator(chat, dialogue.incremental, dialogue.model_name),headers=headers,media_type="text/plain",)
async def stream_generator(chat, incremental: bool, model_name: str):"""Generate streaming responsesOur goal is to generate an openai-compatible streaming responses.Currently, the incremental response is compatible, and the full response will be transformed in the future.Args:chat (BaseChat): Chat instance.incremental (bool): Used to control whether the content is returned incrementally or in full each time.model_name (str): The model nameYields:_type_: streaming responses"""span = root_tracer.start_span("stream_generator")msg = "[LLM_ERROR]: llm server has no output, maybe your prompt template is wrong."previous_response = ""async for chunk in chat.stream_call():if chunk:msg = chunk.replace("\ufffd", "")if incremental:incremental_output = msg[len(previous_response) :]choice_data = ChatCompletionResponseStreamChoice(index=0,delta=DeltaMessage(role="assistant", content=incremental_output),)chunk = ChatCompletionStreamResponse(id=chat.chat_session_id, choices=[choice_data], model=model_name)json_chunk = model_to_json(chunk, exclude_unset=True, ensure_ascii=False)yield f"data: {json_chunk}\n\n"else:# TODO generate an openai-compatible streaming responsesmsg = msg.replace("\n", "\\n")yield f"data:{msg}\n\n"previous_response = msgawait asyncio.sleep(0.02)if incremental:yield "data: [DONE]\n\n"span.end()
进行流式回答
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\base_chat.py
async for chunk in chat.stream_call():async def stream_call(self):# TODO Retry when server connection errorpayload = await self._build_model_request()logger.info(f"payload request: \n{payload}")ai_response_text = ""span = root_tracer.start_span("BaseChat.stream_call", metadata=payload.to_dict())payload.span_id = span.span_idtry:async for output in self.call_streaming_operator(payload):# Plugin research in result generationmsg = self.prompt_template.output_parser.parse_model_stream_resp_ex(output, 0)
#模型的输出为
"""
msg = "为了找出断路器发生了哪些故障,我们需要从数据中筛选出所有涉及“断路器”的故障记录,并查看这些记录中的故障原因。
<api-call><name>response_table</name><args><sql>SELECT 故障原因, COUNT(*) AS 故障次数 FROM excel_data WHERE 设备 LIKE '%断路器%' GROUP BY 故障原因</sql></args></api-call>"
"""# 给出答案view_msg = self.stream_plugin_call(msg)
"""
在经过处理之后view_msg =
为了找出断路器发生了哪些故障,我们需要从数据中筛选出所有涉及“断路器”的故障记录,并查看这些记录中的故障原因。 <chart-view content="{"type": "response_table", "sql": "SELECT 故障原因, COUNT(*) AS 故障次数 FROM excel_data WHERE 设备 LIKE '%断路器%' GROUP BY 故障原因", "data": [{"故障原因": "跳闸 ", "故障次数": 1}]}">\n</chart-view>
"""view_msg = view_msg.replace("\n", "\\n")yield view_msgself.current_message.add_ai_message(msg)view_msg = self.stream_call_reinforce_fn(view_msg)self.current_message.add_view_message(view_msg)span.end()except Exception as e:print(traceback.format_exc())logger.error("model response parse failed!" + str(e))self.current_message.add_view_message(f"""<span style=\"color:red\">ERROR!</span>{str(e)}\n {ai_response_text} """)### store current conversationspan.end(metadata={"error": str(e)})await blocking_func_to_async(self._executor, self.current_message.end_current_round)
D:\Code\Code_jupyter\DB-GPT-main\dbgpt\app\scene\chat_data\chat_excel\excel_analyze\chat.pydef stream_plugin_call(self, text):text = (text.replace("\\n", " ").replace("\n", " ").replace("\_", "_").replace("\\", " "))with root_tracer.start_span("ChatExcel.stream_plugin_call.run_display_sql", metadata={"text": text}):res = self.api_call.display_sql_llmvis(text, self.excel_reader.get_df_by_sql_ex)return res"""
res = self.api_call.display_sql_llmvis(text, self.excel_reader.get_df_by_sql_ex)
"""
这里对于display_sql_llmvis方法的一个说明
\Code\Code_jupyter\DB-GPT-main\dbgpt\agent\util\api_call.py
def display_sql_llmvis(self, llm_text, sql_run_func):"""Render charts using the Antv standard protocol.Args:llm_text: LLM response textsql_run_func: sql run functionReturns:ChartView protocol text"""try:if self._is_need_wait_plugin_call(llm_text) and self.check_last_plugin_call_ready(llm_text):# wait api call generate completeself.update_from_context(llm_text)for key, value in self.plugin_status_map.items():if value.status == Status.TODO.value:value.status = Status.RUNNING.valuelogger.info(f"SQL execution:{value.name},{value.args}")try:sql = value.args["sql"]if sql is not None and len(sql) > 0:data_df = sql_run_func(sql)value.df = data_dfvalue.api_result = json.loads(data_df.to_json(orient="records",date_format="iso",date_unit="s",))value.status = Status.COMPLETE.valueelse:value.status = Status.FAILED.valuevalue.err_msg = "No executable sql!"except Exception as e:logger.error(f"data prepare exception!{str(e)}")value.status = Status.FAILED.valuevalue.err_msg = str(e)value.end_time = datetime.now().timestamp() * 1000except Exception as e:logger.error("Api parsing exception", e)raise ValueError("Api parsing exception," + str(e))return self.api_view_context(llm_text, True)
可以看到在代码中
self.plugin_status_map.items() = {"<api-call><name>response_table</name><args><sql>SELECT 故障原因 FROM excel_data WHERE 设备 LIKE '%断路器%'</sql></args></api-call>": PluginStatus(name='response_table', location=[57], args={'args': None, 'sql': "SELECT 故障原因 FROM excel_data WHERE 设备 LIKE '%断路器%'"}, status='todo', logo_url=None, api_result=None, err_msg=None, start_time=1727155331909.641, end_time=None, df=None)}"""
这块用于提取sql语句