|
|
import os
|
|
|
import uuid
|
|
|
import pandas as pd
|
|
|
|
|
|
from flask import render_template, request, send_file
|
|
|
from common.logger import system_logger
|
|
|
|
|
|
|
|
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
|
|
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
|
|
|
OUTPUT_FOLDER = os.path.join(BASE_DIR, "output")
|
|
|
|
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
|
|
|
|
|
|
|
|
|
# =========================
|
|
|
# 1. 读取文件(CSV/TXT/Excel)
|
|
|
# =========================
|
|
|
def read_any(file_path):
|
|
|
|
|
|
ext = file_path.lower().split(".")[-1]
|
|
|
|
|
|
# Excel
|
|
|
if ext in ["xls", "xlsx"]:
|
|
|
df = pd.read_excel(file_path, dtype=str)
|
|
|
return df
|
|
|
|
|
|
# CSV / TXT
|
|
|
if ext in ["csv", "txt"]:
|
|
|
|
|
|
seps = [",", "|", "\t", " "]
|
|
|
|
|
|
for sep in seps:
|
|
|
try:
|
|
|
df = pd.read_csv(
|
|
|
file_path,
|
|
|
sep=sep,
|
|
|
engine="python",
|
|
|
dtype=str,
|
|
|
encoding="utf-8"
|
|
|
)
|
|
|
|
|
|
if df.shape[1] > 1:
|
|
|
return df
|
|
|
|
|
|
except:
|
|
|
continue
|
|
|
|
|
|
# fallback GBK
|
|
|
for sep in seps:
|
|
|
try:
|
|
|
df = pd.read_csv(
|
|
|
file_path,
|
|
|
sep=sep,
|
|
|
engine="python",
|
|
|
dtype=str,
|
|
|
encoding="gbk"
|
|
|
)
|
|
|
|
|
|
if df.shape[1] > 1:
|
|
|
return df
|
|
|
|
|
|
except:
|
|
|
continue
|
|
|
|
|
|
raise Exception("无法解析TXT/CSV结构")
|
|
|
|
|
|
raise Exception("不支持的文件类型")
|
|
|
|
|
|
|
|
|
# =========================
|
|
|
# 2. 转换核心
|
|
|
# =========================
|
|
|
def convert_file(df, target):
|
|
|
|
|
|
out_name = f"convert_{uuid.uuid4().hex}"
|
|
|
|
|
|
# Excel
|
|
|
if target == "excel":
|
|
|
path = os.path.join(OUTPUT_FOLDER, out_name + ".xlsx")
|
|
|
df.to_excel(path, index=False)
|
|
|
|
|
|
# CSV
|
|
|
elif target == "csv":
|
|
|
path = os.path.join(OUTPUT_FOLDER, out_name + ".csv")
|
|
|
df.to_csv(path, index=False, encoding="utf-8-sig")
|
|
|
|
|
|
# TXT(默认 tab)
|
|
|
elif target == "txt":
|
|
|
path = os.path.join(OUTPUT_FOLDER, out_name + ".txt")
|
|
|
df.to_csv(path, index=False, sep="\t", encoding="utf-8")
|
|
|
|
|
|
else:
|
|
|
raise Exception("未知转换类型")
|
|
|
|
|
|
return path, os.path.basename(path)
|
|
|
|
|
|
|
|
|
# =========================
|
|
|
# 3. 页面
|
|
|
# =========================
|
|
|
def file_convert_page():
|
|
|
|
|
|
preview = None
|
|
|
download_file = None
|
|
|
message = None
|
|
|
|
|
|
if request.method == "POST":
|
|
|
|
|
|
try:
|
|
|
file = request.files.get("file")
|
|
|
target = request.form.get("target")
|
|
|
|
|
|
if not file or not file.filename:
|
|
|
raise Exception("请选择文件")
|
|
|
|
|
|
filename = f"{uuid.uuid4()}_{file.filename}"
|
|
|
path = os.path.join(UPLOAD_FOLDER, filename)
|
|
|
file.save(path)
|
|
|
|
|
|
df = read_any(path)
|
|
|
|
|
|
preview = df.head(100).fillna("").to_html(
|
|
|
classes="table table-dark table-hover",
|
|
|
index=False
|
|
|
)
|
|
|
|
|
|
out_path, download_file = convert_file(df, target)
|
|
|
|
|
|
message = f"转换成功:{len(df)} 行"
|
|
|
|
|
|
system_logger.info(
|
|
|
f"[FileConvert] {file.filename} -> {target}"
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
message = str(e)
|
|
|
system_logger.exception("[FileConvert ERROR]")
|
|
|
|
|
|
return render_template(
|
|
|
"file_convert.html",
|
|
|
preview=preview,
|
|
|
download_file=download_file,
|
|
|
message=message
|
|
|
)
|
|
|
|
|
|
|
|
|
# =========================
|
|
|
# download
|
|
|
# =========================
|
|
|
def file_convert_download(filename):
|
|
|
|
|
|
path = os.path.join(OUTPUT_FOLDER, filename)
|
|
|
|
|
|
system_logger.info(f"[FileConvertDownload] {filename}")
|
|
|
|
|
|
return send_file(path, as_attachment=True) |