7a8809ce by 柴进

增加定时重建的相关代码

1 parent 02df9569
...@@ -90,6 +90,60 @@ def scheduled_sync(): ...@@ -90,6 +90,60 @@ def scheduled_sync():
90 logger.error(f"❌ 定时同步失败: {e}", exc_info=True) 90 logger.error(f"❌ 定时同步失败: {e}", exc_info=True)
91 91
92 92
93 def check_faiss_index_update():
94 """检查FAISS索引文件是否有更新,必要时重载"""
95 try:
96 import os
97
98 index_path = config['faiss']['index_path']
99 mapping_path = config['faiss']['mapping_path']
100 tombstone_path = config['faiss']['tombstone_path']
101
102 # 检查所有相关文件是否存在
103 if not all(os.path.exists(p) for p in [index_path, mapping_path, tombstone_path]):
104 return
105
106 # 获取文件修改时间
107 current_mtime = max(
108 os.path.getmtime(index_path),
109 os.path.getmtime(mapping_path),
110 os.path.getmtime(tombstone_path)
111 )
112 # 初始化 FAISS 管理器
113 faiss_manager = FAISSManager(
114 index_path=config['faiss']['index_path'],
115 mapping_path=config['faiss']['mapping_path'],
116 tombstone_path=config['faiss']['tombstone_path'],
117 vector_dim=config['faiss']['vector_dim']
118 )
119
120 # 初始化或检查上次加载时间
121 if not hasattr(check_faiss_index_update, 'last_mtime'):
122 check_faiss_index_update.last_mtime = current_mtime
123 return
124
125 # 如果文件有更新,重载索引
126 if current_mtime > check_faiss_index_update.last_mtime:
127 logger.info("检测到FAISS索引文件更新,重新加载...")
128
129 # 获取重载前的统计信息
130 old_stats = faiss_manager.get_stats()
131
132 if faiss_manager.load_index():
133 check_faiss_index_update.last_mtime = current_mtime
134 new_stats = faiss_manager.get_stats()
135
136 logger.info("✅ FAISS索引重载成功")
137 logger.info(f" 向量数: {old_stats['total_vectors']} -> {new_stats['total_vectors']}")
138 logger.info(f" 墓碑数: {old_stats['tombstone_count']} -> {new_stats['tombstone_count']}")
139 logger.info(f" 有效向量: {old_stats['effective_vectors']} -> {new_stats['effective_vectors']}")
140 else:
141 logger.error("❌ FAISS索引重载失败")
142
143 except Exception as e:
144 logger.error(f"检查FAISS索引更新失败: {e}", exc_info=True)
145
146
93 @asynccontextmanager 147 @asynccontextmanager
94 async def lifespan(app: FastAPI): 148 async def lifespan(app: FastAPI):
95 """应用生命周期管理""" 149 """应用生命周期管理"""
...@@ -155,8 +209,17 @@ async def lifespan(app: FastAPI): ...@@ -155,8 +209,17 @@ async def lifespan(app: FastAPI):
155 id='scheduled_sync', 209 id='scheduled_sync',
156 replace_existing=True 210 replace_existing=True
157 ) 211 )
212
213 # 添加 FAISS 索引检查任务(每5分钟检查一次)
214 scheduler.add_job(
215 func=check_faiss_index_update,
216 trigger=CronTrigger(minute='*/5'),
217 id='check_faiss_update',
218 replace_existing=True
219 )
220
158 scheduler.start() 221 scheduler.start()
159 logger.info("定时任务已启动(每天 0:00 和 12:00 执行)") 222 logger.info("定时任务已启动(每天 0:00 和 12:00 执行,每5分钟检查FAISS索引更新)")
160 223
161 logger.info("Design Image Search 服务启动完成") 224 logger.info("Design Image Search 服务启动完成")
162 225
......
...@@ -416,7 +416,13 @@ class FAISSManager: ...@@ -416,7 +416,13 @@ class FAISSManager:
416 db_path = db_manager 416 db_path = db_manager
417 417
418 logger.info("开始重建索引(清理墓碑)...") 418 logger.info("开始重建索引(清理墓碑)...")
419 return self.compact_index(db_path) 419 result = self.compact_index(db_path)
420
421 if result:
422 logger.info("索引重建完成,内存状态已更新")
423 # 提示:如果是多进程架构,其他进程需要调用 load_index() 来获取最新索引
424
425 return result
420 426
421 def get_stats(self): 427 def get_stats(self):
422 """获取索引统计信息""" 428 """获取索引统计信息"""
......
...@@ -98,6 +98,7 @@ class DesignDataSync: ...@@ -98,6 +98,7 @@ class DesignDataSync:
98 SELECT id, design_no, images, utc_modified 98 SELECT id, design_no, images, utc_modified
99 FROM saas_design.design 99 FROM saas_design.design
100 WHERE eps_id = 2 100 WHERE eps_id = 2
101 AND DELETE_KEY = 0
101 AND utc_modified > %s 102 AND utc_modified > %s
102 ORDER BY utc_modified ASC 103 ORDER BY utc_modified ASC
103 LIMIT %s 104 LIMIT %s
......