You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

294 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
邮件批量通知工具 - CLI 版
用于在新系统切换时批量给全员发送新的账号口令通知
Usage:
python3 mail_sender.py users.txt
python3 mail_sender.py users.txt --smtp smtp.ctvit.com.cn --port 465
python3 mail_sender.py users.txt --sender zhangguozhong@ctvit.com.cn
TXT 格式:每行 邮箱 密码(空格 / tab / | / , 都支持)
"""
import os
import re
import sys
import time
import argparse
import smtplib
import logging
from datetime import datetime
from email.mime.text import MIMEText
from email.header import Header
# =====================================
# Logging configuration
# =====================================
# Root logger: INFO and above, each line prefixed with a timestamp and level.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Named logger used by the functions below.
logger = logging.getLogger("MailSender")
# =====================================
# Mail template
# =====================================
# Default message body, one literal per line of the mail. The {email} and
# {password} placeholders are substituted with str.format() per recipient.
DEFAULT_TEMPLATE = (
    "亲爱的用户:\n"
    "您好!这是您的账号配置信息:\n"
    "您的邮件账号:{email}\n"
    "您的初始口令:{password}\n"
    "请务必妥善保管您的账号信息,不要泄露给他人。\n"
    "如有疑问,请联系技术支持部门。\n"
    "---\n"
    "此邮件为系统自动发送,请勿直接回复。\n"
)
# =====================================
# TXT parsing
# =====================================
def parse_users(file_path):
    """Parse the user list file into account records.

    Every non-empty line that does not start with ``#`` must contain an
    e-mail address followed by a password. The two fields may be separated
    by whitespace, ``|``, ``:``, ``;`` or ``,``. Anything after the password
    field is ignored. Lines that cannot be used are skipped with a warning
    that names the line number only, so credentials never reach the log.

    Args:
        file_path: Path to the UTF-8 text file; a leading BOM is tolerated.

    Returns:
        A list of ``{"email": ..., "password": ...}`` dicts in file order.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Separators allowed between the address and the password. The comma is
    # honoured only here, so a password that itself contains commas is kept
    # intact.
    address_sep = re.compile(r"[\s|:;,]+")
    # Characters that terminate the password field.
    # NOTE: a password containing whitespace, '|', ':' or ';' is cut at that
    # character, exactly as before.
    password_end = re.compile(r"[\s|:;]+")

    users = []
    # "utf-8-sig" drops the BOM some Windows editors prepend; otherwise it
    # would become part of the first address (or hide a leading '#').
    with open(file_path, "r", encoding="utf-8-sig") as f:
        for line_no, raw_line in enumerate(f, 1):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            fields = address_sep.split(line, maxsplit=1)
            if len(fields) < 2:
                logger.warning(f"跳过无效行(缺少密码): 第 {line_no} 行")
                continue

            email = fields[0]
            password = password_end.split(fields[1], maxsplit=1)[0]

            if not email or "@" not in email:
                # Do not echo the line: it contains a plaintext password.
                logger.warning(f"跳过无效行(非邮箱格式): 第 {line_no} 行")
                continue
            if not password:
                logger.warning(f"跳过无效行(缺少密码): 第 {line_no} 行")
                continue

            users.append({"email": email, "password": password})
    return users
# =====================================
# SMTP connection (transport chosen by port)
# =====================================
def build_smtp(smtp_server, smtp_port, sender_email, sender_pass):
    """Open an SMTP connection, secure it according to the port, and log in.

    Port 465 uses implicit TLS (``smtplib.SMTP_SSL``), port 587 upgrades the
    session with STARTTLS, and any other port is used without encryption.

    Args:
        smtp_server: Host name of the SMTP server.
        smtp_port: Port number; anything ``int()`` accepts.
        sender_email: Account name used for login.
        sender_pass: Password or authorization code used for login.

    Returns:
        The connected, logged-in ``smtplib.SMTP`` (or ``SMTP_SSL``) object.

    Raises:
        smtplib.SMTPException, OSError: Propagated from connecting, STARTTLS
            or login. The socket is closed before the error is re-raised.
    """
    smtp_port = int(smtp_port)

    if smtp_port == 465:
        server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=30)
        mode = "SSL"
    else:
        server = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
        mode = "STARTTLS" if smtp_port == 587 else "明文"

    # The caller never receives `server` when a step below fails, so the
    # socket has to be released here or it leaks.
    try:
        if smtp_port == 587:
            server.ehlo()
            server.starttls()
            server.ehlo()
        logger.info(f"📡 连接 {smtp_server}:{smtp_port} ({mode})")
        # NOTE: on ports other than 465/587 the credentials are sent over an
        # unencrypted session.
        server.login(sender_email, sender_pass)
    except Exception:
        server.close()
        raise
    return server
# =====================================
# Single-message sending (with retry)
# =====================================
def send_mail(server, sender_email, receiver_email, subject, content, reply_to=None):
    """Build a UTF-8 plain-text message and send it to one recipient.

    Args:
        server: Connected object exposing ``sendmail(from, to_list, msg)``,
            e.g. an ``smtplib.SMTP`` instance.
        sender_email: Envelope sender and ``From`` header.
        receiver_email: Envelope recipient and ``To`` header.
        subject: Subject line; falls back to "邮件通知" when empty.
        content: Message body text.
        reply_to: Optional ``Reply-To`` address.

    Raises:
        Whatever ``server.sendmail`` raises; nothing is caught here.
    """
    from email.utils import formatdate

    msg = MIMEText(content, "plain", "utf-8")
    msg["From"] = sender_email
    msg["To"] = receiver_email
    msg["Subject"] = Header(subject or "邮件通知", "utf-8")
    # RFC 5322 requires an origination date; do not rely on the server to
    # add one.
    msg["Date"] = formatdate(localtime=True)
    if reply_to:
        msg["Reply-To"] = reply_to
    server.sendmail(sender_email, [receiver_email], msg.as_string())
def send_with_retry(
    server, sender_email, receiver_email,
    subject, content, reply_to=None, max_retries=3
):
    """Send one message, retrying transient failures with exponential backoff.

    A dropped connection or a 4xx SMTP reply is retried after ``2 ** attempt``
    seconds. Any other SMTP reply or exception fails immediately. Retries
    reuse the same ``server`` object; this function does not reconnect.

    Args:
        server: Connected SMTP object passed through to ``send_mail``.
        sender_email: Sender address.
        receiver_email: Recipient address.
        subject: Subject line.
        content: Message body text.
        reply_to: Optional ``Reply-To`` address.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the message is always tried at least once.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, error_text)``.
    """
    # A non-positive max_retries would otherwise skip the loop entirely and
    # report (False, None) without sending anything.
    attempts = max(1, max_retries)
    last_error = None

    for attempt in range(1, attempts + 1):
        try:
            send_mail(server, sender_email, receiver_email, subject, content, reply_to)
            return True, None
        except smtplib.SMTPServerDisconnected:
            last_error = "连接断开"
            if attempt < attempts:
                wait = 2 ** attempt
                logger.warning(f" ⚠ 连接断开,{wait}s 后重试 (第{attempt}次)")
                time.sleep(wait)
        except smtplib.SMTPResponseException as e:
            code = e.smtp_code
            retryable = 400 <= code < 500 and attempt < attempts
            if not retryable:
                return False, f"SMTP {code}: {e}"
            last_error = f"SMTP {code}"
            wait = 2 ** attempt
            # Keep the status code and the wait time visibly separate.
            logger.warning(f" ⚠ SMTP {code},{wait}s 后重试 (第{attempt}次)")
            time.sleep(wait)
        except Exception as e:
            # Anything else (e.g. refused recipient) is not worth retrying.
            return False, str(e)
    return False, last_error
# =====================================
# Main flow
# =====================================
def _close_quietly(server):
    """Send QUIT to the server, tolerating a connection that is already dead."""
    if server is None:
        return
    try:
        server.quit()
    except (smtplib.SMTPException, OSError):
        # QUIT cannot be delivered; just release the local socket.
        server.close()


def _ensure_connected(server, args, sender_pass):
    """Return a usable connection, logging in again if *server* has dropped."""
    try:
        code, _ = server.noop()
        if code == 250:
            return server
    except (smtplib.SMTPException, OSError):
        # Fall through to the reconnect below.
        pass
    logger.warning("⚠ SMTP 连接已断开,正在重新连接...")
    server.close()
    return build_smtp(args.smtp, args.port, args.sender, sender_pass)


def main():
    """CLI entry point: parse arguments, then send a test mail or the batch.

    Steps: check the user file exists, load and validate the template, obtain
    the SMTP password (prompting if ``--passwd`` is absent), parse the user
    list, then either send a single test message to the sender (``--test``)
    or send one message per user. A summary is logged at the end and failed
    recipients are written to ``failed_<timestamp>.txt``.

    Exits with status 1 on a missing file, an unusable template, an empty
    user list, or a connection/authentication failure.
    """
    parser = argparse.ArgumentParser(
        description="📧 邮件批量通知工具 (CLI)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python3 mail_sender.py users.txt
python3 mail_sender.py users.txt --port 465 --sender xxx@qq.com
python3 mail_sender.py users.txt --subject "系统账号通知" --delay 2
python3 mail_sender.py users.txt --template my_template.txt
""",
    )
    parser.add_argument("file", help="用户列表 TXT 文件路径(每行:邮箱 密码)")
    parser.add_argument("--smtp", default="smtp.ctvit.com.cn", help="SMTP 服务器地址(默认 smtp.ctvit.com.cn)")
    parser.add_argument("--port", type=int, default=25, help="SMTP 端口(默认 25,常见 465/587)")
    parser.add_argument("--sender", default="zhangguozhong@ctvit.com.cn", help="发件邮箱")
    parser.add_argument("--passwd", help="SMTP 密码/授权码(如不传则交互输入)")
    parser.add_argument("--subject", default="账号及口令通知", help="邮件主题")
    parser.add_argument("--reply-to", help="回复地址(可选)")
    parser.add_argument("--template", help="邮件模板文件路径(替代默认模板,支持 {email} {password})")
    parser.add_argument("--delay", type=float, default=1.0, help="每封邮件间隔秒数(默认 1s)")
    parser.add_argument("--retry", type=int, default=3, help="失败重试次数(默认 3)")
    parser.add_argument("--test", action="store_true", help="测试模式:只给发件人自己发一封测试邮件")
    args = parser.parse_args()

    # --- Check the user file ---
    if not os.path.exists(args.file):
        logger.error(f"❌ 文件不存在: {args.file}")
        sys.exit(1)

    # --- Load the template ---
    if args.template:
        with open(args.template, "r", encoding="utf-8") as f:
            template = f.read()
        logger.info(f"📝 使用自定义模板: {args.template}")
    else:
        template = DEFAULT_TEMPLATE
        logger.info("📝 使用默认模板")

    # Fail early on stray braces or unknown placeholders; otherwise the error
    # would surface later inside the send loop as a "connection error".
    try:
        template.format(email="", password="")
    except (KeyError, IndexError, ValueError, AttributeError) as e:
        logger.error(f"❌ 模板格式错误(仅支持 {{email}} 和 {{password}} 占位符): {e!r}")
        sys.exit(1)

    # --- Obtain the SMTP password ---
    sender_pass = args.passwd
    if not sender_pass:
        import getpass
        sender_pass = getpass.getpass(f"🔑 请输入 {args.sender} 的 SMTP 密码/授权码: ")

    # --- Parse the user list ---
    users = parse_users(args.file)
    if not users:
        logger.error("❌ 未识别到有效用户数据")
        sys.exit(1)
    logger.info(f"📋 共读取到 {len(users)} 个用户")

    # --- Test mode: one message to the sender only ---
    if args.test:
        content = template.format(email=args.sender, password="123456")
        server = None
        try:
            server = build_smtp(args.smtp, args.port, args.sender, sender_pass)
            send_mail(server, args.sender, args.sender, f"[测试] {args.subject}", content, args.reply_to)
            logger.info(f"✅ 测试邮件发送成功(已发送至 {args.sender})")
        except Exception as e:
            logger.error(f"❌ 测试发送失败: {e}")
            sys.exit(1)
        finally:
            _close_quietly(server)
        return

    # --- Batch send ---
    total = len(users)
    success_count = 0
    failed_count = 0
    failed_users = []
    # time.sleep() rejects negative values.
    delay = max(0.0, args.delay)
    server = None
    try:
        server = build_smtp(args.smtp, args.port, args.sender, sender_pass)
        logger.info(f"✅ 登录成功!开始批量发送(共 {total} 封)...")
        print("─" * 50)
        for idx, user in enumerate(users, 1):
            # The server may have dropped the session during the pause or
            # after a failed send; reconnect so one drop does not fail every
            # remaining recipient.
            if idx > 1:
                server = _ensure_connected(server, args, sender_pass)

            email = user["email"]
            password = user["password"]
            content = template.format(email=email, password=password)
            ok, err = send_with_retry(
                server, args.sender, email,
                args.subject, content,
                reply_to=args.reply_to,
                max_retries=args.retry,
            )
            if ok:
                success_count += 1
                logger.info(f" [{idx}/{total}] ✅ {email}")
            else:
                failed_count += 1
                failed_users.append({"email": email, "error": err})
                logger.error(f" [{idx}/{total}] ❌ {email} - {err}")

            # Pause between messages, but not after the last one.
            if idx < total:
                time.sleep(delay)
    except smtplib.SMTPAuthenticationError:
        logger.error("❌ SMTP 认证失败,请检查邮箱地址和密码/授权码是否正确")
        sys.exit(1)
    except Exception as e:
        logger.error(f"❌ 连接错误: {e}")
        sys.exit(1)
    finally:
        # quit() raises on a dead connection; that must not abort the summary
        # and the failed-recipient file below.
        _close_quietly(server)

    # --- Summary ---
    print("─" * 50)
    logger.info("📊 发送汇总:")
    logger.info(f" 总 数:{total}")
    logger.info(f" 成 功:{success_count}")
    logger.info(f" 失 败:{failed_count}")
    if failed_users:
        print()
        logger.warning("📋 失败人员列表:")
        for fu in failed_users:
            logger.warning(f"  {fu['email']} - {fu['error']}")

        # Persist the failures so they can be re-sent later.
        log_path = f"failed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(log_path, "w", encoding="utf-8") as f:
            for fu in failed_users:
                f.write(f"{fu['email']}\t{fu['error']}\n")
        logger.info(f"📁 失败详情已保存至: {log_path}")

    if failed_count == 0:
        logger.info("🎉 全部发送成功!")
    elif success_count > 0:
        logger.info("⚠️ 部分失败,请检查失败列表重发")


# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()