ヒトリ歩き

愚痴とかいろいろ書きます

FastAPI起動時にSQLAlchemyのEngineオブジェクトを生成

SQLAlchemyのEngineオブジェクトをimport時に生成する例がネット上でよく見られる。
自分もimport時に生成していたが、本当はFastAPI起動時にEngineオブジェクトを生成することで、 起動した時点でDBへのアクセスが出来る状態にしておくのがあるべき姿だと考える。
そのため、FastAPIのlifespanでSQLAlchemyのEngineオブジェクトを生成する。

Engineオブジェクトを生成

Engineオブジェクトをもつソースファイルを用意。 本当はクラスとかにした方がいいのだろうが、今回は一旦これで行く。

from sqlalchemy.ext.asyncio import create_async_engine
from db.todo import Base

URL = "postgresql+asyncpg://postgres:example@localhost:5432/postgres"
engine = None

async def connect():
    global engine

    # エンジンオブジェクトの生成
    engine = create_async_engine(URL, echo=True)

    # テーブル作成
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

def get_engine():
    return engine

def disconnect():
    engine.dispose()

lifespanの実装

FastAPI起動/終了時に実行する処理を実装。
起動時に先ほどのconnectメソッドを実行して、Engineオブジェクトを生成する。
終了時にdisposeメソッドを実行する。

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Start up!!!")
    await database.connect()

    yield
    database.disconnect()
    logger.info("Shutdown!!!")


app = FastAPI(lifespan=lifespan)

DBの操作はリポジトリクラスで実行

実際のDB操作はリポジトリクラスで実行する。 create_todo_repositoryをCRUDの各メソッドに設定して、リポジトリクラスのオブジェクトを生成する。

class TodoRepository:

    def __init__(self, session: AsyncSession):
        self.session: AsyncSession = session

    async def insert_data(self, request: TodoRequest):
        async with self.session() as session:
            async with session.begin():
                insert_data = []
                for item in request.root:
                    insert_data.append(Todo(content=item.content))

                logger.info(insert_data)
                session.add_all(insert_data)

    async def get_data(self):
        async with self.session() as session:
            stmt = select(Todo)
            result = await session.execute(stmt)
            response = result.scalars().all()
            for data in result.scalars():
                logger.info("id = " + str(data.id) + ", content = " + data.content)
                response.append(response)

        return response


def create_todo_repository() -> TodoRepository:
    engine = database.get_engine()
    async_session = async_sessionmaker(engine, expire_on_commit=True)
    todorepository = TodoRepository(async_session)

    return todorepository

CRUD用のメソッドはこんな感じ。Dependsでリポジトリクラスのオブジェクトを生成。

@app.post("/data")
async def create_data(
    request: TodoRequest, repository: TodoRepository = Depends(create_todo_repository)
):
    await repository.insert_data(request)
    return {"result": True}

実行してみる

POSTリクエストでDBに書き込んで、GETメソッドでデータを取得する。

INSERTは成功。

2024-06-01 07:46:23,200 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-06-01 07:46:23,200 INFO sqlalchemy.engine.Engine INSERT INTO todo (content) SELECT p0::VARCHAR FROM (VALUES ($1::VARCHAR, 0), ($2::VARCHAR, 1)) AS imp_sen(p0, sen_counter) ORDER BY sen_counter RETURNING todo.id, todo.id AS id__1
2024-06-01 07:46:23,200 INFO sqlalchemy.engine.Engine [cached since 45.06s ago (insertmanyvalues) 1/1 (ordered)] ('test1', 'test2')
2024-06-01 07:46:23,212 INFO sqlalchemy.engine.Engine COMMIT
INFO:     127.0.0.1:55783 - "POST /data HTTP/1.1" 200 OK

GETで書き込んだデータを取得できた。

[
  {
    "id": 3,
    "content": "test1"
  },
  {
    "id": 4,
    "content": "test2"
  }
]

ModuleNotFoundError: No module named 'database' でつまづいた。

uvicornを起動した地点を基準にインポートが必要なので、注意が必要。 同じディレクトリにあるファイルも同様。

  File "/xxxxxx/src/fastapi/on_event_dor_db.py", line 6, in <module>
    from db.repository import TodoRepository, create_todo_repository
  File "/xxxxxx/src/fastapi/db/repository.py", line 4, in <module>
    import database
ModuleNotFoundError: No module named 'database'

最後に

FastAPIの起動時にEngineオブジェクトを生成するやり方をやってみた。
import文で生成でも良いのかもしれんが、個人的にはFastAPI起動時にDBアクセスの準備は出来た状態で あるべきだと考えるので、これからはこっちで実装していきたい。

参考

bitestreams.com

blog.teclado.com