database.py 2.58 KB
"""数据库连接 + 用户认证。

DatabaseManager 只负责 MySQL 连接和 user table 的 SHA256 密码核验。
db_config 由 config_util 加载并传入;本模块不读 config 文件本身。
"""
import hashlib
import logging

import pymysql


def hash_password(password: str) -> str:
    """使用 SHA256 哈希密码"""
    return hashlib.sha256(password.encode('utf-8')).hexdigest()


class DatabaseManager:
    """数据库连接管理类"""

    def __init__(self, db_config):
        self.config = db_config
        self.logger = logging.getLogger(__name__)

    def authenticate(self, username, password):
        """验证明文密码(前端首次输入)。返回 (success, message)。"""
        return self._do_query(username, hash_password(password))

    def authenticate_with_hash(self, username, password_hash):
        """直接用已存的 hash 登录("记住密码"路径)。返回 (success, message)。"""
        return self._do_query(username, password_hash)

    def _do_query(self, username, password_hash):
        try:
            self.logger.info(f"开始用户认证: {username}")
            self.logger.debug(f"连接数据库: {self.config['host']}:{self.config.get('port', 3306)}")
            conn = pymysql.connect(
                host=self.config['host'],
                port=self.config.get('port', 3306),
                user=self.config['user'],
                password=self.config['password'],
                database=self.config['database'],
                connect_timeout=5
            )

            try:
                with conn.cursor() as cursor:
                    sql = f"SELECT * FROM {self.config['table']} WHERE user_name=%s AND passwd=%s AND status='active'"
                    cursor.execute(sql, (username, password_hash))
                    result = cursor.fetchone()

                    if result:
                        self.logger.info(f"用户认证成功: {username}")
                        return True, "认证成功"
                    else:
                        self.logger.warning(f"用户认证失败: {username} - 用户名或密码错误")
                        return False, "用户名或密码错误"
            finally:
                conn.close()

        except pymysql.OperationalError as e:
            error_msg = "无法连接到服务器,请检查网络连接"
            self.logger.error(f"数据库连接失败: {e}")
            return False, error_msg
        except Exception as e:
            error_msg = f"认证失败: {str(e)}"
            self.logger.exception("认证过程异常")
            return False, error_msg