database.py
2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""数据库连接 + 用户认证。
DatabaseManager 只负责 MySQL 连接和 user table 的 SHA256 密码核验。
db_config 由 config_util 加载并传入;本模块不读 config 文件本身。
"""
import hashlib
import logging
import pymysql
def hash_password(password: str) -> str:
"""使用 SHA256 哈希密码"""
return hashlib.sha256(password.encode('utf-8')).hexdigest()
class DatabaseManager:
"""数据库连接管理类"""
def __init__(self, db_config):
self.config = db_config
self.logger = logging.getLogger(__name__)
def authenticate(self, username, password):
"""
验证用户凭证
返回: (success: bool, message: str)
"""
try:
self.logger.info(f"开始用户认证: {username}")
password_hash = hash_password(password)
self.logger.debug(f"连接数据库: {self.config['host']}:{self.config.get('port', 3306)}")
conn = pymysql.connect(
host=self.config['host'],
port=self.config.get('port', 3306),
user=self.config['user'],
password=self.config['password'],
database=self.config['database'],
connect_timeout=5
)
try:
with conn.cursor() as cursor:
sql = f"SELECT * FROM {self.config['table']} WHERE user_name=%s AND passwd=%s AND status='active'"
cursor.execute(sql, (username, password_hash))
result = cursor.fetchone()
if result:
self.logger.info(f"用户认证成功: {username}")
return True, "认证成功"
else:
self.logger.warning(f"用户认证失败: {username} - 用户名或密码错误")
return False, "用户名或密码错误"
finally:
conn.close()
except pymysql.OperationalError as e:
error_msg = "无法连接到服务器,请检查网络连接"
self.logger.error(f"数据库连接失败: {e}")
return False, error_msg
except Exception as e:
error_msg = f"认证失败: {str(e)}"
self.logger.error(f"认证过程异常: {e}")
return False, error_msg