You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

159 lines
3.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import uuid
import pandas as pd
from flask import render_template, request, send_file
from common.logger import system_logger
# Project root: the parent of the directory that contains this module.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))

# Uploaded inputs and converted outputs live in sibling folders under the root.
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
OUTPUT_FOLDER = os.path.join(BASE_DIR, "output")

# Create both folders at import time; already-existing folders are left alone.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
# =========================
# 1. 读取文件CSV/TXT/Excel
# =========================
def read_any(file_path):
    """Load an Excel, CSV or TXT file into a DataFrame of strings.

    The file type is taken from the text after the last "." in ``file_path``
    (case-insensitive). Excel files go through ``pandas.read_excel``. For
    CSV/TXT the delimiter is not known, so each candidate separator is tried
    in turn, first decoding as UTF-8 and then as GBK; the first parse that
    yields more than one column wins. If no separator splits the file but it
    parses cleanly as a single column, that single-column frame is returned.

    Args:
        file_path: Path of the file to read.

    Returns:
        A ``pandas.DataFrame`` read with ``dtype=str``.

    Raises:
        ValueError: If the extension is not xls/xlsx/csv/txt, or if a CSV/TXT
            file cannot be parsed with any separator/encoding combination.
    """
    ext = file_path.lower().rsplit(".", 1)[-1]

    # Excel
    if ext in ("xls", "xlsx"):
        return pd.read_excel(file_path, dtype=str)

    if ext not in ("csv", "txt"):
        raise ValueError("不支持的文件类型")

    # CSV / TXT: probe separators, UTF-8 first and GBK as the fallback.
    separators = (",", "|", "\t", " ")
    for encoding in ("utf-8", "gbk"):
        single_column = None
        for sep in separators:
            try:
                df = pd.read_csv(
                    file_path,
                    sep=sep,
                    engine="python",
                    dtype=str,
                    encoding=encoding,
                )
            except Exception:
                # Best-effort probing: a decode or parse failure only means
                # this combination does not fit. Unlike a bare ``except``,
                # this lets KeyboardInterrupt/SystemExit propagate.
                continue
            if df.shape[1] > 1:
                return df
            if single_column is None:
                # Remember the first clean one-column parse for this encoding.
                single_column = df
        if single_column is not None:
            # The file decoded fine but no separator split it: treat it as a
            # genuine single-column table instead of rejecting it.
            return single_column

    raise ValueError("无法解析TXT/CSV结构")
# =========================
# 2. 转换核心
# =========================
def convert_file(df, target):
    """Export ``df`` to a new, uniquely named file inside OUTPUT_FOLDER.

    Args:
        df: The DataFrame to write.
        target: Output format: "excel", "csv" or "txt".

    Returns:
        A tuple ``(path, file_name)`` with the full path of the written file
        and its base name.

    Raises:
        Exception: If ``target`` is not one of the supported formats.
    """
    stem = f"convert_{uuid.uuid4().hex}"

    # (target, file suffix, DataFrame writer method, extra writer options).
    # CSV is written with a UTF-8 BOM; TXT is tab-separated plain UTF-8.
    formats = (
        ("excel", ".xlsx", "to_excel", {}),
        ("csv", ".csv", "to_csv", {"encoding": "utf-8-sig"}),
        ("txt", ".txt", "to_csv", {"sep": "\t", "encoding": "utf-8"}),
    )

    for kind, suffix, writer_name, options in formats:
        if target == kind:
            destination = os.path.join(OUTPUT_FOLDER, stem + suffix)
            getattr(df, writer_name)(destination, index=False, **options)
            return destination, os.path.basename(destination)

    raise Exception("未知转换类型")
# =========================
# 3. 页面
# =========================
def file_convert_page():
    """Render the conversion page; on POST, convert the uploaded file.

    On a POST request the uploaded file (form field "file") is saved into
    UPLOAD_FOLDER, parsed with ``read_any``, previewed (first 100 rows as an
    HTML table) and exported with ``convert_file`` in the format given by the
    "target" form field. Any failure is logged and its message is shown on
    the page instead of propagating.

    Returns:
        The rendered "file_convert.html" template, given ``preview`` (HTML
        table or None), ``download_file`` (name of the converted file or
        None) and ``message`` (status or error text, or None).
    """
    preview = None
    download_file = None
    message = None

    if request.method == "POST":
        try:
            upload = request.files.get("file")
            target = request.form.get("target")
            if not upload or not upload.filename:
                raise ValueError("请选择文件")

            # The client-supplied file name is untrusted and may contain path
            # separators or "..". Only its extension is kept (read_any needs
            # it to pick a parser); the stored name itself is random.
            _, dot, ext = upload.filename.rpartition(".")
            ext = ext.lower()
            if not dot or ext not in ("xls", "xlsx", "csv", "txt"):
                # Reject unsupported types before anything is written to disk.
                raise ValueError("不支持的文件类型")

            path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4().hex}.{ext}")
            upload.save(path)
            # NOTE(review): the saved upload is kept on disk after conversion;
            # confirm whether it should be removed once processed.

            df = read_any(path)
            preview = df.head(100).fillna("").to_html(
                classes="table table-dark table-hover",
                index=False,
            )
            _, download_file = convert_file(df, target)
            message = f"转换成功:{len(df)}"
            system_logger.info(
                f"[FileConvert] {upload.filename} -> {target}"
            )
        except Exception as e:
            # Page-level boundary: show the error to the user and log the
            # traceback rather than failing the request.
            message = str(e)
            system_logger.exception("[FileConvert ERROR]")

    return render_template(
        "file_convert.html",
        preview=preview,
        download_file=download_file,
        message=message,
    )
# =========================
# download
# =========================
def file_convert_download(filename):
    """Send a converted file from OUTPUT_FOLDER to the client as an attachment.

    Args:
        filename: Name of the file inside OUTPUT_FOLDER, as requested by the
            client. It is untrusted input.

    Returns:
        The Flask response that delivers the file as a download.

    Raises:
        werkzeug.exceptions.NotFound: If ``filename`` would resolve outside
            OUTPUT_FOLDER or does not name an existing file (raised by
            ``send_from_directory``).
    """
    # Imported here because the module's top-level flask import (as visible
    # in this file) does not include send_from_directory.
    from flask import send_from_directory

    system_logger.info(f"[FileConvertDownload] {filename}")
    # send_from_directory joins the path safely, so names such as
    # "../secret" cannot escape OUTPUT_FOLDER the way a plain os.path.join
    # followed by send_file would allow.
    return send_from_directory(OUTPUT_FOLDER, filename, as_attachment=True)