# database.py 7.5 KB
"""
数据库管理模块
使用SQLite存储消息记录和发送历史
"""

import contextlib
import json
import os
import sqlite3
from datetime import datetime
from typing import Dict, Iterator, List, Optional, Tuple


class Database:
    """SQLite-backed store for sent-message records, user tags and system status.

    Every public method opens its own short-lived connection, so an instance
    keeps no database handle open between calls.
    """

    # Default database location, relative to the current working directory.
    DB_FILE = "data/etsy_notify.db"

    def __init__(self, db_file: Optional[str] = None):
        """Prepare the storage directory and create missing tables.

        Args:
            db_file: Path of the SQLite file to use. When omitted, the
                class-level ``DB_FILE`` is used.
        """
        self.db_file = db_file if db_file is not None else self.DB_FILE
        self._ensure_data_dir()
        self._init_database()

    @contextlib.contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed or rolled back, then closed.

        Using ``sqlite3.connect(...)`` directly as a context manager only
        manages the transaction and leaves the connection open, so the close
        is performed explicitly here.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _ensure_data_dir(self):
        """Create the directory that holds the database file, if any."""
        directory = os.path.dirname(self.db_file)
        # A bare file name has no directory component; nothing to create then.
        if directory:
            os.makedirs(directory, exist_ok=True)

    def _init_database(self):
        """Create the three tables used by this class if they are missing."""
        with self._connect() as conn:
            cursor = conn.cursor()

            # Sent-message records.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS sent_messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    conversation_id TEXT NOT NULL,
                    customer_name TEXT,
                    order_id TEXT,
                    message_content TEXT NOT NULL,
                    sent_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    conversation_start_time TEXT,
                    last_conversation_time TEXT,
                    tags TEXT,
                    trigger_type TEXT CHECK(trigger_type IN ('conversation', 'order')),
                    UNIQUE(conversation_id, order_id)
                )
            """)

            # User tag counters.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS user_tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tag_name TEXT UNIQUE NOT NULL,
                    tag_count INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Key/value system status.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS system_status (
                    key TEXT PRIMARY KEY,
                    value TEXT,
                    updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def record_sent_message(self, conversation_id: str, customer_name: str,
                            message_content: str, trigger_type: str,
                            order_id: Optional[str] = None,
                            conversation_start_time: Optional[str] = None,
                            last_conversation_time: Optional[str] = None,
                            tags: Optional[List[str]] = None) -> bool:
        """Store a sent message, replacing any record for the same target.

        A record is identified by ``(conversation_id, order_id)``. Writing the
        same pair again replaces the earlier row, including when ``order_id``
        is ``None``.

        Args:
            conversation_id: Identifier of the conversation the message went to.
            customer_name: Name of the customer.
            message_content: Text that was sent.
            trigger_type: ``'conversation'`` or ``'order'``; any other value is
                rejected by the table's CHECK constraint.
            order_id: Identifier of the related order, if any.
            conversation_start_time: Stored as given.
            last_conversation_time: Stored as given.
            tags: Tags to store as a JSON array; an empty list is stored as NULL.

        Returns:
            True when the row was written, False when any error occurred (the
            error is printed and the transaction rolled back).
        """
        try:
            tags_json = json.dumps(tags) if tags else None

            with self._connect() as conn:
                cursor = conn.cursor()

                if order_id is None:
                    # SQLite treats NULLs as distinct in UNIQUE constraints, so
                    # INSERT OR REPLACE would never replace a row whose
                    # order_id is NULL. Remove the previous one explicitly; it
                    # shares the transaction with the insert below.
                    cursor.execute("""
                        DELETE FROM sent_messages
                        WHERE conversation_id = ? AND order_id IS NULL
                    """, (conversation_id,))

                cursor.execute("""
                    INSERT OR REPLACE INTO sent_messages
                    (conversation_id, customer_name, order_id, message_content,
                     conversation_start_time, last_conversation_time, tags, trigger_type)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (conversation_id, customer_name, order_id, message_content,
                      conversation_start_time, last_conversation_time, tags_json, trigger_type))

            return True
        except Exception as e:
            # Best-effort by design: callers only get a success flag.
            print(f"记录消息失败: {e}")
            return False

    def is_already_sent(self, conversation_id: str, order_id: Optional[str] = None) -> bool:
        """Report whether a message was already recorded for this target.

        With a truthy ``order_id`` a record matching either the conversation
        or the order counts; otherwise only the conversation is checked.
        """
        if order_id:
            query = """
                SELECT 1 FROM sent_messages
                WHERE conversation_id = ? OR order_id = ?
                LIMIT 1
            """
            params: Tuple = (conversation_id, order_id)
        else:
            query = """
                SELECT 1 FROM sent_messages
                WHERE conversation_id = ?
                LIMIT 1
            """
            params = (conversation_id,)

        with self._connect() as conn:
            return conn.execute(query, params).fetchone() is not None

    def get_daily_sent_count(self, date: Optional[str] = None) -> int:
        """Count the messages recorded on a given local calendar day.

        Args:
            date: Day as ``YYYY-MM-DD``. Defaults to today in local time.

        Returns:
            Number of rows whose ``sent_time`` falls on that day.
        """
        if date is None:
            date = datetime.now().strftime('%Y-%m-%d')

        with self._connect() as conn:
            # sent_time is filled by CURRENT_TIMESTAMP, which SQLite stores in
            # UTC; convert it to local time so it matches the local default
            # date computed above.
            cursor = conn.execute("""
                SELECT COUNT(*) FROM sent_messages
                WHERE DATE(sent_time, 'localtime') = ?
            """, (date,))
            return cursor.fetchone()[0]

    def get_sent_messages_for_export(self, start_date: Optional[str] = None,
                                     end_date: Optional[str] = None) -> List[Dict]:
        """Return sent-message records as dicts ready for export.

        Args:
            start_date: Inclusive lower bound (``YYYY-MM-DD``), or None.
            end_date: Inclusive upper bound (``YYYY-MM-DD``), or None.

        Returns:
            One dict per record, newest first, keyed by the export column
            headers. The bounds are compared against the date part of
            ``sent_time`` as stored.
        """
        query = """
                SELECT conversation_id, customer_name, order_id, message_content,
                       sent_time, conversation_start_time, last_conversation_time,
                       tags, trigger_type
                FROM sent_messages
            """
        conditions = []
        params = []

        if start_date:
            conditions.append("DATE(sent_time) >= ?")
            params.append(start_date)
        if end_date:
            conditions.append("DATE(sent_time) <= ?")
            params.append(end_date)

        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " ORDER BY sent_time DESC"

        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()

        columns = ['对话ID', '客户姓名', '订单ID', '发送内容', '发送时间',
                   '对话创建时间', '最后对话时间', '标签', '触发方式']

        return [dict(zip(columns, row)) for row in rows]

    def update_user_tags(self, tags_data: Dict[str, int]):
        """Replace the whole tag table with ``tags_data`` (name -> count).

        The delete and the inserts share one transaction, so a failure leaves
        the previous contents in place.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM user_tags")
            cursor.executemany("""
                INSERT INTO user_tags (tag_name, tag_count)
                VALUES (?, ?)
            """, tags_data.items())

    def get_user_tags(self) -> Dict[str, int]:
        """Return all stored tags as a ``{tag_name: tag_count}`` dict."""
        with self._connect() as conn:
            cursor = conn.execute("SELECT tag_name, tag_count FROM user_tags")
            return dict(cursor.fetchall())

    def clear_sent_records(self):
        """Delete every sent-message record."""
        with self._connect() as conn:
            conn.execute("DELETE FROM sent_messages")

    def set_system_status(self, key: str, value: str):
        """Insert or overwrite a status value and refresh its timestamp."""
        with self._connect() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO system_status (key, value, updated_time)
                VALUES (?, ?, CURRENT_TIMESTAMP)
            """, (key, value))

    def get_system_status(self, key: str) -> Optional[str]:
        """Return the stored value for ``key``, or None when it is not set."""
        with self._connect() as conn:
            cursor = conn.execute(
                "SELECT value FROM system_status WHERE key = ?", (key,))
            result = cursor.fetchone()
            return result[0] if result else None