引言
当我们访问大模型相关的API服务时,通常会遇到速率限制(即限流),它用于防止用户向某个API发送大量请求,防止请求过载,确保每个人都能公平地访问API。
速率限制的方式
速率限制通常有以下几种形式:
- RPM(requests per minute) 每分钟请求数
- PRD(requests per day) 每天请求数
- TPM(tokens per minute) 每分钟token数
- TPD(tokens per day) 每天token数
- IPM(images per minute) 每分钟图像数:比如针对图像生成模型
- IPD(images per day) 每天图像数
速率限制可能会因为其中任一选项中达到峰值而触发。比如RPM限制为20,TPM限制为100k,假设一分钟内发送了20个请求,每个请求只有100(0.1k)个token,那么RPM的限制会触发,即使这20个请求内没有发满100k个token。
如何处理速率限制
当大量调用OpenAI API时,可能会遇到429: Too Many Reuqests
或RateLimitError
的错误消息,表示超过速率限制。
⚠️ 持续重试不能解决该问题。这里说的处理,是指尽可能不要超过这个速率限制,如果想从根源上解决一种方法是自己部署。
以硅基流动提供的免费嵌入模型为例,它的RPM是2000,即一分钟内只能发送2000个请求,其实不算低;但是TPM只有500k,假设文本块平均为500个token,实际上每分钟只能发送1000个文本块。
错误示范
from langchain_core.embeddings import Embeddings
from itertools import islice
batch_size = 32 # 硅基流动嵌入模型最大批大小为32
def batch_list(it, size=batch_size):it = iter(it)return iter(lambda: list(islice(it, size)), [])class SiliconflowEmbeddings(Embeddings):def __init__(self, model_name: str):self.model_name = model_nameself.api_url = "https://api.siliconflow.cn/v1/embeddings"def embed_documents(self, texts: List[str]) -> List[List[float]]:payload = {"model": self.model_name, "input": texts, "encoding_format": "float"}headers = {"accept": "application/json","content-type": "application/json","authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",}embeds = []for batch in tqdm(batch_list(texts), total=len(texts) // batch_size):payload["input"] = batchresponse = requests.post(self.api_url, json=payload, headers=headers)embeds.extend([d["embedding"] for d in response.json()["data"]])return embedsdef embed_query(self, text: str) -> List[float]:return self.embed_documents([text])[0]
如果我们以512 token为一个文本块,假设一篇文档被拆分为2562个文本块,那么直接运行上面的代码会报错:
Get 2562 chunks
Storing...15%|███████████████████████████▉ | 12/80 [00:06<00:35, 1.94it/s]
2024-09-16 17:27:42.706 Uncaught app exceptionembeds.extend([d["embedding"] for d in response.json()["data"]])^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable
这是因为它触发了速率限制,如果查看response.status_code
会发现它为429
。
查看硅基流动文档中关于429状态码的描述,正好就是RateLimit。
使用指数退避重试
指数退避(exponential backoff)通过反馈成倍地降低某个过程的速率,以逐渐找到合适的速率。
即在遇到速率限制错误时先进行短暂的睡眠,然后重试失败的请求。如果该请求仍然失败,则增加睡眠的时间并重复该过程,直到请求成功或达到最大重试次数。注意在实现的时候,可能不是固定的成倍数(比如睡眠时间成2),而是会增加一定的随机性。这里增加随机性防止多个设备执行退避重试时产生的同步化。
根据参考1中的方案。要使用指数退避重试,下面先介绍两个库,最后介绍自己实现。
使用tenacity库
要开始,执行pip install tenacity
安装依赖。
from langchain_core.embeddings import Embeddings
from itertools import islicefrom tenacity import (retry,stop_after_attempt,wait_random_exponential,
) code_success = 200class SiliconflowEmbeddings(Embeddings):def __init__(self, model_name: str):self.model_name = model_nameself.api_url = "https://api.siliconflow.cn/v1/embeddings"# 最多重试12次@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(12))def _post(self, payload, headers):response = requests.post(self.api_url, json=payload, headers=headers)if response.status_code != code_success:raise Exception(f"Error code: {response.status_code}")return responsedef embed_documents(self, texts: List[str]) -> List[List[float]]:payload = {"model": self.model_name, "input": texts, "encoding_format": "float"}headers = {"accept": "application/json","content-type": "application/json","authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",}embeds = []for batch in tqdm(batch_list(texts), total=len(texts) // batch_size):payload["input"] = batchresponse = self._post(payload, headers)embeds.extend([d["embedding"] for d in response.json()["data"]])return embedsdef embed_query(self, text: str) -> List[float]:return self.embed_documents([text])[0]
tenacity
提供一个retry
注解,我们可以加到真实调用API处,比如我们封装一下requests.post
的代码,检查它返回的状态码,如果是429我们就抛出异常,触发retry
里面的重试。这里为了简单,只要不是200就抛出异常。
这样可以处理速率限制,注意如果没有速率限制,正常情况下只需要30秒即可完成嵌入操作:
19%|██████████████████████████████████▉ | 15/80 [00:07<00:30, 2.15it/s]
当触发速率限制后,在重试中会进行一些休眠,因此最终的完成的时间会大大超过这个30秒,但至少成功嵌入了:
81it [04:25, 3.27s/it]
使用backoff库
要开始,执行pip install backoff
安装依赖。
import backoff @backoff.on_exception(backoff.expo, Exception, max_time=60)
def _post(self, payload, headers):response = requests.post(self.api_url, json=payload, headers=headers)if response.status_code != code_success:raise Exception(f"Error code: {response.status_code}")return response
使用backoff
和tenacity
差不多,它也提供了一个注解backoff.on_exception
。它会会使用一个叫做jitter
的函数来增加随机性避免同步化带来的冲突:
jitter: A function of the value yielded by wait_gen returningthe actual time to wait. This distributes wait timesstochastically in order to avoid timing collisions acrossconcurrent clients. Wait times are jittered by defaultusing the full_jitter function. Jittering may be disabledaltogether by passing jitter=None.
自己实现指数退避重试
我们可以看看如何手动实现指数退避重试,以认识到它的实现原理非常简单:
import random
import timecode_success = 200
code_rate_limit = 429# 自定义异常
class ConflictError(Exception):status_code: int = 429def retry_with_exponential_backoff(func,initial_delay: float = 1,exponential_base: float = 2,jitter: bool = True,max_retries: int = 12,errors: tuple = (ConflictError,),
):def wrapper(*args, **kwargs):num_retries = 0delay = initial_delay# 循环直到一次成功的响应或达到max_retries次数while True:try:return func(*args, **kwargs)# 在指定的错误上重是except errors as e:# 增加重试次数num_retries += 1# 检查是否达到最大重试次if num_retries > max_retries:raise Exception(f"Maximum number of retries ({max_retries}) exceeded.")# 增加延迟 random.random() 生成 0到1之间的随机数,这里是为了增加随机性delay *= exponential_base * (1 + jitter * random.random())# 休眠time.sleep(delay)# 如果不是指定的异常则抛出except Exception as e:raise ereturn wrapper
我们判断状态码,如果是429则抛出ConflictError
,否则直接返回response
。
@retry_with_exponential_backoff
def _post(self, payload, headers):response = requests.post(self.api_url, json=payload, headers=headers)if response.status_code == code_rate_limit:raise ConflictError()return response
参考
- https://platform.openai.com/docs/guides/rate-limits
- https://cloud.siliconflow.cn/rate-limits