-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathbot.py
More file actions
421 lines (354 loc) · 16.7 KB
/
bot.py
File metadata and controls
421 lines (354 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
import logging
import httpx
import re
import os
import json
import datetime
from dotenv import load_dotenv
from telegram import ForceReply, Update, User
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.constants import ParseMode, ChatAction
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ConfigurationError
# Root logging configuration: INFO threshold, each record carrying the
# timestamp, logger name and level name ahead of the message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Raise the "httpx" logger's threshold so only WARNING and above get through.
logging.getLogger("httpx").setLevel(logging.WARNING)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)

# Called before any os.getenv() lookup below — presumably so that values from
# a local .env file are visible to those lookups (python-dotenv).
load_dotenv()
# --- Configuration ---
API_BASE = os.getenv("API_BASE", "https://gptcloud.arc53.com")
API_URL = f"{API_BASE}/api/answer"
API_KEY = os.getenv("API_KEY")
TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")

# --- Load Additional Agents ---
# Every environment variable named API_KEY_<NAME> registers an extra agent:
# the agent name is <NAME> lower-cased, the value is that agent's API key.
_AGENT_ENV_PREFIX = "API_KEY_"
ADDITIONAL_AGENTS = {
    env_name[len(_AGENT_ENV_PREFIX):].lower(): env_value
    for env_name, env_value in os.environ.items()
    if env_name.startswith(_AGENT_ENV_PREFIX)
}
logger.info(f"Loaded {len(ADDITIONAL_AGENTS)} additional agents: {list(ADDITIONAL_AGENTS.keys())}")

if ADDITIONAL_AGENTS and not API_KEY:
    # No default key configured: fall back to the alphabetically first agent
    # so the choice is deterministic across runs.
    first_agent = min(ADDITIONAL_AGENTS)
    API_KEY = ADDITIONAL_AGENTS[first_agent]
    logger.warning(f"API_KEY not set. Defaulting to agent '{first_agent}' key.")

# Number of most recent history messages (10 user/assistant pairs) that are
# sent to the API as context.
API_CONTEXT_MESSAGES_COUNT = 20
# --- Storage Configuration ---
# The raw value is kept only for log messages. STORAGE_TYPE itself is
# normalised once here because the rest of the file compares it
# case-sensitively against "mongodb"; without this, STORAGE_TYPE=MongoDB would
# connect to MongoDB and then silently use the in-memory store.
_RAW_STORAGE_TYPE = os.getenv("STORAGE_TYPE", "memory")  # Default to in-memory
STORAGE_TYPE = _RAW_STORAGE_TYPE.strip().lower()
MONGODB_URI = os.getenv("MONGODB_URI")  # Required if STORAGE_TYPE is 'mongodb'
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME", "telegram_bot_memory")
MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "chat_histories")

# --- Global Storage Variables ---
mongo_client = None
mongo_collection = None
in_memory_storage = {}  # Used if STORAGE_TYPE is 'memory'

# --- Initialize Storage ---
if STORAGE_TYPE == "mongodb":
    if not MONGODB_URI:
        logger.error("STORAGE_TYPE is 'mongodb' but MONGODB_URI is not set. Exiting.")
        # SystemExit is what exit() raises, without relying on the `site` helper.
        raise SystemExit(1)
    try:
        # NOTE(review): the first 15 characters of the URI can include part of
        # the username for "mongodb://user:pass@..." style URIs — confirm this
        # is acceptable for your logs.
        logger.info(f"Attempting to connect to MongoDB: {MONGODB_URI[:15]}... DB: {MONGODB_DB_NAME}")
        mongo_client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
        # Force a round trip so connection problems surface here rather than on
        # first use. "ping" replaces the deprecated "ismaster" command.
        mongo_client.admin.command("ping")
        db = mongo_client[MONGODB_DB_NAME]
        mongo_collection = db[MONGODB_COLLECTION_NAME]
        logger.info(f"Successfully connected to MongoDB and selected collection '{MONGODB_COLLECTION_NAME}'.")
    except Exception as e:
        logger.error(f"An unexpected error occurred during MongoDB initialization: {e}", exc_info=True)
        logger.warning("Falling back to in-memory storage.")
        if mongo_client is not None:
            # Release the half-initialised client before dropping the reference.
            mongo_client.close()
        STORAGE_TYPE = "memory"
        mongo_client = None
        mongo_collection = None
elif STORAGE_TYPE == "memory":
    logger.info("Using in-memory storage for chat history (will be lost on restart).")
else:
    logger.warning(f"Unknown STORAGE_TYPE '{_RAW_STORAGE_TYPE}'. Defaulting to in-memory storage.")
    STORAGE_TYPE = "memory"
# --- Storage Access Functions ---
async def get_chat_data(chat_id: int) -> dict:
    """
    Fetch the stored state of a chat from the configured storage backend.

    Args:
        chat_id: Telegram chat id; stored under its string form.

    Returns:
        A dict with three keys:
        - "history": the complete list of stored messages (a fresh list the
          caller may mutate freely; empty when nothing is stored),
        - "conversation_id": the stored conversation id, or None,
        - "user_info": info about the last known user of this chat, or None.

    MongoDB errors are logged and result in the empty default being returned.
    """
    chat_id_str = str(chat_id)

    if STORAGE_TYPE == "mongodb" and mongo_collection is not None:
        # NOTE: this is a synchronous pymongo call made from a coroutine.
        try:
            doc = mongo_collection.find_one({"_id": chat_id_str})
        except Exception as e:
            logger.error(f"MongoDB Error fetching data for chat_id {chat_id_str}: {e}", exc_info=True)
            doc = None
        if not doc:
            return {"history": [], "conversation_id": None, "user_info": None}
        return {
            "history": list(doc.get("conversation_history") or []),
            "conversation_id": doc.get("conversation_id"),
            "user_info": doc.get("user_info"),
        }

    data = in_memory_storage.get(chat_id_str) or {}
    return {
        # Copy the list: handing out the stored object would let callers change
        # the saved history merely by appending to the result, without ever
        # calling save_chat_data (the MongoDB path never behaves that way).
        "history": list(data.get("history") or []),
        "conversation_id": data.get("conversation_id"),
        "user_info": data.get("user_info"),
    }
async def save_chat_data(chat_id: int, history: list, conversation_id: str | None, user_info: dict | None):
    """
    Persist a chat's full history, conversation id and (optionally) user info.

    Args:
        chat_id: Telegram chat id; stored under its string form.
        history: Complete message history; it replaces whatever was stored.
        conversation_id: Conversation id to store (may be None).
        user_info: Info about the interacting user. When falsy, any previously
            stored user info is left untouched.

    MongoDB errors are logged, not raised.
    """
    storage_key = str(chat_id)
    use_mongo = STORAGE_TYPE == "mongodb" and mongo_collection is not None

    if not use_mongo:
        # In-memory backend: create the record on first use, then overwrite.
        record = in_memory_storage.setdefault(storage_key, {})
        record.update({
            "history": history,
            "conversation_id": conversation_id,
            "last_updated": datetime.datetime.now(datetime.timezone.utc),
        })
        if user_info:
            record["user_info"] = user_info
        logger.debug(f"Saved full in-memory history for chat {storage_key} with user_info: {bool(user_info)}")
        return

    # MongoDB backend: upsert the document; the server stamps last_updated.
    fields_to_set = {
        "conversation_history": history,
        "conversation_id": conversation_id,
    }
    if user_info:
        fields_to_set["user_info"] = user_info
    try:
        mongo_collection.update_one(
            {"_id": storage_key},
            {"$set": fields_to_set, "$currentDate": {"last_updated": True}},
            upsert=True,
        )
        logger.debug(f"Saved full history for chat {storage_key} with user_info: {bool(user_info)}")
    except Exception as e:
        logger.error(f"MongoDB Error saving data for chat_id {storage_key}: {e}", exc_info=True)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Greet the user when the /start command is issued.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # Command handlers also fire for *edited* messages, in which case
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return
    await message.reply_html(
        rf"Hi {user.mention_html()}! Ask me anything.",
    )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Send the list of available commands when /help is issued.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # Command handlers also fire for *edited* messages, in which case
    # update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return
    help_text = (
        "Here are the available commands:\n"
        "/start - Begin a new conversation with the bot\n"
        "/help - Display this help message\n\n"
        "Just send any text to ask a question based on the configured documents!"
    )
    await message.reply_text(help_text)
def route_message(text: str) -> tuple[str | None, str | None]:
    """
    Pick the API key for a message based on an optional leading "#agent" tag.

    The first whitespace-free token of the message is inspected. Only the "#"
    prefix is recognised; any other first token (including "@name") is treated
    as ordinary text.

    Args:
        text: Raw message text.

    Returns:
        A pair (api_key, cleaned_text):
        - no tag (or empty text): (API_KEY, text) — API_KEY may itself be None
          when no default key is configured;
        - "#agent ..." with a known agent: (that agent's key, the text after
          the first space, or "" when nothing follows the tag);
        - "#agent ..." with an unknown agent: (None, None).
    """
    if not text:
        return API_KEY, text

    # Split on the first space only: the tag is everything before it.
    tag, _, content = text.partition(" ")
    if not tag.startswith("#"):
        return API_KEY, text

    agent_name = tag[1:].lower()
    if agent_name in ADDITIONAL_AGENTS:
        return ADDITIONAL_AGENTS[agent_name], content
    # Unknown agent
    return None, None
async def generate_answer(question: str, messages: list, conversation_id: str | None, api_key: str) -> dict:
    """
    Ask the external DocsGPT API for an answer.

    Only the last API_CONTEXT_MESSAGES_COUNT entries of `messages` are
    converted and sent as history.

    Args:
        question: The question to send.
        messages: Chat history in internal {"role", "content"} form.
        conversation_id: Current conversation id, or None.
        api_key: Key identifying the agent to query.

    Returns:
        {"answer": str, "conversation_id": ...}. This function does not raise
        for API or network failures: "answer" then holds a user-facing error
        message and "conversation_id" the id that was passed in.
    """
    if not api_key:
        logger.warning("API_KEY is not set. Cannot call DocsGPT API.")
        return {"answer": "Error: Backend API key is not configured.", "conversation_id": conversation_id}

    # Use only the last API_CONTEXT_MESSAGES_COUNT messages for the API call context
    context_messages = messages[-API_CONTEXT_MESSAGES_COUNT:]
    try:
        formatted_history = format_history_for_api(context_messages)
        history_json = json.dumps(formatted_history)
    except TypeError as e:
        logger.error(f"Failed to serialize history to JSON: {e}. History slice: {context_messages}", exc_info=True)
        history_json = json.dumps([])  # Fallback to empty history for API

    # The history travels as a JSON-encoded string inside the JSON payload.
    payload = {
        "question": question,
        "api_key": api_key,
        "history": history_json,
        "conversation_id": conversation_id,
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
    }
    timeout = 120.0
    default_error_msg = "Sorry, I couldn't get an answer from the backend service."

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(API_URL, json=payload, headers=headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()
            if not isinstance(data, dict):
                logger.error(f"Unexpected response shape from DocsGPT API: {type(data).__name__}")
                return {"answer": f"{default_error_msg} (Invalid Response Format)", "conversation_id": conversation_id}
            answer = data.get("answer")
            if not isinstance(answer, str) or not answer:
                # A null, empty or non-text answer cannot be sent to the chat.
                answer = default_error_msg
            # An explicit null from the API must not wipe the current id.
            returned_conv_id = data.get("conversation_id") or conversation_id
            return {"answer": answer, "conversation_id": returned_conv_id}
    except httpx.TimeoutException:
        # Base class of the connect, read, write and pool timeouts.
        logger.error("DocsGPT API timed out.")
        return {"answer": "The brain is currently offline, please try again later", "conversation_id": conversation_id}
    except httpx.HTTPStatusError as exc:
        # The error body may be non-JSON, or JSON that is not an object;
        # neither case may raise from inside this handler.
        try:
            error_body = exc.response.json()
        except ValueError:
            error_body = None
        if isinstance(error_body, dict):
            detail = error_body.get("detail", exc.response.text)
        else:
            detail = exc.response.text
        error_details = f"Status {exc.response.status_code} - {detail}"
        logger.error(f"HTTP error calling DocsGPT API: {error_details}")
        return {"answer": f"{default_error_msg} (Error: {exc.response.status_code})", "conversation_id": conversation_id}
    except httpx.RequestError as exc:
        logger.error(f"Network error calling DocsGPT API: {exc}")
        return {"answer": f"{default_error_msg} (Network Error)", "conversation_id": conversation_id}
    except json.JSONDecodeError as exc:
        logger.error(f"Failed to decode JSON response from DocsGPT API: {exc}. Response text: {response.text}", exc_info=True)
        return {"answer": f"{default_error_msg} (Invalid Response Format)", "conversation_id": conversation_id}
    except Exception as e:
        logger.error(f"Unexpected error in generate_answer: {e}", exc_info=True)
        return {"answer": f"{default_error_msg} (Unexpected Error)", "conversation_id": conversation_id}
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handle a non-command text message end to end.

    Steps: pick the agent from an optional "#agent" prefix, load the chat
    history, query the API, store the updated history and user info, then
    reply (in several messages when the answer is too long for one).

    Args:
        update: Incoming Telegram update.
        context: Handler context; its bot is used for the typing indicator.
    """
    if not update.message or not update.message.text or not update.effective_chat or not update.effective_user:
        logger.warning("Echo handler received an update without message, text, chat, or user.")
        return

    chat_id = update.effective_chat.id
    user = update.effective_user
    question = update.message.text
    logger.info(f"Received message from user {user.id} ({user.username or user.first_name}) in chat_id {chat_id}")

    # Best effort only: a failed typing indicator must not block the answer.
    try:
        await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
        logger.debug(f"Sent typing action to chat {chat_id}")
    except Exception as e:
        logger.warning(f"Failed to send typing action to chat {chat_id}: {e}")

    # Route BEFORE touching the history so that a message addressed to an
    # unknown agent leaves no trace in it. route_message signals an unknown
    # agent with (None, None); a missing default key yields (None, text) and is
    # reported by generate_answer as a configuration error instead.
    target_api_key, cleaned_question = route_message(question)
    if cleaned_question is None:
        await update.message.reply_text("This agent doesn't exist")
        return

    raw_user_info = {
        "id": user.id,
        "first_name": user.first_name,
        "last_name": user.last_name,
        "username": user.username,
        "is_bot": user.is_bot,
        "language_code": user.language_code,
    }
    user_info_dict = {k: v for k, v in raw_user_info.items() if v is not None}

    chat_data = await get_chat_data(chat_id)
    # Work on a copy so the stored history only changes through save_chat_data.
    current_history = list(chat_data["history"])
    current_conversation_id = chat_data["conversation_id"]

    # Record the user's turn exactly once, with the agent tag already removed.
    current_history.append({"role": "user", "content": cleaned_question})

    # generate_answer will use a slice of current_history for API context
    response_doc = await generate_answer(cleaned_question, current_history, current_conversation_id, target_api_key)
    answer = response_doc["answer"]
    new_conversation_id = response_doc["conversation_id"]
    current_history.append({"role": "assistant", "content": answer})

    # Save the full, updated current_history
    await save_chat_data(chat_id, current_history, new_conversation_id, user_info_dict)

    # Split message if too long
    for msg_chunk in split_message(answer):
        try:
            await update.message.reply_text(msg_chunk, parse_mode=ParseMode.MARKDOWN)
        except Exception as e:
            # Typically the text is not valid Markdown; resend it unformatted.
            logger.warning(f"Failed to send Markdown message to chat {chat_id}: {e}. Retrying with plain text.")
            try:
                await update.message.reply_text(msg_chunk)
            except Exception as fallback_e:
                logger.error(f"Failed to send fallback plain text message to chat {chat_id}: {fallback_e}", exc_info=True)
def split_message(text: str, limit: int = 4096) -> list[str]:
    """
    Split a message into chunks of at most `limit` characters.

    Each cut is made at the last newline inside the window if there is one,
    otherwise at the last space, otherwise exactly at `limit`. The separator a
    chunk was cut at is dropped. No chunk is ever empty unless `text` itself
    is empty.

    Args:
        text: The message to split.
        limit: Maximum chunk length; must be at least 1.

    Returns:
        The chunks in order. Text that already fits is returned as [text].

    Raises:
        ValueError: If `limit` is smaller than 1.
    """
    if limit < 1:
        # A non-positive limit could never make progress.
        raise ValueError("limit must be a positive integer")
    if len(text) <= limit:
        return [text]

    chunks = []
    remaining = text
    while len(remaining) > limit:
        # A separator at index 0 would produce an empty chunk (which cannot be
        # sent as a message), so it does not count as a usable cut.
        cut = remaining.rfind("\n", 0, limit)
        if cut <= 0:
            cut = remaining.rfind(" ", 0, limit)
        if cut <= 0:
            cut = limit  # no usable separator: forced split
        chunks.append(remaining[:cut])
        # cut <= limit < len(remaining), so remaining[cut] always exists.
        # Skip the character we cut at when it is whitespace.
        if remaining[cut] in ("\n", " "):
            remaining = remaining[cut + 1:]
        else:
            remaining = remaining[cut:]

    if remaining:
        chunks.append(remaining)
    return chunks
def format_history_for_api(messages: list) -> list:
    """
    Translate internal history into the prompt/response pairs the API expects.

    Input entries look like {'role': 'user' | 'assistant', 'content': '...'};
    output entries look like {'prompt': '...', 'response': '...'}. A pair is
    emitted only for a user entry that is immediately followed by an assistant
    entry, both having a 'content' key. Anything else is skipped.
    """

    def _is_turn(entry, role):
        # True when the entry has the given role and carries content.
        return entry.get("role") == role and "content" in entry

    pairs = []
    total = len(messages)
    cursor = 0
    while cursor < total:
        current = messages[cursor]
        follower = messages[cursor + 1] if cursor + 1 < total else None
        if _is_turn(current, "user") and follower is not None and _is_turn(follower, "assistant"):
            pairs.append({"prompt": current["content"], "response": follower["content"]})
            cursor += 2  # both entries of the pair are consumed
            continue
        cursor += 1
    return pairs
def main() -> None:
    """
    Validate configuration, register the handlers and run the bot.

    Returns early (after logging) when the Telegram token is missing or when
    MongoDB storage is selected but no collection is available. The MongoDB
    client, if any, is closed once polling ends, whether it stopped normally
    or by an exception.
    """
    if not TOKEN:
        logger.critical("TELEGRAM_BOT_TOKEN environment variable not set! Exiting.")
        return
    if not API_KEY:
        logger.warning("API_KEY environment variable not set! DocsGPT API calls will fail.")
    if STORAGE_TYPE == "mongodb" and mongo_collection is None:
        logger.critical("MongoDB storage configured but connection failed. Exiting.")
        return

    logger.info(f"Initializing Telegram Bot Application with storage type: {STORAGE_TYPE}")
    application = Application.builder().token(TOKEN).build()
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    # Plain text messages only: commands and non-message updates are excluded.
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND & filters.UpdateType.MESSAGE, echo))

    logger.info("Starting Telegram bot polling...")
    try:
        application.run_polling(allowed_updates=Update.ALL_TYPES)
    finally:
        # Compare with None, as the other storage checks in this file do.
        if mongo_client is not None:
            logger.info("Closing MongoDB connection.")
            mongo_client.close()
        logger.info("Telegram bot stopped.")


if __name__ == "__main__":
    main()