database.py
7.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
数据库管理模块
使用SQLite存储消息记录和发送历史
"""
import json
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Iterator, List, Optional, Tuple
class Database:
    """SQLite-backed store for sent-message history, user tags and system status.

    Every public method opens its own short-lived connection, so an instance
    holds no open database handle between calls.
    """

    # Default database location; can be overridden per instance via __init__.
    DB_FILE = "data/etsy_notify.db"

    def __init__(self, db_file: Optional[str] = None):
        """Create the data directory and the tables if they do not exist yet.

        Args:
            db_file: Optional path of the SQLite file. When omitted, the
                class-level ``DB_FILE`` is used.
        """
        if db_file is not None:
            # Shadow the class attribute so every method (and any external
            # reader of ``DB_FILE``) sees the path actually in use.
            self.DB_FILE = db_file
        self._ensure_data_dir()
        self._init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed or rolled back, then closed.

        ``with sqlite3.connect(...)`` on its own only manages the transaction
        and leaves the connection open, so it is closed explicitly here.
        """
        conn = sqlite3.connect(self.DB_FILE)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _ensure_data_dir(self):
        """Make sure the directory that holds the database file exists."""
        directory = os.path.dirname(self.DB_FILE)
        if directory:  # empty when DB_FILE is a bare file name
            os.makedirs(directory, exist_ok=True)

    def _init_database(self):
        """Create the three tables used by this class if they are missing."""
        with self._connect() as conn:
            # Sent-message log.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS sent_messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    conversation_id TEXT NOT NULL,
                    customer_name TEXT,
                    order_id TEXT,
                    message_content TEXT NOT NULL,
                    sent_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    conversation_start_time TEXT,
                    last_conversation_time TEXT,
                    tags TEXT,
                    trigger_type TEXT CHECK(trigger_type IN ('conversation', 'order')),
                    UNIQUE(conversation_id, order_id)
                )
            """)
            # User tag counters.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS user_tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tag_name TEXT UNIQUE NOT NULL,
                    tag_count INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Key/value system status.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS system_status (
                    key TEXT PRIMARY KEY,
                    value TEXT,
                    updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def record_sent_message(self, conversation_id: str, customer_name: str,
                            message_content: str, trigger_type: str,
                            order_id: Optional[str] = None,
                            conversation_start_time: Optional[str] = None,
                            last_conversation_time: Optional[str] = None,
                            tags: Optional[List[str]] = None) -> bool:
        """Insert (or replace) the log row for a sent message.

        Args:
            conversation_id: Conversation the message was sent in.
            customer_name: Customer display name.
            message_content: Text that was sent.
            trigger_type: ``'conversation'`` or ``'order'`` (enforced by a
                CHECK constraint on the table).
            order_id: Related order, if any.
            conversation_start_time: Stored as given.
            last_conversation_time: Stored as given.
            tags: Stored as a JSON array; an empty list or ``None`` is stored
                as NULL.

        Returns:
            ``True`` when the row was written, ``False`` when anything failed
            (the error is printed, never raised).
        """
        try:
            tags_json = json.dumps(tags) if tags else None
            with self._connect() as conn:
                if order_id is None:
                    # SQLite treats NULLs as distinct inside UNIQUE
                    # constraints, so INSERT OR REPLACE would never replace a
                    # row whose order_id is NULL. Remove the previous row
                    # explicitly; it is restored by the rollback if the
                    # insert below fails.
                    conn.execute(
                        "DELETE FROM sent_messages "
                        "WHERE conversation_id = ? AND order_id IS NULL",
                        (conversation_id,),
                    )
                conn.execute("""
                    INSERT OR REPLACE INTO sent_messages
                    (conversation_id, customer_name, order_id, message_content,
                     conversation_start_time, last_conversation_time, tags, trigger_type)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (conversation_id, customer_name, order_id, message_content,
                      conversation_start_time, last_conversation_time,
                      tags_json, trigger_type))
            return True
        except Exception as e:
            # Deliberately broad: recording is best-effort and callers rely
            # on the boolean result instead of handling exceptions.
            print(f"记录消息失败: {e}")
            return False

    def is_already_sent(self, conversation_id: str, order_id: Optional[str] = None) -> bool:
        """Return whether a message was already recorded.

        With a non-empty ``order_id`` a row matching either the conversation
        or the order counts; otherwise only the conversation is checked.
        """
        if order_id:
            sql = """
                SELECT 1 FROM sent_messages
                WHERE conversation_id = ? OR order_id = ?
                LIMIT 1
            """
            params: Tuple = (conversation_id, order_id)
        else:
            sql = """
                SELECT 1 FROM sent_messages
                WHERE conversation_id = ?
                LIMIT 1
            """
            params = (conversation_id,)
        with self._connect() as conn:
            return conn.execute(sql, params).fetchone() is not None

    def get_daily_sent_count(self, date: Optional[str] = None) -> int:
        """Return how many messages were recorded on a local calendar day.

        Args:
            date: Day as ``YYYY-MM-DD``; defaults to today in local time.
        """
        if date is None:
            date = datetime.now().strftime('%Y-%m-%d')
        with self._connect() as conn:
            # sent_time defaults to CURRENT_TIMESTAMP, which SQLite stores in
            # UTC; convert it to local time so it lines up with the local
            # date computed above.
            row = conn.execute("""
                SELECT COUNT(*) FROM sent_messages
                WHERE DATE(sent_time, 'localtime') = ?
            """, (date,)).fetchone()
        return row[0] if row else 0

    def get_sent_messages_for_export(self, start_date: Optional[str] = None,
                                     end_date: Optional[str] = None) -> List[Dict]:
        """Return logged messages, newest first, as dicts ready for export.

        Args:
            start_date: Inclusive lower bound (``YYYY-MM-DD``), optional.
            end_date: Inclusive upper bound (``YYYY-MM-DD``), optional.

        Returns:
            One dict per row, keyed by the export column titles.

        NOTE(review): the bounds are compared against ``DATE(sent_time)``,
        i.e. the UTC date of the stored timestamp - confirm whether callers
        pass local dates.
        """
        query = """
            SELECT conversation_id, customer_name, order_id, message_content,
                   sent_time, conversation_start_time, last_conversation_time,
                   tags, trigger_type
            FROM sent_messages
        """
        params: List[str] = []
        if start_date and end_date:
            query += " WHERE DATE(sent_time) BETWEEN ? AND ?"
            params = [start_date, end_date]
        elif start_date:
            query += " WHERE DATE(sent_time) >= ?"
            params = [start_date]
        elif end_date:
            query += " WHERE DATE(sent_time) <= ?"
            params = [end_date]
        query += " ORDER BY sent_time DESC"

        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()

        columns = ['对话ID', '客户姓名', '订单ID', '发送内容', '发送时间',
                   '对话创建时间', '最后对话时间', '标签', '触发方式']
        return [dict(zip(columns, row)) for row in rows]

    def update_user_tags(self, tags_data: Dict[str, int]):
        """Replace the whole ``user_tags`` table with ``tags_data``.

        The delete and the inserts share one transaction, so a failure leaves
        the previous contents in place.
        """
        with self._connect() as conn:
            conn.execute("DELETE FROM user_tags")
            conn.executemany("""
                INSERT INTO user_tags (tag_name, tag_count)
                VALUES (?, ?)
            """, list(tags_data.items()))

    def get_user_tags(self) -> Dict[str, int]:
        """Return the stored tags as a ``{tag_name: tag_count}`` mapping."""
        with self._connect() as conn:
            rows = conn.execute("SELECT tag_name, tag_count FROM user_tags").fetchall()
        return dict(rows)

    def clear_sent_records(self):
        """Delete every row from ``sent_messages``."""
        with self._connect() as conn:
            conn.execute("DELETE FROM sent_messages")

    def set_system_status(self, key: str, value: str):
        """Store ``value`` under ``key``, overwriting any previous value."""
        with self._connect() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO system_status (key, value, updated_time)
                VALUES (?, ?, CURRENT_TIMESTAMP)
            """, (key, value))

    def get_system_status(self, key: str) -> Optional[str]:
        """Return the value stored under ``key``, or ``None`` if absent."""
        with self._connect() as conn:
            row = conn.execute(
                "SELECT value FROM system_status WHERE key = ?", (key,)
            ).fetchone()
        return row[0] if row else None