|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
邮件批量通知工具 - CLI 版
|
|
|
用于在新系统切换时批量给全员发送新的账号口令通知
|
|
|
|
|
|
Usage:
|
|
|
python3 mail_sender.py users.txt
|
|
|
python3 mail_sender.py users.txt --smtp smtp.ctvit.com.cn --port 465
|
|
|
python3 mail_sender.py users.txt --sender zhangguozhong@ctvit.com.cn
|
|
|
|
|
|
TXT 格式:每行 邮箱 密码(空格 / tab / | / , 都支持)
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import re
|
|
|
import sys
|
|
|
import time
|
|
|
import argparse
|
|
|
import smtplib
|
|
|
import logging
|
|
|
from datetime import datetime
|
|
|
from email.mime.text import MIMEText
|
|
|
from email.header import Header
|
|
|
|
|
|
# =====================================
|
|
|
# 日志配置
|
|
|
# =====================================
|
|
|
logging.basicConfig(
|
|
|
level=logging.INFO,
|
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
|
)
|
|
|
logger = logging.getLogger("MailSender")
|
|
|
|
|
|
|
|
|
# =====================================
|
|
|
# 邮件模板
|
|
|
# =====================================
|
|
|
DEFAULT_TEMPLATE = """亲爱的用户:
|
|
|
|
|
|
您好!这是您的账号配置信息:
|
|
|
|
|
|
您的邮件账号:{email}
|
|
|
您的初始口令:{password}
|
|
|
|
|
|
请务必妥善保管您的账号信息,不要泄露给他人。
|
|
|
如有疑问,请联系技术支持部门。
|
|
|
|
|
|
---
|
|
|
此邮件为系统自动发送,请勿直接回复。
|
|
|
"""
|
|
|
|
|
|
|
|
|
# =====================================
|
|
|
# TXT 解析
|
|
|
# =====================================
|
|
|
def parse_users(file_path):
|
|
|
users = []
|
|
|
with open(file_path, "r", encoding="utf-8") as f:
|
|
|
for line in f:
|
|
|
line = line.strip()
|
|
|
if not line or line.startswith("#"):
|
|
|
continue
|
|
|
parts = re.split(r"[\s\t\|:;]+", line)
|
|
|
if len(parts) < 2:
|
|
|
continue
|
|
|
email = parts[0].strip()
|
|
|
password = parts[1].strip()
|
|
|
if not email or "@" not in email:
|
|
|
logger.warning(f"跳过无效行(非邮箱格式): {line}")
|
|
|
continue
|
|
|
users.append({"email": email, "password": password})
|
|
|
return users
|
|
|
|
|
|
|
|
|
# =====================================
|
|
|
# SMTP 连接(自动探测端口)
|
|
|
# =====================================
|
|
|
def build_smtp(smtp_server, smtp_port, sender_email, sender_pass):
|
|
|
smtp_port = int(smtp_port)
|
|
|
|
|
|
# 465 → SSL
|
|
|
if smtp_port == 465:
|
|
|
server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=30)
|
|
|
logger.info(f"📡 连接 {smtp_server}:{smtp_port} (SSL)")
|
|
|
server.login(sender_email, sender_pass)
|
|
|
return server
|
|
|
|
|
|
# 587 → STARTTLS
|
|
|
if smtp_port == 587:
|
|
|
server = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
|
|
|
server.ehlo()
|
|
|
server.starttls()
|
|
|
server.ehlo()
|
|
|
logger.info(f"📡 连接 {smtp_server}:{smtp_port} (STARTTLS)")
|
|
|
server.login(sender_email, sender_pass)
|
|
|
return server
|
|
|
|
|
|
# 25 → 明文
|
|
|
server = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
|
|
|
logger.info(f"📡 连接 {smtp_server}:{smtp_port} (明文)")
|
|
|
server.login(sender_email, sender_pass)
|
|
|
return server
|
|
|
|
|
|
|
|
|
# =====================================
|
|
|
# 单封发送(含重试)
|
|
|
# =====================================
|
|
|
def send_mail(server, sender_email, receiver_email, subject, content, reply_to=None):
|
|
|
msg = MIMEText(content, "plain", "utf-8")
|
|
|
msg["From"] = sender_email
|
|
|
msg["To"] = receiver_email
|
|
|
msg["Subject"] = Header(subject or "邮件通知", "utf-8")
|
|
|
if reply_to:
|
|
|
msg["Reply-To"] = reply_to
|
|
|
server.sendmail(sender_email, [receiver_email], msg.as_string())
|
|
|
|
|
|
|
|
|
def send_with_retry(
|
|
|
server, sender_email, receiver_email,
|
|
|
subject, content, reply_to=None, max_retries=3
|
|
|
):
|
|
|
last_error = None
|
|
|
for attempt in range(1, max_retries + 1):
|
|
|
try:
|
|
|
send_mail(server, sender_email, receiver_email, subject, content, reply_to)
|
|
|
return True, None
|
|
|
except smtplib.SMTPServerDisconnected:
|
|
|
last_error = "连接断开"
|
|
|
if attempt < max_retries:
|
|
|
wait = 2 ** attempt
|
|
|
logger.warning(f" ⚠ 连接断开,{wait}s 后重试 (第{attempt}次)")
|
|
|
time.sleep(wait)
|
|
|
except smtplib.SMTPResponseException as e:
|
|
|
code = e.smtp_code
|
|
|
if 400 <= code < 500 and attempt < max_retries:
|
|
|
last_error = f"SMTP {code}"
|
|
|
wait = 2 ** attempt
|
|
|
logger.warning(f" ⚠ SMTP {code},{wait}s 后重试 (第{attempt}次)")
|
|
|
time.sleep(wait)
|
|
|
else:
|
|
|
return False, f"SMTP {code}: {e}"
|
|
|
except Exception as e:
|
|
|
return False, str(e)
|
|
|
return False, last_error
|
|
|
|
|
|
|
|
|
# =====================================
|
|
|
# 主流程
|
|
|
# =====================================
|
|
|
def main():
|
|
|
parser = argparse.ArgumentParser(
|
|
|
description="📧 邮件批量通知工具 (CLI)",
|
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
|
epilog="""
|
|
|
示例:
|
|
|
python3 mail_sender.py users.txt
|
|
|
python3 mail_sender.py users.txt --port 465 --sender xxx@qq.com
|
|
|
python3 mail_sender.py users.txt --subject "系统账号通知" --delay 2
|
|
|
python3 mail_sender.py users.txt --template my_template.txt
|
|
|
""",
|
|
|
)
|
|
|
|
|
|
parser.add_argument("file", help="用户列表 TXT 文件路径(每行:邮箱 密码)")
|
|
|
parser.add_argument("--smtp", default="smtp.ctvit.com.cn", help="SMTP 服务器地址(默认 smtp.ctvit.com.cn)")
|
|
|
parser.add_argument("--port", type=int, default=25, help="SMTP 端口(默认 25,常见 465/587)")
|
|
|
parser.add_argument("--sender", default="zhangguozhong@ctvit.com.cn", help="发件邮箱")
|
|
|
parser.add_argument("--passwd", help="SMTP 密码/授权码(如不传则交互输入)")
|
|
|
parser.add_argument("--subject", default="账号及口令通知", help="邮件主题")
|
|
|
parser.add_argument("--reply-to", help="回复地址(可选)")
|
|
|
parser.add_argument("--template", help="邮件模板文件路径(替代默认模板,支持 {email} {password})")
|
|
|
parser.add_argument("--delay", type=float, default=1.0, help="每封邮件间隔秒数(默认 1s)")
|
|
|
parser.add_argument("--retry", type=int, default=3, help="失败重试次数(默认 3)")
|
|
|
parser.add_argument("--test", action="store_true", help="测试模式:只给发件人自己发一封测试邮件")
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
# ─── 检查文件 ───
|
|
|
if not os.path.exists(args.file):
|
|
|
logger.error(f"❌ 文件不存在: {args.file}")
|
|
|
sys.exit(1)
|
|
|
|
|
|
# ─── 读取模板 ───
|
|
|
if args.template:
|
|
|
with open(args.template, "r", encoding="utf-8") as f:
|
|
|
template = f.read()
|
|
|
logger.info(f"📝 使用自定义模板: {args.template}")
|
|
|
else:
|
|
|
template = DEFAULT_TEMPLATE
|
|
|
logger.info("📝 使用默认模板")
|
|
|
|
|
|
# ─── 获取密码 ───
|
|
|
sender_pass = args.passwd
|
|
|
if not sender_pass:
|
|
|
import getpass
|
|
|
sender_pass = getpass.getpass(f"🔑 请输入 {args.sender} 的 SMTP 密码/授权码: ")
|
|
|
|
|
|
# ─── 解析用户列表 ───
|
|
|
users = parse_users(args.file)
|
|
|
if not users:
|
|
|
logger.error("❌ 未识别到有效用户数据")
|
|
|
sys.exit(1)
|
|
|
logger.info(f"📋 共读取到 {len(users)} 个用户")
|
|
|
|
|
|
# ─── 测试发送 ───
|
|
|
if args.test:
|
|
|
content = template.format(email=args.sender, password="123456")
|
|
|
server = None
|
|
|
try:
|
|
|
server = build_smtp(args.smtp, args.port, args.sender, sender_pass)
|
|
|
send_mail(server, args.sender, args.sender, f"[测试] {args.subject}", content, args.reply_to)
|
|
|
logger.info(f"✅ 测试邮件发送成功(已发送至 {args.sender})")
|
|
|
except Exception as e:
|
|
|
logger.error(f"❌ 测试发送失败: {e}")
|
|
|
sys.exit(1)
|
|
|
finally:
|
|
|
if server:
|
|
|
server.quit()
|
|
|
return
|
|
|
|
|
|
# ─── 正式发送 ───
|
|
|
total = len(users)
|
|
|
success_count = 0
|
|
|
failed_count = 0
|
|
|
failed_users = []
|
|
|
|
|
|
server = None
|
|
|
try:
|
|
|
server = build_smtp(args.smtp, args.port, args.sender, sender_pass)
|
|
|
logger.info(f"✅ 登录成功!开始批量发送(共 {total} 封)...")
|
|
|
print("─" * 50)
|
|
|
|
|
|
for idx, user in enumerate(users, 1):
|
|
|
email = user["email"]
|
|
|
password = user["password"]
|
|
|
content = template.format(email=email, password=password)
|
|
|
|
|
|
ok, err = send_with_retry(
|
|
|
server, args.sender, email,
|
|
|
args.subject, content,
|
|
|
reply_to=args.reply_to,
|
|
|
max_retries=args.retry,
|
|
|
)
|
|
|
|
|
|
if ok:
|
|
|
success_count += 1
|
|
|
logger.info(f" [{idx}/{total}] ✅ {email}")
|
|
|
else:
|
|
|
failed_count += 1
|
|
|
failed_users.append({"email": email, "error": err})
|
|
|
logger.error(f" [{idx}/{total}] ❌ {email} → {err}")
|
|
|
|
|
|
# 发送间隔
|
|
|
if idx < total:
|
|
|
time.sleep(args.delay)
|
|
|
|
|
|
except smtplib.SMTPAuthenticationError:
|
|
|
logger.error("❌ SMTP 认证失败,请检查邮箱地址和密码/授权码是否正确")
|
|
|
sys.exit(1)
|
|
|
except Exception as e:
|
|
|
logger.error(f"❌ 连接错误: {e}")
|
|
|
sys.exit(1)
|
|
|
finally:
|
|
|
if server:
|
|
|
server.quit()
|
|
|
|
|
|
# ─── 汇总 ───
|
|
|
print("═" * 50)
|
|
|
logger.info(f"📊 发送汇总:")
|
|
|
logger.info(f" 总 数:{total}")
|
|
|
logger.info(f" 成 功:{success_count}")
|
|
|
logger.info(f" 失 败:{failed_count}")
|
|
|
|
|
|
if failed_users:
|
|
|
print()
|
|
|
logger.warning("📋 失败人员列表:")
|
|
|
for fu in failed_users:
|
|
|
logger.warning(f" ❌ {fu['email']} → {fu['error']}")
|
|
|
# 写入失败日志
|
|
|
log_path = f"failed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
|
|
|
with open(log_path, "w", encoding="utf-8") as f:
|
|
|
for fu in failed_users:
|
|
|
f.write(f"{fu['email']}\t{fu['error']}\n")
|
|
|
logger.info(f"📁 失败详情已保存至: {log_path}")
|
|
|
|
|
|
if failed_count == 0:
|
|
|
logger.info("🎉 全部发送成功!")
|
|
|
elif success_count > 0:
|
|
|
logger.info(f"⚠️ 部分失败,请检查失败列表重发")
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|