auth.py
3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""AuthBridge — 登录认证 + 当前用户。
替代旧 LoginDialog。QML LoginScreen 调 auth.login(user, pwd),桥内部走
DatabaseManager.authenticate(同步 MySQL,5s timeout)+ audit_logger.log_login。
PoC 模式(db_config = None):接受任意非空用户名密码,便于无 db 环境调 UI。
"""
import logging
import platform
import socket
from typing import Optional
from PySide6.QtCore import Property, QObject, Signal, Slot
from core.database import DatabaseManager
class AuthBridge(QObject):
    """QML-facing bridge that performs login and exposes the current user.

    QML calls ``login(user, pwd)``; on success the bridge updates the
    ``loggedIn`` / ``currentUser`` properties and, when an audit logger was
    supplied, records the login through ``audit_logger.log_login``.

    When no (or an empty) ``db_config`` is given, no ``DatabaseManager`` is
    created and the bridge runs in PoC mode: any non-empty username/password
    pair is accepted so the UI can be exercised without a database.
    """

    loggedInChanged = Signal()
    currentUserChanged = Signal()
    loginFailed = Signal(str)  # payload: human-readable error message

    def __init__(self, db_config: Optional[dict] = None, audit_logger=None, parent=None):
        """Create the bridge.

        Args:
            db_config: Settings handed to ``DatabaseManager``. A falsy value
                (``None`` or an empty dict) selects PoC mode.
            audit_logger: Optional object exposing ``log_login(...)``; when
                ``None`` no audit record is written.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self._logger = logging.getLogger(__name__)
        self._db_config = db_config
        self._db = DatabaseManager(db_config) if db_config else None
        self._audit = audit_logger
        self._logged_in = False
        self._current_user = ""
        if self._db is None:
            # Credentials are NOT verified in this mode; make that visible in
            # the logs so a missing/empty config cannot go unnoticed.
            self._logger.warning("AuthBridge 处于 PoC 模式: 未配置数据库, 不校验用户名密码")

    @Property(bool, notify=loggedInChanged)
    def loggedIn(self) -> bool:
        """Whether a user is currently logged in."""
        return self._logged_in

    @Property(str, notify=currentUserChanged)
    def currentUser(self) -> str:
        """Name of the logged-in user, or an empty string when logged out."""
        return self._current_user

    @Slot(str, str, result=bool)
    def login(self, username: str, password: str) -> bool:
        """Try to log in and report the outcome.

        The username is stripped of surrounding whitespace; the password is
        used as given.

        Args:
            username: Login name entered by the user.
            password: Password entered by the user.

        Returns:
            ``True`` on success. On failure returns ``False`` after emitting
            ``loginFailed`` with a message; this slot never raises for a
            failing database call.
        """
        username = (username or "").strip()
        if not username or not password:
            self.loginFailed.emit("用户名和密码不能为空")
            return False

        # PoC mode: without a database any non-empty credentials are accepted.
        if self._db is None:
            self._on_login_success(username)
            return True

        try:
            ok, msg = self._db.authenticate(username, password)
        except Exception:
            # An escaping exception would leave the login screen without any
            # feedback. Log the details, but show the user only a generic
            # message so connection internals are not leaked to the UI.
            self._logger.exception("登录异常: %s", username)
            self.loginFailed.emit("登录服务异常, 请稍后重试")
            return False

        if not ok:
            self._logger.warning("登录失败: %s - %s", username, msg)
            self.loginFailed.emit(msg)
            return False

        self._on_login_success(username)
        return True

    @Slot()
    def logout(self) -> None:
        """Clear the session; notify only for properties that actually changed."""
        was_logged_in = self._logged_in
        had_user = self._current_user != ""
        # Update both fields before emitting so handlers see a consistent state.
        self._logged_in = False
        self._current_user = ""
        if was_logged_in:
            self.loggedInChanged.emit()
        if had_user:
            self.currentUserChanged.emit()

    @Slot(result=str)
    def deviceName(self) -> str:
        """Return this machine's host name, or ``"unknown"`` if unavailable.

        Per the original note, this is used by the audit log and
        ImageGenBridge.
        """
        try:
            return socket.gethostname() or platform.node() or "unknown"
        except Exception:
            # Best effort: this slot is called from QML and must not raise.
            return "unknown"

    # ---- internals ------------------------------------------------------
    def _on_login_success(self, username: str) -> None:
        """Record a successful login: update state, notify, then audit.

        A failing audit call is logged and otherwise ignored, so it can never
        turn a successful login into a failure.
        """
        user_changed = username != self._current_user
        state_changed = not self._logged_in
        # Update both fields before emitting so handlers see a consistent state.
        self._current_user = username
        self._logged_in = True
        if user_changed:
            self.currentUserChanged.emit()
        if state_changed:
            self.loggedInChanged.emit()
        self._logger.info("登录成功: %s", username)

        if self._audit is None:
            return
        try:
            self._audit.log_login(
                user_name=username,
                local_ip=self._get_local_ip(),
                public_ip=None,  # public IP lookup is a slow path; deferred to task #13
                device_name=self.deviceName(),
            )
        except Exception as e:
            self._logger.warning("audit log_login 失败(不影响登录): %s", e)

    @staticmethod
    def _get_local_ip() -> Optional[str]:
        """Return the local IPv4 address of the outbound interface, or ``None``.

        Connects a UDP socket towards a public address and reads back the
        local address the OS selected for it.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.settimeout(0.5)
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except Exception:
            # Best effort: no network / no route simply means "unknown".
            return None