#!/usr/bin/env python3 """ 邮件批量通知工具 - CLI 版 用于在新系统切换时批量给全员发送新的账号口令通知 Usage: python3 mail_sender.py users.txt python3 mail_sender.py users.txt --smtp smtp.ctvit.com.cn --port 465 python3 mail_sender.py users.txt --sender zhangguozhong@ctvit.com.cn TXT 格式:每行 邮箱 密码(空格 / tab / | / , 都支持) """ import os import re import sys import time import argparse import smtplib import logging from datetime import datetime from email.mime.text import MIMEText from email.header import Header # ===================================== # 日志配置 # ===================================== logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger("MailSender") # ===================================== # 邮件模板 # ===================================== DEFAULT_TEMPLATE = """亲爱的用户: 您好!这是您的账号配置信息: 您的邮件账号:{email} 您的初始口令:{password} 请务必妥善保管您的账号信息,不要泄露给他人。 如有疑问,请联系技术支持部门。 --- 此邮件为系统自动发送,请勿直接回复。 """ # ===================================== # TXT 解析 # ===================================== def parse_users(file_path): users = [] with open(file_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = re.split(r"[\s\t\|:;]+", line) if len(parts) < 2: continue email = parts[0].strip() password = parts[1].strip() if not email or "@" not in email: logger.warning(f"跳过无效行(非邮箱格式): {line}") continue users.append({"email": email, "password": password}) return users # ===================================== # SMTP 连接(自动探测端口) # ===================================== def build_smtp(smtp_server, smtp_port, sender_email, sender_pass): smtp_port = int(smtp_port) # 465 → SSL if smtp_port == 465: server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=30) logger.info(f"📡 连接 {smtp_server}:{smtp_port} (SSL)") server.login(sender_email, sender_pass) return server # 587 → STARTTLS if smtp_port == 587: server = smtplib.SMTP(smtp_server, smtp_port, timeout=30) server.ehlo() server.starttls() server.ehlo() logger.info(f"📡 连接 {smtp_server}:{smtp_port} (STARTTLS)") server.login(sender_email, sender_pass) return server # 25 → 明文 server = smtplib.SMTP(smtp_server, smtp_port, timeout=30) logger.info(f"📡 连接 {smtp_server}:{smtp_port} (明文)") server.login(sender_email, sender_pass) return server # ===================================== # 单封发送(含重试) # ===================================== def send_mail(server, sender_email, receiver_email, subject, content, reply_to=None): msg = MIMEText(content, "plain", "utf-8") msg["From"] = sender_email msg["To"] = receiver_email msg["Subject"] = Header(subject or "邮件通知", "utf-8") if reply_to: msg["Reply-To"] = reply_to server.sendmail(sender_email, [receiver_email], msg.as_string()) def send_with_retry( server, sender_email, receiver_email, subject, content, reply_to=None, max_retries=3 ): last_error = None for attempt in range(1, max_retries + 1): try: send_mail(server, sender_email, receiver_email, subject, content, reply_to) return True, None except smtplib.SMTPServerDisconnected: last_error = "连接断开" if attempt < max_retries: wait = 2 ** attempt logger.warning(f" ⚠ 连接断开,{wait}s 后重试 (第{attempt}次)") time.sleep(wait) except smtplib.SMTPResponseException as e: code = e.smtp_code if 400 <= code < 500 and attempt < max_retries: last_error = f"SMTP {code}" wait = 2 ** attempt logger.warning(f" ⚠ SMTP {code},{wait}s 后重试 (第{attempt}次)") time.sleep(wait) else: return False, f"SMTP {code}: {e}" except Exception as e: return False, str(e) return False, last_error # ===================================== # 主流程 # ===================================== def main(): parser = argparse.ArgumentParser( description="📧 邮件批量通知工具 (CLI)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: python3 mail_sender.py users.txt python3 mail_sender.py users.txt --port 465 --sender xxx@qq.com python3 mail_sender.py users.txt --subject "系统账号通知" --delay 2 python3 mail_sender.py users.txt --template my_template.txt """, ) parser.add_argument("file", help="用户列表 TXT 文件路径(每行:邮箱 密码)") parser.add_argument("--smtp", default="smtp.ctvit.com.cn", help="SMTP 服务器地址(默认 smtp.ctvit.com.cn)") parser.add_argument("--port", type=int, default=25, help="SMTP 端口(默认 25,常见 465/587)") parser.add_argument("--sender", default="zhangguozhong@ctvit.com.cn", help="发件邮箱") parser.add_argument("--passwd", help="SMTP 密码/授权码(如不传则交互输入)") parser.add_argument("--subject", default="账号及口令通知", help="邮件主题") parser.add_argument("--reply-to", help="回复地址(可选)") parser.add_argument("--template", help="邮件模板文件路径(替代默认模板,支持 {email} {password})") parser.add_argument("--delay", type=float, default=1.0, help="每封邮件间隔秒数(默认 1s)") parser.add_argument("--retry", type=int, default=3, help="失败重试次数(默认 3)") parser.add_argument("--test", action="store_true", help="测试模式:只给发件人自己发一封测试邮件") args = parser.parse_args() # ─── 检查文件 ─── if not os.path.exists(args.file): logger.error(f"❌ 文件不存在: {args.file}") sys.exit(1) # ─── 读取模板 ─── if args.template: with open(args.template, "r", encoding="utf-8") as f: template = f.read() logger.info(f"📝 使用自定义模板: {args.template}") else: template = DEFAULT_TEMPLATE logger.info("📝 使用默认模板") # ─── 获取密码 ─── sender_pass = args.passwd if not sender_pass: import getpass sender_pass = getpass.getpass(f"🔑 请输入 {args.sender} 的 SMTP 密码/授权码: ") # ─── 解析用户列表 ─── users = parse_users(args.file) if not users: logger.error("❌ 未识别到有效用户数据") sys.exit(1) logger.info(f"📋 共读取到 {len(users)} 个用户") # ─── 测试发送 ─── if args.test: content = template.format(email=args.sender, password="123456") server = None try: server = build_smtp(args.smtp, args.port, args.sender, sender_pass) send_mail(server, args.sender, args.sender, f"[测试] {args.subject}", content, args.reply_to) logger.info(f"✅ 测试邮件发送成功(已发送至 {args.sender})") except Exception as e: logger.error(f"❌ 测试发送失败: {e}") sys.exit(1) finally: if server: server.quit() return # ─── 正式发送 ─── total = len(users) success_count = 0 failed_count = 0 failed_users = [] server = None try: server = build_smtp(args.smtp, args.port, args.sender, sender_pass) logger.info(f"✅ 登录成功!开始批量发送(共 {total} 封)...") print("─" * 50) for idx, user in enumerate(users, 1): email = user["email"] password = user["password"] content = template.format(email=email, password=password) ok, err = send_with_retry( server, args.sender, email, args.subject, content, reply_to=args.reply_to, max_retries=args.retry, ) if ok: success_count += 1 logger.info(f" [{idx}/{total}] ✅ {email}") else: failed_count += 1 failed_users.append({"email": email, "error": err}) logger.error(f" [{idx}/{total}] ❌ {email} → {err}") # 发送间隔 if idx < total: time.sleep(args.delay) except smtplib.SMTPAuthenticationError: logger.error("❌ SMTP 认证失败,请检查邮箱地址和密码/授权码是否正确") sys.exit(1) except Exception as e: logger.error(f"❌ 连接错误: {e}") sys.exit(1) finally: if server: server.quit() # ─── 汇总 ─── print("═" * 50) logger.info(f"📊 发送汇总:") logger.info(f" 总 数:{total}") logger.info(f" 成 功:{success_count}") logger.info(f" 失 败:{failed_count}") if failed_users: print() logger.warning("📋 失败人员列表:") for fu in failed_users: logger.warning(f" ❌ {fu['email']} → {fu['error']}") # 写入失败日志 log_path = f"failed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" with open(log_path, "w", encoding="utf-8") as f: for fu in failed_users: f.write(f"{fu['email']}\t{fu['error']}\n") logger.info(f"📁 失败详情已保存至: {log_path}") if failed_count == 0: logger.info("🎉 全部发送成功!") elif success_count > 0: logger.info(f"⚠️ 部分失败,请检查失败列表重发") if __name__ == "__main__": main()