# history.py
"""历史记录数据 + 持久化 + Qt Model。

HistoryItem        — 单条历史记录的数据类
HistoryListModel   — QAbstractListModel,配合 QListView IconMode 视口懒加载
HistoryManager     — 索引文件读写 + 缩略图生成 + 启动期一次性路径迁移
"""
import json
import logging
import shutil
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from PySide6.QtCore import QAbstractListModel, QModelIndex, Qt
from PySide6.QtGui import QIcon, QPixmap

from .paths import get_app_data_path, save_png_with_validation


@dataclass
class HistoryItem:
    """One history record: the prompt, its images and the generation settings."""
    timestamp: str
    prompt: str
    generated_image_path: Path
    reference_image_paths: List[Path]
    aspect_ratio: str
    image_size: str
    model: str
    created_at: datetime

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (paths as ``str``, time as ISO text)."""
        reference_strings = list(map(str, self.reference_image_paths))
        payload: Dict[str, Any] = dict(
            timestamp=self.timestamp,
            prompt=self.prompt,
            generated_image_path=str(self.generated_image_path),
            reference_image_paths=reference_strings,
            aspect_ratio=self.aspect_ratio,
            image_size=self.image_size,
            model=self.model,
            created_at=self.created_at.isoformat(),
        )
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'HistoryItem':
        """Rebuild an item from the dict shape produced by :meth:`to_dict`.

        Raises:
            KeyError: if any expected key is missing from ``data``.
        """
        # Keys are read in declaration order so that the first missing key is
        # the one reported.
        timestamp = data['timestamp']
        prompt = data['prompt']
        generated = Path(data['generated_image_path'])
        references = [Path(raw) for raw in data['reference_image_paths']]
        aspect_ratio = data['aspect_ratio']
        image_size = data['image_size']
        model = data['model']
        created_at = datetime.fromisoformat(data['created_at'])
        return cls(
            timestamp,
            prompt,
            generated,
            references,
            aspect_ratio,
            image_size,
            model,
            created_at,
        )


class HistoryListModel(QAbstractListModel):
    """历史记录列表的 Qt Model(路线 A 阶段 2)。

    数据本体只持有 list[str] 时间戳;渲染所需的 HistoryItem / 缩略图 QIcon 走
    LRU 缓存按需懒加载。配合 QListView IconMode,Qt 只会对视口可见 + 邻近
    几行调用 data(),从而实现"历史记录涨到 1000+ 也不会一次性建 N 个 widget"。

    Step 1:data(DecorationRole) 内同步加载 thumbnail 并缓存为 QIcon。
    Step 2 后会改为 QPixmapCache + QRunnable 异步加载。
    """

    _CACHE_LIMIT = 300

    def __init__(self, history_manager, build_placeholder_icon, logger, parent=None):
        """Create an empty model.

        Args:
            history_manager: object whose ``load_history_item_fast`` and
                ``get_or_create_thumbnail`` methods supply row data.
            build_placeholder_icon: callable that takes a label string and
                returns the icon shown when no thumbnail is available.
            logger: logger used to report load and thumbnail failures.
            parent: optional Qt parent object.
        """
        super().__init__(parent)
        # Collaborators supplied by the caller.
        self._logger = logger
        self._build_placeholder_icon = build_placeholder_icon
        self._history_manager = history_manager
        # Row order is the order of this list; render data lives in the
        # bounded cache, keyed by timestamp.
        self._cache: 'OrderedDict[str, Dict[str, Any]]' = OrderedDict()
        self._timestamps: List[str] = []

    # ---- Qt model interface -------------------------------------------------

    def rowCount(self, parent=QModelIndex()) -> int:
        """Return the number of rows; a valid ``parent`` has no children here."""
        return 0 if parent.isValid() else len(self._timestamps)

    def roleNames(self):
        """Map the roles this model serves to the names QML delegates use.

        Widget views address roles by integer and are unaffected; a QML
        delegate can read e.g. ``model.timestamp`` or ``model.toolTip``.
        """
        named_roles = (
            (Qt.DisplayRole, b"display"),
            (Qt.DecorationRole, b"decoration"),
            (Qt.ToolTipRole, b"toolTip"),
            (Qt.UserRole, b"timestamp"),
        )
        return dict(named_roles)

    def flags(self, index: QModelIndex):
        """Valid rows are enabled and selectable; anything else has no flags."""
        if index.isValid():
            return Qt.ItemIsEnabled | Qt.ItemIsSelectable
        return Qt.NoItemFlags

    def data(self, index: QModelIndex, role: int = Qt.DisplayRole):
        """Return the value for ``role`` at ``index``, or ``None``.

        ``Qt.UserRole`` yields the raw timestamp without touching the cache.
        Display, tooltip and decoration values come from the lazily built
        render entry for that row.
        """
        if not index.isValid():
            return None
        row = index.row()
        if not 0 <= row < len(self._timestamps):
            return None

        timestamp = self._timestamps[row]
        if role == Qt.UserRole:
            return timestamp

        rendered = self._get_or_build(timestamp)
        if rendered is None:
            return None
        if role == Qt.DecorationRole:
            return rendered.get('icon')
        if role == Qt.ToolTipRole:
            return rendered.get('tooltip', '')
        if role == Qt.DisplayRole:
            return rendered.get('display_text', '')
        return None

    # ---- 增量操作 -----------------------------------------------------------

    def reset_timestamps(self, timestamps: List[str]):
        """Replace every row with ``timestamps`` and drop all cached render data.

        The list is copied, so later changes to the caller's list do not
        affect the model.
        """
        self.beginResetModel()
        self._timestamps = [*timestamps]
        self._cache.clear()
        self.endResetModel()

    def prepend_timestamp(self, timestamp: str):
        """Insert ``timestamp`` as the first row, replacing any existing row for it.

        Timestamps are the row identity: ``remove_timestamp`` and ``row_of``
        only ever act on the first match, and the cache is keyed by timestamp.
        A timestamp that is already present is therefore moved to the front
        instead of being duplicated, and render data cached under it is
        dropped so the row is rebuilt from the record's current contents.

        Args:
            timestamp: identifier of the record to show first.
        """
        # Whatever was cached under this key describes an older state.
        self._cache.pop(timestamp, None)

        try:
            existing_row = self._timestamps.index(timestamp)
        except ValueError:
            existing_row = -1
        if existing_row >= 0:
            self.beginRemoveRows(QModelIndex(), existing_row, existing_row)
            del self._timestamps[existing_row]
            self.endRemoveRows()

        self.beginInsertRows(QModelIndex(), 0, 0)
        self._timestamps.insert(0, timestamp)
        self.endInsertRows()

    def remove_timestamp(self, timestamp: str) -> bool:
        """Remove the row for ``timestamp``.

        Returns:
            ``True`` if a row was removed, ``False`` if the timestamp is not
            in the model.
        """
        row = self.row_of(timestamp)
        if row < 0:
            return False
        self.beginRemoveRows(QModelIndex(), row, row)
        self._timestamps.pop(row)
        self._cache.pop(timestamp, None)
        self.endRemoveRows()
        return True

    def row_of(self, timestamp: str) -> int:
        """Return the first row holding ``timestamp``, or ``-1`` if absent."""
        for row, known in enumerate(self._timestamps):
            if known == timestamp:
                return row
        return -1

    def invalidate_cache(self, timestamp: Optional[str] = None):
        """Forget cached render data after a record changed on disk.

        Args:
            timestamp: drop only this record's entry; ``None`` drops them all.
        """
        if timestamp is not None:
            self._cache.pop(timestamp, None)
            return
        self._cache.clear()

    # ---- 内部 ---------------------------------------------------------------

    def _get_or_build(self, timestamp: str) -> Optional[Dict[str, Any]]:
        """Return the render entry for ``timestamp``, building it on a cache miss.

        The entry is a dict with keys ``item``, ``icon``, ``display_text`` and
        ``tooltip``. A cache hit is marked as most recently used. When the
        record cannot be loaded, a placeholder entry (``item`` is ``None``) is
        built and cached instead.
        """
        hit = self._cache.get(timestamp)
        if hit is not None:
            self._cache.move_to_end(timestamp)
            return hit

        item = None
        try:
            item = self._history_manager.load_history_item_fast(timestamp)
        except Exception:
            self._logger.exception(f"[HistoryListModel] load 失败 {timestamp}")

        if item is None:
            # A missing record and a failed load are both shown as "deleted".
            entry = {
                'item': None,
                'icon': self._build_placeholder_icon("已删除"),
                'display_text': f"{timestamp}\n(已删除)",
                'tooltip': f"时间戳: {timestamp}\n记录已不存在",
            }
        else:
            prompt = item.prompt
            # The caption shows at most the first 20 characters of the prompt.
            preview = prompt if len(prompt) <= 20 else prompt[:20] + "..."
            caption = f"{item.timestamp}\n{preview}"
            tooltip = "\n".join([
                f"时间: {item.created_at.strftime('%Y-%m-%d %H:%M:%S')}",
                f"提示词: {item.prompt}",
                f"宽高比: {item.aspect_ratio}",
                f"尺寸: {item.image_size}",
            ])
            entry = {
                'item': item,
                'icon': self._load_thumbnail_icon(item),
                'display_text': caption,
                'tooltip': tooltip,
            }

        self._put_cache(timestamp, entry)
        return entry

    def _load_thumbnail_icon(self, item: HistoryItem) -> QIcon:
        """Load the record's thumbnail synchronously and return it as an icon.

        Every failure mode (image missing, thumbnail not produced, pixmap
        unreadable, unexpected exception) yields a labelled placeholder icon
        rather than raising.
        """
        placeholder = self._build_placeholder_icon
        image_path = item.generated_image_path
        if not image_path.exists():
            return placeholder("图片\n不存在")
        try:
            thumbnail = self._history_manager.get_or_create_thumbnail(image_path)
            if thumbnail is None:
                return placeholder("缩略图\n失败")
            pixmap = QPixmap(str(thumbnail))
            if pixmap.isNull():
                return placeholder("图片\n加载失败")
            # Fit inside a 120x120 box while keeping the aspect ratio.
            return QIcon(
                pixmap.scaled(120, 120, Qt.KeepAspectRatio, Qt.SmoothTransformation)
            )
        except Exception:
            self._logger.exception(f"[HistoryListModel] 缩略图异常 {item.timestamp}")
            return placeholder("图片\n错误")

    def _put_cache(self, timestamp: str, value: Dict[str, Any]):
        """Store ``value`` as the most recent entry and evict the oldest overflow."""
        cache = self._cache
        cache[timestamp] = value
        cache.move_to_end(timestamp)
        # Entries beyond _CACHE_LIMIT are removed from the least-recent end.
        for _ in range(len(cache) - self._CACHE_LIMIT):
            cache.popitem(last=False)


class HistoryManager:
    """历史记录管理器"""

    def __init__(self, base_path: Optional[Path] = None):
        self.logger = logging.getLogger(__name__)
        self.base_path = base_path or get_app_data_path()
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.history_index_file = self.base_path / "history_index.json"
        self.max_history_count = 0  # 0 表示不限制历史记录数量

        self.logger.debug(f"历史记录管理器初始化完成,存储路径: {self.base_path}")

        # 启动时一次性把过期绝对路径归一化到当前 base_path,
        # 之后 load_history_index 不再做任何 stat 循环
        try:
            self._migrate_paths_once()
        except Exception:
            self.logger.exception("启动路径迁移失败 (可忽略)")

    def save_generation(self, image_bytes: bytes, prompt: str, reference_images: List[bytes],
                        aspect_ratio: str, image_size: str, model: str) -> str:
        """Persist one generation as a new history record.

        Creates ``<base_path>/<timestamp>/`` containing ``generated.png``,
        ``reference_<n>.png`` for each reference image (numbered from 1) and
        ``metadata.json``. It then adds the record to the index, pre-builds
        the thumbnail (best effort) and applies the record-count limit.

        Args:
            image_bytes: encoded bytes of the generated image.
            prompt: prompt text used for the generation.
            reference_images: encoded bytes of each reference image, in order.
            aspect_ratio: aspect-ratio setting stored with the record.
            image_size: size setting stored with the record.
            model: model name stored with the record.

        Returns:
            The record's timestamp (``%Y%m%d%H%M%S``), which is also its
            directory name.
        """
        self.logger.info(f"开始保存历史记录 - 模型: {model}, 尺寸: {image_size}")

        # A single clock reading feeds the directory name, metadata.json and
        # the index entry, so the three can never disagree about creation time.
        now = datetime.now()
        timestamp = now.strftime("%Y%m%d%H%M%S")
        record_dir = self.base_path / timestamp
        # NOTE(review): the name has one-second resolution and exist_ok=True
        # reuses an existing directory, so two saves within the same second
        # share a record directory — confirm whether callers can trigger this.
        record_dir.mkdir(exist_ok=True)

        def _write_image(target: Path, payload: bytes) -> None:
            # Prefer the validating writer; fall back to the raw bytes when it
            # reports failure so the data is never lost.
            if save_png_with_validation(str(target), payload):
                return
            with open(target, 'wb') as f:
                f.write(payload)
            self.logger.warning(f"使用原始保存方法: {target}")

        generated_image_path = record_dir / "generated.png"
        _write_image(generated_image_path, image_bytes)

        reference_image_paths = []
        for number, ref_img_bytes in enumerate(reference_images, start=1):
            ref_path = record_dir / f"reference_{number}.png"
            _write_image(ref_path, ref_img_bytes)
            reference_image_paths.append(ref_path)

        metadata = {
            'timestamp': timestamp,
            'prompt': prompt,
            'aspect_ratio': aspect_ratio,
            'image_size': image_size,
            'model': model,
            'created_at': now.isoformat()
        }

        metadata_path = record_dir / "metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)

        history_item = HistoryItem(
            timestamp=timestamp,
            prompt=prompt,
            generated_image_path=generated_image_path,
            reference_image_paths=reference_image_paths,
            aspect_ratio=aspect_ratio,
            image_size=image_size,
            model=model,
            created_at=now
        )

        self._update_history_index(history_item)

        # The thumbnail is only a cache; failing to build it must not fail the save.
        try:
            self.get_or_create_thumbnail(generated_image_path)
        except Exception:
            self.logger.exception("保存历史记录时生成缩略图失败")

        self._cleanup_old_records()

        return timestamp

    @staticmethod
    def thumb_path_for(generated_image_path: Path) -> Path:
        """缩略图在 generated.png 同目录下的固定名称 thumb.jpg。"""
        return generated_image_path.parent / "thumb.jpg"

    def get_or_create_thumbnail(self, generated_image_path: Path, size: int = 240):
        """返回缓存缩略图路径,缺失或过期时用 PIL 生成一次。

        refresh_history 每次加载 100+ 条 2K PNG 会导致 ~1.7GB 瞬时内存,
        在 macOS 上触发 SIGKILL。缩略图把单条开销从 ~16MB 降到 ~5KB。

        失败时返回 None,调用方应回退到原图或占位图。
        """
        try:
            thumb_path = self.thumb_path_for(generated_image_path)
            if thumb_path.exists():
                try:
                    if thumb_path.stat().st_mtime >= generated_image_path.stat().st_mtime:
                        return thumb_path
                except OSError:
                    pass
            from PIL import Image
            with Image.open(generated_image_path) as img:
                img = img.convert("RGB")
                img.thumbnail((size, size), Image.LANCZOS)
                img.save(str(thumb_path), "JPEG", quality=75, optimize=True)
            return thumb_path
        except Exception:
            try:
                self.logger.exception(f"生成缩略图失败 {generated_image_path}")
            except Exception:
                pass
            return None

    def _migrate_paths_once(self):
        """启动时一次性路径归一化(取代过去每次 load 都修正的循环)。

        历史背景:旧版本 index.json 存绝对路径。.app 重打包或存储位置变更
        后,老路径失效。过去做法是 load_history_index 每次都全扫 stat 修正,
        N=513 时主线程阻塞 ~60ms × N 次/会话,Mac 上累计触发 jetsam SIGKILL。

        现在改成只在启动时跑一次:
        1. 抽样前 5 条,60% 以上路径有效 → 跳过(绝大多数启动走这条 fast path)
        2. 否则全量重写所有路径并 save,之后 load_history_index 直接信任 raw
        """
        if not self.history_index_file.exists():
            return
        try:
            with open(self.history_index_file, 'r', encoding='utf-8') as f:
                raw = json.load(f)
        except Exception:
            self.logger.exception("_migrate_paths_once 读取 index 失败")
            return
        if not isinstance(raw, list) or not raw:
            return

        sample = [d for d in raw[:5] if isinstance(d, dict)]
        if not sample:
            return
        ok = 0
        for d in sample:
            p = d.get('generated_image_path')
            if p and Path(p).exists():
                ok += 1
        # 60% 阈值:少数损坏可能是孤立文件被人手动删,不需要全迁移
        if ok * 10 >= len(sample) * 6:
            self.logger.info(
                f"[migrate_paths] 抽样 {ok}/{len(sample)} 路径有效,跳过迁移"
            )
            return

        self.logger.info(
            f"[migrate_paths] 抽样 {ok}/{len(sample)} 路径有效,开始全量迁移 {len(raw)} 条"
        )
        changed = 0
        for d in raw:
            if not isinstance(d, dict):
                continue
            ts = d.get('timestamp')
            if not ts:
                continue
            gen_p = d.get('generated_image_path')
            if gen_p and not Path(gen_p).exists():
                cand = self.base_path / ts / Path(gen_p).name
                if cand.exists():
                    d['generated_image_path'] = str(cand)
                    changed += 1
            refs = d.get('reference_image_paths') or []
            new_refs = []
            for r in refs:
                if Path(r).exists():
                    new_refs.append(r)
                else:
                    cand = self.base_path / ts / Path(r).name
                    if cand.exists():
                        new_refs.append(str(cand))
                        changed += 1
                    else:
                        new_refs.append(r)
            d['reference_image_paths'] = new_refs

        if changed > 0:
            self.logger.info(f"[migrate_paths] 修正 {changed} 个路径, 写回 index.json")
            try:
                with open(self.history_index_file, 'w', encoding='utf-8') as f:
                    json.dump(raw, f, ensure_ascii=False, indent=2)
            except Exception:
                self.logger.exception("[migrate_paths] 写回失败")

    def load_history_index(self) -> List[HistoryItem]:
        """加载历史记录索引(仅 raw read + from_dict + sort)。

        路径修正已下放到 __init__ 时的 _migrate_paths_once,
        load 路径完全没有 stat 系统调用,N=513 时只需几 ms。
        """
        if not self.history_index_file.exists():
            return []
        try:
            with open(self.history_index_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, list):
                return []
            items: List[HistoryItem] = []
            for d in data:
                if not isinstance(d, dict):
                    continue
                try:
                    items.append(HistoryItem.from_dict(d))
                except Exception:
                    continue
            items.sort(key=lambda x: x.timestamp, reverse=True)
            return items
        except Exception:
            self.logger.exception("加载历史记录索引失败")
            return []

    def get_history_item(self, timestamp: str) -> Optional[HistoryItem]:
        """获取指定时间戳的历史记录项。

        以前实现走 load_history_index() 全扫 + 路径修正,每次 O(N) + N 次 stat。
        在 Mac 用户 513 条历史的环境下,点击历史项查看详情会触发主线程
        阻塞 ~60ms; 连续点击会累积内存峰值并触发 jetsam SIGKILL。
        现改为直接读 {timestamp}/metadata.json + 同目录扫描,O(1)。
        """
        return self.load_history_item_fast(timestamp)

    def load_history_item_fast(self, timestamp: str) -> Optional[HistoryItem]:
        """轻量读取单条历史记录:直接从 {timestamp}/metadata.json + 文件扫描。

        不读 index.json、不做全量路径修正、不扫其他记录。用于生成完成后
        增量刷新 UI,避免每次都走 O(N) 的 load_history_index。
        """
        record_dir = self.base_path / timestamp
        metadata_path = record_dir / "metadata.json"
        if not metadata_path.exists():
            return None
        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
            generated_image_path = record_dir / "generated.png"
            reference_image_paths = sorted(
                record_dir.glob("reference_*.png"),
                key=lambda p: p.name
            )
            created_at = (
                datetime.fromisoformat(metadata['created_at'])
                if 'created_at' in metadata else datetime.now()
            )
            return HistoryItem(
                timestamp=metadata.get('timestamp', timestamp),
                prompt=metadata.get('prompt', ''),
                generated_image_path=generated_image_path,
                reference_image_paths=reference_image_paths,
                aspect_ratio=metadata.get('aspect_ratio', ''),
                image_size=metadata.get('image_size', ''),
                model=metadata.get('model', ''),
                created_at=created_at,
            )
        except Exception:
            self.logger.exception(f"load_history_item_fast 失败 {timestamp}")
            return None

    def delete_history_item(self, timestamp: str) -> bool:
        """删除指定的历史记录"""
        try:
            record_dir = self.base_path / timestamp
            if record_dir.exists():
                shutil.rmtree(record_dir)

            # 直接对 raw json 操作, 避免 load_history_index O(N) 全扫
            if not self.history_index_file.exists():
                return True
            try:
                with open(self.history_index_file, 'r', encoding='utf-8') as f:
                    raw = json.load(f)
                if not isinstance(raw, list):
                    raw = []
            except Exception:
                raw = []
            raw = [d for d in raw if isinstance(d, dict) and d.get('timestamp') != timestamp]
            try:
                with open(self.history_index_file, 'w', encoding='utf-8') as f:
                    json.dump(raw, f, ensure_ascii=False, indent=2)
            except Exception:
                self.logger.exception("删除后写回索引失败")

            return True
        except Exception:
            self.logger.exception("删除历史记录失败")
            return False

    def _update_history_index(self, history_item: HistoryItem):
        """更新历史记录索引(每次生成完图片都会调用,hot path)。

        老实现走 load_history_index() 全扫 + 路径修正 + N 次 stat,再
        整个 list 反序列化 + 重新写回。513 条历史时主线程阻塞 ~60ms,
        Mac 上累计触发 jetsam SIGKILL。
        现改为直接对 raw json 列表 dict 操作:读->过滤->插首位->写。
        无 stat、无 from_dict、无路径修正。
        """
        try:
            if self.history_index_file.exists():
                with open(self.history_index_file, 'r', encoding='utf-8') as f:
                    raw = json.load(f)
                if not isinstance(raw, list):
                    raw = []
            else:
                raw = []
        except Exception:
            self.logger.exception("_update_history_index 读取索引失败")
            raw = []

        new_ts = history_item.timestamp
        raw = [d for d in raw if isinstance(d, dict) and d.get('timestamp') != new_ts]
        raw.insert(0, history_item.to_dict())

        try:
            with open(self.history_index_file, 'w', encoding='utf-8') as f:
                json.dump(raw, f, ensure_ascii=False, indent=2)
        except Exception:
            self.logger.exception("_update_history_index 写入索引失败")

    def _cleanup_old_records(self):
        """清理旧的历史记录,保持最大数量限制。max_history_count <= 0 表示不限制。"""
        if self.max_history_count <= 0:
            return
        if not self.history_index_file.exists():
            return
        try:
            with open(self.history_index_file, 'r', encoding='utf-8') as f:
                raw = json.load(f)
            if not isinstance(raw, list):
                return
        except Exception:
            self.logger.exception("_cleanup_old_records 读取索引失败")
            return

        raw.sort(key=lambda d: d.get('timestamp', '') if isinstance(d, dict) else '', reverse=True)
        if len(raw) <= self.max_history_count:
            return
        keep = raw[:self.max_history_count]
        remove = raw[self.max_history_count:]

        for d in remove:
            if not isinstance(d, dict):
                continue
            ts = d.get('timestamp')
            if not ts:
                continue
            record_dir = self.base_path / ts
            if record_dir.exists():
                try:
                    shutil.rmtree(record_dir)
                except Exception:
                    self.logger.exception(f"删除旧记录失败 {ts}")

        try:
            with open(self.history_index_file, 'w', encoding='utf-8') as f:
                json.dump(keep, f, ensure_ascii=False, indent=2)
        except Exception:
            self.logger.exception("_cleanup_old_records 写回索引失败")