ヒトリ歩き

愚痴とかいろいろ書きます

データ分析の第一歩:Pandasを使ったデータ型と操作

streamlitでも出てきたpandasを使ったことがないので、基本的な部分をやってみる。
これを使いこなして、品質データもサクッと集計したり消化スケジュールもサクッとグラフ化して雑務の工数を減らしたい。

pandasのデータ型

Pandasのデータ型としてはSeriesとDataFrameという2つのものがある。

Series

1次元データ構造と言われる1列のみのデータ型。

s = [1, 2, 3, 4, 5]
series = pd.Series(s)
print(series)

左側の番号が行ラベルと言われるindex indexを指定しない場合、0から開始になる。

0    1
1    2
2    3
3    4
4    5
dtype: int64

indexを指定する場合、indexパラメータに値を設定。

s = [1, 2, 3, 4, 5]
index = ["a", "b", "c", "d", "e"]
series = pd.Series(s, index=index)
print(series)

・出力結果
a    1
b    2
c    3
d    4
e    5

DataFrame

2次元データ構造と言われるデータ構造。 2次元配列で表すデータよりは、Excelで作る表をイメージするとわかりやすい。

data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
]
column = ["name", "age", "m/f", "y of experience", "score"]
df = pd.DataFrame(data=data, columns=column)
print(df)

出力結果はこうなる。

        0   1       2  3    4
0  tanaka  20  female  1  600
1  suzuki  30    male  8  492
2  tanaka  23    male  3  890
3    sato  28    male  4  833

DataFrameは、indexとcolumnsがある。 indexを指定できたように、columnsも列名を指定できる。

data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
]
column = ["name", "age", "m/f", "y of experience", "score"]
df = pd.DataFrame(data=data, columns=column)
print(df)

・出力結果
     name  age     m/f  y of experience  score
0  tanaka   20  female                1    600
1  suzuki   30    male                8    492
2  tanaka   23    male                3    890
3    sato   28    male                4    833

データの取得

データの取得方法にはいくつか方法がある。

特定の列または行を取得

pandasは、[]にキーを入力すると列名と判断し、スライスを入力すると行ラベルと判断して処理する。

www.sejuku.net

import pandas as pd

# DataFrame
# 2次元データ構造
data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
]
column = ["name", "age", "m/f", "y of experience", "score"]
df = pd.DataFrame(data=data, columns=column)
print(df)

# 特定の列を取得
print("ageカラムを取得")
print(df["age"])

# 特定の行を取得
print("3行目を取得")
print(df[2:3])

# 実行結果
ageカラムを取得
0    20
1    30
2    23
3    28
Name: age, dtype: int64
3行目を取得
     name  age   m/f  y of experience  score
2  tanaka   23  male                3    890

特定の列と行の取得

locを使用した列と行の取得は列名と行名を使用できる。

import pandas as pd

# DataFrame
# 2次元データ構造
data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
    ["takano", None, "male", 20, 234],
]
column = ["name", "age", "m/f", "y of experience", "score"]
index = ["0001", "0002", "0003", "0004", "0005"]
df = pd.DataFrame(data=data, columns=column, index=index)
print(df)

# 特定の列と行を取得
print("loc関数で取得")
print(df.loc["0003", "age":"m/f"])
print(df.loc["0004":])
# 実行結果
loc関数で取得
age    23.0
m/f    male
Name: 0003, dtype: object
        name   age   m/f  y of experience  score
0004    sato  28.0  male                4    833
0005  takano   NaN  male               20    234

ilocの場合は、列名や行名は使用できない。 インデックス番号を使用する必要がある。

import pandas as pd

# DataFrame
# 2次元データ構造
data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
    ["takano", None, "male", 20, 234],
]
column = ["name", "age", "m/f", "y of experience", "score"]
index = ["0001", "0002", "0003", "0004", "0005"]
df = pd.DataFrame(data=data, columns=column, index=index)
print(df)

# 特定の列と行を取得
print("ilocを使用")
print(df.iloc[2:, 1:3])
print(df.iloc[3:])
ilocを使用
       age   m/f
0003  23.0  male
0004  28.0  male
0005   NaN  male
        name   age   m/f  y of experience  score
0004    sato  28.0  male                4    833
0005  takano   NaN  male               20    234

locとilocで同じような使い方だが、行列のスライスの指定の結果が異なる。 locの場合は、スライスの最後のインデックスが含まれるが、ilocの場合は、スライスの最後のインデックスは含まれない。

先頭と末尾の取得

先頭のデータを取得する際は、head関数を使用する。 また、末尾のデータを取得する場合は、tail関数を使用する。 ともにデフォルトは5件取得。 ここでは、2を指定することで、2件取得している。

import pandas as pd
# DataFrame
# 2次元データ構造
data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
    ["takano", None, "male", 20, 234],
]
column = ["name", "age", "m/f", "y of experience", "score"]
index = ["0001", "0002", "0003", "0004", "0005"]
df = pd.DataFrame(data=data, columns=column, index=index)

# 先頭と末尾を取得
print("= 先頭 ==========")
print(df.head(2))

print("= 末尾 ==========")
print(df.tail(2))
# 実行結果
= 先頭 ==========
        name   age     m/f  y of experience  score
0001  tanaka  20.0  female                1    600
0002  suzuki  30.0    male                8    492
= 末尾 ==========
        name   age   m/f  y of experience  score
0004    sato  28.0  male                4    833
0005  takano   NaN  male               20    234

条件指定での行取得

query関数を使用することで、データベースを検索するように条件を指定してデータを取得できる。

import pandas as pd

# DataFrame
# 2次元データ構造
data = [
    ["tanaka", 20, "female", 1, 600],
    ["suzuki", 30, "male", 8, 492],
    ["tanaka", 23, "male", 3, 890],
    ["sato", 28, "male", 4, 833],
    ["takano", None, "male", 20, 234],
]
column = ["name", "age", "m/f", "y of experience", "score"]
index = ["0001", "0002", "0003", "0004", "0005"]
df = pd.DataFrame(data=data, columns=column, index=index)

# 条件指定
print("== ageが30より小さい行を取得 ======")
print(df.query("age < 30"))
print("== nameがtanakaの行を取得 ======")
print(df.query("name == 'tanaka'"))
print("== nameがtanakaかつageが23の行を取得 ======")
print(df.query("name == 'tanaka' and age == 23"))
# 実行結果
== ageが30より小さい行を取得 ======
        name   age     m/f  y of experience  score
0001  tanaka  20.0  female                1    600
0003  tanaka  23.0    male                3    890
0004    sato  28.0    male                4    833
== nameがtanakaの行を取得 ======
        name   age     m/f  y of experience  score
0001  tanaka  20.0  female                1    600
0003  tanaka  23.0    male                3    890
== nameがtanakaかつageが23の行を取得 ======
        name   age   m/f  y of experience  score
0003  tanaka  23.0  male                3    890

列と行の追加

列を追加する場合、[列名]=値またはassign関数を使用する。 assign関数を使用した場合、新しいDataFrameのオブジェクトを返す。 行を追加する場合、loc[行名] = 値で追加できる。 また、concat関数でDataFrame同士を連結する方法もある。

import pandas as pd

data = [["tanaka", 20, "female", 1, 600], ["suzuki", 30, "male", 8, 492]]
column = ["name", "age", "m/f", "y of experience", "score"]
df = pd.DataFrame(data=data, columns=column)
print("== 追加前 ================")
print(df)
print("== 列を追加1 ==============")
df["like"] = ["movie", "cook"]
print(df)
print("== 列を追加2 ==============")
df = df.assign(food=["sushi", "apple"])
print(df)
print("== 行を追加1 ==============")
df.loc[2] = ["yamada", 40, "male", 20, 500, "cycle", "beaf"]
print(df)

最後に

pandasの基本的な操作をやってみた。 集計とかその辺をもう少しやってみて、品質データの分析を簡単に集計できるようになったら、嬉しいな。

参考

qiita.com

qiita.com

qiita.com

リーダーの課題解決アプローチ。カネヴィンフレームワーク

ウクライナ軍に入隊したアジャイルコーチがカネヴィンフレームワークを使って、課題等を解決したとのことで、カネヴィンフレームワークというのを初めて気になり、調べてみた。

www.publickey1.jp

トラブルが発生すると、日本企業では原因を探し始める。
欠点や不具合、未達成な成果を探して、原因を分析、真の原因を見つけて、対策を立てる。
これは、不具合原因追及型の問題解決ギャップアプローチと呼ばれる問題解決手法。

この解決手法が全ての問題で解決されるわけではなく、組織のリーダーは課題や問題に囲まれているので、問題や課題によって解決方法が異なってくる。

では、どうやって解決方法を見つけていけばいいのか? そのためのフレームワークがカネヴィンフレーム。

カネヴィンフレームワークとは

ディビット・スノードン教授によって、問題を分類し、その問題に対するアプローチを定義したフレームワーク

問題は単純系複雑系困難系カオス系の4つのカテゴリに分け、カテゴリごとにアクションが異なる。

解決方法は見えている

単純系と呼ばれるものは、問題の原因が誰にでもわかる問題のこと。 ソフトウェアの問題で言うと、エラーログを見ればすぐに修正箇所がわかるだと考えられる。

問題解決に専門的な知識が必要

困難系と呼ばれるものは、因果関係が複雑であり、原因が分からない問題のこと。
ただし、分析すれば原因が見えるもの。
ソフトウェアの品質分析がこれにあたるのではないか考えます。いくつかのバグが発生しており、そのバグが発生した原因を分析し、対策を実施する。 PDCAサイクルの、CheckとActionもここに入るのかな。

Try And Error。やるっきゃない

複雑系の問題は因果関係が明確になっていない、かつ、原因が分からない問題のこと。 組織で起こりがちな問題が多く、未来のありたい姿に向けて未来創造型課題達成型を利用して解決していく。 自分の身近なところだと、プロジェクトで効率化をしているのに、メンバの稼働が下がらないとか。

先が見えない。恐ろしい。

カオス系は、因果関係も見えず、突発的に発生した問題をすぐに対処しなければならない問題のこと。 このカオス系は、必要なありとあらゆるアクションを実行し、一刻もはやい対応が求められる。 コロナウィルスの感染拡大がこれにあたる。 自分の身近なところだと、お客さまの利益に関わるような障害が発生した場合にあたる。 考えただけでも、ぞっとします。

最後に

カネヴィンフレームワークというものを知って、解決に向けてどのようなアプローチを取ればいいのかが考えやすくなると思う。 早速、このフレームワークを取り入れて、業務に活かしたい。

参考

successpoint.co.jp

heart-quake.com

successpoint.co.jp

ソート可能な一意な値を生成するULIDが超絶便利

UUIDは一意な値だが、ソートできない問題がある。
その課題を解決したソート可能な一意な値を生成できるULIDは非常に便利!!

ULIDとは

ULIDは、ソート可能な一意の識別子。
辞書順にソート可能であり、UUIDと128bitの互換性がある。
ULIDの構造は、48bitがタイムスタンプとなっており、80bitがランダムな値。

インストール方法

python-ulidをインストールするだけ。

pip install python-ulid

使ってみた

基本的な使い方は下記のページに載っている。

github.com

  • 現在のタイムスタンプからULIDオブジェクトを生成

現在のタイムスタンプからULIDオブジェクトを生成する場合は、ULIDを実行する。

# 現在のタイムスタンプからULIDオブジェクトを生成
ulid = ULID()
print("ULID = " + str(ulid))
  • タイムスタンプ指定でULIDオブジェクトを生成

指定のタイムスタンプからULIDオブジェクトを生成する場合、ULIDクラスのfrom_datetimeクラスメソッドを実行する。

# 2023-03-02 18:00:00 からULIDオブジェクトを生成
specfic_datetime = datetime.datetime.strptime("2023-03-02 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
ulid = ULID.from_datetime(specfic_datetime)
print("ULID = " + str(ulid))
  • ULIDに含まれる日時を確認

ULIDに含まれる日時を確認する場合、datetimeプロパティを確認する。

# ulidに含まれる日時を確認
print("Datetime = " + ulid.datetime.strftime('%Y-%m-%d %H:%M:%S'))
  • ULIDからUUIDを生成

ULIDからUUIDを生成する場合、to_uuidメソッドを実行する。

# UUIDに変換
uuid_from_ulid = ulid.to_uuid()
print("UUID = " + str(uuid_from_ulid))
  • UUIDからULIDを生成

UUIDからULIDオブジェクトを生成する場合、ULIDクラスのfrom_uuidクラスメソッドを実行する。

# UUIDからulidを生成
ULID.from_uuid(uuid_from_ulid)
print("ULID from uuid = " + str(ulid2))

ソートしてみる

日時指定でULIDオブジェクトを生成する。このときに1日単位でずらして生成。 順序をばらばらに挿入したリストを生成して、sorted関数でソートした結果を確認する。

def sort_execute():
    
    # 日時指定で1日ずらして、ULIDオブジェクトを生成
    specfic_datetime = datetime.datetime.strptime("2023-03-02 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid1 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid1 = " + str(ulid1))

    specfic_datetime = datetime.datetime.strptime("2023-03-03 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid2 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid2 = " + str(ulid2))

    specfic_datetime = datetime.datetime.strptime("2023-03-04 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid3 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid3 = " + str(ulid3))

    specfic_datetime = datetime.datetime.strptime("2023-03-05 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid4 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid4 = " + str(ulid4))

    # 整列した順番のリストを作成
    expected_list = list()
    expected_list.append(ulid1)
    expected_list.append(ulid2)
    expected_list.append(ulid3)
    expected_list.append(ulid4)

    # 順番をばらばらにしたリストを作成
    list_ulid = list()
    list_ulid.append(ulid2)
    list_ulid.append(ulid4)
    list_ulid.append(ulid1)
    list_ulid.append(ulid3)

    # 整列した順番のリストとばらばらなリストを比較
    print("Before = " + str(list_ulid))
    print("Is Same = " + str(expected_list == list_ulid))

    # ソート後にソートしたリストと整列した順番のリストを比較
    sorted_list = sorted(list_ulid)
    print("After  = " + str(sorted_list))
    print("Is Same = " + str(expected_list == sorted_list))

実行結果を確認すると、ソートできていることがわかる。

ulid1 = 0186a37b-1d00-223a-1718-56350b798aec
ulid2 = 0186a8a1-7900-b053-3768-799da20ec1a3
ulid3 = 0186adc7-d500-9872-0ecd-d7341e5b680a
ulid4 = 0186b2ee-3100-7041-5e91-7f7813f8d84d
Before = [UUID('0186a8a1-7900-b053-3768-799da20ec1a3'), UUID('0186b2ee-3100-7041-5e91-7f7813f8d84d'), UUID('0186a37b-1d00-223a-1718-56350b798aec'), UUID('0186adc7-d500-9872-0ecd-d7341e5b680a')]
Is Same = False
After  = [UUID('0186a37b-1d00-223a-1718-56350b798aec'), UUID('0186a8a1-7900-b053-3768-799da20ec1a3'), UUID('0186adc7-d500-9872-0ecd-d7341e5b680a'), UUID('0186b2ee-3100-7041-5e91-7f7813f8d84d')]
Is Same = True

uuidモジュールで生成したuuidをULIDオブジェクトに変換できる?

from_uuidクラスメソッドを使用すれば変換できる。 ただし、日時は異常な日時になる。

ulid2 = ULID.from_uuid(uuid.uuid4())
print("ULID from uuid = " + str(ulid2))
print(ulid2.datetime)
ULID from uuid = 1S57355C1B940S5RB1VZD2P2FW
3961-09-08 21:15:50.186996+00:00

PostgreSQLのorder by でソート

ULIDで生成したUUIDをPostgreSQLのテーブルに挿入し、OrderByでソートしても期待する順になるか確認してみた。 先ほどのリストのときと同じように、日時をずらしたULIDオブジェクトを生成、ULIDから生成したUUIDをばらばらに挿入。

from ulid import ULID
import datetime
import uuid
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy import select
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import uuid

class Base(DeclarativeBase):
    pass

class UUID_Sample(Base):
    __tablename__ = 'uuid_sample'

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True)

# DBエンジンを作成
url = "postgresql://postgres:example@localhost:5432/postgres"
engine = create_engine(url, echo=False)

# テーブルをDBに作成
Base.metadata.create_all(engine)
SessionClass = sessionmaker(engine)
session = SessionClass()

def use_db():
    
    # 日時指定で1日ずらして、ULIDオブジェクトを生成
    specfic_datetime = datetime.datetime.strptime("2023-03-02 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid1 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid1 = " + str(ulid1))
    ulid1_obj = UUID_Sample(id=ulid1)

    specfic_datetime = datetime.datetime.strptime("2023-03-03 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid2 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid2 = " + str(ulid2))
    ulid2_obj = UUID_Sample(id=ulid2)

    specfic_datetime = datetime.datetime.strptime("2023-03-04 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid3 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid3 = " + str(ulid3))
    ulid3_obj = UUID_Sample(id=ulid3)

    specfic_datetime = datetime.datetime.strptime("2023-03-05 18:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
    ulid4 = ULID.from_datetime(specfic_datetime).to_uuid()
    print("ulid4 = " + str(ulid4))
    ulid4_obj = UUID_Sample(id=ulid4)
    session.add_all([ulid2_obj, ulid4_obj, ulid1_obj, ulid3_obj])
    session.commit()

    # 整列した順番のリストを作成
    expected_list = list()
    expected_list.append(ulid1)
    expected_list.append(ulid2)
    expected_list.append(ulid3)
    expected_list.append(ulid4)

    stmt = select(UUID_Sample)
    results = session.scalars(stmt)
    print("Not use order by")
    i = 0
    for result in results:
        print("id = " + str(result.id))
        print("Is Sample = " + str(result.id == expected_list[i]))
        i += 1

    print("Use order by")
    stmt = select(UUID_Sample).order_by(UUID_Sample.id)
    results = session.scalars(stmt)
    i = 0
    for result in results:
        print("id = " + str(result.id))
        print("Is Sample = " + str(result.id == expected_list[i]))
        i += 1

結果はOrder byでソートされていることを確認できた。

ulid1 = 0186a37b-1d00-20ba-f545-49810830ce9d
ulid2 = 0186a8a1-7900-d435-b373-beeca474a2ed
ulid3 = 0186adc7-d500-26fa-2d95-ee12269bf390
ulid4 = 0186b2ee-3100-9cce-b5d3-4ab2c0ca769d
Not use order by
id = 0186a8a1-7900-d435-b373-beeca474a2ed
id = 0186b2ee-3100-9cce-b5d3-4ab2c0ca769d
id = 0186a37b-1d00-20ba-f545-49810830ce9d
id = 0186adc7-d500-26fa-2d95-ee12269bf390
Use order by
id = 0186a37b-1d00-20ba-f545-49810830ce9d
id = 0186a8a1-7900-d435-b373-beeca474a2ed
id = 0186adc7-d500-26fa-2d95-ee12269bf390
id = 0186b2ee-3100-9cce-b5d3-4ab2c0ca769d

最後に

UUIDは一意な値であるが、ソートができない課題を持っている。
ULIDがその課題を解決してくれている。
ULIDで生成したUUIDであればソート可能な値となっているので、テーブルのINDEXはUUIDにだけ設定しておけば不要なINDEXが増えなくてすむのかなと考える。
非常に便利なので、積極的に使っていきたい。

参考

ULIDに関するライブラリが多いとのこと。 今回使用したpython-ulidはメンテされている。

yu00sasaki.com

qiita.com

同一ミリ秒内での生成でも、同じミリ秒内に280 を超えない限り、ソート順が保証されるみたいで、良さそうです。

ここ重要ですね。同一ミリ秒で生成されることはないだろうかと。

Pythonデコレータの実装方法

ロバストPythonでも出てきたが、自分でデコレータを作成してみた。引数ありなしで少し実装が変わるので注意したい。

デコレータ

デコレータは関数の前後に実行したい振る舞いを定義することができる。

コンテキストマネージャとの違いは?

コンテキストマネージャとデコレータは共に、処理の前後で何かしらの処理をしたいときに使うイメージがある。 私の理解だと下記の違いがあると考える。

  • 関数の前後で何かしらの処理をしたい場合に使用
  • ある特定の処理のブロックの前後で何かしらの処理をしたい場合に使用

コンテキストマネージャの場合、with文と使うことでファイルオープンしたオブジェクトをブロック内の処理に渡すことができるので、個人的にコンテキストマネージャの方が使いやすいという印象がある。

自前のデコレータを作るには

引数なしのデコレータと引数ありのデコレータでは実装が少し違うので、実際に作ってみる。

引数なしのデコレータ

下記の例の場合、callingデコレータを用意した。 callingデコレータとなる関数のパラメータにCallableを指定。これがデコレータを付与した関数が設定される。 _wrapper関数をcalling関数内で宣言し、_wrapper関数を返却する。
_wrapper関数は、パラメータとして、*args**kwargsを設定する。 *args**kwargsにデコレータを設定した関数のパラメータが設定される。
下記の例の場合、tanakaと 20*argsに設定されている。

"""
デコレータにする関数を宣言。パラメータはCallable
"""
def calling(func: Callable) -> Callable:
    """
    _wrapper関数を宣言。パラメータは*argsと**kwargs
    デコレータを付与した関数のパラメータが設定される。
    """
    def _wrapper(*args, **kwargs):
        print("calling decorator start ------->")
        print("args = " + str(args))
        # デコレータを付与した関数を実行
        func(*args, **kwargs)
        print("---------> calling decorator end")

    return _wrapper

@calling
def execute(name: str, age: int):
    print("execute start")

if __name__ == '__main__':
    execute("tanaka", 20)

実行すると下記のように、デコレータにした関数の前後のメッセージがデコレータを付与した関数の前後で出力されているのがわかる。

calling decorator start ------->
args = ('tanaka', 20)
execute start
---------> calling decorator end

引数ありのデコレータ

引数ありのデコレータの場合、デコレータにする関数の中で_wrapper関数を呼ぶ内部関数を定義する必要がある。 引数なしのデコレータとの違いは、 デコレータとなる関数から返す関数が_wrapper関数なのか、_wrapper関数をもつ内部関数を返すかの違い。

下記の場合、calling_taxデコレータを用意。パラメータがデコレータで指定するパラメータで戻り値はCallable。 calling_tax関数で内部関数を定義する。 この関数のパラメータはCallableで、デコレータを付与した関数が設定される。
内部関数の_calling_tax関数で_wrapper関数を用意する。ここは引数なしのデコレータと一緒。

"""
デコレータにする関数を宣言
パラメータはそのままデコレータのパラメータになる
"""
def calling_tax(name: str) -> Callable:
    """
    内部関数を宣言。パラメータはCallable
    このCallableは、デコレータを設定した関数が設定される
    """
    def _calling_tax(func: Callable):
        """
        _wrapper関数内で処理を実行
        *args、**kwargsがデコレータを付与した関数のパラメータの値が設定される
        """
        @functools.wraps(func)
        def _wrapper(*args, **kwargs):
            print("calling_tax decorator start ------>")
            print("name = " + name)
            print("args = " + str(args))
            func(*args, **kwargs)
            print("-------> calling_tax decorator end")
        return _wrapper
    return _calling_tax

@calling_tax("watanabe")
def execute_tax(address: str, tel: int):
    print("start execute_tax")
    print("address = " + str(address))
    print("tel     = " + str(tel))

if __name__ == '__main__':
    execute_tax("Kanagawa", 11111)

実行結果は下記のように、デコレータのパラメータも渡されており、デコレータを付与した関数の前後で設定したメッセージが出力されていることがわかる。

calling_tax decorator start ------>
name = watanabe
args = ('Kanagawa', 11111)
start execute_tax
address = Kanagawa
tel     = 11111
-------> calling_tax decorator end

まとめ

自分でデコレータを作成する方法を理解できた。 引数あり/なしで実装の仕方が少し違うので注意したい。
コンテキストマネージャとの使い分けも今後、考えながら自分でデコレータを用意して実装をシンプルに実現できるようにしていきたい。

参考

atmarkit.itmedia.co.jp

zenn.dev

note.nkmk.me

Pythonで簡単なGUI作成: Streamlitの活用

この記事が目に留まり、Streamlitを知ったので、使ってみた。 Pythonで簡単にWebアプリの画面が出来ちゃう

it.impress.co.jp

Streamlit(ストリームリット)

Streamlitは、データ分析型のWebアプリケーションフレームワークとして広く利用されている。
簡単なコーディングでGUIを作成できる。

インストール

streamlitをインストールして、streamlit helloを実行すると、ブラウザが立ち上がりサンプルを確認できる。

pip install streamlit
streamlit hello

streamlit.io

チュートリアルをやってみる

docs.streamlit.io

下記のコードをuber_pickups.pyで作成し、streamlit run uber_pickups.pyを実行するとブラウザに画面が表示される。

import streamlit as st
import pandas as pd
import numpy as np

st.title("Uber pickups in NYC")

Numpyでヒストグラムのデータを生成し、st.bar_chart関数を使うだけで、ヒストグラムが表示できる。

hist_values = np.histogram(data[DATE_COLUMN].dt.hour, bins=24, range=(0, 24))[0]
st.bar_chart(hist_values)

mapにもプロットできる。

st.sliderを使って、スライダーも表示可能。 ラベルと最小、最大、デフォルト値を設定する。

hour_to_filter = st.slider('hour', 0, 23, 17)  # min: 0h, max: 23h, default: 17h

チェックボックスを使って、データの表示/非表示もできる。

ページの遷移の設定も可能。チュートリアルも存在するが、ここでは割愛する。

最後に

たた数行でグラフやマップも表示できるし、他にも入力フォームもPythonだけで出来てしまう。
PySimpleGUIも同じように数行で画面が作成できるけど、PySimpleGUIよりも今どきなWebアプリの画面ができるので、凄いなと感じた。
次回は、入力フォームからデータの登録とグラフの表示をしてみる。

lru_cacheデコレータを使ってみよう

ロバストPythonでも出てきたlru_cacheを試してみる。 キャッシュをデコレータだけで実現でき、標準で備わっているのでいい。

lru_cacheとは

関数をメモ化用の呼び出し可能オブジェクトでラップし、呼び出し最大maxsize回まで保存するデコレータ maxsizeを指定しない場合、上限なしで保存する。

docs.python.org

どんなときに使えるのか

処理に時間がかかる場合に使える。 キャッシュを利用した場合、キャッシュしたデータ元が更新され、キャッシュが更新されていない場合は古いデータが返却されてしまう。 そのため、常に最新データを返す必要がある場合は注意が必要。

使ってみる

呼び出し最大2回まで保存するように設定して実行してみる。

@lru_cache(maxsize=2)
def calculate(a):
    print("call calculate")
    r = 0
    for i in range(10000000):
        r += i

    return r

def main_call():

    for i in range(5):
        calculate(1)
        print(calculate.cache_info())

if __name__ == '__main__':
    main_call()

5回実行してprint文は1回だけなのでキャッシュが効いている。
しかし、説明によれば最大maxsizeまで保存すると書いてあるので、3回目でprint文がでるのかなと思ったが、違うみたい。

call calculate

CacheInfoを確認してみる

ラップされた関数には、cache_info()関数が追加されるようなので、cache_info()関数をコールして、CacheInfoを確認する。 currsizeは、常に1になっている。どうやら、キャッシュで保持しているデータの数みたい。

call calculate
49999995000000
CacheInfo(hits=0, misses=1, maxsize=2, currsize=1)
49999995000000
CacheInfo(hits=1, misses=1, maxsize=2, currsize=1)
49999995000000
CacheInfo(hits=2, misses=1, maxsize=2, currsize=1)
49999995000000
CacheInfo(hits=3, misses=1, maxsize=2, currsize=1)
49999995000000
CacheInfo(hits=4, misses=1, maxsize=2, currsize=1)

追加で以下のプログラムで動作を確認してみる。

@lru_cache(maxsize=2)
def calculate(a):
    print("-- Call calculate function")
    r = 0
    for i in range(10000000):
        r += i

    return r

def main_call():

    print("calculate 1")
    calculate(1)
    print(calculate.cache_info())

    print("calculate 2")
    calculate(2)
    print(calculate.cache_info())

    print("cacheが効いているはず -------")
    print("calculate 1")
    calculate(1)
    print(calculate.cache_info())

    print("calculate 2")
    calculate(2)
    print(calculate.cache_info())

    print("1のキャッシュが削除 -------")
    print("calculate 3")
    calculate(3)
    print(calculate.cache_info())

    print("2と3のcacheが効いているはず -------")
    print("calculate 2")
    calculate(2)
    print(calculate.cache_info())

    print("calculate 3")
    calculate(3)
    print(calculate.cache_info())

    print("1のcacheがない -------")
    print("calculate 1")
    calculate(1)
    print(calculate.cache_info())

実行結果からcalculate 3でキャッシュミスが発生し、calculate 3のキャッシュが登録されて、再度calculate 1の実行で、キャッシュミス(misses)が発生していることがわかる。
デコレータの説明では、最大maxsize回まで保存するとなっているけど、最大maxsize個まで保存するの方が説明としては正しいのかなと。(文章の理解力の問題か・・・)

calculate 1
-- Call calculate function
CacheInfo(hits=0, misses=1, maxsize=2, currsize=1)
calculate 2
-- Call calculate function
CacheInfo(hits=0, misses=2, maxsize=2, currsize=2)
cacheが効いているはず -------
calculate 1
CacheInfo(hits=1, misses=2, maxsize=2, currsize=2)
calculate 2
CacheInfo(hits=2, misses=2, maxsize=2, currsize=2)
1のキャッシュが削除 -------
calculate 3
-- Call calculate function
CacheInfo(hits=2, misses=3, maxsize=2, currsize=2)
2と3のcacheが効いているはず -------
calculate 2
CacheInfo(hits=3, misses=3, maxsize=2, currsize=2)
calculate 3
CacheInfo(hits=4, misses=3, maxsize=2, currsize=2)
1のcacheがない -------
calculate 1
-- Call calculate function
CacheInfo(hits=4, misses=4, maxsize=2, currsize=2)

まとめ

functools.lru_cacheは簡単にキャッシュを実現でき、標準でPythonに組み込まれているので、OSSを改めてインストールする必要がないので、プロジェクトの関係でOSSが利用できないプロジェクトでも導入がしやすいと考える。
自分自身も使う機会があれば、積極的に使ってみたい。

参考

www.fujitsu.com

高速な検索スピードを体感するTimescaleDB

データ集計など時間がかかわるデータの検索に時系列DBは必須。 使い方や関数は早めにキャッチアップが必要だと実感。

時系列DBとは

時間情報を持ったデータを格納し、最適化されたデータベース。 TimescaleDBはその中の1つであり、今回TimescaleDBをお試ししてみる。
詳細は参考の資料を参照。

https://www.sraoss.co.jp/wp-content/uploads/files/event_seminar/material/2021/timescaledb-intro-20210624.pdf

コンテナの起動

本体をインストールしたくないので、コンテナで起動する。 パスワードを設定したい場合、POSTGRES_PASSWORD環境変数に設定する。ここはPostgreSQLと一緒。

docker pull timescale/timescaledb-ha:pg16
docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=example timescale/timescaledb-ha:pg16
docker exec -it timescaledb psql -U postgres

docs.timescale.com

ハイパーテーブルの作成

TimescaleDBの紹介資料を参照して、ハイパーテーブルを作成する。 通常のテーブルを作成して、create_hypertable関数を実行。
サーバのCPU使用率、メモリ使用率の情報をもつserverspecテーブルを作成。

  • テーブルの作成
CREATE TABLE serverspec (
    time timestamp NOT NULL,
    servername text NOT NULL,
    cpu_userd double precision,
    memory_used double precision
);
  • create_hypertable関数の実行

create_hypertable関数には、ハイパーテーブルに変換するテーブル名、分割の条件にする時間の列名を指定する。 戻り値は、ハイパーテーブルのID、スキーマ名、テーブル名。

SELECT create_hypertable('serverspec', 'time');
WARNING:  column type "timestamp without time zone" used for "time" does not follow best practices
HINT:  Use datatype TIMESTAMPTZ instead.
    create_hypertable    
-------------------------
 (1,public,serverspec,t)

データの操作

実際にデータの挿入、検索、更新、削除をする。

  • 挿入
INSERT INTO serverspec VALUES ('2024-02-18 21:00:00', 'server1', 19.4, 10.4);
INSERT 0 1
  • 検索
SELECT * FROM serverspec;
        time         | servername | cpu_userd | memory_used 
---------------------+------------+-----------+-------------
 2024-02-18 21:00:00 | server1    |      19.4 |        10.4
(1 row)
  • 更新
UPDATE serverspec SET cpu_userd = cpu_userd + 0.1 WHERE time = '2024-02-18 21:00:00' and servername = 'server1
';
UPDATE 1
select * from serverspec;
        time         | servername | cpu_userd | memory_used 
---------------------+------------+-----------+-------------
 2024-02-18 21:00:00 | server1    |      19.5 |        10.4
(1 row)
  • 削除
DELETE FROM serverspec WHERE time = '2024-02-18 21:00:00' and servername = 'server1';

チャンクの確認準備

チャンクの確認のために、事前にデータを投入する。 1分間隔のデータを1年間分投入する。

INSERT INTO serverspec
   SELECT time,
     servername,
     round(random()::numeric * (100 - 0), 1) AS cpu_used,
     round(random()::numeric * (100 - 0), 1) AS memory_used
   FROM generate_series('2024-01-01 00:00:00'::timestamp, '2024-12-31 23:59:00', '1 minute') AS time,
     unnest(ARRAY['server1', 'server2', 'server3']) AS servername;
ANALYZE;

投入データのSQLを少し解説する。 random関数を使用して、0から100の範囲で数値を生成し、round関数で小数第一位を四捨五入する。

round(random()::numeric * (100 - 0), 1)

generate_series関数の第一パラメータに開始値、第二パラメータに終了値、第三パラメータに刻み値を設定する。 下記の場合は、1分刻みのデータを生成する。

generate_series('2024-01-01 00:00:00'::timestamp, '2024-12-31 23:59:00', '1 minute')

www.postgresql.jp

そもそも、チャンクとは

TimescaleDBでのチャンクとは、データを内部的に時間と空間で分割したテーブルのこと。 ユーザは、ハイパーテーブルを介して、チャンクのテーブルを意識することなく、データにアクセスができる。 空間は時間以外にチャンクを分割する条件。 チャンクのデフォルトは、7日ごとに分割される。チャンクを分割する間隔は、create_hypertableを実行する 際に指定可能。

チャンクの確認

登録したデータによって、チャンクがどのように生成されている状態なのかを確認する。 チャンクを確認する場合、show_chunks関数を使用する。

select show_chunks('serverspec');
               show_chunks               
-----------------------------------------
 _timescaledb_internal._hyper_1_1_chunk
 _timescaledb_internal._hyper_1_2_chunk
 _timescaledb_internal._hyper_1_3_chunk
 _timescaledb_internal._hyper_1_4_chunk
 _timescaledb_internal._hyper_1_5_chunk
 _timescaledb_internal._hyper_1_6_chunk
 _timescaledb_internal._hyper_1_7_chunk
 _timescaledb_internal._hyper_1_8_chunk
<省略>
(53 rows)

オプションをつけることで、取得する範囲を指定できる。 2024-12-01よりも古い日付のものを取得。

SELECT show_chunks('serverspec', older_than => DATE '2024-02-01');
              show_chunks               
----------------------------------------
 _timescaledb_internal._hyper_1_2_chunk
 _timescaledb_internal._hyper_1_3_chunk
 _timescaledb_internal._hyper_1_4_chunk
 _timescaledb_internal._hyper_1_5_chunk
 _timescaledb_internal._hyper_1_6_chunk
(5 rows)

docs.timescale.com

チャンクの削除

チャンクの削除は、drop_chunk関数を使用する。 データの削除は、DELETEコマンドでできるが、その場合、不要領域が発生し、再利用するためにVACUUMが必要になる。 チャンクごと削除すれば、不要領域が発生せず、VACUUMを実行する必要がない。

SELECT drop_chunks('serverspec', older_than => DATE '2024-02-01');
              drop_chunks               
----------------------------------------
 _timescaledb_internal._hyper_1_2_chunk
 _timescaledb_internal._hyper_1_3_chunk
 _timescaledb_internal._hyper_1_4_chunk
 _timescaledb_internal._hyper_1_5_chunk
 _timescaledb_internal._hyper_1_6_chunk
(5 rows)

docs.timescale.com

データ取得

実際に、データの取得をいろいろとやってみる。 TimescaleDBのGettingStartedを参考にする。

docs.timescale.com

firstとlast

firstは、別の列をキーに並べ替え、最初の値を返す。 lastは、fistの反対で、最後の値を返す。

SELECT servername, first(cpu_userd, time), last(cpu_userd, time) FROM serverspec WHERE time > now() - INTERVAL '4 days' GROUP BY servername;
 servername | first | last 
------------+-------+------
 server2    |  14.4 | 19.4
 server3    |  26.4 |  2.2
 server1    |   1.4 | 47.2
(3 rows)

time_bucket

time_bucket関数は時間を任意の間隔で丸める関数。 time_bucket関数を使用した場合、任意の時間で丸めることが可能。

SELECT time_bucket('1 hour', time) as bucket, first(cpu_userd, time), last(cpu_userd,time) FROM serverspec GROUP BY bucket LIMIT 10;
       bucket        | first | last 
---------------------+-------+------
 2024-02-01 00:00:00 |  45.8 | 82.3
 2024-02-01 01:00:00 |    45 | 75.3
 2024-02-01 02:00:00 |   2.1 | 67.4
 2024-02-01 03:00:00 |     9 | 66.8
 2024-02-01 04:00:00 |  12.4 |  5.5
 2024-02-01 05:00:00 |  53.8 | 52.9
 2024-02-01 06:00:00 |  12.2 | 69.9
 2024-02-01 07:00:00 |  99.6 | 12.1
 2024-02-01 08:00:00 |  39.2 | 25.8
 2024-02-01 09:00:00 |   3.7 | 29.1

30分間隔にする場合、30 minutesを指定する。

SELECT time_bucket('30 minutes', time) as bucket, first(cpu_userd, time), last(cpu_userd,time) FROM serverspec GROUP BY bucket LIMIT 10;
       bucket        | first | last 
---------------------+-------+------
 2024-02-01 00:00:00 |  45.8 | 36.5
 2024-02-01 00:30:00 |  71.3 | 82.3
 2024-02-01 01:00:00 |    45 | 76.4
 2024-02-01 01:30:00 |    78 | 75.3
 2024-02-01 02:00:00 |   2.1 | 56.5
 2024-02-01 02:30:00 |  29.8 | 67.4
 2024-02-01 03:00:00 |     9 |  0.2
 2024-02-01 03:30:00 |  47.8 | 66.8
 2024-02-01 04:00:00 |  12.4 | 41.1
 2024-02-01 04:30:00 |  37.6 |  5.5

Aggregation

時間ごとのCPUのMAX値などを集計してみる。

SELECT time_bucket('1 day', time) as day, servername, max(cpu_userd) as cpu_max, first(cpu_userd, time)  as open, last(cpu_userd, time) as close, min(cpu_u
serd) as cpu_min FROM serverspec GROUP BY day, servername ORDER BY day DESC, servername;
         day         | servername | cpu_max | open | close | cpu_min 
---------------------+------------+---------+------+-------+---------
 2024-12-31 00:00:00 | server1    |    99.8 | 60.1 |  47.2 |     0.1
 2024-12-31 00:00:00 | server2    |     100 | 14.1 |  19.4 |       0
 2024-12-31 00:00:00 | server3    |    99.9 | 66.8 |   2.2 |     0.1
 2024-12-30 00:00:00 | server1    |     100 | 33.5 |  78.3 |       0
 2024-12-30 00:00:00 | server2    |    99.9 | 44.8 |  34.5 |     0.1
 2024-12-30 00:00:00 | server3    |     100 | 32.5 |  50.7 |     0.1
 2024-12-29 00:00:00 | server1    |    99.8 | 42.7 |  31.4 |     0.1

docs.timescale.com

最後に

TimescaleDBのコンテナの起動からデータの集計を試してみて、PostgreSQL単体よりも検索スピードが高速であることを体感できた。(計測まではしていない) さまざまな機器からの情報を集計し、グラフ化するといった集計処理には最適なデータベースと理解。 Zabbixとも連携ができるように、監視装置からのアラームの情報を集計等でも活躍すると思うので、これからも時系列DBについて勉強しておく。

参考

透過的という言葉とは

qiita.com

パーティショニング

www.postgresql.jp