"""Smart two-file matching page.

Reads two uploaded tabular files (CSV / Excel / delimited text), guesses the
key column of each, left-joins them and exposes the result as an HTML preview
plus a downloadable Excel file.
"""
import os
import re
import uuid

import pandas as pd
from flask import abort, render_template, request, send_file

from common.logger import system_logger

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
OUTPUT_FOLDER = os.path.join(BASE_DIR, "output")

os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# Extensions that are always read with their first row as the header.
_TABULAR_EXTENSIONS = (".csv", ".xlsx", ".xls")
# Separators tried, in order, for anything that is not CSV/Excel.
_TEXT_SEPARATORS = (",", "|", "\t", r"\s+")
# Substrings that hint that a cell is a column title rather than data.
_HEADER_KEYWORDS = ("id", "uid", "name", "code", "工号", "编号", "time", "date")


# =========================
# 1. Smart reading (v2.3 FINAL)
# =========================
def _clean_frame(df):
    """Strip whitespace from column names and cell values, in place.

    Missing cells stay missing (NaN). Converting them with ``astype(str)``
    alone would turn them into the literal text "nan", which defeats later
    ``dropna()`` / ``fillna("")`` calls and leaks "nan" into the export.
    """
    df.columns = [str(c).strip() for c in df.columns]
    for c in df.columns:
        col = df[c]
        df[c] = col.astype(str).str.strip().where(col.notna())
    return df


def _is_header_row(row):
    """Heuristically decide whether ``row`` looks like a header row.

    Each cell earns points for containing a header keyword, for not being
    purely numeric and for being short (<= 20 chars); cells that are 4-20
    digit numbers are penalised. A total of 3 or more counts as a header.
    """
    cells = [str(x).strip() for x in row]

    text_score = 0
    structure_score = 0
    numeric_penalty = 0

    for v in cells:
        v_low = v.lower()
        if any(k in v_low for k in _HEADER_KEYWORDS):
            text_score += 2
        if not re.fullmatch(r"\d+", v):
            structure_score += 1
        if len(v) <= 20:
            structure_score += 1
        if re.fullmatch(r"\d{4,20}", v):
            numeric_penalty += 1

    return text_score + structure_score - numeric_penalty >= 3


def _read_delimited_text(file_path):
    """Parse a text file by trying each known separator in turn.

    The first separator that yields at least two columns wins. If none does,
    the last successfully parsed frame is returned.

    Raises:
        ValueError: if no separator could parse the file at all.
    """
    parsed = None
    for sep in _TEXT_SEPARATORS:
        try:
            candidate = pd.read_csv(
                file_path,
                sep=sep,
                engine="python",
                header=None,
                dtype=str,
            )
        except Exception:
            # Deliberate best effort: a separator that cannot parse the file
            # just means "try the next one". Unlike a bare ``except:`` this
            # lets KeyboardInterrupt / SystemExit propagate.
            continue
        parsed = candidate
        if parsed.shape[1] >= 2:
            break

    if parsed is None:
        raise ValueError("无法解析文件")
    return parsed


def smart_read(file_path):
    """Read a CSV, Excel or delimited text file into a string DataFrame.

    CSV / Excel files are always read with their first row as the header.
    Any other extension is parsed as delimited text; its first row is used
    as the header only when it looks like a header and the second row does
    not. Otherwise columns are named ``c0``, ``c1``, ...

    Args:
        file_path: path of the file to read; the extension selects the parser.

    Returns:
        ``(df, has_header)``: the cleaned DataFrame and whether a header row
        was used.

    Raises:
        ValueError: if the file cannot be read or parsed.
    """
    ext = os.path.splitext(file_path)[-1].lower()

    # --- 1) CSV / Excel: header is forced ---
    if ext in _TABULAR_EXTENSIONS:
        try:
            if ext == ".csv":
                df = pd.read_csv(file_path, dtype=str)
            else:
                df = pd.read_excel(file_path, dtype=str)
        except Exception as e:
            raise ValueError(f"读取失败: {e}") from e

        has_header = True
        df = _clean_frame(df)

        system_logger.info(
            f"[SmartRead-v2.3] file={file_path} type={ext} header=FORCED shape={df.shape}"
        )
        return df, has_header

    # --- 2) TXT / unstructured files: detect the separator ---
    df = _read_delimited_text(file_path)

    # --- 3) Header detection (text files only) ---
    has_header = False
    if len(df) >= 2:
        first_is_header = _is_header_row(df.iloc[0].tolist())
        second_is_header = _is_header_row(df.iloc[1].tolist())
        # Only trust row 0 as a header when row 1 does not look like one too.
        has_header = first_is_header and not second_is_header

    # --- 4) Apply the schema ---
    if has_header:
        df.columns = [str(x).strip() for x in df.iloc[0].tolist()]
        df = df.iloc[1:].reset_index(drop=True)
    else:
        df.columns = [f"c{i}" for i in range(df.shape[1])]

    # --- 5) Clean ---
    df = _clean_frame(df)

    system_logger.info(
        f"[SmartRead-v2.3] file={file_path} type=txt header={has_header} shape={df.shape}"
    )
    return df, has_header


# =========================
# 2. Column scoring
# =========================
def score_column(series):
    """Score how much a column looks like an ID column.

    Looks at the first 30 non-null values; each value earns points for being
    a 6-12 digit number (+3), being all digits (+2), starting with "0" (+1)
    and having length 6, 8, 10 or 12 (+1).

    Returns:
        The average score per sampled value, or 0 when there is no value.
    """
    sample = series.dropna().astype(str).head(30)
    if len(sample) == 0:
        return 0

    score = 0
    for v in sample:
        if re.fullmatch(r"\d{6,12}", v):
            score += 3
        if v.isdigit():
            score += 2
        if v.startswith("0"):
            score += 1
        if len(v) in (6, 8, 10, 12):
            score += 1

    return score / len(sample)


# =========================
# 3. Automatic key detection
# =========================
def detect_key(df):
    """Guess the key column of ``df``.

    The first column whose lower-cased name contains a known key word wins.
    Otherwise the column with the highest ``score_column`` value is used,
    provided that score is above 0.4.

    Returns:
        The column label, or ``None`` when no column qualifies.
    """
    candidates = ("uid", "id", "user_id", "emp_id", "工号", "编号")

    for col in df.columns:
        col_name = str(col).lower()
        if any(c in col_name for c in candidates):
            return col

    best_col = None
    best_score = 0
    for col in df.columns:
        score = score_column(df[col])
        if score > best_score:
            best_score = score
            best_col = col

    if best_score > 0.4:
        return best_col
    return None


# =========================
# 4. Join
# =========================
def auto_join(df1, df2):
    """Left-join ``df1`` with ``df2`` on their auto-detected key columns.

    Rows of ``df2`` whose key is missing are ignored: pandas matches null
    keys against each other, which would report rows with a blank key in
    both files as "matched".

    Returns:
        ``(result, key1, key2, stats)`` where ``result`` carries a
        ``match_status`` column ("matched" / "unmatched") and ``stats`` has
        ``matched_count``, ``unmatched_count`` and ``match_rate`` (percent).

    Raises:
        ValueError: if a key column cannot be detected in either frame.
    """
    key1 = detect_key(df1)
    key2 = detect_key(df2)

    # ``detect_key`` signals "not found" with None; a legitimately detected
    # column may have a falsy label (e.g. an empty header cell).
    if key1 is None or key2 is None:
        raise ValueError("无法识别匹配字段")

    right = df2[df2[key2].notna()]

    result = pd.merge(
        df1,
        right,
        left_on=key1,
        right_on=key2,
        how="left",
        indicator=True,
    )

    result["match_status"] = result["_merge"].map({
        "both": "matched",
        "left_only": "unmatched",
        "right_only": "orphan",
    }).astype(str)
    result.drop(columns=["_merge"], inplace=True)

    matched_count = int((result["match_status"] == "matched").sum())
    unmatched_count = int((result["match_status"] == "unmatched").sum())

    total = len(result)
    match_rate = round(matched_count / total * 100, 2) if total else 0

    stats = {
        "matched_count": matched_count,
        "unmatched_count": unmatched_count,
        "match_rate": match_rate,
    }
    return result, key1, key2, stats


# =========================
# 5. Page
# =========================
def _save_upload(storage):
    """Save an uploaded file under UPLOAD_FOLDER and return its path.

    Only the base name of the client-supplied filename is kept, so directory
    components (with either slash style) cannot influence where the file is
    written. The extension is preserved because ``smart_read`` relies on it.
    """
    raw_name = (storage.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name)
    path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4()}_{safe_name}")
    storage.save(path)
    return path


def smart_data_page_v2():
    """Render the matching page; on POST, match the two uploaded files.

    On success the joined result is written to OUTPUT_FOLDER as an Excel
    file and the first 100 rows are rendered as an HTML preview. Any failure
    is logged and shown to the user as ``message``.
    """
    preview = None
    download_file = None
    message = None
    match_info = ""
    matched_count = 0
    unmatched_count = 0
    match_rate = 0

    if request.method == "POST":
        try:
            file1 = request.files.get("file1")
            file2 = request.files.get("file2")

            # Fail with a readable message instead of an AttributeError on
            # None when one of the two uploads is missing.
            if not (file1 and file1.filename and file2 and file2.filename):
                raise ValueError("请上传两个文件")

            p1 = _save_upload(file1)
            p2 = _save_upload(file2)

            df1, h1 = smart_read(p1)
            df2, h2 = smart_read(p2)

            result, k1, k2, stats = auto_join(df1, df2)

            matched_count = stats["matched_count"]
            unmatched_count = stats["unmatched_count"]
            match_rate = stats["match_rate"]

            match_info = (
                f"匹配字段:{k1} ↔ {k2} | "
                f"匹配率:{match_rate}% | "
                f"来源:{'有表头' if h1 else '无表头'} / {'有表头' if h2 else '无表头'}"
            )

            out_name = f"v2_{uuid.uuid4().hex}.xlsx"
            out_path = os.path.join(OUTPUT_FOLDER, out_name)
            result.to_excel(out_path, index=False)

            preview = result.head(100).fillna("").to_html(
                classes="table table-dark table-hover",
                index=False,
            )

            download_file = out_name
            message = f"匹配成功,共 {len(result)} 条记录"

            system_logger.info(
                f"[SmartETL-v2.3] {match_info} "
                f"matched={matched_count}, unmatched={unmatched_count}"
            )

        except Exception as e:
            # Top-level request boundary: report the problem on the page and
            # keep the full traceback in the log.
            message = str(e)
            system_logger.exception("[SmartETL-v2 ERROR]")

    return render_template(
        "smart_data_v2.html",
        preview=preview,
        download_file=download_file,
        message=message,
        match_info=match_info,
        matched_count=matched_count,
        unmatched_count=unmatched_count,
        match_rate=match_rate,
    )


# =========================
# download
# =========================
def smart_download_v2(filename):
    """Send a previously generated result file as an attachment.

    ``filename`` comes from the request, so it must be a plain file name that
    exists inside OUTPUT_FOLDER; anything else is answered with 404 rather
    than being joined into a path outside the output directory.
    """
    system_logger.info(f"[SmartDownload-v2] {filename}")

    # Generated files are always flat names inside OUTPUT_FOLDER, so any
    # directory component (or "." / "..") can only be a traversal attempt.
    if filename in ("", ".", "..") or os.path.basename(filename) != filename:
        abort(404)

    path = os.path.join(OUTPUT_FOLDER, filename)
    if not os.path.isfile(path):
        abort(404)

    return send_file(path, as_attachment=True)