imagegen.py 8.8 KB
"""ImageGenBridge — 图片生成 tab 的输入输出桥。

QML 调 imageGen.submitTask(prompt, refs, aspect, size, mode) → 桥层把 mode 中文
名翻译成 Gemini 模型 ID,调 TaskQueueManager.submit_task。任务完成后桥层
监听 TaskQueueManager.task_completed 信号,把 image_bytes 落 HistoryManager.save_generation,
然后把 result_path 通过 taskCompleted 信号转出(QML 只拿到文件路径,不传 bytes)。
"""
import logging
import shutil
import tempfile
import time
import uuid
from pathlib import Path
from typing import Optional

from PySide6.QtCore import Property, QObject, QUrl, Signal, Slot
from PySide6.QtGui import QGuiApplication

from core.generation import MODEL_BY_MODE, MODEL_PRO


class ImageGenBridge(QObject):
    apiKeyChanged = Signal()
    busyChanged = Signal()
    savedPromptsChanged = Signal()

    taskSubmitted = Signal(str)                    # task_id
    taskCompleted = Signal(str, str, str, str)     # task_id, result_path, prompt, model
    taskFailed = Signal(str, str)                  # task_id, error_message
    taskProgress = Signal(str, float, str)         # task_id, progress, status_text

    def __init__(self, task_queue_manager, history_manager, auth_bridge,
                 api_key: str = "", saved_prompts=None, config_path=None,
                 parent=None):
        super().__init__(parent)
        self._logger = logging.getLogger(__name__)
        self._tqm = task_queue_manager
        self._history = history_manager
        self._auth = auth_bridge
        self._api_key = api_key
        self._saved_prompts = list(saved_prompts or [])
        self._config_path = config_path  # Path 或 None;None 时 add/remove 不持久化

        # 转发 TaskQueueManager 信号 → 桥层 QML 友好信号
        self._tqm.task_added.connect(self._on_task_added)
        self._tqm.task_progress.connect(self._on_progress)
        self._tqm.task_completed.connect(self._on_completed)
        self._tqm.task_failed.connect(self._on_failed)

    # ---- Properties -----------------------------------------------------

    @Property(str, notify=apiKeyChanged)
    def apiKey(self) -> str:
        return self._api_key

    @Property(bool, notify=busyChanged)
    def busy(self) -> bool:
        return self._tqm.get_running_count() > 0

    @Property("QVariantList", notify=savedPromptsChanged)
    def savedPrompts(self) -> list:
        return list(self._saved_prompts)

    # ---- Slots ----------------------------------------------------------

    @Slot(str)
    def setApiKey(self, key: str) -> None:
        if key != self._api_key:
            self._api_key = key
            self.apiKeyChanged.emit()

    @Slot(str)
    def addSavedPrompt(self, prompt: str) -> None:
        prompt = (prompt or "").strip()
        if not prompt or prompt in self._saved_prompts:
            return
        self._saved_prompts.insert(0, prompt)
        self._persist_saved_prompts()
        self.savedPromptsChanged.emit()

    @Slot(str)
    def removeSavedPrompt(self, prompt: str) -> None:
        if prompt not in self._saved_prompts:
            return
        self._saved_prompts.remove(prompt)
        self._persist_saved_prompts()
        self.savedPromptsChanged.emit()

    def _persist_saved_prompts(self) -> None:
        """更新 config.json 的 saved_prompts 字段(保留其他字段)。"""
        if self._config_path is None:
            return
        from config_util import load_config_safe, save_config
        cfg, _ = load_config_safe(self._config_path)
        cfg["saved_prompts"] = list(self._saved_prompts)
        if not save_config(self._config_path, cfg):
            self._logger.warning(f"saved_prompts 持久化失败: {self._config_path}")

    @Slot(str, list, str, str, str, result=str)
    def submitTask(self, prompt: str, reference_images: list,
                   aspect_ratio: str, image_size: str, mode: str) -> str:
        """提交一条生成任务,返回 task_id。失败抛 RuntimeError。

        Args:
            prompt: 中文提示词
            reference_images: 参考图本地路径 list[str]
            aspect_ratio: '1:1' / '2:3' / ...
            image_size: '1K' / '2K' / '4K'
            mode: '极速模式' 或 '慢速模式'
        """
        from task_queue import TaskType  # 局部 import 避免桥层冷启动加载 Qt UI

        model = MODEL_BY_MODE.get(mode, MODEL_PRO)
        task_id = self._tqm.submit_task(
            task_type=TaskType.IMAGE_GENERATION,
            prompt=prompt,
            api_key=self._api_key,
            reference_images=list(reference_images or []),
            aspect_ratio=aspect_ratio,
            image_size=image_size,
            model=model,
            user_name=self._auth.currentUser if self._auth else "",
            device_name=self._auth.deviceName() if self._auth else "",
        )
        self._logger.info(f"提交生成任务: {task_id[:8]} - mode={mode}")
        return task_id

    @Slot(str)
    def cancelTask(self, task_id: str) -> None:
        self._tqm.cancel_task(task_id)
        self.busyChanged.emit()

    @Slot(result=str)
    def pasteFromClipboard(self) -> str:
        """从系统剪贴板拿图片,存到 temp 目录,返回本地路径。

        失败(剪贴板无图)返回空字符串。tempdir 启动期已被
        _cleanup_clipboard_tempfiles 清过 24h+ 旧文件。
        """
        clipboard = QGuiApplication.clipboard()
        if clipboard is None:
            return ""
        image = clipboard.image()
        if image.isNull():
            return ""

        tmp_dir = Path(tempfile.gettempdir()) / "nano_banana_app"
        tmp_dir.mkdir(parents=True, exist_ok=True)
        ts = time.strftime("%Y%m%d_%H%M%S")
        path = tmp_dir / f"clipboard_{ts}_{uuid.uuid4().hex[:6]}.png"
        if not image.save(str(path), "PNG"):
            self._logger.warning(f"剪贴板图片保存失败: {path}")
            return ""
        self._logger.info(f"剪贴板图片已存: {path}")
        return str(path)

    @Slot(str, str, result=bool)
    def saveFile(self, src: str, dest: str) -> bool:
        """复制 src 到 dest(用户从下载对话框选择目标路径)。失败返回 False。"""
        try:
            src_path = Path(src)
            dest_path = Path(dest)
            if not src_path.exists():
                self._logger.warning(f"saveFile 源文件不存在: {src}")
                return False
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(src_path), str(dest_path))
            self._logger.info(f"图片已下载: {dest}")
            return True
        except Exception as e:
            self._logger.error(f"saveFile 失败 src={src} dest={dest}: {e}")
            return False

    @Slot("QVariantList", result="QVariantList")
    def normalizeFileUrls(self, urls) -> list:
        """QML DropArea 给的是 file:/// QUrl 列表,转成本地路径字符串列表(过滤非图片)。"""
        result = []
        valid_ext = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
        for u in urls or []:
            try:
                if isinstance(u, QUrl):
                    p = u.toLocalFile()
                else:
                    p = QUrl(str(u)).toLocalFile() or str(u)
                if p and Path(p).suffix.lower() in valid_ext:
                    result.append(p)
            except Exception:
                continue
        return result

    # ---- 内部信号转发 ----------------------------------------------------

    def _on_task_added(self, task) -> None:
        self.taskSubmitted.emit(task.id)
        self.busyChanged.emit()

    def _on_progress(self, task_id: str, progress: float, status_text: str) -> None:
        self.taskProgress.emit(task_id, progress, status_text)

    def _on_completed(self, task_id: str, image_bytes: bytes, prompt: str,
                      reference_images: list, aspect_ratio: str,
                      image_size: str, model: str) -> None:
        # 落历史记录(QML 只看路径)
        try:
            timestamp = self._history.save_generation(
                image_bytes=image_bytes,
                prompt=prompt,
                reference_images=reference_images,
                aspect_ratio=aspect_ratio,
                image_size=image_size,
                model=model,
            )
            result_path = str(self._history.base_path / timestamp / "generated.png")
        except Exception as e:
            self._logger.error(f"保存历史失败 {task_id[:8]}: {e}", exc_info=True)
            self.taskFailed.emit(task_id, f"图片生成成功但保存历史失败: {e}")
            self.busyChanged.emit()
            return

        self.taskCompleted.emit(task_id, result_path, prompt, model)
        self.busyChanged.emit()

    def _on_failed(self, task_id: str, error: str) -> None:
        self.taskFailed.emit(task_id, error)
        self.busyChanged.emit()