design.md
21 KB
任务队列系统技术设计 (方案A: 底部状态栏)
🧠 核心判断
值得做
理由:用户痛点真实存在。图像生成是 I/O 密集型任务,串行等待浪费时间。任务队列是成熟解决方案,复杂度可控。
当前问题分析
数据结构
- ImageGenerationWorker (QThread): 每次生成创建新线程
-
阻塞点:
generate_image_async()禁用按钮直到任务完成 - 无队列: 无法提交多个任务排队
特殊情况
- 图片生成和款式设计使用相同的
ImageGenerationWorker - 两个 tab 独立触发生成,无共享状态管理
复杂度问题
- 当前架构简单但不可扩展
- 缺少全局任务状态管理
️ 技术方案 (方案A)
架构概览
┌─────────────────────────────────────────┐
│ MainWindow │
│ ┌────────────────────────────────┐ │
│ │ Tab: 图片生成 | 款式设计 │ │
│ └────────────────────────────────┘ │
│ ┌────────────────────────────────┐ │
│ │ TaskQueueWidget (可折叠) │ ← 新增
│ │ ● 任务队列: [生成x1] [设计x2] │ │
│ │ 正在处理: 生成银戒图片... │ │
│ │ [展开▼] │ │
│ └────────────────────────────────┘ │
└─────────────────────────────────────────┘
数据流:
Tab (生成/设计)
↓ submit_task()
TaskQueueManager (单例)
↓ enqueue()
Queue<Task> → Worker Pool
↓ on_complete
TaskQueueWidget (UI 更新)
核心组件设计
1. 数据结构
Task (任务模型)
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
from enum import Enum
class TaskType(Enum):
IMAGE_GENERATION = "image_gen"
STYLE_DESIGN = "style_design"
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务数据结构"""
# 标识
id: str # UUID
type: TaskType
status: TaskStatus
# 输入参数
prompt: str
api_key: str
reference_images: List[str] # 图片路径列表
aspect_ratio: str
image_size: str
model: str
# 时间戳
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# 结果
result_bytes: Optional[bytes] = None # 生成的图片数据
error_message: Optional[str] = None
# UI 相关
thumbnail: Optional[bytes] = None # 缩略图 (50x50)
progress: float = 0.0 # 0.0 ~ 1.0
品味评分: 🟢 好品味
- 消除特殊情况:统一图片生成和款式设计的任务模型
- 清晰的所有权:Task 拥有所有输入和输出数据
- 不可变 ID:UUID 保证唯一性
2. TaskQueueManager (任务队列管理器)
from PySide6.QtCore import QObject, Signal
from queue import Queue
from threading import Lock
import uuid
class TaskQueueManager(QObject):
"""
单例任务队列管理器
管理所有图像生成任务的生命周期
"""
# Signals
task_added = Signal(Task)
task_started = Signal(str) # task_id
task_completed = Signal(str, bytes) # task_id, image_bytes
task_failed = Signal(str, str) # task_id, error_message
task_progress = Signal(str, float, str) # task_id, progress, status_text
_instance = None
_lock = Lock()
def __new__(cls):
"""单例模式"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
super().__init__()
self._tasks = {} # task_id -> Task
self._queue = Queue() # 待处理任务队列
self._current_worker = None
self._max_queue_size = 10
self._initialized = True
def submit_task(
self,
task_type: TaskType,
prompt: str,
api_key: str,
reference_images: List[str],
aspect_ratio: str,
image_size: str,
model: str
) -> str:
"""
提交新任务到队列
Returns:
task_id: 任务唯一标识
"""
# 检查队列容量
if self._queue.qsize() >= self._max_queue_size:
raise RuntimeError(f"任务队列已满 (最大 {self._max_queue_size} 个)")
# 创建任务
task = Task(
id=str(uuid.uuid4()),
type=task_type,
status=TaskStatus.PENDING,
prompt=prompt,
api_key=api_key,
reference_images=reference_images,
aspect_ratio=aspect_ratio,
image_size=image_size,
model=model,
created_at=datetime.now()
)
self._tasks[task.id] = task
self._queue.put(task.id)
self.task_added.emit(task)
# 如果没有正在运行的任务,启动处理
if self._current_worker is None or not self._current_worker.isRunning():
self._process_next()
return task.id
def _process_next(self):
"""处理队列中的下一个任务"""
if self._queue.empty():
return
task_id = self._queue.get()
task = self._tasks[task_id]
task.status = TaskStatus.RUNNING
task.started_at = datetime.now()
# 创建 worker
self._current_worker = ImageGenerationWorker(
task.api_key,
task.prompt,
task.reference_images,
task.aspect_ratio,
task.image_size,
task.model
)
# 绑定信号
self._current_worker.finished.connect(
lambda img_bytes, *args: self._on_task_completed(task_id, img_bytes)
)
self._current_worker.error.connect(
lambda error: self._on_task_failed(task_id, error)
)
self._current_worker.progress.connect(
lambda status: self.task_progress.emit(task_id, 0.5, status)
)
self.task_started.emit(task_id)
self._current_worker.start()
def _on_task_completed(self, task_id: str, image_bytes: bytes):
"""任务完成回调"""
task = self._tasks[task_id]
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.result_bytes = image_bytes
# 生成缩略图
task.thumbnail = self._create_thumbnail(image_bytes)
self.task_completed.emit(task_id, image_bytes)
# 处理下一个任务
self._process_next()
def _on_task_failed(self, task_id: str, error: str):
"""任务失败回调"""
task = self._tasks[task_id]
task.status = TaskStatus.FAILED
task.completed_at = datetime.now()
task.error_message = error
self.task_failed.emit(task_id, error)
# 处理下一个任务
self._process_next()
def _create_thumbnail(self, image_bytes: bytes) -> bytes:
"""创建缩略图 (50x50)"""
from PIL import Image
import io
img = Image.open(io.BytesIO(image_bytes))
img.thumbnail((50, 50))
thumb_io = io.BytesIO()
img.save(thumb_io, format='PNG')
return thumb_io.getvalue()
def get_task(self, task_id: str) -> Optional[Task]:
"""获取任务详情"""
return self._tasks.get(task_id)
def get_all_tasks(self) -> List[Task]:
"""获取所有任务"""
return list(self._tasks.values())
def get_pending_count(self) -> int:
"""获取等待中任务数"""
return sum(1 for t in self._tasks.values() if t.status == TaskStatus.PENDING)
def get_running_count(self) -> int:
"""获取运行中任务数"""
return sum(1 for t in self._tasks.values() if t.status == TaskStatus.RUNNING)
def cancel_task(self, task_id: str):
"""取消任务 (仅等待中任务)"""
task = self._tasks.get(task_id)
if task and task.status == TaskStatus.PENDING:
task.status = TaskStatus.CANCELLED
# TODO: 从队列中移除
品味评分: 🟢 好品味
- 单例模式: 全局唯一队列管理器
- 信号驱动: 松耦合,UI 通过信号更新
- 清晰职责: 只管理任务生命周期,不关心 UI
3. TaskQueueWidget (UI 组件)
class TaskQueueWidget(QWidget):
"""
底部任务队列状态栏
默认折叠,显示摘要信息
"""
def __init__(self, manager: TaskQueueManager, parent=None):
super().__init__(parent)
self.manager = manager
self.expanded = False
self._setup_ui()
self._connect_signals()
def _setup_ui(self):
"""构建 UI"""
layout = QVBoxLayout()
layout.setContentsMargins(10, 5, 10, 5)
layout.setSpacing(5)
# === 折叠状态 (默认) ===
self.summary_widget = QWidget()
summary_layout = QHBoxLayout()
summary_layout.setContentsMargins(0, 0, 0, 0)
# 状态标签
self.status_label = QLabel("● 就绪")
self.status_label.setStyleSheet("QLabel { color: #34C759; }")
# 队列摘要
self.queue_summary = QLabel("任务队列: 无任务")
# 展开/折叠按钮
self.toggle_btn = QPushButton("展开 ▼")
self.toggle_btn.setMaximumWidth(80)
self.toggle_btn.clicked.connect(self.toggle_expanded)
summary_layout.addWidget(self.status_label)
summary_layout.addWidget(self.queue_summary)
summary_layout.addStretch()
summary_layout.addWidget(self.toggle_btn)
self.summary_widget.setLayout(summary_layout)
layout.addWidget(self.summary_widget)
# === 展开状态 (详细列表) ===
self.detail_widget = QWidget()
self.detail_widget.setVisible(False)
detail_layout = QVBoxLayout()
# 任务列表
self.task_list = QListWidget()
self.task_list.setMaximumHeight(200)
detail_layout.addWidget(QLabel("任务详情:"))
detail_layout.addWidget(self.task_list)
self.detail_widget.setLayout(detail_layout)
layout.addWidget(self.detail_widget)
self.setLayout(layout)
# 样式
self.setStyleSheet("""
TaskQueueWidget {
background-color: #f5f5f5;
border-top: 1px solid #ddd;
}
""")
def _connect_signals(self):
"""绑定信号"""
self.manager.task_added.connect(self._on_task_added)
self.manager.task_started.connect(self._on_task_started)
self.manager.task_completed.connect(self._on_task_completed)
self.manager.task_failed.connect(self._on_task_failed)
self.manager.task_progress.connect(self._on_task_progress)
def toggle_expanded(self):
"""切换展开/折叠"""
self.expanded = not self.expanded
self.detail_widget.setVisible(self.expanded)
self.toggle_btn.setText("折叠 ▲" if self.expanded else "展开 ▼")
def _update_summary(self):
"""更新摘要信息"""
pending = self.manager.get_pending_count()
running = self.manager.get_running_count()
if running > 0:
self.status_label.setText("● 运行中")
self.status_label.setStyleSheet("QLabel { color: #FF9500; }")
current_task = next(
(t for t in self.manager.get_all_tasks() if t.status == TaskStatus.RUNNING),
None
)
if current_task:
prompt_preview = current_task.prompt[:20] + "..." if len(current_task.prompt) > 20 else current_task.prompt
self.queue_summary.setText(f"正在处理: {prompt_preview} | 等待: {pending}")
elif pending > 0:
self.status_label.setText("● 等待中")
self.status_label.setStyleSheet("QLabel { color: #007AFF; }")
self.queue_summary.setText(f"任务队列: {pending} 个等待")
else:
self.status_label.setText("● 就绪")
self.status_label.setStyleSheet("QLabel { color: #34C759; }")
self.queue_summary.setText("任务队列: 无任务")
def _on_task_added(self, task: Task):
"""任务添加回调"""
self._update_summary()
if self.expanded:
self._add_task_item(task)
def _on_task_started(self, task_id: str):
"""任务开始回调"""
self._update_summary()
def _on_task_completed(self, task_id: str, image_bytes: bytes):
"""任务完成回调"""
self._update_summary()
# 可选: 显示通知
task = self.manager.get_task(task_id)
if task:
self.status_label.setText(f"✓ 完成: {task.prompt[:15]}...")
QTimer.singleShot(3000, self._update_summary) # 3秒后恢复
def _on_task_failed(self, task_id: str, error: str):
"""任务失败回调"""
self._update_summary()
self.status_label.setText(f"✗ 失败: {error[:20]}...")
self.status_label.setStyleSheet("QLabel { color: #FF3B30; }")
QTimer.singleShot(3000, self._update_summary)
def _on_task_progress(self, task_id: str, progress: float, status_text: str):
"""任务进度回调"""
# 更新状态文本
task = self.manager.get_task(task_id)
if task:
prompt_preview = task.prompt[:15] + "..." if len(task.prompt) > 15 else task.prompt
self.queue_summary.setText(f"正在处理: {prompt_preview} - {status_text}")
def _add_task_item(self, task: Task):
"""添加任务到列表"""
item = QListWidgetItem()
item_widget = QWidget()
item_layout = QHBoxLayout()
status_icon = "●" if task.status == TaskStatus.RUNNING else "○"
status_color = "#FF9500" if task.status == TaskStatus.RUNNING else "#8E8E93"
status_label = QLabel(status_icon)
status_label.setStyleSheet(f"QLabel {{ color: {status_color}; }}")
prompt_label = QLabel(task.prompt[:30] + "..." if len(task.prompt) > 30 else task.prompt)
item_layout.addWidget(status_label)
item_layout.addWidget(prompt_label)
item_layout.addStretch()
item_widget.setLayout(item_layout)
item.setSizeHint(item_widget.sizeHint())
self.task_list.addItem(item)
self.task_list.setItemWidget(item, item_widget)
品味评分: 🟢 好品味
- 默认折叠: 不干扰用户 80% 的单任务场景
- 响应式更新: 通过信号自动更新 UI,无需轮询
- 清晰的视觉层级: 折叠 = 摘要,展开 = 详细
集成方式
MainWindow 修改
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
# ... 现有初始化代码 ...
# === 新增:任务队列管理器 ===
self.task_manager = TaskQueueManager()
# ... tab 创建代码 ...
# === 新增:底部任务队列 UI ===
self.task_queue_widget = TaskQueueWidget(self.task_manager)
main_layout.addWidget(self.task_queue_widget) # 添加到底部
central_widget.setLayout(main_layout)
图片生成 Tab 修改
def generate_image_async(self):
"""提交任务到队列,不再阻塞 UI"""
try:
task_id = self.task_manager.submit_task(
task_type=TaskType.IMAGE_GENERATION,
prompt=self.prompt_text.toPlainText().strip(),
api_key=self.api_key,
reference_images=self.uploaded_images,
aspect_ratio=self.aspect_ratio.currentText(),
image_size=self.image_size.currentText(),
model="gemini-3-pro-image-preview"
)
# 不再禁用按钮,用户可以继续提交任务
self.status_label.setText(f"● 任务已提交 (ID: {task_id[:8]})")
self.status_label.setStyleSheet("QLabel { color: #007AFF; }")
# 绑定完成回调(用于更新当前 tab 的图片显示)
self.task_manager.task_completed.connect(
lambda tid, img: self._on_my_task_completed(task_id, tid, img)
)
except RuntimeError as e:
QMessageBox.warning(self, "队列已满", str(e))
验收标准
功能验收
- 可以连续提交多个任务,不阻塞 UI
- 底部状态栏正确显示队列状态
- 任务按顺序执行
- 任务完成后自动更新对应 tab 的显示
- 队列已满时拒绝新任务并提示用户
性能验收
- 队列操作不阻塞主线程
- UI 更新流畅(60 FPS)
兼容性验收
- 不影响现有历史记录功能
- 不破坏图片生成和款式设计的现有逻辑
潜在破坏点
| 破坏点 | 风险 | 缓解措施 |
|---|---|---|
| ImageGenerationWorker 生命周期 | Worker 被提前释放导致信号失效 | TaskQueueManager 持有 Worker 引用 |
| 信号连接泄漏 | 多次提交任务导致重复信号绑定 | 使用 functools.partial 或检查现有连接 |
| 队列无限增长 | 用户疯狂点击导致内存溢出 | 硬限制最大 10 个任务 |
| 历史记录重复保存 | 任务完成时多个地方保存历史 | 只在 TaskQueueManager 统一保存 |
数据流示意图
用户点击"生成"
↓
Tab.generate_image_async()
↓
TaskQueueManager.submit_task()
↓ (创建 Task 对象)
Queue.put(task_id)
↓ (emit task_added)
TaskQueueWidget._on_task_added() → 更新 UI 摘要
↓
TaskQueueManager._process_next()
↓ (创建 ImageGenerationWorker)
Worker.start()
↓ (emit progress)
TaskQueueWidget._on_task_progress() → 更新状态文本
↓ (生成完成)
Worker.finished
↓ (emit task_completed)
TaskQueueManager._on_task_completed()
↓
[保存历史记录] + TaskQueueWidget._on_task_completed()
↓
Tab._on_my_task_completed() → 显示图片
↓
TaskQueueManager._process_next() → 处理下一个任务
🧪 实现步骤 (分阶段)
Phase 1: 核心数据结构 (1 day)
- 实现
Taskdataclass - 实现
TaskQueueManager(不含 UI) - 单元测试
Phase 2: UI 组件 (1 day)
- 实现
TaskQueueWidget - 集成到 MainWindow
- 视觉测试
Phase 3: 集成现有代码 (1 day)
- 修改图片生成 Tab
- 修改款式设计 Tab
- 回归测试
Phase 4: 打磨 (0.5 day)
- 错误处理
- 边界情况
- 用户体验优化
🧠 关键技术决策
为什么用单例 TaskQueueManager?
-
全局唯一队列,避免多个队列冲突 -
简化代码,Tab 不需要传递队列对象 -
线程安全 (使用 Lock)
为什么用 QThread 而不是 ThreadPoolExecutor?
-
QThread 提供信号机制,与 PySide6 集成好 -
当前代码已使用 QThread,迁移成本低 -
ThreadPoolExecutor 需要额外的信号封装
为什么队列限制 10 个?
-
防止用户疯狂点击导致 API 费用暴涨 -
10 个任务 × 平均 15 秒 = 2.5 分钟,合理等待时间 -
超过 10 个说明用户使用方式不对,应该提示
后续扩展 (可选)
- 任务持久化: 保存到数据库,应用重启后恢复未完成任务
- 优先级队列: 紧急任务插队
- 并发 Worker: 同时运行 2-3 个任务 (需评估 API 限流)
- 任务预览: 点击任务查看详细参数
- 任务导出: 导出任务历史为 CSV
需要确认的问题
用户体验
- 任务完成通知: 是否需要弹窗通知?还是只在状态栏显示 3 秒?
- 结果显示: 任务完成后是否自动切换到对应 tab?
- 失败重试: 是否支持一键重试失败任务?
技术细节
- 历史记录: 任务完成后自动保存到历史记录?还是手动保存?
- 队列容量: 10 个是否合适?需要根据实际使用调整?
品味审查
🟢 好品味
- 数据结构清晰:Task 拥有所有状态
- 消除特殊情况:统一图片生成和款式设计
- 单一职责:Manager 管理任务,Widget 管理 UI
- 信号驱动:松耦合,易测试
🟡 可改进
- 单例模式有争议 (但本场景合理)
- 队列容量硬编码 (可配置化)
无垃圾代码
总结
这个方案:
- 最小复杂度: 只新增 3 个类 (Task, TaskQueueManager, TaskQueueWidget)
- 零破坏性: 不修改现有 ImageGenerationWorker
- 可扩展: 后续可轻松添加优先级、并发等功能
- 用户友好: 默认折叠不干扰,展开可查看详情
实现总行数估算: ~500 行 风险: 低 收益: 高 (显著提升用户体验)