# NOTE: this file was captured from a web code viewer. The viewer reported the
# original as 333 lines / 7.9 KiB and warned that it contains invisible and
# ambiguous Unicode characters. That page text is not part of the program.

import os
import re
import uuid

import pandas as pd
from flask import render_template, request, send_file, send_from_directory

from common.logger import system_logger
# Directory one level above the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Uploaded input files are saved under UPLOAD_FOLDER; generated result
# workbooks are written to OUTPUT_FOLDER.
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
OUTPUT_FOLDER = os.path.join(BASE_DIR, "output")
# Create both folders at import time; exist_ok makes repeated imports harmless.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
# =========================
# 1. Smart read (v2.3 FINAL)
# =========================
def _strip_frame(df):
    """Trim whitespace from the column labels and every cell of ``df`` in place.

    Missing cells are left missing instead of being stringified, so they do
    not turn into the literal text ``"nan"`` in the cleaned frame.

    Returns:
        The same ``df`` object, for convenience.
    """
    df.columns = [str(label).strip() for label in df.columns]
    for label in df.columns:
        df[label] = df[label].map(
            lambda cell: cell if pd.isna(cell) else str(cell).strip()
        )
    return df


def _is_header_row(row):
    """Heuristically decide whether ``row`` (a list of cells) looks like a header.

    Each cell contributes:
      * +2 when it contains one of the header keywords (case-insensitive),
      * +1 when it is not made up purely of digits,
      * +1 when it is at most 20 characters long,
      * -1 when it is a run of 4 to 20 digits.

    Returns:
        True when the accumulated score is at least 3.
    """
    cells = [str(x).strip() for x in row]
    keywords = ["id", "uid", "name", "code", "工号", "编号", "time", "date"]
    text_score = 0
    structure_score = 0
    numeric_penalty = 0
    for v in cells:
        v_low = v.lower()
        if any(k in v_low for k in keywords):
            text_score += 2
        if not re.fullmatch(r"\d+", v):
            structure_score += 1
        if len(v) <= 20:
            structure_score += 1
        if re.fullmatch(r"\d{4,20}", v):
            numeric_penalty += 1
    score = text_score + structure_score - numeric_penalty
    return score >= 3


def smart_read(file_path):
    """Read a tabular file into an all-string DataFrame.

    ``.csv``, ``.xlsx`` and ``.xls`` files are read with their first row
    forced to be the header. Any other extension is treated as delimited
    text: the separators ``,``, ``|``, tab and whitespace are tried in that
    order until one yields at least two columns, and the first row is used
    as a header only when it looks like one and the second row does not.

    Args:
        file_path: Path of the file to read.

    Returns:
        A ``(df, has_header)`` tuple. ``df`` has stripped column labels and
        stripped string cells (missing cells stay missing); ``has_header``
        tells whether the first row of the file was used as the header.

    Raises:
        Exception: If a CSV/Excel file cannot be read (the original error is
            chained as ``__cause__``), or if no separator can parse a text
            file.
    """
    ext = os.path.splitext(file_path)[-1].lower()

    # --- 1. CSV / Excel: the first row is always the header -------------
    if ext in (".csv", ".xlsx", ".xls"):
        try:
            if ext == ".csv":
                df = pd.read_csv(file_path, dtype=str)
            else:
                df = pd.read_excel(file_path, dtype=str)
        except Exception as e:
            # Keep the generic exception type callers already catch, but
            # chain the cause so the real failure stays in the traceback.
            raise Exception(f"读取失败: {e}") from e
        has_header = True
        _strip_frame(df)
        system_logger.info(
            f"[SmartRead-v2.3] file={file_path} type={ext} header=FORCED shape={df.shape}"
        )
        return df, has_header

    # --- 2. TXT / unstructured files: sniff the separator ---------------
    seps = [",", "|", "\t", r"\s+"]
    df = None
    for sep in seps:
        try:
            df = pd.read_csv(
                file_path,
                sep=sep,
                engine="python",
                header=None,
                dtype=str,
            )
        except Exception:
            # Best effort: this separator does not fit, try the next one.
            # (Not a bare ``except`` so Ctrl-C / SystemExit still propagate.)
            continue
        if df.shape[1] >= 2:
            break
    if df is None:
        raise Exception("无法解析文件")

    # --- 3. Header detection (text files only) --------------------------
    # The first row counts as a header only when it looks like one and the
    # second row does not; with fewer than two rows there is nothing to
    # compare against, so no header is assumed.
    has_header = False
    if len(df) >= 2:
        first_looks_like_header = _is_header_row(df.iloc[0].tolist())
        second_looks_like_header = _is_header_row(df.iloc[1].tolist())
        has_header = first_looks_like_header and not second_looks_like_header

    # --- 4. Apply the schema --------------------------------------------
    if has_header:
        df.columns = [str(x).strip() for x in df.iloc[0].tolist()]
        df = df.iloc[1:].reset_index(drop=True)
    else:
        df.columns = [f"c{i}" for i in range(df.shape[1])]

    # --- 5. Clean ---------------------------------------------------------
    _strip_frame(df)
    system_logger.info(
        f"[SmartRead-v2.3] file={file_path} type=txt header={has_header} shape={df.shape}"
    )
    return df, has_header
# =========================
# 2. Feature scoring (unchanged)
# =========================
def score_column(series):
    """Score how much a column's values look like identifier codes.

    Only the first 30 non-null values (converted to text) are inspected.
    Each value earns:
      * 3 points when it is entirely 6 to 12 digits,
      * 2 points when ``str.isdigit`` is true,
      * 1 point when it starts with ``"0"``,
      * 1 point when its length is 6, 8, 10 or 12.

    Args:
        series: The pandas Series to evaluate.

    Returns:
        The average points per inspected value, or ``0`` when the column has
        no non-null values.
    """
    sample = series.dropna().astype(str).head(30)
    sample_size = len(sample)
    if not sample_size:
        return 0

    def points(text):
        # Booleans count as 0/1, so each rule contributes its weight only
        # when it holds.
        return (
            3 * (re.fullmatch(r"\d{6,12}", text) is not None)
            + 2 * text.isdigit()
            + text.startswith("0")
            + (len(text) in (6, 8, 10, 12))
        )

    return sum(points(text) for text in sample) / sample_size
# =========================
# 3. Automatic key detection (unchanged)
# =========================
def detect_key(df):
    """Pick the column of ``df`` most likely to be the join key.

    Column names are checked first: the first column whose lower-cased name
    contains one of the known hints wins. The hints are matched as
    substrings, so ``"id"`` also matches a name such as ``"Width"``. If no
    name matches, every column is scored with ``score_column`` and the
    best-scoring one is returned, provided its score exceeds 0.4.

    Args:
        df: The DataFrame to inspect.

    Returns:
        The chosen column label, or ``None`` when nothing qualifies.
    """
    name_hints = ("uid", "id", "user_id", "emp_id", "工号", "编号")

    # Pass 1: rely on the column names.
    for label in df.columns:
        lowered = str(label).lower()
        if any(hint in lowered for hint in name_hints):
            return label

    # Pass 2: fall back to the content-based score; on ties the earliest
    # column is kept because only a strictly greater score replaces it.
    top_label, top_score = None, 0
    for label in df.columns:
        current = score_column(df[label])
        if current > top_score:
            top_label, top_score = label, current

    return top_label if top_score > 0.4 else None
# =========================
# 4. Join (unchanged)
# =========================
def auto_join(df1, df2):
    """Left-join ``df1`` with ``df2`` on automatically detected key columns.

    The key of each frame is chosen by ``detect_key``. Every row of the
    result carries a ``match_status`` column derived from the merge
    indicator: ``"matched"`` when the key was found in both frames and
    ``"unmatched"`` when it was found only in ``df1``.

    Args:
        df1: Left DataFrame; all of its rows are kept.
        df2: Right DataFrame to look up matches in.

    Returns:
        A ``(result, key1, key2, stats)`` tuple where ``stats`` is a dict
        with ``matched_count``, ``unmatched_count`` and ``match_rate``
        (a percentage rounded to two decimals, ``0`` for an empty result).

    Raises:
        Exception: If no key column can be detected in either frame.
    """
    key1 = detect_key(df1)
    key2 = detect_key(df2)
    # Compare against None explicitly: a legitimate column label can be
    # falsy (for example 0 or ""), and must not be mistaken for "not found".
    if key1 is None or key2 is None:
        raise Exception("无法识别匹配字段")

    result = pd.merge(
        df1,
        df2,
        left_on=key1,
        right_on=key2,
        how="left",
        indicator=True,
    )

    # Translate the merge indicator into the labels used in the output.
    # "right_only" cannot occur with a left join; it is mapped for completeness.
    result["match_status"] = result["_merge"].map({
        "both": "matched",
        "left_only": "unmatched",
        "right_only": "orphan",
    }).astype(str)
    result.drop(columns=["_merge"], inplace=True)

    matched_count = int((result["match_status"] == "matched").sum())
    unmatched_count = int((result["match_status"] == "unmatched").sum())
    total = len(result)
    match_rate = round(matched_count / total * 100, 2) if total else 0

    stats = {
        "matched_count": matched_count,
        "unmatched_count": unmatched_count,
        "match_rate": match_rate,
    }
    return result, key1, key2, stats
# =========================
# 5. Page (unchanged)
# =========================
def _save_upload(upload):
    """Save one uploaded file under ``UPLOAD_FOLDER`` and return its path."""
    # The client controls the filename. Keep only its last path component
    # (handling both "/" and "\\") so the saved path cannot leave
    # UPLOAD_FOLDER. The extension is preserved because smart_read uses it.
    safe_name = os.path.basename(upload.filename.replace("\\", "/"))
    path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}_{safe_name}")
    upload.save(path)
    return path


def smart_data_page_v2():
    """Render the matching page and, on POST, join the two uploaded files.

    On POST the form files ``file1`` and ``file2`` are saved, read with
    ``smart_read`` and joined with ``auto_join``. The full result is written
    to an ``.xlsx`` file in ``OUTPUT_FOLDER`` and the first 100 rows are
    rendered as an HTML preview. Any failure is logged and its text is shown
    to the user as ``message``.

    Returns:
        The rendered ``smart_data_v2.html`` template.
    """
    preview = None
    download_file = None
    message = None
    match_info = ""
    matched_count = 0
    unmatched_count = 0
    match_rate = 0

    if request.method == "POST":
        try:
            file1 = request.files.get("file1")
            file2 = request.files.get("file2")
            # Fail with a readable message instead of an AttributeError when
            # a file field is absent or was submitted empty.
            if (
                file1 is None
                or file2 is None
                or not file1.filename
                or not file2.filename
            ):
                raise ValueError("请上传两个文件")

            p1 = _save_upload(file1)
            p2 = _save_upload(file2)

            df1, h1 = smart_read(p1)
            df2, h2 = smart_read(p2)
            result, k1, k2, stats = auto_join(df1, df2)

            matched_count = stats["matched_count"]
            unmatched_count = stats["unmatched_count"]
            match_rate = stats["match_rate"]
            # Separate the two key names so they do not run together.
            match_info = (
                f"匹配字段:{k1} <-> {k2} "
                f"匹配率:{match_rate}% "
                f"来源:{'有表头' if h1 else '无表头'} / {'有表头' if h2 else '无表头'}"
            )

            # A random output name keeps concurrent requests from colliding.
            out_name = f"v2_{uuid.uuid4().hex}.xlsx"
            out_path = os.path.join(OUTPUT_FOLDER, out_name)
            result.to_excel(out_path, index=False)

            preview = result.head(100).fillna("").to_html(
                classes="table table-dark table-hover",
                index=False,
            )
            download_file = out_name
            message = f"匹配成功,共 {len(result)} 条记录"
            system_logger.info(
                f"[SmartETL-v2.3] {match_info} "
                f"matched={matched_count}, unmatched={unmatched_count}"
            )
        except Exception as e:
            # Top-level boundary of the request: show the error on the page
            # and keep the traceback in the log.
            message = str(e)
            system_logger.exception("[SmartETL-v2 ERROR]")

    return render_template(
        "smart_data_v2.html",
        preview=preview,
        download_file=download_file,
        message=message,
        match_info=match_info,
        matched_count=matched_count,
        unmatched_count=unmatched_count,
        match_rate=match_rate,
    )
# =========================
# download
# =========================
def smart_download_v2(filename):
    """Send a generated result file from ``OUTPUT_FOLDER`` as a download.

    ``filename`` comes from the request, so it is resolved with
    ``send_from_directory``, which refuses paths that would escape
    ``OUTPUT_FOLDER`` (absolute paths or ``..`` components) and answers with
    a 404 instead of serving an arbitrary file.

    Args:
        filename: Name of the file inside ``OUTPUT_FOLDER``.

    Returns:
        The Flask response that streams the file as an attachment.
    """
    system_logger.info(f"[SmartDownload-v2] {filename}")
    return send_from_directory(OUTPUT_FOLDER, filename, as_attachment=True)