user_util.py 9.59 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
用户管理工具 - 管理员专用

用法:
  python user_util.py add <username> <password>      # 添加用户
  python user_util.py list                           # 列出用户
  python user_util.py disable <username>             # 禁用用户
  python user_util.py enable <username>              # 启用用户
  python user_util.py reset <username> <password>    # 重置密码

安全提示:
  - 此工具仅供管理员使用,请勿分发给用户
  - 避免在命令行直接输入密码(可被 shell 历史记录)
  - 建议使用环境变量或交互式输入密码
"""

import hashlib
import pymysql
import json
import sys
import os
from pathlib import Path

# Windows console encoding fix: force UTF-8 on stdout/stderr so the Chinese
# messages and box-drawing characters printed by this tool do not raise
# UnicodeEncodeError under a legacy console code page.
if sys.platform == 'win32':
    import codecs

    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name)
        if _stream is None:
            # No console attached (e.g. pythonw.exe): nothing to fix.
            continue
        if hasattr(_stream, 'reconfigure'):
            # Switch the encoding in place; this keeps the original
            # TextIOWrapper together with its line buffering and newline
            # handling instead of replacing it with a bare codecs writer.
            _stream.reconfigure(encoding='utf-8', errors='strict')
        elif hasattr(_stream, 'buffer'):
            # Fallback for stream objects without reconfigure().
            setattr(sys, _stream_name,
                    codecs.getwriter('utf-8')(_stream.buffer, 'strict'))
    del _stream_name, _stream


def hash_password(password: str) -> str:
    """Return the SHA-256 hex digest of ``password`` encoded as UTF-8.

    The result is a 64-character lowercase hexadecimal string.

    NOTE(review): this is a single unsalted SHA-256 pass. Presumably the
    digests stored in the database are verified elsewhere in this exact
    format, so the scheme cannot be changed here without a coordinated
    migration - confirm before replacing it with a salted KDF.
    """
    digest = hashlib.sha256()
    digest.update(password.encode('utf-8'))
    return digest.hexdigest()


class UserManager:
    """Manage user accounts stored in a MySQL table.

    Every public method opens its own short-lived connection, runs a single
    statement, prints a human-readable result and closes the connection.
    Database errors are reported on stdout instead of being raised.

    The statements below only touch the ``user_name``, ``passwd`` and
    ``status`` columns of the configured table.
    """

    def __init__(self, db_config):
        """Store the connection settings.

        Args:
            db_config: Mapping with ``host``, ``user``, ``password``,
                ``database`` and ``table`` keys, plus an optional ``port``
                (3306 when absent).
        """
        self.config = db_config

    def _connect(self):
        """Open a new connection using the stored settings."""
        # Local import: the flag constants live in a pymysql submodule.
        from pymysql.constants import CLIENT

        return pymysql.connect(
            host=self.config['host'],
            port=self.config.get('port', 3306),
            user=self.config['user'],
            password=self.config['password'],
            database=self.config['database'],
            connect_timeout=5,
            # Make UPDATE report *matched* rows rather than *changed* rows.
            # Without this flag an UPDATE that leaves the row unchanged
            # (disabling an already disabled user, resetting to the same
            # password) reports 0 rows and was misreported as
            # "user does not exist".
            client_flag=CLIENT.FOUND_ROWS,
        )

    @staticmethod
    def _format_row(row):
        """Render one ``(user_name, passwd, status)`` row as a table line."""
        username = row[0] or ""
        passwd_hash = row[1] or ""
        status = row[2] or "NULL"

        # Truncate, then pad every cell to its fixed column width.
        username_display = username[:20].ljust(20)
        status_display = status[:8].ljust(8)
        # Pad after choosing the value so hashes shorter than 16 characters
        # keep the right-hand border aligned too.
        hash_display = (passwd_hash[:16] or "N/A").ljust(16)

        return f"  │ {username_display} │ {status_display} │ {hash_display} │"

    def _update_column(self, username, column, value, success_lines,
                       failure_label):
        """Set ``column`` to ``value`` for ``username`` and print the outcome.

        Args:
            username: Value matched against the ``user_name`` column.
            column: Column name to update. It is interpolated into the SQL
                text, so callers must pass a fixed literal.
            value: New column value (sent as a bound parameter).
            success_lines: Lines printed after a successful commit.
            failure_label: Message prefix used for unexpected errors.
        """
        conn = None
        try:
            conn = self._connect()

            with conn.cursor() as cursor:
                # Identifiers cannot be bound parameters; the table name
                # comes from the config file and the column from this class.
                sql = (f"UPDATE {self.config['table']} "
                       f"SET {column}=%s WHERE user_name=%s")
                matched = cursor.execute(sql, (value, username))

            if matched > 0:
                conn.commit()
                for line in success_lines:
                    print(line)
            else:
                print(f"✗ 用户 '{username}' 不存在")

        except pymysql.OperationalError as e:
            print(f"✗ 数据库连接失败: {e}")
        except Exception as e:
            print(f"✗ {failure_label}: {e}")
        finally:
            if conn is not None:
                conn.close()

    def add_user(self, username, password):
        """Insert a new user with status ``active``.

        The password is stored as the digest returned by ``hash_password``.
        An ``IntegrityError`` from the insert is reported as "user already
        exists".
        """
        hashed = hash_password(password)
        conn = None
        try:
            conn = self._connect()

            with conn.cursor() as cursor:
                sql = (f"INSERT INTO {self.config['table']} "
                       f"(user_name, passwd, status) VALUES (%s, %s, %s)")
                cursor.execute(sql, (username, hashed, 'active'))
            conn.commit()
            print(f"✓ 用户 '{username}' 添加成功")
            print(f"  密码哈希: {hashed[:16]}...")

        except pymysql.IntegrityError:
            print(f"✗ 用户 '{username}' 已存在")
        except pymysql.OperationalError as e:
            print(f"✗ 数据库连接失败: {e}")
        except Exception as e:
            print(f"✗ 添加用户失败: {e}")
        finally:
            if conn is not None:
                conn.close()

    def list_users(self):
        """Print every user as a table of name, status and hash prefix."""
        conn = None
        try:
            conn = self._connect()

            with conn.cursor() as cursor:
                sql = (f"SELECT user_name, passwd, status "
                       f"FROM {self.config['table']}")
                cursor.execute(sql)
                results = cursor.fetchall()

            if not results:
                print("没有找到任何用户")
                return

            print("\n用户列表:")
            print("  ┌────────────────────────┬──────────┬──────────────────┐")
            # The label states 16 characters because that is what
            # _format_row prints (it previously claimed 8).
            print("  │ 用户名                 │ 状态     │ 密码哈希(前16位) │")
            print("  ├────────────────────────┼──────────┼──────────────────┤")

            for row in results:
                print(self._format_row(row))

            print("  └────────────────────────┴──────────┴──────────────────┘")
            print(f"\n总计: {len(results)} 个用户\n")

        except pymysql.OperationalError as e:
            print(f"✗ 数据库连接失败: {e}")
        except Exception as e:
            print(f"✗ 查询用户失败: {e}")
        finally:
            if conn is not None:
                conn.close()

    def disable_user(self, username):
        """Set the user's status to ``disabled``."""
        self._update_user_status(username, 'disabled', "已禁用")

    def enable_user(self, username):
        """Set the user's status to ``active``."""
        self._update_user_status(username, 'active', "已启用")

    def _update_user_status(self, username, status, action_name):
        """Write ``status`` for ``username``; ``action_name`` labels success."""
        self._update_column(
            username,
            'status',
            status,
            success_lines=[f"✓ 用户 '{username}' {action_name}"],
            failure_label="更新用户状态失败",
        )

    def reset_password(self, username, new_password):
        """Replace the stored password digest of an existing user."""
        hashed = hash_password(new_password)
        self._update_column(
            username,
            'passwd',
            hashed,
            success_lines=[
                f"✓ 用户 '{username}' 密码已重置",
                f"  新密码哈希: {hashed[:16]}...",
            ],
            failure_label="重置密码失败",
        )


def load_db_config(config_path='config.json'):
    """Load and validate the ``db_config`` section of a JSON config file.

    Args:
        config_path: Path of the JSON file. Defaults to ``config.json``,
            which is resolved against the current working directory.

    Returns:
        The ``db_config`` dict when it contains the ``host``, ``user``,
        ``password``, ``database`` and ``table`` keys; otherwise ``None``
        after printing the reason. This function does not raise for missing,
        unreadable or malformed files.
    """
    config_path = Path(config_path)
    required_fields = ('host', 'user', 'password', 'database', 'table')

    # Open directly instead of checking exists() first: one code path, and
    # no window in which the file can vanish between the check and the read.
    try:
        with open(config_path, 'r', encoding='utf-8') as config_file:
            config = json.load(config_file)
    except FileNotFoundError:
        print(f"✗ 配置文件不存在: {config_path}")
        print("  请确保在项目目录下运行此工具")
        return None
    except json.JSONDecodeError:
        print(f"✗ 配置文件格式错误: {config_path}")
        return None
    except (OSError, UnicodeDecodeError) as e:
        print(f"✗ 加载配置失败: {e}")
        return None

    # A top-level JSON array or scalar has no db_config section at all.
    db_config = config.get('db_config') if isinstance(config, dict) else None
    if not db_config:
        print("✗ 配置文件中未找到 db_config 字段")
        return None

    if not isinstance(db_config, dict):
        print("✗ db_config 必须是 JSON 对象")
        return None

    missing = [name for name in required_fields if name not in db_config]
    if missing:
        print(f"✗ db_config 缺少必需字段: {', '.join(missing)}")
        return None

    return db_config


def print_help():
    """Print the module docstring, which doubles as the CLI usage text."""
    usage_text = __doc__
    print(usage_text)


def _prompt_password():
    """Read a new password twice from the terminal without echoing it.

    Returns:
        The password, or ``None`` (after printing the reason) when input is
        cancelled, empty, or the two entries differ.
    """
    # Local import: only needed on the interactive path.
    import getpass

    try:
        first = getpass.getpass("请输入密码: ")
        second = getpass.getpass("请再次输入密码: ")
    except (EOFError, KeyboardInterrupt):
        print("\n✗ 已取消")
        return None

    if not first:
        print("✗ 密码不能为空")
        return None
    if first != second:
        print("✗ 两次输入的密码不一致")
        return None
    return first


def main():
    """Parse ``sys.argv`` and run the requested user-management command.

    Supported commands: ``add``, ``list``, ``disable``, ``enable``, ``reset``.
    For ``add`` and ``reset`` the password argument may be omitted; it is then
    read interactively, which keeps it out of the shell history and the
    process list.

    Exits with status 1 on a missing or unknown command, a wrong number of
    arguments, a rejected password prompt, or an unusable configuration.
    """
    if len(sys.argv) < 2:
        print_help()
        sys.exit(1)

    command = sys.argv[1].lower()
    args = sys.argv[2:]

    # command -> (accepted argument counts, usage line). ``None`` means the
    # argument count is not checked.
    specs = {
        'add': ((1, 2), "用法: python user_util.py add <username> [password]"),
        'list': (None, ""),
        'disable': ((1,), "用法: python user_util.py disable <username>"),
        'enable': ((1,), "用法: python user_util.py enable <username>"),
        'reset': ((1, 2), "用法: python user_util.py reset <username> [password]"),
    }

    if command not in specs:
        print(f"✗ 未知命令: {command}")
        print_help()
        sys.exit(1)

    # Validate the invocation before touching the config file, so a usage
    # error is reported as such even when config.json is missing.
    accepted_counts, usage = specs[command]
    if accepted_counts is not None and len(args) not in accepted_counts:
        print(usage)
        sys.exit(1)

    db_config = load_db_config()
    if not db_config:
        sys.exit(1)

    manager = UserManager(db_config)

    password = None
    if command in ('add', 'reset'):
        # Prefer the explicit argument; otherwise ask without echo.
        password = args[1] if len(args) == 2 else _prompt_password()
        if password is None:
            sys.exit(1)

    if command == 'add':
        manager.add_user(args[0], password)
    elif command == 'list':
        manager.list_users()
    elif command == 'disable':
        manager.disable_user(args[0])
    elif command == 'enable':
        manager.enable_user(args[0])
    else:  # 'reset'
        manager.reset_password(args[0], password)


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()