f29b806b by shady

Merge remote-tracking branch 'origin/master'

2 parents f4ceb240 68b19c6f
......@@ -14,7 +14,7 @@
"database": "saas_user",
"table": "nano_banana_users"
},
"last_user": "testuser",
"last_user": "chaijin",
"saved_password_hash": "50630320e4a550f2dba371820dad9d9301d456d101aca4d5ad8f4f3bcc9c1ed9",
"logging_config": {
"enabled": true,
......
"""
任务队列系统
提供异步图像生成任务的队列管理和 UI 组件
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict
from queue import Queue
from threading import Lock
import uuid
import logging
import io
from PySide6.QtCore import QObject, Signal, QTimer, Qt
from PySide6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QListWidget, QListWidgetItem, QDialog, QScrollArea, QFrame
)
from PySide6.QtGui import QPixmap, QMouseEvent
from PIL import Image
class TaskType(Enum):
"""任务类型"""
IMAGE_GENERATION = "image_gen"
STYLE_DESIGN = "style_design"
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务数据模型"""
# 标识
id: str
type: TaskType
status: TaskStatus
# 输入参数
prompt: str
api_key: str
reference_images: List[str]
aspect_ratio: str
image_size: str
model: str
# 时间戳
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# 结果
result_bytes: Optional[bytes] = None
error_message: Optional[str] = None
# UI 相关
thumbnail: Optional[bytes] = None
progress: float = 0.0
class TaskQueueManager(QObject):
"""
单例任务队列管理器
管理所有图像生成任务的生命周期
"""
# Signals
task_added = Signal(Task)
task_started = Signal(str) # task_id
task_completed = Signal(str, bytes, str, list, str, str, str) # task_id, image_bytes, prompt, ref_images, aspect_ratio, image_size, model
task_failed = Signal(str, str) # task_id, error_message
task_progress = Signal(str, float, str) # task_id, progress, status_text
_instance = None
_lock = Lock()
def __new__(cls):
"""单例模式"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
super().__init__()
self.logger = logging.getLogger(__name__)
self._tasks: Dict[str, Task] = {}
self._queue = Queue()
self._current_worker = None
self._max_queue_size = 10
self._max_history_size = 10 # 只保留最近10条完成任务
self._initialized = True
self.logger.info("TaskQueueManager 初始化完成")
def submit_task(
self,
task_type: TaskType,
prompt: str,
api_key: str,
reference_images: List[str],
aspect_ratio: str,
image_size: str,
model: str
) -> str:
"""
提交新任务到队列
Args:
task_type: 任务类型
prompt: 图片描述
api_key: API 密钥
reference_images: 参考图片路径列表
aspect_ratio: 宽高比
image_size: 图片尺寸
model: 模型名称
Returns:
task_id: 任务唯一标识
Raises:
RuntimeError: 队列已满
"""
# 检查队列容量
if self._queue.qsize() >= self._max_queue_size:
raise RuntimeError(f"任务队列已满 (最大 {self._max_queue_size} 个)")
# 创建任务
task = Task(
id=str(uuid.uuid4()),
type=task_type,
status=TaskStatus.PENDING,
prompt=prompt,
api_key=api_key,
reference_images=reference_images.copy() if reference_images else [],
aspect_ratio=aspect_ratio,
image_size=image_size,
model=model,
created_at=datetime.now()
)
self._tasks[task.id] = task
self._queue.put(task.id)
self.logger.info(f"任务已提交: {task.id[:8]} - {prompt[:30]}")
self.task_added.emit(task)
# 如果没有正在运行的任务,启动处理
if self._current_worker is None or not self._current_worker.isRunning():
self._process_next()
return task.id
def _process_next(self):
"""处理队列中的下一个任务"""
if self._queue.empty():
self.logger.debug("队列为空,无任务处理")
return
task_id = self._queue.get()
task = self._tasks[task_id]
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
self.logger.info(f"开始处理任务: {task_id[:8]}")
# 导入 ImageGenerationWorker
from image_generator import ImageGenerationWorker
# 创建 worker
self._current_worker = ImageGenerationWorker(
task.api_key,
task.prompt,
task.reference_images,
task.aspect_ratio,
task.image_size,
task.model
)
# 绑定信号
self._current_worker.finished.connect(
lambda img_bytes, prompt, ref_imgs, ar, size, model:
self._on_task_completed(task_id, img_bytes, prompt, ref_imgs, ar, size, model)
)
self._current_worker.error.connect(
lambda error: self._on_task_failed(task_id, error)
)
self._current_worker.progress.connect(
lambda status: self.task_progress.emit(task_id, 0.5, status)
)
self.task_started.emit(task_id)
self._current_worker.start()
def _on_task_completed(self, task_id: str, image_bytes: bytes, prompt: str,
reference_images: list, aspect_ratio: str, image_size: str, model: str):
"""任务完成回调"""
task = self._tasks.get(task_id)
if not task:
self.logger.error(f"任务 {task_id[:8]} 不存在")
return
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.result_bytes = image_bytes
# 生成缩略图
try:
task.thumbnail = self._create_thumbnail(image_bytes)
except Exception as e:
self.logger.warning(f"生成缩略图失败: {e}")
elapsed = (task.completed_at - task.started_at).total_seconds()
self.logger.info(f"任务完成: {task_id[:8]} - 耗时 {elapsed:.1f}s")
self.task_completed.emit(task_id, image_bytes, prompt, reference_images,
aspect_ratio, image_size, model)
# 清理旧任务历史,只保留最近的完成任务
self._cleanup_old_tasks()
# 处理下一个任务
self._process_next()
def _on_task_failed(self, task_id: str, error: str):
"""任务失败回调"""
task = self._tasks.get(task_id)
if not task:
self.logger.error(f"任务 {task_id[:8]} 不存在")
return
task.status = TaskStatus.FAILED
task.completed_at = datetime.now()
task.error_message = error
self.logger.error(f"任务失败: {task_id[:8]} - {error}")
self.task_failed.emit(task_id, error)
# 清理旧任务历史
self._cleanup_old_tasks()
# 处理下一个任务
self._process_next()
def _cleanup_old_tasks(self):
"""清理旧任务,只保留最近的完成/失败任务"""
# 获取所有已完成和失败的任务,按完成时间排序
finished_tasks = [
t for t in self._tasks.values()
if t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] and t.completed_at
]
finished_tasks.sort(key=lambda t: t.completed_at, reverse=True)
# 只保留最近的 N 条
if len(finished_tasks) > self._max_history_size:
tasks_to_remove = finished_tasks[self._max_history_size:]
for task in tasks_to_remove:
del self._tasks[task.id]
self.logger.debug(f"清理旧任务: {task.id[:8]}")
def _create_thumbnail(self, image_bytes: bytes) -> bytes:
"""
创建缩略图 (50x50)
Args:
image_bytes: 原始图片字节
Returns:
缩略图字节
"""
img = Image.open(io.BytesIO(image_bytes))
img.thumbnail((50, 50))
thumb_io = io.BytesIO()
img.save(thumb_io, format='PNG')
return thumb_io.getvalue()
def get_task(self, task_id: str) -> Optional[Task]:
"""获取任务详情"""
return self._tasks.get(task_id)
def get_all_tasks(self) -> List[Task]:
"""获取所有任务"""
return list(self._tasks.values())
def get_pending_count(self) -> int:
"""获取等待中任务数"""
return sum(1 for t in self._tasks.values() if t.status == TaskStatus.PENDING)
def get_running_count(self) -> int:
"""获取运行中任务数"""
return sum(1 for t in self._tasks.values() if t.status == TaskStatus.RUNNING)
def cancel_task(self, task_id: str):
"""取消任务 (仅等待中任务)"""
task = self._tasks.get(task_id)
if task and task.status == TaskStatus.PENDING:
task.status = TaskStatus.CANCELLED
self.logger.info(f"任务已取消: {task_id[:8]}")
class TaskQueueWidget(QWidget):
"""
右侧极窄任务列表
按设计文档显示任务状态文字列表
"""
def __init__(self, manager: TaskQueueManager, parent=None):
super().__init__(parent)
self.logger = logging.getLogger(__name__)
self.manager = manager
self.parent_window = parent # 用于数据回填
self._setup_ui()
self._connect_signals()
self._update_summary()
def _setup_ui(self):
"""构建右侧任务列表 UI"""
layout = QVBoxLayout()
layout.setContentsMargins(8, 8, 8, 8)
layout.setSpacing(2)
# 标题
title = QLabel("任务队列")
title.setStyleSheet("QLabel { font-weight: bold; font-size: 10px; color: #666; }")
title.setAlignment(Qt.AlignCenter)
layout.addWidget(title)
# 分隔线
line = QLabel()
line.setFrameStyle(QFrame.HLine | QFrame.Sunken)
layout.addWidget(line)
# 任务状态列表 - 可点击的列表项
self.task_list = QListWidget()
self.task_list.setStyleSheet("""
QListWidget {
border: none;
font-size: 11px;
padding: 2px;
}
QListWidget::item {
padding: 4px 2px;
border-bottom: 1px solid #eee;
min-height: 20px;
}
QListWidget::item:hover {
background-color: #e3f2fd;
cursor: pointer;
}
QListWidget::item:selected {
background-color: #bbdefb;
}
""")
self.task_list.itemClicked.connect(self._on_task_item_clicked)
layout.addWidget(self.task_list)
layout.addStretch()
self.setLayout(layout)
# 设置极窄宽度
self.setMaximumWidth(120)
self.setMinimumWidth(80)
# 样式
self.setStyleSheet("""
TaskQueueWidget {
background-color: #f5f5f5;
border-left: 1px solid #ddd;
}
""")
def _connect_signals(self):
"""绑定信号"""
self.manager.task_added.connect(self._on_task_added)
self.manager.task_started.connect(self._on_task_started)
self.manager.task_completed.connect(self._on_task_completed)
self.manager.task_failed.connect(self._on_task_failed)
self.manager.task_progress.connect(self._on_task_progress)
def _update_summary(self):
"""更新任务列表 - 显示可点击的状态项"""
self.task_list.clear()
tasks = self.manager.get_all_tasks()
# 按状态分类并转换为文字
for task in tasks:
# 状态文字和颜色
if task.status == TaskStatus.RUNNING:
status_text = "执行中"
color = "#FF9500" # 橙色
elif task.status == TaskStatus.PENDING:
status_text = "等待中"
color = "#007AFF" # 蓝色
elif task.status == TaskStatus.COMPLETED:
status_text = "已完成"
color = "#34C759" # 绿色
elif task.status == TaskStatus.FAILED:
status_text = "失败"
color = "#FF3B30" # 红色
else:
status_text = "未知"
color = "#666666" # 灰色
# 创建列表项
item = QListWidgetItem(status_text)
item.setData(Qt.UserRole, task.id) # 存储任务ID
# 设置颜色
if hasattr(item, 'setForeground'):
from PySide6.QtGui import QBrush, QColor
item.setForeground(QBrush(QColor(color)))
# 添加到列表
self.task_list.addItem(item)
# 最多显示最近10个任务
if self.task_list.count() > 10:
for i in range(self.task_list.count() - 10):
self.task_list.takeItem(0)
def _on_task_added(self, task: Task):
"""任务添加回调"""
self._update_summary()
def _on_task_item_clicked(self, item: QListWidgetItem):
"""点击任务项 - 回填数据到主窗口"""
task_id = item.data(Qt.UserRole)
if not task_id:
return
task = self.manager.get_task(task_id)
if not task or not self.parent_window:
return
# 切换到对应的标签页
if task.type == TaskType.STYLE_DESIGN:
self.parent_window.tab_widget.setCurrentIndex(1) # 款式设计标签
style_tab = self.parent_window.tab_widget.currentWidget()
else:
self.parent_window.tab_widget.setCurrentIndex(0) # 图片生成标签
gen_tab = self.parent_window.tab_widget.currentWidget()
# 如果是已完成任务,直接在主窗口显示结果
if task.status == TaskStatus.COMPLETED and task.result_bytes:
if task.type == TaskType.STYLE_DESIGN and hasattr(style_tab, '_display_generated_image_from_bytes'):
# 款式设计:将图片数据存储到样式标签并显示
style_tab.generated_image_bytes = task.result_bytes
style_tab._display_generated_image_from_bytes()
elif hasattr(gen_tab, '_display_generated_image_from_bytes'):
# 图片生成:将图片数据存储到主窗口并显示
self.parent_window.generated_image_bytes = task.result_bytes
gen_tab._display_generated_image_from_bytes()
# 回填参数到主窗口
self._load_task_to_main_window(task)
def _load_task_to_main_window(self, task: Task):
"""将任务数据回填到主窗口"""
try:
if task.type == TaskType.STYLE_DESIGN:
# 款式设计标签页 - 回填prompt到预览框
self.parent_window.tab_widget.setCurrentIndex(1) # 款式设计标签
style_tab = self.parent_window.tab_widget.currentWidget()
# 回填prompt到预览框
if hasattr(style_tab, 'prompt_preview'):
style_tab.prompt_preview.setPlainText(task.prompt)
# 回填设置
if hasattr(style_tab, 'aspect_ratio') and task.aspect_ratio:
index = style_tab.aspect_ratio.findText(task.aspect_ratio)
if index >= 0:
style_tab.aspect_ratio.setCurrentIndex(index)
if hasattr(style_tab, 'image_size') and task.image_size:
index = style_tab.image_size.findText(task.image_size)
if index >= 0:
style_tab.image_size.setCurrentIndex(index)
else:
# 图片生成标签页
self.parent_window.tab_widget.setCurrentIndex(0) # 图片生成标签
gen_tab = self.parent_window.tab_widget.currentWidget()
# 回填prompt
if hasattr(gen_tab, 'prompt_text'):
gen_tab.prompt_text.setPlainText(task.prompt)
# 回填参考图片
if task.reference_images and hasattr(gen_tab, 'add_reference_image'):
for ref_path in task.reference_images:
gen_tab.add_reference_image(ref_path)
# 回填设置
if hasattr(gen_tab, 'aspect_ratio') and task.aspect_ratio:
index = gen_tab.aspect_ratio.findText(task.aspect_ratio)
if index >= 0:
gen_tab.aspect_ratio.setCurrentIndex(index)
if hasattr(gen_tab, 'image_size') and task.image_size:
index = gen_tab.image_size.findText(task.image_size)
if index >= 0:
gen_tab.image_size.setCurrentIndex(index)
except Exception as e:
self.logger.error(f"回填数据到主窗口失败: {e}")
def _on_task_started(self, task_id: str):
"""任务开始回调"""
self._update_summary()
def _on_task_completed(self, task_id: str, *args):
"""任务完成回调"""
self._update_summary()
def _on_task_failed(self, task_id: str, error: str):
"""任务失败回调"""
self._update_summary()
def _show_task_detail_dialog(self):
"""显示任务详情弹窗"""
if self.detail_dialog and self.detail_dialog.isVisible():
self.detail_dialog.raise_()
return
dialog = QDialog(self)
dialog.setWindowTitle("任务详情")
dialog.resize(400, 500)
layout = QVBoxLayout()
# 标题
title = QLabel("📋 任务队列")
title.setStyleSheet("QLabel { font-weight: bold; font-size: 14px; }")
layout.addWidget(title)
# 任务列表
task_list = QListWidget()
task_list.itemClicked.connect(lambda item: self._on_detail_task_clicked(item, dialog))
# 添加任务项
for task in self.manager.get_all_tasks():
item = QListWidgetItem()
item.setData(Qt.UserRole, task.id)
status_map = {
TaskStatus.RUNNING: ("●", "#FF9500"),
TaskStatus.PENDING: ("○", "#8E8E93"),
TaskStatus.COMPLETED: ("✓", "#34C759"),
TaskStatus.FAILED: ("✗", "#FF3B30"),
}
icon, color = status_map.get(task.status, ("?", "#000"))
prompt_preview = task.prompt[:30] + "..." if len(task.prompt) > 30 else task.prompt
display_text = f"{icon} {prompt_preview}"
item.setText(display_text)
if hasattr(item, 'setForeground'):
from PySide6.QtGui import QBrush, QColor
item.setForeground(QBrush(QColor(color)))
task_list.addItem(item)
layout.addWidget(task_list)
# 关闭按钮
close_btn = QPushButton("关闭")
close_btn.clicked.connect(dialog.close)
layout.addWidget(close_btn)
dialog.setLayout(layout)
self.detail_dialog = dialog
dialog.exec()
def _on_detail_task_clicked(self, item: QListWidgetItem, dialog: QDialog):
"""详情弹窗中点击任务项"""
dialog.close()
self._on_task_item_clicked(item)
def _on_task_progress(self, task_id: str, progress: float, status_text: str):
"""任务进度回调"""
# 侧边栏模式下进度信息通过状态标签显示
# 不需要额外更新,由_update_summary统一处理
pass
def _on_task_item_clicked(self, item: QListWidgetItem):
"""单击任务 - 回填数据到主窗口或显示结果"""
task_id = item.data(Qt.UserRole)
task = self.manager.get_task(task_id)
if not task or not self.parent_window:
return
# 切换到对应的标签页
if task.type == TaskType.STYLE_DESIGN:
self.parent_window.tab_widget.setCurrentIndex(1) # 款式设计标签
style_tab = self.parent_window.tab_widget.currentWidget()
else:
self.parent_window.tab_widget.setCurrentIndex(0) # 图片生成标签
gen_tab = self.parent_window.tab_widget.currentWidget()
# 如果是已完成任务,直接在主窗口显示结果
if task.status == TaskStatus.COMPLETED and task.result_bytes:
self.parent_window.generated_image_bytes = task.result_bytes
# 显示生成的图片
if hasattr(self.parent_window, 'display_generated_image'):
self.parent_window.display_generated_image()
elif task.type == TaskType.STYLE_DESIGN and hasattr(style_tab, '_display_generated_image_from_bytes'):
style_tab._display_generated_image_from_bytes()
elif hasattr(gen_tab, '_display_generated_image_from_bytes'):
gen_tab._display_generated_image_from_bytes()
# 回填参数
self._load_task_to_main_window(task)
def _load_task_to_main_window(self, task: Task):
"""将任务数据回填到主窗口"""
try:
if task.type == TaskType.STYLE_DESIGN:
# 切换到款式设计标签
self.parent_window.tab_widget.setCurrentIndex(1)
style_tab = self.parent_window.tab_widget.currentWidget()
if hasattr(style_tab, 'library_manager'):
# 款式设计不需要回填prompt,因为是参数组合
pass
else:
# 切换到图片生成标签
self.parent_window.tab_widget.setCurrentIndex(0)
gen_tab = self.parent_window.tab_widget.currentWidget()
if hasattr(gen_tab, 'prompt_input'):
# 回填prompt
gen_tab.prompt_input.setPlainText(task.prompt)
# 回填参考图片
if task.reference_images:
for ref_path in task.reference_images:
if hasattr(gen_tab, 'add_reference_image'):
gen_tab.add_reference_image(ref_path)
# 回填设置
if hasattr(gen_tab, 'aspect_ratio') and task.aspect_ratio:
index = gen_tab.aspect_ratio.findText(task.aspect_ratio)
if index >= 0:
gen_tab.aspect_ratio.setCurrentIndex(index)
if hasattr(gen_tab, 'image_size') and task.image_size:
index = gen_tab.image_size.findText(task.image_size)
if index >= 0:
gen_tab.image_size.setCurrentIndex(index)
except Exception as e:
self.logger.error(f"回填数据到主窗口失败: {e}")
"""
任务队列系统
提供异步图像生成任务的队列管理和 UI 组件
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict
from queue import Queue
from threading import Lock
import uuid
import logging
import io
from PySide6.QtCore import QObject, Signal, QTimer, Qt
from PySide6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QListWidget, QListWidgetItem, QDialog, QScrollArea, QFrame
)
from PySide6.QtGui import QPixmap, QMouseEvent
from PIL import Image
class TaskType(Enum):
"""任务类型"""
IMAGE_GENERATION = "image_gen"
STYLE_DESIGN = "style_design"
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务数据模型"""
# 标识
id: str
type: TaskType
status: TaskStatus
# 输入参数
prompt: str
api_key: str
reference_images: List[str]
aspect_ratio: str
image_size: str
model: str
# 时间戳
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# 结果
result_bytes: Optional[bytes] = None
error_message: Optional[str] = None
# UI 相关
thumbnail: Optional[bytes] = None
progress: float = 0.0
class TaskQueueManager(QObject):
"""
单例任务队列管理器
管理所有图像生成任务的生命周期
"""
# Signals
task_added = Signal(Task)
task_started = Signal(str) # task_id
task_completed = Signal(str, bytes, str, list, str, str, str) # task_id, image_bytes, prompt, ref_images, aspect_ratio, image_size, model
task_failed = Signal(str, str) # task_id, error_message
task_progress = Signal(str, float, str) # task_id, progress, status_text
_instance = None
_lock = Lock()
def __new__(cls):
"""单例模式"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
super().__init__()
self.logger = logging.getLogger(__name__)
self._tasks: Dict[str, Task] = {}
self._queue = Queue()
self._current_worker = None
self._max_queue_size = 10
self._max_history_size = 10 # 只保留最近10条完成任务
self._initialized = True
self.logger.info("TaskQueueManager 初始化完成")
def submit_task(
self,
task_type: TaskType,
prompt: str,
api_key: str,
reference_images: List[str],
aspect_ratio: str,
image_size: str,
model: str
) -> str:
"""
提交新任务到队列
Args:
task_type: 任务类型
prompt: 图片描述
api_key: API 密钥
reference_images: 参考图片路径列表
aspect_ratio: 宽高比
image_size: 图片尺寸
model: 模型名称
Returns:
task_id: 任务唯一标识
Raises:
RuntimeError: 队列已满
"""
# 检查队列容量
if self._queue.qsize() >= self._max_queue_size:
raise RuntimeError(f"任务队列已满 (最大 {self._max_queue_size} 个)")
# 创建任务
task = Task(
id=str(uuid.uuid4()),
type=task_type,
status=TaskStatus.PENDING,
prompt=prompt,
api_key=api_key,
reference_images=reference_images.copy() if reference_images else [],
aspect_ratio=aspect_ratio,
image_size=image_size,
model=model,
created_at=datetime.now()
)
self._tasks[task.id] = task
self._queue.put(task.id)
self.logger.info(f"任务已提交: {task.id[:8]} - {prompt[:30]}")
self.task_added.emit(task)
# 如果没有正在运行的任务,启动处理
if self._current_worker is None or not self._current_worker.isRunning():
self._process_next()
return task.id
def _process_next(self):
"""处理队列中的下一个任务"""
if self._queue.empty():
self.logger.debug("队列为空,无任务处理")
return
task_id = self._queue.get()
task = self._tasks[task_id]
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
self.logger.info(f"开始处理任务: {task_id[:8]}")
# 导入 ImageGenerationWorker
from image_generator import ImageGenerationWorker
# 创建 worker
self._current_worker = ImageGenerationWorker(
task.api_key,
task.prompt,
task.reference_images,
task.aspect_ratio,
task.image_size,
task.model
)
# 绑定信号
self._current_worker.finished.connect(
lambda img_bytes, prompt, ref_imgs, ar, size, model:
self._on_task_completed(task_id, img_bytes, prompt, ref_imgs, ar, size, model)
)
self._current_worker.error.connect(
lambda error: self._on_task_failed(task_id, error)
)
self._current_worker.progress.connect(
lambda status: self.task_progress.emit(task_id, 0.5, status)
)
self.task_started.emit(task_id)
self._current_worker.start()
def _on_task_completed(self, task_id: str, image_bytes: bytes, prompt: str,
reference_images: list, aspect_ratio: str, image_size: str, model: str):
"""任务完成回调"""
task = self._tasks.get(task_id)
if not task:
self.logger.error(f"任务 {task_id[:8]} 不存在")
return
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.result_bytes = image_bytes
# 生成缩略图
try:
task.thumbnail = self._create_thumbnail(image_bytes)
except Exception as e:
self.logger.warning(f"生成缩略图失败: {e}")
elapsed = (task.completed_at - task.started_at).total_seconds()
self.logger.info(f"任务完成: {task_id[:8]} - 耗时 {elapsed:.1f}s")
self.task_completed.emit(task_id, image_bytes, prompt, reference_images,
aspect_ratio, image_size, model)
# 清理旧任务历史,只保留最近的完成任务
self._cleanup_old_tasks()
# 处理下一个任务
self._process_next()
def _on_task_failed(self, task_id: str, error: str):
"""任务失败回调"""
task = self._tasks.get(task_id)
if not task:
self.logger.error(f"任务 {task_id[:8]} 不存在")
return
task.status = TaskStatus.FAILED
task.completed_at = datetime.now()
task.error_message = error
self.logger.error(f"任务失败: {task_id[:8]} - {error}")
self.task_failed.emit(task_id, error)
# 清理旧任务历史
self._cleanup_old_tasks()
# 处理下一个任务
self._process_next()
def _cleanup_old_tasks(self):
"""清理旧任务,只保留最近的完成/失败任务"""
# 获取所有已完成和失败的任务,按完成时间排序
finished_tasks = [
t for t in self._tasks.values()
if t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] and t.completed_at
]
finished_tasks.sort(key=lambda t: t.completed_at, reverse=True)
# 只保留最近的 N 条
if len(finished_tasks) > self._max_history_size:
tasks_to_remove = finished_tasks[self._max_history_size:]
for task in tasks_to_remove:
del self._tasks[task.id]
self.logger.debug(f"清理旧任务: {task.id[:8]}")
def _create_thumbnail(self, image_bytes: bytes) -> bytes:
"""
创建缩略图 (50x50)
Args:
image_bytes: 原始图片字节
Returns:
缩略图字节
"""
img = Image.open(io.BytesIO(image_bytes))
img.thumbnail((50, 50))
thumb_io = io.BytesIO()
img.save(thumb_io, format='PNG')
return thumb_io.getvalue()
def get_task(self, task_id: str) -> Optional[Task]:
"""获取任务详情"""
return self._tasks.get(task_id)
def get_all_tasks(self) -> List[Task]:
"""获取所有任务"""
return list(self._tasks.values())
def get_pending_count(self) -> int:
"""获取等待中任务数"""
return sum(1 for t in self._tasks.values() if t.status == TaskStatus.PENDING)
def get_running_count(self) -> int:
"""获取运行中任务数"""
return sum(1 for t in self._tasks.values() if t.status == TaskStatus.RUNNING)
def cancel_task(self, task_id: str):
"""取消任务 (仅等待中任务)"""
task = self._tasks.get(task_id)
if task and task.status == TaskStatus.PENDING:
task.status = TaskStatus.CANCELLED
self.logger.info(f"任务已取消: {task_id[:8]}")