FastAPIで処理に時間がかかるようなタスク向けのバックグラウンドタスクの機能がある。 チュートリアルを参考に動作確認をしてみる。
バックグラウンドタスクの目的
リクエスト後に処理を開始する必要があるが、クライアントがレスポンスを受け取る前に 処理を終える必要がないタスクを処理するために存在する。
作業完了の通知
何かしらの完了通知をする際に、クライアント側は通知リクエストだけ出して、バックグラウドで完了通知をする。ファイルの解析
巨大なファイルの解析をする際に、リクエスト受付の応答だけ返して、リクエスト契機でファイルの解析を開始する。 ファイルの解析完了後に、完了通知を送信する。
実装方法
タスクの登録
BackgroundTasks
を使用する。
リクエスト受信の関数内で、BackgroundTasks
のadd_task()
メソッドをコールして、BackgroundTask
にタスクを登録する。
background_tasks.add_task(execute_long_time, mode, message="notification")
add_task()
には、バックグラウンドで実行されるタスク関数と実行時に必要なパラメータを渡す。
応答を返す
タスクを登録したら、return文でリクエスト元に応答を返す。
background_tasks.add_task(execute_long_time, mode, message="notification") return {"message": "Notification sent in the background"}
ファイルを収集する(ユースケース)
作りとしては悪いが、ファイルの収集依頼を出して、定期的に収集状態を確認する。
ファイルの収集依頼を受けた側は、リクエストIDを発行し、バックグラウンドでファイルを収集する。
リクエストIDが返ってきたら、リクエストIDを使って収集状況を確認する。
aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected が発生
aiohttp.ClientSession
を使いまわしたら、ServerDisconnectedError
が発生した。
aiohttp.TCPConnector
のforce_close
パラメータをTrue
に設定し、ClientSession
に設定して
使用する。この設定は、HTTPのKeep-Aliveを使用しないように設定するためのものらしい。
aiohttp
のマニュアルからでは読み取れなかったし、検証もできてないけど。
async with aiohttp.ClientSession( connector=aiohttp.TCPConnector(force_close=True) ) as session:
最後に
簡単なユースケースを実際に動かしてみて、Keep-Aliveによる問題の回避方法も見つけることが出来た。
ただ、大量のリクエストに対して、大量のタスクが発生する。そのため、実際の運用では大量のタスクを
キューなどで実行数に制限をかけるなどの対応が必要になると考える。
そこに関してもFastAPIのマニュアルにも言及されているので、次回で確認したい。
参考
async def execute_long_time(mode: str, request_id: str): logger.info("Start execute_long_time method") logger.info("execute_log_time param mode=" + mode + ", request_id=" + request_id) collecting_status.append({"request_id": request_id, "status": "collecting"}) await asyncio.sleep(60) # time.sleep(60) if {"request_id": request_id, "status": "collecting"} in collecting_status: collecting_status.remove({"request_id": request_id, "status": "collecting"}) logger.info("End execute_long_time method") @app.post("/collectlog") async def collect_log(request: CollectRequest, background_tasks: BackgroundTasks): request_id = uuid.uuid4() background_tasks.add_task(execute_long_time, request.mode, str(request_id)) response = {"request_id": request_id} return response