在 FastAPI 中使用 WebSocket 非常簡單。首先需要導入 WebSocket 類和 WebSocketDisconnect 異常類,然后在路由函數中添加一個 WebSocket 參數來處理 WebSocket 連接。
以下是一個簡單的例子:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
在這個例子中,我們創建了一個 WebSocket 端點 /ws
,當有客戶端連接時,會調用 websocket_endpoint
函數處理連接。在函數中我們首先調用 await websocket.accept()
來接受連接,并開始一個無限循環來接收和發送消息。
當客戶端發送消息時,我們通過 await websocket.receive_text()
方法來接收消息,并通過 await websocket.send_text()
方法來發送消息給客戶端。
在 FastAPI 中使用 WebSocket 很容易,你可以根據自己的需求來處理 WebSocket 連接和消息。