一、前言
- 引入异步编程趋势:Python的异步编程正变得越来越流行。在过去,同步的HTTP请求已经不足以满足对性能的要求。
- 异步HTTP客户端库的流行:目前,有许多第三方库已经实现了异步HTTP客户端,如aiohttp和httpx等。然而,异步语法使得代码变得更加冗长,导致缩进增多,降低了代码的可读性和简洁性。
- 封装异步HTTP客户端:为了简化异步HTTP请求的代码,我们需要封装一个常用的HTTP客户端,以实现业务中常见的功能,并提供更简洁的接口。在这篇博客中,我将使用httpx库来进行封装异步客户端,requests则是封装同步客户端,以实现常见的HTTP方法,并支持设置超时时间、请求参数等功能。
原文:Python 同、异步HTTP客户端封装:性能与简洁性的较量
二、同异步http客户端测试
同异步简易Demo
再封装之前先看看同异步发个http请求的代码差异,这里以 requests、aiohttp、httpx进行展示
依赖安装
pip install requests aiohttp httpx
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { 模块描述 }
# @Date: 2023/09/28 10:09
import asyncio
import httpx
import aiohttp
import requestsdef requests_demo(url):print("requests_demo")resp = requests.get(url)print(resp.text)async def aiohttp_demo(url):print("aiohttp_demo")async with aiohttp.client.ClientSession() as session:async with session.get(url) as resp:html_text = await resp.text()print(html_text)async def httpx_demo(url):print("httpx_demo")async with httpx.AsyncClient() as client:resp = await client.get(url)print(resp.text)async def main():url = "https://juejin.cn/"requests_demo(url)await aiohttp_demo(url)await httpx_demo(url)if __name__ == '__main__':asyncio.run(main())
可以看到同步的requests库实现的非常简洁一行代码就可以发送http请求。但异步语法的 httpx与aiohttp就感觉代码很臃肿,要嵌套好多层,尤其aiohttp,可读性变差了好多,但异步的请求可以大大的提升并发性能,利用网络IO的耗时处理更多的请求任务,这在爬虫中可以大大提升性能,再异步的web框架中也非常适用。
并发http请求测试
再看看同异步如何并发请求数据
async def concurrent_http_test():# requests testurls = ["https://juejin.cn/"] * 10start_time = time.time()for url in urls:requests_demo(url)use_time = time.time() - start_timeprint(f"requests {len(urls)} http req use {use_time} s")# httpx teststart_time = time.time()await asyncio.gather(*[httpx_demo(url) for url in urls])use_time = time.time() - start_timeprint(f"httpx {len(urls)} http req use {use_time} s")# aiohttp teststart_time = time.time()await asyncio.gather(*[aiohttp_demo(url) for url in urls])use_time = time.time() - start_timeprint(f"aiohttp {len(urls)} http req use {use_time} s")
结果:
requests 10 http req use 2.9108400344848633 shttpx 10 http req use 0.8657052516937256 saiohttp 10 http req use 1.9703822135925293 s
requests 请求demo是同步一个一个请求,所以会慢好多,而 httpx、aiohttp 是通过 asyncio.gather 并发请求的,会一次性发送10个请求,这样网络IO的耗时就复用了,但发现 aiohttp 的效果不尽人意,与httpx的0.86s相差太大,都是异步库,不应该的,于是看看之前写的demo代码发现其实aiohttp并没有复用 ClientSession 每次都是创建一个新的实例来去发送请求,这样频繁的创建与销毁连接会大大影响性能,httpx的 async with httpx.AsyncClient() as client:
好像是一样的问题,但httpx效果更好些。
尝试把 aiohttp 的 ClientSession 与 httpx.AsyncClient() 放到全局中去,再试试。
def requests_demo(url, session):# print("requests_demo")resp = session.get(url)return respasync def aiohttp_demo(url, aio_session):# print("aiohttp_demo")async with aio_session.get(url) as resp:# html_text = await resp.text()return respasync def httpx_demo(url, client):# print("httpx_demo")resp = await client.get(url)return respasync def concurrent_http_test():# requests testurls = ["https://juejin.cn/"] * 10start_time = time.time()with ThreadPoolExecutor() as pool:session = requests.session()for url in urls:pool.submit(requests_demo, url, session)use_time = time.time() - start_timeprint(f"requests {len(urls)} http req use {use_time} s")# aiohttp teststart_time = time.time()async with aiohttp.client.ClientSession() as aio_session:await asyncio.gather(*[aiohttp_demo(url, aio_session) for url in urls])use_time = time.time() - start_timeprint(f"aiohttp {len(urls)} http req use {use_time} s")# httpx teststart_time = time.time()async with httpx.AsyncClient() as client:await asyncio.gather(*[httpx_demo(url, client) for url in urls])use_time = time.time() - start_timeprint(f"httpx {len(urls)} http req use {use_time} s")
改进效果
requests 10 http req use 1.2176601886749268 saiohttp 10 http req use 0.4052879810333252 shttpx 10 http req use 0.5238490104675293 s
异步的效果很明显快了很多,requests 请求我也用 session 与线程池来并发请求看看效果,但网络有波动每次测的数据都不一样,所以这里的测试值仅作为参考。
三、异步http客户端封装
简易封装
aiohttp 与 httpx 性能都差不多,由于之前用 requests 习惯了,再接触这些异步封装的语法都觉得好怪,而 httpx的api 与 requests 类似,所以我就选择用 htppx 简单封装下。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author: Hui
# @Desc: { http客户端 }
# @Date: 2023/08/10 09:33
import httpx
from datetime import timedeltaclass HttpMethod(BaseEnum):GET = "GET"POST = "POST"PATCH = "PATCH"PUT = "PUT"DELETE = "DELETE"HEAD = "HEAD"OPTIONS = "OPTIONS"class RespFmt(BaseEnum):"""http响应格式"""JSON = "json"BYTES = "bytes"TEXT = "text"class AsyncHttpClient:"""异步HTTP客户端通过httpx封装,实现了常见的HTTP方法,支持设置超时时间、请求参数等,简化了异步调用的层级缩进。Attributes:default_timeout: 默认请求超时时间,单位秒default_headers: 默认请求头字典default_resp_fmt: 默认响应格式jsonclient: httpx 异步客户端response: 每次实例请求的响应"""def __init__(self, timeout=timedelta(seconds=10), headers: dict = None, resp_fmt: RespFmt = RespFmt.JSON):"""构造异步HTTP客户端"""self.default_timeout = timeoutself.default_headers = headers or {}self.default_resp_fmt = resp_fmtself.client = httpx.AsyncClient()self.response: httpx.Response = Noneasync def _request(self,method: HttpMethod, url: str,params: dict = None, data: dict = None,timeout: timedelta = None, **kwargs):"""内部请求实现方法创建客户端会话,构造并发送HTTP请求,返回响应对象Args:method: HttpMethod 请求方法, 'GET', 'POST' 等url: 请求URLparams: 请求查询字符串参数字典data: 请求体数据字典timeout: 超时时间,单位秒kwargs: 其他关键字参数Returns:httpx.Response: HTTP响应对象"""timeout = timeout or self.default_timeoutheaders = self.default_headers or {}self.response = await self.client.request(method=method.value,url=url,params=params,data=data,headers=headers,timeout=timeout.total_seconds(),**kwargs)return self.responsedef _parse_response(self, resp_fmt: RespFmt = None):"""解析响应Args:resp_fmt: 响应格式Returns:resp Union[dict, bytes, str]"""resp_fmt = resp_fmt or self.default_resp_fmtresp_content_mapping = {RespFmt.JSON: self.json,RespFmt.BYTES: self.bytes,RespFmt.TEXT: self.text,}resp_func = resp_content_mapping.get(resp_fmt)return resp_func()def json(self):return self.response.json()def bytes(self):return self.response.contentdef text(self):return self.response.textasync def get(self, url: str, params: dict = None, timeout: timedelta = None, resp_fmt: RespFmt = None, **kwargs):"""GET请求Args:url: 请求URLparams: 请求查询字符串参数字典timeout: 请求超时时间,单位秒resp_fmt: 响应格式,默认None 使用实例对象的 default_resp_fmtReturns:resp => dict or bytes"""await self._request(HttpMethod.GET, url, params=params, timeout=timeout, **kwargs)return self._parse_response(resp_fmt)async def post(self, url: str, data: dict = None, timeout: timedelta = None, resp_fmt: RespFmt = None, **kwargs):"""POST请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒resp_fmt: 响应格式,默认None 使用实例对象的 default_resp_fmtReturns:resp => dict or bytes"""await self._request(HttpMethod.POST, url, data=data, timeout=timeout, **kwargs)return self._parse_response(resp_fmt)async def put(self, url: str, data: dict = None, timeout: timedelta = None, resp_fmt: RespFmt = None, **kwargs):"""PUT请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒resp_fmt: 响应格式,默认None 使用实例对象的 default_resp_fmtReturns:resp => dict"""await self._request(HttpMethod.PUT, url, data=data, timeout=timeout, **kwargs)return self._parse_response(resp_fmt)async def delete(self, url: str, data: dict = None, timeout: timedelta = None, resp_fmt: RespFmt = None, **kwargs):"""DELETE请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒resp_fmt: 响应格式,默认None 使用实例对象的 default_resp_fmtReturns:resp => dict"""await self._request(HttpMethod.DELETE, url, data=data, timeout=timeout, **kwargs)return self._parse_response(resp_fmt)
封装细节
这里封装就是简单的内部维护一个 httpx 的异步客户端,然后初始化一些默认的参数
- default_timeout: 默认请求超时时间,单位秒,默认10s
- default_headers: 默认请求头字典
- default_resp_fmt: 默认响应格式json
- client: httpx 异步客户端
- response: 每次实例请求的响应
class AsyncHttpClient:"""异步HTTP客户端"""def __init__(self, timeout=timedelta(seconds=10), headers: dict = None, resp_fmt: RespFmt = RespFmt.JSON):"""构造异步HTTP客户端"""self.default_timeout = timeoutself.default_headers = headers or {}self.default_resp_fmt = resp_fmtself.client = httpx.AsyncClient()self.response: httpx.Response = None
然后实现几个常用的请求,get、post、put、delete方法
async def post(self, url: str, data: dict = None, timeout: timedelta = None, resp_fmt: RespFmt = None, **kwargs):"""POST请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒resp_fmt: 响应格式,默认None 使用实例对象的 default_resp_fmtReturns:resp => dict or bytes"""await self._request(HttpMethod.POST, url, data=data, timeout=timeout, **kwargs)return self._parse_response(resp_fmt)
每个请求方法冗余了一些常用的参数字段,例如
-
params 查询字符串入参
-
data body入参
-
timeout: 请求超时时间,单位秒
-
resp_fmt: 响应格式,默认None 使用实例对象的 default_resp_fmt
- 默认json,一般我们http的数据交互都是使用 json了
-
**kwargs 预留其他关键字参数的入参
- 这样有助于有些参数没想到要设计但经常用,可以通过kwargs来弥补
其实 get、post、put、delete方法没做什么事,就是标记了下使用什么请求方法、参数,最终都是让 _request
方法处理。
async def _request(self,method: HttpMethod, url: str,params: dict = None, data: dict = None,timeout: timedelta = None, **kwargs
):"""内部请求实现方法创建客户端会话,构造并发送HTTP请求,返回响应对象Args:method: HttpMethod 请求方法, 'GET', 'POST' 等url: 请求URLparams: 请求查询字符串参数字典data: 请求体数据字典timeout: 超时时间,单位秒kwargs: 其他关键字参数Returns:httpx.Response: HTTP响应对象"""timeout = timeout or self.default_timeoutheaders = self.default_headers or {}self.response = await self.client.request(method=method.value,url=url,params=params,data=data,headers=headers,timeout=timeout.total_seconds(),**kwargs)return self.response
处理完再根据指定的响应格式进行解析
def _parse_response(self, resp_fmt: RespFmt = None):"""解析响应Args:resp_fmt: 响应格式Returns:resp Union[dict, bytes, str]"""resp_fmt = resp_fmt or self.default_resp_fmtresp_content_mapping = {RespFmt.JSON: self.json,RespFmt.BYTES: self.bytes,RespFmt.TEXT: self.text,}resp_func = resp_content_mapping.get(resp_fmt)return resp_func()def json(self):return self.response.json()def bytes(self):return self.response.contentdef text(self):return self.response.text
通过字典的方法来处理不同的解析格式,简化了 if elif
的操作,这里封装主要是将一些常用操作封装起来,让代码更简洁,当然也可以获取响应对象后,自己自由处理,最后看看封装后的使用Demo
from py_tools.connections.http import AsyncHttpClient
from py_tools.enums.http import RespFmtasync def httpx_demo(url):print("httpx_demo")async with httpx.AsyncClient() as client:resp = await client.get(url)# print(resp.text)return respasync def main():url = "https://juejin.cn/"resp_obj = await httpx_demo(url)resp_text = resp_obj.textresp_text = await AsyncHttpClient().get(url, resp_fmt=RespFmt.TEXT)if __name__ == '__main__':asyncio.run(main())
封装后简洁了许多,虽然方法有些冗余参数,但在业务中使用就不会出现好多嵌套的缩进,也牺牲了一些灵活性,因为只封装一些常用的请求操作,但一开始也想不全,只有在业务中不断的磨练,以及大家一起提建议贡献,才能慢慢的变得更好用。有时候适当的冗余封装也挺不错的。
四、同步http客户端
同步的其实 requests 已经够简洁了,没必要再封装了,这里为了统一公共库的调用,就二次封装下,思路还是跟异步的一样,有一点不一样的就是,get、post、put、delete方法返回的是 self 的引用,用于一些链式操作。一开始我想把异步的也变成链式调用,发现做不到,方法如果不await拿不到结果,返回的是 协程对象,所以一时半会弄不出来,就用了一个参数的方式来处理。
class HttpClient:"""同步HTTP客户端通过request封装,实现了常见的HTTP方法,支持设置超时时间、请求参数等,链式调用Examples:>>> HttpClient().get("http://www.baidu.com").text>>> HttpClient().get("http://www.google.com", params={"name": "hui"}).bytes>>> HttpClient().post("http://www.google.com", data={"name": "hui"}).jsonAttributes:default_timeout: 默认请求超时时间,单位秒default_headers: 默认请求头字典client: request 客户端response: 每次实例请求的响应"""def __init__(self, timeout=timedelta(seconds=10), headers: dict = None):"""构造异步HTTP客户端"""self.default_timeout = timeoutself.default_headers = headers or {}self.client = requests.session()self.response: requests.Response = Nonedef _request(self,method: HttpMethod, url: str,params: dict = None, data: dict = None,timeout: timedelta = None, **kwargs):"""内部请求实现方法创建客户端会话,构造并发送HTTP请求,返回响应对象Args:method: HttpMethod 请求方法, 'GET', 'POST' 等url: 请求URLparams: 请求查询字符串参数字典data: 请求体数据字典timeout: 超时时间,单位秒kwargs: 其他关键字参数Returns:httpx.Response: HTTP响应对象"""timeout = timeout or self.default_timeoutheaders = self.default_headers or {}self.response = self.client.request(method=method.value,url=url,params=params,data=data,headers=headers,timeout=timeout.total_seconds(),**kwargs)return self.response@propertydef json(self):return self.response.json()@propertydef bytes(self):return self.response.content@propertydef text(self):return self.response.textdef get(self, url: str, params: dict = None, timeout: timedelta = None, **kwargs):"""GET请求Args:url: 请求URLparams: 请求查询字符串参数字典timeout: 请求超时时间,单位秒Returns:self 自身对象实例"""self._request(HttpMethod.GET, url, params=params, timeout=timeout, **kwargs)return selfdef post(self, url: str, data: dict = None, timeout: timedelta = None, **kwargs):"""POST请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒Returns:self 自身对象实例"""self._request(HttpMethod.POST, url, data=data, timeout=timeout, **kwargs)return selfasync def put(self, url: str, data: dict = None, timeout: timedelta = None, **kwargs):"""PUT请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒Returns:self 自身对象实例"""self._request(HttpMethod.PUT, url, data=data, timeout=timeout, **kwargs)return selfasync def delete(self, url: str, data: dict = None, timeout: timedelta = None, **kwargs):"""DELETE请求Args:url: 请求URLdata: 请求体数据字典timeout: 请求超时时间,单位秒Returns:self 自身对象实例"""self._request(HttpMethod.DELETE, url, data=data, timeout=timeout, **kwargs)return self
五、源代码
源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。
HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)