02df9569 by 柴进

增加定时重建的相关代码

1 parent 26d3e1c7
......@@ -19,6 +19,8 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import numpy as np
from dotenv import load_dotenv
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# 尝试导入不同的 JWT 库
try:
......@@ -50,6 +52,7 @@ db_manager = None
# Module-level service handles. They start as None; lifespan() lists them in its
# `global` statement so that it can rebind them.
search_engine = None
data_sync = None
sync_thread = None
# Assigned a BackgroundScheduler in lifespan() and shut down there after `yield`.
scheduler = None
# NOTE(review): presumably a workaround for more than one OpenMP runtime being
# loaded into the process — confirm it is still required.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
......@@ -77,10 +80,20 @@ def load_config():
return config
def scheduled_sync():
    """Scheduled job: run a single incremental sync pass.

    Registered in ``lifespan`` with a cron trigger for 0:00 and 12:00 every day.
    Takes no arguments and returns nothing. Any exception raised by the sync is
    logged together with its traceback and is not re-raised.
    """
    try:
        logger.info("🔄 定时任务:开始增量同步")
        sync_result = data_sync.sync_once()
        logger.info(f"✅ 定时同步完成: {sync_result}")
    except Exception as exc:
        # Job boundary: record the failure instead of propagating it.
        logger.error(f"❌ 定时同步失败: {exc}", exc_info=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifecycle management: startup work before `yield`, cleanup after it."""
# NOTE(review): the two `global` lines below look like the removed/added pair of a
# diff hunk; only the second one declares `scheduler`.
global config, db_manager, search_engine, data_sync, sync_thread
global config, db_manager, search_engine, data_sync, sync_thread, scheduler
logger.info("启动 Design Image Search 服务...")
......@@ -134,12 +147,26 @@ async def lifespan(app: FastAPI):
sync_thread.start()
logger.info("后台数据同步线程已启动")
# Start the scheduled job (every day at 0:00 and 12:00).
# NOTE(review): no timezone is passed to the scheduler or the trigger, so the
# library default applies — confirm it matches the intended wall-clock time.
scheduler = BackgroundScheduler()
scheduler.add_job(
func=scheduled_sync,
trigger=CronTrigger(hour='0,12', minute='0'),
id='scheduled_sync',
replace_existing=True
)
scheduler.start()
logger.info("定时任务已启动(每天 0:00 和 12:00 执行)")
logger.info("Design Image Search 服务启动完成")
yield
# Cleanup code: everything below runs once the context manager resumes after `yield`.
logger.info("正在关闭 Design Image Search 服务...")
# Only shut the scheduler down if it was created and is still running.
# NOTE(review): shutdown() is called with default arguments — confirm whether
# waiting for an in-flight sync job is acceptable inside this async function.
if scheduler and scheduler.running:
scheduler.shutdown()
logger.info("定时任务已关闭")
# 创建 FastAPI 应用
......@@ -431,6 +458,48 @@ async def trigger_sync(token: Dict = Depends(verify_token)):
)
@app.post("/admin/rebuild-all")
async def rebuild_all(token: Dict = Depends(verify_token)):
    """
    Full rebuild: temporarily rewind last_sync_time to 1970 and reuse the incremental sync.

    The stored sync time is saved, reset to 1970-01-01, and ``data_sync.sync_once()``
    is then run so that it processes all historical data. If the sync raises, the
    saved sync time is written back before the error is reported.

    NOTE(review): while the sync time is rewound, any other caller of
    ``data_sync.sync_once()`` (for example the scheduled job) presumably sees the
    rewound value too — confirm this is acceptable.

    Args:
        token: JWT auth info (injected automatically via ``verify_token``).

    Returns:
        Dict: ``success``, ``message`` and ``result`` (the value returned by
        ``data_sync.sync_once()``).

    Raises:
        HTTPException: status 500 if any step fails; the original exception is
            attached as ``__cause__``.
    """
    import asyncio
    from datetime import datetime

    try:
        logger.info("🔄 手动触发全量重建")

        # Remember the current sync time so it can be restored on failure.
        original_sync_time = db_manager.get_last_sync_time()

        # Rewind to 1970-01-01 so the incremental sync picks up all history.
        db_manager.update_sync_time(datetime(1970, 1, 1), 0)

        try:
            # sync_once() is a blocking call; run it in the default executor so
            # the event loop keeps serving other requests during the rebuild.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, data_sync.sync_once)
        except Exception:
            # On failure, put the previous sync time back.
            if original_sync_time:
                try:
                    db_manager.update_sync_time(original_sync_time, 0)
                except Exception:
                    # Do not let a failed restore hide the original sync error.
                    logger.error("❌ 恢复 last_sync_time 失败", exc_info=True)
            raise

        return {
            "success": True,
            "message": "全量重建完成",
            "result": result
        }

    except Exception as e:
        logger.error(f"❌ 全量重建失败: {e}", exc_info=True)
        raise HTTPException(500, f"重建失败: {str(e)}") from e
# 配置 CORS
app.add_middleware(
CORSMiddleware,
......
......@@ -396,6 +396,28 @@ class FAISSManager:
except Exception as e:
logger.error(f"保存索引失败: {e}")
def rebuild_index(self, db_manager):
    """
    Rebuild the index (purging tombstoned vectors).

    Thin alias for ``compact_index``, kept for API compatibility.

    Args:
        db_manager: Either a database-manager object exposing a ``db_path``
            attribute, or the database path itself.

    Returns:
        bool: Success flag, i.e. whatever ``compact_index`` returns.
    """
    # A manager object carries its path in ``db_path``; anything without that
    # attribute is taken to already be the path.
    resolved_path = getattr(db_manager, 'db_path', db_manager)

    logger.info("开始重建索引(清理墓碑)...")
    return self.compact_index(resolved_path)
def get_stats(self):
"""获取索引统计信息"""
return {
......
......@@ -14,6 +14,7 @@ Pillow>=10.0.0
# Database
PyMySQL>=1.1.0
# sqlite3 is built-in to Python, no need to install
# JWT Authentication
python-jose[cryptography]>=3.3.0
......@@ -26,7 +27,10 @@ requests>=2.31.0
# Utilities
python-dotenv>=1.0.0
pyyaml>=6.0.1
numpy>=1.24.0,<2.0
numpy>=1.24.0,<2.0.0
# Logging & Monitoring
structlog>=23.1.0
# Task Scheduling
apscheduler==3.10.4
......
......@@ -4,7 +4,7 @@ uvicorn[standard]>=0.24.0
python-multipart>=0.0.6
# Machine Learning & Computer Vision
# 使用版本范围,避免CUDA依赖(镜像源会自动提供CPU版本)
# Use version ranges to avoid CUDA dependencies (CPU version from mirror)
torch>=2.0.0,<2.1.0
torchvision>=0.15.0,<0.16.0
faiss-cpu>=1.7.4
......@@ -26,7 +26,10 @@ requests>=2.31.0
# Utilities
python-dotenv>=1.0.0
pyyaml>=6.0.1
numpy>=1.24.0
numpy>=1.24.0,<2.0.0
# Logging & Monitoring
structlog>=23.1.0
\ No newline at end of file
structlog>=23.1.0
# Task Scheduling
apscheduler==3.10.4
......