From c585a36584c530fb5a668b67f4eb7897da7f3bf0 Mon Sep 17 00:00:00 2001 From: Iliya Brook Date: Sat, 18 Apr 2026 18:46:12 +0300 Subject: [PATCH] fix: buffer Telegram photo-album messages into a single Claude request --- src/bot/orchestrator.py | 117 ++++++- src/bot/utils/media_group_buffer.py | 215 ++++++++++++ src/config/settings.py | 15 + src/utils/constants.py | 5 + .../unit/test_bot/test_media_group_buffer.py | 317 ++++++++++++++++++ tests/unit/test_bot/test_middleware.py | 1 + 6 files changed, 659 insertions(+), 11 deletions(-) create mode 100644 src/bot/utils/media_group_buffer.py create mode 100644 tests/unit/test_bot/test_media_group_buffer.py diff --git a/src/bot/orchestrator.py b/src/bot/orchestrator.py index 6d9719f0..d5125f00 100644 --- a/src/bot/orchestrator.py +++ b/src/bot/orchestrator.py @@ -39,6 +39,11 @@ should_send_as_photo, validate_image_path, ) +from .utils.media_group_buffer import ( + BufferedMediaGroup, + MediaGroupBuffer, + MediaGroupKey, +) logger = structlog.get_logger() @@ -135,6 +140,10 @@ def __init__(self, settings: Settings, deps: Dict[str, Any]): self.deps = deps self._active_requests: Dict[int, ActiveRequest] = {} self._known_commands: frozenset[str] = frozenset() + self._media_group_buffer = MediaGroupBuffer( + flush_timeout=settings.media_group_buffer_timeout, + on_flush=self._on_media_group_flush, + ) def _inject_deps(self, handler: Callable) -> Callable: # type: ignore[type-arg] """Wrap handler to inject dependencies into context.bot_data.""" @@ -1341,14 +1350,75 @@ async def agentic_document( async def agentic_photo( self, update: Update, context: ContextTypes.DEFAULT_TYPE ) -> None: - """Process photo -> Claude, minimal chrome.""" + """Process photo -> Claude, minimal chrome. + + Photos that belong to a Telegram album (identified by a shared + ``media_group_id``) are buffered until all album items have + arrived, then sent to Claude as a single request. Standalone + photos are processed immediately. + """ user_id = update.effective_user.id + message = update.message + if message is None: + return features = context.bot_data.get("features") image_handler = features.get_image_handler() if features else None if not image_handler: - await update.message.reply_text("Photo processing is not available.") + await message.reply_text("Photo processing is not available.") + return + + media_group_id = getattr(message, "media_group_id", None) + if media_group_id is not None: + # Album — buffer and wait for siblings. + chat_id = message.chat.id + thread_id = self._extract_message_thread_id(update) + key: MediaGroupKey = (user_id, chat_id, thread_id, media_group_id) + await self._media_group_buffer.add_photo( + key, message.photo[-1], message.caption, update, context + ) + return + + # Standalone photo — process right away. + await self._process_photo_batch( + update=update, + context=context, + photos=[message.photo[-1]], + caption=message.caption, + ) + + async def _on_media_group_flush( + self, key: MediaGroupKey, result: BufferedMediaGroup + ) -> None: + """Called by MediaGroupBuffer after the debounce window closes.""" + logger.info( + "Media group flush", + user_id=key[0], + photo_count=result.photo_count, + has_caption=bool(result.caption), + ) + await self._process_photo_batch( + update=result.first_update, + context=result.last_context, + photos=result.photos, + caption=result.caption, + ) + + async def _process_photo_batch( + self, + *, + update: Update, + context: ContextTypes.DEFAULT_TYPE, + photos: List[Any], + caption: Optional[str], + ) -> None: + """Send *photos* (one or many) to Claude in a single request.""" + user_id = update.effective_user.id + features = context.bot_data.get("features") + image_handler = features.get_image_handler() if features else None + + if not image_handler or not photos: return chat = update.message.chat @@ -1356,22 +1426,39 @@ async def agentic_photo( progress_msg = await update.message.reply_text("Working...") try: - photo = update.message.photo[-1] - processed_image = await image_handler.process_image( - photo, update.message.caption - ) - fmt = processed_image.metadata.get("format", "png") - images = [ + # First photo carries the caption so its prompt template is + # built with the user's context. Remaining photos only + # contribute image data. + first_processed = await image_handler.process_image(photos[0], caption) + fmt = first_processed.metadata.get("format", "png") + images: List[Dict[str, str]] = [ { - "data": processed_image.base64_data, + "data": first_processed.base64_data, "media_type": _MEDIA_TYPE_MAP.get(fmt, "image/png"), } ] + for photo in photos[1:]: + extra = await image_handler.process_image(photo, None) + extra_fmt = extra.metadata.get("format", "png") + images.append( + { + "data": extra.base64_data, + "media_type": _MEDIA_TYPE_MAP.get(extra_fmt, "image/png"), + } + ) + + prompt = first_processed.prompt + if len(images) > 1: + prompt = ( + f"{prompt}\n\n" + f"(Attached {len(images)} images in this message — " + f"please consider them together.)" + ) await self._handle_agentic_media_message( update=update, context=context, - prompt=processed_image.prompt, + prompt=prompt, progress_msg=progress_msg, user_id=user_id, chat=chat, @@ -1383,7 +1470,10 @@ async def agentic_photo( await progress_msg.edit_text(_format_error_message(e), parse_mode="HTML") logger.error( - "Claude photo processing failed", error=str(e), user_id=user_id + "Claude photo processing failed", + error=str(e), + user_id=user_id, + photo_count=len(photos), ) async def agentic_voice( @@ -1684,6 +1774,11 @@ async def _handle_stop_callback( ) return + # Cancel any pending media-group buffer for this user. + for buf_key in self._media_group_buffer.pending_keys: + if buf_key[0] == target_user_id: + self._media_group_buffer.cancel(buf_key) + active = self._active_requests.get(target_user_id) if not active: await query.answer("Already completed.", show_alert=False) diff --git a/src/bot/utils/media_group_buffer.py b/src/bot/utils/media_group_buffer.py new file mode 100644 index 00000000..9dfaeadc --- /dev/null +++ b/src/bot/utils/media_group_buffer.py @@ -0,0 +1,215 @@ +"""Buffer for Telegram media-group (album) photos. + +When a user sends an album (text + N photos), Telegram delivers it as +N separate ``Update``s that share the same ``message.media_group_id``. +Only one of those messages carries the user's caption. Without buffering, +the bot would fire N independent Claude requests — one per photo. + +This module collects all photos sharing a ``media_group_id`` within a +short debounce window, then hands them off as a single payload via an +``on_flush`` callback. + +Design constraints +------------------ +* Unlike :class:`~src.bot.utils.message_buffer.MessageBuffer`, there is no + length-based heuristic here — *any* photo with a ``media_group_id`` is + buffered, because by definition such photos are part of an album. +* Albums always have 2+ items, so we flush exclusively via the debounce + timer (no "short tail" early-flush path). +* The flush callback runs as an independent ``asyncio.Task`` so the + handler that received the final chunk can return immediately. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple + +import structlog + +logger = structlog.get_logger() + +# (user_id, chat_id, thread_id, media_group_id) +MediaGroupKey = Tuple[int, int, Optional[int], str] + + +@dataclass +class BufferedMediaGroup: + """Payload returned when a media-group buffer is ready for processing.""" + + photos: List[Any] # telegram.PhotoSize — largest size per message + caption: Optional[str] + first_update: Any # telegram.Update (the first one we saw) + last_context: Any # ContextTypes.DEFAULT_TYPE + photo_count: int + + +@dataclass +class _MediaGroupEntry: + """Internal mutable state for one pending media-group buffer.""" + + photos: List[Any] = field(default_factory=list) + caption: Optional[str] = None + first_update: Any = None + last_context: Any = None + timer_task: Optional[asyncio.Task[None]] = None + status_message: Any = None # telegram.Message + + +FlushCallback = Callable[[MediaGroupKey, BufferedMediaGroup], Coroutine[Any, Any, None]] + + +class MediaGroupBuffer: + """Per-album debounce buffer for Telegram photo groups. + + Parameters + ---------- + flush_timeout: + Seconds to wait after the last photo before flushing the buffer. + Telegram typically delivers all photos in an album within a few + hundred milliseconds; the default of ``1.0`` gives generous + headroom while keeping perceived latency low. + on_flush: + Async callback invoked when the buffer flushes. + Signature: ``async def on_flush(key, result) -> None``. + """ + + def __init__( + self, + flush_timeout: float = 1.0, + on_flush: Optional[FlushCallback] = None, + ) -> None: + self._buffers: Dict[MediaGroupKey, _MediaGroupEntry] = {} + self._flush_timeout = flush_timeout + self._on_flush = on_flush + + # -- query --------------------------------------------------------------- + + @property + def pending_keys(self) -> List[MediaGroupKey]: + """Return keys that currently have buffered photos.""" + return list(self._buffers.keys()) + + def has_buffer(self, key: MediaGroupKey) -> bool: + """Return True if there is a pending buffer for *key*.""" + return key in self._buffers + + # -- mutate -------------------------------------------------------------- + + async def add_photo( + self, + key: MediaGroupKey, + photo: Any, + caption: Optional[str], + update: Any, + context: Any, + ) -> None: + """Append a photo to the buffer and (re)start the debounce timer. + + Always returns ``None`` — albums are flushed exclusively via the + timer, never inline. The caller should return immediately after + calling this method. + """ + entry = self._buffers.get(key) + + if entry is None: + entry = _MediaGroupEntry( + photos=[photo], + caption=caption, + first_update=update, + last_context=context, + ) + self._buffers[key] = entry + + # Lightweight user feedback — the final response is still + # anchored to the first update's message_id via ``first_update``. + try: + entry.status_message = await update.message.reply_text( + "Receiving album\u2026" + ) + except Exception: + logger.debug("Failed to send album status message") + else: + entry.photos.append(photo) + entry.last_context = context + # Captions can arrive on any message in the group (Telegram + # places it on one of them, not always the first). Keep the + # first non-empty caption we see. + if not entry.caption and caption: + entry.caption = caption + self._cancel_timer(entry) + + self._schedule_timer(key) + + def cancel(self, key: MediaGroupKey) -> Optional[BufferedMediaGroup]: + """Cancel a pending buffer synchronously and return its contents.""" + entry = self._buffers.get(key) + if entry is None: + return None + + self._cancel_timer(entry) + return self._pop_result(key) + + # -- internal ------------------------------------------------------------ + + def _schedule_timer(self, key: MediaGroupKey) -> None: + entry = self._buffers.get(key) + if entry is None: + return + entry.timer_task = asyncio.create_task(self._timer_coro(key)) + + @staticmethod + def _cancel_timer(entry: _MediaGroupEntry) -> None: + if entry.timer_task is not None and not entry.timer_task.done(): + entry.timer_task.cancel() + entry.timer_task = None + + async def _timer_coro(self, key: MediaGroupKey) -> None: + """Sleep for the debounce period, then flush.""" + try: + await asyncio.sleep(self._flush_timeout) + except asyncio.CancelledError: + return + + result = self._pop_result(key) + if result is None: + return + + logger.info( + "Media group buffer timer fired", + user_id=key[0], + media_group_id=key[3], + photo_count=result.photo_count, + has_caption=bool(result.caption), + ) + + if self._on_flush is not None: + asyncio.create_task(self._on_flush(key, result)) + + def _pop_result(self, key: MediaGroupKey) -> Optional[BufferedMediaGroup]: + """Remove the entry for *key* and return a :class:`BufferedMediaGroup`.""" + entry = self._buffers.pop(key, None) + if entry is None: + return None + + self._cancel_timer(entry) + + # Best-effort cleanup of the status message. + if entry.status_message is not None: + asyncio.ensure_future(self._delete_message(entry.status_message)) + + return BufferedMediaGroup( + photos=list(entry.photos), + caption=entry.caption, + first_update=entry.first_update, + last_context=entry.last_context, + photo_count=len(entry.photos), + ) + + @staticmethod + async def _delete_message(msg: Any) -> None: + try: + await msg.delete() + except Exception: + pass diff --git a/src/config/settings.py b/src/config/settings.py index c4f7cb18..dd9b0326 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -22,6 +22,7 @@ DEFAULT_CLAUDE_TIMEOUT_SECONDS, DEFAULT_DATABASE_URL, DEFAULT_MAX_SESSIONS_PER_USER, + DEFAULT_MEDIA_GROUP_BUFFER_TIMEOUT, DEFAULT_PROJECT_THREADS_SYNC_ACTION_INTERVAL_SECONDS, DEFAULT_RATE_LIMIT_BURST, DEFAULT_RATE_LIMIT_REQUESTS, @@ -281,6 +282,20 @@ class Settings(BaseSettings): le=5.0, ) + # Media-group (photo album) buffering + media_group_buffer_timeout: float = Field( + DEFAULT_MEDIA_GROUP_BUFFER_TIMEOUT, + description=( + "Seconds to wait for additional photos in a Telegram album before " + "sending them to Claude as a single request. Telegram delivers " + "album photos as separate Updates that share a media_group_id; " + "this debounce coalesces them into one call so the user gets one " + "reply instead of N." + ), + ge=0.3, + le=5.0, + ) + # Monitoring log_level: str = Field("INFO", description="Logging level") enable_telemetry: bool = Field(False, description="Enable anonymous telemetry") diff --git a/src/utils/constants.py b/src/utils/constants.py index 7b66f9a6..d0d8b77e 100644 --- a/src/utils/constants.py +++ b/src/utils/constants.py @@ -91,5 +91,10 @@ DEFAULT_RETRY_BACKOFF_FACTOR = 3.0 DEFAULT_RETRY_MAX_DELAY = 30.0 +# Media-group (photo album) buffering. Telegram delivers album photos as +# separate Updates sharing a media_group_id; we debounce them into a +# single Claude request. +DEFAULT_MEDIA_GROUP_BUFFER_TIMEOUT = 1.0 + # Logging LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" diff --git a/tests/unit/test_bot/test_media_group_buffer.py b/tests/unit/test_bot/test_media_group_buffer.py new file mode 100644 index 00000000..1e58b059 --- /dev/null +++ b/tests/unit/test_bot/test_media_group_buffer.py @@ -0,0 +1,317 @@ +"""Tests for MediaGroupBuffer (Telegram photo-album buffering). + +Covers: +- Basic buffering: first photo starts a buffer, subsequent ones append +- Timer fires exactly once and delivers all photos + caption +- Caption arriving on later messages is captured +- Multiple independent groups are keyed separately +- cancel() clears state and returns pending contents +- Status-message lifecycle (best-effort delete on flush/cancel) +""" + +import asyncio +from typing import List, Tuple +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.bot.utils.media_group_buffer import ( + BufferedMediaGroup, + MediaGroupBuffer, + MediaGroupKey, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_update(photo_id: str = "p", with_reply: bool = True) -> MagicMock: + """Build a minimal mock Update/Message with reply_text wired up.""" + update = MagicMock() + update.message = MagicMock() + if with_reply: + status_msg = MagicMock() + status_msg.delete = AsyncMock() + update.message.reply_text = AsyncMock(return_value=status_msg) + else: + update.message.reply_text = AsyncMock(side_effect=RuntimeError("no reply")) + update._photo_id = photo_id # just for test identity + return update + + +def _make_photo(file_id: str) -> MagicMock: + photo = MagicMock() + photo.file_id = file_id + return photo + + +# --------------------------------------------------------------------------- +# add_photo — basic behavior +# --------------------------------------------------------------------------- + + +class TestAddPhoto: + async def test_first_photo_starts_buffer(self): + """First photo initialises an entry and sends status message.""" + buf = MediaGroupBuffer(flush_timeout=5.0) + key: MediaGroupKey = (1, 2, None, "mg-A") + update = _make_update() + + await buf.add_photo(key, _make_photo("f1"), "cap", update, object()) + + assert buf.has_buffer(key) + assert buf.pending_keys == [key] + update.message.reply_text.assert_awaited_once() + + async def test_subsequent_photo_appends(self): + """A second photo on the same key extends the buffer, not new status msg.""" + buf = MediaGroupBuffer(flush_timeout=5.0) + key: MediaGroupKey = (1, 2, None, "mg-A") + u1, u2 = _make_update(), _make_update() + + await buf.add_photo(key, _make_photo("f1"), "cap", u1, object()) + await buf.add_photo(key, _make_photo("f2"), None, u2, object()) + + # Only the FIRST update sends a status message. + u1.message.reply_text.assert_awaited_once() + u2.message.reply_text.assert_not_awaited() + assert buf.has_buffer(key) + + async def test_different_keys_separate_buffers(self): + buf = MediaGroupBuffer(flush_timeout=5.0) + key_a: MediaGroupKey = (1, 2, None, "mg-A") + key_b: MediaGroupKey = (1, 2, None, "mg-B") + + await buf.add_photo(key_a, _make_photo("a1"), "capA", _make_update(), object()) + await buf.add_photo(key_b, _make_photo("b1"), "capB", _make_update(), object()) + + assert set(buf.pending_keys) == {key_a, key_b} + + async def test_returns_none_always(self): + """add_photo is purely buffering — never returns a result inline.""" + buf = MediaGroupBuffer(flush_timeout=5.0) + key: MediaGroupKey = (1, 2, None, "mg") + result = await buf.add_photo( + key, _make_photo("f1"), None, _make_update(), object() + ) + assert result is None + + +# --------------------------------------------------------------------------- +# Timer flush +# --------------------------------------------------------------------------- + + +class TestFlush: + async def test_timer_fires_after_timeout(self): + """After flush_timeout elapses, on_flush is invoked exactly once.""" + flushes: List[Tuple[MediaGroupKey, BufferedMediaGroup]] = [] + flush_done = asyncio.Event() + + async def _on_flush(key, result): + flushes.append((key, result)) + flush_done.set() + + buf = MediaGroupBuffer(flush_timeout=0.05, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + + await buf.add_photo(key, _make_photo("f1"), "cap", _make_update(), object()) + await buf.add_photo(key, _make_photo("f2"), None, _make_update(), object()) + + await asyncio.wait_for(flush_done.wait(), timeout=1.0) + # Allow the flush task scheduled via create_task() to run. + await asyncio.sleep(0.01) + + assert len(flushes) == 1 + flushed_key, result = flushes[0] + assert flushed_key == key + assert result.photo_count == 2 + assert result.caption == "cap" + assert [p.file_id for p in result.photos] == ["f1", "f2"] + assert not buf.has_buffer(key) + + async def test_timer_reset_on_new_photo(self): + """Each new photo restarts the debounce timer so late additions are included.""" + flushes: List[BufferedMediaGroup] = [] + flush_done = asyncio.Event() + + async def _on_flush(key, result): + flushes.append(result) + flush_done.set() + + buf = MediaGroupBuffer(flush_timeout=0.15, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + + await buf.add_photo(key, _make_photo("f1"), "cap", _make_update(), object()) + # Wait less than the full timeout, then add another photo. + await asyncio.sleep(0.08) + await buf.add_photo(key, _make_photo("f2"), None, _make_update(), object()) + await asyncio.sleep(0.08) # still within reset timeout + await buf.add_photo(key, _make_photo("f3"), None, _make_update(), object()) + + await asyncio.wait_for(flush_done.wait(), timeout=1.0) + await asyncio.sleep(0.01) + + assert len(flushes) == 1 + assert flushes[0].photo_count == 3 + + async def test_caption_captured_on_later_photo(self): + """If caption only arrives on a non-first message, it's still captured.""" + flushes: List[BufferedMediaGroup] = [] + flush_done = asyncio.Event() + + async def _on_flush(key, result): + flushes.append(result) + flush_done.set() + + buf = MediaGroupBuffer(flush_timeout=0.05, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + + await buf.add_photo(key, _make_photo("f1"), None, _make_update(), object()) + await buf.add_photo( + key, _make_photo("f2"), "late caption", _make_update(), object() + ) + + await asyncio.wait_for(flush_done.wait(), timeout=1.0) + await asyncio.sleep(0.01) + assert flushes[0].caption == "late caption" + + async def test_first_non_empty_caption_wins(self): + """If multiple messages carry captions, the first one is kept.""" + flushes: List[BufferedMediaGroup] = [] + flush_done = asyncio.Event() + + async def _on_flush(key, result): + flushes.append(result) + flush_done.set() + + buf = MediaGroupBuffer(flush_timeout=0.05, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + + await buf.add_photo(key, _make_photo("f1"), "first", _make_update(), object()) + await buf.add_photo(key, _make_photo("f2"), "second", _make_update(), object()) + + await asyncio.wait_for(flush_done.wait(), timeout=1.0) + await asyncio.sleep(0.01) + assert flushes[0].caption == "first" + + async def test_first_update_preserved(self): + """The first update is preserved so replies anchor to the album origin.""" + flushes: List[BufferedMediaGroup] = [] + flush_done = asyncio.Event() + + async def _on_flush(key, result): + flushes.append(result) + flush_done.set() + + buf = MediaGroupBuffer(flush_timeout=0.05, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + first = _make_update("first") + second = _make_update("second") + + await buf.add_photo(key, _make_photo("f1"), "cap", first, object()) + await buf.add_photo(key, _make_photo("f2"), None, second, object()) + + await asyncio.wait_for(flush_done.wait(), timeout=1.0) + await asyncio.sleep(0.01) + assert flushes[0].first_update is first + + async def test_last_context_preserved(self): + """last_context reflects the most recent update's context.""" + flushes: List[BufferedMediaGroup] = [] + flush_done = asyncio.Event() + + async def _on_flush(key, result): + flushes.append(result) + flush_done.set() + + buf = MediaGroupBuffer(flush_timeout=0.05, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + ctx1 = object() + ctx2 = object() + + await buf.add_photo(key, _make_photo("f1"), "cap", _make_update(), ctx1) + await buf.add_photo(key, _make_photo("f2"), None, _make_update(), ctx2) + + await asyncio.wait_for(flush_done.wait(), timeout=1.0) + await asyncio.sleep(0.01) + assert flushes[0].last_context is ctx2 + + +# --------------------------------------------------------------------------- +# cancel() +# --------------------------------------------------------------------------- + + +class TestCancel: + async def test_cancel_returns_pending_contents(self): + buf = MediaGroupBuffer(flush_timeout=5.0) + key: MediaGroupKey = (1, 2, None, "mg") + + await buf.add_photo(key, _make_photo("f1"), "cap", _make_update(), object()) + await buf.add_photo(key, _make_photo("f2"), None, _make_update(), object()) + + result = buf.cancel(key) + + assert result is not None + assert result.photo_count == 2 + assert result.caption == "cap" + assert not buf.has_buffer(key) + + async def test_cancel_prevents_flush(self): + """Once cancelled, the debounce timer must not invoke on_flush.""" + flushes: List[BufferedMediaGroup] = [] + + async def _on_flush(key, result): + flushes.append(result) + + buf = MediaGroupBuffer(flush_timeout=0.05, on_flush=_on_flush) + key: MediaGroupKey = (1, 2, None, "mg") + + await buf.add_photo(key, _make_photo("f1"), "cap", _make_update(), object()) + buf.cancel(key) + + # Wait well past the flush timeout. + await asyncio.sleep(0.15) + assert flushes == [] + + async def test_cancel_missing_key(self): + buf = MediaGroupBuffer(flush_timeout=5.0) + assert buf.cancel((1, 2, None, "nope")) is None + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + async def test_reply_text_failure_does_not_break_buffering(self): + """A failure sending the status message is swallowed silently.""" + buf = MediaGroupBuffer(flush_timeout=5.0) + key: MediaGroupKey = (1, 2, None, "mg") + update = _make_update(with_reply=False) + + # Must not raise — status message send failure is non-fatal. + await buf.add_photo(key, _make_photo("f1"), "cap", update, object()) + assert buf.has_buffer(key) + + async def test_thread_id_in_key_disambiguates(self): + """Same media_group_id in different threads must not collide.""" + buf = MediaGroupBuffer(flush_timeout=5.0) + key_thread_a: MediaGroupKey = (1, 2, 10, "mg") + key_thread_b: MediaGroupKey = (1, 2, 20, "mg") + + await buf.add_photo( + key_thread_a, _make_photo("a"), "cA", _make_update(), object() + ) + await buf.add_photo( + key_thread_b, _make_photo("b"), "cB", _make_update(), object() + ) + + assert len(buf.pending_keys) == 2 + + +# Enable asyncio for all tests in this module. +pytestmark = pytest.mark.asyncio diff --git a/tests/unit/test_bot/test_middleware.py b/tests/unit/test_bot/test_middleware.py index 4ff58365..9959cc40 100644 --- a/tests/unit/test_bot/test_middleware.py +++ b/tests/unit/test_bot/test_middleware.py @@ -35,6 +35,7 @@ def mock_settings(): settings.enable_api_server = False settings.enable_scheduler = False settings.approved_directory = "/tmp/test" + settings.media_group_buffer_timeout = 1.0 return settings