68b19c6f by 柴进

增加队列模式,支持同时做多个任务

1 parent 3689be68
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
14 "database": "saas_user", 14 "database": "saas_user",
15 "table": "nano_banana_users" 15 "table": "nano_banana_users"
16 }, 16 },
17 "last_user": "testuser", 17 "last_user": "chaijin",
18 "saved_password_hash": "50630320e4a550f2dba371820dad9d9301d456d101aca4d5ad8f4f3bcc9c1ed9", 18 "saved_password_hash": "50630320e4a550f2dba371820dad9d9301d456d101aca4d5ad8f4f3bcc9c1ed9",
19 "logging_config": { 19 "logging_config": {
20 "enabled": true, 20 "enabled": true,
......
1 """
2 任务队列系统
3 提供异步图像生成任务的队列管理和 UI 组件
4 """
5
6 from dataclasses import dataclass, field
7 from datetime import datetime
8 from enum import Enum
9 from typing import Optional, List, Dict
10 from queue import Queue
11 from threading import Lock
12 import uuid
13 import logging
14 import io
15
16 from PySide6.QtCore import QObject, Signal, QTimer, Qt
17 from PySide6.QtWidgets import (
18 QWidget, QVBoxLayout, QHBoxLayout, QLabel,
19 QPushButton, QListWidget, QListWidgetItem, QDialog, QScrollArea, QFrame
20 )
21 from PySide6.QtGui import QPixmap, QMouseEvent
22 from PIL import Image
23
24
25 class TaskType(Enum):
26 """任务类型"""
27 IMAGE_GENERATION = "image_gen"
28 STYLE_DESIGN = "style_design"
29
30
31 class TaskStatus(Enum):
32 """任务状态"""
33 PENDING = "pending"
34 RUNNING = "running"
35 COMPLETED = "completed"
36 FAILED = "failed"
37 CANCELLED = "cancelled"
38
39
40 @dataclass
41 class Task:
42 """任务数据模型"""
43 # 标识
44 id: str
45 type: TaskType
46 status: TaskStatus
47
48 # 输入参数
49 prompt: str
50 api_key: str
51 reference_images: List[str]
52 aspect_ratio: str
53 image_size: str
54 model: str
55
56 # 时间戳
57 created_at: datetime
58 started_at: Optional[datetime] = None
59 completed_at: Optional[datetime] = None
60
61 # 结果
62 result_bytes: Optional[bytes] = None
63 error_message: Optional[str] = None
64
65 # UI 相关
66 thumbnail: Optional[bytes] = None
67 progress: float = 0.0
68
69
70 class TaskQueueManager(QObject):
71 """
72 单例任务队列管理器
73 管理所有图像生成任务的生命周期
74 """
75 # Signals
76 task_added = Signal(Task)
77 task_started = Signal(str) # task_id
78 task_completed = Signal(str, bytes, str, list, str, str, str) # task_id, image_bytes, prompt, ref_images, aspect_ratio, image_size, model
79 task_failed = Signal(str, str) # task_id, error_message
80 task_progress = Signal(str, float, str) # task_id, progress, status_text
81
82 _instance = None
83 _lock = Lock()
84
85 def __new__(cls):
86 """单例模式"""
87 if cls._instance is None:
88 with cls._lock:
89 if cls._instance is None:
90 cls._instance = super().__new__(cls)
91 return cls._instance
92
93 def __init__(self):
94 if hasattr(self, '_initialized'):
95 return
96 super().__init__()
97
98 self.logger = logging.getLogger(__name__)
99 self._tasks: Dict[str, Task] = {}
100 self._queue = Queue()
101 self._current_worker = None
102 self._max_queue_size = 10
103 self._max_history_size = 10 # 只保留最近10条完成任务
104
105 self._initialized = True
106 self.logger.info("TaskQueueManager 初始化完成")
107
108 def submit_task(
109 self,
110 task_type: TaskType,
111 prompt: str,
112 api_key: str,
113 reference_images: List[str],
114 aspect_ratio: str,
115 image_size: str,
116 model: str
117 ) -> str:
118 """
119 提交新任务到队列
120
121 Args:
122 task_type: 任务类型
123 prompt: 图片描述
124 api_key: API 密钥
125 reference_images: 参考图片路径列表
126 aspect_ratio: 宽高比
127 image_size: 图片尺寸
128 model: 模型名称
129
130 Returns:
131 task_id: 任务唯一标识
132
133 Raises:
134 RuntimeError: 队列已满
135 """
136 # 检查队列容量
137 if self._queue.qsize() >= self._max_queue_size:
138 raise RuntimeError(f"任务队列已满 (最大 {self._max_queue_size} 个)")
139
140 # 创建任务
141 task = Task(
142 id=str(uuid.uuid4()),
143 type=task_type,
144 status=TaskStatus.PENDING,
145 prompt=prompt,
146 api_key=api_key,
147 reference_images=reference_images.copy() if reference_images else [],
148 aspect_ratio=aspect_ratio,
149 image_size=image_size,
150 model=model,
151 created_at=datetime.now()
152 )
153
154 self._tasks[task.id] = task
155 self._queue.put(task.id)
156
157 self.logger.info(f"任务已提交: {task.id[:8]} - {prompt[:30]}")
158 self.task_added.emit(task)
159
160 # 如果没有正在运行的任务,启动处理
161 if self._current_worker is None or not self._current_worker.isRunning():
162 self._process_next()
163
164 return task.id
165
166 def _process_next(self):
167 """处理队列中的下一个任务"""
168 if self._queue.empty():
169 self.logger.debug("队列为空,无任务处理")
170 return
171
172 task_id = self._queue.get()
173 task = self._tasks[task_id]
174 task.status = TaskStatus.RUNNING
175 task.started_at = datetime.now()
176
177 self.logger.info(f"开始处理任务: {task_id[:8]}")
178
179 # 导入 ImageGenerationWorker
180 from image_generator import ImageGenerationWorker
181
182 # 创建 worker
183 self._current_worker = ImageGenerationWorker(
184 task.api_key,
185 task.prompt,
186 task.reference_images,
187 task.aspect_ratio,
188 task.image_size,
189 task.model
190 )
191
192 # 绑定信号
193 self._current_worker.finished.connect(
194 lambda img_bytes, prompt, ref_imgs, ar, size, model:
195 self._on_task_completed(task_id, img_bytes, prompt, ref_imgs, ar, size, model)
196 )
197 self._current_worker.error.connect(
198 lambda error: self._on_task_failed(task_id, error)
199 )
200 self._current_worker.progress.connect(
201 lambda status: self.task_progress.emit(task_id, 0.5, status)
202 )
203
204 self.task_started.emit(task_id)
205 self._current_worker.start()
206
207 def _on_task_completed(self, task_id: str, image_bytes: bytes, prompt: str,
208 reference_images: list, aspect_ratio: str, image_size: str, model: str):
209 """任务完成回调"""
210 task = self._tasks.get(task_id)
211 if not task:
212 self.logger.error(f"任务 {task_id[:8]} 不存在")
213 return
214
215 task.status = TaskStatus.COMPLETED
216 task.completed_at = datetime.now()
217 task.result_bytes = image_bytes
218
219 # 生成缩略图
220 try:
221 task.thumbnail = self._create_thumbnail(image_bytes)
222 except Exception as e:
223 self.logger.warning(f"生成缩略图失败: {e}")
224
225 elapsed = (task.completed_at - task.started_at).total_seconds()
226 self.logger.info(f"任务完成: {task_id[:8]} - 耗时 {elapsed:.1f}s")
227
228 self.task_completed.emit(task_id, image_bytes, prompt, reference_images,
229 aspect_ratio, image_size, model)
230
231 # 清理旧任务历史,只保留最近的完成任务
232 self._cleanup_old_tasks()
233
234 # 处理下一个任务
235 self._process_next()
236
237 def _on_task_failed(self, task_id: str, error: str):
238 """任务失败回调"""
239 task = self._tasks.get(task_id)
240 if not task:
241 self.logger.error(f"任务 {task_id[:8]} 不存在")
242 return
243
244 task.status = TaskStatus.FAILED
245 task.completed_at = datetime.now()
246 task.error_message = error
247
248 self.logger.error(f"任务失败: {task_id[:8]} - {error}")
249
250 self.task_failed.emit(task_id, error)
251
252 # 清理旧任务历史
253 self._cleanup_old_tasks()
254
255 # 处理下一个任务
256 self._process_next()
257
258 def _cleanup_old_tasks(self):
259 """清理旧任务,只保留最近的完成/失败任务"""
260 # 获取所有已完成和失败的任务,按完成时间排序
261 finished_tasks = [
262 t for t in self._tasks.values()
263 if t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED] and t.completed_at
264 ]
265 finished_tasks.sort(key=lambda t: t.completed_at, reverse=True)
266
267 # 只保留最近的 N 条
268 if len(finished_tasks) > self._max_history_size:
269 tasks_to_remove = finished_tasks[self._max_history_size:]
270 for task in tasks_to_remove:
271 del self._tasks[task.id]
272 self.logger.debug(f"清理旧任务: {task.id[:8]}")
273
274 def _create_thumbnail(self, image_bytes: bytes) -> bytes:
275 """
276 创建缩略图 (50x50)
277
278 Args:
279 image_bytes: 原始图片字节
280
281 Returns:
282 缩略图字节
283 """
284 img = Image.open(io.BytesIO(image_bytes))
285 img.thumbnail((50, 50))
286
287 thumb_io = io.BytesIO()
288 img.save(thumb_io, format='PNG')
289 return thumb_io.getvalue()
290
291 def get_task(self, task_id: str) -> Optional[Task]:
292 """获取任务详情"""
293 return self._tasks.get(task_id)
294
295 def get_all_tasks(self) -> List[Task]:
296 """获取所有任务"""
297 return list(self._tasks.values())
298
299 def get_pending_count(self) -> int:
300 """获取等待中任务数"""
301 return sum(1 for t in self._tasks.values() if t.status == TaskStatus.PENDING)
302
303 def get_running_count(self) -> int:
304 """获取运行中任务数"""
305 return sum(1 for t in self._tasks.values() if t.status == TaskStatus.RUNNING)
306
307 def cancel_task(self, task_id: str):
308 """取消任务 (仅等待中任务)"""
309 task = self._tasks.get(task_id)
310 if task and task.status == TaskStatus.PENDING:
311 task.status = TaskStatus.CANCELLED
312 self.logger.info(f"任务已取消: {task_id[:8]}")
313
314