# imagegen.py
"""ImageGenBridge — 图片生成 tab 的输入输出桥。
QML 调 imageGen.submitTask(prompt, refs, aspect, size, mode) → 桥层把 mode 中文
名翻译成 Gemini 模型 ID,调 TaskQueueManager.submit_task。任务完成后桥层
监听 TaskQueueManager.task_completed 信号,把 image_bytes 落 HistoryManager.save_generation,
然后把 result_path 通过 taskCompleted 信号转出(QML 只拿到文件路径,不传 bytes)。
"""
import logging
import platform
import shutil
import subprocess
import tempfile
import time
import uuid
from pathlib import Path
from typing import Optional
from PySide6.QtCore import Property, QObject, QUrl, Signal, Slot
from PySide6.QtGui import QGuiApplication, QImage, QPixmap
from core.generation import MODEL_BY_MODE, MODEL_PRO
_PASTE_VALID_EXT = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"}
_IMAGE_MAX_BYTES = 10 * 1024 * 1024 # 10MB,与旧 validate_image_file 一致
class ImageGenBridge(QObject):
    """QML-facing bridge for the image-generation tab.

    The bridge forwards generation requests to the task queue manager,
    relays the queue's signals to QML, stores finished images through the
    history manager, and offers clipboard / file helper slots. QML only
    ever receives file paths, never raw image bytes.
    """

    apiKeyChanged = Signal()
    busyChanged = Signal()
    savedPromptsChanged = Signal()
    taskSubmitted = Signal(str)  # task_id
    taskCompleted = Signal(str, str, str, str)  # task_id, result_path, prompt, model
    taskFailed = Signal(str, str)  # task_id, error_message
    taskProgress = Signal(str, float, str)  # task_id, progress, status_text

    def __init__(self, task_queue_manager, history_manager, auth_bridge,
                 api_key: str = "", saved_prompts=None, config_path=None,
                 parent=None):
        """Wire the bridge to its collaborators.

        Args:
            task_queue_manager: object exposing ``submit_task``,
                ``cancel_task``, ``get_running_count`` and the
                ``task_added`` / ``task_progress`` / ``task_completed`` /
                ``task_failed`` signals.
            history_manager: object exposing ``save_generation`` and
                ``base_path``.
            auth_bridge: object exposing ``currentUser`` and
                ``deviceName()``; may be falsy, in which case empty
                strings are submitted instead.
            api_key: initial API key.
            saved_prompts: initial iterable of saved prompts (copied).
            config_path: config file path, or None to disable persisting
                saved prompts.
            parent: optional QObject parent.
        """
        super().__init__(parent)
        self._logger = logging.getLogger(__name__)
        self._tqm = task_queue_manager
        self._history = history_manager
        self._auth = auth_bridge
        self._api_key = api_key
        self._saved_prompts = list(saved_prompts or [])
        # Path or None; when None, prompt changes are kept in memory only.
        self._config_path = config_path
        # Relay the task queue's signals as QML-friendly bridge signals.
        self._tqm.task_added.connect(self._on_task_added)
        self._tqm.task_progress.connect(self._on_progress)
        self._tqm.task_completed.connect(self._on_completed)
        self._tqm.task_failed.connect(self._on_failed)

    # ---- Properties -----------------------------------------------------
    @Property(str, notify=apiKeyChanged)
    def apiKey(self) -> str:
        """Return the API key used for submitted tasks."""
        return self._api_key

    @Property(bool, notify=busyChanged)
    def busy(self) -> bool:
        """Return True while the task queue reports at least one running task."""
        return self._tqm.get_running_count() > 0

    @Property("QVariantList", notify=savedPromptsChanged)
    def savedPrompts(self) -> list:
        """Return a copy of the saved prompts, most recently added first."""
        return list(self._saved_prompts)

    # ---- Slots ----------------------------------------------------------
    @Slot(str)
    def setApiKey(self, key: str) -> None:
        """Store a new API key and notify QML only when the value changed."""
        if key != self._api_key:
            self._api_key = key
            self.apiKeyChanged.emit()

    @Slot(str)
    def addSavedPrompt(self, prompt: str) -> None:
        """Add the stripped prompt to the front of the list; ignore blanks and duplicates."""
        prompt = (prompt or "").strip()
        if not prompt or prompt in self._saved_prompts:
            return
        self._saved_prompts.insert(0, prompt)
        self._persist_saved_prompts()
        self.savedPromptsChanged.emit()

    @Slot(str)
    def removeSavedPrompt(self, prompt: str) -> None:
        """Remove a saved prompt; do nothing when it is not saved.

        The exact string is tried first so entries loaded from config with
        surrounding whitespace stay removable. The stripped form is tried
        next, matching how the add / is-saved / toggle slots compare.
        """
        if prompt in self._saved_prompts:
            target = prompt
        else:
            target = (prompt or "").strip()
            if target not in self._saved_prompts:
                return
        self._saved_prompts.remove(target)
        self._persist_saved_prompts()
        self.savedPromptsChanged.emit()

    @Slot(str, result=bool)
    def isSavedPrompt(self, prompt: str) -> bool:
        """Return True when the stripped prompt is saved (drives the QML star/check button)."""
        return (prompt or "").strip() in self._saved_prompts

    @Slot(str, result=bool)
    def toggleSavedPrompt(self, prompt: str) -> bool:
        """Toggle the saved state of a prompt.

        Returns:
            False when the prompt was saved and is now removed, or when it
            is blank; True when it was not saved and is now added.
        """
        prompt = (prompt or "").strip()
        if not prompt:
            return False
        if prompt in self._saved_prompts:
            self.removeSavedPrompt(prompt)
            return False
        self.addSavedPrompt(prompt)
        return True

    def _persist_saved_prompts(self) -> None:
        """Write ``saved_prompts`` into the config file, keeping its other fields.

        Persistence is best-effort: any failure is logged and swallowed so
        the calling slot still emits ``savedPromptsChanged`` and QML stays
        in sync with the in-memory list.
        """
        if self._config_path is None:
            return
        try:
            from config_util import load_config_safe, save_config
            cfg, _ = load_config_safe(self._config_path)
            cfg["saved_prompts"] = list(self._saved_prompts)
            saved = save_config(self._config_path, cfg)
        except Exception:
            self._logger.exception(f"saved_prompts 持久化异常: {self._config_path}")
            return
        if not saved:
            self._logger.warning(f"saved_prompts 持久化失败: {self._config_path}")

    @Slot(str, list, str, str, str, result=str)
    def submitTask(self, prompt: str, reference_images: list,
                   aspect_ratio: str, image_size: str, mode: str) -> str:
        """Submit one image-generation task and return its task id.

        Args:
            prompt: the text prompt.
            reference_images: local paths of reference images (list of str).
            aspect_ratio: e.g. '1:1', '2:3', ...
            image_size: '1K', '2K' or '4K'.
            mode: UI mode name ('极速模式' or '慢速模式'); names missing
                from MODEL_BY_MODE fall back to MODEL_PRO.

        Returns:
            The task id returned by the task queue manager.

        Raises:
            Whatever ``submit_task`` raises (the original notes
            RuntimeError on failure; not verifiable from this file).
        """
        # Local import so that bridge cold start does not load the Qt UI.
        from task_queue import TaskType
        model = MODEL_BY_MODE.get(mode, MODEL_PRO)
        task_id = self._tqm.submit_task(
            task_type=TaskType.IMAGE_GENERATION,
            prompt=prompt,
            api_key=self._api_key,
            reference_images=list(reference_images or []),
            aspect_ratio=aspect_ratio,
            image_size=image_size,
            model=model,
            user_name=self._auth.currentUser if self._auth else "",
            device_name=self._auth.deviceName() if self._auth else "",
        )
        self._logger.info(f"提交生成任务: {task_id[:8]} - mode={mode}")
        return task_id

    @Slot(str)
    def cancelTask(self, task_id: str) -> None:
        """Ask the task queue to cancel a task, then re-announce the busy state."""
        self._tqm.cancel_task(task_id)
        self.busyChanged.emit()

    @Slot(result="QVariantList")
    def pasteFromClipboard(self) -> list:
        """Read images from the system clipboard.

        Sources, in priority order:
          A. File URLs (e.g. an image file copied in a file manager): the
             local paths are returned directly, filtered by extension.
          B. Raw image bytes under an image MIME type, decoded with
             ``QImage.loadFromData``.
          C. ``mimeData.imageData()`` / ``clipboard.image()`` as a last
             resort.
        Images obtained through B or C are written to a temp PNG.

        Returns:
            Local paths with forward slashes, or an empty list when the
            clipboard has no usable image or saving failed.
        """
        clipboard = QGuiApplication.clipboard()
        if clipboard is None:
            return []
        mime_data = clipboard.mimeData()

        # Path A: file URLs.
        if mime_data is not None and mime_data.hasUrls():
            files = []
            for url in mime_data.urls():
                if not url.isLocalFile():
                    continue
                local = url.toLocalFile()
                if local and Path(local).suffix.lower() in _PASTE_VALID_EXT:
                    files.append(Path(local).as_posix())
            if files:
                self._logger.info(f"剪贴板拿到 {len(files)} 个图片文件")
                return files

        # Paths B / C: obtain a QImage from the clipboard.
        image = self._extract_clipboard_image(clipboard)
        if image is None or image.isNull():
            self._logger.info("剪贴板既无图片文件也无图像数据")
            return []

        # Normalize to ARGB32 first so Indexed/Mono images do not yield odd PNGs.
        normalized = image.convertToFormat(QImage.Format.Format_ARGB32)
        tmp_dir = Path(tempfile.gettempdir()) / "nano_banana_app"
        tmp_dir.mkdir(parents=True, exist_ok=True)
        ts = time.strftime("%Y%m%d_%H%M%S")
        path = tmp_dir / f"clipboard_{ts}_{uuid.uuid4().hex[:6]}.png"
        ok = normalized.save(str(path), "PNG")
        if not ok:
            self._logger.warning(f"normalized.save 失败,回退原 image.save: {path}")
            ok = image.save(str(path), "PNG")
        if not ok:
            self._logger.error(f"剪贴板图片保存失败: {path}")
            return []
        self._logger.info(f"剪贴板图片已存: {path}")
        return [path.as_posix()]

    def _extract_clipboard_image(self, clipboard) -> Optional[QImage]:
        """Return a QImage from the clipboard using several fallbacks, or None.

        Per the original author's note, on macOS 26+ (Darwin 25+) Qt's
        ``application/x-qt-image`` deserialization, ``mimeData.imageData()``
        and ``clipboard.image()`` can crash natively in a way Python cannot
        catch. On macOS those three are therefore skipped and
        ``_extract_via_osascript`` is used instead.
        """
        is_mac = platform.system() == "Darwin"
        mime_data = clipboard.mimeData()

        # Path B: raw bytes via mimeData.data(mime) -> QImage.loadFromData.
        # This only reads bytes, so it is used on macOS as well.
        if mime_data is not None:
            available_formats = list(mime_data.formats())
            self._logger.info(f"[clipboard] 可用 MIME: {available_formats}")
            safe_mimes = ["image/png", "image/jpeg", "image/bmp", "image/tiff"]
            if not is_mac:
                # Qt-internal format; skipped on macOS (see docstring).
                safe_mimes.append("application/x-qt-image")
            for mime_type in safe_mimes:
                if mime_type not in available_formats:
                    continue
                try:
                    raw = mime_data.data(mime_type)
                    if raw and len(raw) > 0:
                        img = QImage()
                        if img.loadFromData(raw):
                            self._logger.info(
                                f"[clipboard] 从 {mime_type} 加载成功, "
                                f"{img.width()}x{img.height()}, {len(raw)} bytes"
                            )
                            return img.copy()
                        self._logger.warning(
                            f"[clipboard] {mime_type} loadFromData 失败"
                            f" ({len(raw)} bytes)"
                        )
                except Exception:
                    self._logger.exception(f"[clipboard] 读 {mime_type} 异常")

        # macOS: osascript is the only remaining option; the Qt APIs below
        # are never reached.
        if is_mac:
            return self._extract_via_osascript()

        # Path C1: mimeData.hasImage() / imageData() (non-macOS only).
        if mime_data is not None:
            try:
                if mime_data.hasImage():
                    image_data = mime_data.imageData()
                    if isinstance(image_data, QImage) and not image_data.isNull():
                        self._logger.info(
                            f"[clipboard] imageData() 成功, "
                            f"{image_data.width()}x{image_data.height()}"
                        )
                        return image_data.copy()
            except Exception:
                self._logger.exception("[clipboard] imageData() 异常")

        # Path C2: clipboard.image() as the final fallback (non-macOS only).
        try:
            img = clipboard.image()
            if img and not img.isNull():
                self._logger.info("[clipboard] clipboard.image() 成功")
                return img.copy()
        except Exception:
            self._logger.exception("[clipboard] clipboard.image() 异常")
        return None

    def _extract_via_osascript(self) -> Optional[QImage]:
        """macOS only: dump the clipboard's PNG data to a temp file and load it.

        AppleScript writes the clipboard (as «class PNGf») to a uniquely
        named temp file, which is then read with QImage. This avoids the Qt
        clipboard APIs. The temp file is always deleted before returning.

        Returns:
            A copy of the loaded QImage, or None when the clipboard holds
            no image, osascript fails or times out, or the file cannot be
            decoded.
        """
        tmp_path = None
        try:
            tmp_dir = Path(tempfile.gettempdir()) / "nano_banana_app"
            tmp_dir.mkdir(parents=True, exist_ok=True)
            # Unique name: AppleScript `write` does not truncate, so reusing
            # a file that could not be removed would leave stale bytes.
            tmp_path = tmp_dir / f"_clipboard_tmp_{uuid.uuid4().hex[:8]}.png"
            # Escape backslashes and quotes for the AppleScript string literal.
            as_literal = str(tmp_path).replace("\\", "\\\\").replace('"', '\\"')
            # «class PNGf» is the PNG data type named in the original script.
            script = (
                'set theFile to POSIX file "' + as_literal + '"\n'
                'try\n'
                '    set imgData to the clipboard as «class PNGf»\n'
                '    set fp to open for access theFile with write permission\n'
                '    write imgData to fp\n'
                '    close access fp\n'
                'on error\n'
                '    try\n'
                '        close access theFile\n'
                '    end try\n'
                '    error "no image"\n'
                'end try\n'
            )
            result = subprocess.run(
                ["osascript", "-e", script],
                capture_output=True, timeout=5,
            )
            if result.returncode != 0:
                # Normal when the clipboard does not hold an image.
                stderr = result.stderr.decode("utf-8", errors="replace")[:200]
                self._logger.info(f"[clipboard][osascript] 无图: {stderr}")
                return None
            if not tmp_path.exists() or tmp_path.stat().st_size == 0:
                return None
            img = QImage(str(tmp_path))
            if img.isNull():
                self._logger.warning(f"[clipboard][osascript] 写盘但 QImage 加载失败: {tmp_path}")
                return None
            self._logger.info(
                f"[clipboard][osascript] 成功 {img.width()}x{img.height()} "
                f"({tmp_path.stat().st_size} bytes)"
            )
            return img.copy()
        except subprocess.TimeoutExpired:
            self._logger.warning("[clipboard][osascript] 超时 5s")
            return None
        except FileNotFoundError:
            # osascript not found (should not happen on macOS).
            self._logger.warning("[clipboard][osascript] osascript 命令不存在")
            return None
        except Exception:
            self._logger.exception("[clipboard][osascript] 异常")
            return None
        finally:
            # Do not leave clipboard contents behind in the temp directory.
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except FileNotFoundError:
                    pass
                except OSError:
                    self._logger.warning(
                        f"[clipboard][osascript] temp file cleanup failed: {tmp_path}"
                    )

    @Slot(str, str, result=bool)
    def saveFile(self, src: str, dest: str) -> bool:
        """Copy ``src`` to ``dest`` (the target chosen in the download dialog).

        Returns:
            True on success; False when ``src`` is not an existing regular
            file or the copy fails.
        """
        try:
            src_path = Path(src)
            dest_path = Path(dest)
            # is_file() also rejects directories, which copy2 cannot copy.
            if not src_path.is_file():
                self._logger.warning(f"saveFile 源文件不存在: {src}")
                return False
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(src_path), str(dest_path))
            self._logger.info(f"图片已下载: {dest}")
            return True
        except Exception:
            self._logger.exception(f"saveFile 失败 src={src} dest={dest}")
            return False

    @Slot("QVariantList", result="QVariantList")
    def normalizeFileUrls(self, urls) -> list:
        """Convert QML DropArea/FileDialog URLs into validated local paths.

        Each entry may be a QUrl or anything convertible with ``str``.
        Entries that fail ``_validate_image_file`` (extension, size, pixmap
        load) are dropped. Paths are returned with forward slashes.
        """
        result = []
        for u in urls or []:
            try:
                if isinstance(u, QUrl):
                    local = u.toLocalFile()
                else:
                    # Plain paths yield an empty toLocalFile(); fall back to
                    # the string itself.
                    local = QUrl(str(u)).toLocalFile() or str(u)
                if not local:
                    continue
                if not self._validate_image_file(local):
                    continue
                # Forward slashes keep QML's "file:///" + path concatenation valid.
                result.append(Path(local).as_posix())
            except Exception:
                self._logger.exception(f"normalizeFileUrls 处理 {u} 异常")
                continue
        return result

    @Slot(str, result="QVariantMap")
    def validateImageFile(self, file_path: str) -> dict:
        """Validate one reference image and return ``{ok: bool, reason: str}``.

        Checks: the file exists, its extension is whitelisted
        (png/jpg/jpeg/gif/bmp/webp), 0 < size <= 10MB, and QPixmap can
        load it.
        """
        ok, reason = self._validate_image_file_with_reason(file_path)
        return {"ok": ok, "reason": reason}

    @Slot("QVariantList", result="QVariantList")
    def validateImageFiles(self, paths) -> list:
        """Return the forward-slash paths of the entries that pass validation."""
        result = []
        for p in paths or []:
            candidate = str(p)
            if self._validate_image_file(candidate):
                result.append(Path(candidate).as_posix())
        return result

    def _validate_image_file(self, file_path: str) -> bool:
        """Return only the boolean outcome of ``_validate_image_file_with_reason``."""
        ok, _ = self._validate_image_file_with_reason(file_path)
        return ok

    def _validate_image_file_with_reason(self, file_path: str) -> "tuple[bool, str]":
        """Check extension, size limit and pixmap loadability of an image file.

        Returns:
            ``(True, "")`` when every check passes, otherwise
            ``(False, reason)``. Unexpected exceptions are logged and
            reported as a failure.
        """
        try:
            p = Path(file_path)
            if not p.exists():
                return False, f"文件不存在: {file_path}"
            ext = p.suffix.lower()
            if ext not in _PASTE_VALID_EXT:
                return False, f"不支持的格式: {ext}"
            size = p.stat().st_size
            if size == 0:
                return False, f"文件为空: {file_path}"
            if size > _IMAGE_MAX_BYTES:
                mb = size / 1024 / 1024
                return False, f"图片过大 {mb:.1f}MB(上限 10MB): {file_path}"
            # A null pixmap means Qt could not decode the file.
            pix = QPixmap(str(p))
            if pix.isNull():
                return False, f"图片损坏或格式无效: {file_path}"
            self._logger.info(
                f"图片校验通过: {file_path} ({size} bytes, {pix.width()}x{pix.height()})"
            )
            return True, ""
        except Exception:
            self._logger.exception(f"_validate_image_file 异常: {file_path}")
            return False, f"校验异常: {file_path}"

    # ---- Internal signal forwarding ---------------------------------------
    def _on_task_added(self, task) -> None:
        """Relay a newly queued task's id and refresh the busy state."""
        self.taskSubmitted.emit(task.id)
        self.busyChanged.emit()

    def _on_progress(self, task_id: str, progress: float, status_text: str) -> None:
        """Relay task progress unchanged."""
        self.taskProgress.emit(task_id, progress, status_text)

    def _on_completed(self, task_id: str, image_bytes: bytes, prompt: str,
                      reference_images: list, aspect_ratio: str,
                      image_size: str, model: str) -> None:
        """Store the finished image in history and emit its file path.

        When saving fails, ``taskFailed`` is emitted instead of
        ``taskCompleted``. ``busyChanged`` is emitted in both cases.
        """
        try:
            timestamp = self._history.save_generation(
                image_bytes=image_bytes,
                prompt=prompt,
                reference_images=reference_images,
                aspect_ratio=aspect_ratio,
                image_size=image_size,
                model=model,
            )
            # NOTE(review): str() gives OS-native separators, while the other
            # slots return as_posix() paths; confirm QML handles both forms.
            result_path = str(self._history.base_path / timestamp / "generated.png")
        except Exception as e:
            self._logger.error(f"保存历史失败 {task_id[:8]}: {e}", exc_info=True)
            self.taskFailed.emit(task_id, f"图片生成成功但保存历史失败: {e}")
            self.busyChanged.emit()
            return
        self.taskCompleted.emit(task_id, result_path, prompt, model)
        self.busyChanged.emit()

    def _on_failed(self, task_id: str, error: str) -> None:
        """Relay a task failure and refresh the busy state."""
        self.taskFailed.emit(task_id, error)
        self.busyChanged.emit()