8b5beb1d by 柴进

:bug: 补齐遗漏的启动必需模块 (修复 Mac 打包后启动失败)

根因: audit_logger.py / config_util.py / preflight.py 这三个启动
必需模块从未被 git 追踪过. Windows 构建机上这些文件在本地磁盘,
PyInstaller 能找到, 所以 Win 包正常; 但 Mac 拉代码后根目录缺这
三个文件, PyInstaller 的 Analysis 找不到 import 链, 构建要么失败
要么运行时 ImportError.

本次补齐:
- audit_logger.py — 审计日志单例 (NDJSON 本地队列 + 异步 MySQL 上传)
- config_util.py  — 跨平台配置路径解析与安全加载
- preflight.py    — 启动门禁 (config/DB/schema 校验)
- database_schema.sql — 运维参考
- migrations/2026-04-21_add_audit_log_columns.sql — 审计表迁移
- .gitignore      — 屏蔽 .venv/build/dist/logs/__pycache__ 等,
                    防止以后再漏推业务代码时被大量噪声淹没
1 parent 1bbb3c47
# Python
__pycache__/
*.pyc
*.pyo
*.backup
# Virtual environments
.venv/
venv/
# Build artifacts
build/
dist/
*.egg-info/
# IDE
.idea/
.vscode/
.claude/
# Runtime / user data
logs/
errorlog/
data/
images/
# OS
.DS_Store
Thumbs.db
"""
审计日志本地队列 + 后台上传 worker。
核心保证:事件一旦 log_use / log_login 返回,就已经 fsync 到本地 NDJSON 文件。
后台 worker 负责把本地队列异步上传到 MySQL;失败指数退避重试,成功后 compaction
重写队列文件删除已送达行。应用退出时 flush 一次尽量送达。
公开接口:
- init_audit_logger(db_config, queue_path, logs_dir): 启动单例
- get_audit_logger(): 获取单例(未初始化返回 None)
- AuditLogger.log_use(...)
- AuditLogger.log_login(...)
- AuditLogger.shutdown(timeout=5.0)
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
import pymysql
from PySide6.QtCore import QThread
logger = logging.getLogger(__name__)
# Process-wide singleton state for the audit logger.
_instance: Optional["AuditLogger"] = None
_instance_lock = threading.Lock()


def init_audit_logger(db_config: dict, queue_path: Path, logs_dir: Path) -> "AuditLogger":
    """Create and start the singleton AuditLogger.

    Idempotent: repeated calls return the already-running instance.
    Call only after the preflight check has passed.
    """
    global _instance
    with _instance_lock:
        if _instance is None:
            _instance = AuditLogger(db_config, queue_path, logs_dir)
            _instance.start()
        return _instance


def get_audit_logger() -> Optional["AuditLogger"]:
    """Return the singleton, or None if init_audit_logger was never called."""
    return _instance
class AuditLogger:
    """Facade over the local audit queue.

    Responsibilities:
      1. Durable capture: log_use / log_login return only after the event
         has been fsync'ed to the local NDJSON queue file.
      2. Starting and shutting down the background upload worker.

    The actual MySQL upload lives in _UploadWorker.
    """

    def __init__(self, db_config: dict, queue_path: Path, logs_dir: Path):
        self._db_config = db_config
        self._queue_path = Path(queue_path)
        self._logs_dir = Path(logs_dir)
        # Serializes producer appends against the worker's snapshot/compaction.
        self._file_lock = threading.Lock()
        self._worker = _UploadWorker(
            db_config=db_config,
            queue_path=self._queue_path,
            file_lock=self._file_lock,
        )

    def start(self) -> None:
        """Ensure the queue directory exists, then launch the worker thread."""
        self._queue_path.parent.mkdir(parents=True, exist_ok=True)
        self._worker.start()

    def log_use(
        self,
        user_name: str,
        device_name: str,
        prompt: str,
        result_path: Optional[str],
        status: str,
        error_message: Optional[str],
        model: Optional[str],
        duration_ms: Optional[int],
        finish_reason: Optional[str],
    ) -> None:
        """Record one image-generation event (durable on return)."""
        self._append({
            "kind": "use_log",
            "ts": datetime.now().isoformat(timespec="seconds"),
            "user_name": user_name or "未知用户",
            "device_name": device_name or "未知设备",
            "prompt": prompt or "",
            "result_path": result_path,
            "status": status,
            "error_message": error_message,
            "model": model,
            "duration_ms": duration_ms,
            "finish_reason": finish_reason,
        })

    def log_login(
        self,
        user_name: str,
        local_ip: Optional[str],
        public_ip: Optional[str],
        device_name: Optional[str],
    ) -> None:
        """Record one login event (durable on return)."""
        self._append({
            "kind": "login_log",
            "ts": datetime.now().isoformat(timespec="seconds"),
            "user_name": user_name,
            "local_ip": local_ip,
            "public_ip": public_ip,
            "device_name": device_name,
            "login_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        })

    def shutdown(self, timeout: float = 5.0) -> None:
        """Call before app exit; gives the worker a chance to flush."""
        self._worker.stop(timeout)

    def _append(self, record: dict) -> None:
        """Append one record to the queue file with fsync.

        Never propagates exceptions to the caller: failures are logged
        (not silently swallowed) and the event is dropped.
        """
        try:
            payload = json.dumps(record, ensure_ascii=False, default=str)
        except Exception as e:
            logger.error(f"审计事件序列化失败,已丢弃: {e}; record keys={list(record.keys())}")
            return
        try:
            with self._file_lock:
                with open(self._queue_path, "a", encoding="utf-8") as fh:
                    fh.write(payload + "\n")
                    fh.flush()
                    os.fsync(fh.fileno())
            self._worker.wake()
        except Exception as e:
            # Local disk unwritable is a genuinely severe failure; degrade to
            # the error log instead of raising into the caller.
            logger.error(f"审计事件落盘失败: {e}; 事件内容已写 error 日志兜底: {payload[:200]}")
class _UploadWorker(QThread):
    """Background thread: drain queue file -> batch INSERT -> compaction."""

    def __init__(self, db_config: dict, queue_path: Path, file_lock: threading.Lock):
        super().__init__()
        self._db_config = db_config
        self._queue_path = Path(queue_path)
        # Shared with AuditLogger._append: guards snapshot reads and the
        # compaction rewrite against concurrent producer appends.
        self._file_lock = file_lock
        self._stop_event = threading.Event()
        self._wake_event = threading.Event()
        # Current retry delay in seconds; doubles on failure up to 300s.
        self._backoff = 1.0

    # --- external control ---
    def wake(self) -> None:
        self._wake_event.set()

    def stop(self, timeout: float = 5.0) -> None:
        self._stop_event.set()
        self._wake_event.set()
        # QThread.wait takes milliseconds.
        self.wait(int(timeout * 1000))

    # --- main loop ---
    def run(self) -> None:
        logger.info("audit UploadWorker started")
        while not self._stop_event.is_set():
            try:
                sent, unsent = self._drain_once()
            except Exception as e:
                logger.error(f"audit drain 抛出未预期异常: {e}", exc_info=True)
                sent, unsent = 0, 1  # treat an unexpected crash as a failed drain
            if unsent > 0:
                # Exponential backoff, capped at 5 minutes.
                self._backoff = min(self._backoff * 2, 300.0)
                logger.debug(f"audit: unsent={unsent}, backoff={self._backoff}s")
            else:
                self._backoff = 1.0
            # Leave the loop promptly on stop; one final drain happens below.
            if self._stop_event.is_set():
                break
            # Idle poll every 60s; retry sooner (backoff) while events are
            # pending; wake() short-circuits the wait.
            wait_s = self._backoff if unsent > 0 else 60.0
            self._wake_event.wait(wait_s)
            self._wake_event.clear()
        # Best-effort final drain before exiting.
        try:
            self._drain_once()
        except Exception:
            pass
        logger.info("audit UploadWorker stopped")

    # --- core drain ---
    def _drain_once(self) -> tuple[int, int]:
        """
        Snapshot-read the queue -> batch upload -> compaction.
        Returns (sent_count, unsent_count).
        """
        # 1. Snapshot read: remember the EOF offset so lines appended while
        #    we upload are preserved as "tail" during compaction.
        with self._file_lock:
            if not self._queue_path.exists():
                return 0, 0
            eof_at_read = self._queue_path.stat().st_size
            if eof_at_read == 0:
                return 0, 0
            with open(self._queue_path, "rb") as f:
                head_bytes = f.read(eof_at_read)
        try:
            head_text = head_bytes.decode("utf-8")
        except UnicodeDecodeError as e:
            logger.error(f"audit 队列文件不是合法 UTF-8,跳过本轮: {e}")
            return 0, 1
        lines = [ln for ln in head_text.split("\n") if ln.strip()]
        if not lines:
            return 0, 0
        # 2. Connect to DB + batch INSERT.
        try:
            conn = pymysql.connect(
                host=self._db_config["host"],
                port=int(self._db_config.get("port", 3306)),
                user=self._db_config["user"],
                password=self._db_config["password"],
                database=self._db_config["database"],
                connect_timeout=5,
                read_timeout=10,
                write_timeout=10,
                charset="utf8mb4",
            )
        except Exception as e:
            logger.warning(f"audit connect 失败,稍后重试: {e}")
            return 0, len(lines)
        sent = 0
        unsent_lines: list[str] = []
        try:
            with conn.cursor() as cursor:
                for i, line in enumerate(lines):
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError as e:
                        logger.error(f"audit 队列出现坏行,已跳过: {e}; line={line[:120]!r}")
                        # Deliberately NOT kept in unsent: a poison line would
                        # otherwise be retried forever.
                        continue
                    try:
                        self._insert_one(cursor, record)
                        sent += 1
                    except Exception as e:
                        logger.warning(
                            f"audit INSERT 失败(后续全部留队列): {type(e).__name__}: {e}"
                        )
                        # Keep this line and everything after it for retry.
                        unsent_lines = lines[i:]
                        break
            conn.commit()
        except Exception as e:
            logger.warning(f"audit commit 失败: {e}")
            unsent_lines = lines
            sent = 0
        finally:
            try:
                conn.close()
            except Exception:
                pass
        # 3. Compaction: rewrite queue file = unsent_lines + any tail appended
        #    after our snapshot.
        with self._file_lock:
            try:
                current_size = self._queue_path.stat().st_size
                tail = b""
                if current_size > eof_at_read:
                    with open(self._queue_path, "rb") as f:
                        f.seek(eof_at_read)
                        tail = f.read()
                with open(self._queue_path, "wb") as f:
                    for ln in unsent_lines:
                        f.write((ln + "\n").encode("utf-8"))
                    if tail:
                        f.write(tail)
                    f.flush()
                    os.fsync(f.fileno())
            except Exception as e:
                # Compaction failure is non-fatal: already-sent lines will be
                # re-sent on the next drain. NOTE(review): that re-send inserts
                # duplicate audit rows (auto-increment id does not deduplicate);
                # acceptable for audit data, per the original design.
                logger.error(f"audit compaction 失败: {e}", exc_info=True)
        if sent > 0:
            logger.info(f"audit drained: sent={sent}, unsent={len(unsent_lines)}")
        return sent, len(unsent_lines)

    # --- concrete inserts ---
    def _insert_one(self, cursor, record: dict) -> None:
        """Execute one INSERT for a queued record; raises ValueError on
        unknown record kind (caught by the drain loop)."""
        kind = record.get("kind")
        if kind == "use_log":
            sql = """
            INSERT INTO `nano_banana_user_use_log`
            (`user_name`, `device_name`, `prompt`, `result_path`, `status`,
            `error_message`, `model`, `duration_ms`, `finish_reason`)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(
                sql,
                (
                    record.get("user_name", "未知用户"),
                    record.get("device_name", "未知设备"),
                    record.get("prompt", ""),
                    record.get("result_path"),
                    record.get("status", "unknown"),
                    record.get("error_message"),
                    record.get("model"),
                    record.get("duration_ms"),
                    record.get("finish_reason"),
                ),
            )
        elif kind == "login_log":
            sql = """
            INSERT INTO `nano_banana_user_log`
            (`user_name`, `local_ip`, `public_ip`, `device_name`, `login_time`)
            VALUES (%s, %s, %s, %s, %s)
            """
            # Older records may predate the explicit login_time field; fall
            # back to the capture timestamp.
            login_time_val = record.get("login_time") or record.get("ts")
            cursor.execute(
                sql,
                (
                    record.get("user_name"),
                    record.get("local_ip"),
                    record.get("public_ip"),
                    record.get("device_name"),
                    login_time_val,
                ),
            )
        else:
            raise ValueError(f"未知审计事件 kind={kind!r}")
"""
config.json 安全加载器。
公开接口:
- DEFAULT_CONFIG: 默认配置常量
- load_config_safe(path): 区分处理各类 IO / JSON 错误,返回 (config, error_message)
- get_config_dir(): 跨平台返回配置目录
- get_config_path(): 返回 config.json 绝对路径
"""
from __future__ import annotations
import json
import os
import shutil
import sys
import platform
from datetime import datetime
from pathlib import Path
from typing import Tuple
# Baseline configuration. load_config_safe() merges the user's config.json on
# top of a copy of this dict, so every key below is guaranteed present in the
# loaded config.
DEFAULT_CONFIG: dict = {
    "api_key": "",               # API key for the generation backend; empty until set
    "saved_prompts": [],         # user-saved prompt strings
    "db_config": None,           # MySQL connection dict; None until configured
    "last_user": "",             # last logged-in user name
    "saved_password_hash": "",   # stored password hash (written elsewhere, not in this module)
    "logging_config": {
        "enabled": True,
        "level": "INFO",
        "log_to_console": True,
    },
    "history_config": {
        "max_history_count": 100,  # cap on locally kept history entries — TODO confirm consumer
    },
}
def get_config_dir() -> Path:
    """Return the per-user config directory, creating it if missing.

    Mirrors get_config_dir in image_generator.py. Frozen (packaged) builds
    use the platform-conventional location; development runs use the current
    working directory.
    """
    if not getattr(sys, "frozen", False):
        target = Path(".").resolve()
    else:
        sysname = platform.system()
        if sysname == "Darwin":
            target = Path.home() / "Library" / "Application Support" / "ZB100ImageGenerator"
        elif sysname == "Windows":
            appdata = os.getenv("APPDATA", str(Path.home()))
            target = Path(appdata) / "ZB100ImageGenerator"
        else:
            target = Path.home() / ".config" / "zb100imagegenerator"
    target.mkdir(parents=True, exist_ok=True)
    return target


def get_config_path() -> Path:
    """Absolute path of config.json inside the config directory."""
    return get_config_dir() / "config.json"
def load_config_safe(config_path: Path) -> Tuple[dict, str]:
    """
    Safely load config.json without ever raising.

    Returns (config, error):
      - success: (merged config dict, "")
      - recoverable failure: (deep copy of DEFAULT_CONFIG, error description);
        preflight decides from the error and the config together whether to
        block startup.

    Behavior:
      - file missing          -> (defaults, "")        # preflight decides
      - empty / corrupt JSON  -> backup as .bak.<ts>, return defaults + error
      - PermissionError/OSError -> defaults + error (no backup: unreadable)
      - top level not an object -> defaults + error
      - otherwise             -> defaults updated with loaded data (unknown
                                 keys preserved for forward compatibility)

    Fix: defaults are now deep-copied. The previous shallow dict(DEFAULT_CONFIG)
    shared the nested dicts/lists ("logging_config", "saved_prompts", ...)
    with the module-level DEFAULT_CONFIG, so a caller mutating its config
    silently corrupted the defaults for every later load.
    """
    import copy  # local import keeps the module's import surface unchanged

    def _defaults() -> dict:
        # Fresh, fully independent copy of the defaults.
        return copy.deepcopy(DEFAULT_CONFIG)

    config_path = Path(config_path)
    if not config_path.exists():
        return _defaults(), ""
    try:
        content = config_path.read_text(encoding="utf-8")
    except PermissionError as e:
        return _defaults(), f"permission denied: {e}"
    except OSError as e:
        return _defaults(), f"IO error: {e}"
    if not content.strip():
        _backup(config_path, reason="empty")
        return _defaults(), "config.json was empty, using defaults"
    try:
        loaded = json.loads(content)
    except json.JSONDecodeError as e:
        _backup(config_path, reason="parse-error")
        return _defaults(), f"JSON parse error, backed up to .bak: {e}"
    if not isinstance(loaded, dict):
        return _defaults(), "config.json top-level is not an object"
    merged = _defaults()
    merged.update(loaded)
    return merged, ""
def _backup(src: Path, reason: str) -> None:
"""把损坏 / 空的 config 文件备份,不抛异常。"""
try:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
dst = src.with_suffix(f".json.bak.{reason}.{ts}")
shutil.copy2(src, dst)
except Exception:
pass
-- Nano Banana App Database Schema
-- 包含用户登录日志表和使用日志表
--
-- 表说明:
-- 1. nano_banana_user_log: 记录用户登录日志(IP地址、设备名称、登录时间)
-- 2. nano_banana_user_use_log: 记录用户生图操作日志(prompt、结果、状态、错误信息)
-- Create the user login log table.
-- Fix: the original declared `login_time datetime COLLATE utf8mb4_unicode_ci`;
-- COLLATE is only valid on string types in MySQL, so the whole CREATE TABLE
-- failed with a collation error. The clause is removed here.
CREATE TABLE `nano_banana_user_log` (
  `user_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '用户名',
  `local_ip` varchar(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '局域网IP地址',
  `public_ip` varchar(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '公网IP地址(可为空)',
  `device_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '设备名称',
  `login_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '登录时间',
  INDEX `idx_user_name` (`user_name`),
  INDEX `idx_login_time` (`login_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户登录日志表';
-- Data migration for pre-existing `nano_banana_user_log` tables (skip this
-- when the table was freshly created by the CREATE TABLE above).
-- NOTE(review): plain ADD COLUMN / ADD INDEX errors if the column or index
-- already exists — run at most once per environment.
ALTER TABLE `nano_banana_user_log`
ADD COLUMN `local_ip` varchar(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '局域网IP地址' AFTER `user_name`,
ADD COLUMN `public_ip` varchar(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '公网IP地址' AFTER `local_ip`,
MODIFY COLUMN `device_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '设备名称',
ADD INDEX `idx_user_name` (`user_name`),
ADD INDEX `idx_login_time` (`login_time`);
-- Create the user usage log table.
-- Fix: the preflight check requires model / duration_ms / finish_reason
-- (added for existing installs by migrations/2026-04-21_add_audit_log_columns.sql).
-- A fresh database created from this schema alone would fail preflight, so
-- the three columns are included here directly.
CREATE TABLE `nano_banana_user_use_log` (
  `id` INT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主键',
  `record_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '记录时间',
  `user_name` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '用户名',
  `device_name` VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '设备名称',
  `prompt` TEXT COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '用户请求的 Prompt',
  `result_path` VARCHAR(512) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '返回数据地址(成功时为图片路径)',
  `status` ENUM('success', 'failure') COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '操作状态',
  `error_message` TEXT COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '失败时的错误信息',
  `model` VARCHAR(64) DEFAULT NULL COMMENT '本次生成使用的 Gemini 模型 ID',
  `duration_ms` INT DEFAULT NULL COMMENT 'Worker start 到 emit 的耗时(毫秒)',
  `finish_reason` VARCHAR(64) DEFAULT NULL COMMENT 'Gemini 响应 candidates[0].finish_reason',
  INDEX `idx_user_name` (`user_name`),
  INDEX `idx_record_time` (`record_time`),
  INDEX `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户使用日志表';
\ No newline at end of file
-- =====================================================================
-- Migration: add audit log columns (2026-04-21)
-- Change: add-audit-log-reliability
-- Purpose: add model / duration_ms / finish_reason to the usage audit
--   table so analyses can slice by model, latency and failure reason.
--
-- Operator: @柴进 (run on RDS; MUST run before the new client version is
--   distributed).
-- After running: the new client's preflight verifies these three columns
--   exist and blocks startup otherwise.
-- =====================================================================
USE `saas_user`;
ALTER TABLE `nano_banana_user_use_log`
ADD COLUMN `model` VARCHAR(64) NULL COMMENT '本次生成使用的 Gemini 模型 ID',
ADD COLUMN `duration_ms` INT NULL COMMENT 'Worker start 到 emit 的耗时(毫秒)',
ADD COLUMN `finish_reason` VARCHAR(64) NULL COMMENT 'Gemini 响应 candidates[0].finish_reason';
-- Rollback (dev / canary rollback only; dropping columns in production is
-- not recommended):
-- ALTER TABLE `nano_banana_user_use_log`
-- DROP COLUMN `model`,
-- DROP COLUMN `duration_ms`,
-- DROP COLUMN `finish_reason`;
"""
启动门禁:保证审计日志上传的所有前置条件都成立。
任一失败即阻止应用进入主流程,对用户只显示一句"应用启动失败,请联系 @柴进"。
详细错误脱敏后写入 logs/preflight_error.log。
"""
from __future__ import annotations
import logging
import re
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Tuple
import pymysql
from config_util import load_config_safe
logger = logging.getLogger(__name__)
# db_config keys that must all be present and non-empty.
REQUIRED_DB_FIELDS = ("host", "port", "user", "password", "database")
# Tables the audit upload worker writes to.
REQUIRED_TABLES = ("nano_banana_user_use_log", "nano_banana_user_log")
# Columns inserted for kind="use_log" audit records.
REQUIRED_USE_LOG_COLUMNS = (
    "user_name", "device_name", "prompt", "result_path", "status",
    "error_message", "model", "duration_ms", "finish_reason",
)
# Columns inserted for kind="login_log" audit records.
REQUIRED_LOGIN_LOG_COLUMNS = (
    "user_name", "local_ip", "public_ip", "device_name", "login_time",
)
def preflight_check(config_path: Path, audit_queue_path: Path) -> Tuple[bool, str, dict]:
    """
    Startup gate: verify every precondition for audit-log uploading.

    Returns (ok, error_detail, config):
      - ok=True: everything ready, the caller may continue startup
      - ok=False: error_detail is a detailed, UNscrubbed description
        (handle_preflight_failure scrubs it before writing to disk)
      - config: usable config dict on success; on failure possibly partial
        or the defaults

    Checks, in order: config.json loads, db_config complete, MySQL reachable,
    required tables exist, required columns exist, local queue dir writable.
    """
    # 1. config.json
    try:
        config, load_err = load_config_safe(config_path)
    except Exception as e:  # e unused: full traceback is more useful here
        return False, f"config load crashed:\n{traceback.format_exc()}", {}
    if load_err:
        return False, f"config load error: {load_err}", config
    # 2. db_config fields complete
    db = config.get("db_config")
    if not db or not isinstance(db, dict):
        return False, "config.json 缺少 db_config 字段或格式错误", config
    missing = [k for k in REQUIRED_DB_FIELDS if not db.get(k)]
    if missing:
        return False, f"db_config 缺少字段: {missing}", config
    # 3. MySQL connection + SELECT 1
    conn = None
    try:
        conn = pymysql.connect(
            host=db["host"],
            port=int(db["port"]),
            user=db["user"],
            password=db["password"],
            database=db["database"],
            connect_timeout=5,
            read_timeout=5,
            write_timeout=5,
            charset="utf8mb4",
        )
    except Exception as e:
        return False, f"MySQL connect 失败: {type(e).__name__}: {e}", config
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
            cur.fetchone()
            # 4. Tables exist (probe with a cheap SELECT).
            for table in REQUIRED_TABLES:
                try:
                    cur.execute(f"SELECT 1 FROM `{table}` LIMIT 1")
                    cur.fetchone()
                except Exception as e:
                    return False, f"审计表 {table} 不可用: {type(e).__name__}: {e}", config
            # 5. Required columns exist.
            ok, col_err = _check_columns(cur, db["database"], "nano_banana_user_use_log",
                                         REQUIRED_USE_LOG_COLUMNS)
            if not ok:
                return False, col_err, config
            ok, col_err = _check_columns(cur, db["database"], "nano_banana_user_log",
                                         REQUIRED_LOGIN_LOG_COLUMNS)
            if not ok:
                return False, col_err, config
    finally:
        # Always release the connection, including on the early returns above.
        try:
            conn.close()
        except Exception:
            pass
    # 6. Local queue directory writable (probe file round-trip).
    try:
        audit_queue_path.parent.mkdir(parents=True, exist_ok=True)
        probe = audit_queue_path.parent / ".preflight_probe"
        probe.write_text("ok", encoding="utf-8")
        probe.unlink()
    except Exception as e:
        return False, f"审计队列目录不可写 ({audit_queue_path.parent}): {e}", config
    return True, "", config
def _check_columns(cur, db_name: str, table: str, required: tuple[str, ...]) -> Tuple[bool, str]:
cur.execute(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
"WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s",
(db_name, table),
)
existing = {row[0] for row in cur.fetchall()}
missing = [c for c in required if c not in existing]
if missing:
return False, f"表 {table} 缺少列: {missing}(请运行 migrations/2026-04-21_add_audit_log_columns.sql)"
return True, ""
def handle_preflight_failure(detail: str, logs_dir: Path) -> None:
    """
    Write the scrubbed detail to logs/preflight_error.log, show a one-line
    dialog to the user, then sys.exit(1).

    A QApplication must already exist before calling this function.
    """
    from PySide6.QtWidgets import QMessageBox, QApplication
    # Write the log (scrubbed of password / api_key values).
    try:
        logs_dir.mkdir(parents=True, exist_ok=True)
        err_log = logs_dir / "preflight_error.log"
        with open(err_log, "a", encoding="utf-8") as f:
            f.write(f"\n===== {datetime.now().isoformat(timespec='seconds')} =====\n")
            f.write(_scrub(detail))
            f.write("\n")
    except Exception:
        # Logging is best-effort; the dialog and exit below still happen.
        pass
    # For the user: a single sentence, no technical detail.
    try:
        app = QApplication.instance()
        if app is None:
            # Extreme case: preflight failed before QApplication was created
            # (should not happen).
            app = QApplication(sys.argv)
        box = QMessageBox()
        box.setIcon(QMessageBox.Critical)
        box.setWindowTitle("启动失败")
        box.setText("应用启动失败,请联系 @柴进")
        box.setStandardButtons(QMessageBox.Ok)
        box.exec()
    except Exception:
        # Worst case: even the dialog cannot be shown — fall back to stderr.
        print("应用启动失败,请联系 @柴进", file=sys.stderr)
    sys.exit(1)
_SCRUB_PATTERNS = [
(re.compile(r'("password"\s*:\s*)"[^"]*"'), r'\1"***"'),
(re.compile(r'("api_key"\s*:\s*)"[^"]*"'), r'\1"***"'),
(re.compile(r"(password\s*=\s*)\S+"), r"\1***"),
(re.compile(r"(api_key\s*=\s*)\S+"), r"\1***"),
]
def _scrub(detail: str) -> str:
"""从详情里擦除 password / api_key。"""
out = detail
for pat, repl in _SCRUB_PATTERNS:
out = pat.sub(repl, out)
return out