taskqueue.py
5.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""TaskQueueBridge — 任务队列 sidebar 桥层。
QML sidebar ListView 直接用 taskQueue.model(QAbstractListModel),
桥层监听 TaskQueueManager 单例信号,按 task_id 增量更新 model。
只持有最近 N 条(与 TaskQueueManager 自身的 _max_history_size 一致),
更老的任务会随 TaskQueueManager._cleanup_old_tasks 自然消失。
"""
import logging
from datetime import datetime
from typing import Dict, List
from PySide6.QtCore import (
Property, QAbstractListModel, QModelIndex, QObject, Qt, Signal, Slot,
)
class _TaskListModel(QAbstractListModel):
    """Task list model exposed to the QML ListView.

    Roles: taskId / prompt / status / progress / statusText / elapsed.

    ``_ids`` holds the task ids in display order (new tasks are inserted at
    row 0) and ``_rows`` maps each task id to its record dict.

    NOTE(review): this class only inserts and updates rows; nothing here
    removes them, so any pruning of old tasks has to happen elsewhere —
    confirm against the owner of this model.
    """

    TaskIdRole = Qt.UserRole + 1
    PromptRole = Qt.UserRole + 2
    StatusRole = Qt.UserRole + 3
    ProgressRole = Qt.UserRole + 4
    StatusTextRole = Qt.UserRole + 5
    ElapsedRole = Qt.UserRole + 6

    # Single source of truth for the role mapping:
    # role id -> (QML role name, record key, value returned when key is absent).
    _ROLE_SPECS = {
        int(TaskIdRole): (b"taskId", "task_id", ""),
        int(PromptRole): (b"prompt", "prompt", ""),
        int(StatusRole): (b"status", "status", ""),
        int(ProgressRole): (b"progress", "progress", 0.0),
        int(StatusTextRole): (b"statusText", "status_text", ""),
        int(ElapsedRole): (b"elapsed", "elapsed", ""),
    }
    # Reverse lookup used by upsert() to translate changed record keys to roles.
    _ROLE_BY_FIELD = {
        field: role for role, (_name, field, _default) in _ROLE_SPECS.items()
    }

    def __init__(self, parent=None):
        super().__init__(parent)
        self._ids: List[str] = []
        self._rows: Dict[str, dict] = {}

    def rowCount(self, parent=QModelIndex()) -> int:
        """Return the number of rows; a valid parent has no children (flat list)."""
        if parent.isValid():
            return 0
        return len(self._ids)

    def roleNames(self):
        """Return the role id -> QML role name mapping."""
        return {
            role: name for role, (name, _field, _default) in self._ROLE_SPECS.items()
        }

    def data(self, index: QModelIndex, role: int = Qt.DisplayRole):
        """Return the value stored for *role* at *index*.

        Returns ``None`` for an invalid index, an out-of-range row, or a role
        this model does not define. ``progress`` is always returned as a float.
        """
        if not index.isValid():
            return None
        row = index.row()
        if not 0 <= row < len(self._ids):
            return None
        spec = self._ROLE_SPECS.get(int(role))
        if spec is None:
            return None
        _name, field, default = spec
        value = self._rows.get(self._ids[row], {}).get(field, default)
        if field == "progress":
            return float(value)
        return value

    # ---- Incremental operations (called by the bridge) --------------------
    def upsert(self, task_id: str, **fields) -> None:
        """Insert a row for *task_id*, or merge *fields* into its existing row.

        An unknown ``task_id`` is inserted at row 0 with ``fields`` as its
        initial record. For a known ``task_id`` only the fields whose value
        actually differs are reported through ``dataChanged``; an update that
        changes nothing emits no signal.
        """
        record = self._rows.get(task_id)
        if record is None:
            self.beginInsertRows(QModelIndex(), 0, 0)
            self._ids.insert(0, task_id)
            self._rows[task_id] = {"task_id": task_id, **fields}
            self.endInsertRows()
            return

        changed = [
            key for key, value in fields.items()
            if key not in record or record[key] != value
        ]
        if not changed:
            return
        record.update(fields)

        cell = self.index(self._ids.index(task_id), 0)
        roles = [
            self._ROLE_BY_FIELD[key] for key in changed if key in self._ROLE_BY_FIELD
        ]
        if len(roles) == len(changed):
            # Every changed key maps to a role: tell views exactly which ones.
            self.dataChanged.emit(cell, cell, roles)
        else:
            # A key without a role changed; fall back to "all roles changed".
            self.dataChanged.emit(cell, cell)
class TaskQueueBridge(QObject):
    """Bridge between a task-queue manager and the QML-facing task list model.

    Subscribes to the manager's ``task_added`` / ``task_started`` /
    ``task_completed`` / ``task_failed`` / ``task_progress`` signals and turns
    each one into an incremental ``upsert`` on the list model, keyed by task id.
    Also exposes the manager's pending and running counts as notifying
    properties.
    """

    pendingCountChanged = Signal()
    runningCountChanged = Signal()

    def __init__(self, task_queue_manager, parent=None):
        """Store the manager, create the list model and wire up the signals.

        Args:
            task_queue_manager: Object providing the five task signals plus
                ``get_pending_count``, ``get_running_count``, ``cancel_task``
                and ``get_task``.
            parent: Optional QObject parent.
        """
        super().__init__(parent)
        self._logger = logging.getLogger(__name__)
        self._tqm = task_queue_manager
        self._model = _TaskListModel(self)

        manager = self._tqm
        wiring = (
            (manager.task_added, self._on_task_added),
            (manager.task_started, self._on_task_started),
            (manager.task_completed, self._on_task_completed),
            (manager.task_failed, self._on_task_failed),
            (manager.task_progress, self._on_progress),
        )
        for signal, handler in wiring:
            signal.connect(handler)

    # ---- Properties -------------------------------------------------------
    @Property(QObject, constant=True)
    def model(self):
        """The task list model instance owned by this bridge."""
        return self._model

    @Property(int, notify=pendingCountChanged)
    def pendingCount(self) -> int:
        """Pending-task count as reported by the manager."""
        return self._tqm.get_pending_count()

    @Property(int, notify=runningCountChanged)
    def runningCount(self) -> int:
        """Running-task count as reported by the manager."""
        return self._tqm.get_running_count()

    # ---- Slots ------------------------------------------------------------
    @Slot(str)
    def cancelTask(self, task_id: str) -> None:
        """Ask the manager to cancel *task_id*, then notify both counts."""
        self._tqm.cancel_task(task_id)
        self._notify_counts()

    # ---- Manager signals -> incremental model updates ---------------------
    def _notify_counts(self) -> None:
        """Emit the pending-count then the running-count change signals."""
        self.pendingCountChanged.emit()
        self.runningCountChanged.emit()

    def _on_task_added(self, task) -> None:
        """Add a fresh "pending" row for *task* (reads ``task.id``/``task.prompt``)."""
        task_id = task.id
        initial = {
            "prompt": task.prompt,
            "status": "pending",
            "progress": 0.0,
            "status_text": "等待中",
            "elapsed": "",
        }
        self._model.upsert(task_id, **initial)
        self.pendingCountChanged.emit()

    def _on_task_started(self, task_id: str) -> None:
        """Mark the row as running and notify both counts."""
        self._model.upsert(task_id, status="running", status_text="生成中…")
        self._notify_counts()

    def _on_progress(self, task_id: str, progress: float, status_text: str) -> None:
        """Forward a progress update to the row."""
        self._model.upsert(task_id, progress=progress, status_text=status_text)

    def _on_task_completed(self, task_id, *_args) -> None:
        """Mark the row as completed; extra signal arguments are ignored."""
        self._model.upsert(
            task_id,
            status="completed",
            progress=1.0,
            status_text="已完成",
            elapsed=self._format_elapsed(task_id),
        )
        self.runningCountChanged.emit()

    def _on_task_failed(self, task_id: str, error: str) -> None:
        """Mark the row as failed, showing *error* or a default label if empty."""
        duration_text = self._format_elapsed(task_id)
        label = error or "失败"
        self._model.upsert(
            task_id, status="failed", status_text=label, elapsed=duration_text
        )
        self._notify_counts()

    def _format_elapsed(self, task_id: str) -> str:
        """Return the run time of *task_id* as ``"<seconds>s"`` with one decimal.

        Returns an empty string when the manager has no such task or either of
        its ``started_at`` / ``completed_at`` values is falsy.
        """
        task = self._tqm.get_task(task_id)
        if not task or not task.started_at or not task.completed_at:
            return ""
        duration = task.completed_at - task.started_at
        return f"{duration.total_seconds():.1f}s"