# auth.py 3.41 KB
"""AuthBridge — 登录认证 + 当前用户。

替代旧 LoginDialog。QML LoginScreen 调 auth.login(user, pwd),桥内部走
DatabaseManager.authenticate(同步 MySQL,5s timeout)+ audit_logger.log_login。

PoC 模式(db_config = None):接受任意非空用户名密码,便于无 db 环境调 UI。
"""
import logging
import platform
import socket
from typing import Optional

from PySide6.QtCore import Property, QObject, Signal, Slot

from core.database import DatabaseManager


class AuthBridge(QObject):
    """QML-facing bridge that performs login/logout and exposes the current user.

    Properties (readable from QML):
        loggedIn (bool): whether a user is currently logged in.
        currentUser (str): name of the logged-in user, "" when logged out.

    Signals:
        loggedInChanged: emitted when ``loggedIn`` actually changes.
        currentUserChanged: emitted when ``currentUser`` actually changes.
        loginFailed(str): emitted with an error message when ``login`` fails.

    When no database manager was created (falsy ``db_config``), the bridge
    runs in PoC mode and accepts any non-empty username/password pair.
    """

    loggedInChanged = Signal()
    currentUserChanged = Signal()
    loginFailed = Signal(str)  # error_message

    def __init__(self, db_config: Optional[dict] = None, audit_logger=None, parent=None):
        """Create the bridge.

        Args:
            db_config: configuration passed to ``DatabaseManager``. Any falsy
                value (``None`` or an empty dict) selects PoC mode.
            audit_logger: optional object providing
                ``log_login(user_name=, local_ip=, public_ip=, device_name=)``;
                ``None`` disables audit logging.
            parent: optional Qt parent object.
        """
        super().__init__(parent)
        self._logger = logging.getLogger(__name__)
        self._db_config = db_config
        self._db = DatabaseManager(db_config) if db_config else None
        self._audit = audit_logger
        self._logged_in = False
        self._current_user = ""

        if self._db is None:
            # Make the authentication bypass visible in the logs so that a
            # missing/empty config in a real deployment does not go unnoticed.
            self._logger.warning(
                "未配置 db_config,AuthBridge 处于 PoC 模式:任意非空用户名/密码均可登录"
            )

    @Property(bool, notify=loggedInChanged)
    def loggedIn(self) -> bool:
        """Whether a user is currently logged in."""
        return self._logged_in

    @Property(str, notify=currentUserChanged)
    def currentUser(self) -> str:
        """Name of the logged-in user, or "" when nobody is logged in."""
        return self._current_user

    @Slot(str, str, result=bool)
    def login(self, username: str, password: str) -> bool:
        """Try to log in and report the outcome.

        The username is stripped of surrounding whitespace; the password is
        used as given. On failure ``loginFailed`` is emitted with a message.

        Args:
            username: login name entered by the user.
            password: password entered by the user.

        Returns:
            True when the login succeeded, False otherwise.
        """
        username = (username or "").strip()
        if not username or not password:
            self.loginFailed.emit("用户名和密码不能为空")
            return False

        # PoC mode: without a database manager, accept any non-empty pair.
        if self._db is None:
            self._on_login_success(username)
            return True

        try:
            ok, msg = self._db.authenticate(username, password)
        except Exception:
            # Slot boundary: an exception escaping here would leave the QML
            # caller without any result or failure signal, so report it
            # through loginFailed and keep the traceback in the log.
            self._logger.exception("登录异常: %s", username)
            self.loginFailed.emit("认证服务异常,请稍后重试")
            return False

        if not ok:
            self._logger.warning("登录失败: %s - %s", username, msg)
            self.loginFailed.emit(msg)
            return False

        self._on_login_success(username)
        return True

    @Slot()
    def logout(self) -> None:
        """Clear the login state; notify only for properties that changed."""
        was_logged_in = self._logged_in
        had_user = bool(self._current_user)

        # Update both fields first so handlers observe a consistent state.
        self._logged_in = False
        self._current_user = ""

        if was_logged_in:
            self.loggedInChanged.emit()
        if had_user:
            self.currentUserChanged.emit()

    @Slot(result=str)
    def deviceName(self) -> str:
        """Return this machine's host name, or "unknown" if unavailable.

        Used for the audit log entry written on login; the original note says
        ImageGenBridge consumes it as well.
        """
        try:
            return socket.gethostname() or platform.node() or "unknown"
        except Exception:
            # Deliberately broad: this is a best-effort, QML-facing lookup and
            # must never raise into the UI.
            return "unknown"

    # ---- internals ------------------------------------------------------

    def _on_login_success(self, username: str) -> None:
        """Record a successful login, notify QML and write the audit entry.

        Args:
            username: the (already stripped) name of the authenticated user.
        """
        user_changed = username != self._current_user
        state_changed = not self._logged_in

        # Update both fields first so handlers observe a consistent state.
        self._current_user = username
        self._logged_in = True

        if user_changed:
            self.currentUserChanged.emit()
        if state_changed:
            self.loggedInChanged.emit()
        self._logger.info("登录成功: %s", username)

        if self._audit is not None:
            try:
                self._audit.log_login(
                    user_name=username,
                    local_ip=self._get_local_ip(),
                    public_ip=None,  # public IP lookup is a slow path; deferred to task #13
                    device_name=self.deviceName(),
                )
            except Exception as e:
                # Audit logging is best-effort and must not undo the login.
                self._logger.warning("audit log_login 失败(不影响登录): %s", e)

    @staticmethod
    def _get_local_ip() -> Optional[str]:
        """Return the local IPv4 address of the default outbound route.

        Connects a UDP socket towards 8.8.8.8:80 and reads back the local
        address the OS bound to it.

        Returns:
            The local IP as a string, or None when it cannot be determined
            (e.g. no network / no route).
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.settimeout(0.5)
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except OSError:
            # Socket creation, connect and getsockname failures (including
            # timeouts) are all OSError subclasses.
            return None