在python的整体生态中,虽然已经有很多库支持了异步调用,如可以使用httpx或者aiohttp代替requests库发起http请求,使用asyncio.sleep 代替time.sleep, 但是依然还有很多优秀的第三方库是不支持异步调用也没有可代替的库,那么如何在FastAPI中调用这种没有实现异步的库但是又不阻塞整个系统呢?
查看官方文档,https://fastapi.tiangolo.com/zh/async/ ,最后有个简要的结论
- 如果你的函数中有使用await,则函数使用async def 定义。
- 如果不知道是否要使用async,则不要用,使用def 就好。
原理:
**在python 中,使用async def 定义的函数是运行在协程中,而多个协程是在一个主线程中的。fastapi中的async def协程路由处理的请求会全部放在main thread,而def 处理的请求每个都有自己的独立线程,fastapi内部会自动为这两种接口进行对应的处理逻辑 **
我们来详细看一下各种情况。本文将使用await asyncio.sleep(5)
来模拟实现了异步操作的库。time.sleep(5)
来模拟只有同步操作的库。
只有同步库
import time from fastapi import FastAPI, Request
import asyncio
import threading
import uvicorn app = FastAPI() @app.middleware("http")
async def cal_time(req: Request, call_next): start = time.time() response = await call_next(req) process_time = time.time() - start print(f"接口{req.url.path} use {process_time} sec.") return response @app.get("/test1")
def test1(): print(threading.current_thread(), "test1") return "test1" @app.get("/test2")
def test2(): print(threading.current_thread(), "test2") time.sleep(5) return "test2" if __name__ == '__main__': uvicorn.run(app, host="127.0.0.1", port=8000)
上面有的fastapi 定义了两个接口,test1接口未执行任何IO操作,只返回一个字符串,用来验证其它接口是否会阻塞test1接口,test2 接口,这里使用time.sleep(5)
来模拟一个阻塞操作。并且同时打印了test1和test2中的线程信息。并且添加了一个中间件cal_time,打印每个请求的耗时。
我们启动应用并看一下此时存在阻塞的接口test2是否会阻塞test1接口?
我们在浏览器中先访问test2接口,之后再请求test1接口,我们来看一下打印
<WorkerThread(AnyIO worker thread, started 11720)> test2
<WorkerThread(AnyIO worker thread, started 22508)> test1
接口/test1 use 0.0019936561584472656 sec.
INFO: 127.0.0.1:56758 - "GET /test1 HTTP/1.1" 200 OK
接口/test2 use 5.0042359828948975 sec.
INFO: 127.0.0.1:56757 - "GET /test2 HTTP/1.1" 200 OK
- 由于先请求的test2接口,所以先打印出了test2接口的线程信息,在11720 线程上执行。
- 之后请求了test1接口,这时打印了test1接口所运行的线程,线程号为22508。
- 此时 test1 接口并没有因为test2接口的
time.sleep(5)
而阻塞,而是直接返回了test1,显示耗时0.001秒。 - 在停了5秒钟以后,接口test2完成了响应,显示耗时5秒。
有异步库
在只有同步调用库的情况下,上面的代码运行很好,接口之间并没有相互阻塞,接下来我们看一下存在异步库的情况。
import time from fastapi import FastAPI, Request
import asyncio
import threading
import uvicorn app = FastAPI() @app.middleware("http")
async def cal_time(req: Request, call_next): start = time.time() response = await call_next(req) process_time = time.time() - start print(f"接口{req.url.path} use {process_time} sec.") return response @app.get("/test1")
def test1(): print(threading.current_thread(), "test1") return "test1" @app.get("/test2")
async def test2(): print(threading.current_thread(), "test2") await asyncio.sleep(5) return "test2" if __name__ == '__main__': uvicorn.run(app, host="127.0.0.1", port=8000)
这里修改test2方法,使用 await asyncio.sleep(5)
模拟异步阻塞操作,由于这里使用了await, 所以需要将test2改为async def,这时还是先访问test2接口,再访问test1接口,此时test2中的异步IO操作并依然没有阻塞test1接口,我们再来看一下请求的打印输出。
- 首先访问test2接口,打印了test2接口的线程信息。
- 接下来访问test1接口,打印了test1接口的线程信息。
- 由于test2接口并不会阻塞test1接口,所以这里test1接口完成请求,耗时0.003秒。
- 过了5秒钟以后,接口2阻塞结束,打印耗时5秒。
两次接口test2都不会阻塞test1,这是为什么呢?以前在使用Tornado或者Sanic开发时,如果某个接口里调用了同步库,如requests.get,那么如果处理不当是会阻塞整个系统进程的。
我们来仔细看一下程序的打印输出,在第一次使用time.sleep(5) 模拟IO操作时,打印的线程信息为
<WorkerThread(AnyIO worker thread, started 11720)> test2
<WorkerThread(AnyIO worker thread, started 22508)> test1
我们看到两次请求是在不同的线程中运行的,所以即使某个接口请求中存在阻塞操作,也不会影响到其它的线程。
再看第二次使用await asyncio.sleep(5)
模拟IO操作时的线程信息。
<_MainThread(MainThread, started 4501536256)> test2
<WorkerThread(AnyIO worker thread, started 123145549803520)> test1
依然是在两个不同的线程中运行的,所以也不会相互阻塞。
但是我们在横向比较两次请求中的test2的线程信息
<WorkerThread(AnyIO worker thread, started 11720)> test2
<_MainThread(MainThread, started 4501536256)> test2
第一次在使用def 定义函数时,是在WorkerThread中运行的,第二次使用async def 显示是在MainThread 中运行的,这也就说明了一个问题,在python 中,使用async def 定义的函数是运行在协程中,而多个协程是在一个主线程中的。
容易出的问题
通过上面的实践,我们再来看最开始时,FastAPI官网给出的结论:
- 如果你的函数中有使用await,则函数使用async def 定义。
- 如果不知道是否要使用async,则不要用,使用def 就好。
这个结论看起来比较简单明了,但是我们也看出一些问题,如果存在await,则必须要定义在async def 函数中,这个好说,如果没有定义在async def 函数中,则语法都过不去。对于没有存在异步操作的库,我们应不应该定义在async中呢?我们来实践一下。修改test2函数。
import time from fastapi import FastAPI, Request
import asyncio
import threading
import uvicorn
from uvicorn.config import LOGGING_CONFIG app = FastAPI() @app.middleware("http")
async def cal_time(req: Request, call_next): start = time.time() response = await call_next(req) process_time = time.time() - start print(f"接口{req.url.path} use {process_time} sec.") return response @app.get("/test1")
def test1(): print(threading.current_thread(), "test1") return "test1" @app.get("/test2")
async def test2(): print(threading.current_thread(), "test2") time.sleep(5) return "test2" if __name__ == '__main__': LOGGING_CONFIG["formatters"]["access"][ "fmt"] = '%(asctime)s %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s' uvicorn.run(app, host="127.0.0.1", port=8000)
这次我们打印一下请求的具体时间,由于默认的uvicorn 日志没有打印具体的时间,所以这里通过LOGGING_CONFIG重新定义一下日志的格式。在test2函数这时我们使用time.sleep(5) 替换掉await asyncio.sleep(5)
, 这时依然使用async def 定义test2函数,再访问一下test2和test1接口。
此时我们看到的现象为test1接口被test2接口给阻塞住了,但是由上面的实验得出来的结论,使用def 定义的函数应该是单启一个线程执行,可是为什么还被test2给阻塞了呢?
我们详细看一下输出:
<_MainThread(MainThread, started 3768)> test2
接口/test2 use 5.008129358291626 sec.
2023-11-29 11:33:12,724 INFO: 127.0.0.1:64011 - "GET /test2 HTTP/1.1" 200 OK<WorkerThread(AnyIO worker thread, started 20056)> test1
接口/test1 use 0.006971836090087891 sec.
2023-11-29 11:33:12,730 INFO: 127.0.0.1:64012 - "GET /test1 HTTP/1.1" 200 OK
输出分为两部分,一个是访问test2的输出,一个是访问test1的输出。
- test2的输出,首先打印出运行test2函数的线程,由于test2是async def 定义的, 所以这个协程是跑在_MainThread中。
- 此时test2函数运行到了
time.sleep(5)
处,阻塞了整个系统,使得整个系统无法再接收新来的请求。 - 这时新发来了一个test1接口的请求,这时由于test2中的time.sleep 阻塞了整个系统,所以这时test1请求也被阻塞了,并不是test1函数被阻塞了,而是现在都无法开始执行test1接口函数。
- 过了5秒钟,执行完time.sleep(5) 代码,打印输出
2023-11-29 11:33:12,724 INFO: 127.0.0.1:64011 - "GET /test2 HTTP/1.1" 200 OK
, 系统接着开始处理test1接口请求。 - 由于 test1使用def 定义,所以这里跑在了一个新的子线程中,WorkerThread,且test1没有任何IO阻塞操作,所以此时很快的完成了响应。打印耗时0.006。
这也就解释了为什么先访问阻塞的test2接口,再访问非阻塞的test1接口,即使test1接口使用def 定义也会被卡住的情况。
通过上面的实验,我们可以得出,如果存在同步阻塞的IO操作,不要 放到async def 函数中。
test1函数没有阻塞的IO操作,使用async def 或者 def 定义都可以,看官方的文档应该是推荐放到def 中,但是由于def 定义的函数在运行的时候需要单启一个线程(从线程池中获取),会有一些额外的开销,如果访问量小的话应该没有多大影响。
结论
在FastAPI中,如果使用async def 定义的函数,里面的IO操作均要实现异步操作await,如果要使用同步的IO操作,需要使用def 定义函数。简单来讲用下图表示