# Temporary-upload page: file upload with per-IP rate limiting, listing of the
# stored files, and file download.

import os
import time
from flask import render_template, request, send_from_directory
from werkzeug.utils import secure_filename
from common.logger import system_logger
# ==========================
# Path configuration
# ==========================
# Parent of the directory that contains this file (presumably the project
# root; confirm against the repository layout).
BASE_DIR = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
)
# Directory that uploaded files are saved to, listed from, and downloaded
# from: <BASE_DIR>/uploads/temp.
TEMP_FOLDER = os.path.join(
BASE_DIR,
"uploads",
"temp"
)
# Runs at import time; exist_ok=True makes this a no-op when the directory
# already exists.
os.makedirs(TEMP_FOLDER, exist_ok=True)
# ==========================
# Limits
# ==========================
MAX_FILE_SIZE = 50 * 1024 * 1024 # max upload size in bytes (50 MiB)
UPLOAD_LIMIT = 5 # max uploads per client IP within LIMIT_WINDOW
# NOTE(review): the value below is 3600 seconds (1 hour), but the previous
# comment on this line said "10 minutes" -- confirm which one is intended.
LIMIT_WINDOW = 60 * 60 # rate-limit window in seconds (1 hour)
# In-memory rate-limit state: client IP -> list of time.time() timestamps of
# that IP's recent uploads.
UPLOAD_RECORD = {} # per-IP upload timestamps
# ==========================
# Helper functions
# ==========================
def get_client_ip():
    """Return the client IP address for the current Flask request.

    The first entry of the ``X-Forwarded-For`` header is used when it is
    present and non-empty; otherwise ``request.remote_addr`` is returned.

    NOTE(review): ``X-Forwarded-For`` is sent by the client unless a trusted
    reverse proxy overwrites it, so the value can be forged. Because this
    value is used as the rate-limit key, confirm that the deployment sits
    behind such a proxy.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        first_hop = forwarded.split(",")[0].strip()
        # A blank or malformed header (" " or ", 10.0.0.1") is truthy but has
        # an empty first entry; fall through instead of returning "" so those
        # clients are not all keyed under the same empty address.
        if first_hop:
            return first_hop
    return request.remote_addr
def check_rate_limit(ip):
    """Record one upload for ``ip`` or reject it if the quota is exhausted.

    At most ``UPLOAD_LIMIT`` uploads are allowed per IP within a sliding
    window of ``LIMIT_WINDOW`` seconds. State is kept in the module-level
    ``UPLOAD_RECORD`` dict (IP -> list of upload timestamps).

    Args:
        ip: Key identifying the client (the address string used by callers).

    Raises:
        Exception: when the IP already has ``UPLOAD_LIMIT`` uploads inside
            the window. The message is user-facing and states how many
            seconds remain until the oldest upload leaves the window.
    """
    now = time.time()

    # Drop expired timestamps for every known IP and remove IPs with nothing
    # left. Without this sweep an IP that never returns keeps its key
    # forever, so the dict grows without bound. Each list holds at most
    # UPLOAD_LIMIT entries, so the sweep costs O(number of tracked IPs).
    for known_ip in list(UPLOAD_RECORD):
        recent = [
            stamp for stamp in UPLOAD_RECORD.get(known_ip, ())
            if now - stamp < LIMIT_WINDOW
        ]
        if recent:
            UPLOAD_RECORD[known_ip] = recent
        else:
            UPLOAD_RECORD.pop(known_ip, None)

    history = UPLOAD_RECORD.get(ip, [])
    if len(history) >= UPLOAD_LIMIT:
        # Timestamps are appended in chronological order, so index 0 is the
        # oldest upload still inside the window.
        remain = int(LIMIT_WINDOW - (now - history[0]))
        system_logger.info(
            f"[TempUpload] rate limited ip={ip} remain={remain}s"
        )
        raise Exception(f"上传过于频繁,请 {remain} 秒后重试")

    UPLOAD_RECORD.setdefault(ip, []).append(now)
# ==========================
# Page logic
# ==========================
def _store_uploaded_file(ip):
    """Validate the file in the current request and save it to TEMP_FOLDER.

    Args:
        ip: Client address, used for logging and as the rate-limit key.

    Returns:
        The user-facing success message.

    Raises:
        Exception: with a user-facing message when no file was sent, the
            file exceeds ``MAX_FILE_SIZE``, the sanitised filename is empty,
            or the rate limit is hit. Errors raised while saving propagate
            unchanged.
    """
    file = request.files.get("file")
    if not file or not file.filename:
        system_logger.info(
            f"[TempUpload] upload rejected (no file) ip={ip}"
        )
        raise Exception("请选择文件")

    # Measure the upload by seeking to the end of the stream, then rewind so
    # the later save() writes the whole file.
    file.seek(0, os.SEEK_END)
    file_size = file.tell()
    file.seek(0)
    if file_size > MAX_FILE_SIZE:
        system_logger.info(
            f"[TempUpload] file too large ip={ip} size={file_size}"
        )
        raise Exception("文件超过50MB限制")

    # secure_filename() can return "" (for example when the name has no ASCII
    # characters). Saving would then target TEMP_FOLDER itself and fail with
    # an OS error, so reject it here -- before the rate limiter, so an
    # unusable name does not consume one of the client's upload slots.
    filename = secure_filename(file.filename)
    if not filename:
        system_logger.info(
            f"[TempUpload] upload rejected (invalid filename) ip={ip}"
        )
        raise Exception("文件名无效")

    # Per-IP rate limit; raises when the quota is exhausted.
    check_rate_limit(ip)

    save_path = os.path.join(TEMP_FOLDER, filename)
    file.save(save_path)
    system_logger.info(
        f"[TempUpload] upload success "
        f"file={filename} ip={ip} size={file_size}"
    )
    return f"上传成功:{filename}"


def _list_temp_files():
    """Return the regular files in TEMP_FOLDER, sorted by name descending.

    Each entry is a dict with ``name`` (the filename) and ``size`` (size in
    MiB, rounded to two decimals).
    """
    files = []
    for filename in os.listdir(TEMP_FOLDER):
        file_path = os.path.join(TEMP_FOLDER, filename)
        if os.path.isfile(file_path):
            size = os.path.getsize(file_path)
            files.append({
                "name": filename,
                "size": round(size / 1024 / 1024, 2),
            })
    files.sort(key=lambda entry: entry["name"], reverse=True)
    return files


def temp_upload_page():
    """Handle the temporary-upload page.

    On POST, tries to store the uploaded file; the outcome (success text or
    the error text) becomes ``message``. For every request method, the
    current file list is rendered with the ``temp_upload.html`` template.

    Returns:
        The rendered template, given ``files`` and ``message``.
    """
    message = None
    ip = get_client_ip()

    if request.method == "POST":
        try:
            message = _store_uploaded_file(ip)
        except Exception as e:
            # Any failure is shown to the user as the page message instead of
            # propagating as an error response.
            message = str(e)
            system_logger.info(
                f"[TempUpload] upload failed ip={ip} err={e}"
            )

    files = _list_temp_files()
    system_logger.info(
        f"[TempUpload] list files ip={ip} count={len(files)}"
    )
    return render_template(
        "temp_upload.html",
        files=files,
        message=message
    )
# ==========================
# Download
# ==========================
def download_temp_file(filename):
    """Send a file from TEMP_FOLDER to the client as an attachment.

    Logs the requested filename and the client IP, then delegates to
    ``send_from_directory`` with ``as_attachment=True``.

    Args:
        filename: Name of the file to serve, relative to TEMP_FOLDER.

    Returns:
        Whatever ``send_from_directory`` returns for the requested file.
    """
    ip = get_client_ip()
    log_line = f"[TempUpload] download file={filename} ip={ip}"
    system_logger.info(log_line)

    response = send_from_directory(TEMP_FOLDER, filename, as_attachment=True)
    return response