diff --git a/CLAUDE.md b/CLAUDE.md index 30bc8de..8c2c8c6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -149,7 +149,17 @@ uv run mkdocs gh-deploy - `DeviceGroup`: Batch operations (set_power, set_color, etc.) - `LocationGrouping` / `GroupGrouping`: Organizational structures for location/group-based grouping -5. **Utilities** +5. **Animation Layer** (`src/lifx/animation/`) + + - `animator.py`: High-level `Animator` class with direct UDP sending + - `framebuffer.py`: Multi-tile canvas mapping and orientation correction + - `packets.py`: Prebaked packet templates (`MatrixPacketGenerator`, `MultiZonePacketGenerator`) + - `orientation.py`: Tile orientation remapping with LRU-cached lookup tables + - Optimized for high-frequency frame delivery (30+ FPS) for real-time effects + - Uses protocol-ready uint16 HSBK values (no conversion overhead) + - Multi-tile canvas support using `user_x`/`user_y` tile positions + +6. **Utilities** - `color.py`: `HSBK` class with RGB conversion, `Colors` presets - `const.py`: Critical constants (network settings, UUIDs, official URLs) @@ -481,6 +491,80 @@ for frame in animation_frames: **Note:** `MatrixLight.set64()` is already fire-and-forget by default. +### Animation Module (High-Frequency Frame Delivery) + +For real-time effects and applications that need to push color data at 30+ FPS, use the animation module: + +```python +from lifx import Animator, MatrixLight + +async with await MatrixLight.from_ip("192.168.1.100") as device: + # Create animator for matrix device + animator = await Animator.for_matrix(device) + +# Device connection closed - animator sends via direct UDP +while running: + # Generate HSBK frame (protocol-ready uint16 values) + # H/S/B: 0-65535, K: 1500-9000 + hsbk_frame = [(65535, 65535, 65535, 3500)] * animator.pixel_count + + # send_frame() is synchronous for speed + stats = animator.send_frame(hsbk_frame) + print(f"Sent {stats.packets_sent} packets") + + await asyncio.sleep(1 / 30) # 30 FPS + +animator.close() +``` + +**Key Features:** +- **Direct UDP**: Bypasses connection layer for maximum throughput +- **Prebaked packets**: Templates created once, only colors updated per frame +- **Multi-tile canvas**: Unified coordinate space for multi-tile devices (e.g., 5-tile LIFX Tile) +- **Tile orientation**: Automatic pixel remapping for rotated tiles + +**Multi-Tile Canvas:** + +For devices with multiple tiles, the animator creates a unified canvas based on tile positions: + +```python +async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + +# For 5 horizontal tiles: canvas is 40x8 (320 pixels) +print(f"Canvas: {animator.canvas_width}x{animator.canvas_height}") + +# Generate frame for entire canvas (row-major order) +frame = [] +for y in range(animator.canvas_height): + for x in range(animator.canvas_width): + hue = int(x / animator.canvas_width * 65535) # Rainbow across all tiles + frame.append((hue, 65535, 65535, 3500)) + +animator.send_frame(frame) +``` + +**HSBK Format (Protocol-Ready):** +```python +# (hue, saturation, brightness, kelvin) +# H/S/B: 0-65535, K: 1500-9000 +red = (0, 65535, 65535, 3500) # Full red +blue = (43690, 65535, 65535, 3500) # Full blue (240/360 * 65535) +white = (0, 0, 65535, 5500) # Daylight white +off = (0, 0, 0, 3500) # Off (black) +``` + +**For MultiZone devices (strips/beams):** +```python +from lifx import Animator, MultiZoneLight + +async with await MultiZoneLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_multizone(device) + +# Same API as matrix +stats = animator.send_frame(hsbk_frame) +``` + ### Packet Flow 1. Create packet instance (e.g., `LightSetColor`) diff --git a/docs/api/animation.md b/docs/api/animation.md new file mode 100644 index 0000000..f52a110 --- /dev/null +++ b/docs/api/animation.md @@ -0,0 +1,356 @@ +# Animation Module + +The animation module provides efficient high-frequency frame delivery for LIFX devices, optimized +for real-time effects and applications that need to push color data at 30+ FPS. + +## Overview + +The animation system uses a streamlined architecture optimized for speed: + +``` +Application Frame -> FrameBuffer -> PacketGenerator -> Direct UDP + (canvas map) (prebaked packets) (fire-and-forget) +``` + +Key features: + +- **Direct UDP**: Bypasses connection layer for maximum throughput +- **Prebaked packets**: Templates created once, only colors updated per frame +- **Multi-tile canvas**: Unified coordinate space for multi-tile devices +- **Tile orientation**: Automatic pixel remapping for rotated tiles +- **Synchronous sending**: `send_frame()` is synchronous for minimum overhead + +## Quick Start + +```python +import asyncio +from lifx import Animator, MatrixLight + +async def main(): + async with await MatrixLight.from_ip("192.168.1.100") as device: + # Create animator for matrix device + animator = await Animator.for_matrix(device) + + # Device connection closed - animator sends via direct UDP + try: + while True: + # Generate HSBK frame (protocol-ready uint16 values) + # H/S/B: 0-65535, K: 1500-9000 + hsbk_frame = [(65535, 65535, 65535, 3500)] * animator.pixel_count + + # send_frame() is synchronous for speed + stats = animator.send_frame(hsbk_frame) + print(f"Sent {stats.packets_sent} packets in {stats.total_time_ms:.2f}ms") + + await asyncio.sleep(1 / 30) # 30 FPS + finally: + animator.close() +``` + +## Multi-Tile Canvas + +For devices with multiple tiles (like the original 5-tile LIFX Tile), the animator creates +a unified canvas based on tile positions (`user_x`, `user_y`). Animations span all tiles +as one continuous image. + +```python +async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + +# Check canvas dimensions +print(f"Canvas: {animator.canvas_width}x{animator.canvas_height}") +# For 5 horizontal tiles: "Canvas: 40x8" + +# Generate frame for entire canvas (row-major order) +frame = [] +for y in range(animator.canvas_height): + for x in range(animator.canvas_width): + hue = int(x / animator.canvas_width * 65535) # Gradient across all tiles + frame.append((hue, 65535, 65535, 3500)) + +animator.send_frame(frame) +``` + +## HSBK Format + +All color data uses protocol-ready uint16 values: + +| Component | Range | Description | +|-----------|-------|-------------| +| Hue | 0-65535 | Maps to 0-360 degrees | +| Saturation | 0-65535 | Maps to 0.0-1.0 | +| Brightness | 0-65535 | Maps to 0.0-1.0 | +| Kelvin | 1500-9000 | Color temperature | + +This design pushes conversion work to the caller (e.g. using NumPy) for better performance. +The `lifx-async` library remains dependency-free. + +```python +# Red at full brightness +red = (0, 65535, 65535, 3500) + +# 50% brightness warm white +warm_white = (0, 0, 32768, 2700) + +# Convert from user-friendly values +def to_protocol_hsbk( + hue: float, sat: float, bright: float, kelvin: int +) -> tuple[int, int, int, int]: + """Convert user-friendly values to protocol format.""" + return ( + int(hue / 360 * 65535), + int(sat * 65535), + int(bright * 65535), + kelvin, + ) +``` + +## Animator + +High-level class integrating all animation components. + +::: lifx.animation.animator.Animator + options: + show_root_heading: true + heading_level: 3 + members_order: source + show_if_no_docstring: false + filters: + - "!^_" + +### AnimatorStats + +Statistics returned by `Animator.send_frame()`. + +::: lifx.animation.animator.AnimatorStats + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + +## FrameBuffer + +Canvas mapping and orientation handling for matrix devices. + +::: lifx.animation.framebuffer.FrameBuffer + options: + show_root_heading: true + heading_level: 3 + members_order: source + show_if_no_docstring: false + filters: + - "!^_" + +### TileRegion + +Represents a tile's region within the canvas. + +::: lifx.animation.framebuffer.TileRegion + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + +## Packet Generators + +Device-specific packet generation with prebaked templates. + +### PacketGenerator (Base) + +::: lifx.animation.packets.PacketGenerator + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + +### PacketTemplate + +Prebaked packet template for zero-allocation frame updates. + +::: lifx.animation.packets.PacketTemplate + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + +### MatrixPacketGenerator + +Generates Set64 packets for MatrixLight devices. + +::: lifx.animation.packets.MatrixPacketGenerator + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + filters: + - "!^_" + +### MultiZonePacketGenerator + +Generates SetExtendedColorZones packets for MultiZoneLight devices. + +::: lifx.animation.packets.MultiZonePacketGenerator + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + filters: + - "!^_" + +## Tile Orientation + +Pixel remapping for rotated tiles. + +### Orientation Enum + +::: lifx.animation.orientation.Orientation + options: + show_root_heading: true + heading_level: 4 + members_order: source + show_if_no_docstring: false + +### build_orientation_lut + +::: lifx.animation.orientation.build_orientation_lut + options: + show_root_heading: true + heading_level: 4 + +## Examples + +### Matrix Animation (Single Tile) + +```python +import asyncio +from lifx import Animator, MatrixLight + +async def rainbow_animation(): + async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + + hue_offset = 0 + try: + while True: + # Generate rainbow gradient + frame = [] + for i in range(animator.pixel_count): + hue = (hue_offset + i * 1000) % 65536 + frame.append((hue, 65535, 32768, 3500)) + + stats = animator.send_frame(frame) + print(f"Sent {stats.packets_sent} packets") + + hue_offset = (hue_offset + 500) % 65536 + await asyncio.sleep(1 / 30) # 30 FPS + finally: + animator.close() +``` + +### Multi-Tile Animation (LIFX Tile with 5 tiles) + +```python +import asyncio +import math +from lifx import Animator, MatrixLight + +async def multi_tile_wave(): + async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + + # Canvas spans all tiles (e.g., 40x8 for 5 horizontal tiles) + width = animator.canvas_width + height = animator.canvas_height + print(f"Canvas: {width}x{height}") + + hue_offset = 0 + try: + while True: + frame = [] + for y in range(height): + for x in range(width): + # Wave that flows across all tiles + pos = x + y * 0.5 # Diagonal wave + hue = int((pos / width) * 65535 + hue_offset) % 65536 + frame.append((hue, 65535, 65535, 3500)) + + animator.send_frame(frame) + hue_offset = (hue_offset + 1000) % 65536 + await asyncio.sleep(1 / 30) + finally: + animator.close() +``` + +### MultiZone Animation + +```python +import asyncio +from lifx import Animator, MultiZoneLight + +async def chase_animation(): + async with await MultiZoneLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_multizone(device) + + position = 0 + try: + while True: + # Generate chase pattern + frame = [] + for i in range(animator.pixel_count): + if i == position: + frame.append((0, 65535, 65535, 3500)) # Red + else: + frame.append((0, 0, 0, 3500)) # Off + + animator.send_frame(frame) + + position = (position + 1) % animator.pixel_count + await asyncio.sleep(1 / 20) # 20 FPS + finally: + animator.close() +``` + +## Performance Characteristics + +### Direct UDP Delivery + +The animation module bypasses the connection layer entirely: + +- No ACKs, no waiting, no retries +- Packets sent via raw UDP socket +- Maximum throughput for real-time effects +- Some packet loss is acceptable (visual artifacts are brief) + +### Prebaked Packet Templates + +Packets are constructed once at initialization: + +- Header and payload structure prebaked as `bytearray` +- Per-frame: only color data and sequence number updated +- Zero object allocation in the hot path +- Sequence number wraps at 256 (uint8) + +### Multi-Tile Canvas Mapping + +For devices with multiple tiles: + +- Tile positions read from device (`user_x`, `user_y`) +- Canvas bounds calculated from all tile positions +- Input frame interpreted as 2D row-major canvas +- Each tile extracts its region based on position +- Orientation correction applied per-tile + +### Typical Performance + +| Device Type | Pixels | Packets/Frame | Send Time | +|-------------|--------|---------------|-----------| +| Single tile (8x8) | 64 | 1 | <0.5ms | +| 5-tile chain | 320 | 5 | <1ms | +| Large Ceiling (16x8) | 128 | 3 | <1ms | +| MultiZone (82 zones) | 82 | 1 | <0.5ms | diff --git a/docs/api/index.md b/docs/api/index.md index b5182e3..f19034d 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -11,6 +11,12 @@ lifx/ ├── color.py # Color utilities (HSBK, Colors) ├── const.py # Network constants and URLs ├── exceptions.py # Exception hierarchy +├── animation/ # Animation module for high-frequency frame delivery +│ ├── __init__.py # Public API exports +│ ├── animator.py # High-level Animator class with direct UDP +│ ├── framebuffer.py # Multi-tile canvas mapping and orientation +│ ├── packets.py # Prebaked packet templates +│ └── orientation.py # Tile orientation remapping ├── devices/ # Device classes │ ├── base.py # Base Device class │ ├── light.py # Light device (color control) @@ -73,6 +79,14 @@ Work with colors: - [`HSBK`](colors.md#lifx.color.HSBK) - Color representation - [`Colors`](colors.md#lifx.color.Colors) - Built-in presets +### Animation + +High-frequency frame delivery for real-time effects: + +- [`Animator`](animation.md#lifx.animation.animator.Animator) - High-level animation interface with direct UDP +- [`FrameBuffer`](animation.md#lifx.animation.framebuffer.FrameBuffer) - Multi-tile canvas mapping +- [`PacketTemplate`](animation.md#lifx.animation.packets.PacketTemplate) - Prebaked packet templates + ### Network Layer Low-level network operations: @@ -195,6 +209,14 @@ async def set_custom_color(light: Light, hue: float) -> None: [:octicons-arrow-right-24: Colors](colors.md) +- :material-animation:{ .lg .middle } __Animation__ + + ______________________________________________________________________ + + High-frequency frame delivery for real-time effects + + [:octicons-arrow-right-24: Animation](animation.md) + - :material-network:{ .lg .middle } __Network Layer__ ______________________________________________________________________ diff --git a/docs/user-guide/animation.md b/docs/user-guide/animation.md new file mode 100644 index 0000000..ef31fd4 --- /dev/null +++ b/docs/user-guide/animation.md @@ -0,0 +1,369 @@ +# Animation Guide + +This guide covers how to use the animation module for high-frequency frame delivery to LIFX devices. +The animation system is designed for real-time effects and applications that need to push color data at 30+ FPS. + +## When to Use Animation + +Use the animation module when you need: + +- **High frame rates** (20+ FPS) +- **Real-time effects** from external sources +- **Integration with music visualizers** +- **Continuous animations** that run for extended periods + +For simple, one-time color changes, use the device methods directly (`set_color()`, `set_tile_colors()`, etc.) instead. + +## Basic Usage + +### Matrix Devices (Tiles, Candle, Path) + +```python +import asyncio +from lifx import Animator, MatrixLight + +async def main(): + async with await MatrixLight.from_ip("192.168.1.100") as device: + # Create animator (queries device for tile info) + animator = await Animator.for_matrix(device) + + # Device connection closed - animator sends via direct UDP + print(f"Canvas: {animator.canvas_width}x{animator.canvas_height}") + print(f"Total pixels: {animator.pixel_count}") + + try: + # Animation loop + for _ in range(100): + # Generate frame (H, S, B, K as uint16) + frame = [(65535, 65535, 65535, 3500)] * animator.pixel_count + + # send_frame() is synchronous for speed + stats = animator.send_frame(frame) + print(f"Sent {stats.packets_sent} packets") + + await asyncio.sleep(1 / 30) # 30 FPS + finally: + animator.close() + +asyncio.run(main()) +``` + +### MultiZone Devices (Strips, Beams) + +```python +import asyncio +from lifx import Animator, MultiZoneLight + +async def main(): + async with await MultiZoneLight.from_ip("192.168.1.100") as device: + # Create animator + animator = await Animator.for_multizone(device) + + print(f"Device has {animator.pixel_count} zones") + + try: + # Animation loop + for _ in range(100): + # Generate frame + frame = [(0, 65535, 65535, 3500)] * animator.pixel_count + + stats = animator.send_frame(frame) + await asyncio.sleep(1 / 30) + finally: + animator.close() + +asyncio.run(main()) +``` + +## Multi-Tile Canvas + +For devices with multiple tiles (like the original 5-tile LIFX Tile), the animator automatically +creates a unified canvas based on tile positions. This allows animations to span across all tiles +as one continuous image, rather than each tile showing a mirrored copy. + +### How It Works + +1. The animator reads each tile's position (`user_x`, `user_y`) from the device +2. Positions are in "tile-width units" (1.0 = one tile width) +3. A canvas is created that encompasses all tiles +4. Your input frame is interpreted as a 2D row-major image +5. Each tile extracts its region from the canvas based on its position + +### Example: 5 Horizontal Tiles + +```python +async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + +# For 5 tiles arranged horizontally: +# - canvas_width = 40 (5 tiles x 8 pixels) +# - canvas_height = 8 +# - pixel_count = 320 (40 x 8) + +print(f"Canvas: {animator.canvas_width}x{animator.canvas_height}") + +# Generate a gradient that flows across ALL tiles +frame = [] +for y in range(animator.canvas_height): + for x in range(animator.canvas_width): + # Hue varies from 0 to 65535 across the full width + hue = int(x / animator.canvas_width * 65535) + frame.append((hue, 65535, 65535, 3500)) + +animator.send_frame(frame) # Rainbow spans all 5 tiles! +``` + +### Canvas Coordinate System + +The canvas uses row-major ordering: + +``` +For a 40x8 canvas (5 horizontal tiles): + +Index: 0 1 2 3 4 ... 39 (row 0) + 40 41 42 43 44 ... 79 (row 1) + ... + 280 281 ... 319 (row 7) + +Tile positions: +Tile 0: x=0-7, y=0-7 +Tile 1: x=8-15, y=0-7 +Tile 2: x=16-23, y=0-7 +Tile 3: x=24-31, y=0-7 +Tile 4: x=32-39, y=0-7 +``` + +## Understanding HSBK Format + +The animation module uses protocol-ready HSBK values for performance: + +```python +# HSBK tuple: (hue, saturation, brightness, kelvin) +# - Hue: 0-65535 (maps to 0-360 degrees) +# - Saturation: 0-65535 (maps to 0.0-1.0) +# - Brightness: 0-65535 (maps to 0.0-1.0) +# - Kelvin: 1500-9000 + +# Examples +red = (0, 65535, 65535, 3500) # Full red +blue = (43690, 65535, 65535, 3500) # Full blue (240/360 * 65535) +white = (0, 0, 65535, 5500) # Daylight white +dim_warm = (0, 0, 16384, 2700) # 25% warm white +off = (0, 0, 0, 3500) # Off (black) +``` + +### Converting from User-Friendly Values + +```python +def to_protocol_hsbk( + hue: float, # 0-360 degrees + saturation: float, # 0.0-1.0 + brightness: float, # 0.0-1.0 + kelvin: int, # 1500-9000 +) -> tuple[int, int, int, int]: + """Convert user-friendly values to protocol format.""" + return ( + int(hue / 360 * 65535), + int(saturation * 65535), + int(brightness * 65535), + kelvin, + ) + +# Usage +red = to_protocol_hsbk(0, 1.0, 1.0, 3500) +blue = to_protocol_hsbk(240, 1.0, 1.0, 3500) +``` + +### Converting from RGB + +```python +def rgb_to_protocol_hsbk( + r: int, g: int, b: int, # 0-255 + kelvin: int = 3500, +) -> tuple[int, int, int, int]: + """Convert RGB to protocol HSBK.""" + # Normalize to 0-1 + r_norm = r / 255 + g_norm = g / 255 + b_norm = b / 255 + + max_c = max(r_norm, g_norm, b_norm) + min_c = min(r_norm, g_norm, b_norm) + delta = max_c - min_c + + # Brightness + brightness = max_c + + # Saturation + if max_c == 0: + saturation = 0 + else: + saturation = delta / max_c + + # Hue + if delta == 0: + hue = 0 + elif max_c == r_norm: + hue = 60 * (((g_norm - b_norm) / delta) % 6) + elif max_c == g_norm: + hue = 60 * (((b_norm - r_norm) / delta) + 2) + else: + hue = 60 * (((r_norm - g_norm) / delta) + 4) + + return ( + int(hue / 360 * 65535), + int(saturation * 65535), + int(brightness * 65535), + kelvin, + ) +``` + +## Tile Orientation Handling + +For matrix devices with the `has_chain` capability (like the original LIFX Tile), tiles may be +physically rotated. The animator automatically handles orientation correction: + +```python +async with await MatrixLight.from_ip("192.168.1.100") as device: + # Orientation is detected from device accelerometer data + animator = await Animator.for_matrix(device) + +# Your frame uses logical canvas coordinates +# The animator remaps to physical tile positions +animator.send_frame(logical_frame) +``` + +**Supported orientations:** + +- `RIGHT_SIDE_UP` - Normal position +- `ROTATED_90` - 90 degrees clockwise +- `ROTATED_180` - Upside down +- `ROTATED_270` - 90 degrees counter-clockwise +- `FACE_UP` - Facing ceiling (treated as right-side-up for 2D mapping) +- `FACE_DOWN` - Facing floor (treated as right-side-up for 2D mapping) + +## Performance Tips + +### The Animation Loop Pattern + +```python +async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + +# Device connection closed here - animator works via direct UDP + +try: + while running: + frame = generate_frame() + animator.send_frame(frame) # Synchronous, very fast + await asyncio.sleep(1 / target_fps) +finally: + animator.close() # Clean up UDP socket +``` + +### Pre-generate Frames + +```python +# Generate frames in advance +frames = [] +for i in range(100): + frame = generate_animation_frame(i) + frames.append(frame) + +# Play back at consistent rate +for frame in frames: + animator.send_frame(frame) + await asyncio.sleep(1 / 30) +``` + +### Use NumPy for Large Canvases + +For large devices or complex animations, NumPy can speed up frame generation: + +```python +import numpy as np + +def generate_gradient_numpy(width: int, height: int, hue_offset: int) -> list: + """Generate rainbow gradient using NumPy.""" + # Create coordinate grids + x = np.arange(width) + y = np.arange(height) + xx, yy = np.meshgrid(x, y) + + # Calculate hues based on position + hues = ((xx + yy * 0.5 + hue_offset) * 1000) % 65536 + + # Build frame array + frame = np.zeros((height, width, 4), dtype=np.uint16) + frame[:, :, 0] = hues # Hue + frame[:, :, 1] = 65535 # Saturation + frame[:, :, 2] = 65535 # Brightness + frame[:, :, 3] = 3500 # Kelvin + + # Convert to list of tuples (row-major) + return [tuple(p) for p in frame.reshape(-1, 4)] +``` + +For a complete example including vectorized RGB to HSBK conversion, see +[examples/16_animation_numpy.py](https://github.com/Djelibeybi/lifx-async/blob/main/examples/16_animation_numpy.py). + +### Monitor Statistics + +```python +total_packets = 0 +frame_count = 0 +start_time = time.monotonic() + +for frame in animation: + stats = animator.send_frame(frame) + total_packets += stats.packets_sent + frame_count += 1 + +elapsed = time.monotonic() - start_time +fps = frame_count / elapsed +print(f"Average FPS: {fps:.1f}") +print(f"Total packets: {total_packets}") +print(f"Avg packets/frame: {total_packets / frame_count:.1f}") +``` + +## Troubleshooting + +### Flickering or Glitches + +**Cause:** Packet loss on the network + +**Solutions:** + +1. Reduce frame rate (try 20 FPS instead of 30) +2. Ensure good WiFi signal to the device +3. Consider wired connection if possible +4. Accept that some packet loss is normal for UDP + +### Animation Appears on Each Tile Separately + +**Cause:** Device doesn't have `has_chain` capability, so canvas mode isn't used + +**Solutions:** + +1. Check device capabilities: only the original LIFX Tile has multi-tile canvas +2. For other matrix devices (Ceiling, Candle, Path), canvas equals tile size + +### Wrong Colors on Rotated Tiles + +**Cause:** Orientation not detected correctly + +**Solutions:** + +1. Ensure device chain is loaded before creating animator +2. Check tile accelerometer data via `device.device_chain` +3. Physical tiles must be stable (not moving) for accurate orientation + +### Memory Growth + +**Cause:** Creating new frame lists each iteration + +**Solutions:** + +1. Reuse frame lists when possible +2. Use generator patterns for very long animations +3. Clear references after use diff --git a/examples/15_animation.py b/examples/15_animation.py new file mode 100644 index 0000000..c467369 --- /dev/null +++ b/examples/15_animation.py @@ -0,0 +1,372 @@ +"""Animation module example with device auto-detection. + +Demonstrates the animation module for high-frequency frame delivery to LIFX +devices. Automatically detects whether the device is a matrix (Tile, Candle, +Path) or multizone (Strip, Beam) device and runs an appropriate animation. + +The animation module sends frames via direct UDP for maximum throughput - +no connection layer overhead, no ACKs, just fire packets as fast as possible. +""" + +import argparse +import asyncio +import math +import time + +from lifx import Animator, MatrixLight, MultiZoneLight, find_by_ip, find_by_serial + + +def print_animator_info(animator: Animator) -> None: + """Print information about the animator configuration.""" + print("\n--- Animator Info ---") + w, h, p = animator.canvas_width, animator.canvas_height, animator.pixel_count + print(f" Canvas: {w}x{h} ({p} pixels)") + print(" Network: Direct UDP (fire-and-forget)") + print("---------------------\n") + + +async def run_matrix_animation( + device: MatrixLight, + duration: float = 10.0, + fps: float = 30.0, +) -> None: + """Run a rainbow wave animation on a matrix device. + + For multi-tile devices (like the original LIFX Tile), the animation spans + the entire canvas - a unified coordinate space based on tile positions. + This means the rainbow wave flows across all tiles as one continuous image. + """ + print(f"\nRunning matrix animation for {duration:.1f} seconds...") + print(f"Animation: Rainbow wave (30 degree angle) at {fps:.0f} FPS") + + # Create animator (queries device once, then sends via direct UDP) + animator = await Animator.for_matrix(device) + + # Get canvas dimensions (may span multiple tiles) + canvas_width = animator.canvas_width + canvas_height = animator.canvas_height + pixel_count = animator.pixel_count # canvas_width * canvas_height + + # Get tile info + tiles = device.device_chain + if not tiles: + print("Error: No tiles found") + return + + print(f"Device: {len(tiles)} tile(s)") + print(f"Canvas: {canvas_width}x{canvas_height} ({pixel_count} pixels)") + if len(tiles) > 1: + print(" (Animation spans all tiles as one unified canvas)") + + # Print debug info + print_animator_info(animator) + + # Wave direction: 30 degrees from horizontal + wave_angle = math.radians(30) + cos_wave = math.cos(wave_angle) + sin_wave = math.sin(wave_angle) + + # Calculate max position for normalization (using canvas dimensions) + max_pos = canvas_width * cos_wave + canvas_height * sin_wave + + start_time = time.monotonic() + frame_count = 0 + total_packets = 0 + hue_offset = 0 + last_status_time = start_time + + try: + while time.monotonic() - start_time < duration: + frame = [] + + # Generate canvas-sized frame (row-major order) + for y in range(canvas_height): + for x in range(canvas_width): + # Project position onto wave direction (like multizone but angled) + pos = x * cos_wave + y * sin_wave + + # Map position to hue (0-65535) + hue = int((pos / max_pos) * 65535 + hue_offset) % 65536 + + frame.append( + ( + hue, + 65535, # Full saturation + 65535, # Full brightness + 3500, # Kelvin + ) + ) + + # send_frame is synchronous for maximum speed + stats = animator.send_frame(frame) + frame_count += 1 + total_packets += stats.packets_sent + + # Print periodic status (every 2 seconds) + now = time.monotonic() + if now - last_status_time >= 2.0: + elapsed_so_far = now - start_time + current_fps = frame_count / elapsed_so_far + print( + f" [{elapsed_so_far:.1f}s] frames={frame_count}, " + f"packets={total_packets}, fps={current_fps:.1f}" + ) + last_status_time = now + + # Shift the rainbow + hue_offset = (hue_offset + 1000) % 65536 + + # Target FPS + await asyncio.sleep(1 / fps) + + except KeyboardInterrupt: + print("\nAnimation interrupted") + finally: + animator.close() + + elapsed = time.monotonic() - start_time + actual_fps = frame_count / elapsed if elapsed > 0 else 0 + avg_packets_per_frame = total_packets / frame_count if frame_count > 0 else 0 + print("\nAnimation complete!") + print(f" Frames: {frame_count}") + print(f" Duration: {elapsed:.1f}s") + print(f" Average FPS: {actual_fps:.1f}") + print(f" Total packets: {total_packets}") + print(f" Avg packets/frame: {avg_packets_per_frame:.2f}") + + +async def run_multizone_animation( + device: MultiZoneLight, + duration: float = 10.0, + fps: float = 30.0, +) -> None: + """Run a rainbow wave animation on a multizone device.""" + print(f"\nRunning multizone animation for {duration:.1f} seconds...") + print(f"Animation: Rainbow wave at {fps:.0f} FPS") + + # Create animator (queries device once, then sends via direct UDP) + animator = await Animator.for_multizone(device) + zone_count = animator.pixel_count + + print(f"Device: {zone_count} zones") + + # Print debug info + print_animator_info(animator) + + start_time = time.monotonic() + frame_count = 0 + total_packets = 0 + hue_offset = 0 + last_status_time = start_time + + try: + while time.monotonic() - start_time < duration: + frame = [] + + for i in range(zone_count): + # Create rainbow gradient across zones, shifting over time + hue_val = int((i / zone_count) * 65536) + hue = (hue_offset + hue_val) % 65536 + + frame.append( + ( + hue, + 65535, # Full saturation + 65535, # Full brightness + 3500, # Kelvin + ) + ) + + # send_frame is synchronous for maximum speed + stats = animator.send_frame(frame) + frame_count += 1 + total_packets += stats.packets_sent + + # Print periodic status (every 2 seconds) + now = time.monotonic() + if now - last_status_time >= 2.0: + elapsed_so_far = now - start_time + current_fps = frame_count / elapsed_so_far + print( + f" [{elapsed_so_far:.1f}s] frames={frame_count}, " + f"packets={total_packets}, fps={current_fps:.1f}" + ) + last_status_time = now + + # Rotate the rainbow + hue_offset = (hue_offset + 1000) % 65536 + + # Target FPS + await asyncio.sleep(1 / fps) + + except KeyboardInterrupt: + print("\nAnimation interrupted") + finally: + animator.close() + + elapsed = time.monotonic() - start_time + actual_fps = frame_count / elapsed if elapsed > 0 else 0 + avg_packets_per_frame = total_packets / frame_count if frame_count > 0 else 0 + print("\nAnimation complete!") + print(f" Frames: {frame_count}") + print(f" Duration: {elapsed:.1f}s") + print(f" Average FPS: {actual_fps:.1f}") + print(f" Total packets: {total_packets}") + print(f" Avg packets/frame: {avg_packets_per_frame:.2f}") + + +async def main( + serial: str, + ip: str | None = None, + duration: float = 10.0, + fps: float = 30.0, +) -> None: + """Find device and run appropriate animation.""" + print("=" * 70) + print("LIFX Animation Example") + print("=" * 70) + + # Find the device + if ip: + print(f"\nSearching for device at IP: {ip}") + device = await find_by_ip(ip) + if device is None: + print(f"No device found at IP '{ip}'") + return + # Verify serial matches if both provided + if device.serial.lower().replace(":", "") != serial.lower().replace(":", ""): + print(f"Warning: Device serial {device.serial} doesn't match {serial}") + else: + print(f"\nSearching for device with serial: {serial}") + device = await find_by_serial(serial) + if device is None: + print(f"No device found with serial '{serial}'") + print("\nTroubleshooting:") + print("1. Check that the serial number is correct (12 hex digits)") + print("2. Ensure the device is powered on and on the network") + print("3. Try providing the --ip address if discovery is slow") + return + + print(f"Found: {type(device).__name__} at {device.ip}") + + # Connect and get device info + async with device: + # get_color() is available on Light and subclasses + _, power, label = await device.get_color() # type: ignore[union-attr] + print(f"Label: {label}") + print(f"Power: {'ON' if power > 0 else 'OFF'}") + + # Check device type and capabilities + is_matrix = isinstance(device, MatrixLight) + is_multizone = isinstance(device, MultiZoneLight) + + if not is_matrix and not is_multizone: + # Check capabilities as fallback + if device.capabilities: + is_matrix = device.capabilities.has_matrix + is_multizone = device.capabilities.has_multizone + + # Print capability info for debugging + print("\n--- Device Capabilities ---") + print(f" Device class: {type(device).__name__}") + print(f" Is matrix: {is_matrix}") + print(f" Is multizone: {is_multizone}") + if device.capabilities: + caps = device.capabilities + print(f" has_matrix: {caps.has_matrix}") + print(f" has_multizone: {caps.has_multizone}") + print(f" has_extended_multizone: {caps.has_extended_multizone}") + else: + print(" capabilities: None (not detected)") + print("---------------------------") + + if not is_matrix and not is_multizone: + print("\nThis device does not support animations.") + print("The animation module requires a Matrix or MultiZone device:") + print(" - Matrix: Tile, Candle, Path, Ceiling") + print(" - MultiZone: Strip, Beam") + return + + # Turn on if off + was_off = power == 0 + if was_off: + print("\nTurning device ON...") + await device.set_power(True) + await asyncio.sleep(1) + + # Run appropriate animation + try: + if is_matrix: + assert isinstance(device, MatrixLight) + await run_matrix_animation(device, duration, fps) + else: + assert isinstance(device, MultiZoneLight) + await run_multizone_animation(device, duration, fps) + finally: + # Restore power state + if was_off: + print("\nTurning device back OFF...") + await device.set_power(False) + + print("\n" + "=" * 70) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Run animation on a LIFX matrix or multizone device", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Find device by serial and run animation + python 15_animation.py --serial d073d5123456 + + # Specify IP address for faster connection + python 15_animation.py --serial d073d5123456 --ip 192.168.1.100 + + # Run animation for 30 seconds at 60 FPS + python 15_animation.py --serial d073d5123456 --duration 30 --fps 60 + + # Serial number formats (both work): + python 15_animation.py --serial d073d5123456 + python 15_animation.py --serial d0:73:d5:12:34:56 + """, + ) + parser.add_argument( + "--serial", + "-s", + required=True, + help="Device serial number (12 hex digits, with or without colons)", + ) + parser.add_argument( + "--ip", + "-i", + help="Optional IP address for faster connection", + ) + parser.add_argument( + "--duration", + "-d", + type=float, + default=10.0, + help="Animation duration in seconds (default: 10)", + ) + parser.add_argument( + "--fps", + "-f", + type=float, + default=30.0, + help="Target frames per second (default: 30)", + ) + + args = parser.parse_args() + + try: + asyncio.run( + main( + args.serial, + args.ip, + args.duration, + args.fps, + ) + ) + except KeyboardInterrupt: + print("\n\nCancelled by user.") diff --git a/examples/16_animation_numpy.py b/examples/16_animation_numpy.py new file mode 100644 index 0000000..2c28bbe --- /dev/null +++ b/examples/16_animation_numpy.py @@ -0,0 +1,543 @@ +"""Animation example using NumPy for efficient frame generation. + +This example demonstrates how to use NumPy for high-performance animation +frame generation. NumPy enables vectorized operations that are significantly +faster than Python loops, especially for large pixel counts. + +Requires: numpy (pip install numpy) + +Key optimizations: +- Vectorized RGB to HSBK conversion +- Vectorized frame generation with no Python loops +- Pre-allocated arrays to avoid memory allocation per frame +- Direct conversion to protocol-ready uint16 format +""" + +import argparse +import asyncio +import time + +try: + import numpy as np + from numpy.typing import NDArray +except ImportError: + print("This example requires NumPy. Install it with:") + print(" pip install numpy") + print(" # or") + print(" uv add --dev numpy") + raise SystemExit(1) + +from lifx import Animator, MatrixLight, MultiZoneLight, find_by_ip, find_by_serial + + +def print_animator_info(animator: Animator) -> None: + """Print information about the animator configuration.""" + print("\n--- Animator Info ---") + w, h, p = animator.canvas_width, animator.canvas_height, animator.pixel_count + print(f" Canvas: {w}x{h} ({p} pixels)") + print(" Network: Direct UDP (fire-and-forget)") + print("---------------------\n") + + +def rgb_to_hsbk_numpy( + rgb: NDArray[np.uint8], + kelvin: int = 3500, +) -> NDArray[np.uint16]: + """Convert RGB array to protocol-ready HSBK using vectorized operations. + + Args: + rgb: Array of shape (N, 3) with RGB values 0-255 + kelvin: Color temperature for all pixels + + Returns: + Array of shape (N, 4) with HSBK values in protocol format: + - Hue: 0-65535 + - Saturation: 0-65535 + - Brightness: 0-65535 + - Kelvin: 1500-9000 + """ + # Normalize RGB to 0-1 float + rgb_norm = rgb.astype(np.float32) / 255.0 + + r = rgb_norm[:, 0] + g = rgb_norm[:, 1] + b = rgb_norm[:, 2] + + # Calculate max, min, delta + max_c = np.maximum(np.maximum(r, g), b) + min_c = np.minimum(np.minimum(r, g), b) + delta = max_c - min_c + + # Brightness is just the max + brightness = max_c + + # Saturation + saturation = np.where(max_c > 0, delta / max_c, 0) + + # Hue calculation (vectorized) + hue = np.zeros_like(max_c) + + # Where delta > 0, calculate hue + mask = delta > 0 + + # Red is max + red_max = mask & (max_c == r) + hue[red_max] = 60 * (((g[red_max] - b[red_max]) / delta[red_max]) % 6) + + # Green is max + green_max = mask & (max_c == g) + hue[green_max] = 60 * (((b[green_max] - r[green_max]) / delta[green_max]) + 2) + + # Blue is max + blue_max = mask & (max_c == b) + hue[blue_max] = 60 * (((r[blue_max] - g[blue_max]) / delta[blue_max]) + 4) + + # Convert to protocol format (uint16) + hsbk = np.zeros((len(rgb), 4), dtype=np.uint16) + hsbk[:, 0] = (hue / 360 * 65535).astype(np.uint16) + hsbk[:, 1] = (saturation * 65535).astype(np.uint16) + hsbk[:, 2] = (brightness * 65535).astype(np.uint16) + hsbk[:, 3] = kelvin + + return hsbk + + +def hsbk_array_to_list(hsbk: NDArray[np.uint16]) -> list[tuple[int, int, int, int]]: + """Convert NumPy HSBK array to list of tuples for the animator. + + Args: + hsbk: Array of shape (N, 4) with HSBK values + + Returns: + List of (H, S, B, K) tuples + """ + return [tuple(row) for row in hsbk.tolist()] # type: ignore[misc] + + +class NumpyFrameGenerator: + """Efficient frame generator using NumPy for vectorized operations.""" + + def __init__(self, pixel_count: int, width: int = 8, height: int = 8): + """Initialize the frame generator. + + Args: + pixel_count: Total number of pixels + width: Width for 2D calculations (for matrix devices) + height: Height for 2D calculations (for matrix devices) + """ + self.pixel_count = pixel_count + self.width = width + self.height = height + + # Pre-allocate arrays + self.hsbk = np.zeros((pixel_count, 4), dtype=np.uint16) + self.hsbk[:, 3] = 3500 # Default kelvin + + # Pre-compute coordinate grids for matrix effects + if width * height == pixel_count: + # Single tile or multizone + y_coords, x_coords = np.mgrid[0:height, 0:width] + self.x = x_coords.flatten().astype(np.float32) + self.y = y_coords.flatten().astype(np.float32) + else: + # Multi-tile: repeat coordinate grid + tiles = pixel_count // (width * height) + y_coords, x_coords = np.mgrid[0:height, 0:width] + self.x = np.tile(x_coords.flatten(), tiles).astype(np.float32) + self.y = np.tile(y_coords.flatten(), tiles).astype(np.float32) + + # Pre-compute center and distances for radial effects + self.center_x = width / 2 + self.center_y = height / 2 + self.dx = self.x - self.center_x + self.dy = self.y - self.center_y + self.distance = np.sqrt(self.dx**2 + self.dy**2) + self.angle = np.arctan2(self.dy, self.dx) + + # For linear effects (multizone) + self.index = np.arange(pixel_count, dtype=np.float32) + + def generate_rainbow_spiral( + self, time_offset: float + ) -> list[tuple[int, int, int, int]]: + """Generate a rainbow spiral pattern (good for matrix devices). + + Args: + time_offset: Animation time in seconds + + Returns: + List of HSBK tuples ready for the animator + """ + # Hue based on distance and angle, rotating over time + hue = ( + self.distance * 0.15 + self.angle / (2 * np.pi) + time_offset * 0.5 + ) % 1.0 + + # Brightness varies with distance from center + brightness = np.clip(1.0 - self.distance * 0.08, 0.3, 1.0) + + # Convert to protocol format + self.hsbk[:, 0] = (hue * 65535).astype(np.uint16) + self.hsbk[:, 1] = 65535 # Full saturation + self.hsbk[:, 2] = (brightness * 65535).astype(np.uint16) + + return hsbk_array_to_list(self.hsbk) + + def generate_rainbow_wave( + self, time_offset: float + ) -> list[tuple[int, int, int, int]]: + """Generate a rainbow wave pattern (good for multizone devices). + + Args: + time_offset: Animation time in seconds + + Returns: + List of HSBK tuples ready for the animator + """ + # Hue gradient along the strip, moving over time + hue = (self.index / self.pixel_count + time_offset * 0.3) % 1.0 + + # Brightness wave + wave_phase = self.index / self.pixel_count * 4 * np.pi + time_offset * 3 + brightness = 0.5 + 0.5 * np.sin(wave_phase) + + # Convert to protocol format + self.hsbk[:, 0] = (hue * 65535).astype(np.uint16) + self.hsbk[:, 1] = 65535 # Full saturation + self.hsbk[:, 2] = (brightness * 65535).astype(np.uint16) + + return hsbk_array_to_list(self.hsbk) + + def generate_plasma(self, time_offset: float) -> list[tuple[int, int, int, int]]: + """Generate a plasma effect using sine waves. + + Args: + time_offset: Animation time in seconds + + Returns: + List of HSBK tuples ready for the animator + """ + t = time_offset + + # Classic plasma formula with multiple sine waves + v1 = np.sin(self.x * 0.5 + t) + v2 = np.sin((self.y * 0.5 + t) * 0.5) + v3 = np.sin((self.x * 0.5 + self.y * 0.5 + t) * 0.5) + v4 = np.sin(np.sqrt(self.dx**2 + self.dy**2) * 0.5 + t) + + # Combine and normalize to 0-1 + v = (v1 + v2 + v3 + v4) / 4 + hue = (v + 1) / 2 # Convert from -1..1 to 0..1 + + # Brightness variation + brightness = 0.6 + 0.4 * np.sin(t * 2 + self.distance * 0.3) + + # Convert to protocol format + self.hsbk[:, 0] = (hue * 65535).astype(np.uint16) + self.hsbk[:, 1] = 65535 + self.hsbk[:, 2] = (brightness * 65535).astype(np.uint16) + + return hsbk_array_to_list(self.hsbk) + + def generate_fire(self, time_offset: float) -> list[tuple[int, int, int, int]]: + """Generate a fire effect (warm colors, flickering). + + Args: + time_offset: Animation time in seconds + + Returns: + List of HSBK tuples ready for the animator + """ + t = time_offset + + # Fire rises from bottom, so invert y + fire_y = (self.height - 1 - self.y) / self.height + + # Random-ish flickering using sine combinations + flicker = ( + np.sin(self.x * 2 + t * 10) * np.sin(self.y * 3 + t * 7) * np.sin(t * 15) + ) + flicker = (flicker + 1) / 2 * 0.3 # 0 to 0.3 variation + + # Brightness decreases toward top with flickering + brightness = np.clip(fire_y + flicker, 0, 1) + + # Hue from red (0) to yellow (60/360 = 0.167) based on height + hue = fire_y * 0.12 # Red to orange-yellow + + # Saturation decreases slightly at the tips (more white/yellow) + saturation = np.clip(1.0 - fire_y * 0.3, 0.7, 1.0) + + # Convert to protocol format + self.hsbk[:, 0] = (hue * 65535).astype(np.uint16) + self.hsbk[:, 1] = (saturation * 65535).astype(np.uint16) + self.hsbk[:, 2] = (brightness * 65535).astype(np.uint16) + self.hsbk[:, 3] = 2700 # Warm kelvin for fire + + return hsbk_array_to_list(self.hsbk) + + +async def run_animation( + device: MatrixLight | MultiZoneLight, + duration: float = 10.0, + fps: float = 30.0, + effect: str = "auto", +) -> None: + """Run animation with NumPy-optimized frame generation.""" + is_matrix = isinstance(device, MatrixLight) + + # Create animator + if is_matrix: + animator = await Animator.for_matrix(device) + tiles = device.device_chain + width = tiles[0].width if tiles else 8 + height = tiles[0].height if tiles else 8 + else: + animator = await Animator.for_multizone(device) + width = animator.pixel_count + height = 1 + + pixel_count = animator.pixel_count + print(f"\nPixel count: {pixel_count}") + print(f"Dimensions: {width}x{height}") + + # Print debug info + print_animator_info(animator) + + # Create frame generator + generator = NumpyFrameGenerator(pixel_count, width, height) + + # Select effect + if effect == "auto": + effect = "spiral" if is_matrix else "wave" + + effect_funcs = { + "spiral": generator.generate_rainbow_spiral, + "wave": generator.generate_rainbow_wave, + "plasma": generator.generate_plasma, + "fire": generator.generate_fire, + } + + if effect not in effect_funcs: + print(f"Unknown effect '{effect}', using 'spiral'") + effect = "spiral" + + generate_frame = effect_funcs[effect] + print(f"Effect: {effect}") + print(f"Duration: {duration:.1f}s at {fps:.0f} FPS") + print() + + # Animation loop + start_time = time.monotonic() + frame_count = 0 + total_packets = 0 + total_gen_time = 0.0 + total_send_time = 0.0 + last_status_time = start_time + + try: + while time.monotonic() - start_time < duration: + t = time.monotonic() - start_time + + # Generate frame (timed) + gen_start = time.perf_counter() + frame = generate_frame(t) + gen_end = time.perf_counter() + total_gen_time += gen_end - gen_start + + # Send frame (timed) - synchronous for maximum speed + send_start = time.perf_counter() + stats = animator.send_frame(frame) + send_end = time.perf_counter() + total_send_time += send_end - send_start + + frame_count += 1 + total_packets += stats.packets_sent + + # Print periodic status (every 2 seconds) + now = time.monotonic() + if now - last_status_time >= 2.0: + elapsed_so_far = now - start_time + current_fps = frame_count / elapsed_so_far + print( + f" [{elapsed_so_far:.1f}s] frames={frame_count}, " + f"packets={total_packets}, fps={current_fps:.1f}" + ) + last_status_time = now + + # Target FPS + await asyncio.sleep(1 / fps) + + except KeyboardInterrupt: + print("\nAnimation interrupted") + finally: + animator.close() + + # Print statistics + elapsed = time.monotonic() - start_time + actual_fps = frame_count / elapsed if elapsed > 0 else 0 + avg_gen_ms = (total_gen_time / frame_count * 1000) if frame_count > 0 else 0 + avg_send_ms = (total_send_time / frame_count * 1000) if frame_count > 0 else 0 + avg_packets_per_frame = total_packets / frame_count if frame_count > 0 else 0 + + print("\nAnimation complete!") + print(f" Frames: {frame_count}") + print(f" Duration: {elapsed:.1f}s") + print(f" Average FPS: {actual_fps:.1f}") + print(f" Total packets: {total_packets}") + print(f" Avg packets/frame: {avg_packets_per_frame:.2f}") + print("\nPerformance:") + print(f" Avg frame generation: {avg_gen_ms:.2f}ms") + print(f" Avg frame send: {avg_send_ms:.2f}ms") + print(f" Frame budget ({fps:.0f} FPS): {1000 / fps:.1f}ms") + + +async def main( + serial: str, + ip: str | None = None, + duration: float = 10.0, + fps: float = 30.0, + effect: str = "auto", +) -> None: + """Find device and run animation.""" + print("=" * 70) + print("LIFX Animation Example (NumPy)") + print("=" * 70) + + # Find the device + if ip: + print(f"\nSearching for device at IP: {ip}") + device = await find_by_ip(ip, timeout=5.0) + if device is None: + print(f"No device found at IP '{ip}'") + return + else: + print(f"\nSearching for device with serial: {serial}") + device = await find_by_serial(serial, timeout=5.0) + if device is None: + print(f"No device found with serial '{serial}'") + return + + print(f"Found: {type(device).__name__} at {device.ip}") + + # Connect and run animation + async with device: + _, power, label = await device.get_color() # type: ignore[union-attr] + print(f"Label: {label}") + + # Check device type + is_matrix = isinstance(device, MatrixLight) + is_multizone = isinstance(device, MultiZoneLight) + + # Print capability info for debugging + print("\n--- Device Capabilities ---") + print(f" Device class: {type(device).__name__}") + print(f" Is matrix: {is_matrix}") + print(f" Is multizone: {is_multizone}") + if device.capabilities: + caps = device.capabilities + print(f" has_matrix: {caps.has_matrix}") + print(f" has_multizone: {caps.has_multizone}") + print(f" has_extended_multizone: {caps.has_extended_multizone}") + else: + print(" capabilities: None (not detected)") + print("---------------------------") + + if not is_matrix and not is_multizone: + print("\nThis device does not support animations.") + print("Requires a Matrix or MultiZone device.") + return + + # Turn on if off + was_off = power == 0 + if was_off: + print("\nTurning device ON...") + await device.set_power(True) + await asyncio.sleep(1) + + try: + if is_matrix: + assert isinstance(device, MatrixLight) + await run_animation(device, duration, fps, effect) + else: + assert isinstance(device, MultiZoneLight) + await run_animation(device, duration, fps, effect) + finally: + if was_off: + print("\nTurning device back OFF...") + await device.set_power(False) + + print("\n" + "=" * 70) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Run NumPy-optimized animation on a LIFX device", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Effects: + auto - Automatically select based on device type (default) + spiral - Rainbow spiral pattern (best for matrix) + wave - Rainbow wave pattern (best for multizone) + plasma - Classic plasma effect + fire - Fire/flame effect + +Examples: + # Run with auto-selected effect + python 16_animation_numpy.py --serial d073d5123456 + + # Run plasma effect for 30 seconds at 60 FPS + python 16_animation_numpy.py --serial d073d5123456 --effect plasma \ + --duration 30 --fps 60 + + # Specify IP for faster connection + python 16_animation_numpy.py --serial d073d5123456 --ip 192.168.1.100 + """, + ) + parser.add_argument( + "--serial", + "-s", + required=True, + help="Device serial number (12 hex digits)", + ) + parser.add_argument( + "--ip", + "-i", + help="Optional IP address for faster connection", + ) + parser.add_argument( + "--duration", + "-d", + type=float, + default=10.0, + help="Animation duration in seconds (default: 10)", + ) + parser.add_argument( + "--fps", + "-f", + type=float, + default=30.0, + help="Target frames per second (default: 30)", + ) + parser.add_argument( + "--effect", + "-e", + default="auto", + choices=["auto", "spiral", "wave", "plasma", "fire"], + help="Animation effect (default: auto)", + ) + + args = parser.parse_args() + + try: + asyncio.run( + main( + args.serial, + args.ip, + args.duration, + args.fps, + args.effect, + ) + ) + except KeyboardInterrupt: + print("\n\nCancelled by user.") diff --git a/mkdocs.yml b/mkdocs.yml index 0eab53b..c11bc31 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -128,6 +128,7 @@ plugins: User Guide: - user-guide/themes.md: Working with color themes - user-guide/ceiling-lights.md: Controlling LIFX Ceiling uplight and downlight components + - user-guide/animation.md: High-frequency animation for real-time effects - user-guide/advanced-usage.md: Advanced usage patterns and best practices - user-guide/effects-custom.md: Creating custom light effects - user-guide/troubleshooting.md: Common issues and solutions @@ -141,6 +142,7 @@ plugins: - api/high-level.md: High-level discovery and batch operations API - api/themes.md: Theme system API - api/effects.md: Effects API + - api/animation.md: Animation module for high-frequency frame delivery - api/colors.md: Color utilities and HSBK conversion - api/devices.md: Device classes and capabilities - api/network.md: Network layer components @@ -198,6 +200,7 @@ nav: - User Guide: - Themes: user-guide/themes.md - Ceiling Lights: user-guide/ceiling-lights.md + - Animation: user-guide/animation.md - Advanced Usage: user-guide/advanced-usage.md - Custom Effects: user-guide/effects-custom.md - Troubleshooting: user-guide/troubleshooting.md @@ -211,6 +214,7 @@ nav: - High-Level API: api/high-level.md - Themes: api/themes.md - Effects: api/effects.md + - Animation: api/animation.md - Color Utilities: api/colors.md - Device Classes: api/devices.md - Network Layer: api/network.md diff --git a/src/lifx/__init__.py b/src/lifx/__init__.py index 04036c0..318d70c 100644 --- a/src/lifx/__init__.py +++ b/src/lifx/__init__.py @@ -7,6 +7,7 @@ from importlib.metadata import version as get_version +from lifx.animation import Animator, AnimatorStats from lifx.api import ( DeviceGroup, discover, @@ -95,6 +96,9 @@ "LIFXEffect", "EffectPulse", "EffectColorloop", + # Animation + "Animator", + "AnimatorStats", # Themes "Theme", "ThemeLibrary", diff --git a/src/lifx/animation/__init__.py b/src/lifx/animation/__init__.py new file mode 100644 index 0000000..05b2443 --- /dev/null +++ b/src/lifx/animation/__init__.py @@ -0,0 +1,87 @@ +"""LIFX Animation Module. + +This module provides efficient animation support for LIFX devices, +optimized for high-frequency frame delivery. + +Architecture: + FrameBuffer (orientation) -> PacketGenerator -> Direct UDP + +Key Components: + - Orientation: Tile orientation remapping with LRU-cached lookup tables + - FrameBuffer: Orientation mapping for matrix devices + - PacketGenerator: Device-specific packet generation (matrix, multizone) + - Animator: High-level class that sends frames via direct UDP + +Quick Start: + ```python + from lifx import Animator, MatrixLight + + async with await MatrixLight.from_ip("192.168.1.100") as device: + # Query device once for tile info + animator = await Animator.for_matrix(device) + + # Device connection closed - animator sends via direct UDP + while running: + # Send HSBK frame (protocol-ready uint16 values) + hsbk_frame = [(65535, 65535, 65535, 3500)] * animator.pixel_count + stats = animator.send_frame(hsbk_frame) # Synchronous for speed + print(f"Sent {stats.packets_sent} packets") + await asyncio.sleep(1 / 30) # 30 FPS + + animator.close() + ``` + +HSBK Format: + All color data uses protocol-ready uint16 values: + - Hue: 0-65535 (maps to 0-360 degrees) + - Saturation: 0-65535 (maps to 0.0-1.0) + - Brightness: 0-65535 (maps to 0.0-1.0) + - Kelvin: 1500-9000 +""" + +# Animator - High-level API +from lifx.animation.animator import ( + Animator, + AnimatorStats, +) + +# FrameBuffer - Orientation and canvas mapping +from lifx.animation.framebuffer import ( + FrameBuffer, + TileRegion, +) + +# Orientation - Tile remapping +from lifx.animation.orientation import ( + Orientation, + build_orientation_lut, +) + +# Packet generators +from lifx.animation.packets import ( + HEADER_SIZE, + SEQUENCE_OFFSET, + MatrixPacketGenerator, + MultiZonePacketGenerator, + PacketGenerator, + PacketTemplate, +) + +__all__ = [ + # Animator (high-level API) + "Animator", + "AnimatorStats", + # FrameBuffer + "FrameBuffer", + "TileRegion", + # Orientation + "Orientation", + "build_orientation_lut", + # Packet generators + "PacketGenerator", + "PacketTemplate", + "MatrixPacketGenerator", + "MultiZonePacketGenerator", + "HEADER_SIZE", + "SEQUENCE_OFFSET", +] diff --git a/src/lifx/animation/animator.py b/src/lifx/animation/animator.py new file mode 100644 index 0000000..7679d5b --- /dev/null +++ b/src/lifx/animation/animator.py @@ -0,0 +1,323 @@ +"""High-level Animator class for LIFX device animation. + +This module provides the Animator class, which sends animation frames +directly via UDP for maximum throughput - no connection layer overhead. + +The factory methods query the device once for configuration (tile info, +zone count), then the Animator sends frames via raw UDP packets with +prebaked packet templates for zero-allocation performance. + +Example: + ```python + from lifx.animation import Animator + + async with await MatrixLight.from_ip("192.168.1.100") as device: + # Query device once for tile info + animator = await Animator.for_matrix(device) + + # Device connection no longer needed - animator sends via direct UDP + while running: + stats = animator.send_frame(frame) + await asyncio.sleep(1 / 30) # 30 FPS + + animator.close() + ``` +""" + +from __future__ import annotations + +import random +import socket +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from lifx.animation.framebuffer import FrameBuffer +from lifx.animation.packets import ( + SEQUENCE_OFFSET, + MatrixPacketGenerator, + MultiZonePacketGenerator, + PacketGenerator, + PacketTemplate, +) +from lifx.const import LIFX_UDP_PORT +from lifx.protocol.models import Serial + +if TYPE_CHECKING: + from lifx.devices.matrix import MatrixLight + from lifx.devices.multizone import MultiZoneLight + + +@dataclass(frozen=True) +class AnimatorStats: + """Statistics about a frame send operation. + + Attributes: + packets_sent: Number of packets sent + total_time_ms: Total time for the operation in milliseconds + """ + + packets_sent: int + total_time_ms: float + + +class Animator: + """High-level animator for LIFX devices. + + Sends animation frames directly via UDP for maximum throughput. + No connection layer, no ACKs, no waiting - just fire packets as + fast as possible. + + All packets are prebaked at initialization time. Per-frame, only + color data and sequence numbers are updated in place before sending. + + Attributes: + pixel_count: Total number of pixels/zones + + Example: + ```python + async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + + # No connection needed after this - direct UDP + while running: + stats = animator.send_frame(frame) + await asyncio.sleep(1 / 30) # 30 FPS + + animator.close() + ``` + """ + + def __init__( + self, + ip: str, + serial: Serial, + framebuffer: FrameBuffer, + packet_generator: PacketGenerator, + port: int = LIFX_UDP_PORT, + ) -> None: + """Initialize animator for direct UDP sending. + + Use the `for_matrix()` or `for_multizone()` class methods for + automatic configuration from a device. + + Args: + ip: Device IP address + serial: Device serial number + framebuffer: Configured FrameBuffer for orientation mapping + packet_generator: Configured PacketGenerator for the device + port: UDP port (default: 56700) + """ + self._ip = ip + self._port = port + self._serial = serial + self._framebuffer = framebuffer + self._packet_generator = packet_generator + + # Protocol source ID (random, identifies this client) + self._source = random.randint(1, 0xFFFFFFFF) # nosec B311 + + # Sequence number (0-255, wraps around) + self._sequence = 0 + + # Create prebaked packet templates + self._templates: list[PacketTemplate] = packet_generator.create_templates( + source=self._source, + target=serial.value, + ) + + # UDP socket (created lazily) + self._socket: socket.socket | None = None + + @classmethod + async def for_matrix( + cls, + device: MatrixLight, + ) -> Animator: + """Create an Animator configured for a MatrixLight device. + + Queries the device for tile information, then returns an animator + that sends frames via direct UDP (no device connection needed + after creation). + + Args: + device: MatrixLight device (must be connected) + + Returns: + Configured Animator instance + + Example: + ```python + async with await MatrixLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_matrix(device) + + # Device connection closed, animator still works via UDP + while running: + stats = animator.send_frame(frame) + await asyncio.sleep(1 / 30) # 30 FPS + ``` + """ + # Get device info + ip = device.ip + serial = Serial.from_string(device.serial) + + # Ensure we have tile chain + if device.device_chain is None: + await device.get_device_chain() + + tiles = device.device_chain + if not tiles: + raise ValueError("Device has no tiles") + + # Create framebuffer with orientation correction + framebuffer = await FrameBuffer.for_matrix(device) + + # Create packet generator + packet_generator = MatrixPacketGenerator( + tile_count=len(tiles), + tile_width=tiles[0].width, + tile_height=tiles[0].height, + ) + + return cls(ip, serial, framebuffer, packet_generator) + + @classmethod + async def for_multizone( + cls, + device: MultiZoneLight, + ) -> Animator: + """Create an Animator configured for a MultiZoneLight device. + + Only devices with extended multizone capability are supported. + Queries the device for zone count, then returns an animator + that sends frames via direct UDP. + + Args: + device: MultiZoneLight device (must be connected and support + extended multizone protocol) + + Returns: + Configured Animator instance + + Raises: + ValueError: If device doesn't support extended multizone + + Example: + ```python + async with await MultiZoneLight.from_ip("192.168.1.100") as device: + animator = await Animator.for_multizone(device) + + # Device connection closed, animator still works via UDP + while running: + stats = animator.send_frame(frame) + await asyncio.sleep(1 / 30) # 30 FPS + ``` + """ + # Ensure capabilities are loaded + if device.capabilities is None: + await device._ensure_capabilities() + + # Check extended multizone capability + has_extended = bool( + device.capabilities and device.capabilities.has_extended_multizone + ) + if not has_extended: + raise ValueError( + "Device does not support extended multizone protocol. " + "Only extended multizone devices are supported for animation." + ) + + # Get device info + ip = device.ip + serial = Serial.from_string(device.serial) + + # Create framebuffer (no orientation for multizone) + framebuffer = await FrameBuffer.for_multizone(device) + + # Get zone count + zone_count = await device.get_zone_count() + + # Create packet generator + packet_generator = MultiZonePacketGenerator(zone_count=zone_count) + + return cls(ip, serial, framebuffer, packet_generator) + + @property + def pixel_count(self) -> int: + """Get total number of input pixels (canvas size for multi-tile).""" + # For multi-tile devices, this returns the canvas size + # For single-tile/multizone, this returns device pixel count + return self._framebuffer.canvas_size + + @property + def canvas_width(self) -> int: + """Get width of the logical canvas in pixels.""" + return self._framebuffer.canvas_width + + @property + def canvas_height(self) -> int: + """Get height of the logical canvas in pixels.""" + return self._framebuffer.canvas_height + + def send_frame( + self, + hsbk: list[tuple[int, int, int, int]], + ) -> AnimatorStats: + """Send a frame to the device via direct UDP. + + Applies orientation mapping (for matrix devices), updates colors + in prebaked packets, and sends them directly via UDP. No ACKs, + no waiting - maximum throughput. + + This is a synchronous method for minimum overhead. UDP sendto() + is non-blocking for datagrams. + + Args: + hsbk: Protocol-ready HSBK data for all pixels. + Each tuple is (hue, sat, brightness, kelvin) where + H/S/B are 0-65535 and K is 1500-9000. + + Returns: + AnimatorStats with operation statistics + + Raises: + ValueError: If hsbk length doesn't match pixel_count + """ + start_time = time.perf_counter() + + # Apply orientation mapping + device_data = self._framebuffer.apply(hsbk) + + # Update colors in prebaked templates + self._packet_generator.update_colors(self._templates, device_data) + + # Ensure socket exists + if self._socket is None: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._socket.setblocking(False) + + # Send each packet, updating sequence number + for tmpl in self._templates: + tmpl.data[SEQUENCE_OFFSET] = self._sequence + self._sequence = (self._sequence + 1) % 256 + self._socket.sendto(tmpl.data, (self._ip, self._port)) + + end_time = time.perf_counter() + + return AnimatorStats( + packets_sent=len(self._templates), + total_time_ms=(end_time - start_time) * 1000, + ) + + def close(self) -> None: + """Close the UDP socket. + + Call this when done with the animator to free resources. + """ + if self._socket is not None: + self._socket.close() + self._socket = None + + def __del__(self) -> None: + """Clean up socket on garbage collection.""" + self.close() diff --git a/src/lifx/animation/framebuffer.py b/src/lifx/animation/framebuffer.py new file mode 100644 index 0000000..fe94c28 --- /dev/null +++ b/src/lifx/animation/framebuffer.py @@ -0,0 +1,395 @@ +"""Orientation mapping for LIFX matrix device animations. + +This module provides the FrameBuffer class, which handles pixel coordinate +remapping for matrix devices based on tile orientation. For animations, +every frame is assumed to change all pixels, so no diff tracking is performed. + +Multi-Tile Canvas Support: + For devices with multiple tiles (e.g., original LIFX Tile with 5 tiles), + the FrameBuffer creates a unified canvas based on tile positions (user_x, + user_y). The user provides colors for the entire canvas, and the FrameBuffer + extracts the appropriate region for each tile. + + Example with 5 tiles arranged horizontally: + - Canvas dimensions: 40x8 (5 tiles * 8 pixels wide) + - User provides 320 HSBK tuples (40*8) + - FrameBuffer extracts 64 pixels for each tile based on position + +Design Philosophy: + Colors are "protocol-ready" HSBK tuples - uint16 values matching the LIFX + protocol (0-65535 for H/S/B, 1500-9000 for K). + +Example: + ```python + from lifx.animation.framebuffer import FrameBuffer + + # Create framebuffer for a matrix device + fb = await FrameBuffer.for_matrix(matrix_device) + + # For multi-tile devices, check canvas dimensions + print(f"Canvas: {fb.canvas_width}x{fb.canvas_height}") # e.g., 40x8 + + # Provide colors for the entire canvas + canvas_colors = [(65535, 65535, 65535, 3500)] * (fb.canvas_width * fb.canvas_height) + device_order_data = fb.apply(canvas_colors) + ``` +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from lifx.devices.matrix import MatrixLight, TileInfo + from lifx.devices.multizone import MultiZoneLight + + +@dataclass(frozen=True) +class TileRegion: + """Region of a tile within the canvas. + + Attributes: + x: X offset in canvas coordinates + y: Y offset in canvas coordinates + width: Tile width in pixels + height: Tile height in pixels + orientation_lut: Lookup table for orientation remapping (optional) + """ + + x: int + y: int + width: int + height: int + orientation_lut: tuple[int, ...] | None = None + + +class FrameBuffer: + """Orientation mapping for matrix device animations. + + For matrix devices with tile orientation (like the original LIFX Tile), + this class remaps pixel coordinates from user-space (logical layout) to + device-space (physical tile order accounting for rotation). + + For multi-tile devices, the FrameBuffer creates a unified canvas where + each tile's position (user_x, user_y) determines which region of the + canvas it displays. This allows animations to span across all tiles + instead of being mirrored. + + For multizone devices and matrix devices without orientation, this is + essentially a passthrough. + + Attributes: + pixel_count: Total number of device pixels + canvas_width: Width of the logical canvas in pixels + canvas_height: Height of the logical canvas in pixels + tile_regions: List of tile regions with positions and orientations + + Example: + ```python + # Create for a device + fb = await FrameBuffer.for_matrix(matrix_device) + + # Check canvas dimensions + print(f"Canvas: {fb.canvas_width}x{fb.canvas_height}") + + # Provide canvas-sized input + canvas = [(0, 0, 65535, 3500)] * (fb.canvas_width * fb.canvas_height) + device_data = fb.apply(canvas) + ``` + """ + + def __init__( + self, + pixel_count: int, + canvas_width: int = 0, + canvas_height: int = 0, + tile_regions: list[TileRegion] | None = None, + ) -> None: + """Initialize framebuffer. + + Args: + pixel_count: Total number of device pixels + canvas_width: Width of the logical canvas (0 = same as pixel_count) + canvas_height: Height of the logical canvas (0 = 1 for linear) + tile_regions: List of tile regions with positions and orientations. + If provided, input is interpreted as a 2D canvas. + """ + if pixel_count < 0: + raise ValueError(f"pixel_count must be non-negative, got {pixel_count}") + + self._pixel_count = pixel_count + self._tile_regions = tile_regions + + # Canvas dimensions + if tile_regions: + # Calculate from tile regions + self._canvas_width = canvas_width + self._canvas_height = canvas_height + else: + # Linear (multizone) or single tile + self._canvas_width = canvas_width if canvas_width > 0 else pixel_count + self._canvas_height = canvas_height if canvas_height > 0 else 1 + + @classmethod + async def for_matrix( + cls, + device: MatrixLight, + ) -> FrameBuffer: + """Create a FrameBuffer configured for a MatrixLight device. + + Automatically determines pixel count from device chain and creates + appropriate mapping for tile orientations and positions. + + For multi-tile devices (has_chain capability), creates a unified canvas + based on tile positions (user_x, user_y). Each tile's position determines + which region of the canvas it displays, allowing animations to span + across all tiles. + + Args: + device: MatrixLight device (must be connected) + + Returns: + Configured FrameBuffer instance + + Example: + ```python + async with await MatrixLight.from_ip("192.168.1.100") as matrix: + fb = await FrameBuffer.for_matrix(matrix) + print(f"Canvas: {fb.canvas_width}x{fb.canvas_height}") + ``` + """ + # Ensure device chain is loaded + if device.device_chain is None: + await device.get_device_chain() + + tiles = device.device_chain + if not tiles: + raise ValueError("Device has no tiles") + + # Calculate total device pixels + pixel_count = sum(t.width * t.height for t in tiles) + + # Ensure capabilities are loaded + if device.capabilities is None: + await device._ensure_capabilities() + + # Only build canvas mapping for devices with chain capability. + # The original LIFX Tile is the only matrix device with accelerometer-based + # orientation detection and multi-tile positioning. Other matrix devices + # (Ceiling, Luna, Candle, Path, etc.) have fixed positions. + if device.capabilities and device.capabilities.has_chain: + return cls._for_multi_tile(tiles, pixel_count) + else: + # Single tile device - simple passthrough + first_tile = tiles[0] + return cls( + pixel_count=pixel_count, + canvas_width=first_tile.width, + canvas_height=first_tile.height, + ) + + @classmethod + def _for_multi_tile( + cls, + tiles: list[TileInfo], + pixel_count: int, + ) -> FrameBuffer: + """Create FrameBuffer for multi-tile device with canvas positioning. + + Uses user_x/user_y to determine where each tile sits in the canvas. + Coordinates are in tile-width units (1.0 = one tile width) and + represent the center of each tile. + """ + from lifx.animation.orientation import Orientation, build_orientation_lut + + if not tiles: # pragma: no cover + raise ValueError("No tiles provided") + + first_tile = tiles[0] + tile_width = first_tile.width + tile_height = first_tile.height + + # Convert tile center positions to pixel coordinates + # user_x/user_y are in "tile width" units, representing tile centers + tile_centers = [ + (int(round(t.user_x * tile_width)), int(round(t.user_y * tile_height))) + for t in tiles + ] + + # Calculate bounding box of all tile centers + min_cx = min(c[0] for c in tile_centers) + max_cx = max(c[0] for c in tile_centers) + min_cy = min(c[1] for c in tile_centers) + max_cy = max(c[1] for c in tile_centers) + + # Canvas extends from leftmost tile left edge to rightmost tile right edge + # Since centers are at tile_width/2 from edges: + # - Left edge of leftmost tile: min_cx - tile_width/2 + # - Right edge of rightmost tile: max_cx + tile_width/2 + # Total width = (max_cx - min_cx) + tile_width + canvas_width = (max_cx - min_cx) + tile_width + canvas_height = (max_cy - min_cy) + tile_height + + # Origin offset (to convert tile centers to top-left positions) + origin_x = min_cx - tile_width // 2 + origin_y = min_cy - tile_height // 2 + + # Build tile regions with canvas-relative positions + tile_regions: list[TileRegion] = [] + for tile, (cx, cy) in zip(tiles, tile_centers, strict=True): + # Convert center to top-left, relative to canvas origin + x = cx - tile_width // 2 - origin_x + y = cy - tile_height // 2 - origin_y + + # Build orientation LUT for this tile + orientation = Orientation.from_string(tile.nearest_orientation) + lut = build_orientation_lut(tile_width, tile_height, orientation) + + tile_regions.append( + TileRegion( + x=x, + y=y, + width=tile_width, + height=tile_height, + orientation_lut=lut, + ) + ) + + return cls( + pixel_count=pixel_count, + canvas_width=canvas_width, + canvas_height=canvas_height, + tile_regions=tile_regions, + ) + + @classmethod + async def for_multizone( + cls, + device: MultiZoneLight, + ) -> FrameBuffer: + """Create a FrameBuffer configured for a MultiZoneLight device. + + Automatically determines pixel count from zone count. + Multizone devices don't need permutation (zones are linear). + + Args: + device: MultiZoneLight device (must be connected) + + Returns: + Configured FrameBuffer instance + + Example: + ```python + async with await MultiZoneLight.from_ip("192.168.1.100") as strip: + fb = await FrameBuffer.for_multizone(strip) + ``` + """ + # Get zone count (fetches from device if not cached) + zone_count = await device.get_zone_count() + + return cls(pixel_count=zone_count) + + @property + def pixel_count(self) -> int: + """Get total number of device pixels.""" + return self._pixel_count + + @property + def canvas_width(self) -> int: + """Get width of the logical canvas in pixels.""" + return self._canvas_width + + @property + def canvas_height(self) -> int: + """Get height of the logical canvas in pixels.""" + return self._canvas_height + + @property + def canvas_size(self) -> int: + """Get total number of canvas pixels (width * height).""" + return self._canvas_width * self._canvas_height + + @property + def tile_regions(self) -> list[TileRegion] | None: + """Get tile regions if configured.""" + return self._tile_regions + + def apply( + self, hsbk: list[tuple[int, int, int, int]] + ) -> list[tuple[int, int, int, int]]: + """Apply orientation mapping to frame data. + + For multi-tile devices, the input is interpreted as a row-major 2D + canvas of size (canvas_width x canvas_height). Each tile extracts + its region from the canvas based on its position. + + For single-tile or multizone devices, this is a passthrough. + + Args: + hsbk: List of protocol-ready HSBK tuples. + - For multi-tile: length must match canvas_size + - For single-tile/multizone: length must match pixel_count + Each tuple is (hue, sat, brightness, kelvin) where + H/S/B are 0-65535 and K is 1500-9000. + + Returns: + Remapped HSBK data in device order + + Raises: + ValueError: If hsbk length doesn't match expected size + """ + # Multi-tile canvas mode + if self._tile_regions: + expected_size = self._canvas_width * self._canvas_height + if len(hsbk) != expected_size: + raise ValueError( + f"HSBK length ({len(hsbk)}) must match " + f"canvas_size ({expected_size})" + ) + return self._apply_canvas(hsbk) + + # Single-tile or multizone mode (passthrough) + if len(hsbk) != self._pixel_count: + raise ValueError( + f"HSBK length ({len(hsbk)}) must match " + f"pixel_count ({self._pixel_count})" + ) + + return list(hsbk) + + def _apply_canvas( + self, hsbk: list[tuple[int, int, int, int]] + ) -> list[tuple[int, int, int, int]]: + """Extract tile regions from canvas and apply orientation. + + Args: + hsbk: Row-major canvas data (canvas_width x canvas_height) + + Returns: + Device-ordered pixels (concatenated tiles) + """ + result: list[tuple[int, int, int, int]] = [] + canvas_width = self._canvas_width + + for region in self._tile_regions: # type: ignore[union-attr] + # Extract pixels for this tile from the canvas + tile_pixels: list[tuple[int, int, int, int]] = [] + + for row in range(region.height): + canvas_y = region.y + row + for col in range(region.width): + canvas_x = region.x + col + canvas_idx = canvas_y * canvas_width + canvas_x + tile_pixels.append(hsbk[canvas_idx]) + + # Apply orientation remapping for this tile + if region.orientation_lut: + tile_pixels = [ + tile_pixels[region.orientation_lut[i]] + for i in range(len(tile_pixels)) + ] + + result.extend(tile_pixels) + + return result diff --git a/src/lifx/animation/orientation.py b/src/lifx/animation/orientation.py new file mode 100644 index 0000000..beb8ef1 --- /dev/null +++ b/src/lifx/animation/orientation.py @@ -0,0 +1,159 @@ +"""Tile orientation remapping for LIFX matrix devices. + +This module provides utilities for remapping pixel coordinates based on tile +orientation, enabling correct display regardless of how tiles are physically +mounted. + +LIFX tiles report their orientation via accelerometer data. This module +converts that orientation into lookup tables for efficient pixel remapping +during animation. + +The key insight is that orientation affects how row-major framebuffer indices +map to physical tile positions. By pre-computing lookup tables (LUTs), we +can apply orientation correction with a single array lookup per pixel. + +Example: + ```python + from lifx.animation.orientation import Orientation, build_orientation_lut + + # Build LUT for a single 8x8 tile rotated 90 degrees + lut = build_orientation_lut(8, 8, Orientation.ROTATED_90) + + # Apply LUT to remap pixels + output = [framebuffer[lut[i]] for i in range(len(framebuffer))] + ``` +""" + +from __future__ import annotations + +from enum import IntEnum +from functools import lru_cache + + +class Orientation(IntEnum): + """Tile orientation based on accelerometer data. + + These values match the orientation detection in TileInfo.nearest_orientation + but use integer enum for efficient comparison and caching. + + Physical mounting positions: + - RIGHT_SIDE_UP: Normal position, no rotation needed + - ROTATED_90: Rotated 90 degrees clockwise (RotatedRight) + - ROTATED_180: Upside down (UpsideDown) + - ROTATED_270: Rotated 90 degrees counter-clockwise (RotatedLeft) + - FACE_UP: Tile facing ceiling + - FACE_DOWN: Tile facing floor + """ + + RIGHT_SIDE_UP = 0 # "Upright" + ROTATED_90 = 1 # "RotatedRight" + ROTATED_180 = 2 # "UpsideDown" + ROTATED_270 = 3 # "RotatedLeft" + FACE_UP = 4 # "FaceUp" + FACE_DOWN = 5 # "FaceDown" + + @classmethod + def from_string(cls, orientation_str: str) -> Orientation: + """Convert TileInfo.nearest_orientation string to Orientation enum. + + Args: + orientation_str: String from TileInfo.nearest_orientation + + Returns: + Corresponding Orientation enum value + + Raises: + ValueError: If orientation string is not recognized + """ + mapping = { + "Upright": cls.RIGHT_SIDE_UP, + "RotatedRight": cls.ROTATED_90, + "UpsideDown": cls.ROTATED_180, + "RotatedLeft": cls.ROTATED_270, + "FaceUp": cls.FACE_UP, + "FaceDown": cls.FACE_DOWN, + } + if orientation_str not in mapping: + raise ValueError(f"Unknown orientation: {orientation_str}") + return mapping[orientation_str] + + +@lru_cache(maxsize=64) +def build_orientation_lut( + width: int, + height: int, + orientation: Orientation, +) -> tuple[int, ...]: + """Build a lookup table for remapping pixels based on tile orientation. + + The LUT maps physical tile positions to row-major framebuffer indices. + For a pixel at physical position i, lut[i] gives the framebuffer index. + + This is LRU-cached because tiles typically have standard dimensions (8x8) + and there are only 6 orientations, so the cache will be highly effective. + + Args: + width: Tile width in pixels + height: Tile height in pixels + orientation: Tile orientation + + Returns: + Tuple of indices mapping physical position to framebuffer position. + Tuple is used instead of list for hashability in caches. + + Example: + >>> lut = build_orientation_lut(8, 8, Orientation.RIGHT_SIDE_UP) + >>> len(lut) + 64 + >>> lut[0] # First pixel maps to index 0 + 0 + >>> lut = build_orientation_lut(8, 8, Orientation.ROTATED_180) + >>> lut[0] # First physical position maps to last framebuffer index + 63 + """ + size = width * height + lut: list[int] = [0] * size + + for y in range(height): + for x in range(width): + # Physical position in row-major order + physical_idx = y * width + x + + # Calculate source position based on orientation + if orientation == Orientation.RIGHT_SIDE_UP: + # No transformation + src_x, src_y = x, y + elif orientation == Orientation.ROTATED_90: + # 90 degrees clockwise: (x, y) -> (height - 1 - y, x) + # Note: Only valid for square tiles. Non-square tiles would require + # a source buffer with swapped dimensions (e.g., 5x7 for a 7x5 tile). + # For non-square tiles, fall back to identity transformation. + if width == height: + src_x = height - 1 - y + src_y = x + else: + src_x, src_y = x, y + elif orientation == Orientation.ROTATED_180: + # 180 degrees: (x, y) -> (width - 1 - x, height - 1 - y) + # Works for both square and non-square tiles + src_x = width - 1 - x + src_y = height - 1 - y + elif orientation == Orientation.ROTATED_270: + # 270 degrees (90 counter-clockwise): (x, y) -> (y, width - 1 - x) + # Note: Only valid for square tiles. For non-square tiles, + # fall back to identity transformation. + if width == height: + src_x = y + src_y = width - 1 - x + else: + src_x, src_y = x, y + else: + # FACE_UP and FACE_DOWN: treat as right-side-up (no x/y rotation) + # The z-axis orientation doesn't affect 2D pixel mapping + src_x, src_y = x, y + + # Source index in row-major order + src_idx = src_y * width + src_x + lut[physical_idx] = src_idx + + return tuple(lut) diff --git a/src/lifx/animation/packets.py b/src/lifx/animation/packets.py new file mode 100644 index 0000000..e7bcb5c --- /dev/null +++ b/src/lifx/animation/packets.py @@ -0,0 +1,497 @@ +"""Device-specific packet generators for animation. + +This module provides packet generators that create prebaked packet templates +for high-performance animation. All packets (header + payload) are prebaked +at initialization time, and only color data and sequence numbers are updated +per frame. + +**Performance Optimization:** +- Complete packets (header + payload) are prebaked as bytearrays +- Per-frame updates only touch color bytes and sequence number +- Zero object allocation in the hot path +- Direct struct.pack_into for color updates + +Supported Devices: + - MatrixLight: Uses Set64 packets (64 pixels per packet per tile) + - MultiZoneLight: Uses SetExtendedColorZones (82 zones per packet) + +Example: + ```python + from lifx.animation.packets import MatrixPacketGenerator + + # Create generator and prebake packets + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(source=12345, target=b"\\xd0\\x73...") + + # Per-frame: update colors and send + gen.update_colors(templates, hsbk_data) + for tmpl in templates: + tmpl.data[23] = sequence # Update sequence byte + socket.sendto(tmpl.data, (ip, port)) + ``` +""" + +from __future__ import annotations + +import struct +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import ClassVar + +# Header constants +HEADER_SIZE = 36 +SEQUENCE_OFFSET = 23 # Offset of sequence byte in header + +# Header field values for animation packets +PROTOCOL_NUMBER = 1024 +ORIGIN = 0 +ADDRESSABLE = 1 +TAGGED = 0 +ACK_REQUIRED = 0 +RES_REQUIRED = 0 + + +@dataclass +class PacketTemplate: + """Prebaked packet template for zero-allocation animation. + + Contains a complete packet (header + payload) as a mutable bytearray. + Only the sequence byte and color data need to be updated per frame. + + Attributes: + data: Complete packet bytes (header + payload) + color_offset: Byte offset where color data starts + color_count: Number of HSBK colors in this packet + hsbk_start: Starting index in the input HSBK array + """ + + data: bytearray + color_offset: int + color_count: int + hsbk_start: int + + +def _build_header( + pkt_type: int, + source: int, + target: bytes, + payload_size: int, +) -> bytearray: + """Build a LIFX header as a bytearray. + + Args: + pkt_type: Packet type identifier + source: Client source ID + target: 6-byte device serial + payload_size: Size of payload in bytes + + Returns: + 36-byte header as bytearray + """ + header = bytearray(HEADER_SIZE) + + # Frame (8 bytes) + size = HEADER_SIZE + payload_size + protocol_field = ( + (ORIGIN & 0b11) << 14 + | (TAGGED & 0b1) << 13 + | (ADDRESSABLE & 0b1) << 12 + | (PROTOCOL_NUMBER & 0xFFF) + ) + struct.pack_into(" list[PacketTemplate]: + """Create prebaked packet templates. + + Args: + source: Client source ID for header + target: 6-byte device serial for header + + Returns: + List of PacketTemplate with prebaked packets + """ + + @abstractmethod + def update_colors( + self, templates: list[PacketTemplate], hsbk: list[tuple[int, int, int, int]] + ) -> None: + """Update color data in prebaked templates. + + Args: + templates: Prebaked packet templates + hsbk: Protocol-ready HSBK data for all pixels + """ + + @abstractmethod + def pixel_count(self) -> int: + """Get the total pixel count this generator expects.""" + + +class MatrixPacketGenerator(PacketGenerator): + """Packet generator for MatrixLight devices. + + Generates Set64 packets for all tiles. Uses prebaked packet templates + with complete headers for maximum performance. + + For standard tiles (≤64 pixels): + - Single Set64 packet directly to display buffer (fb_index=0) + + For large tiles (>64 pixels, e.g., Ceiling 16x8=128): + - Multiple Set64 packets to temp buffer (fb_index=1) + - CopyFrameBuffer packet to copy fb_index=1 → fb_index=0 + + Set64 Payload Layout (522 bytes): + - Offset 0: tile_index (uint8) + - Offset 1: length (uint8, always 1) + - Offset 2-5: TileBufferRect (fb_index, x, y, width - 4 x uint8) + - Offset 6-9: duration (uint32) + - Offset 10-521: colors (64 x HSBK, each 8 bytes) + + CopyFrameBuffer Payload Layout (15 bytes): + - Offset 0: tile_index (uint8) + - Offset 1: length (uint8, always 1) + - Offset 2: src_fb_index (uint8, 1 = temp buffer) + - Offset 3: dst_fb_index (uint8, 0 = display) + - Offset 4-7: src_x, src_y, dst_x, dst_y (uint8 each) + - Offset 8-9: width, height (uint8 each) + - Offset 10-13: duration (uint32) + - Offset 14: reserved (uint8) + """ + + # Packet types + SET64_PKT_TYPE: ClassVar[int] = 715 + COPY_FRAME_BUFFER_PKT_TYPE: ClassVar[int] = 716 + + # Set64 payload layout + _SET64_PAYLOAD_SIZE: ClassVar[int] = 522 + _COLORS_OFFSET_IN_PAYLOAD: ClassVar[int] = 10 + _MAX_COLORS_PER_PACKET: ClassVar[int] = 64 + + # CopyFrameBuffer payload layout + _COPY_FB_PAYLOAD_SIZE: ClassVar[int] = 15 + + def __init__( + self, + tile_count: int, + tile_width: int, + tile_height: int, + ) -> None: + """Initialize matrix packet generator. + + Args: + tile_count: Number of tiles in the device chain + tile_width: Width of each tile in pixels + tile_height: Height of each tile in pixels + """ + self._tile_count = tile_count + self._tile_width = tile_width + self._tile_height = tile_height + self._pixels_per_tile = tile_width * tile_height + self._total_pixels = tile_count * self._pixels_per_tile + + # Determine if we need large tile mode (>64 pixels per tile) + self._is_large_tile = self._pixels_per_tile > self._MAX_COLORS_PER_PACKET + + # Calculate packets needed per tile + self._rows_per_packet = self._MAX_COLORS_PER_PACKET // tile_width + self._packets_per_tile = ( + self._pixels_per_tile + self._MAX_COLORS_PER_PACKET - 1 + ) // self._MAX_COLORS_PER_PACKET + + @property + def is_large_tile(self) -> bool: + """Check if tiles have >64 pixels (requires multi-packet strategy).""" + return self._is_large_tile + + @property + def packets_per_tile(self) -> int: + """Get number of Set64 packets needed per tile.""" + return self._packets_per_tile + + def pixel_count(self) -> int: + """Get total pixel count.""" + return self._total_pixels + + def create_templates(self, source: int, target: bytes) -> list[PacketTemplate]: + """Create prebaked packet templates for all tiles. + + Args: + source: Client source ID + target: 6-byte device serial + + Returns: + List of PacketTemplate with complete prebaked packets + """ + if self._is_large_tile: + return self._create_large_tile_templates(source, target) + else: + return self._create_standard_templates(source, target) + + def _create_standard_templates( + self, source: int, target: bytes + ) -> list[PacketTemplate]: + """Create templates for standard tiles (≤64 pixels each).""" + templates: list[PacketTemplate] = [] + + for tile_idx in range(self._tile_count): + # Build header + header = _build_header( + self.SET64_PKT_TYPE, source, target, self._SET64_PAYLOAD_SIZE + ) + + # Build payload + payload = bytearray(self._SET64_PAYLOAD_SIZE) + payload[0] = tile_idx # tile_index + payload[1] = 1 # length + # TileBufferRect: fb_index=0, x=0, y=0, width=tile_width + struct.pack_into(" list[PacketTemplate]: + """Create templates for large tiles (>64 pixels each).""" + templates: list[PacketTemplate] = [] + + for tile_idx in range(self._tile_count): + tile_pixel_start = tile_idx * self._pixels_per_tile + + # Create Set64 packets for this tile + for pkt_idx in range(self._packets_per_tile): + color_start = pkt_idx * self._MAX_COLORS_PER_PACKET + color_end = min( + color_start + self._MAX_COLORS_PER_PACKET, + self._pixels_per_tile, + ) + color_count = color_end - color_start + + if color_count == 0: # pragma: no cover + continue + + # Calculate y offset for this chunk + y_offset = pkt_idx * self._rows_per_packet + + # Build header + header = _build_header( + self.SET64_PKT_TYPE, source, target, self._SET64_PAYLOAD_SIZE + ) + + # Build payload + payload = bytearray(self._SET64_PAYLOAD_SIZE) + payload[0] = tile_idx # tile_index + payload[1] = 1 # length + # TileBufferRect: fb_index=1 (temp), x=0, y=y_offset, width + struct.pack_into(" None: + """Update color data in prebaked templates. + + Args: + templates: Prebaked packet templates + hsbk: Protocol-ready HSBK data for all pixels + """ + for tmpl in templates: + if tmpl.color_count == 0: + continue # Skip CopyFrameBuffer packets + + for i in range(tmpl.color_count): + h, s, b, k = hsbk[tmpl.hsbk_start + i] + offset = tmpl.color_offset + i * 8 + struct.pack_into("82 zones, multiple packets are generated. + + SetExtendedColorZones Payload Layout (664 bytes): + - Offset 0-3: duration (uint32) + - Offset 4: apply (uint8, 1 = APPLY) + - Offset 5-6: zone_index (uint16) + - Offset 7: colors_count (uint8) + - Offset 8-663: colors (82 x HSBK, each 8 bytes) + """ + + SET_EXTENDED_COLOR_ZONES_PKT_TYPE: ClassVar[int] = 510 + + _PAYLOAD_SIZE: ClassVar[int] = 664 + _COLORS_OFFSET_IN_PAYLOAD: ClassVar[int] = 8 + _MAX_ZONES_PER_PACKET: ClassVar[int] = 82 + + def __init__(self, zone_count: int) -> None: + """Initialize multizone packet generator. + + Args: + zone_count: Total number of zones on the device + """ + self._zone_count = zone_count + self._packets_needed = ( + zone_count + self._MAX_ZONES_PER_PACKET - 1 + ) // self._MAX_ZONES_PER_PACKET + + def pixel_count(self) -> int: + """Get total zone count.""" + return self._zone_count + + def create_templates(self, source: int, target: bytes) -> list[PacketTemplate]: + """Create prebaked packet templates for all zones. + + Args: + source: Client source ID + target: 6-byte device serial + + Returns: + List of PacketTemplate with complete prebaked packets + """ + templates: list[PacketTemplate] = [] + + for pkt_idx in range(self._packets_needed): + zone_start = pkt_idx * self._MAX_ZONES_PER_PACKET + zone_end = min(zone_start + self._MAX_ZONES_PER_PACKET, self._zone_count) + zone_count = zone_end - zone_start + + # Build header + header = _build_header( + self.SET_EXTENDED_COLOR_ZONES_PKT_TYPE, + source, + target, + self._PAYLOAD_SIZE, + ) + + # Build payload + payload = bytearray(self._PAYLOAD_SIZE) + # duration = 0 + struct.pack_into(" None: + """Update color data in prebaked templates. + + Args: + templates: Prebaked packet templates + hsbk: Protocol-ready HSBK data for all zones + """ + for tmpl in templates: + for i in range(tmpl.color_count): + h, s, b, k = hsbk[tmpl.hsbk_start + i] + offset = tmpl.color_offset + i * 8 + struct.pack_into(" int: + """Get total number of zones on this tile.""" + return self.width * self.height + + @property + def requires_frame_buffer(self) -> bool: + """Check if tile has more than 64 zones.""" + return self.total_zones > 64 + + @property + def nearest_orientation(self) -> str: + """Determine the orientation of the tile from accelerometer data.""" + abs_x = abs(self.accel_meas_x) + abs_y = abs(self.accel_meas_y) + abs_z = abs(self.accel_meas_z) + + if ( + self.accel_meas_x == -1 + and self.accel_meas_y == -1 + and self.accel_meas_z == -1 + ): + return "Upright" + + elif abs_x > abs_y and abs_x > abs_z: + if self.accel_meas_x > 0: + return "RotatedRight" + else: + return "RotatedLeft" + + elif abs_z > abs_x and abs_z > abs_y: + if self.accel_meas_z > 0: + return "FaceDown" + else: + return "FaceUp" + + else: + if self.accel_meas_y > 0: + return "UpsideDown" + else: + return "Upright" + + +@pytest.fixture +def mock_tile_upright() -> MockTileInfo: + """Create a mock 8x8 tile in upright orientation.""" + return MockTileInfo( + tile_index=0, + width=8, + height=8, + accel_meas_x=0, + accel_meas_y=-100, + accel_meas_z=0, + ) + + +@pytest.fixture +def mock_tile_rotated_90() -> MockTileInfo: + """Create a mock 8x8 tile rotated 90 degrees (RotatedRight).""" + return MockTileInfo( + tile_index=0, + width=8, + height=8, + accel_meas_x=100, # Positive X = RotatedRight + accel_meas_y=0, + accel_meas_z=0, + ) + + +@pytest.fixture +def mock_tile_rotated_180() -> MockTileInfo: + """Create a mock 8x8 tile rotated 180 degrees (UpsideDown).""" + return MockTileInfo( + tile_index=0, + width=8, + height=8, + accel_meas_x=0, + accel_meas_y=100, # Positive Y = UpsideDown + accel_meas_z=0, + ) + + +@pytest.fixture +def mock_tile_rotated_270() -> MockTileInfo: + """Create a mock 8x8 tile rotated 270 degrees (RotatedLeft).""" + return MockTileInfo( + tile_index=0, + width=8, + height=8, + accel_meas_x=-100, # Negative X = RotatedLeft + accel_meas_y=0, + accel_meas_z=0, + ) + + +@pytest.fixture +def mock_tile_chain() -> list[MockTileInfo]: + """Create a mock chain of 3 tiles with different orientations.""" + return [ + MockTileInfo( + tile_index=0, + width=8, + height=8, + accel_meas_x=0, + accel_meas_y=-100, # Upright + accel_meas_z=0, + ), + MockTileInfo( + tile_index=1, + width=8, + height=8, + accel_meas_x=100, # RotatedRight + accel_meas_y=0, + accel_meas_z=0, + ), + MockTileInfo( + tile_index=2, + width=8, + height=8, + accel_meas_x=0, + accel_meas_y=100, # UpsideDown + accel_meas_z=0, + ), + ] + + +@pytest.fixture +def mock_multizone_device() -> MagicMock: + """Create a mock MultiZoneLight device.""" + device = MagicMock() + device.capabilities = MagicMock() + device.capabilities.has_extended_multizone = True + device._zone_count = 82 + return device + + +@pytest.fixture +def mock_matrix_device() -> MagicMock: + """Create a mock MatrixLight device.""" + device = MagicMock() + device._device_chain = [ + MockTileInfo(tile_index=0, width=8, height=8), + ] + return device diff --git a/tests/test_animation/test_animator.py b/tests/test_animation/test_animator.py new file mode 100644 index 0000000..ebbf0ac --- /dev/null +++ b/tests/test_animation/test_animator.py @@ -0,0 +1,482 @@ +"""Tests for the high-level Animator class.""" + +from __future__ import annotations + +import asyncio +import socket +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from lifx.animation.animator import Animator, AnimatorStats +from lifx.animation.framebuffer import FrameBuffer +from lifx.animation.packets import MatrixPacketGenerator +from lifx.protocol.models import Serial + + +class TestAnimatorStats: + """Tests for AnimatorStats dataclass.""" + + def test_stats_fields(self) -> None: + """Test AnimatorStats has correct fields.""" + stats = AnimatorStats( + packets_sent=5, + total_time_ms=10.5, + ) + assert stats.packets_sent == 5 + assert stats.total_time_ms == 10.5 + + def test_frozen_dataclass(self) -> None: + """Verify AnimatorStats is immutable.""" + stats = AnimatorStats(packets_sent=5, total_time_ms=10.5) + with pytest.raises(AttributeError): + stats.packets_sent = 10 # type: ignore[misc] + + +class TestAnimatorConstruction: + """Tests for Animator construction and properties.""" + + def test_init_with_components(self) -> None: + """Test direct constructor works.""" + framebuffer = FrameBuffer(pixel_count=64) + packet_generator = MatrixPacketGenerator( + tile_count=1, tile_width=8, tile_height=8 + ) + serial = Serial.from_string("d073d5123456") + + animator = Animator( + ip="192.168.1.100", + serial=serial, + framebuffer=framebuffer, + packet_generator=packet_generator, + ) + + assert animator.pixel_count == 64 + + def test_canvas_width_property(self) -> None: + """Test canvas_width property delegation.""" + framebuffer = FrameBuffer(pixel_count=64, canvas_width=8, canvas_height=8) + packet_generator = MatrixPacketGenerator( + tile_count=1, tile_width=8, tile_height=8 + ) + serial = Serial.from_string("d073d5123456") + + animator = Animator( + ip="192.168.1.100", + serial=serial, + framebuffer=framebuffer, + packet_generator=packet_generator, + ) + + assert animator.canvas_width == 8 + assert animator.canvas_height == 8 + + def test_pixel_count_from_framebuffer(self) -> None: + """Test pixel_count property delegation.""" + framebuffer = FrameBuffer(pixel_count=128) + packet_generator = MatrixPacketGenerator( + tile_count=2, tile_width=8, tile_height=8 + ) + serial = Serial.from_string("d073d5123456") + + animator = Animator( + ip="192.168.1.100", + serial=serial, + framebuffer=framebuffer, + packet_generator=packet_generator, + ) + + assert animator.pixel_count == 128 + + +class TestAnimatorSendFrame: + """Tests for Animator.send_frame method.""" + + @pytest.fixture + def animator(self) -> Animator: + """Create an animator for testing.""" + framebuffer = FrameBuffer(pixel_count=64) + packet_generator = MatrixPacketGenerator( + tile_count=1, tile_width=8, tile_height=8 + ) + serial = Serial.from_string("d073d5123456") + + return Animator( + ip="192.168.1.100", + serial=serial, + framebuffer=framebuffer, + packet_generator=packet_generator, + ) + + def test_send_frame_wrong_length_raises(self, animator: Animator) -> None: + """Test that wrong Color array length raises ValueError.""" + hsbk: list[tuple[int, int, int, int]] = [ + (100, 100, 100, 3500) + ] * 32 # Wrong length + + with pytest.raises(ValueError, match="must match pixel_count"): + animator.send_frame(hsbk) + + def test_send_frame_sends_packets(self, animator: Animator) -> None: + """Test that send_frame sends packets via UDP.""" + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 64 + + # Mock the socket + with patch.object(socket, "socket") as mock_socket_class: + mock_sock = MagicMock() + mock_socket_class.return_value = mock_sock + + stats = animator.send_frame(hsbk) + + assert stats.packets_sent >= 1 + mock_sock.sendto.assert_called() + + def test_send_frame_returns_stats(self, animator: Animator) -> None: + """Test that send_frame returns AnimatorStats.""" + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 64 + + with patch.object(socket, "socket") as mock_socket_class: + mock_sock = MagicMock() + mock_socket_class.return_value = mock_sock + + stats = animator.send_frame(hsbk) + + assert isinstance(stats, AnimatorStats) + assert stats.packets_sent >= 1 + assert stats.total_time_ms >= 0 + + def test_send_frame_is_synchronous(self, animator: Animator) -> None: + """Test that send_frame is synchronous (not a coroutine).""" + import inspect + + assert not inspect.iscoroutinefunction(animator.send_frame) + + def test_send_frame_reuses_socket(self, animator: Animator) -> None: + """Test that send_frame reuses the same socket.""" + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 64 + + with patch.object(socket, "socket") as mock_socket_class: + mock_sock = MagicMock() + mock_socket_class.return_value = mock_sock + + # Send multiple frames + animator.send_frame(hsbk) + animator.send_frame(hsbk) + animator.send_frame(hsbk) + + # Socket should only be created once + assert mock_socket_class.call_count == 1 + + def test_send_frame_sends_to_correct_address(self, animator: Animator) -> None: + """Test that packets are sent to correct IP:port.""" + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 64 + + with patch.object(socket, "socket") as mock_socket_class: + mock_sock = MagicMock() + mock_socket_class.return_value = mock_sock + + animator.send_frame(hsbk) + + # Check sendto was called with correct address + call_args = mock_sock.sendto.call_args + address = call_args[0][1] + assert address == ("192.168.1.100", 56700) + + def test_close_closes_socket(self, animator: Animator) -> None: + """Test that close() closes the socket.""" + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 64 + + with patch.object(socket, "socket") as mock_socket_class: + mock_sock = MagicMock() + mock_socket_class.return_value = mock_sock + + animator.send_frame(hsbk) + animator.close() + + mock_sock.close.assert_called_once() + + +class TestAnimatorForMatrixFactory: + """Tests for Animator.for_matrix factory method.""" + + @pytest.mark.asyncio + async def test_for_matrix_fetches_device_chain_when_none(self) -> None: + """Test for_matrix fetches device chain if not already loaded.""" + tile = MagicMock() + tile.width = 8 + tile.height = 8 + tile.user_x = 0.0 + tile.user_y = 0.0 + tile.nearest_orientation = "Upright" + + device = MagicMock() + device.ip = "192.168.1.100" + device.serial = "d073d5123456" + device.device_chain = None # Not loaded yet + device.capabilities = MagicMock() + device.capabilities.has_chain = False + + # get_device_chain should be called and populate device_chain + async def mock_get_device_chain() -> list: + device.device_chain = [tile] + return [tile] + + device.get_device_chain = mock_get_device_chain + + animator = await Animator.for_matrix(device) + + assert animator.pixel_count == 64 + + +class TestAnimatorForMultizoneFactory: + """Tests for Animator.for_multizone factory method.""" + + @pytest.mark.asyncio + async def test_for_multizone_no_extended_capability_raises(self) -> None: + """Test for_multizone raises error when device lacks extended multizone.""" + device = MagicMock() + device.capabilities = MagicMock() + device.capabilities.has_extended_multizone = False + + with pytest.raises(ValueError, match="extended multizone"): + await Animator.for_multizone(device) + + @pytest.mark.asyncio + async def test_for_multizone_loads_capabilities_when_none(self) -> None: + """Test for_multizone calls _ensure_capabilities when None. + + If capabilities haven't been fetched, we should load them first. + Then if device doesn't support extended multizone, raise error. + """ + device = MagicMock() + device.capabilities = None + + # Mock _ensure_capabilities to set capabilities without extended multizone + async def set_capabilities() -> None: + device.capabilities = MagicMock() + device.capabilities.has_extended_multizone = False + + device._ensure_capabilities = AsyncMock(side_effect=set_capabilities) + + with pytest.raises(ValueError, match="extended multizone"): + await Animator.for_multizone(device) + + # Verify _ensure_capabilities was called + device._ensure_capabilities.assert_called_once() + + +@pytest.mark.emulator +class TestAnimatorForMatrixIntegration: + """Integration tests for Animator.for_matrix with emulator.""" + + async def test_for_matrix_creates_animator(self, emulator_devices) -> None: + """Test factory method works with real device.""" + from lifx.devices.matrix import MatrixLight + + # Find the matrix device + matrix = None + for device in emulator_devices: + if isinstance(device, MatrixLight): + matrix = device + break + + assert matrix is not None, "No MatrixLight in emulator_devices" + + async with matrix: + animator = await Animator.for_matrix(matrix) + + assert animator.pixel_count > 0 + + async def test_send_frame_sends_packets(self, emulator_devices) -> None: + """Test send_frame sends packets.""" + from lifx.devices.matrix import MatrixLight + + matrix = None + for device in emulator_devices: + if isinstance(device, MatrixLight): + matrix = device + break + + assert matrix is not None + + async with matrix: + animator = await Animator.for_matrix(matrix) + + # Create frame + hsbk: list[tuple[int, int, int, int]] = [ + (65535, 65535, 32768, 3500) + ] * animator.pixel_count + stats = animator.send_frame(hsbk) + + assert stats.packets_sent >= 1 + + animator.close() + + async def test_animation_loop_simulation(self, emulator_devices) -> None: + """Test multiple frames in sequence.""" + from lifx.devices.matrix import MatrixLight + + matrix = None + for device in emulator_devices: + if isinstance(device, MatrixLight): + matrix = device + break + + assert matrix is not None + + async with matrix: + animator = await Animator.for_matrix(matrix) + + total_packets = 0 + for frame_num in range(5): + # Create frame with shifting colors + hsbk: list[tuple[int, int, int, int]] = [] + for i in range(animator.pixel_count): + hue = ((i + frame_num * 10) * 1000) % 65536 + hsbk.append((hue, 65535, 32768, 3500)) + + stats = animator.send_frame(hsbk) + total_packets += stats.packets_sent + + # Small delay between frames + await asyncio.sleep(0.01) + + # Should have sent packets for multiple frames + assert total_packets >= 5 + + animator.close() + + +@pytest.mark.emulator +class TestAnimatorForMultizoneIntegration: + """Integration tests for Animator.for_multizone with emulator.""" + + async def test_for_multizone_creates_animator(self, emulator_devices) -> None: + """Test factory method works with real device.""" + from lifx.devices.multizone import MultiZoneLight + + multizone = None + for device in emulator_devices: + if isinstance(device, MultiZoneLight): + multizone = device + break + + assert multizone is not None, "No MultiZoneLight in emulator_devices" + + async with multizone: + animator = await Animator.for_multizone(multizone) + + assert animator.pixel_count > 0 + + animator.close() + + async def test_send_frame_extended_protocol(self, emulator_devices) -> None: + """Test extended multizone sends packets.""" + from lifx.devices.multizone import MultiZoneLight + + multizone = None + for device in emulator_devices: + if isinstance(device, MultiZoneLight): + multizone = device + break + + assert multizone is not None + + async with multizone: + animator = await Animator.for_multizone(multizone) + + hsbk: list[tuple[int, int, int, int]] = [ + (65535, 65535, 32768, 3500) + ] * animator.pixel_count + stats = animator.send_frame(hsbk) + + assert stats.packets_sent >= 1 + + animator.close() + + async def test_animation_loop_simulation(self, emulator_devices) -> None: + """Test multiple frames in sequence.""" + from lifx.devices.multizone import MultiZoneLight + + multizone = None + for device in emulator_devices: + if isinstance(device, MultiZoneLight): + multizone = device + break + + assert multizone is not None + + async with multizone: + animator = await Animator.for_multizone(multizone) + + total_packets = 0 + for frame_num in range(5): + # Create frame with shifting colors + hsbk: list[tuple[int, int, int, int]] = [] + for i in range(animator.pixel_count): + hue = ((i + frame_num * 10) * 1000) % 65536 + hsbk.append((hue, 65535, 32768, 3500)) + + stats = animator.send_frame(hsbk) + total_packets += stats.packets_sent + + # Small delay between frames + await asyncio.sleep(0.01) + + # Should have sent packets for multiple frames + assert total_packets >= 5 + + animator.close() + + +@pytest.mark.emulator +class TestAnimatorErrorHandling: + """Integration tests for Animator error handling.""" + + async def test_send_frame_wrong_length_raises(self, emulator_devices) -> None: + """Test wrong Color array length raises error.""" + from lifx.devices.matrix import MatrixLight + + matrix = None + for device in emulator_devices: + if isinstance(device, MatrixLight): + matrix = device + break + + assert matrix is not None + + async with matrix: + animator = await Animator.for_matrix(matrix) + + # Wrong length + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * ( + animator.pixel_count // 2 + ) + + with pytest.raises(ValueError, match="must match"): + animator.send_frame(hsbk) + + animator.close() + + async def test_for_matrix_no_tiles_raises(self, emulator_devices) -> None: + """Test for_matrix raises when device has no tiles.""" + from lifx.devices.matrix import MatrixLight + + matrix = None + for device in emulator_devices: + if isinstance(device, MatrixLight): + matrix = device + break + + assert matrix is not None + + async with matrix: + # Temporarily clear device chain to simulate no tiles + original_chain = matrix._device_chain + matrix._device_chain = [] + + with pytest.raises(ValueError, match="no tiles"): + await Animator.for_matrix(matrix) + + # Restore + matrix._device_chain = original_chain diff --git a/tests/test_animation/test_framebuffer.py b/tests/test_animation/test_framebuffer.py new file mode 100644 index 0000000..d72a7a3 --- /dev/null +++ b/tests/test_animation/test_framebuffer.py @@ -0,0 +1,380 @@ +"""Tests for FrameBuffer orientation mapping.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from lifx.animation.framebuffer import FrameBuffer, TileRegion + + +class TestFrameBuffer: + """Tests for FrameBuffer class.""" + + def test_init_default(self) -> None: + """Test default initialization.""" + fb = FrameBuffer(pixel_count=64) + assert fb.pixel_count == 64 + assert fb.canvas_width == 64 + assert fb.canvas_height == 1 + + def test_init_with_canvas_dimensions(self) -> None: + """Test initialization with canvas dimensions.""" + fb = FrameBuffer(pixel_count=64, canvas_width=8, canvas_height=8) + assert fb.pixel_count == 64 + assert fb.canvas_width == 8 + assert fb.canvas_height == 8 + assert fb.canvas_size == 64 + + def test_init_invalid_pixel_count(self) -> None: + """Test that negative pixel count raises error.""" + with pytest.raises(ValueError, match="non-negative"): + FrameBuffer(pixel_count=-1) + + def test_apply_passthrough(self) -> None: + """Test apply returns copy of data (passthrough mode).""" + fb = FrameBuffer(pixel_count=4) + data: list[tuple[int, int, int, int]] = [ + (100, 100, 100, 3500), + (200, 200, 200, 3500), + (300, 300, 300, 3500), + (400, 400, 400, 3500), + ] + + result = fb.apply(data) + + assert result == data + assert result is not data # Should be a copy + + def test_apply_invalid_length(self) -> None: + """Test that wrong data length raises error.""" + fb = FrameBuffer(pixel_count=64) + data: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 32 + + with pytest.raises(ValueError, match="must match pixel_count"): + fb.apply(data) + + +class TestFrameBufferClassMethods: + """Tests for FrameBuffer class methods.""" + + @pytest.mark.asyncio + async def test_for_matrix_no_chain(self) -> None: + """Test for_matrix raises error when device has no chain.""" + device = MagicMock() + device.device_chain = [] + device.get_device_chain = AsyncMock(return_value=[]) + + with pytest.raises(ValueError, match="no tiles"): + await FrameBuffer.for_matrix(device) + + @pytest.mark.asyncio + async def test_for_matrix_fetches_chain_when_none(self, mock_tile_upright) -> None: + """Test for_matrix fetches device chain if not already loaded.""" + device = MagicMock() + device.device_chain = None # Not loaded yet + device.capabilities = MagicMock() + device.capabilities.has_chain = False + + # get_device_chain should be called and populate device_chain + async def mock_get_device_chain() -> list: + device.device_chain = [mock_tile_upright] + return [mock_tile_upright] + + device.get_device_chain = mock_get_device_chain + + fb = await FrameBuffer.for_matrix(device) + + assert fb.pixel_count == 64 + + @pytest.mark.asyncio + async def test_for_matrix_single_tile_with_chain_capability( + self, mock_tile_upright + ) -> None: + """Test for_matrix with single tile and chain capability (LIFX Tile). + + Devices with chain capability support accelerometer-based orientation + detection, so permutation should be built. + """ + device = MagicMock() + device.device_chain = [mock_tile_upright] + device.get_device_chain = AsyncMock(return_value=[mock_tile_upright]) + # LIFX Tile has chain capability + device.capabilities = MagicMock() + device.capabilities.has_chain = True + + fb = await FrameBuffer.for_matrix(device) + + # Multi-tile mode uses tile_regions instead of permutation + assert fb.canvas_size == 64 # 8x8 canvas + assert fb.canvas_width == 8 + assert fb.canvas_height == 8 + assert fb.tile_regions is not None + assert len(fb.tile_regions) == 1 + + @pytest.mark.asyncio + async def test_for_matrix_single_tile_without_chain_capability( + self, mock_tile_upright + ) -> None: + """Test for_matrix with single tile but no chain capability. + + Devices without chain capability don't have accelerometer-based orientation + detection, so tile_regions should be None (passthrough mode). + """ + device = MagicMock() + device.device_chain = [mock_tile_upright] + device.get_device_chain = AsyncMock(return_value=[mock_tile_upright]) + # Luna, Candle, Path, etc. don't have chain capability + device.capabilities = MagicMock() + device.capabilities.has_chain = False + + fb = await FrameBuffer.for_matrix(device) + + assert fb.pixel_count == 64 # 8x8 tile + assert fb.canvas_width == 8 + assert fb.canvas_height == 8 + assert fb.tile_regions is None # No tile regions for non-chain devices + + @pytest.mark.asyncio + async def test_for_matrix_loads_capabilities_when_none( + self, mock_tile_upright + ) -> None: + """Test for_matrix calls _ensure_capabilities when None. + + If capabilities haven't been fetched, we should load them first. + """ + device = MagicMock() + device.device_chain = [mock_tile_upright] + device.get_device_chain = AsyncMock(return_value=[mock_tile_upright]) + device.capabilities = None + + # Mock _ensure_capabilities to set capabilities without has_chain + async def set_capabilities() -> None: + device.capabilities = MagicMock() + device.capabilities.has_chain = False + + device._ensure_capabilities = AsyncMock(side_effect=set_capabilities) + + fb = await FrameBuffer.for_matrix(device) + + # Verify _ensure_capabilities was called + device._ensure_capabilities.assert_called_once() + # Without has_chain, should use passthrough mode + assert fb.pixel_count == 64 + assert fb.tile_regions is None + + @pytest.mark.asyncio + async def test_for_multizone(self) -> None: + """Test for_multizone creates correct framebuffer.""" + device = MagicMock() + device.get_zone_count = AsyncMock(return_value=82) + + fb = await FrameBuffer.for_multizone(device) + + assert fb.pixel_count == 82 + assert fb.canvas_width == 82 + assert fb.canvas_height == 1 + assert fb.tile_regions is None # No tile regions for multizone + + +class TestFrameBufferMultiTileCanvas: + """Tests for multi-tile canvas functionality.""" + + def test_tile_region_dataclass(self) -> None: + """Test TileRegion dataclass.""" + lut = tuple(range(64)) + region = TileRegion(x=8, y=0, width=8, height=8, orientation_lut=lut) + + assert region.x == 8 + assert region.y == 0 + assert region.width == 8 + assert region.height == 8 + assert region.orientation_lut == lut + + def test_init_with_tile_regions(self) -> None: + """Test initialization with tile regions.""" + regions = [ + TileRegion(x=0, y=0, width=8, height=8), + TileRegion(x=8, y=0, width=8, height=8), + ] + fb = FrameBuffer( + pixel_count=128, # 2 tiles * 64 pixels + canvas_width=16, + canvas_height=8, + tile_regions=regions, + ) + + assert fb.canvas_width == 16 + assert fb.canvas_height == 8 + assert fb.canvas_size == 128 + assert fb.tile_regions == regions + + def test_apply_canvas_two_tiles_horizontal(self) -> None: + """Test applying canvas to two horizontally arranged tiles.""" + # Two 4x2 tiles arranged horizontally + # Tile 0: canvas[0:4, 0:2] + # Tile 1: canvas[4:8, 0:2] + regions = [ + TileRegion(x=0, y=0, width=4, height=2), + TileRegion(x=4, y=0, width=4, height=2), + ] + fb = FrameBuffer( + pixel_count=16, # 2 tiles * 8 pixels + canvas_width=8, + canvas_height=2, + tile_regions=regions, + ) + + # Canvas: 8x2 grid, row-major + # Row 0: 0 1 2 3 | 4 5 6 7 + # Row 1: 8 9 10 11 | 12 13 14 15 + canvas: list[tuple[int, int, int, int]] = [ + (i * 1000, 0, 0, 3500) for i in range(16) + ] + + result = fb.apply(canvas) + + # Expected: Tile 0 gets [0,1,2,3,8,9,10,11], Tile 1 gets [4,5,6,7,12,13,14,15] + assert len(result) == 16 + + # Tile 0 (first 8 pixels) + assert result[0] == (0, 0, 0, 3500) # canvas[0,0] + assert result[1] == (1000, 0, 0, 3500) # canvas[1,0] + assert result[2] == (2000, 0, 0, 3500) # canvas[2,0] + assert result[3] == (3000, 0, 0, 3500) # canvas[3,0] + assert result[4] == (8000, 0, 0, 3500) # canvas[0,1] + assert result[5] == (9000, 0, 0, 3500) # canvas[1,1] + assert result[6] == (10000, 0, 0, 3500) # canvas[2,1] + assert result[7] == (11000, 0, 0, 3500) # canvas[3,1] + + # Tile 1 (next 8 pixels) + assert result[8] == (4000, 0, 0, 3500) # canvas[4,0] + assert result[9] == (5000, 0, 0, 3500) # canvas[5,0] + assert result[10] == (6000, 0, 0, 3500) # canvas[6,0] + assert result[11] == (7000, 0, 0, 3500) # canvas[7,0] + assert result[12] == (12000, 0, 0, 3500) # canvas[4,1] + assert result[13] == (13000, 0, 0, 3500) # canvas[5,1] + assert result[14] == (14000, 0, 0, 3500) # canvas[6,1] + assert result[15] == (15000, 0, 0, 3500) # canvas[7,1] + + def test_apply_canvas_with_orientation(self) -> None: + """Test applying canvas with tile orientation.""" + # Single 2x2 tile with 180 degree rotation + # LUT for 180 rotation: [3, 2, 1, 0] + lut = (3, 2, 1, 0) + regions = [TileRegion(x=0, y=0, width=2, height=2, orientation_lut=lut)] + fb = FrameBuffer( + pixel_count=4, + canvas_width=2, + canvas_height=2, + tile_regions=regions, + ) + + # Canvas: + # 0 1 + # 2 3 + canvas: list[tuple[int, int, int, int]] = [ + (0, 0, 0, 3500), + (1000, 0, 0, 3500), + (2000, 0, 0, 3500), + (3000, 0, 0, 3500), + ] + + result = fb.apply(canvas) + + # After 180 rotation, output should be: + # Position 0 gets canvas[lut[0]] = canvas[3] + # Position 1 gets canvas[lut[1]] = canvas[2] + # etc. + assert result[0] == (3000, 0, 0, 3500) + assert result[1] == (2000, 0, 0, 3500) + assert result[2] == (1000, 0, 0, 3500) + assert result[3] == (0, 0, 0, 3500) + + def test_apply_canvas_wrong_length_raises(self) -> None: + """Test that wrong canvas length raises error.""" + regions = [TileRegion(x=0, y=0, width=8, height=8)] + fb = FrameBuffer( + pixel_count=64, + canvas_width=8, + canvas_height=8, + tile_regions=regions, + ) + + wrong_length: list[tuple[int, int, int, int]] = [(0, 0, 0, 3500)] * 32 + + with pytest.raises(ValueError, match="must match canvas_size"): + fb.apply(wrong_length) + + @pytest.mark.asyncio + async def test_for_matrix_multi_tile_chain(self) -> None: + """Test for_matrix with multiple tiles creates canvas.""" + # Create 3 tiles arranged horizontally + tile1 = MagicMock() + tile1.width = 8 + tile1.height = 8 + tile1.user_x = 0.0 + tile1.user_y = 0.0 + tile1.nearest_orientation = "Upright" + + tile2 = MagicMock() + tile2.width = 8 + tile2.height = 8 + tile2.user_x = 1.0 # 1 tile width to the right + tile2.user_y = 0.0 + tile2.nearest_orientation = "Upright" + + tile3 = MagicMock() + tile3.width = 8 + tile3.height = 8 + tile3.user_x = 2.0 # 2 tile widths to the right + tile3.user_y = 0.0 + tile3.nearest_orientation = "Upright" + + device = MagicMock() + device.device_chain = [tile1, tile2, tile3] + device.get_device_chain = AsyncMock(return_value=[tile1, tile2, tile3]) + device.capabilities = MagicMock() + device.capabilities.has_chain = True + + fb = await FrameBuffer.for_matrix(device) + + # Canvas should span all 3 tiles horizontally + assert fb.canvas_width == 24 # 3 * 8 + assert fb.canvas_height == 8 + assert fb.canvas_size == 192 # 24 * 8 + assert fb.tile_regions is not None + assert len(fb.tile_regions) == 3 + + # Check tile positions + assert fb.tile_regions[0].x == 0 + assert fb.tile_regions[1].x == 8 + assert fb.tile_regions[2].x == 16 + + @pytest.mark.asyncio + async def test_for_matrix_five_tile_chain(self) -> None: + """Test for_matrix with 5 tiles (original LIFX Tile configuration).""" + tiles = [] + for i in range(5): + tile = MagicMock() + tile.width = 8 + tile.height = 8 + tile.user_x = float(i) # 0.0, 1.0, 2.0, 3.0, 4.0 + tile.user_y = 0.0 + tile.nearest_orientation = "Upright" + tiles.append(tile) + + device = MagicMock() + device.device_chain = tiles + device.get_device_chain = AsyncMock(return_value=tiles) + device.capabilities = MagicMock() + device.capabilities.has_chain = True + + fb = await FrameBuffer.for_matrix(device) + + assert fb.canvas_width == 40 # 5 * 8 + assert fb.canvas_height == 8 + assert fb.canvas_size == 320 # 40 * 8 + assert fb.tile_regions is not None + assert len(fb.tile_regions) == 5 diff --git a/tests/test_animation/test_orientation.py b/tests/test_animation/test_orientation.py new file mode 100644 index 0000000..7c6204b --- /dev/null +++ b/tests/test_animation/test_orientation.py @@ -0,0 +1,167 @@ +"""Tests for tile orientation remapping.""" + +from __future__ import annotations + +import pytest + +from lifx.animation.orientation import ( + Orientation, + build_orientation_lut, +) + + +class TestOrientation: + """Tests for Orientation enum.""" + + def test_from_string_upright(self) -> None: + """Test converting 'Upright' string.""" + assert Orientation.from_string("Upright") == Orientation.RIGHT_SIDE_UP + + def test_from_string_rotated_right(self) -> None: + """Test converting 'RotatedRight' string.""" + assert Orientation.from_string("RotatedRight") == Orientation.ROTATED_90 + + def test_from_string_upside_down(self) -> None: + """Test converting 'UpsideDown' string.""" + assert Orientation.from_string("UpsideDown") == Orientation.ROTATED_180 + + def test_from_string_rotated_left(self) -> None: + """Test converting 'RotatedLeft' string.""" + assert Orientation.from_string("RotatedLeft") == Orientation.ROTATED_270 + + def test_from_string_face_up(self) -> None: + """Test converting 'FaceUp' string.""" + assert Orientation.from_string("FaceUp") == Orientation.FACE_UP + + def test_from_string_face_down(self) -> None: + """Test converting 'FaceDown' string.""" + assert Orientation.from_string("FaceDown") == Orientation.FACE_DOWN + + def test_from_string_unknown_raises(self) -> None: + """Test that unknown orientation string raises ValueError.""" + with pytest.raises(ValueError, match="Unknown orientation"): + Orientation.from_string("InvalidOrientation") + + +class TestBuildOrientationLut: + """Tests for build_orientation_lut function.""" + + def test_right_side_up_identity(self) -> None: + """Test that RIGHT_SIDE_UP produces identity mapping.""" + lut = build_orientation_lut(4, 4, Orientation.RIGHT_SIDE_UP) + # Should be 0, 1, 2, 3, 4, 5, ... 15 + assert lut == tuple(range(16)) + + def test_rotated_180_reverses(self) -> None: + """Test that ROTATED_180 reverses the pixel order.""" + lut = build_orientation_lut(4, 4, Orientation.ROTATED_180) + # First physical position should map to last framebuffer index + assert lut[0] == 15 + # Last physical position should map to first framebuffer index + assert lut[15] == 0 + + def test_rotated_90_corners(self) -> None: + """Test ROTATED_90 corner mappings for 4x4 grid.""" + lut = build_orientation_lut(4, 4, Orientation.ROTATED_90) + # For 90 degree clockwise rotation: + # Physical (0,0) -> Source (3,0) = index 3 + assert lut[0] == 3 + # Physical (3,0) -> Source (3,3) = index 15 + assert lut[3] == 15 + # Physical (0,3) -> Source (0,0) = index 0 + assert lut[12] == 0 + # Physical (3,3) -> Source (0,3) = index 12 + assert lut[15] == 12 + + def test_rotated_270_corners(self) -> None: + """Test ROTATED_270 corner mappings for 4x4 grid.""" + lut = build_orientation_lut(4, 4, Orientation.ROTATED_270) + # For 270 degree (90 counter-clockwise) rotation: + # Physical (0,0) -> Source (0,3) = index 12 + assert lut[0] == 12 + # Physical (3,0) -> Source (0,0) = index 0 + assert lut[3] == 0 + # Physical (0,3) -> Source (3,3) = index 15 + assert lut[12] == 15 + # Physical (3,3) -> Source (3,0) = index 3 + assert lut[15] == 3 + + def test_face_up_same_as_upright(self) -> None: + """Test that FACE_UP is treated same as RIGHT_SIDE_UP.""" + lut_face_up = build_orientation_lut(4, 4, Orientation.FACE_UP) + lut_upright = build_orientation_lut(4, 4, Orientation.RIGHT_SIDE_UP) + assert lut_face_up == lut_upright + + def test_face_down_same_as_upright(self) -> None: + """Test that FACE_DOWN is treated same as RIGHT_SIDE_UP.""" + lut_face_down = build_orientation_lut(4, 4, Orientation.FACE_DOWN) + lut_upright = build_orientation_lut(4, 4, Orientation.RIGHT_SIDE_UP) + assert lut_face_down == lut_upright + + def test_8x8_tile_size(self) -> None: + """Test LUT for standard 8x8 tile.""" + lut = build_orientation_lut(8, 8, Orientation.RIGHT_SIDE_UP) + assert len(lut) == 64 + + def test_rectangular_tile(self) -> None: + """Test LUT for non-square tile (16x8).""" + lut = build_orientation_lut(16, 8, Orientation.RIGHT_SIDE_UP) + assert len(lut) == 128 + + def test_lru_cache_works(self) -> None: + """Test that repeated calls return cached results.""" + # Call twice with same args + lut1 = build_orientation_lut(8, 8, Orientation.RIGHT_SIDE_UP) + lut2 = build_orientation_lut(8, 8, Orientation.RIGHT_SIDE_UP) + # Should be the exact same object (cached) + assert lut1 is lut2 + + def test_lru_cache_different_args(self) -> None: + """Test that different args return different results.""" + lut1 = build_orientation_lut(8, 8, Orientation.RIGHT_SIDE_UP) + lut2 = build_orientation_lut(8, 8, Orientation.ROTATED_180) + # Should be different objects + assert lut1 is not lut2 + assert lut1 != lut2 + + def test_bijective_mapping(self) -> None: + """Test that LUT is a bijection (one-to-one mapping).""" + for orientation in Orientation: + lut = build_orientation_lut(8, 8, orientation) + # All indices should be unique (bijective) + assert len(set(lut)) == 64 + # All indices should be in valid range + assert all(0 <= i < 64 for i in lut) + + def test_rotated_90_non_square_falls_back_to_identity(self) -> None: + """Test ROTATED_90 on non-square tile falls back to identity. + + Non-square tiles cannot be rotated 90/270 degrees without changing + dimensions, so we fall back to identity transformation. + """ + lut = build_orientation_lut(16, 8, Orientation.ROTATED_90) + # Should be identity since 90 rotation isn't valid for non-square + assert lut == tuple(range(128)) + + def test_rotated_270_non_square_falls_back_to_identity(self) -> None: + """Test ROTATED_270 on non-square tile falls back to identity. + + Non-square tiles cannot be rotated 90/270 degrees without changing + dimensions, so we fall back to identity transformation. + """ + lut = build_orientation_lut(16, 8, Orientation.ROTATED_270) + # Should be identity since 270 rotation isn't valid for non-square + assert lut == tuple(range(128)) + + def test_rotated_180_works_for_non_square(self) -> None: + """Test ROTATED_180 works correctly for non-square tiles. + + 180 degree rotation is valid for any tile dimensions. + """ + lut = build_orientation_lut(4, 2, Orientation.ROTATED_180) + # 4x2 = 8 pixels, reversed + assert len(lut) == 8 + # First physical position maps to last framebuffer index + assert lut[0] == 7 + # Last physical position maps to first framebuffer index + assert lut[7] == 0 diff --git a/tests/test_animation/test_packets.py b/tests/test_animation/test_packets.py new file mode 100644 index 0000000..76094bf --- /dev/null +++ b/tests/test_animation/test_packets.py @@ -0,0 +1,504 @@ +"""Tests for packet generators.""" + +from __future__ import annotations + +import struct + +import pytest + +from lifx.animation.packets import ( + HEADER_SIZE, + MatrixPacketGenerator, + MultiZonePacketGenerator, + PacketTemplate, +) + +# Test source and target for templates +TEST_SOURCE = 12345 +TEST_TARGET = b"\xd0\x73\xd5\x12\x34\x56" + + +def get_payload(template: PacketTemplate) -> bytes: + """Extract payload bytes from a packet template.""" + return bytes(template.data[HEADER_SIZE:]) + + +class TestPacketTemplate: + """Tests for PacketTemplate class.""" + + def test_template_structure(self) -> None: + """Test PacketTemplate has expected fields.""" + data = bytearray(100) + tmpl = PacketTemplate( + data=data, + color_offset=46, + color_count=64, + hsbk_start=0, + ) + assert tmpl.data is data + assert tmpl.color_offset == 46 + assert tmpl.color_count == 64 + assert tmpl.hsbk_start == 0 + + +class TestMatrixPacketGenerator: + """Tests for MatrixPacketGenerator.""" + + # Set64 packet type + SET64_PKT_TYPE = 715 + + def test_pixel_count(self) -> None: + """Test pixel_count returns correct value.""" + gen = MatrixPacketGenerator(tile_count=2, tile_width=8, tile_height=8) + assert gen.pixel_count() == 128 + + def test_create_templates_single_tile(self) -> None: + """Test template creation for single tile.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + assert len(templates) == 1 + assert isinstance(templates[0], PacketTemplate) + + # Check tile_index in payload (offset 0) + payload = get_payload(templates[0]) + assert payload[0] == 0 # tile_index + + # Check fb_index in rect (offset 2) + assert payload[2] == 0 # fb_index - direct to display buffer + + def test_create_templates_all_tiles(self) -> None: + """Test template creation for all tiles.""" + gen = MatrixPacketGenerator(tile_count=3, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + assert len(templates) == 3 + # Check tile_index in each payload + assert get_payload(templates[0])[0] == 0 + assert get_payload(templates[1])[0] == 1 + assert get_payload(templates[2])[0] == 2 + + def test_update_colors_invalid_length(self) -> None: + """Test that wrong data length raises error.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 32 + + with pytest.raises(IndexError): + gen.update_colors(templates, hsbk) + + def test_update_colors_packed(self) -> None: + """Test that HSBK values are correctly packed into packet bytes.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + # Use distinct values to verify conversion + hsbk: list[tuple[int, int, int, int]] = [(65535, 32768, 16384, 4000)] + [ + (0, 0, 0, 3500) + ] * 63 + + gen.update_colors(templates, hsbk) + + payload = get_payload(templates[0]) + + # Colors start at offset 10, each color is 8 bytes (H, S, B, K as uint16) + h, s, b, k = struct.unpack_from(" None: + """Test that duration is always 0 for instant animation updates.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + payload = get_payload(templates[0]) + + # Duration is at offset 6 (uint32) - should always be 0 + (duration,) = struct.unpack_from(" None: + """Test that payload has correct size (522 bytes).""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + payload = get_payload(templates[0]) + assert len(payload) == 522 + + def test_is_large_tile_standard(self) -> None: + """Test that 8x8 tile is not detected as large.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + assert gen.is_large_tile is False + assert gen.packets_per_tile == 1 + + def test_is_large_tile_ceiling(self) -> None: + """Test that 16x8 tile (128 pixels) is detected as large.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + assert gen.is_large_tile is True + assert gen.packets_per_tile == 2 # 128 / 64 = 2 + + def test_is_large_tile_very_large(self) -> None: + """Test that very large tile (32x16=512 pixels) is detected.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=32, tile_height=16) + assert gen.is_large_tile is True + assert gen.packets_per_tile == 8 # 512 / 64 = 8 + + def test_header_contains_source(self) -> None: + """Test that prebaked header contains the source ID.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Source is at offset 4 in header (uint32) + (source,) = struct.unpack_from(" None: + """Test that prebaked header contains the target serial.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Target is at offset 8 in header (8 bytes, little-endian) + target_bytes = templates[0].data[8:14] # First 6 bytes of target + assert target_bytes == TEST_TARGET + + def test_header_packet_type(self) -> None: + """Test that prebaked header contains correct packet type.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=8, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Packet type is at offset 32 in header (uint16) + (pkt_type,) = struct.unpack_from("64 pixels).""" + + # Packet types + SET64_PKT_TYPE = 715 + COPY_FB_PKT_TYPE = 716 + + def test_pixel_count_large_tile(self) -> None: + """Test pixel_count for large tile (16x8 = 128 pixels).""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + assert gen.pixel_count() == 128 + + def test_create_templates_large_tile_count(self) -> None: + """Test that large tile creates Set64 + CopyFrameBuffer templates.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Should have 2 Set64 templates + 1 CopyFrameBuffer + assert len(templates) == 3 + + # Check packet types in headers + assert struct.unpack_from(" None: + """Test that Set64 templates write to fb_index=1 for large tiles.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Both Set64 templates should have fb_index=1 (offset 2 in payload) + assert get_payload(templates[0])[2] == 1 # fb_index + assert get_payload(templates[1])[2] == 1 # fb_index + + def test_create_templates_large_tile_y_offsets(self) -> None: + """Test that Set64 templates have correct y offsets.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # First template: y=0 (first 4 rows of 16 pixels = 64 pixels) + # Second template: y=4 (next 4 rows) + assert get_payload(templates[0])[4] == 0 # y offset + assert ( + get_payload(templates[1])[4] == 4 + ) # y offset (64 pixels / 16 width = 4 rows) + + def test_create_templates_large_tile_copy_fb_structure(self) -> None: + """Test CopyFrameBuffer template structure.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + copy_template = templates[2] + payload = get_payload(copy_template) + + # Verify CopyFrameBuffer payload structure (15 bytes) + assert len(payload) == 15 + + # Offset 0: tile_index + assert payload[0] == 0 + # Offset 1: length = 1 + assert payload[1] == 1 + # Offset 2: src_fb_index = 1 + assert payload[2] == 1 + # Offset 3: dst_fb_index = 0 + assert payload[3] == 0 + # Offset 4-7: src_x, src_y, dst_x, dst_y = 0 + assert payload[4:8] == bytes([0, 0, 0, 0]) + # Offset 8: width + assert payload[8] == 16 + # Offset 9: height + assert payload[9] == 8 + # Offset 10-13: duration = 0 (instant for animation) + (duration,) = struct.unpack_from(" None: + """Test that all templates (Set64 and CopyFrameBuffer) have duration=0.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Set64 templates should have duration=0 (offset 6 in payload) + (duration1,) = struct.unpack_from(" None: + """Test that colors are correctly distributed across templates.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + # First 64 pixels red, next 64 blue + hsbk: list[tuple[int, int, int, int]] = [(0, 65535, 65535, 3500)] * 64 + [ + (43690, 65535, 65535, 3500) + ] * 64 + + gen.update_colors(templates, hsbk) + + # First template should have red + h1, s1, b1, k1 = struct.unpack_from(" None: + """Test that all large tiles create correct templates.""" + gen = MatrixPacketGenerator(tile_count=2, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # 2 tiles x (2 Set64 + 1 CopyFrameBuffer) = 6 templates + assert len(templates) == 6 + + # Verify packet types in order + assert struct.unpack_from(" None: + """Test that CopyFrameBuffer template has color_count=0.""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # CopyFrameBuffer is the last template + copy_template = templates[2] + assert copy_template.color_count == 0 + + def test_update_colors_skips_copy_fb_templates(self) -> None: + """Test that update_colors skips CopyFrameBuffer templates (color_count=0).""" + gen = MatrixPacketGenerator(tile_count=1, tile_width=16, tile_height=8) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # Create HSBK data for all 128 pixels + hsbk: list[tuple[int, int, int, int]] = [(65535, 65535, 65535, 3500)] * 128 + + # This should not raise an error - CopyFrameBuffer templates are skipped + gen.update_colors(templates, hsbk) + + # Verify Set64 templates were updated + h1 = struct.unpack_from(" None: + """Test pixel_count returns zone count.""" + gen = MultiZonePacketGenerator(zone_count=82) + assert gen.pixel_count() == 82 + + def test_create_templates_single_packet(self) -> None: + """Test template creation for <=82 zones.""" + gen = MultiZonePacketGenerator(zone_count=82) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + assert len(templates) == 1 + assert isinstance(templates[0], PacketTemplate) + + # Check packet type in header + (pkt_type,) = struct.unpack_from(" None: + """Test that wrong data length raises error.""" + gen = MultiZonePacketGenerator(zone_count=82) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + hsbk: list[tuple[int, int, int, int]] = [(100, 100, 100, 3500)] * 40 + + with pytest.raises(IndexError): + gen.update_colors(templates, hsbk) + + def test_update_colors_packed(self) -> None: + """Test that HSBK values are correctly packed into packet bytes.""" + gen = MultiZonePacketGenerator(zone_count=82) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + # Use distinct values for first zone + hsbk: list[tuple[int, int, int, int]] = [(65535, 32768, 16384, 4000)] + [ + (0, 0, 0, 3500) + ] * 81 + + gen.update_colors(templates, hsbk) + payload = get_payload(templates[0]) + + # Colors start at offset 8, each color is 8 bytes + h, s, b, k = struct.unpack_from(" None: + """Test that duration is always 0 for instant animation updates.""" + gen = MultiZonePacketGenerator(zone_count=82) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + payload = get_payload(templates[0]) + + # Duration is at offset 0 (uint32) - should always be 0 + (duration,) = struct.unpack_from(" None: + """Test that apply field is set to APPLY (1).""" + gen = MultiZonePacketGenerator(zone_count=82) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + payload = get_payload(templates[0]) + + # Apply is at offset 4 (uint8) + assert payload[4] == 1 # APPLY + + def test_payload_size(self) -> None: + """Test that payload has correct size (664 bytes).""" + gen = MultiZonePacketGenerator(zone_count=82) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + payload = get_payload(templates[0]) + assert len(payload) == 664 + + +class TestMultiZonePacketGeneratorLargeZones: + """Tests for MultiZonePacketGenerator with >82 zones.""" + + SET_EXTENDED_PKT_TYPE = 510 + + def test_create_templates_120_zones_needs_two_packets(self) -> None: + """Test that 120 zones (Outdoor Neon Flex) creates 2 templates.""" + gen = MultiZonePacketGenerator(zone_count=120) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + assert len(templates) == 2 + assert ( + struct.unpack_from(" None: + """Test that zone indices are correct for 120 zones.""" + gen = MultiZonePacketGenerator(zone_count=120) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + # First template: zones 0-81 (82 zones) + # zone_index at offset 5-6 (uint16) + zone_idx1 = struct.unpack_from(" None: + """Test that exactly 164 zones (2x82) needs 2 templates.""" + gen = MultiZonePacketGenerator(zone_count=164) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + assert len(templates) == 2 + # Both templates should have 82 zones + assert get_payload(templates[0])[7] == 82 + assert get_payload(templates[1])[7] == 82 + + def test_create_templates_165_zones_needs_three_packets(self) -> None: + """Test that 165 zones needs 3 templates.""" + gen = MultiZonePacketGenerator(zone_count=165) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + + assert len(templates) == 3 + assert get_payload(templates[0])[7] == 82 # First 82 + assert get_payload(templates[1])[7] == 82 # Next 82 + assert get_payload(templates[2])[7] == 1 # Last 1 + + def test_update_colors_large_zone(self) -> None: + """Test that colors are correctly split across templates for 120 zones.""" + gen = MultiZonePacketGenerator(zone_count=120) + templates = gen.create_templates(TEST_SOURCE, TEST_TARGET) + # First 82 zones red, remaining 38 zones blue + hsbk: list[tuple[int, int, int, int]] = [(0, 65535, 65535, 3500)] * 82 + [ + (43690, 65535, 65535, 3500) + ] * 38 + + gen.update_colors(templates, hsbk) + + # First template should have red + h1 = struct.unpack_from("