ヒトリ歩き

愚痴とかいろいろ書きます

リトライ処理をbackoffモジュールで簡単に実装する

リトライ処理を自分で作り込まずに、backoffモジュールを活用することでよりコードがシンプルになる。
これからはbackoffモジュールを活用していきたい。

backoffモジュールとは

backoffモジュールは条件が満たされるまでリトライするためのデコレータ。
ネットワークリソースやAPI、信頼性が低く、障害が発生する可能性のあるリソースにアクセスする際、使用することを目的としている。

使い方

backoff.on_exceptionデコレータにリトライする対象のExceptionクラスを定義。
第一パラメータのbackoff.expoはひとまずおまじないと思っていればいい。
下記の例だと、Exceptionが発生し続ければ、ずっとリトライする。

import requests
import backoff

@backoff.on_exception(backoff.expo,
                      Exception)
def handler():

    print("Start handler")
    try:
        response = requests.get("http://localhost:8081")
    except Exception as e:
        print("occur Exception")

        raise e
    
    print("End handler")

if __name__ == '__main__':
    handler()

リトライの設定

max_triesパラメータは、実行回数を設定する。
例えば、実行失敗時に、2回までリトライをさせる場合、最初の実行も回数に含めて3を設定する。

import requests
import backoff
import time

@backoff.on_exception(backoff.expo,
                      requests.exceptions.RequestException,
                      max_tries=5,
                      jitter=None)
def handler():

    print("Start handler")
    try:
        start = time.time()
        response = requests.get("http://localhost:8000", timeout=(5.0, 5.0))
    except Exception as e:
        end = time.time()
        print("occur Exception")
        print("Execute time = " + str(end - start))
        print(e)

        raise e
    
    end = time.time()
    print("End handler time = " + str(end - start))

if __name__ == '__main__':
    handler()

再実行時のリトライ間隔の設定

再実行時のリトライ間隔は、設定するWatiGeneratorで決まる。
backoff.expoを指定した場合、指数関数で増加する。デフォルトの場合、2のX乗。
backoff.fiboを指定した場合、フィボナッチ数列で増加する。
backoff.constantを指定した場合、リトライ間隔は一定となる。デフォルトは1秒。

backoff.constantのリトライの間隔を変更したい場合、intervalパラメータを指定する。
リトライの間隔を5秒に変更したい場合は、interval=5を指定する。

@backoff.on_exception(backoff.constant,
                      Exception,
                      jitter=None,
                      interval=5

リトライ間隔を1秒、10秒、20秒のようにカスタマイズしたい場合、intervalパラメータにリストで間隔を設定する。

@backoff.on_exception(backoff.constant,
                      Exception,
                      jitter=None,
                      interval=[1, 10, 20]

リトライ時のばらつきを含める

backoffモジュールにはリトライ時のタイミングにばらつきを持たせるために、jitterパラメータが用意されている。
複数のユーザが操作によるリトライが発生した場合、リトライによる際実行が同じになり、多くのリクエストが発生する。
そのため、jitterによる時間のずれを含めることで、リトライのタイミングをずらして再実行によるリクエストの増加を防ぐ。
実行のタイミングのずれを含めたくない場合、jitter=Noneの指定が必要になる。

beyondjapan.com

https://aws.typepad.com/sajp/2015/03/backoff.html

aws.amazon.com

最後に

backoffモジュールを使うことで簡単にリトライをするための実装が可能となる。
ほかにもon_predicateを使用して、ポーリングも続けることができる。
また、複数のデコレータを指定することもの可能だ。
backoffモジュールを使い倒すことで、リトライ系のことは何でもできそうな気がする。
これからは自分でリトライ処理は作り込まず、backoffモジュールを活用していきたい。

デコレータでタイムアウトを設定しよう!!

時間がかかる処理に対して、タイムアウトを設定したい場合に使用するためのデコレータとして、timeout_decorator.timeoutがあるので、使ってみた。 用途として、サーバに対する検索のリクエストを実行した際に、検索に時間がかかる場合にタイムアウトを設定する。

github.com

timeout_decorator.timeoutデコレータのパラメータに関数がコールされてからタイムアウトする 秒数を指定。 下記の例の場合、10秒スリープする処理を入れているが、デコレータに5秒でタイムアウトするように設定している。 5秒が経過すると、timeout_decorator.timeout_decorator.TimeoutErrorが発生する。

import time
import timeout_decorator

# 5秒後にタイムアウトする
@timeout_decorator.timeout(5)
def call_func():
    print("Start call_func")
    time.sleep(10)
    print("End call_func")

if __name__ == '__main__':
    i = 0
    while i < 3:
        call_func()
        i += 1

独自のエラーをライズしたい

timeout_decorator.timeout_decorator.TimeoutErrorではなく、独自で定義した Exceptionクラスをレイズしたい場合、timeout_exceptionパラメータにレイズしたい Exceptionクラスを指定することで可能。

下記の例だと、timeout_exceptionパラメータにNewErrorクラスを指定。 タイムアウトが発生すると、NewErrorクラスがレイズされる。

import time
import timeout_decorator

class NewError(Exception):
    pass

# 5秒後にタイムアウトする
@timeout_decorator.timeout(5, timeout_exception=NewError)
def call_func():
    print("Start call_func")
    time.sleep(10)
    print("End call_func")

if __name__ == '__main__':
    i = 0
    while i < 3:
        call_func()
        i += 1

時間がかかる検索にはタイムアウトを設定しよう

HTTPリクエストで時間がかかる処理をタイムアウトするために、timeout_decorator.timeoutデコレータを 使ってタイムアウトを設定しよう。 ただ、requestsモジュールにはタイムアウトの機能が存在するので、requestsモジュールを使用する場合は、 requestsモジュールのタイムアウト機能を素直に使いましょう。

import timeout_decorator
import requests

@timeout_decorator.timeout(10)
def call_func():

    print("Send Request!!")
    response = requests.get("http://localhost:8000"params=None)
    print("Receive Response!!")

if __name__ == '__main__':
    i = 0
    while i < 3:
        call_func()
        i += 1

requests.readthedocs.io

最後に

モジュールをインストールして、デコレータの設定のみでタイムアウト機能が実装できた。 他にもタイムアウトを実装したモジュールがあるようなので、用途によって使い分けたほうがいいかと思う。

scrapbox.io

FastAPIのエラーハンドラーを使ってコードをシンプルに!!

エラーハンドラーの設定をすることで、try-except毎にレスポンスの組み立てが不要になり、コードがシンプルになると考える。
自分はこのエラーハンドラーの機能があり、助かってる。

エラーハンドラーの設定

特定のエラーに対する応答結果をカスタマイズしたい場合、@app.exception_handlerを設定する。

@app.exception_handler(ValidationError)
async def handle_validation_error(request: Request, exc: ValidationError):
    return JSONResponse(
        status_code=400,
        content={"message": "ValidationErrorが発生したよ"}
    )

exception_handlerデコレータのパラメータにハンドリングするExceptionクラスを指定する。
関数のパラメータは、第一パラメータにRequestクラス、第二パラメータには処理するExceptionクラスを指定。
最後に、返却したい応答結果をJSONResponseクラスを使って、応答結果を設定する。 JSONResponseクラスのstatus_codeステータスコードcontentパラメータに応答内容を設定。

上記の場合、ステータスコードが400、応答内容にJSONのmessageキーにValidationErrorが発生したよの値を設定したデータを返す。

動作確認

応答結果を組み立てる際に、pydanticでValidationErrorが発生するソースコードを用意して実際に動かしてみる。

実際に、動作をさせてみると。
エラーハンドラーの設定がない場合、500エラーが返却される。

エラーハンドラーの設定がある場合、期待する応答が返却される。

最後に

このエラーハンドリングを使うことで、返却したい応答用のExceptionクラスをレイズすれば、応答結果を返すことができる。 そうすることで、try-except毎に応答結果を生成する処理は不要になり、コードがシンプルになると考える。 自分は、try-except毎に応答結果の処理を作りたくないので、エラーハンドリングの機能があって助かっている。

参考

fastapi.tiangolo.com

ソースコード

pydanticのインストールも必要。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, PositiveInt, RootModel, ValidationError
from typing import List
from logging import getLogger, StreamHandler

logger = getLogger(__name__)
logger.addHandler(StreamHandler())
logger.setLevel("INFO")


class Product(BaseModel):
    id: int = Field(...)
    name: str = Field(...)
    cost : PositiveInt = 0

class Products(RootModel):
    root: List[Product]


app = FastAPI()

@app.get("/")
def hello():

    product = Product(id=1, name="チョコ", cost=100)
    products = Products.model_validate(product)

    return products

@app.exception_handler(ValidationError)
async def handle_validation_error(request: Request, exc: ValidationError):
    logger.info("Call Exception Handler!!")
    return JSONResponse(
        status_code=400,
        content={"message": "ValidationErrorが発生したよ"}
    )

Pythonのclick.commandを使えば簡単!コマンドの実装方法

シェルスクリプトで頑張って作っていたパラメータの解析もPythonのclick.commandを使えば簡単に実装できるし、コマンドで必要なことがモジュールとして提供されている。
シェルスクリプトから脱却して、Pythonでコマンドを作ろうぜ。

フォントの色、スタイルを変更できる

style()関数を使用することで、フォントの色やスタイルを変更が可能。

click.echo(click.style('Hello World!', fg='green'))
click.echo(click.style('Some more text', bg='blue', fg='white'))
click.echo(click.style('ATTENTION', blink=True, bold=True)) 

ヘルプテキストの記載が簡単

シェルスクリプトのヘルプメッセージは地味に大変。
click.commandだと関数のdocstringが指定されている場合、自動的にdocstringの内容が使用される。

@click.command()
@click.argument("name")
def hello(name: str):

    """docstringの内容がヘルプに指定されるよ"""

    click.echo("name: " + name )
Usage: clickcmd_sample.py [OPTIONS] NAME

  docstringの内容がヘルプに指定されるよ

Options:
  --help  Show this message and exit.

環境変数

auto_envvar_prefixに環境変数プレフィックスを設定する必要がある。
auto_envvar_prefixに指定したプレフィックスとアンダーバーを除いた文字列が変数名になる。
以下の場合、HELLO_USERNAMEとHELLO_JOBが環境変数となり、環境変数の値が格納される変数名は、usernameとjobになる。

@click.command()
@click.option('--username')
@click.option("--job", envvar="JOB")
def hello(username, job):

    click.echo("username = " + str(username))
    click.echo("job      = " + str(job))

if __name__ == "__main__":
    hello(auto_envvar_prefix="HELLO")
# export HELLO_USERNAME=sato
# export HELLO_JOB=engineer 
# python clickcmd_sample.py 
username = sato
job      = engineer

ユーザー入力のためのプロンプト表示も

コマンドのパラメータ指定ではなく、ユーザーに値を入力させたいケースがある。
そのケースもclick.commandは対応している。
click.optionデコレーターのpromptパラメータにTrueを指定するだけ。

import click

@click.command()
@click.option('--brithplace', prompt=True)
def hello(brithplace):

    click.echo("brithplace : " + str(brithplace))

if __name__ == "__main__":
    hello()

プロンプトのメッセージもカスタマイズをする場合、promptパラメータに表示したいメッセージを指定する。

import click

@click.command()
@click.option('--brithplace', prompt="あなたの出身地は?")
def hello(brithplace):

    click.echo("入力した出身地 : " + str(brithplace))

if __name__ == "__main__":
    hello()
あなたの出身地は?: 神奈川
入力した出身地 : 神奈川

パスワード入力も簡単に実装できる。
hide_inputパラメータをTrueに設定し、入力を非表示にする。
また、confirmation_promptで確認用に再入力をさせる。
試してみて分かったことは、パスワード入力を数回間違えれば、コマンドが終了するのではなく、ずっとパスワード入力が求められるということ。
回数指定出来たらいいのに。

import click

@click.command()
@click.option('--password',
              prompt="パスワードを入力してください",
              hide_input=True,
              confirmation_prompt=True)
def hello(password):

    click.echo("入力したパスワード : " + str(password))

if __name__ == "__main__":
    hello()

テストも簡単

テスト用のモジュールも提供されているので、テストが容易。
以下をテストをする。

import click

@click.command()
@click.argument("brithplace")
@click.argument("schoolname")
def show_your_birthplace(brithplace, schoolname):
    click.echo("brithplace = " + str(brithplace))
    click.echo("schoolname = " + str(schoolname))

if __name__ == '__main__':
    show_your_birthplace()

CliRunnerクラスのオブジェクトを生成し、invoke関数を実行する。
第一パラメータにテスト対象の関数、第二パラメータにパラメータを配列で指定。
invoke関数の戻り値に結果が格納される。
標準出力はresult.outputに格納される。改行含めての文字列になっているので、assertするときは改行で分割したほうが、チェックしやすい。

from click.testing import CliRunner
from click_easy import show_your_birthplace

def test_show_your_birthplace():
    runner = CliRunner()
    result = runner.invoke(show_your_birthplace, ["Kanagawa", "WAHAHA"])
    assert result.exit_code == 0
    assert result.output == "brithplace = Kanagawa\nschoolname = WAHAHA\n"

ユーザーの入力させる処理もinput変数を使用すればテスト可能。
これは嬉しい。

import click

@click.command()
@click.option("--brithplace", prompt="出身は?")
@click.option("--schoolname", prompt="出身校は?")
def show_your_birthplace(brithplace, schoolname):
    click.echo("brithplace = " + str(brithplace))
    click.echo("schoolname = " + str(schoolname))

if __name__ == '__main__':
    show_your_birthplace()

input変数に入力する値を設定。
output変数には入力を求める文字列も含まれるので、合わせて試験が可能。

from click.testing import CliRunner
from click_easy import show_your_birthplace

def test_show_your_birthplace():
    runner = CliRunner()
    result = runner.invoke(show_your_birthplace, input="Kanagawa\nWAHAHA\n")
    assert result.exit_code == 0
    #assert result.output == "出身は?: Kanagawa\n出身校は?: WAHAHA\nbrithplace = Kanagawa\nschoolname = WAHAHA\n"
    outputs = result.output.split('\n')
    assert len(outputs)-1 == 4 # 末尾の改行を除く
    assert outputs[0] == "出身は?: Kanagawa"
    assert outputs[1] == "出身校は?: WAHAHA"
    assert outputs[2] == "brithplace = Kanagawa"
    assert outputs[3] == "schoolname = WAHAHA"
    assert outputs[4] == ""

さいごに

コマンドはほぼシェルスクリプトで作成しているので、単体テストが後回しになったり、全ルートを通すために苦労する。
それにシェルスクリプトのテストツールも少ない。そもそも、シェルスクリプトのテストツールを使っているところ自体が少ない。
それに比べて、Pythonでコマンドに必要なことがモジュールで提供されているので、実装も楽だし、テストも容易。これはPythonでコマンド作ろぜってことだ。
自分のプロジェクトでも機会があれば、Pythonでコマンドを作ろうと思う。

参考

click.palletsprojects.com

kotapontan.hatenablog.com

Pythonのclick.commandを利用して簡単にプログレスバーを作成

click.commandを使えば、コマンドのプログレスバーを簡単に実現できる。
少し感動した自分がいる。

click.commandとは

コマンドラインインタフェースを作成するためのPythonのパッケージ。

基本例

イテレータのデータを処理する場合、click.progressbarに処理対象のイテレータ変数を渡す。
返却されたProgressBarオプジェクトをfor文で処理する。 各要素は、click.progressbarに渡したイレテータの要素になる。

import click
import time

def progress():

    all_item = ["product1", "product2", "product3", "product4"]

    with click.progressbar(all_item) as bar:
        for item in bar: # product1, product2, prodcut3, product4 がくる
             time.sleep(3)

if __name__ == '__main__':
    progress()

実行すると以下の表示される。

[##################------------------]   50%  00:00:06

labelパラメータを設定するとプログレスバーの前にラベルを表示できる。

with click.progressbar(all_item,
                           label="プログレスバーだよ") as bar:
        for item in bar:
             time.sleep(3)

ラベルを付与したプログレスバーは以下のようになる。

プログレスバーだよ  [#########---------------------------]   25% 

進捗状況を意図した値で更新したい

イテレータを処理せずに、指定の値のプログレスを表示したい場合、lengthに サイズを指定する。 プログレスバーの更新は、update関数を実行し、進捗状況を更新する。

    total_size = 1000
    with click.progressbar(length=total_size) as bar:
        
        while True:
             bar.update(1) # 1が加算されていく
             count += 1
             time.sleep(0.5)
             if count == total_size:
                break

プログレスバーのカスタマイズ

進捗状況の表示文字を変更

プログレスバーの進捗状況を表す文字をカスタマイズしたい場合、fill_charパラメータを使用する。
残りの状況の箇所をカスタマイズしたい場合、empty_charパラメータを使用する。
デフォルトだと、fill_charパラメータは#empty_charパラメータは、-になる。
fill_charパラメータを*empty_char(空白)にしてみる。

    total_size = 20
    with click.progressbar(fill_char="*",
                           empty_char=" ",
                           length=total_size) as bar:
        
        count = 0
        while True:
             bar.update(1)
             count += 1
             time.sleep(0.5)
             if count == total_size:
                 break

進捗状況は*で表示され、残りの状況は空白で置き換わっている。

[**********************              ]   62%  00:00:12

プログレスバーの表示フォーマットを変更

プログレスバーの表示フォーマットのカスタマイズもできる。
デフォルトだと、%(label)s [%(bar)s] %(info)s が設定されているので、barの枠を[]から|に変更してみる。

    total_size = 20
    with click.progressbar(fill_char="*",
                           empty_char=" ",
                           bar_template='%(label)s  |%(bar)s|  %(info)s',
                           length=total_size) as bar:
        
        count = 0
        while True:
             bar.update(1)
             count += 1
             time.sleep(0.5)
             if count == total_size:
                 break

barの枠が|になっていることがわかる。

  |************************************|  100%   

そのほかにも、推定時間の有効/無効の設定、パーセント表示の有効/無効の設定もできる。

click.palletsprojects.com

最後に

シェルスクリプトプログレスバーを作ろうとすると手間がかかるし、処理も面倒なので作ろうと思わないが、このpythonのライブラリを使用すれば、簡単にプログレスバーが作れる。
こんなに簡単にプログレスバーが出来るなんて、少し感動した。

SQLAlchemy ORMでのJOINのやり方

SQLAlchemyのORMを使ったSELECT時のJOINのやり方がよくわからなかったので、調べてみた。

やりたいこと

studentテーブルのschool_idとschoolテーブルのidを結合して、 student.id,student,name,school.nameを取得したい。

テーブルのデータは以下のとおり。

studentテーブル

id name school_id
1 sato 1
2 tanaka 2
3 watanabe 3

schoolテーブル

id name
1 あいうえお学園
2 かきくけこ学園
3 さしすせそ学校

Coreを使った結合

データクラスは以下のように定義。

class Base(DeclarativeBase):
    pass

class Student(Base):
    __tablename__ = "student"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    school_id: Mapped[int]

class School(Base):
    __tablename__ = "school"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))

JOINしたSQL文を生成するための処理を定義。

stmt = select(Student).join(School, Student.school_id == School.id).where(Student.id == 1)

生成されるSQL文は以下のようになる。
studentテーブルとschoolテーブルのnameカラムが同一名となっているので、AS句によって別名が付けられている。

SELECT student.id, student.name, school.name AS name_1 
FROM student JOIN school ON student.school_id = school.id 
WHERE student.id = %(id_1)s

生成したSQL文を実行する場合、executeを実行する。
データにアクセスするには、result[カラム名]になるため、AS句によって自動で別名をつけられた際に、意図したデータを参照することが出来なくなる。

results = session.execute(stmt).mappings().all()
for result in results:
    print(result)

意図した別名を付けるには、labelを使用する。

stmt = select(Student.id, Student.name, School.name.label("school_name")).join(School, Student.school_id == School.id).where(Student.id == 1)
print(stmt)
SELECT student.id, student.name, school.name AS school_name 
FROM student JOIN school ON student.school_id = school.id 
WHERE student.id = %(id_1)s

ORMを使った結合

ORMを使った結合をするためには、外部キーとリレーションシップの設定が必要になる。
まずは、school_idに外部キーを設定。

school_id: Mapped[int] = mapped_column(ForeignKey("school.id"))

次にschoolとのリレーションシップを設定。

school: Mapped["School"] = relationship()

全体のデータクラスの定義は以下のようになる。

class Student(Base):
    __tablename__ = "student"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    school_id: Mapped[int] = mapped_column(ForeignKey("school.id"))
    school: Mapped["School"] = relationship()

class School(Base):
    __tablename__ = "school"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))

これで実行する。

stmt = select(Student, School).join(School, Student.school_id == School.id).where(Student.id == 1)
results = session.scalars(stmt)

for result in results:
    print("student.id = " + str(result.id))
    print("student.name = " + str(result.name))
    print("student.school_id = " + str(result.school_id))
    print("student.school.id = " + str(result.school.id))
    print("student.school.name = " + str(result.school.name))

studentテーブルのidが1のデータが取得できた。

student.id = 1
student.name = sato
student.school_id = 1
student.school.id = 1
student.school.name = あいうえお学園

最後に

本当はstudent.nameと同じ階層でschool.nameの値も取得したかったが、少し期待はずれ。
自分のやり方が間違っているのかもしれないので、時間を見つけて調べてみようと思う。

参考

docs.sqlalchemy.org

docs.sqlalchemy.org

データベース連携に便利なPython ORMツール - SQLAlchemy

SQLAlchemyを使うことが増えてきたので・・・。

SQLAlchemy とは?

SQLAlchemyは、Python向けのORM(オブジェクト リレーショナル マッパー)。 効率的かつ高パフォーマンスデータベースアクセスを目的に設計されているとのこと。

www.sqlalchemy.org

データベースと連携する

@arkuchyさんの記事を参考にさせていただいています。 詳しい解説なので、こちらをみてもらえると分かりやすいです。

qiita.com

sqlalchemyのクイックスタートをベースにお試し。

docs.sqlalchemy.org

テーブル定義が簡単に書ける

SQL文を頑張って書かなくてもテーブルが作成できる。
モデルベースクラスを作成するために、Base = declarative_base()を宣言していたが、2.0からは不要。 その代わりに、DeclarativeBaseクラスを継承したBaseクラスを宣言する。 また、カラムの定義はmapped_columnを使用するようになっている。 Columnも使用可能。

# モデルベースクラスを作成
#Base = declarative_base() と同等(2.0から)
class Base(DeclarativeBase):
    pass

# モデルベースを拡張して、ORMのデータクラスを作成
# mapped_columnは2.0からの新機能
class Product(Base):
    __tablename__ = "product"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    cost: Mapped[int] = mapped_column(Integer, nullable=False)
   
   

上記のモデルクラスは以下のテーブルと同じ。

CREATE TABLE product (
        id SERIAL NOT NULL, 
        name VARCHAR(255) NOT NULL, 
        cost INTEGER NOT NULL, 
        PRIMARY KEY (id)
)

検索、登録、更新、削除はORMを使うと楽だね

エンジン作成時にecho=Trueを設定すると、発行したSQL文を標準出力する。

# DBエンジンを作成
url = "postgresql://postgres:example@localhost:5432/postgres"
engine = create_engine(url, echo=True)

# テーブルをDBに作成
Base.metadata.create_all(engine)

# セッションを作成
# セッションを介してクエリを実行する
SessionClass = sessionmaker(engine)
session = SessionClass()

データの登録/更新/削除もORMを使うと簡単に出来る。

データを登録する場合、データクラスに値を設定して、登録用の関数を実行する。 データを更新する場合、対象のレコードのデータを取得して、データクラスを書き換える。 データを削除する場合、更新時と同様にデータを取得して、データクラスを削除用関数にパラメータとして渡す。

# Insert
product1 = Product(name="チョコ", cost=100)
product2 = Product(name="ガム", cost=10)
session.add_all([product1, product2])
session.commit()

# 検索
stmt = select(Product).where(Product.name.in_(["チョコ"]))
products = session.scalars(stmt)
for product in products:
    print("Name = " + str(product.name))

# 変更
stmt = select(Product).where(Product.id == 1)
product = session.scalars(stmt).one()
product.name = "パン"
session.commit()

# 削除
stmt = select(Product)
products = session.scalars(stmt)
session.delete(product)

さいごに

ORMを使うとSQLは意識しなくていいので、楽。
joinとかfunctionとかはどうしたらいいのかわからないので、もう少し調べてみる。