import os
import re
import json
import time
import uuid
import smtplib
import threading
import traceback
from email.mime.text import MIMEText
from email.header import Header

from flask import render_template, request, jsonify

from common.logger import system_logger

# =====================================
# Path
# =====================================
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
PROGRESS_FOLDER = os.path.join(BASE_DIR, "logs", "progress")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROGRESS_FOLDER, exist_ok=True)

# Separator between the e-mail and password columns of an uploaded TXT line.
_FIELD_SEPARATOR = re.compile(r"[\s\t\|:;]+")

# Task ids are embedded in a file name, so only plain tokens are accepted.
_SAFE_TASK_ID = re.compile(r"[A-Za-z0-9_-]+")


# =====================================
# Progress file storage (shared across processes)
# =====================================
def progress_path(task_id):
    """Return the path of the JSON progress file for *task_id*."""
    return os.path.join(PROGRESS_FOLDER, f"{task_id}.json")


def _write_json_atomic(path, data):
    """Serialize *data* to *path* so readers never observe a half-written file."""
    tmp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        try:
            os.replace(tmp_path, path)
        except OSError:
            # Some platforms refuse to replace a file that another process
            # currently has open; fall back to the plain in-place write.
            with open(path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False)
    finally:
        # No-op after a successful replace; cleans up after any failure.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


def write_progress(task_id, **kwargs):
    """Merge *kwargs* into the stored progress record of *task_id*.

    The special keyword ``results_append`` is not stored as-is: its value is
    appended to the accumulated ``results`` list instead. A missing or
    unreadable progress file is treated as an empty record.
    """
    path = progress_path(task_id)
    data = {}
    old_results = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
        if isinstance(loaded, dict):
            data = loaded
            # The results list accumulates across calls.
            old_results = data.get("results", [])
            if not isinstance(old_results, list):
                old_results = []
    except (OSError, ValueError):
        # Missing, unreadable or corrupt file: start from an empty record.
        data = {}
        old_results = []

    data.update(kwargs)

    # Handle the "append one result" request.
    results_append = data.pop("results_append", None)
    if results_append:
        old_results.append(results_append)
        data["results"] = old_results
    elif "results" not in data:
        data["results"] = old_results

    _write_json_atomic(path, data)


def read_progress(task_id):
    """Return the stored progress dict for *task_id*, or ``None`` if absent."""
    try:
        with open(progress_path(task_id), "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None


def clean_progress(task_id):
    """Delete the progress file of *task_id* if it exists."""
    try:
        os.remove(progress_path(task_id))
    except FileNotFoundError:
        pass


# =====================================
# TXT parsing
# =====================================
def parse_users(file_path):
    """Parse an uploaded TXT file into ``{"email", "password"}`` dicts.

    Each non-empty line is split on runs of whitespace, ``|``, ``:`` or ``;``.
    The first field is the e-mail and the second the password; lines with
    fewer than two fields or without ``@`` in the first field are skipped.
    """
    users = []
    # "utf-8-sig" strips the BOM some Windows editors prepend; with plain
    # "utf-8" it would become part of the first e-mail address.
    with open(file_path, "r", encoding="utf-8-sig") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # NOTE(review): a password that itself contains ':', ';' or '|'
            # is cut at that character - confirm the expected file format.
            parts = _FIELD_SEPARATOR.split(line)
            if len(parts) < 2:
                continue
            email = parts[0].strip()
            password = parts[1].strip()
            if not email or "@" not in email:
                continue
            users.append({"email": email, "password": password})
    return users


# =====================================
# Normalize textarea input (trim blank edge lines)
# =====================================
def normalize_template(text):
    """Drop leading/trailing blank lines of *text* and join lines with ``\\n``."""
    if not text:
        return ""
    lines = text.splitlines()
    while lines and not lines[0].strip():
        lines.pop(0)
    while lines and not lines[-1].strip():
        lines.pop()
    return "\n".join(lines)


# =====================================
# Template variable substitution
# =====================================
def render_template_text(template, email, password):
    """Replace ``{{email}}`` and ``{{password}}`` in the normalized template."""
    template = normalize_template(template)
    return (
        str(template)
        .replace("{{email}}", str(email or ""))
        .replace("{{password}}", str(password or ""))
    )


# =====================================
# SMTP connection
# =====================================
def build_smtp(smtp_server, smtp_port, sender_email, sender_pass):
    """Open and authenticate an SMTP connection.

    Port 465 uses ``SMTP_SSL``; port 587 upgrades with STARTTLS; any other
    port connects without TLS.

    Raises:
        ValueError: if *smtp_port* cannot be converted to ``int``.
        Exception: if connecting, negotiating or logging in fails. The
            original error is attached as ``__cause__``.
    """
    smtp_port = int(smtp_port)
    server = None
    try:
        if smtp_port == 465:
            server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=30)
        else:
            server = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
        server.ehlo()
        if smtp_port == 587:
            server.starttls()
            server.ehlo()
        server.login(sender_email, sender_pass)
        return server
    except Exception as e:
        # Do not leak the socket when EHLO / STARTTLS / login fails.
        if server is not None:
            try:
                server.close()
            except Exception:
                pass
        raise Exception(f"SMTP连接失败:{smtp_server}:{smtp_port} {str(e)}") from e


# =====================================
# Send a single mail
# =====================================
def send_single_mail(server, sender_email, receiver_email, subject, content, reply_to=None):
    """Send one plain-text UTF-8 message over an already connected *server*."""
    msg = MIMEText(content, "plain", "utf-8")
    msg["From"] = sender_email
    msg["To"] = receiver_email
    msg["Subject"] = Header(subject or "邮件通知", "utf-8")
    if reply_to:
        msg["Reply-To"] = reply_to
    server.sendmail(sender_email, [receiver_email], msg.as_string())


def send_with_retry(server, sender_email, receiver_email, subject, content,
                    reply_to=None, max_retries=3, reconnect=None):
    """Send one mail, retrying transient failures with exponential backoff.

    Retried: ``SMTPServerDisconnected`` and 4xx SMTP responses. Any other
    error fails immediately.

    Args:
        reconnect: optional zero-argument callable returning a fresh,
            authenticated connection. When given, it is called after a
            disconnect so that the retry does not reuse the dead connection.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_text)``.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            send_single_mail(server, sender_email, receiver_email, subject, content, reply_to)
            return True, None
        except smtplib.SMTPServerDisconnected:
            last_error = "SMTPServerDisconnected"
            if attempt < max_retries:
                time.sleep(2 ** attempt)
                if reconnect is not None:
                    try:
                        server = reconnect()
                    except Exception as e:
                        # Without a connection further attempts cannot succeed.
                        return False, str(e)
        except smtplib.SMTPResponseException as e:
            code = e.smtp_code
            if 400 <= code < 500 and attempt < max_retries:
                last_error = f"SMTP {code}"
                time.sleep(2 ** attempt)
            else:
                return False, str(e)
        except Exception as e:
            return False, str(e)
    return False, last_error


# =====================================
# Background task: batch send
# =====================================
def batch_send_worker(
    task_id,
    smtp_server,
    smtp_port,
    sender_email,
    sender_pass,
    subject,
    template_content,
    users,
    reply_to,
):
    """Send the rendered template to every user and record progress.

    Progress (counters plus one result entry per recipient) is written to the
    task's progress file after each mail. A fatal error is stored under
    ``error``; the status is always set to ``"done"`` at the end.
    """
    server = None
    total = len(users)
    success_count = 0
    failed_count = 0

    def _reconnect():
        """Replace the dropped connection with a freshly authenticated one."""
        nonlocal server
        if server is not None:
            try:
                server.close()
            except Exception:
                pass
        server = build_smtp(smtp_server, smtp_port, sender_email, sender_pass)
        return server

    write_progress(task_id, status="running", total=total,
                   success=0, failed=0, current=0, results=[])
    try:
        server = build_smtp(smtp_server, smtp_port, sender_email, sender_pass)
        for idx, user in enumerate(users):
            email = user.get("email", "")
            password = user.get("password", "")
            ok, err = send_with_retry(
                server, sender_email, email, subject,
                render_template_text(template_content, email, password),
                reply_to=reply_to,
                reconnect=_reconnect,
            )
            if ok:
                success_count += 1
                result = {"email": email, "status": "success"}
                write_progress(task_id, success=success_count,
                               current=idx + 1, results_append=result)
            else:
                failed_count += 1
                result = {"email": email, "status": f"failed: {err}"}
                write_progress(task_id, failed=failed_count,
                               current=idx + 1, results_append=result)
            # Pause 1s between mails to avoid being flagged as spam.
            time.sleep(1)
    except Exception as e:
        # Keep the traceback in the log; the progress file only gets the text.
        system_logger.exception(f"[MailNotify] task={task_id} aborted")
        write_progress(task_id, error=str(e))
    finally:
        if server:
            try:
                server.quit()
            except Exception:
                # Best effort: the connection may already be gone.
                pass
        write_progress(task_id, status="done")


# =====================================
# Progress query endpoint
# =====================================
def mail_notify_progress(task_id):
    """Return the progress of *task_id* as JSON, or a 404 JSON error."""
    # task_id ends up in a file name; reject anything that is not a plain
    # token so a crafted id cannot point outside the progress folder.
    if not _SAFE_TASK_ID.fullmatch(str(task_id)):
        return jsonify({"error": "task_id not found"}), 404
    data = read_progress(task_id)
    if data is None:
        return jsonify({"error": "task_id not found"}), 404
    # Only return the latest 50 results.
    if "results" in data:
        data["results"] = data["results"][-50:]
    return jsonify(data)


# =====================================
# Page entry point
# =====================================
def mail_notify_page():
    """Render the mail-notification page and handle its form submissions.

    On POST the ``action`` form field selects the operation:

    * ``"test"`` - send one test mail to the sender's own address.
    * ``"send"`` - parse the uploaded TXT file and start a background thread
      that mails every listed user; the new task id is passed to the template.

    Any error is shown to the user as a message and logged; the SMTP password
    is never echoed back to the page.
    """
    message = None
    preview_content = ""
    task_id = None
    smtp_server = "smtp.ctvit.com.cn"
    smtp_port = "25"
    reply_to = ""
    sender_email = ""
    sender_pass = ""
    subject = "账号及口令通知"
    # NOTE(review): this literal is reproduced exactly as it appears in the
    # reviewed source, where line breaks seem to have been lost - confirm the
    # intended multi-line layout of the default template.
    template_content = """尊敬的老师/同事: 您好! 根据系统账号开通安排,您的账号已创建完成。现将相关登录信息通知如下,请妥善保存并注意账号安全。 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 账号(邮箱):{{email}} 初始口令:{{password}} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 请在首次登录后及时修改初始口令,并妥善保管个人账号信息。 此致 敬礼! 信息技术支持中心"""

    try:
        if request.method == "POST":
            action = request.form.get("action", "send")
            smtp_server = (request.form.get("smtp_server") or smtp_server).strip()
            smtp_port = (request.form.get("smtp_port") or smtp_port).strip()
            sender_email = (request.form.get("sender_email") or "").strip()
            sender_pass = (request.form.get("sender_pass") or "").strip()
            subject = (request.form.get("subject") or subject).strip()
            reply_to = (request.form.get("reply_to") or "").strip()
            template_content = request.form.get("template_content") or template_content
            upload_file = request.files.get("user_file") or request.files.get("file")

            if not sender_email:
                raise Exception("请输入发件邮箱")
            if not sender_pass:
                raise Exception("请输入SMTP密码")

            if action == "test":
                preview_content = render_template_text(template_content, sender_email, "123456")
                server = None
                try:
                    server = build_smtp(smtp_server, smtp_port, sender_email, sender_pass)
                    send_single_mail(
                        server, sender_email, sender_email,
                        f"[测试] {subject}", preview_content,
                        reply_to=reply_to,
                    )
                finally:
                    if server:
                        try:
                            server.quit()
                        except Exception:
                            # Best effort: a failing QUIT must neither mask
                            # the real error nor turn a successful send into
                            # an error message.
                            pass
                message = "✅ 测试邮件发送成功(已发送至自己邮箱)"
                system_logger.info(f"[MailNotify-Test] sender={sender_email}")

            elif action == "send":
                if not upload_file or not upload_file.filename:
                    raise Exception("请上传 TXT 文件")
                ext = os.path.splitext(upload_file.filename)[1].lower()
                if ext != ".txt":
                    raise Exception("仅支持 txt 文件")

                temp_name = f"{uuid.uuid4().hex}.txt"
                save_path = os.path.join(UPLOAD_FOLDER, temp_name)
                try:
                    upload_file.save(save_path)
                    users = parse_users(save_path)
                finally:
                    # The upload holds plaintext passwords and is only needed
                    # for parsing, so never leave it on disk.
                    try:
                        os.remove(save_path)
                    except OSError:
                        pass
                if not users:
                    raise Exception("未识别到有效用户数据")

                task_id = uuid.uuid4().hex
                write_progress(
                    task_id, status="queued", total=len(users),
                    success=0, failed=0, current=0, results=[], error=None,
                )
                thread = threading.Thread(
                    target=batch_send_worker,
                    args=(
                        task_id,
                        smtp_server,
                        smtp_port,
                        sender_email,
                        sender_pass,
                        subject,
                        template_content,
                        users,
                        reply_to,
                    ),
                    daemon=True,
                )
                thread.start()
                message = f"⏳ 批量发送已启动,任务ID:{task_id}(共 {len(users)} 封,每封间隔1s)"
                system_logger.info(f"[MailNotify] task={task_id} total={len(users)} sender={sender_email}")

        # NOTE(review): placed at try level so GET requests also get a
        # preview; the original nesting was lost with the indentation -
        # confirm.
        preview_content = render_template_text(template_content, "demo@test.com", "123456")
    except Exception as e:
        message = f"❌ 错误:{str(e)}"
        system_logger.exception("[MailNotify ERROR]")

    # Never echo the password back to the page.
    return render_template(
        "mail_notify.html",
        message=message,
        smtp_server=smtp_server,
        smtp_port=smtp_port,
        sender_email=sender_email,
        sender_pass="",
        reply_to=reply_to,
        subject=subject,
        template_content=template_content,
        preview_content=preview_content,
        task_id=task_id,
    )