ヒトリ歩き

愚痴とかいろいろ書きます

データの受信と送信をWebSocketで実装

WebSocketでサーバからクライアントにデータを送信して、テーブルにデータの表示までをやってみる。
クライアント側も準備しないといけないので、ちょっと手間。

やりたいこと

RESTで受信したデータをWebSocketでクライアントに送信して、クライアント側でテーブルに表示したい。

どんな実装か

サーバー側は、FastAPIでデータの受信部とクライアントにデータを送信する部分を実装。
クライアントは、サーバにWebSocketで接続しデータをテーブルに表示する処理を実装。

サーバー側

データ受信は、/send_dataで取得する。
取得したデータをクライアントに送信できる形へ加工して、グローバルの配列に設定している。

queue = []
@app.post("/send_data")
async def send_data(request: EventMessages):
    logger.info("START")
    for item in request.root:
        data = item.model_dump()
        dt_now = datetime.datetime.now()
        message = (
            data["message"] + " Now DateTime : " + dt_now.strftime("%Y-%m-%d %H:%M:%S")
        )

        id = str(uuid.uuid1())
        data["message"] = message
        data["id"] = id
        global queue
        queue.append(data)

    return {"result": "OK"}

クライアントから/wsに接続が来たら、定期的に接続しているクライアントにデータを送る。
送信するデータはグローバル変数の配列に保存されているデータを送信する。

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    logger.info("WebSocket Connection!!!")
    try:
        while True:
            global queue
            while True:
                logger.info(f"Status = {websocket.application_state}")
                logger.info("Current Queue Size = " + str(len(queue)))
                item = None
                try:
                    item = queue.pop(0)
                except IndexError:
                    await asyncio.sleep(5)
                    continue
                await manager.broadcast(item)
                logger.info(f"send {item}")
                await asyncio.sleep(5)

    except WebSocketDisconnect:
        manager.disconnect(websocket)

クライアント側

ReactでMUIのDataGridを使って、受信したデータを表示。

function App() {
  //Public API that will echo messages sent to it back to the client
  const [socketUrl, setSocketUrl] = useState(WS_URL);
  const [messageHistory, setMessageHistory] = useState([]);
  const [rows, setRows] = useState([]);

  const { sendMessage, lastJsonMessage, readyState } = useWebSocket(socketUrl);

  // 受信したデータを設定する
  useEffect(() => {
    if (lastJsonMessage !== null) {
      setMessageHistory((prev) => {
        for (let i = 0; i < prev.length; i++) {
          if (prev[i].id === lastJsonMessage.id) {
            return prev;
          }
        }
        prev.push(lastJsonMessage);
        return prev;
      })
    }
  }, [lastJsonMessage]);

  useEffect(() => {
    setRows([...messageHistory]);
  }, [lastJsonMessage])

  const handleClickChangeSocketUrl = useCallback(
    () => setSocketUrl(WS_URL),
    []
  );

  const handleClickSendMessage = useCallback(() => sendMessage('Hello'), []);

  const connectionStatus = {
    [ReadyState.CONNECTING]: 'Connecting',
    [ReadyState.OPEN]: 'Open',
    [ReadyState.CLOSING]: 'Closing',
    [ReadyState.CLOSED]: 'Closed',
    [ReadyState.UNINSTANTIATED]: 'Uninstantiated',
  }[readyState];
  
  const columns = [
    { field: 'id', headerName: 'id', width: 300 },
    { field: 'severity', headerName: 'Severity', width: 150 },
    { field: "message", headerName: "Message", width: 500}
  ];

  return (
    
    <div>
      <div style={{ height: 600, width: '100%' }}>
        <DataGrid rows={rows} columns={columns} />
      </div>
    </div>
  );
};

export default App

結果

2つのブラウザを表示して、それぞれでデータを受信して表示することが出来た。

ただ、問題点としては、最後に受信したメッセージが、次のメッセージを受信するまで表示されないことや、サーバ側がソースの修正で再リロードしても、ソケットがクローズせずにループし続けるという問題がある。
これは今後、機会があれば調べて解決しておこうと思う。

最後に

FastAPIのWebSocketを使ってクライアントと接続してデータの表示までやってみた。
どちらかというとクライアント側の準備に手間取って時間がかかったが、何とかデータを表示することが出来てよかった。
キューの役割でグローバル変数の配列を使用したけど、実際の運用ではこのやり方はNGなのでもう少しアーキテクチャは 考える必要があるなと考えるきっかけになった。

[解決] ソケットがクローズせずにループし続けるという問題

送信データがない場合は、WebSocketのsend_jsonをコールせずにcontinueしてたからだった。
ソースの修正で再リロード時にsend_jsonをコールすると例外がスローされるのでその時点でソケット通信のクローズが 出来たのでリロードもうまくいった。

            while True:
                logger.info(f"Status = {websocket.application_state}")
                logger.info("Current Queue Size = " + str(len(queue)))
                item = None
                try:
                    item = queue.pop(0)
                except IndexError:
                    await asyncio.sleep(5)
                    # continue // ここのせいだった
                logger.info(item)
                await manager.broadcast(item)
                logger.info(f"send {item}")
                await asyncio.sleep(5)

    except WebSocketDisconnect:
        manager.disconnect(websocket)