用 logfire 提高应用的可观测性

Logfire是由 Pydantic 团队打造的平台, 还有供 app 使用的 library, 我们经常提到对应用要做 LMT(Log, Metrics, Trace),
Logfire 可以用来收集、存储、分析和可视化日志数据和应用性能指标。通过集成日志和度量,Logfire 提供了一个统一的界面来管理应用程序和系统的可观测性.

Logfire 其实是基于 OpenTelemetry构建的,可以使用大量现有工具和基础架构,包括许多常见 Python 包的观测(instrument)。

OpenTelemetry 是一个开源的可观测性框架,用于生成、收集、处理和导出应用程序的分布式追踪、日志和度量数据。
它旨在帮助开发者更好地监控分布式系统中的应用程序性能,并进行故障排查。

OpenTelemetry 是 CNCF(Cloud Native Computing Foundation)的项目,它统一了许多流行的监控和可观测性工具,比如 OpenTracing 和 OpenCensus。
通过 OpenTelemetry,开发者可以在不同的语言和框架中统一地生成可观测性数据(追踪、日志、指标),并将这些数据发送到不同的后端系统进行分析和可视化,如 Prometheus、Grafana、Jaeger、Zipkin 等

OpenTelemetry 的核心概念就是 LMT:

  1. Logs(日志):
    日志记录应用程序在运行时输出的信息,包括错误、状态信息和其他调试数据。

  2. Metrics(度量):
    用于收集关于系统性能的定量数据,例如 CPU 使用率、内存占用、请求延迟等。这些度量帮助监控应用的性能。

  3. Tracing(追踪):
    用于跟踪跨越不同服务或组件的单个请求,帮助你了解整个请求的生命周期。追踪包括多个 span,每个 span 表示一次操作或请求。

Logfire 比 OpenTelemetry 更好用, 我最近用 FastAPI 写一些 LLM 的应用, 将 Logfire 集成到 FastAPI 中用于日志记录和性能度量,可以帮助监控和分析 FastAPI 应用的健康状态和性能表现。可以通过 logfire 做到

  • 收集和发送 FastAPI 的请求日志。
  • 记录异常并发送到 Logfire。
  • 通过 Prometheus 或 Logfire 的度量功能,监控应用的性能指标。

Logfire 与 FastAPI 的集成

1. 安装依赖

Logfire 提供了适用于不同语言的 SDK,首先你需要安装 Logfire 的 Python 客户端库。通常,官方提供的 SDK 可以通过 pip 安装。

pip install logfire

假设 Logfire 提供了一个 SDK 来发送日志和指标,我们会用这个来集成 FastAPI。

2. FastAPI 日志集成

Logfire 的 SDK 一般允许你直接将应用的日志发送到它的后端。我们可以通过 FastAPI 的事件钩子来捕获日志并发送给 Logfire。

首先,配置 Logfire 的客户端实例:

from logfire import LogfireClient
from fastapi import FastAPI, Request
import loggingapp = FastAPI()# 初始化 Logfire 客户端
logfire_client = LogfireClient(api_key="your-logfire-api-key")# 设置 FastAPI 的 logger
logger = logging.getLogger("fastapi")
logger.setLevel(logging.INFO)@app.middleware("http")
async def log_requests(request: Request, call_next):# 记录请求信息response = await call_next(request)log_data = {"method": request.method,"url": str(request.url),"status_code": response.status_code,"client_ip": request.client.host,}# 将日志发送到 Logfirelogfire_client.log("Request info", log_data)return response

在这个例子中,通过 FastAPI 的 middleware 机制,在每次 HTTP 请求时捕获请求日志,并将其发送到 Logfire 平台。

3. FastAPI 度量集成

除了日志记录,还可以通过 Logfire 记录应用的性能指标,比如响应时间、CPU 和内存使用等。

import time@app.middleware("http")
async def add_metrics(request: Request, call_next):# 记录请求开始时间start_time = time.time()# 处理请求response = await call_next(request)# 计算响应时间process_time = time.time() - start_timelogfire_client.metric("request_duration_seconds", process_time)# 记录额外度量数据metrics_data = {"method": request.method,"url": str(request.url),"status_code": response.status_code,"duration": process_time,}# 将度量数据发送到 Logfirelogfire_client.log("Request metrics", metrics_data)return response

这个 middleware 会计算每次请求的处理时间,并通过 Logfire 的度量功能发送响应时间等性能数据。

4. 异常处理日志

如果 FastAPI 中发生了未捕获的异常,你可以通过全局异常处理器记录日志并将其发送到 Logfire。

from fastapi import HTTPException
from starlette.middleware.errors import ServerErrorMiddleware@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):# 捕获 HTTP 异常并记录到 Logfirelogfire_client.log("HTTP Exception", {"status_code": exc.status_code, "detail": exc.detail})return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})@app.middleware("http")
async def catch_exceptions_middleware(request: Request, call_next):try:return await call_next(request)except Exception as exc:logfire_client.log("Unhandled Exception", {"exception": str(exc)})raise exc  # 继续抛出异常

5. 可选:Prometheus 度量与 Logfire 集成

你还可以使用 FastAPI 与 Prometheus 结合,然后将 Prometheus 收集的度量数据导入 Logfire。

首先,使用 prometheus-fastapi-instrumentator 进行集成。

pip install prometheus-fastapi-instrumentator

然后在 FastAPI 应用中添加 Prometheus 的指标收集器:

from prometheus_fastapi_instrumentator import Instrumentator# 初始化 Prometheus 指标收集器
Instrumentator().instrument(app).expose(app)@app.on_event("startup")
async def on_startup():# 如果 Logfire 支持从 Prometheus 获取数据,可以配置 Prometheus 度量推送到 Logfirepass

这个配置可以让 Prometheus 采集到 FastAPI 应用的性能数据,并将其推送到 Logfire 平台。

用 Logfire 记录和观测大模型交互的性能

举例如下

#!/usr/bin/env python3
from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import logfire
from async_llm_agent import AsyncLlmAgent
import asyncio
from collections.abc import Iterable
from fastapi.responses import StreamingResponse# request
class UserData(BaseModel):query: str# response
class UserDetail(BaseModel):name: strage: intclass MultipleUserData(BaseModel):queries: list[str]app = FastAPI()
agent = AsyncLlmAgent()
#logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.configure(service_name='lazy-llm-agent')
logfire.instrument_pydantic()
logfire.instrument_openai(agent.get_llm_client(), suppress_other_instrumentation=True)
logfire.instrument_fastapi(app)@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:system_prompt = "You are a smart AI assitant"user_prompt = f"Extract: `{data.query}`"user_detail = await agent.get_object_response(system_prompt, user_prompt, UserDetail)return user_detail@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):async def extract_user(query: str):system_prompt = "You are a smart AI assitant"user_prompt = f"Extract: `{data.query}`"user_detail = await agent.get_object_response(system_prompt, user_prompt, UserDetail)logfire.info("/User returning", value=user_detail)return user_detailcoros = [extract_user(query) for query in data.queries]return await asyncio.gather(*coros)@app.post("/extract", response_class=StreamingResponse)
async def extract(data: UserData):system_prompt = "You are a smart AI assitant"user_prompt = f"Extract: `{data.query}`"users = await agent.get_objects_response(system_prompt, user_prompt, UserDetail, stream=True)async def generate():with logfire.span("Generating User Response Objects"):async for user in users:resp_json = user.model_dump_json()logfire.info("Returning user object", value=resp_json)yield resp_jsonreturn StreamingResponse(generate(), media_type="text/event-stream")def act_as_client(port: int):import requestsresponse = requests.post(f"http://127.0.0.1:{port}/extract",json={"query": "Alice and Bob are best friends. \They are currently 32 and 43 respectively. "},stream=True,)for chunk in response.iter_content(chunk_size=1024):if chunk:print(str(chunk, encoding="utf-8"), end="\n")if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--role','-r', action='store', dest='role', help='specify role: client|server')parser.add_argument('--port','-p', type=int, action='store', dest='port', default=2024, help='specify listen port')args = parser.parse_args()if (args.role=="client"):act_as_client(args.port)else:import uvicornuvicorn.run(app, host="localhost", port=args.port)

要点:

  1. 配置Logfire, 注意要先在 https://logfire.pydantic.dev 上注册你的项目, 获取一个 token
logfire.configure(service_name='lazy-llm-agent')
# 上面这行代码配置了 logfire 的 service_name, 其中参数 token 没有显式传入, 因为已经在环境变量中配置了 LOGFIRE_TOKEN=xxx
logfire.instrument_pydantic()
# 上面这行代码配置了logfire,使其记录所有通过pydantic模型进行的数据交换。
  1. 植入监测 Instrumentation:
logfire.instrument_fastapi(app):
# 上面这行代码将FastAPI应用与logfire集成,以便自动记录API请求和响应。
logfire.instrument_openai(self._client, suppress_other_instrumentation=False):
# 上面这行代码将AsyncOpenAI客户端与logfire集成,以便记录与OpenAI API的交互。
  1. 记录日志:
  • 在extract_many_users函数中,logfire.info("/User returning", value=user_detail) 记录了用户详细信息的返回。
  • 在extract函数的generate生成器中,logfire.info("Returning user object", value=resp_json) 记录了流式响应中的用户对象。
  1. 使用Span:
with logfire.span("Generating User Response Objects"):
# 上面的上下文管理器用于创建一个日志跨度,记录生成用户响应对象的时间和细节。

测试步骤

  1. 启动服务端程序
% ./instructor_server.py -r server
  1. 启动客户端程序
% ./instructor_server.py -r client
Logfire project URL: https://logfire.pydantic.dev/walterfan/lazy-rabbit-agent
{"name":"Alice","age":32}
Logfire project URL: https://logfire.pydantic.dev/walterfan/lazy-rabbit-agent
{"name":"Bob","age":43}

这样我们就可以看到我们的应用程序与大模型的交互次数以及所耗费的时间

snapshot

参考链接

  • 参考文章 https://python.useinstructor.com/blog/2024/05/03/fastapi-open-telemetry-and-instructor
  • 上述例子的源码:
    • https://github.com/walterfan/lazy-rabbit-agent/blob/master/example/instructor_logfire.py
    • https://github.com/walterfan/lazy-rabbit-agent/blob/master/example/async_llm_agent.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1560572.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Windows环境mysql 9安装mysqld install报错:Install/Remove of the Service Denied!

Windows环境mysql 9安装mysqld install报错:Install/Remove of the Service Denied! 解决方案: 控制台/批处理命令窗口需要以系统管理员身份运行。 mysql数据库环境配置和安装启动,Windows-CSDN博客文章浏览阅读920次。先下载mysql的zip压缩…

ChatTTS 本地安装和测试

Ubuntu 22服务器,3.9/3.10都可以,但是 3.11不可以 sudo apt install python3.10 apt install python3.10 python3.10-dev #ubuntu 22 安装python3.10对应的pip3.10 # 下载 get-pip.py curl -sS https://bootstrap.pypa.io/get-pip.py -o get-pip.py # 使…

干货分享:Air780E选型的注意事项

Air780E已经是个明星模组了,累计出货数量2000万,广泛应用于物联网各行业。 今天计划讲一讲选择Air780E的注意事项!从用户的角度,解答大家对Air780E这款模组最关心的问题,更多从选型、应用等非技术维度展开。 选择Air…

Spring Boot洗衣店订单系统:业务流程优化

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统,它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等,非常适…

算法-依据先序遍历和中序遍历构建二叉树

简单的二叉树遍历算法, 为了通过给定的先序遍历(preorder)和中序遍历(inorder)数组构造二叉树,我们需要理解这两种遍历方式的特点: 先序遍历(Preorder):首先…

如何高效部署SD-WAN及是否需要路由器?

随着SD-WAN(软件定义广域网)的快速普及,企业在构建网络架构时迎来了更多灵活和高效的管理方式。但在决定是否仍需部署物理路由器时,关键在于企业的具体网络需求与架构特点。 SD-WAN的最大特点是其通过虚拟化技术来实现网络管理。通…

<<迷雾>> 第10章 用机器做一连串的加法(6)--循环移位寄存器改进的控制器 示例电路

使用循环移位寄存器来简化装载和相加过程. info::操作说明 鼠标单击开关切换开合状态 开始之前, 应当设置循环移位寄存器 RR 的初始状态, t01, t10.(如果不是该状态, 可单击一次开关 K 即可) 在 GA 传输门左边的开关置入一个数, 比如 10. 闭合 K装载, 断开 K相加, 此时 IGAIR…

使用 three.js和 shader 实现一个五星红旗 飘扬得着色器

使用 three.js和 shader 实现一个五星红旗 飘扬得着色器 源链接:https://threehub.cn/#/codeMirror?navigationThreeJS&classifyshader&idchinaFlag 国内站点预览:http://threehub.cn github地址: https://github.com/z2586300277/three-ce…

TY1801 内置GaN电源芯片(18w-65w)

TY1801 是一款针对离线式反激变换器的多模式 PWM GaN 功率开关。TY1801内置 GaN 功率管,具备超宽 的 VCC 工作范围,非常适用于 PD 快充等要求宽输出电压的应用场合,TY1801不需要使用额外的绕组或外围降压电路,节省系统 BOM 成本。TY1801 支持 Burst&…

【最新华为OD机试E卷-支持在线评测】智能成绩表(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…

Unity网络开发基础 —— 实践小项目

概述 接Unity网络开发基础 导入基础知识中的代码 需求分析 手动写Handler类 手动书写消息池 using GamePlayer; using System; using System.Collections; using System.Collections.Generic; using UnityEngine;/// <summary> /// 消息池中 主要是用于 注册 ID和消息类…

视频怎么去除杂音保留人声?让人声更动听!视频噪音处理攻略

在视频制作过程中&#xff0c;音质是至关重要的一环。然而&#xff0c;很多时候我们录制的视频会伴随着各种不想要的杂音&#xff0c;比如风声、交通噪音或是其他环境音&#xff0c;这些杂音严重影响了观众的观看体验。那么&#xff0c;如何在保留人声的同时&#xff0c;有效地…

Linux与科学计算

1、引言 Linux作为一种开源操作系统&#xff0c;在科学计算领域得到了广泛的应用。科学计算通常涉及处理大量的数据和复杂的数学模型&#xff0c;要求计算机系统具备强大的计算能力、灵活性和高效性。Linux凭借其高可扩展性、稳定性和开源生态系统的优势&#xff0c;成为科学计…

Scala面试题大全~基础题(15题)

1&#xff1a;Scala是什么? Scala是一种多范式的编程语言&#xff0c;它结合了面向对象编程和函数式编程的特性&#xff0c;它支持面向对象、函数式和命令式编程方法。Scala运行在Java虚拟机&#xff08;JVM&#xff09;上&#xff0c;这意味着它可以与Java代码无缝集成。它还…

情绪识别数据集(包含25w张图片) yolo格式类别:八种训练数据已划分, 识别精度:90%

情绪识别数据集(包含25w张图片) yolo格式 类别&#xff1a;Anger、Contempt、Disgust、Fear、Happy、Neutral、Sad、Surprise 八种 训练数据已划分&#xff0c;配置文件稍做路径改动即可训练。 训练集&#xff1a;171010 验证集&#xff1a;54060 测试集&#xff1a;27550 共计…

企业架构系列(17)使用ArchiMate支持TOGAF

从本篇开始&#xff0c;介绍如何使用 ArchiMate 建模语言支持 TOGAF 标准。用于支持使用企业架构、解决方案或其他架构活动进行业务转型。 架构开发方法&#xff08;ADM&#xff09;是TOGAF标准的方法组件&#xff0c;它描述了若干活动阶段。例如&#xff0c;ADM预备阶段的重点…

雨晨 24H2 正式版 Windows 11 iot ltsc 2024 适度 26100.2033 VIP2IN1

雨晨 24H2 正式版 Windows 11 iot ltsc 2024 适度 26100.2033 VIP2IN1 install.wim 索引: 1 名称: Windows 11 IoT 企业版 LTSC 2024 x64 适度 (生产力环境推荐) 描述: Windows 11 IoT 企业版 LTSC 2024 x64 适度 By YCDISM 2024-10-09 大小: 15,699,006,618 个字节 索引: 2 …

探索高效的 PDF 拆分工具及其独特功能

当一份大型的PDF文档包含了多个不同主题或章节的内容时&#xff0c;将其拆分成独立的部分可以更方便我们的阅读、编辑和管理。接下来&#xff0c;让我们一起走进PDF拆分工具的世界&#xff0c;了解它们的功能和价值。 1.福昕PDF编辑器 链接一下>>https://editor.foxits…

C++ 算法学习——1.8 单调栈算法

单调栈&#xff08;Monotonic Stack&#xff09;是一种在解决一些数组或者链表相关问题时非常有用的数据结构和算法。在C中&#xff0c;单调栈通常用于解决一些需要快速找到元素左右第一个比当前元素大或小的问题。 定义&#xff1a; 单调栈实际上是一个栈&#xff0c;但是与普…

数据交换的金钟罩:合理利用安全数据交换系统,确保信息安全

政府单位为了保护网络不受外部威胁和内部误操作的影响&#xff0c;通常会进行网络隔离&#xff0c;隔离成内网和外网。安全数据交换系统是专门设计用于在不同的网络环境&#xff08;如内部不同网络&#xff0c;内部网络和外部网络&#xff09;之间安全传输数据的解决方案。 使用…