diff --git a/CLAUDE.md b/CLAUDE.md index 10cf4851..aaf6f881 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -50,6 +50,7 @@ The codebase follows a layered service architecture under `src/windows_mcp/`: - Screenshots are capped to 1920x1080 for token efficiency - Mouse/keyboard input uses UIA (same coordinate space as BoundingRectangle; no DPI mismatch) - Screenshot is the preferred fast visual-context tool; Snapshot is the heavier path for UI element ids and DOM extraction +- Both Screenshot and Snapshot accept `window_name` (fuzzy title) or `window_pid` (exact pid) to capture a single window via `desktop/window_resolver.py`; the rect is read with `DwmGetWindowAttribute(DWMWA_EXTENDED_FRAME_BOUNDS)` falling back to `GetWindowRect` - Browser detection (Chrome, Edge, Firefox) triggers special DOM extraction mode in Snapshot - Fuzzy string matching (`thefuzz`) is used for element name matching - UI element fetching has retry logic (`THREAD_MAX_RETRIES=3` in tree service) @@ -64,6 +65,7 @@ The codebase follows a layered service architecture under `src/windows_mcp/`: | `WINDOWS_MCP_PROFILE_SNAPSHOT` | _(off)_ | Set to `1`/`true`/`yes`/`on` to log per-stage timing for Screenshot/Snapshot. Checked in `tools/_snapshot_helpers.py` and `desktop/service.py`. | | `ANONYMIZED_TELEMETRY` | `true` | Set to `false` to disable PostHog telemetry. Checked in `__main__.py` and `analytics.py`. | | `WINDOWS_MCP_DEBUG` | `false` | Set to `1`/`true`/`yes`/`on` to enable debug mode. Checked in `config.py`. Also available as `--debug` CLI flag. | +| `WINDOWS_MCP_DISABLE_FLASH` | _(off)_ | Set to `1`/`true`/`yes`/`on` to suppress the orange-red glowing border that briefly appears after every screenshot. Resolved in `desktop/flash_overlay.py`. | ## Security Context diff --git a/README.md b/README.md index 599c4372..bd2cce5b 100755 --- a/README.md +++ b/README.md @@ -446,6 +446,7 @@ All variables are optional unless noted. 
Set them via the `env` key in `claude_d | `WINDOWS_MCP_SCREENSHOT_SCALE` | `1.0` | Scale factor applied to screenshots before encoding. Accepts a float in the range `0.1`–`1.0`. Useful on high-resolution displays (1440p, 4K) where the default produces images that exceed Claude Desktop's 1 MB tool-result limit. Set to `0.5` to halve both dimensions (quarter the file size). | | `WINDOWS_MCP_SCREENSHOT_BACKEND` | `auto` | Screenshot capture backend. Accepted values: `auto` (tries dxcam → mss → pillow in order), `dxcam`, `mss`, `pillow`. Use `mss` or `pillow` if `dxcam` is unavailable or causes issues on your GPU. | | `WINDOWS_MCP_PROFILE_SNAPSHOT` | _(disabled)_ | Set to `1`, `true`, `yes`, or `on` to emit per-stage timing logs for Screenshot/Snapshot calls. Useful for diagnosing slow captures. | +| `WINDOWS_MCP_DISABLE_FLASH` | _(disabled)_ | Set to `1`, `true`, `yes`, or `on` to suppress the orange-red glowing border that briefly highlights the captured area after every screenshot. The flash is rendered on a transparent always-on-top window *after* capture so it never appears in the captured image. | ### Telemetry @@ -493,8 +494,8 @@ MCP Client can access the following tools to interact with Windows: - `Move`: Move mouse pointer or drag (set drag=True) to coordinates. - `Shortcut`: Press keyboard shortcuts (`Ctrl+c`, `Alt+Tab`, etc). - `Wait`: Pause for a defined duration. -- `Screenshot`: Fast screenshot-first desktop capture with cursor position, active/open windows, and an image. Skips UI tree extraction for speed and should be the default first call when you mainly need visual context. Supports `display=[0]` or `display=[0,1]` to capture specific screens. -- `Snapshot`: Full desktop state capture for workflows that need interactive element ids, scrollable regions, or `use_dom=True` browser extraction. Supports `use_vision=True` for including screenshots and `display=[0]` or `display=[0,1]` for limiting all returned Snapshot information to specific screens. 
+- `Screenshot`: Fast screenshot-first desktop capture with cursor position, active/open windows, and an image. Skips UI tree extraction for speed and should be the default first call when you mainly need visual context. Supports `display=[0]` or `display=[0,1]` to capture specific screens, or `window_name="..."` (fuzzy title match) / `window_pid=12345` (exact process id) to capture just one window's bounding rectangle. The targeted window is brought to the foreground first unless `focus_window=False`. `window_name` / `window_pid` and `display` are mutually exclusive. After capture, a brief orange-red glowing border is drawn over the captured area as a visual confirmation (set `WINDOWS_MCP_DISABLE_FLASH=1` to disable). +- `Snapshot`: Full desktop state capture for workflows that need interactive element ids, scrollable regions, or `use_dom=True` browser extraction. Supports `use_vision=True` for including screenshots, `display=[0]` or `display=[0,1]` for limiting all returned Snapshot information to specific screens, and `window_name` / `window_pid` for limiting capture to a specific window's bounding rectangle (mutually exclusive with `display`). - `App`: To launch an application from the start menu, resize or move the window and switch between apps. - `Shell`: To execute PowerShell commands. - `Scrape`: To scrape the entire webpage for information. diff --git a/src/windows_mcp/desktop/flash_overlay.py b/src/windows_mcp/desktop/flash_overlay.py new file mode 100644 index 00000000..831a6717 --- /dev/null +++ b/src/windows_mcp/desktop/flash_overlay.py @@ -0,0 +1,573 @@ +"""Brief on-screen visual confirmation that a screenshot was taken. + +Renders a soft orange-red glow halo around the captured area for ~3.5 s +using a Win32 layered window with per-pixel alpha (``UpdateLayeredWindow`` ++ premultiplied BGRA DIB section). 
Tk was tried twice and abandoned — +``-transparentcolor`` rendered nothing on some Windows configs and a +multi-Toplevel "strip" approach hung Tk's mainloop on a non-main thread +once more than ~6 windows were created back-to-back. + +The flash is started *after* capture and any active overlay is torn down +before the next capture so it never appears in a captured image. +""" + +import ctypes +import logging +import os +import threading +import time +from ctypes import wintypes + +logger = logging.getLogger(__name__) + +_FLASH_RGB = (0xFF, 0x45, 0x00) +_DURATION_MS = 3500 +_FRAME_INTERVAL_MS = 30 +_GLOW_BORDER_THICKNESS = 8 +_GLOW_BLUR_RADIUS = 14 +_GLOW_MARGIN = _GLOW_BLUR_RADIUS * 3 +_FULLSCREEN_INSET = 6 +_MIN_VISIBLE_INTENSITY = 0.04 +_INTENSITY_QUANT = 32 + +_lock = threading.Lock() +_active_overlay: "_Overlay | None" = None + + +def _flash_disabled() -> bool: + value = os.getenv("WINDOWS_MCP_DISABLE_FLASH", "") + return value.strip().lower() in {"1", "true", "yes", "on"} + + +class _Overlay: + def __init__(self) -> None: + self.stop_event = threading.Event() + self.closed_event = threading.Event() + self.thread: threading.Thread | None = None + + +def cancel_active_flash(timeout: float = 0.25) -> None: + """Tear down any flash overlay currently on screen.""" + global _active_overlay + with _lock: + ov = _active_overlay + _active_overlay = None + if ov is None: + return + ov.stop_event.set() + ov.closed_event.wait(timeout=timeout) + + +def show_capture_flash(capture_rect: "object | None" = None) -> None: + """Show a fade-in/out orange-red glow for a screenshot capture. + + Pass the same ``capture_rect`` (``uia.Rect`` with ``left/top/right/ + bottom``) used for the screenshot — or ``None`` for a full-desktop + capture, in which case every monitor gets an inner-edge halo. Returns + immediately; rendering happens on a daemon thread, and any previously + active overlay is cancelled atomically so a follow-up capture never + leaks a stale overlay. 
+ """ + if _flash_disabled(): + return + try: + if capture_rect is not None: + rects: list[tuple[int, int, int, int]] = [ + ( + int(capture_rect.left), + int(capture_rect.top), + int(capture_rect.right), + int(capture_rect.bottom), + ) + ] + full_screen = False + else: + import windows_mcp.uia as uia + + monitor_rects = uia.GetMonitorsRect() + rects = [(m.left, m.top, m.right, m.bottom) for m in monitor_rects] + full_screen = True + except Exception: + logger.debug("could not resolve flash overlay rects", exc_info=True) + return + if not rects: + return + overlay = _Overlay() + overlay.thread = threading.Thread( + target=_run_overlay, + args=(rects, full_screen, overlay), + name="windows-mcp-flash", + daemon=True, + ) + # Atomic swap: save the previous overlay under the lock, install the new + # one, then signal the prior overlay outside the lock so it can tear down + # without blocking new callers. Without this, an overwrite would orphan + # the prior overlay — cancel_active_flash() could no longer reach it. 
+ with _lock: + global _active_overlay + prev = _active_overlay + _active_overlay = overlay + if prev is not None: + prev.stop_event.set() + overlay.thread.start() + + +# --------------------------------------------------------------------------- +# Win32 plumbing +# --------------------------------------------------------------------------- + +_user32 = ctypes.windll.user32 +_gdi32 = ctypes.windll.gdi32 +_kernel32 = ctypes.windll.kernel32 + +_WS_POPUP = 0x80000000 +_WS_EX_LAYERED = 0x00080000 +_WS_EX_TRANSPARENT = 0x00000020 +_WS_EX_TOPMOST = 0x00000008 +_WS_EX_TOOLWINDOW = 0x00000080 +_WS_EX_NOACTIVATE = 0x08000000 +_ULW_ALPHA = 0x00000002 +_AC_SRC_OVER = 0x00 +_AC_SRC_ALPHA = 0x01 +_BI_RGB = 0 +_DIB_RGB_COLORS = 0 +_SW_SHOWNA = 8 +_HWND_TOPMOST = -1 +_SWP_NOSIZE = 0x0001 +_SWP_NOMOVE = 0x0002 +_SWP_NOACTIVATE = 0x0010 +_SWP_SHOWWINDOW = 0x0040 +_PM_REMOVE = 0x0001 +_WM_DESTROY = 0x0002 + + +class _POINT(ctypes.Structure): + _fields_ = [("x", ctypes.c_long), ("y", ctypes.c_long)] + + +class _SIZE(ctypes.Structure): + _fields_ = [("cx", ctypes.c_long), ("cy", ctypes.c_long)] + + +class _BLENDFUNCTION(ctypes.Structure): + _fields_ = [ + ("BlendOp", ctypes.c_byte), + ("BlendFlags", ctypes.c_byte), + ("SourceConstantAlpha", ctypes.c_byte), + ("AlphaFormat", ctypes.c_byte), + ] + + +class _BITMAPINFOHEADER(ctypes.Structure): + _fields_ = [ + ("biSize", ctypes.c_uint32), + ("biWidth", ctypes.c_long), + ("biHeight", ctypes.c_long), + ("biPlanes", ctypes.c_uint16), + ("biBitCount", ctypes.c_uint16), + ("biCompression", ctypes.c_uint32), + ("biSizeImage", ctypes.c_uint32), + ("biXPelsPerMeter", ctypes.c_long), + ("biYPelsPerMeter", ctypes.c_long), + ("biClrUsed", ctypes.c_uint32), + ("biClrImportant", ctypes.c_uint32), + ] + + +class _BITMAPINFO(ctypes.Structure): + _fields_ = [ + ("bmiHeader", _BITMAPINFOHEADER), + ("bmiColors", ctypes.c_uint32 * 3), + ] + + +# LRESULT is signed pointer-sized integer on Windows (use c_ssize_t for x64). 
+_LRESULT = ctypes.c_ssize_t + +_WNDPROC = ctypes.WINFUNCTYPE( + _LRESULT, + wintypes.HWND, + ctypes.c_uint, + wintypes.WPARAM, + wintypes.LPARAM, +) + + +class _WNDCLASSEX(ctypes.Structure): + _fields_ = [ + ("cbSize", ctypes.c_uint), + ("style", ctypes.c_uint), + ("lpfnWndProc", _WNDPROC), + ("cbClsExtra", ctypes.c_int), + ("cbWndExtra", ctypes.c_int), + ("hInstance", wintypes.HINSTANCE), + ("hIcon", wintypes.HICON), + ("hCursor", wintypes.HANDLE), + ("hbrBackground", wintypes.HBRUSH), + ("lpszMenuName", wintypes.LPCWSTR), + ("lpszClassName", wintypes.LPCWSTR), + ("hIconSm", wintypes.HICON), + ] + + +_user32.CreateWindowExW.restype = wintypes.HWND +_user32.RegisterClassExW.restype = ctypes.c_ushort +_user32.DefWindowProcW.restype = _LRESULT +_user32.DefWindowProcW.argtypes = [ + wintypes.HWND, + ctypes.c_uint, + wintypes.WPARAM, + wintypes.LPARAM, +] +_user32.GetDC.restype = wintypes.HDC +_user32.GetDC.argtypes = [wintypes.HWND] +_user32.ReleaseDC.restype = ctypes.c_int +_user32.ReleaseDC.argtypes = [wintypes.HWND, wintypes.HDC] +_user32.UpdateLayeredWindow.restype = wintypes.BOOL +_user32.UpdateLayeredWindow.argtypes = [ + wintypes.HWND, + wintypes.HDC, + ctypes.POINTER(_POINT), + ctypes.POINTER(_SIZE), + wintypes.HDC, + ctypes.POINTER(_POINT), + wintypes.COLORREF, + ctypes.POINTER(_BLENDFUNCTION), + wintypes.DWORD, +] +_user32.DestroyWindow.argtypes = [wintypes.HWND] +_user32.ShowWindow.argtypes = [wintypes.HWND, ctypes.c_int] +_user32.SetWindowPos.argtypes = [ + wintypes.HWND, + wintypes.HWND, + ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + ctypes.c_int, + ctypes.c_uint, +] +_gdi32.CreateCompatibleDC.restype = wintypes.HDC +_gdi32.CreateCompatibleDC.argtypes = [wintypes.HDC] +_gdi32.CreateDIBSection.restype = wintypes.HBITMAP +_gdi32.CreateDIBSection.argtypes = [ + wintypes.HDC, + ctypes.POINTER(_BITMAPINFO), + wintypes.UINT, + ctypes.POINTER(ctypes.c_void_p), + wintypes.HANDLE, + wintypes.DWORD, +] +_gdi32.SelectObject.restype = wintypes.HGDIOBJ 
+_gdi32.SelectObject.argtypes = [wintypes.HDC, wintypes.HGDIOBJ] +_gdi32.DeleteObject.argtypes = [wintypes.HGDIOBJ] +_gdi32.DeleteDC.argtypes = [wintypes.HDC] + + +# --------------------------------------------------------------------------- +# Rendering +# --------------------------------------------------------------------------- + + +def _render_glow_rgba( + width: int, + height: int, + rect_list: list[tuple[int, int, int, int]], + *, + outward: bool = True, +) -> "object": + """Return a PIL RGBA image with a soft halo ring around each rect. + + Each rect is in window-local coordinates. A sharp solid border is drawn + just outside the rect edge (``outward=True``) so the captured area stays + clean and the halo reads as a surround, then the layer is + gaussian-blurred to spread the glow, and the sharp ring is composited + back on top so the inner edge stays crisp. ``outward=False`` nests the + ring inward — used for the full-screen inner halo. + """ + from PIL import Image, ImageDraw, ImageFilter + + sharp = Image.new("RGBA", (width, height), (0, 0, 0, 0)) + draw = ImageDraw.Draw(sharp) + color = (*_FLASH_RGB, 255) + for x1, y1, x2, y2 in rect_list: + for i in range(_GLOW_BORDER_THICKNESS): + if outward: + draw.rectangle( + [x1 - i - 1, y1 - i - 1, x2 + i, y2 + i], + outline=color, + width=1, + ) + else: + draw.rectangle( + [x1 + i, y1 + i, x2 - i - 1, y2 - i - 1], + outline=color, + width=1, + ) + blurred = sharp.filter(ImageFilter.GaussianBlur(radius=_GLOW_BLUR_RADIUS)) + return Image.alpha_composite(blurred, sharp) + + +def _premultiplied_bgra(rgba_image, intensity: float) -> bytes: + """Convert PIL RGBA to BGRA premultiplied bytes scaled by ``intensity``.""" + bgra = bytearray(rgba_image.tobytes("raw", "BGRA")) + if intensity >= 1.0: + for i in range(0, len(bgra), 4): + a = bgra[i + 3] + if a == 0: + continue + bgra[i] = (bgra[i] * a) // 255 + bgra[i + 1] = (bgra[i + 1] * a) // 255 + bgra[i + 2] = (bgra[i + 2] * a) // 255 + else: + for i in range(0, 
len(bgra), 4): + a = (bgra[i + 3] * int(intensity * 255)) // 255 + bgra[i + 3] = a + if a == 0: + bgra[i] = 0 + bgra[i + 1] = 0 + bgra[i + 2] = 0 + continue + bgra[i] = (bgra[i] * a) // 255 + bgra[i + 1] = (bgra[i + 1] * a) // 255 + bgra[i + 2] = (bgra[i + 2] * a) // 255 + return bytes(bgra) + + +# --------------------------------------------------------------------------- +# Window management +# --------------------------------------------------------------------------- + + +@_WNDPROC +def _wnd_proc(hwnd, msg, wparam, lparam): + if msg == _WM_DESTROY: + _user32.PostQuitMessage(0) + return 0 + return _user32.DefWindowProcW(hwnd, msg, wparam, lparam) + + +def _create_layered_window(class_name: str, x: int, y: int, w: int, h: int): + h_instance = _kernel32.GetModuleHandleW(None) + wc = _WNDCLASSEX() + wc.cbSize = ctypes.sizeof(_WNDCLASSEX) + wc.style = 0 + wc.lpfnWndProc = _wnd_proc + wc.cbClsExtra = 0 + wc.cbWndExtra = 0 + wc.hInstance = h_instance + wc.hIcon = None + wc.hCursor = None + wc.hbrBackground = None + wc.lpszMenuName = None + wc.lpszClassName = class_name + wc.hIconSm = None + + atom = _user32.RegisterClassExW(ctypes.byref(wc)) + if not atom: + raise OSError(f"RegisterClassExW failed: {ctypes.get_last_error()}") + + ex_style = ( + _WS_EX_LAYERED | _WS_EX_TRANSPARENT | _WS_EX_TOPMOST | _WS_EX_TOOLWINDOW | _WS_EX_NOACTIVATE + ) + hwnd = _user32.CreateWindowExW( + ex_style, + class_name, + "windows-mcp-flash", + _WS_POPUP, + x, + y, + w, + h, + None, + None, + h_instance, + None, + ) + if not hwnd: + _user32.UnregisterClassW(class_name, h_instance) + raise OSError(f"CreateWindowExW failed: {ctypes.get_last_error()}") + return hwnd, h_instance + + +def _push_bitmap(hwnd, x: int, y: int, w: int, h: int, bgra: bytes) -> None: + screen_dc = _user32.GetDC(None) + if not screen_dc: + raise OSError("GetDC failed") + try: + mem_dc = _gdi32.CreateCompatibleDC(screen_dc) + if not mem_dc: + raise OSError("CreateCompatibleDC failed") + try: + bmi = _BITMAPINFO() + 
bmi.bmiHeader.biSize = ctypes.sizeof(_BITMAPINFOHEADER) + bmi.bmiHeader.biWidth = w + bmi.bmiHeader.biHeight = -h # top-down DIB + bmi.bmiHeader.biPlanes = 1 + bmi.bmiHeader.biBitCount = 32 + bmi.bmiHeader.biCompression = _BI_RGB + + bits_ptr = ctypes.c_void_p() + hbm = _gdi32.CreateDIBSection( + screen_dc, + ctypes.byref(bmi), + _DIB_RGB_COLORS, + ctypes.byref(bits_ptr), + None, + 0, + ) + if not hbm: + raise OSError("CreateDIBSection failed") + try: + ctypes.memmove(bits_ptr, bgra, len(bgra)) + old_bmp = _gdi32.SelectObject(mem_dc, hbm) + try: + pos = _POINT(x, y) + size = _SIZE(w, h) + src_pos = _POINT(0, 0) + blend = _BLENDFUNCTION(_AC_SRC_OVER, 0, 255, _AC_SRC_ALPHA) + ok = _user32.UpdateLayeredWindow( + hwnd, + screen_dc, + ctypes.byref(pos), + ctypes.byref(size), + mem_dc, + ctypes.byref(src_pos), + 0, + ctypes.byref(blend), + _ULW_ALPHA, + ) + if not ok: + raise OSError(f"UpdateLayeredWindow failed: {ctypes.get_last_error()}") + finally: + _gdi32.SelectObject(mem_dc, old_bmp) + finally: + _gdi32.DeleteObject(hbm) + finally: + _gdi32.DeleteDC(mem_dc) + finally: + _user32.ReleaseDC(None, screen_dc) + + +def _pump_messages(hwnd) -> None: + msg = wintypes.MSG() + while _user32.PeekMessageW(ctypes.byref(msg), hwnd, 0, 0, _PM_REMOVE): + _user32.TranslateMessage(ctypes.byref(msg)) + _user32.DispatchMessageW(ctypes.byref(msg)) + + +def _intensity_at(t_norm: float, full_screen: bool) -> float: + if full_screen: + return 1.0 - abs(2 * t_norm - 1) + if t_norm < 0.15: + return t_norm / 0.15 + if t_norm < 0.65: + return 1.0 + return max(0.0, 1.0 - (t_norm - 0.65) / 0.35) + + +# --------------------------------------------------------------------------- +# Daemon thread entry point +# --------------------------------------------------------------------------- + + +def _run_overlay( + rects: list[tuple[int, int, int, int]], + full_screen: bool, + overlay: _Overlay, +) -> None: + try: + from PIL import Image # noqa: F401 — fail fast if Pillow missing + except Exception: + 
logger.debug("Pillow unavailable; skipping screenshot flash") + overlay.closed_event.set() + return + + hwnd = None + h_instance = None + class_name = f"WindowsMCPFlash_{id(overlay):x}" + + try: + union_left = min(r[0] for r in rects) + union_top = min(r[1] for r in rects) + union_right = max(r[2] for r in rects) + union_bottom = max(r[3] for r in rects) + if not full_screen: + union_left -= _GLOW_MARGIN + union_top -= _GLOW_MARGIN + union_right += _GLOW_MARGIN + union_bottom += _GLOW_MARGIN + width = union_right - union_left + height = union_bottom - union_top + if width <= 0 or height <= 0: + return + + local_rects = [] + for r_left, r_top, r_right, r_bottom in rects: + inset = _FULLSCREEN_INSET if full_screen else 0 + local_rects.append( + ( + r_left - union_left + inset, + r_top - union_top + inset, + r_right - union_left - inset, + r_bottom - union_top - inset, + ) + ) + + hwnd, h_instance = _create_layered_window(class_name, union_left, union_top, width, height) + _user32.ShowWindow(hwnd, _SW_SHOWNA) + _user32.SetWindowPos( + hwnd, + _HWND_TOPMOST, + 0, + 0, + 0, + 0, + _SWP_NOSIZE | _SWP_NOMOVE | _SWP_NOACTIVATE | _SWP_SHOWWINDOW, + ) + + glow_rgba = _render_glow_rgba(width, height, local_rects, outward=not full_screen) + + logger.info( + "screenshot flash overlay started: %dx%d layered window at (%d,%d) for %d rect(s)", + width, + height, + union_left, + union_top, + len(rects), + ) + + start = time.perf_counter() + last_intensity_q = -1 + while not overlay.stop_event.is_set(): + elapsed_ms = (time.perf_counter() - start) * 1000 + if elapsed_ms >= _DURATION_MS: + break + intensity = _intensity_at(elapsed_ms / _DURATION_MS, full_screen) + intensity_q = round(intensity * _INTENSITY_QUANT) + if intensity_q != last_intensity_q: + if intensity < _MIN_VISIBLE_INTENSITY: + bgra = b"\x00" * (width * height * 4) + else: + bgra = _premultiplied_bgra(glow_rgba, intensity) + _push_bitmap(hwnd, union_left, union_top, width, height, bgra) + last_intensity_q = intensity_q 
+ _pump_messages(hwnd) + time.sleep(_FRAME_INTERVAL_MS / 1000) + except Exception: + logger.debug("screenshot flash overlay failed", exc_info=True) + finally: + try: + if hwnd: + _user32.DestroyWindow(hwnd) + except Exception: + pass + try: + if h_instance: + _user32.UnregisterClassW(class_name, h_instance) + except Exception: + pass + with _lock: + global _active_overlay + if _active_overlay is overlay: + _active_overlay = None + overlay.closed_event.set() diff --git a/src/windows_mcp/desktop/service.py b/src/windows_mcp/desktop/service.py index 8514bd60..cde51840 100755 --- a/src/windows_mcp/desktop/service.py +++ b/src/windows_mcp/desktop/service.py @@ -15,6 +15,8 @@ from PIL import ImageFont, ImageDraw, Image from windows_mcp.tree.service import Tree from windows_mcp.desktop import screenshot as screenshot_capture +from windows_mcp.desktop import flash_overlay +from windows_mcp.desktop import window_resolver from locale import getpreferredencoding from contextlib import contextmanager from typing import Literal @@ -91,6 +93,7 @@ def get_state( grid_lines: tuple[int, int] | None = None, display_indices: list[int] | None = None, max_image_size: Size | None = None, + capture_rect: "uia.Rect | None" = None, ) -> DesktopState: use_annotation = use_annotation is True or ( isinstance(use_annotation, str) and use_annotation.lower() == "true" @@ -117,7 +120,10 @@ def get_state( screenshot_capture_ms = 0.0 screenshot_resize_ms = 0.0 state_build_ms = 0.0 - capture_rect = self.get_display_union_rect(display_indices) if display_indices else None + if capture_rect is None and display_indices: + capture_rect = self.get_display_union_rect(display_indices) + elif capture_rect is not None and display_indices: + raise ValueError("capture_rect and display_indices are mutually exclusive") screenshot_region = self._rect_to_bounding_box(capture_rect) if capture_rect else None # Fast path for Screenshot tool (use_ui_tree=False): skip window enumeration. 
@@ -253,7 +259,9 @@ def get_state( screenshot_region=screenshot_region, screenshot_displays=display_indices, tree_state=tree_state, - screenshot_backend=getattr(self, "_last_screenshot_backend", None) if use_vision else None, + screenshot_backend=getattr(self, "_last_screenshot_backend", None) + if use_vision + else None, capture_sec=time() - start_time, ) if profile_enabled: @@ -341,7 +349,9 @@ def _get_apps_from_shortcuts(self) -> dict[str, str]: apps[name] = lnk_path return apps - def execute_command(self, command: str, timeout: int = 10, shell: str | None = None) -> tuple[str, int]: + def execute_command( + self, command: str, timeout: int = 10, shell: str | None = None + ) -> tuple[str, int]: return PowerShellExecutor.execute_command(command, timeout, shell) def is_window_browser(self, node: uia.Control): @@ -358,7 +368,9 @@ def get_default_language(self) -> str: reader = csv.DictReader(io.StringIO(response)) return "".join([row.get("DisplayName") for row in reader]) - def _find_window_by_name(self, name: str, refresh_state: bool = False) -> tuple["Window | None", str]: + def _find_window_by_name( + self, name: str, refresh_state: bool = False + ) -> tuple["Window | None", str]: """Find a window by fuzzy name match. Returns (window, error_msg). If the returned window is None, error_msg describes the failure reason. @@ -632,7 +644,7 @@ def get_coordinates_from_labels(self, labels: list[int]) -> list[tuple[int, int] results.append((element_node.center.x, element_node.center.y)) return results - def click(self, loc: tuple[int, int]|list[int], button: str = "left", clicks: int = 2): + def click(self, loc: tuple[int, int] | list[int], button: str = "left", clicks: int = 2): if isinstance(loc, list): x, y = loc[0], loc[1] else: @@ -714,7 +726,7 @@ def scroll( return 'Invalid type. Use "horizontal" or "vertical".' 
return None - def drag(self, loc: tuple[int, int]|list[int]): + def drag(self, loc: tuple[int, int] | list[int]): if isinstance(loc, list): x, y = loc[0], loc[1] else: @@ -956,10 +968,10 @@ def get_xpath_from_element(self, element: uia.Control): xpath = "/".join(path_parts) return xpath - - def get_windows_version(self) -> str: - response, status = PowerShellExecutor.execute_command("(Get-CimInstance Win32_OperatingSystem).Caption") + response, status = PowerShellExecutor.execute_command( + "(Get-CimInstance Win32_OperatingSystem).Caption" + ) if status == 0: return response.strip() return "Windows" @@ -997,14 +1009,18 @@ def parse_display_selection( return None if isinstance(display, bool): - raise ValueError("display must be a JSON array of non-negative integers, for example [0] or [0,1]") + raise ValueError( + "display must be a JSON array of non-negative integers, for example [0] or [0,1]" + ) if isinstance(display, int): values = [display] elif isinstance(display, (list, tuple)): values = list(display) else: - raise ValueError("display must be a JSON array of non-negative integers, for example [0] or [0,1]") + raise ValueError( + "display must be a JSON array of non-negative integers, for example [0] or [0,1]" + ) unique_values: list[int] = [] for value in values: @@ -1017,7 +1033,9 @@ def parse_display_selection( def get_display_union_rect(self, display_indices: list[int]) -> uia.Rect: monitor_rects = uia.GetMonitorsRect() if not monitor_rects: - logger.warning("Monitor enumeration returned no monitors while display filter was requested") + logger.warning( + "Monitor enumeration returned no monitors while display filter was requested" + ) raise ValueError("No displays detected") invalid_indices = [index for index in display_indices if index >= len(monitor_rects)] @@ -1039,9 +1057,66 @@ def get_display_union_rect(self, display_indices: list[int]) -> uia.Rect: bottom=max(rect.bottom for rect in selected_rects), ) + def resolve_window_capture_rect( + self, + *, 
+ name: str | None = None, + pid: int | None = None, + focus: bool = True, + ) -> tuple[uia.Rect, str]: + """Resolve a top-level window to a capture rectangle. + + Returns ``(rect, title)``. If ``focus`` is True, the window is brought + to the foreground and unminimized before its rect is read so the + screenshot will show the actual on-screen content. + """ + hwnd, title = window_resolver.resolve_window(name=name, pid=pid) + if focus: + try: + self.bring_window_to_top(hwnd) + except Exception: + logger.debug("bring_window_to_top failed for %s", title, exc_info=True) + window_resolver.restore_if_minimized(hwnd) + sleep(0.1) + if not window_resolver.is_foreground(hwnd): + window_resolver.force_foreground(hwnd) + sleep(0.1) + if not window_resolver.is_foreground(hwnd): + # bring_window_to_top swallows its own exceptions, so we can't rely on + # them as a failure signal. The explicit foreground check above is the + # only reliable signal — refuse the capture rather than silently shoot + # whatever happens to be on top. The user can pass focus_window=False + # to accept that risk explicitly. + raise window_resolver.WindowNotFoundError( + f"Could not bring window {title!r} to the foreground after multiple " + "attempts (likely blocked by elevation or another foreground-locking " + "process). Focus it manually and retry, or pass focus_window=False to " + "capture its current rect even though it may be obscured." + ) + else: + if window_resolver.is_iconic(hwnd): + raise window_resolver.WindowNotFoundError( + f"Window {title!r} is minimized; pass focus_window=True to restore it" + ) + if not window_resolver.is_foreground(hwnd): + raise window_resolver.WindowNotFoundError( + f"Window {title!r} is not the foreground window; " + "screenshot would capture whatever is on top instead. " + "Pass focus_window=True (default) to bring it forward, " + "or focus the window manually first." 
+ ) + rect = window_resolver.get_window_rect(hwnd) + if rect.isempty(): + raise window_resolver.WindowNotFoundError( + f"Window {title!r} has an empty bounding rectangle" + ) + return rect, title + def get_screenshot(self, capture_rect: uia.Rect | None = None) -> Image.Image: + flash_overlay.cancel_active_flash() image, used_backend = screenshot_capture.capture(capture_rect) self._last_screenshot_backend = used_backend + flash_overlay.show_capture_flash(capture_rect) return image def get_annotated_screenshot( @@ -1151,7 +1226,9 @@ def draw_annotation(label, node: TreeElementNode): # Draw "Cursor" label c_label = "CURSOR" c_label_width = draw.textlength(c_label, font=font) - draw.rectangle([acx + r, acy - r, acx + r + c_label_width + 4, acy - r + 16], fill="red") + draw.rectangle( + [acx + r, acy - r, acx + r + c_label_width + 4, acy - r + 16], fill="red" + ) draw.text((acx + r + 2, acy - r), c_label, fill="white", font=font) if capture_rect: @@ -1200,9 +1277,7 @@ def _clip_bounding_box_to_region( height=bottom - top, ) - def _filter_window_to_region( - self, window: Window | None, region: BoundingBox - ) -> Window | None: + def _filter_window_to_region(self, window: Window | None, region: BoundingBox) -> Window | None: if window is None: return None clipped_box = self._clip_bounding_box_to_region(window.bounding_box, region) @@ -1218,9 +1293,7 @@ def _filter_window_to_region( process_id=window.process_id, ) - def _filter_windows_to_region( - self, windows: list[Window], region: BoundingBox - ) -> list[Window]: + def _filter_windows_to_region(self, windows: list[Window], region: BoundingBox) -> list[Window]: filtered_windows: list[Window] = [] for window in windows: filtered_window = self._filter_window_to_region(window, region) @@ -1338,7 +1411,7 @@ def send_notification(self, title: str, message: str, app_id: str) -> str: if status == 0: return f'Notification sent: "{title}" - {message}' else: - return f'Notification may have been sent. 
PowerShell output: {response[:200]}' + return f"Notification may have been sent. PowerShell output: {response[:200]}" def list_processes( self, diff --git a/src/windows_mcp/desktop/window_resolver.py b/src/windows_mcp/desktop/window_resolver.py new file mode 100644 index 00000000..4beb71b6 --- /dev/null +++ b/src/windows_mcp/desktop/window_resolver.py @@ -0,0 +1,140 @@ +"""Resolve a top-level window by title or PID into a capture rectangle. + +Used by the Screenshot/Snapshot tools to support targeting a specific +window without taking a full-desktop screenshot. The resolver is decoupled +from ``Desktop`` so it can be unit-tested without spinning up UIA. +""" + +import ctypes +import ctypes.wintypes +import logging + +import win32con +import win32gui +import win32process +from fuzzywuzzy import process + +import windows_mcp.uia as uia + +logger = logging.getLogger(__name__) + +DWMWA_EXTENDED_FRAME_BOUNDS = 9 +_FUZZY_SCORE_CUTOFF = 70 + + +class WindowNotFoundError(ValueError): + """Raised when no visible top-level window matches the supplied criteria.""" + + +def enumerate_visible_windows() -> list[tuple[int, str, int]]: + """Return ``(hwnd, title, pid)`` for every visible, non-cloaked top-level window. + + The list is intended for matching, not display, so untitled windows are + included for PID-based lookup. 
+ """ + results: list[tuple[int, str, int]] = [] + + def callback(hwnd: int, _: object) -> bool: + try: + if not win32gui.IsWindow(hwnd) or not win32gui.IsWindowVisible(hwnd): + return True + title = win32gui.GetWindowText(hwnd) + _, pid = win32process.GetWindowThreadProcessId(hwnd) + results.append((hwnd, title, pid)) + except Exception: + pass + return True + + win32gui.EnumWindows(callback, None) + return results + + +def get_window_rect(hwnd: int) -> uia.Rect: + """Return the window's frame rect, preferring DWM extended bounds.""" + rect = ctypes.wintypes.RECT() + try: + hr = ctypes.windll.dwmapi.DwmGetWindowAttribute( + ctypes.wintypes.HWND(hwnd), + ctypes.wintypes.DWORD(DWMWA_EXTENDED_FRAME_BOUNDS), + ctypes.byref(rect), + ctypes.sizeof(rect), + ) + except Exception: + hr = 1 + if hr == 0: + return uia.Rect(rect.left, rect.top, rect.right, rect.bottom) + left, top, right, bottom = win32gui.GetWindowRect(hwnd) + return uia.Rect(left, top, right, bottom) + + +def resolve_window( + *, + name: str | None = None, + pid: int | None = None, + windows: list[tuple[int, str, int]] | None = None, +) -> tuple[int, str]: + """Resolve a window by exact PID or fuzzy title match. + + PID takes precedence when both are given. Returns ``(hwnd, title)``. + Raises :class:`WindowNotFoundError` if nothing matches. + """ + if name is None and pid is None: + raise ValueError("resolve_window requires either name or pid") + + if windows is None: + windows = enumerate_visible_windows() + + if pid is not None: + candidates = [(hwnd, title) for hwnd, title, win_pid in windows if win_pid == pid] + if not candidates: + raise WindowNotFoundError(f"No visible window found for PID {pid}") + # Prefer windows that have a title; fall back to the first match. 
+        candidates.sort(key=lambda t: 0 if t[1] else 1)
+        return candidates[0]
+
+    titled = [(hwnd, title) for hwnd, title, _ in windows if title]
+    if not titled:
+        raise WindowNotFoundError("No titled windows available for name match")
+    titles = [title for _, title in titled]
+    match = process.extractOne(name, titles, score_cutoff=_FUZZY_SCORE_CUTOFF)
+    if match is None:
+        raise WindowNotFoundError(f"No window title matched {name!r} (score cutoff 70)")
+    matched_title, _ = match
+    for hwnd, title in titled:
+        if title == matched_title:
+            return hwnd, title
+    raise WindowNotFoundError(f"No window title matched {name!r}")
+
+
+def is_iconic(hwnd: int) -> bool:
+    return bool(win32gui.IsIconic(hwnd))
+
+
+def is_foreground(hwnd: int) -> bool:
+    """True if ``hwnd`` is currently the system foreground window."""
+    try:
+        return win32gui.GetForegroundWindow() == hwnd
+    except Exception:
+        return False
+
+
+def restore_if_minimized(hwnd: int) -> None:
+    if is_iconic(hwnd):
+        win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
+
+
+def force_foreground(hwnd: int) -> None:
+    """Last-resort focus attempt via SwitchToThisWindow.
+
+    ``SetForegroundWindow`` is silently rejected when the calling process
+    didn't receive the last input event (Windows foreground lock), even
+    after the AttachThreadInput dance. ``SwitchToThisWindow`` is the
+    undocumented Win32 API the shell uses for Alt-Tab; it works around the
+    lock without injecting keyboard input.
+ """ + try: + ctypes.windll.user32.SwitchToThisWindow( + ctypes.wintypes.HWND(hwnd), ctypes.wintypes.BOOL(True) + ) + except Exception: + logger.debug("SwitchToThisWindow failed for hwnd %s", hwnd, exc_info=True) diff --git a/src/windows_mcp/tools/_snapshot_helpers.py b/src/windows_mcp/tools/_snapshot_helpers.py index baed9936..0ffa5da4 100644 --- a/src/windows_mcp/tools/_snapshot_helpers.py +++ b/src/windows_mcp/tools/_snapshot_helpers.py @@ -53,6 +53,9 @@ def capture_desktop_state( height_reference_line: int | None, display: list[int] | None, tool_name: str, + window_name: str | None = None, + window_pid: int | None = None, + focus_window: bool = True, ): profile_enabled = _snapshot_profile_enabled() profile_started_at = time.perf_counter() @@ -66,6 +69,17 @@ def capture_desktop_state( display_indices = Desktop.parse_display_selection(display) + capture_rect = None + target_window_title = None + if window_name or window_pid is not None: + if display_indices: + raise ValueError("window_name/window_pid and display are mutually exclusive") + capture_rect, target_window_title = desktop.resolve_window_capture_rect( + name=window_name, + pid=window_pid, + focus=focus_window, + ) + grid_lines = None if width_reference_line and height_reference_line: grid_lines = (int(width_reference_line), int(height_reference_line)) @@ -80,6 +94,7 @@ def capture_desktop_state( grid_lines=grid_lines, display_indices=display_indices, max_image_size=Size(width=MAX_IMAGE_WIDTH, height=MAX_IMAGE_HEIGHT), + capture_rect=capture_rect, ) if profile_enabled: desktop_state_ms = (time.perf_counter() - stage_started_at) * 1000 @@ -128,6 +143,7 @@ def capture_desktop_state( "active_desktop": active_desktop, "all_desktops": all_desktops, "screenshot_bytes": screenshot_bytes, + "target_window_title": target_window_title, } @@ -163,18 +179,21 @@ def build_snapshot_response( " for click, move and other mouse actions)\n" ) if desktop_state.screenshot_region: + metadata_text += f"Screenshot Region: 
{desktop_state.screenshot_region.xyxy_to_string()}\n" + if desktop_state.screenshot_displays: metadata_text += ( - f"Screenshot Region: {desktop_state.screenshot_region.xyxy_to_string()}\n" + f"Displays: {','.join(str(index) for index in desktop_state.screenshot_displays)}\n" ) - if desktop_state.screenshot_displays: - metadata_text += f"Displays: {','.join(str(index) for index in desktop_state.screenshot_displays)}\n" metadata_text += "Coordinate Space: Virtual desktop coordinates\n" if desktop_state.screenshot_backend: metadata_text += f"Screenshot Backend: {desktop_state.screenshot_backend}\n" + target_window_title = capture_result.get("target_window_title") + if target_window_title: + metadata_text += f"Target Window: {target_window_title}\n" if ui_detail_note: metadata_text += f"{ui_detail_note}\n" - response_text = dedent(f''' + response_text = dedent(f""" {metadata_text} Active Desktop: {active_desktop} @@ -187,14 +206,14 @@ def build_snapshot_response( Opened Windows: {windows} - ''') + """) if include_ui_details: - response_text += dedent(f''' + response_text += dedent(f""" UI Tree: - {semantic_tree or "No elements found."}''') + {semantic_tree or "No elements found."}""") response = [response_text] if screenshot_bytes: - response.append(Image(data=screenshot_bytes, format='png')) + response.append(Image(data=screenshot_bytes, format="png")) return response diff --git a/src/windows_mcp/tools/snapshot.py b/src/windows_mcp/tools/snapshot.py index 9f1a7b2b..a9d16d12 100644 --- a/src/windows_mcp/tools/snapshot.py +++ b/src/windows_mcp/tools/snapshot.py @@ -21,9 +21,10 @@ def register(mcp, *, get_desktop, get_analytics): global state_tool, screenshot_tool + @mcp.tool( - name='Snapshot', - description="Take a screenshot and inspect the screen. Keywords: screenshot, screen capture, see screen, observe, look, inspect, UI elements, what's on screen. 
Captures complete desktop state including: system language, focused/opened windows, interactive elements (buttons, text fields, links, menus with coordinates), and scrollable areas. Set use_vision=True to include screenshot with cursor highlight. Set use_annotation=False to get a clean screenshot without bounding box overlays on UI elements (default: True, draws colored rectangles around detected elements). Set use_ui_tree=False for a faster screenshot-only snapshot when you do not need interactive or scrollable element extraction. Set width_reference_lines/height_reference_lines to overlay a grid for better spatial reasoning (make sure vision is enabled to use it). Set use_dom=True for browser content to get web page elements instead of browser UI. Set display=[0] or display=[0,1] to limit all returned Snapshot information to specific screens; omit it to keep the default full-desktop behavior. Always call this first to understand the current desktop state before taking actions.", + name="Snapshot", + description="Take a screenshot and inspect the screen. Keywords: screenshot, screen capture, see screen, observe, look, inspect, UI elements, what's on screen. Captures complete desktop state including: system language, focused/opened windows, interactive elements (buttons, text fields, links, menus with coordinates), and scrollable areas. Set use_vision=True to include screenshot with cursor highlight. Set use_annotation=False to get a clean screenshot without bounding box overlays on UI elements (default: True, draws colored rectangles around detected elements). Set use_ui_tree=False for a faster screenshot-only snapshot when you do not need interactive or scrollable element extraction. Set width_reference_lines/height_reference_lines to overlay a grid for better spatial reasoning (make sure vision is enabled to use it). Set use_dom=True for browser content to get web page elements instead of browser UI. 
Set display=[0] or display=[0,1] to limit all returned Snapshot information to specific screens; omit it to keep the default full-desktop behavior. Set window_name (fuzzy title match) or window_pid (exact process id) to capture only that window's bounding rectangle; the window is brought to the foreground first unless focus_window=False. window_name/window_pid and display are mutually exclusive. Always call this first to understand the current desktop state before taking actions.", annotations=ToolAnnotations( title="Snapshot", readOnlyHint=True, @@ -41,6 +42,9 @@ def _state_tool( width_reference_line: int | None = None, height_reference_line: int | None = None, display: list[int] | None = None, + window_name: str | None = None, + window_pid: int | None = None, + focus_window: bool | str = True, ctx: Context = None, ): try: @@ -54,22 +58,25 @@ def _state_tool( height_reference_line=height_reference_line, display=display, tool_name="Snapshot tool", + window_name=window_name, + window_pid=window_pid, + focus_window=_as_bool(focus_window), ) except Exception as e: logger.warning( "Snapshot failed with display=%s use_vision=%s use_dom=%s", display, - use_vision if 'use_vision' in locals() else None, - use_dom if 'use_dom' in locals() else None, + use_vision if "use_vision" in locals() else None, + use_dom if "use_dom" in locals() else None, exc_info=True, ) - return [f'Error capturing desktop state: {str(e)}. Please try again.'] + return [f"Error capturing desktop state: {str(e)}. Please try again."] return build_snapshot_response(capture_result, include_ui_details=True) @mcp.tool( - name='Screenshot', - description="Captures a fast screenshot-first desktop snapshot with cursor position, desktop/window summaries, and an image. This path skips UI tree extraction for speed. Use Snapshot when you need interactive element ids, scrollable regions, or browser DOM extraction. 
Note: the returned image may be downscaled for efficiency; when it is, multiply image coordinates by the ratio of original size to displayed size to get the actual screen coordinates for mouse actions (Click, Move, etc.).", + name="Screenshot", + description="Captures a fast screenshot-first desktop snapshot with cursor position, desktop/window summaries, and an image. This path skips UI tree extraction for speed. Use Snapshot when you need interactive element ids, scrollable regions, or browser DOM extraction. Set window_name (fuzzy title match) or window_pid (exact process id) to capture just that window's bounding rectangle; the window is brought to the foreground first unless focus_window=False. window_name/window_pid and display are mutually exclusive. Note: the returned image may be downscaled for efficiency; when it is, multiply image coordinates by the ratio of original size to displayed size to get the actual screen coordinates for mouse actions (Click, Move, etc.).", annotations=ToolAnnotations( title="Screenshot", readOnlyHint=True, @@ -84,6 +91,9 @@ def _screenshot_tool( width_reference_line: int | None = None, height_reference_line: int | None = None, display: list[int] | None = None, + window_name: str | None = None, + window_pid: int | None = None, + focus_window: bool | str = True, ctx: Context = None, ): try: @@ -97,6 +107,9 @@ def _screenshot_tool( height_reference_line=height_reference_line, display=display, tool_name="Screenshot tool", + window_name=window_name, + window_pid=window_pid, + focus_window=_as_bool(focus_window), ) except Exception as e: logger.warning( @@ -104,7 +117,7 @@ def _screenshot_tool( display, exc_info=True, ) - return [f'Error capturing screenshot: {str(e)}. Please try again.'] + return [f"Error capturing screenshot: {str(e)}. 
Please try again."] return build_snapshot_response( capture_result, diff --git a/tests/conftest.py b/tests/conftest.py index a7008ebb..20d68522 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,13 @@ +import os + import pytest +# Disable the post-screenshot flash overlay during the test suite. The overlay +# spawns a Tk window on a daemon thread which races with pytest teardown and +# can crash the interpreter. Tests for the flash itself set/clear this env var +# explicitly via monkeypatch. +os.environ.setdefault("WINDOWS_MCP_DISABLE_FLASH", "1") + from windows_mcp.tree.views import BoundingBox, Center, TreeElementNode, ScrollElementNode from windows_mcp.desktop.views import Window, Status, DesktopState diff --git a/tests/test_flash_overlay.py b/tests/test_flash_overlay.py new file mode 100644 index 00000000..f3bad33d --- /dev/null +++ b/tests/test_flash_overlay.py @@ -0,0 +1,225 @@ +"""Unit tests for the screenshot flash overlay. + +The actual Tk window is never created in these tests — they exercise the +public dispatch surface (env-var gating, lifecycle bookkeeping, fallthrough +when ``tkinter`` cannot be imported). 
+"""
+
+import sys
+import threading
+from unittest.mock import patch
+
+import pytest
+
+import windows_mcp.desktop.flash_overlay as flash_overlay
+
+
+@pytest.fixture(autouse=True)
+def _reset_active_overlay():
+    """Each test starts and ends with no overlay registered."""
+    with flash_overlay._lock:
+        flash_overlay._active_overlay = None
+    yield
+    with flash_overlay._lock:
+        ov = flash_overlay._active_overlay
+        flash_overlay._active_overlay = None
+    if ov is not None:
+        ov.stop_event.set()
+
+
+class TestFlashDisabled:
+    def test_default_is_enabled(self, monkeypatch):
+        monkeypatch.delenv("WINDOWS_MCP_DISABLE_FLASH", raising=False)
+        assert flash_overlay._flash_disabled() is False
+
+    @pytest.mark.parametrize("value", ["1", "true", "yes", "on", "TRUE", " On "])
+    def test_truthy_values_disable(self, monkeypatch, value):
+        monkeypatch.setenv("WINDOWS_MCP_DISABLE_FLASH", value)
+        assert flash_overlay._flash_disabled() is True
+
+    @pytest.mark.parametrize("value", ["0", "false", "no", "off", ""])
+    def test_falsy_values_keep_enabled(self, monkeypatch, value):
+        monkeypatch.setenv("WINDOWS_MCP_DISABLE_FLASH", value)
+        assert flash_overlay._flash_disabled() is False
+
+
+class _FakeRect:
+    """Stand-in for ``uia.Rect`` with the four corner attributes."""
+
+    def __init__(self, left, top, right, bottom):
+        self.left = left
+        self.top = top
+        self.right = right
+        self.bottom = bottom
+
+
+class TestShowCaptureFlash:
+    def test_disabled_env_var_skips_thread(self, monkeypatch):
+        monkeypatch.setenv("WINDOWS_MCP_DISABLE_FLASH", "1")
+        with patch.object(threading, "Thread") as fake_thread:
+            flash_overlay.show_capture_flash(_FakeRect(0, 0, 100, 100))
+            fake_thread.assert_not_called()
+        assert flash_overlay._active_overlay is None
+
+    def test_empty_monitor_rects_skips_thread(self, monkeypatch):
+        """Full-screen path with zero monitors must not start a thread."""
+        monkeypatch.delenv("WINDOWS_MCP_DISABLE_FLASH", raising=False)
+        # Patch on the real uia module — ``import windows_mcp.uia as uia`` resolves
+        # via the cached parent-package attribute once another test has imported
+        # the package, so monkeypatching sys.modules is not reliable on its own.
+        import windows_mcp.uia
+
+        monkeypatch.setattr(windows_mcp.uia, "GetMonitorsRect", lambda: [])
+        with patch.object(threading, "Thread") as fake_thread:
+            flash_overlay.show_capture_flash(None)
+            fake_thread.assert_not_called()
+        assert flash_overlay._active_overlay is None
+
+    def test_region_capture_passes_single_rect(self, monkeypatch):
+        monkeypatch.delenv("WINDOWS_MCP_DISABLE_FLASH", raising=False)
+
+        captured = {}
+
+        class _StubThread:
+            def __init__(self, target, args, name, daemon):
+                captured["target"] = target
+                captured["args"] = args
+                captured["name"] = name
+                captured["daemon"] = daemon
+
+            def start(self):
+                captured["started"] = True
+
+        monkeypatch.setattr(flash_overlay.threading, "Thread", _StubThread)
+
+        flash_overlay.show_capture_flash(_FakeRect(10, 20, 110, 120))
+
+        assert captured["started"] is True
+        assert captured["daemon"] is True
+        assert captured["name"] == "windows-mcp-flash"
+        assert flash_overlay._active_overlay is not None
+        rects_arg, full_screen_arg, overlay_arg = captured["args"]
+        assert rects_arg == [(10, 20, 110, 120)]
+        assert full_screen_arg is False
+        assert overlay_arg is flash_overlay._active_overlay
+
+    def test_full_screen_capture_enumerates_monitors(self, monkeypatch):
+        """When capture_rect is None the helper must read uia.GetMonitorsRect."""
+        monkeypatch.delenv("WINDOWS_MCP_DISABLE_FLASH", raising=False)
+        import windows_mcp.uia
+
+        monkeypatch.setattr(
+            windows_mcp.uia,
+            "GetMonitorsRect",
+            lambda: [_FakeRect(0, 0, 1920, 1080), _FakeRect(1920, 0, 3840, 1080)],
+        )
+
+        captured = {}
+
+        class _StubThread:
+            def __init__(self, target, args, name, daemon):
+                captured["args"] = args
+
+            def start(self):
+                pass
+
+        monkeypatch.setattr(flash_overlay.threading, "Thread", _StubThread)
+
+        flash_overlay.show_capture_flash(None)
+
+        rects_arg, full_screen_arg, _ = captured["args"]
+        assert rects_arg == [(0, 0, 1920, 1080), (1920, 0, 3840, 1080)]
+        assert full_screen_arg is True
+
+    def test_overlapping_calls_cancel_prior_overlay(self, monkeypatch):
+        """Second call must signal the prior overlay's stop_event so it can be torn down."""
+        monkeypatch.delenv("WINDOWS_MCP_DISABLE_FLASH", raising=False)
+
+        class _StubThread:
+            def __init__(self, *a, **kw):
+                pass
+
+            def start(self):
+                pass
+
+        monkeypatch.setattr(flash_overlay.threading, "Thread", _StubThread)
+
+        flash_overlay.show_capture_flash(_FakeRect(0, 0, 100, 100))
+        first = flash_overlay._active_overlay
+        assert first is not None
+        assert not first.stop_event.is_set()
+
+        flash_overlay.show_capture_flash(_FakeRect(0, 0, 100, 100))
+        second = flash_overlay._active_overlay
+        assert second is not None
+        assert second is not first
+        # Critical: the prior overlay must be signalled so cancel_active_flash semantics survive
+        assert first.stop_event.is_set()
+
+
+class TestCancelActiveFlash:
+    def test_no_op_when_no_active_overlay(self):
+        flash_overlay.cancel_active_flash()
+        assert flash_overlay._active_overlay is None
+
+    def test_signals_stop_and_clears_active(self, monkeypatch):
+        # Install a stub overlay manually so we don't depend on Tk
+        overlay = flash_overlay._Overlay()
+        overlay.thread = threading.Thread(target=lambda: None, daemon=True)
+        overlay.thread.start()
+        overlay.thread.join()
+        with flash_overlay._lock:
+            flash_overlay._active_overlay = overlay
+
+        flash_overlay.cancel_active_flash(timeout=0.1)
+
+        assert overlay.stop_event.is_set()
+        assert flash_overlay._active_overlay is None
+
+
+class TestIntensityCurve:
+    def test_full_screen_is_bell_curve(self):
+        # Symmetric peak at t=0.5, zero at t=0 and t=1
+        assert flash_overlay._intensity_at(0.0, full_screen=True) == 0.0
+        assert flash_overlay._intensity_at(0.5, full_screen=True) == 1.0
+        assert abs(flash_overlay._intensity_at(1.0, full_screen=True)) < 1e-9
+
+    def test_region_holds_then_fades(self):
+        # Pre-peak fade-in
+        assert flash_overlay._intensity_at(0.0, full_screen=False) == 0.0
+        # Held at peak in the middle
+        assert flash_overlay._intensity_at(0.4, full_screen=False) == 1.0
+        # Fading in last segment
+        assert flash_overlay._intensity_at(1.0, full_screen=False) == 0.0
+
+
+class TestPremultipliedBgra:
+    def test_full_intensity_premultiplies_color_by_alpha(self):
+        from PIL import Image
+
+        # 1×1 pixel: orange-red 50% alpha → premult should be (R*128/255, G*128/255, B*128/255, 128) in BGRA order
+        img = Image.new("RGBA", (1, 1), (255, 69, 0, 128))
+        out = flash_overlay._premultiplied_bgra(img, 1.0)
+        b, g, r, a = out
+        assert a == 128
+        assert b == 0
+        assert g == (69 * 128) // 255
+        assert r == (255 * 128) // 255
+
+    def test_intensity_scales_alpha(self):
+        from PIL import Image
+
+        img = Image.new("RGBA", (1, 1), (255, 69, 0, 255))
+        out = flash_overlay._premultiplied_bgra(img, 0.5)
+        # Alpha was 255; intensity 0.5 → effective alpha ≈ 127.
+        _, _, _, a = out
+        assert 124 <= a <= 128
+
+
+class TestRunOverlayFallthrough:
+    def test_missing_tkinter_sets_closed_event(self, monkeypatch):
+        # Force ``import tkinter`` inside _run_overlay to fail
+        monkeypatch.setitem(sys.modules, "tkinter", None)
+        overlay = flash_overlay._Overlay()
+        flash_overlay._run_overlay([(0, 0, 100, 100)], False, overlay)
+        assert overlay.closed_event.is_set()
diff --git a/tests/test_window_resolver.py b/tests/test_window_resolver.py
new file mode 100644
index 00000000..ae326a7b
--- /dev/null
+++ b/tests/test_window_resolver.py
@@ -0,0 +1,246 @@
+"""Unit tests for window resolution used by the Screenshot/Snapshot tools."""
+
+import ctypes
+from unittest.mock import MagicMock
+
+import pytest
+
+from windows_mcp.desktop import window_resolver
+from windows_mcp.desktop.window_resolver import (
+    WindowNotFoundError,
+    enumerate_visible_windows,
+    get_window_rect,
+    resolve_window,
+)
+from windows_mcp.uia import Rect
+
+
+def _windows():
+    return [
+        (101, "Notepad - Untitled", 1000),
+        (202, "Cotire — Columbus Time Reporting", 60972),
+        (303, "", 60972),
+        (404, "Visual Studio Code", 5555),
+    ]
+
+
+class TestEnumerateVisibleWindows:
+    def test_filters_invisible_and_invalid(self, monkeypatch):
+        def fake_enum(callback, _):
+            for hwnd in (1, 2, 3):
+                callback(hwnd, None)
+
+        monkeypatch.setattr(window_resolver.win32gui, "EnumWindows", fake_enum)
+        monkeypatch.setattr(
+            window_resolver.win32gui,
+            "IsWindow",
+            lambda hwnd: hwnd != 2,
+        )
+        monkeypatch.setattr(
+            window_resolver.win32gui,
+            "IsWindowVisible",
+            lambda hwnd: hwnd != 3,
+        )
+        monkeypatch.setattr(
+            window_resolver.win32gui,
+            "GetWindowText",
+            lambda hwnd: f"win-{hwnd}",
+        )
+        monkeypatch.setattr(
+            window_resolver.win32process,
+            "GetWindowThreadProcessId",
+            lambda hwnd: (0, hwnd * 10),
+        )
+
+        results = enumerate_visible_windows()
+
+        assert results == [(1, "win-1", 10)]
+        assert 2 not in [r[0] for r in results]
+        assert 3 not in [r[0] for r in results]
+
+
+class TestResolveWindow:
+    def test_requires_name_or_pid(self):
+        with pytest.raises(ValueError, match="name or pid"):
+            resolve_window(windows=_windows())
+
+    def test_resolves_by_pid_prefers_titled(self):
+        hwnd, title = resolve_window(pid=60972, windows=_windows())
+        assert hwnd == 202
+        assert title.startswith("Cotire")
+
+    def test_resolves_by_pid_returns_first_match_when_no_titled(self):
+        windows = [
+            (1, "", 555),
+            (2, "", 555),
+        ]
+        hwnd, title = resolve_window(pid=555, windows=windows)
+        assert hwnd == 1
+        assert title == ""
+
+    def test_pid_not_found_raises(self):
+        with pytest.raises(WindowNotFoundError, match="PID 999"):
+            resolve_window(pid=999, windows=_windows())
+
+    def test_resolves_by_fuzzy_name(self):
+        hwnd, title = resolve_window(name="cotire", windows=_windows())
+        assert hwnd == 202
+        assert title.startswith("Cotire")
+
+    def test_name_not_found_raises(self):
+        with pytest.raises(WindowNotFoundError, match="cutoff"):
+            resolve_window(name="zzzzzzzzzzz", windows=_windows())
+
+    def test_no_titled_windows_raises(self):
+        only_untitled = [(1, "", 1)]
+        with pytest.raises(WindowNotFoundError, match="No titled windows"):
+            resolve_window(name="anything", windows=only_untitled)
+
+    def test_pid_takes_precedence_over_name(self):
+        hwnd, _ = resolve_window(name="visual studio code", pid=60972, windows=_windows())
+        assert hwnd == 202
+
+
+class TestIsForeground:
+    def test_true_when_foreground_handle_matches(self, monkeypatch):
+        monkeypatch.setattr(window_resolver.win32gui, "GetForegroundWindow", lambda: 4242)
+        assert window_resolver.is_foreground(4242) is True
+
+    def test_false_when_foreground_handle_differs(self, monkeypatch):
+        monkeypatch.setattr(window_resolver.win32gui, "GetForegroundWindow", lambda: 999)
+        assert window_resolver.is_foreground(4242) is False
+
+    def test_false_when_call_raises(self, monkeypatch):
+        def boom():
+            raise OSError("denied")
+
+        monkeypatch.setattr(window_resolver.win32gui, "GetForegroundWindow", boom)
+        assert window_resolver.is_foreground(1) is False
+
+
+class TestForceForeground:
+    def test_invokes_switch_to_this_window(self, monkeypatch):
+        calls: list[tuple] = []
+        fake_user32 = MagicMock()
+        fake_user32.SwitchToThisWindow.side_effect = lambda hwnd, fAlt: calls.append(
+            (hwnd.value, fAlt.value)
+        )
+        monkeypatch.setattr(ctypes, "windll", MagicMock(user32=fake_user32))
+        window_resolver.force_foreground(7777)
+        assert calls and calls[0] == (7777, True)
+
+    def test_swallows_exception(self, monkeypatch):
+        fake_user32 = MagicMock()
+        fake_user32.SwitchToThisWindow.side_effect = OSError("nope")
+        monkeypatch.setattr(ctypes, "windll", MagicMock(user32=fake_user32))
+        window_resolver.force_foreground(1)
+
+
+class TestGetWindowRect:
+    def test_uses_dwm_when_call_succeeds(self, monkeypatch):
+        captured = {}
+
+        def fake_dwm(hwnd, attr, rect_ptr, size):
+            captured["hwnd"] = hwnd.value
+            r = ctypes.cast(rect_ptr, ctypes.POINTER(ctypes.wintypes.RECT)).contents
+            r.left, r.top, r.right, r.bottom = 100, 200, 600, 700
+            return 0
+
+        fake_dwmapi = MagicMock()
+        fake_dwmapi.DwmGetWindowAttribute.side_effect = fake_dwm
+        monkeypatch.setattr(ctypes, "windll", MagicMock(dwmapi=fake_dwmapi))
+
+        rect = get_window_rect(12345)
+
+        assert isinstance(rect, Rect)
+        assert (rect.left, rect.top, rect.right, rect.bottom) == (100, 200, 600, 700)
+        assert captured["hwnd"] == 12345
+
+    def test_falls_back_to_get_window_rect_when_dwm_fails(self, monkeypatch):
+        fake_dwmapi = MagicMock()
+        fake_dwmapi.DwmGetWindowAttribute.return_value = 1  # nonzero HRESULT
+        monkeypatch.setattr(ctypes, "windll", MagicMock(dwmapi=fake_dwmapi))
+        monkeypatch.setattr(
+            window_resolver.win32gui,
+            "GetWindowRect",
+            lambda hwnd: (10, 20, 30, 40),
+        )
+
+        rect = get_window_rect(99)
+        assert (rect.left, rect.top, rect.right, rect.bottom) == (10, 20, 30, 40)
+
+    def test_falls_back_when_dwm_raises(self, monkeypatch):
+        fake_dwmapi = MagicMock()
+        fake_dwmapi.DwmGetWindowAttribute.side_effect = OSError("oops")
+        monkeypatch.setattr(ctypes, "windll", MagicMock(dwmapi=fake_dwmapi))
+        monkeypatch.setattr(
+            window_resolver.win32gui,
+            "GetWindowRect",
+            lambda hwnd: (1, 2, 3, 4),
+        )
+
+        rect = get_window_rect(1)
+        assert (rect.left, rect.top, rect.right, rect.bottom) == (1, 2, 3, 4)
+
+
+class TestResolveWindowCaptureRectFocusFailure:
+    """Regression test for PR #233 review feedback: when focus=True and the
+    post-condition foreground check fails for both bring_window_to_top and
+    force_foreground, the method must raise instead of capturing wrong content.
+    bring_window_to_top swallows its own exceptions, so the try/except path is
+    not a reliable failure signal — only the explicit ``is_foreground`` check is.
+    """
+
+    def test_raises_when_window_never_becomes_foreground(self, monkeypatch):
+        from windows_mcp.desktop.service import Desktop
+
+        # Stand up a Desktop without running its full __init__ (Tree/UIA setup
+        # would otherwise pull in heavy Windows COM state in a unit test).
+        desktop = Desktop.__new__(Desktop)
+
+        # Stub Desktop.bring_window_to_top so it neither raises nor focuses
+        # — same as real-world failure (it logs and returns).
+        monkeypatch.setattr(Desktop, "bring_window_to_top", lambda self, hwnd: None)
+
+        # Resolver returns a fake (hwnd, title).
+        monkeypatch.setattr(
+            window_resolver, "resolve_window", lambda name=None, pid=None: (12345, "Stub Title")
+        )
+        # is_foreground always returns False — focus attempt "fails".
+        monkeypatch.setattr(window_resolver, "is_foreground", lambda hwnd: False)
+        # force_foreground does nothing (also a no-op).
+        monkeypatch.setattr(window_resolver, "force_foreground", lambda hwnd: None)
+        # Make sleep a no-op so the test is fast.
+        monkeypatch.setattr("windows_mcp.desktop.service.sleep", lambda s: None)
+
+        with pytest.raises(WindowNotFoundError) as excinfo:
+            desktop.resolve_window_capture_rect(name="Stub")
+        assert "foreground" in str(excinfo.value).lower()
+        assert "focus_window=False" in str(excinfo.value)
+
+    def test_returns_rect_when_force_foreground_succeeds(self, monkeypatch):
+        from windows_mcp.desktop.service import Desktop
+
+        desktop = Desktop.__new__(Desktop)
+
+        monkeypatch.setattr(Desktop, "bring_window_to_top", lambda self, hwnd: None)
+        monkeypatch.setattr(
+            window_resolver, "resolve_window", lambda name=None, pid=None: (12345, "Stub Title")
+        )
+        # First check fails, second (after force_foreground) succeeds.
+        calls = {"is_foreground": 0}
+
+        def fake_is_foreground(hwnd):
+            calls["is_foreground"] += 1
+            return calls["is_foreground"] >= 2
+
+        monkeypatch.setattr(window_resolver, "is_foreground", fake_is_foreground)
+        monkeypatch.setattr(window_resolver, "force_foreground", lambda hwnd: None)
+        monkeypatch.setattr("windows_mcp.desktop.service.sleep", lambda s: None)
+        monkeypatch.setattr(
+            window_resolver, "get_window_rect", lambda hwnd: Rect(10, 20, 110, 120)
+        )
+
+        rect, title = desktop.resolve_window_capture_rect(name="Stub")
+        assert title == "Stub Title"
+        assert (rect.left, rect.top, rect.right, rect.bottom) == (10, 20, 110, 120)