ヒトリ歩き

愚痴とかいろいろ書きます

TimescaleDBのセットアップ

時系列DBのことを知りたくて、TimescaleDBのコンテナを使ってサクッとセットアップ。

時系列DBとは

時間情報を持ったデータを格納し、最適化したデータベース。   TimescaleDBはその中の1つ。詳細は参考の資料を参照。

https://www.sraoss.co.jp/wp-content/uploads/files/event_seminar/material/2021/timescaledb-intro-20210624.pdf

セットアップ

インストール

本体をインストールしたくないので、コンテナで起動する。
パスワードを設定したい場合、POSTGRES_PASSWORD環境変数に設定する。ここはPostgreSQLと一緒。

docker pull timescale/timescaledb-ha:pg16
docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=example timescale/timescaledb-ha:pg16
docker exec -it timescaledb psql -U postgres

docs.timescale.com

ハイパーテーブルの作成

TimescaleDBの紹介資料を参照して、ハイパーテーブルを作成する。 通常のテーブルを作成して、create_hypertable関数を実行。
サーバのCPU使用率、メモリ使用率の情報をもつsserverspecテーブルを作成。

  • テーブルの作成
CREATE TABLE serverspec (
    time timestamp NOT NULL,
    servername text NOT NULL,
    cpu_userd double precision,
    memory_used double precision
);
  • create_hypertable関数の実行

create_hypertable関数には、ハイパーテーブルに変換するテーブル名、分割の条件にする時間の列名を指定する。
戻り値は、ハイパーテーブルのID、スキーマ名、テーブル名。

SELECT create_hypertable('serverspec', 'time');
WARNING:  column type "timestamp without time zone" used for "time" does not follow best practices
HINT:  Use datatype TIMESTAMPTZ instead.
    create_hypertable    
-------------------------
 (1,public,serverspec,t)

データの操作

実際にデータの挿入、検索、更新、削除をする。

  • 挿入
INSERT INTO serverspec VALUES ('2024-02-18 21:00:00', 'server1', 19.4, 10.4);
INSERT 0 1
  • 検索
SELECT * FROM serverspec;
        time         | servername | cpu_userd | memory_used 
---------------------+------------+-----------+-------------
 2024-02-18 21:00:00 | server1    |      19.4 |        10.4
(1 row)
  • 更新
UPDATE serverspec SET cpu_userd = cpu_userd + 0.1 WHERE time = '2024-02-18 21:00:00' and servername = 'server1
';
UPDATE 1
select * from serverspec;
        time         | servername | cpu_userd | memory_used 
---------------------+------------+-----------+-------------
 2024-02-18 21:00:00 | server1    |      19.5 |        10.4
(1 row)
  • 削除
DELETE FROM serverspec WHERE time = '2024-02-18 21:00:00' and servername = 'server1';

最後に

TimescaleDBのインストールとデータの投入、更新、削除までやってみたが、PostgreSQLなので簡単だった。
ここから、チャンクとか時系列DBとして大事な概念が出てくるので、引き続き確認していく。

SQLAlchemyのORMを活用したデータの一括操作

大量データを1件ずつ登録、更新、削除をした場合、データベースとのやりとりが必要になる。 そのため、大量データをまとめて投入したい。 その方法として、bulk insertなどがある。 SQLAlchemyのORMを使ってbulk操作を実現するのか確認する。

wa3.i-3-i.info

Bulk Insert

insert関数にORMクラスを指定して実行する。 ここでは、insert後に確定するidがあるので、returning関数を指定して、insert後のデータを取得する。 returning関数で結果を取得するには、scalars関数を使う必要がある。 returning不要であれば、execute関数でもOK。 ただし、returningから返却されるデータの順序は保証されていない。 順序を保証するために、sort_by_parameter_orderパラメータを指定する必要がある。

students = session.scalars(
    insert(Student).returning(Student),
    [
        {"name": "sato", "address": "Tokyo", "email": "sato@ddd.com"},
        {"name": "tanaka", "address": "Kanagawa", "email": "tanaka@ddd.com"},
        {"name": "wata", "address": "Chiba", "email": "wata@ddd.com"}
    ]
)
session.commit()

Bulk Update

主キーをWHERE条件に設定して、UPDATEを実行する。 下記の場合は、idが主キー。

session.execute(
    update(Student),
    [
        {"id": 1, "name": "sato2"},
        {"id": 2, "name": "tanaka2"},
        {"id": 3, "name": "wata2"}
    ]
)

SQLAlchemyで生成するSQL文。

UPDATE student SET name=%(name)s WHERE student.id = %(student_id)s

主キーを使用せずに、別の条件でUPDATEをしたい場合はwhere関数を使用する。 ここでは、Student.nameと辞書のキーであるu_nameをバインドするために、bindparam関数を使ってバインドする。 また、実行時にはsession.executeではなくsession.connection().executeを使用する必要がある。

session.connection().execute(
    update(Student).where(Student.name == bindparam("u_name")),
    [
        {"u_name": "sato", "name": "sato3"},
        {"u_name": "wata", "name": "wata3"}
    ]
)

docs.sqlalchemy.org

Bulk Delete

Bulk ORM DELETEはサポートされていないため、session.connection().executeでcoreモジュールを使って、実行する。

session.connection().execute(
    delete(Student).where(Student.name == bindparam("d_name")), 
    [
        {"d_name": "wata3"},
        {"d_name": "tanaka"}
    ]
)
session.commit()

session.executeでもできる。

session.execute(
    delete(Student).where(Student.name.in_(["wata3", "tanaka"]))
)
session.commit()

最後に

ORMを使ったBulk操作のやり方を理解できた。 より高度な使い方をする場合は、もう少しマニュアルを見る必要があると思う。 開発でも活かしていきたい。

参考

docs.sqlalchemy.org

2024年2月の振り返り

もう2月も終わりということで、2月の振り返り。

やったこと

朝活または夜活を20分やる

なんとか2月も継続してやることができた。順調に草も生えてきている。 ただ、2日ほど草が生えてないけど、気にしない。

投稿をいつもより頑張ってみた

1月が8記事に対して、2月が10記事の投稿。
気持ちは1月よりもかなり多く投稿した気持ちでいたが、意外と2つしか違いがなかった。
2月は祝日がない週は週3で投稿をすることを意識したら、PVが少し伸びてきている。 ストックはあるにしても3月も週3回は投稿できるようにチャレンジする。

1年のやることリストを少しだけ

結局、1年のやることリストが完成してない。 完成する気配がない。どうしましょう。

わかったこと

ちょっと無理してるかも

週3回投稿しているけど、ちょっと無理している気もする。 モチベーションが落ちないように少しペースは調整しようと思う。
仕事も無理してやろうとするので、少し早めに終わるとか調整する。

次にやること

ちょっとそろそろOSSに貢献したい

そろそろPython系のOSSに貢献したいので、どれかいい感じなものを探してみる。 以前、PowerMockにプルリクしてマージしてもらったけど、もう数年前の話。 ちょっと、探してみよう。

内部ツールを共有

個人的に作成したツールを社内に共有してみる。
あまり使う頻度は高くないけど、あれば便利なはずなのでグループに共有してみようと思う。

プライベートで

2月もちょくちょく奥さんと口喧嘩になったので、3月は仲良くしたい。

最後に

2月はそれなりに頑張れたと思う。
2月の後半からプロジェクトが忙しくなり、人も減りで自分への負荷が高くなりそう。
ちょっと、2月のペースはきついけど、無理せずに楽しめたらと思う。

リトライ処理をbackoffモジュールで簡単に実装する

リトライ処理を自分で作り込まずに、backoffモジュールを活用することでよりコードがシンプルになる。
これからはbackoffモジュールを活用していきたい。

backoffモジュールとは

backoffモジュールは条件が満たされるまでリトライするためのデコレータ。
ネットワークリソースやAPI、信頼性が低く、障害が発生する可能性のあるリソースにアクセスする際、使用することを目的としている。

使い方

backoff.on_exceptionデコレータにリトライする対象のExceptionクラスを定義。
第一パラメータのbackoff.expoはひとまずおまじないと思っていればいい。
下記の例だと、Exceptionが発生し続ければ、ずっとリトライする。

import requests
import backoff

@backoff.on_exception(backoff.expo,
                      Exception)
def handler():

    print("Start handler")
    try:
        response = requests.get("http://localhost:8081")
    except Exception as e:
        print("occur Exception")

        raise e
    
    print("End handler")

if __name__ == '__main__':
    handler()

リトライの設定

max_triesパラメータは、実行回数を設定する。
例えば、実行失敗時に、2回までリトライをさせる場合、最初の実行も回数に含めて3を設定する。

import requests
import backoff
import time

@backoff.on_exception(backoff.expo,
                      requests.exceptions.RequestException,
                      max_tries=5,
                      jitter=None)
def handler():

    print("Start handler")
    try:
        start = time.time()
        response = requests.get("http://localhost:8000", timeout=(5.0, 5.0))
    except Exception as e:
        end = time.time()
        print("occur Exception")
        print("Execute time = " + str(end - start))
        print(e)

        raise e
    
    end = time.time()
    print("End handler time = " + str(end - start))

if __name__ == '__main__':
    handler()

再実行時のリトライ間隔の設定

再実行時のリトライ間隔は、設定するWatiGeneratorで決まる。
backoff.expoを指定した場合、指数関数で増加する。デフォルトの場合、2のX乗。
backoff.fiboを指定した場合、フィボナッチ数列で増加する。
backoff.constantを指定した場合、リトライ間隔は一定となる。デフォルトは1秒。

backoff.constantのリトライの間隔を変更したい場合、intervalパラメータを指定する。
リトライの間隔を5秒に変更したい場合は、interval=5を指定する。

@backoff.on_exception(backoff.constant,
                      Exception,
                      jitter=None,
                      interval=5

リトライ間隔を1秒、10秒、20秒のようにカスタマイズしたい場合、intervalパラメータにリストで間隔を設定する。

@backoff.on_exception(backoff.constant,
                      Exception,
                      jitter=None,
                      interval=[1, 10, 20]

リトライ時のばらつきを含める

backoffモジュールにはリトライ時のタイミングにばらつきを持たせるために、jitterパラメータが用意されている。
複数のユーザが操作によるリトライが発生した場合、リトライによる際実行が同じになり、多くのリクエストが発生する。
そのため、jitterによる時間のずれを含めることで、リトライのタイミングをずらして再実行によるリクエストの増加を防ぐ。
実行のタイミングのずれを含めたくない場合、jitter=Noneの指定が必要になる。

beyondjapan.com

https://aws.typepad.com/sajp/2015/03/backoff.html

aws.amazon.com

最後に

backoffモジュールを使うことで簡単にリトライをするための実装が可能となる。
ほかにもon_predicateを使用して、ポーリングも続けることができる。
また、複数のデコレータを指定することもの可能だ。
backoffモジュールを使い倒すことで、リトライ系のことは何でもできそうな気がする。
これからは自分でリトライ処理は作り込まず、backoffモジュールを活用していきたい。

デコレータでタイムアウトを設定しよう!!

時間がかかる処理に対して、タイムアウトを設定したい場合に使用するためのデコレータとして、timeout_decorator.timeoutがあるので、使ってみた。 用途として、サーバに対する検索のリクエストを実行した際に、検索に時間がかかる場合にタイムアウトを設定する。

github.com

timeout_decorator.timeoutデコレータのパラメータに関数がコールされてからタイムアウトする 秒数を指定。 下記の例の場合、10秒スリープする処理を入れているが、デコレータに5秒でタイムアウトするように設定している。 5秒が経過すると、timeout_decorator.timeout_decorator.TimeoutErrorが発生する。

import time
import timeout_decorator

# 5秒後にタイムアウトする
@timeout_decorator.timeout(5)
def call_func():
    print("Start call_func")
    time.sleep(10)
    print("End call_func")

if __name__ == '__main__':
    i = 0
    while i < 3:
        call_func()
        i += 1

独自のエラーをライズしたい

timeout_decorator.timeout_decorator.TimeoutErrorではなく、独自で定義した Exceptionクラスをレイズしたい場合、timeout_exceptionパラメータにレイズしたい Exceptionクラスを指定することで可能。

下記の例だと、timeout_exceptionパラメータにNewErrorクラスを指定。 タイムアウトが発生すると、NewErrorクラスがレイズされる。

import time
import timeout_decorator

class NewError(Exception):
    pass

# 5秒後にタイムアウトする
@timeout_decorator.timeout(5, timeout_exception=NewError)
def call_func():
    print("Start call_func")
    time.sleep(10)
    print("End call_func")

if __name__ == '__main__':
    i = 0
    while i < 3:
        call_func()
        i += 1

時間がかかる検索にはタイムアウトを設定しよう

HTTPリクエストで時間がかかる処理をタイムアウトするために、timeout_decorator.timeoutデコレータを 使ってタイムアウトを設定しよう。 ただ、requestsモジュールにはタイムアウトの機能が存在するので、requestsモジュールを使用する場合は、 requestsモジュールのタイムアウト機能を素直に使いましょう。

import timeout_decorator
import requests

@timeout_decorator.timeout(10)
def call_func():

    print("Send Request!!")
    response = requests.get("http://localhost:8000"params=None)
    print("Receive Response!!")

if __name__ == '__main__':
    i = 0
    while i < 3:
        call_func()
        i += 1

requests.readthedocs.io

最後に

モジュールをインストールして、デコレータの設定のみでタイムアウト機能が実装できた。 他にもタイムアウトを実装したモジュールがあるようなので、用途によって使い分けたほうがいいかと思う。

scrapbox.io

FastAPIのエラーハンドラーを使ってコードをシンプルに!!

エラーハンドラーの設定をすることで、try-except毎にレスポンスの組み立てが不要になり、コードがシンプルになると考える。
自分はこのエラーハンドラーの機能があり、助かってる。

エラーハンドラーの設定

特定のエラーに対する応答結果をカスタマイズしたい場合、@app.exception_handlerを設定する。

@app.exception_handler(ValidationError)
async def handle_validation_error(request: Request, exc: ValidationError):
    return JSONResponse(
        status_code=400,
        content={"message": "ValidationErrorが発生したよ"}
    )

exception_handlerデコレータのパラメータにハンドリングするExceptionクラスを指定する。
関数のパラメータは、第一パラメータにRequestクラス、第二パラメータには処理するExceptionクラスを指定。
最後に、返却したい応答結果をJSONResponseクラスを使って、応答結果を設定する。 JSONResponseクラスのstatus_codeステータスコードcontentパラメータに応答内容を設定。

上記の場合、ステータスコードが400、応答内容にJSONのmessageキーにValidationErrorが発生したよの値を設定したデータを返す。

動作確認

応答結果を組み立てる際に、pydanticでValidationErrorが発生するソースコードを用意して実際に動かしてみる。

実際に、動作をさせてみると。
エラーハンドラーの設定がない場合、500エラーが返却される。

エラーハンドラーの設定がある場合、期待する応答が返却される。

最後に

このエラーハンドリングを使うことで、返却したい応答用のExceptionクラスをレイズすれば、応答結果を返すことができる。 そうすることで、try-except毎に応答結果を生成する処理は不要になり、コードがシンプルになると考える。 自分は、try-except毎に応答結果の処理を作りたくないので、エラーハンドリングの機能があって助かっている。

参考

fastapi.tiangolo.com

ソースコード

pydanticのインストールも必要。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, PositiveInt, RootModel, ValidationError
from typing import List
from logging import getLogger, StreamHandler

logger = getLogger(__name__)
logger.addHandler(StreamHandler())
logger.setLevel("INFO")


class Product(BaseModel):
    id: int = Field(...)
    name: str = Field(...)
    cost : PositiveInt = 0

class Products(RootModel):
    root: List[Product]


app = FastAPI()

@app.get("/")
def hello():

    product = Product(id=1, name="チョコ", cost=100)
    products = Products.model_validate(product)

    return products

@app.exception_handler(ValidationError)
async def handle_validation_error(request: Request, exc: ValidationError):
    logger.info("Call Exception Handler!!")
    return JSONResponse(
        status_code=400,
        content={"message": "ValidationErrorが発生したよ"}
    )

Pythonのclick.commandを使えば簡単!コマンドの実装方法

シェルスクリプトで頑張って作っていたパラメータの解析もPythonのclick.commandを使えば簡単に実装できるし、コマンドで必要なことがモジュールとして提供されている。
シェルスクリプトから脱却して、Pythonでコマンドを作ろうぜ。

フォントの色、スタイルを変更できる

style()関数を使用することで、フォントの色やスタイルを変更が可能。

click.echo(click.style('Hello World!', fg='green'))
click.echo(click.style('Some more text', bg='blue', fg='white'))
click.echo(click.style('ATTENTION', blink=True, bold=True)) 

ヘルプテキストの記載が簡単

シェルスクリプトのヘルプメッセージは地味に大変。
click.commandだと関数のdocstringが指定されている場合、自動的にdocstringの内容が使用される。

@click.command()
@click.argument("name")
def hello(name: str):

    """docstringの内容がヘルプに指定されるよ"""

    click.echo("name: " + name )
Usage: clickcmd_sample.py [OPTIONS] NAME

  docstringの内容がヘルプに指定されるよ

Options:
  --help  Show this message and exit.

環境変数

auto_envvar_prefixに環境変数プレフィックスを設定する必要がある。
auto_envvar_prefixに指定したプレフィックスとアンダーバーを除いた文字列が変数名になる。
以下の場合、HELLO_USERNAMEとHELLO_JOBが環境変数となり、環境変数の値が格納される変数名は、usernameとjobになる。

@click.command()
@click.option('--username')
@click.option("--job", envvar="JOB")
def hello(username, job):

    click.echo("username = " + str(username))
    click.echo("job      = " + str(job))

if __name__ == "__main__":
    hello(auto_envvar_prefix="HELLO")
# export HELLO_USERNAME=sato
# export HELLO_JOB=engineer 
# python clickcmd_sample.py 
username = sato
job      = engineer

ユーザー入力のためのプロンプト表示も

コマンドのパラメータ指定ではなく、ユーザーに値を入力させたいケースがある。
そのケースもclick.commandは対応している。
click.optionデコレーターのpromptパラメータにTrueを指定するだけ。

import click

@click.command()
@click.option('--brithplace', prompt=True)
def hello(brithplace):

    click.echo("brithplace : " + str(brithplace))

if __name__ == "__main__":
    hello()

プロンプトのメッセージもカスタマイズをする場合、promptパラメータに表示したいメッセージを指定する。

import click

@click.command()
@click.option('--brithplace', prompt="あなたの出身地は?")
def hello(brithplace):

    click.echo("入力した出身地 : " + str(brithplace))

if __name__ == "__main__":
    hello()
あなたの出身地は?: 神奈川
入力した出身地 : 神奈川

パスワード入力も簡単に実装できる。
hide_inputパラメータをTrueに設定し、入力を非表示にする。
また、confirmation_promptで確認用に再入力をさせる。
試してみて分かったことは、パスワード入力を数回間違えれば、コマンドが終了するのではなく、ずっとパスワード入力が求められるということ。
回数指定出来たらいいのに。

import click

@click.command()
@click.option('--password',
              prompt="パスワードを入力してください",
              hide_input=True,
              confirmation_prompt=True)
def hello(password):

    click.echo("入力したパスワード : " + str(password))

if __name__ == "__main__":
    hello()

テストも簡単

テスト用のモジュールも提供されているので、テストが容易。
以下をテストをする。

import click

@click.command()
@click.argument("brithplace")
@click.argument("schoolname")
def show_your_birthplace(brithplace, schoolname):
    click.echo("brithplace = " + str(brithplace))
    click.echo("schoolname = " + str(schoolname))

if __name__ == '__main__':
    show_your_birthplace()

CliRunnerクラスのオブジェクトを生成し、invoke関数を実行する。
第一パラメータにテスト対象の関数、第二パラメータにパラメータを配列で指定。
invoke関数の戻り値に結果が格納される。
標準出力はresult.outputに格納される。改行含めての文字列になっているので、assertするときは改行で分割したほうが、チェックしやすい。

from click.testing import CliRunner
from click_easy import show_your_birthplace

def test_show_your_birthplace():
    runner = CliRunner()
    result = runner.invoke(show_your_birthplace, ["Kanagawa", "WAHAHA"])
    assert result.exit_code == 0
    assert result.output == "brithplace = Kanagawa\nschoolname = WAHAHA\n"

ユーザーの入力させる処理もinput変数を使用すればテスト可能。
これは嬉しい。

import click

@click.command()
@click.option("--brithplace", prompt="出身は?")
@click.option("--schoolname", prompt="出身校は?")
def show_your_birthplace(brithplace, schoolname):
    click.echo("brithplace = " + str(brithplace))
    click.echo("schoolname = " + str(schoolname))

if __name__ == '__main__':
    show_your_birthplace()

input変数に入力する値を設定。
output変数には入力を求める文字列も含まれるので、合わせて試験が可能。

from click.testing import CliRunner
from click_easy import show_your_birthplace

def test_show_your_birthplace():
    runner = CliRunner()
    result = runner.invoke(show_your_birthplace, input="Kanagawa\nWAHAHA\n")
    assert result.exit_code == 0
    #assert result.output == "出身は?: Kanagawa\n出身校は?: WAHAHA\nbrithplace = Kanagawa\nschoolname = WAHAHA\n"
    outputs = result.output.split('\n')
    assert len(outputs)-1 == 4 # 末尾の改行を除く
    assert outputs[0] == "出身は?: Kanagawa"
    assert outputs[1] == "出身校は?: WAHAHA"
    assert outputs[2] == "brithplace = Kanagawa"
    assert outputs[3] == "schoolname = WAHAHA"
    assert outputs[4] == ""

さいごに

コマンドはほぼシェルスクリプトで作成しているので、単体テストが後回しになったり、全ルートを通すために苦労する。
それにシェルスクリプトのテストツールも少ない。そもそも、シェルスクリプトのテストツールを使っているところ自体が少ない。
それに比べて、Pythonでコマンドに必要なことがモジュールで提供されているので、実装も楽だし、テストも容易。これはPythonでコマンド作ろぜってことだ。
自分のプロジェクトでも機会があれば、Pythonでコマンドを作ろうと思う。

参考

click.palletsprojects.com

kotapontan.hatenablog.com