ヒトリ歩き

愚痴とかいろいろ書きます

FastAPIのバックグラウンドタスクを試してみた

FastAPIで処理に時間がかかるようなタスク向けのバックグラウンドタスクの機能がある。 チュートリアルを参考に動作確認をしてみる。

バックグラウンドタスクの目的

リクエスト後に処理を開始する必要があるが、クライアントがレスポンスを受け取る前に 処理を終える必要がないタスクを処理するために存在する。

  • 作業完了の通知
    何かしらの完了通知をする際に、クライアント側は通知リクエストだけ出して、バックグラウドで完了通知をする。

  • ファイルの解析
    巨大なファイルの解析をする際に、リクエスト受付の応答だけ返して、リクエスト契機でファイルの解析を開始する。  ファイルの解析完了後に、完了通知を送信する。

fastapi.tiangolo.com

実装方法

タスクの登録

BackgroundTasksを使用する。
リクエスト受信の関数内で、BackgroundTasksadd_task()メソッドをコールして、BackgroundTaskにタスクを登録する。

background_tasks.add_task(execute_long_time, mode, message="notification")

add_task()には、バックグラウンドで実行されるタスク関数と実行時に必要なパラメータを渡す。

応答を返す

タスクを登録したら、return文でリクエスト元に応答を返す。

    background_tasks.add_task(execute_long_time, mode, message="notification")
    return {"message": "Notification sent in the background"}

ファイルを収集する(ユースケース

作りとしては悪いが、ファイルの収集依頼を出して、定期的に収集状態を確認する。
ファイルの収集依頼を受けた側は、リクエストIDを発行し、バックグラウンドでファイルを収集する。
リクエストIDが返ってきたら、リクエストIDを使って収集状況を確認する。

aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected が発生

aiohttp.ClientSessionを使いまわしたら、ServerDisconnectedErrorが発生した。
aiohttp.TCPConnectorforce_closeパラメータをTrueに設定し、ClientSessionに設定して 使用する。この設定は、HTTPのKeep-Aliveを使用しないように設定するためのものらしい。
aiohttpのマニュアルからでは読み取れなかったし、検証もできてないけど。

    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(force_close=True)
    ) as session:

docs.aiohttp.org

最後に

簡単なユースケースを実際に動かしてみて、Keep-Aliveによる問題の回避方法も見つけることが出来た。
ただ、大量のリクエストに対して、大量のタスクが発生する。そのため、実際の運用では大量のタスクを キューなどで実行数に制限をかけるなどの対応が必要になると考える。 そこに関してもFastAPIのマニュアルにも言及されているので、次回で確認したい。

fastapi.tiangolo.com

参考

async def execute_long_time(mode: str, request_id: str):
    logger.info("Start execute_long_time method")
    logger.info("execute_log_time param mode=" + mode + ", request_id=" + request_id)
    collecting_status.append({"request_id": request_id, "status": "collecting"})
    await asyncio.sleep(60)
    # time.sleep(60)

    if {"request_id": request_id, "status": "collecting"} in collecting_status:
        collecting_status.remove({"request_id": request_id, "status": "collecting"})

    logger.info("End execute_long_time method")


@app.post("/collectlog")
async def collect_log(request: CollectRequest, background_tasks: BackgroundTasks):
    request_id = uuid.uuid4()
    background_tasks.add_task(execute_long_time, request.mode, str(request_id))

    response = {"request_id": request_id}
    return response