SQLAlchemyのEngineオブジェクトをimport時に生成する例がネット上でよく見られる。
自分もimport時に生成していたが、本当はFastAPI起動時にEngineオブジェクトを生成することで、
起動した時点でDBへのアクセスが出来る状態にしておくのがあるべき姿だと考える。
そのため、FastAPIのlifespan
でSQLAlchemyのEngineオブジェクトを生成する。
Engineオブジェクトを生成
Engineオブジェクトをもつソースファイルを用意。 本当はクラスとかにした方がいいのだろうが、今回は一旦これで行く。
from sqlalchemy.ext.asyncio import create_async_engine from db.todo import Base URL = "postgresql+asyncpg://postgres:example@localhost:5432/postgres" engine = None async def connect(): global engine # エンジンオブジェクトの生成 engine = create_async_engine(URL, echo=True) # テーブル作成 async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) def get_engine(): return engine def disconnect(): engine.dispose()
lifespanの実装
FastAPI起動/終了時に実行する処理を実装。
起動時に先ほどのconnect
メソッドを実行して、Engineオブジェクトを生成する。
終了時にdispose
メソッドを実行する。
@asynccontextmanager async def lifespan(app: FastAPI): logger.info("Start up!!!") await database.connect() yield database.disconnect() logger.info("Shutdown!!!") app = FastAPI(lifespan=lifespan)
DBの操作はリポジトリクラスで実行
実際のDB操作はリポジトリクラスで実行する。 create_todo_repositoryをCRUDの各メソッドに設定して、リポジトリクラスのオブジェクトを生成する。
class TodoRepository: def __init__(self, session: AsyncSession): self.session: AsyncSession = session async def insert_data(self, request: TodoRequest): async with self.session() as session: async with session.begin(): insert_data = [] for item in request.root: insert_data.append(Todo(content=item.content)) logger.info(insert_data) session.add_all(insert_data) async def get_data(self): async with self.session() as session: stmt = select(Todo) result = await session.execute(stmt) response = result.scalars().all() for data in result.scalars(): logger.info("id = " + str(data.id) + ", content = " + data.content) response.append(response) return response def create_todo_repository() -> TodoRepository: engine = database.get_engine() async_session = async_sessionmaker(engine, expire_on_commit=True) todorepository = TodoRepository(async_session) return todorepository
CRUD用のメソッドはこんな感じ。Dependsでリポジトリクラスのオブジェクトを生成。
@app.post("/data") async def create_data( request: TodoRequest, repository: TodoRepository = Depends(create_todo_repository) ): await repository.insert_data(request) return {"result": True}
実行してみる
POSTリクエストでDBに書き込んで、GETメソッドでデータを取得する。
INSERTは成功。
2024-06-01 07:46:23,200 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2024-06-01 07:46:23,200 INFO sqlalchemy.engine.Engine INSERT INTO todo (content) SELECT p0::VARCHAR FROM (VALUES ($1::VARCHAR, 0), ($2::VARCHAR, 1)) AS imp_sen(p0, sen_counter) ORDER BY sen_counter RETURNING todo.id, todo.id AS id__1 2024-06-01 07:46:23,200 INFO sqlalchemy.engine.Engine [cached since 45.06s ago (insertmanyvalues) 1/1 (ordered)] ('test1', 'test2') 2024-06-01 07:46:23,212 INFO sqlalchemy.engine.Engine COMMIT INFO: 127.0.0.1:55783 - "POST /data HTTP/1.1" 200 OK
GETで書き込んだデータを取得できた。
[ { "id": 3, "content": "test1" }, { "id": 4, "content": "test2" } ]
ModuleNotFoundError: No module named 'database' でつまづいた。
uvicornを起動した地点を基準にインポートが必要なので、注意が必要。 同じディレクトリにあるファイルも同様。
File "/xxxxxx/src/fastapi/on_event_dor_db.py", line 6, in <module> from db.repository import TodoRepository, create_todo_repository File "/xxxxxx/src/fastapi/db/repository.py", line 4, in <module> import database ModuleNotFoundError: No module named 'database'
最後に
FastAPIの起動時にEngineオブジェクトを生成するやり方をやってみた。
import文で生成でも良いのかもしれんが、個人的にはFastAPI起動時にDBアクセスの準備は出来た状態で
あるべきだと考えるので、これからはこっちで実装していきたい。