# runtime.py (11.3 KB)
"""启动期 / 运行期工具:崩溃诊断、日志初始化、temp 清理、系统信息。

从 image_generator.py 顶部抽出,让 main_qml.py(QML 主入口)和旧 main()
(QWidget 入口,task #19 删)共用同一套启动序列。
"""
import faulthandler
import json
import logging
import os
import platform
import sys
import tempfile
import traceback
from datetime import datetime
from pathlib import Path


def flush_logs() -> None:
    """Force every handler attached to the root logger to flush its buffer.

    macOS may SIGKILL the process when the UI thread stalls past a threshold;
    log records still sitting in a buffer at that moment are lost, which makes
    the next crash impossible to locate. Call this at the boundaries of
    suspicious phases so the log is written out first.

    Best-effort: a handler whose ``flush()`` raises is skipped, and this
    function itself never raises.
    """
    try:
        root_logger = logging.getLogger()
        for handler in root_logger.handlers:
            try:
                handler.flush()
            except Exception:
                # One misbehaving handler must not stop the others.
                continue
    except Exception:
        return


def cleanup_clipboard_tempfiles(max_age_hours: int = 24) -> int:
    """Delete stale clipboard temp files so they do not pile up over long runs.

    Only regular files directly inside ``{tempdir}/nano_banana_app`` are
    considered, and only those whose name starts with ``clipboard_`` or is
    exactly ``_clipboard_tmp.png``. A file is removed when its modification
    time is older than ``max_age_hours`` hours.

    Args:
        max_age_hours: Minimum age, in hours, a file must have to be deleted.

    Returns:
        The number of files removed. Returns 0 when the directory does not
        exist or when anything unexpected goes wrong; per-file errors are
        ignored and the scan continues.
    """
    try:
        import time

        app_tmp = Path(tempfile.gettempdir()) / "nano_banana_app"
        if not app_tmp.exists():
            return 0

        oldest_allowed = time.time() - max_age_hours * 3600
        deleted = 0
        for entry in app_tmp.iterdir():
            try:
                is_clipboard_file = (
                    entry.name == "_clipboard_tmp.png"
                    or entry.name.startswith("clipboard_")
                )
                if (
                    entry.is_file()
                    and is_clipboard_file
                    and entry.stat().st_mtime < oldest_allowed
                ):
                    entry.unlink()
                    deleted += 1
            except Exception:
                # A file that vanished or cannot be removed is simply skipped.
                continue
        return deleted
    except Exception:
        return 0


def get_crash_log_path() -> Path:
    """Return the crash log file path; usable before any other initialisation.

    The per-platform application directory is tried first:

    * macOS: ``~/Library/Application Support/ZB100ImageGenerator``
    * Windows: ``%APPDATA%/ZB100ImageGenerator`` (the home directory is used
      when ``APPDATA`` is unset or empty)
    * other: ``~/.config/zb100imagegenerator``

    If that directory cannot be determined or created, the function falls
    back to ``{tempdir}/ZB100ImageGenerator``.

    This function never raises. It is called at the very start of the
    startup sequence, outside any ``try`` block, so an exception here would
    abort startup before crash diagnostics are installed. When even the
    fallback directory cannot be created, the path is returned anyway and it
    is up to the caller's ``open()`` to fail.

    Returns:
        Path to ``crash_log.txt`` inside the selected directory.
    """
    try:
        system = platform.system()
        if system == "Darwin":
            log_dir = Path.home() / "Library/Application Support/ZB100ImageGenerator"
        elif system == "Windows":
            appdata = os.environ.get("APPDATA")
            appdata_root = Path(appdata) if appdata else Path.home()
            log_dir = appdata_root / "ZB100ImageGenerator"
        else:
            log_dir = Path.home() / ".config/zb100imagegenerator"
        log_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Home directory unresolvable or not writable: use the temp dir.
        try:
            log_dir = Path(tempfile.gettempdir()) / "ZB100ImageGenerator"
        except Exception:
            # No usable temp dir at all; last resort is a cwd-relative path.
            log_dir = Path("ZB100ImageGenerator")
        try:
            log_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Still return a path; writers guard their own open() calls.
            pass
    return log_dir / "crash_log.txt"


def enable_crash_diagnostics() -> None:
    """Install crash diagnostics as early as possible in the startup sequence.

    Three independent, best-effort mechanisms are set up:

    1. ``faulthandler`` dumps the Python stacks of all threads into the crash
       log file on a fatal signal (e.g. a segfault). Only when the crash log
       cannot be opened does it fall back to ``sys.stderr`` (if one exists).
    2. ``sys.excepthook`` is replaced with a hook that records uncaught
       exceptions through ``logging`` and in the crash log, then delegates to
       ``sys.__excepthook__``.
    3. A Qt message handler routes Qt warning/critical/fatal messages to
       ``logging.warning`` and the crash log; other Qt messages go to
       ``logging.debug`` only.

    A failure in one step is reported with a guarded ``print`` and does not
    prevent the remaining steps from being installed.
    """
    crash_log = get_crash_log_path()

    def _safe_print(text: str) -> None:
        # print() may fail (for instance when the output stream cannot encode
        # the text); diagnostics setup must never abort startup because of it.
        try:
            print(text)
        except Exception:
            pass

    # 1. faulthandler: dump Python stacks to the crash log on fatal signals.
    crash_fh = None
    try:
        crash_fh = open(crash_log, "a", encoding="utf-8")
        crash_fh.write(f"\n{'=' * 60}\n")
        crash_fh.write(f"[STARTUP] {datetime.now().isoformat()} - faulthandler 已启用\n")
        crash_fh.flush()
        # faulthandler has a single output target: a second enable() call with
        # file=sys.stderr would REPLACE this file instead of adding to it, and
        # the crash log would never receive the traceback. The persistent file
        # therefore wins; stderr is only used in the fallback branch below.
        faulthandler.enable(file=crash_fh, all_threads=True)
        # The file must stay open for as long as faulthandler is enabled, so
        # keep an explicit reference beyond this function's lifetime.
        enable_crash_diagnostics._crash_log_file = crash_fh
        _safe_print(f"崩溃诊断日志路径: {crash_log}")
    except Exception as e:
        _safe_print(f"faulthandler 启用失败: {e}")
        if crash_fh is not None:
            try:
                crash_fh.close()
            except Exception:
                pass
        # Windowed builds have no stderr; skip the fallback there.
        if sys.stderr is not None:
            try:
                faulthandler.enable()
            except Exception:
                pass

    # 2. Global hook for uncaught Python exceptions.
    def _global_excepthook(exc_type, exc_value, exc_tb):
        msg = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        try:
            logging.critical(f"未捕获异常:\n{msg}")
        except Exception:
            # A broken logging setup must not prevent the crash-log write.
            pass
        try:
            with open(crash_log, "a", encoding="utf-8") as f:
                f.write(f"\n[UNCAUGHT EXCEPTION] {datetime.now().isoformat()}\n{msg}\n")
        except Exception:
            pass
        sys.__excepthook__(exc_type, exc_value, exc_tb)

    sys.excepthook = _global_excepthook

    # 3. Qt message interception.
    try:
        from PySide6.QtCore import qInstallMessageHandler, QtMsgType

        # Built once rather than on every Qt message.
        level_names = {
            QtMsgType.QtDebugMsg: "QT_DEBUG",
            QtMsgType.QtInfoMsg: "QT_INFO",
            QtMsgType.QtWarningMsg: "QT_WARNING",
            QtMsgType.QtCriticalMsg: "QT_CRITICAL",
            QtMsgType.QtFatalMsg: "QT_FATAL",
        }
        persisted_levels = (
            QtMsgType.QtWarningMsg,
            QtMsgType.QtCriticalMsg,
            QtMsgType.QtFatalMsg,
        )

        def _qt_message_handler(mode, context, message):
            level = level_names.get(mode, "QT_UNKNOWN")
            location = f"{context.file}:{context.line}" if context.file else "unknown"
            full_msg = f"[{level}] {location} - {message}"

            if mode in persisted_levels:
                logging.warning(full_msg)
                try:
                    with open(crash_log, "a", encoding="utf-8") as f:
                        f.write(f"[{datetime.now().isoformat()}] {full_msg}\n")
                except Exception:
                    pass
            else:
                logging.debug(full_msg)

        qInstallMessageHandler(_qt_message_handler)
    except Exception as e:
        _safe_print(f"Qt 消息拦截器安装失败: {e}")


def log_system_info() -> None:
    """Log a snapshot of the runtime environment for compatibility triage.

    The report lists the operating system, CPU architecture, Python version,
    whether the app runs frozen (``sys.frozen``), and the versions of PySide6,
    Qt, Pillow and google-genai; a library that cannot be imported or queried
    gets a placeholder line instead. For frozen builds the executable path
    and, when present, ``sys._MEIPASS`` are appended.

    The report is emitted at INFO level on this module's logger and also
    appended to the crash log; a failure to write the crash log is ignored.
    """
    log = logging.getLogger(__name__)
    frozen = getattr(sys, 'frozen', False)

    report = [
        f"操作系统: {platform.system()} {platform.release()} ({platform.version()})",
        f"架构: {platform.machine()}",
        f"Python: {sys.version}",
        f"打包模式: {'PyInstaller' if frozen else '开发环境'}",
    ]

    def _pyside_line():
        import PySide6
        return f"PySide6: {PySide6.__version__}"

    def _qt_line():
        from PySide6.QtCore import qVersion
        return f"Qt: {qVersion()}"

    def _pillow_line():
        import PIL
        return f"Pillow: {PIL.__version__}"

    def _genai_line():
        from google import genai as _genai
        if hasattr(_genai, '__version__'):
            return f"google-genai: {_genai.__version__}"
        return "google-genai: 已加载"

    # Each probe runs in isolation so one missing library does not hide the rest.
    probes = (
        (_pyside_line, "PySide6: 版本获取失败"),
        (_qt_line, "Qt: 版本获取失败"),
        (_pillow_line, "Pillow: 版本获取失败"),
        (_genai_line, "google-genai: 未安装或加载失败"),
    )
    for probe, placeholder in probes:
        try:
            report.append(probe())
        except Exception:
            report.append(placeholder)

    if frozen:
        report.append(f"可执行路径: {sys.executable}")
        if hasattr(sys, '_MEIPASS'):
            report.append(f"_MEIPASS: {sys._MEIPASS}")

    full_info = "\n  ".join(report)
    log.info(f"系统环境信息:\n  {full_info}")

    # Mirror the report into the crash log.
    try:
        with open(get_crash_log_path(), "a", encoding="utf-8") as out:
            out.write(f"[SYSTEM INFO] {datetime.now().isoformat()}\n  {full_info}\n")
    except Exception:
        pass


def init_logging(log_level=logging.INFO) -> bool:
    """Initialise the root logger with a rotating file handler.

    A writable logs directory is chosen from a list of platform-specific
    candidates (falling back to ``{tempdir}/ZB100ImageGenerator_logs``), then
    ``app.log`` in that directory is attached through a
    ``RotatingFileHandler``, plus an optional console ``StreamHandler``.
    Existing root handlers are replaced (``basicConfig(force=True)``).

    Settings come from the ``logging_config`` object of ``config.json``,
    looked up next to the executable for frozen builds and otherwise in the
    parent of this file's directory. Recognised keys: ``enabled``, ``level``,
    ``log_to_console``, ``max_bytes``, ``backup_count``. A missing, unreadable
    or malformed configuration falls back to: enabled, ``log_level``, console
    output on, 5 MB per file, 5 backups.

    Args:
        log_level: Level used when the configuration provides no valid
            ``level`` entry. A valid configured level takes precedence.

    Returns:
        True when logging was configured, or when it is disabled by the
        configuration; False when initialisation failed.
    """
    def _say(text: str) -> None:
        # print() may fail (for instance when the output stream cannot encode
        # the text); that must not abort logging initialisation.
        try:
            print(text)
        except Exception:
            pass

    def _resolve_level(value, fallback):
        # Accept a numeric level or a standard level name; anything else
        # (missing key, wrong type, unknown name) yields the fallback.
        if isinstance(value, bool):
            return fallback
        if isinstance(value, int):
            return value
        if isinstance(value, str):
            found = getattr(logging, value.strip().upper(), None)
            if isinstance(found, int) and not isinstance(found, bool):
                return found
        return fallback

    def _int_option(options, key, default):
        # A malformed number in the config should not disable logging.
        try:
            return int(options.get(key, default))
        except (TypeError, ValueError):
            return default

    try:
        frozen = getattr(sys, 'frozen', False)

        # Candidate logs directories, most preferred first.
        def _candidates():
            system = platform.system()
            cands = []

            if frozen and system == "Darwin":
                cands.append(Path.home() / "Library/Application Support/ZB100ImageGenerator/logs")
            elif frozen:
                cands.append(Path(sys.executable).parent / "logs")
            else:
                # Development: project root (the parent of this file's directory).
                cands.append(Path(__file__).resolve().parent.parent / "logs")

            if system == "Darwin":
                cands.append(Path.home() / "Library/Application Support/ZB100ImageGenerator/logs")
                cands.append(Path.home() / "Documents/ZB100ImageGenerator/logs")
            elif system == "Windows":
                # Use the home directory when APPDATA is unset or empty, so the
                # candidate never degrades to a path relative to the cwd
                # (same rule as get_crash_log_path).
                appdata = os.environ.get("APPDATA")
                appdata_root = Path(appdata) if appdata else Path.home()
                cands.append(appdata_root / "ZB100ImageGenerator/logs")
                cands.append(Path.home() / "Documents/ZB100ImageGenerator/logs")
            else:
                cands.append(Path.home() / ".config/zb100imagegenerator/logs")
                cands.append(Path.home() / "Documents/ZB100ImageGenerator/logs")

            return cands

        def _writable(path: Path) -> bool:
            # Probe by actually creating and deleting a file.
            try:
                path.mkdir(parents=True, exist_ok=True)
                probe = path / ".write_test"
                probe.write_text("test")
                probe.unlink()
                return True
            except Exception:
                return False

        logs_dir = next((c for c in _candidates() if _writable(c)), None)
        if logs_dir is not None:
            _say(f"使用logs目录: {logs_dir}")
        else:
            logs_dir = Path(tempfile.gettempdir()) / "ZB100ImageGenerator_logs"
            logs_dir.mkdir(parents=True, exist_ok=True)
            _say(f"警告: 所有logs路径都不可用,使用临时目录: {logs_dir}")

        # Read logging_config from config.json.
        script_dir = (
            Path(sys.executable).parent
            if frozen
            else Path(__file__).resolve().parent.parent
        )
        config_path = script_dir / "config.json"
        logging_config = {}
        if config_path.exists():
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    cfg = json.load(f)
                section = cfg.get("logging_config", {})
                if not isinstance(section, dict):
                    raise TypeError("logging_config must be a JSON object")
                logging_config = section
            except Exception as e:
                _say(f"加载日志配置失败: {e}")

        if not logging_config.get("enabled", True):
            _say("日志系统已禁用")
            return True

        # The configured level wins; the log_level argument is the fallback.
        effective_level = _resolve_level(logging_config.get("level"), log_level)
        level_name = (
            effective_level
            if isinstance(effective_level, str)
            else logging.getLevelName(effective_level)
        )

        log_file = logs_dir / "app.log"
        log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

        from logging.handlers import RotatingFileHandler
        max_bytes = _int_option(logging_config, "max_bytes", 5 * 1024 * 1024)
        backup_count = _int_option(logging_config, "backup_count", 5)
        handlers = [RotatingFileHandler(log_file, maxBytes=max_bytes,
                                        backupCount=backup_count, encoding='utf-8')]
        if logging_config.get("log_to_console", True):
            handlers.append(logging.StreamHandler())

        logging.basicConfig(level=effective_level, format=log_format,
                            handlers=handlers, force=True)
        logging.info(f"日志系统初始化完成 - 级别: {level_name}, 文件: {log_file}")
        return True

    except Exception as e:
        _say(f"日志系统初始化失败: {e}")
        return False