|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
|
|
from fastapi.responses import HTMLResponse
|
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
from fastapi.templating import Jinja2Templates
|
|
|
from fastapi.requests import Request
|
|
|
import uvicorn
|
|
|
|
|
|
# ASGI application object; the route decorators in this file register on it.
app = FastAPI()

# Template loader rooted at the "templates" directory.
templates = Jinja2Templates(directory="templates")

# Currently active WebSocket connections.
active_connections = []

# Maps each WebSocket to the nickname its client supplied.
usernames = {}
|
|
|
|
|
|
# Home page route
@app.get("/")
async def get(request: Request):
    """Render the ``chat.html`` template for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("chat.html", context)
|
|
|
|
|
|
# WebSocket route
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handle one chat client for the lifetime of its WebSocket.

    The socket is accepted, added to ``active_connections`` and the online
    count is broadcast. The first text message received is stored as the
    client's nickname in ``usernames``; every later message is relayed to all
    connections prefixed with that nickname. When the client disconnects, its
    entries are dropped and a leave notice plus the new count are broadcast.

    Args:
        websocket: The client connection for this session.
    """
    await websocket.accept()
    active_connections.append(websocket)
    # Publish the online count as soon as the socket connects.
    await broadcast_online_count()

    try:
        # The first message a client sends is taken as its nickname.
        username = await websocket.receive_text()
        usernames[websocket] = username

        await broadcast(f"[系统]{username} 加入了聊天室")
        await broadcast_online_count()

        while True:
            data = await websocket.receive_text()
            await broadcast(f"{username}:{data}")
    except WebSocketDisconnect:
        # The broadcast helpers prune sockets whose send failed, so this
        # socket may already be gone from the list; an unconditional
        # list.remove() would raise ValueError and skip the leave notice.
        if websocket in active_connections:
            active_connections.remove(websocket)
        # Falls back to the anonymous label when no nickname was registered
        # (or the mapping was already pruned).
        left_username = usernames.pop(websocket, "匿名用户")

        await broadcast(f"[系统]{left_username} 离开了聊天室")
        await broadcast_online_count()
|
|
|
|
|
|
# Broadcast a message to every connection
async def broadcast(message: str):
    """Send ``message`` to every active connection and prune failed ones.

    A connection whose ``send_text`` raises is removed from
    ``active_connections`` and its nickname is dropped from ``usernames``.

    Args:
        message: Text frame to deliver to each connection.
    """
    failed = []
    # Iterate over a snapshot: every await yields to the event loop, and other
    # handlers may add or remove connections meanwhile. Mutating the list
    # being iterated would make the loop skip recipients.
    for connection in list(active_connections):
        try:
            await connection.send_text(message)
        except Exception:
            # Not a bare ``except``: task cancellation and KeyboardInterrupt
            # must propagate instead of being mistaken for a dead socket.
            failed.append(connection)

    for connection in failed:
        # Another task may have removed it while this coroutine was awaiting.
        if connection in active_connections:
            active_connections.remove(connection)
        usernames.pop(connection, None)


# Broadcast the online user count
async def broadcast_online_count():
    """Broadcast the current connection count as a ``[人数]<n>`` message.

    The count is ``len(active_connections)`` at call time; delivery and
    pruning of failed connections are delegated to ``broadcast``.
    """
    await broadcast(f"[人数]{len(active_connections)}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Start uvicorn with the "chat:app" import string when run as a script.
    uvicorn.run(
        "chat:app",
        host="0.0.0.0",
        port=8202,
    )