diff --git a/parol6/PAROL6_ROBOT.py b/parol6/PAROL6_ROBOT.py index 28b1ede..bcf94de 100644 --- a/parol6/PAROL6_ROBOT.py +++ b/parol6/PAROL6_ROBOT.py @@ -14,28 +14,18 @@ logger = logging.getLogger(__name__) -# ----------------------------- -# Typing aliases -# ----------------------------- Vec6f = NDArray[np.float64] Vec6i = NDArray[np.int32] Limits2f = NDArray[np.float64] # shape (6,2) -# ----------------------------- -# Kinematics and conversion constants -# ----------------------------- Microstep = 32 steps_per_revolution = 200 -# Conversion constants degree_per_step_constant: float = 360.0 / (Microstep * steps_per_revolution) radian_per_step_constant: float = (2.0 * np.pi) / (Microstep * steps_per_revolution) radian_per_sec_2_deg_per_sec_const: float = 360.0 / (2.0 * np.pi) deg_per_sec_2_radian_per_sec_const: float = (2.0 * np.pi) / 360.0 -# ----------------------------- -# Joint limits -# ----------------------------- # Limits (deg) you get after homing and moving to extremes _joint_limits_degree: Limits2f = np.array( [ @@ -52,12 +42,12 @@ _joint_limits_radian: Limits2f = np.deg2rad(_joint_limits_degree) -# URDF path for pinokin Robot +# URDF consumed by the pinokin Robot below. _urdf_path = str( Path(__file__).resolve().parent / "urdf_model" / "urdf" / "PAROL6.urdf" ) -# Current robot instance (tool transform applied in-place) +# Tool transform is applied in-place on this shared instance. robot: Robot = Robot(_urdf_path) @@ -66,17 +56,10 @@ def apply_tool( variant_key: str = "", tcp_offset_m: tuple[float, float, float] | None = None, ) -> None: - """ - Apply tool transform to the robot model. - - Parameters - ---------- - tool_name : str - Name of the tool from the tool registry - variant_key : str - Optional variant key for the tool - tcp_offset_m : tuple, optional - Additional (x, y, z) offset in meters, composed in the tool's local frame. + """Apply tool transform to the robot model. + + ``tcp_offset_m`` is an additional (x, y, z) offset in meters, composed in + the tool's local frame. """ T_tool = get_tool_transform(tool_name, variant_key=variant_key or None) @@ -96,7 +79,6 @@ def apply_tool( logger.info(f"Applied tool {label} (identity)") -# Initialize with no tool apply_tool("NONE") @@ -106,9 +88,6 @@ def _cleanup_robot() -> None: del robot -# ----------------------------- -# Additional raw parameter arrays -# ----------------------------- # Reduction ratio per joint _joint_ratio: NDArray[np.float64] = np.array( [6.4, 20.0, 20.0 * (38.0 / 42.0), 4.0, 4.0, 10.0], dtype=np.float64 @@ -120,10 +99,9 @@ def _cleanup_robot() -> None: ) _joint_min_speed: Vec6i = np.array([100, 100, 100, 100, 100, 100], dtype=np.int32) -# Effective max speeds with scaling applied _joint_max_speed: Vec6i = _joint_max_speed_hw.copy() -# Jog speeds (steps/s) - 80% of scaled max for safety margin during jogging +# 80% of scaled max for safety margin during jogging _joint_max_jog_speed: Vec6i = (_joint_max_speed * 0.8).astype(np.int32) _joint_min_jog_speed: Vec6i = np.array([100, 100, 100, 100, 100, 100], dtype=np.int32) @@ -135,7 +113,6 @@ def _cleanup_robot() -> None: # Derived: j_max = a_max * 10 (reach max accel in ~0.1s) _joint_max_jerk: Vec6i = (_joint_max_acc * 10).astype(np.int32) -# Compute joint angular velocities/accelerations in rad/s _joint_speed_rad = ( _joint_max_speed.astype(float) * radian_per_step_constant / _joint_ratio ) diff --git a/parol6/ack_policy.py b/parol6/ack_policy.py index e8512a7..99136ff 100644 --- a/parol6/ack_policy.py +++ b/parol6/ack_policy.py @@ -90,15 +90,14 @@ def __init__( def requires_ack(self, cmd_type: CmdType) -> bool: """Check if a command type requires an ACK response.""" - # Forced override (e.g., diagnostics) + # Forced override (e.g., diagnostics) takes precedence over everything if self._force_ack is not None: return bool(self._force_ack) - # System commands always require ACKs if cmd_type in SYSTEM_CMD_TYPES: return True - # Query commands use request/response, not ACKs + # Queries use request/response, not ACKs if cmd_type in QUERY_CMD_TYPES: return False @@ -106,9 +105,8 @@ def requires_ack(self, cmd_type: CmdType) -> bool: if cmd_type in FIRE_AND_FORGET: return False - # Queued motion commands require ACK (returns command index) + # Queued motion commands ACK to return the command index if cmd_type in QUEUED_CMD_TYPES: return True - # Default: no ACK return False diff --git a/parol6/client/async_client.py b/parol6/client/async_client.py index ec66551..2f0f1be 100644 --- a/parol6/client/async_client.py +++ b/parol6/client/async_client.py @@ -120,7 +120,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: try: self.rx_queue.put_nowait((data, addr)) except asyncio.QueueFull: - pass # Drop packet when queue is full (expected under load) + pass # Dropping under backpressure is expected def error_received(self, exc: Exception) -> None: pass @@ -133,7 +133,6 @@ def _create_multicast_socket(group: str, port: int, iface_ip: str) -> socket.soc """Create and configure a multicast socket with loopback-first semantics.""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - # Allow multiple listeners on same port sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) @@ -141,13 +140,11 @@ def _create_multicast_socket(group: str, port: int, iface_ip: str) -> socket.soc pass sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20) - # Bind to port try: sock.bind(("", port)) except OSError: sock.bind((iface_ip, port)) - # Helper to detect primary NIC IP def _detect_primary_ip() -> str: tmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: @@ -159,7 +156,7 @@ def _detect_primary_ip() -> str: with contextlib.suppress(Exception): tmp.close() - # Join multicast group with fallbacks + # Join the multicast group, falling back through the primary NIC then INADDR_ANY. try: mreq = struct.pack("=4s4s", socket.inet_aton(group), socket.inet_aton(iface_ip)) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) @@ -221,7 +218,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: # Zero-allocation decode directly into shared buffer if decode_status_bin_into(data, self._client._shared_status): self._client._status_generation += 1 - # Event.set() is synchronous and wakes all waiters + # Event.set() is synchronous, so it's safe to wake waiters from this callback self._client._status_event.set() def error_received(self, exc: Exception) -> None: @@ -246,7 +243,7 @@ def __init__( timeout: float = 1.0, retries: int = 1, ) -> None: - # Endpoint configuration (host/port immutable after endpoint creation) + # host/port are immutable after endpoint creation self._host = host self._port = port self.timeout = timeout @@ -259,7 +256,6 @@ def __init__( # Pre-allocated TX buffer for fire-and-forget command encoding self._tx_buf = bytearray(256) - # Persistent asyncio datagram endpoint self._transport: asyncio.DatagramTransport | None = None self._protocol: _UDPClientProtocol | None = None self._rx_queue: asyncio.Queue[tuple[bytes, tuple[str, int]]] = asyncio.Queue( @@ -267,13 +263,12 @@ def __init__( ) self._ep_lock = asyncio.Lock() - # Serialize request/response + # Serializes request/response so concurrent queries don't steal each other's replies self._req_lock = asyncio.Lock() - # ACK policy (category-based, no stream_mode dependency) self._ack_policy = AckPolicy() - # Shared status state (single buffer, event-based notification) + # Single shared buffer with event-based notification self._status_transport: asyncio.DatagramTransport | None = None self._status_sock: socket.socket | None = None self._shared_status: StatusBuffer = StatusBuffer() @@ -283,18 +278,16 @@ def __init__( # Last command index returned by server for queued commands self._last_command_index: int | None = None - # Active tool key (set by select_tool) self._active_tool_key: str | None = None self._active_variant_key: str = "" - # Bound tool specs. Populated from the parol6 tool registry so that + # Populated from the parol6 tool registry so that # `from parol6 import AsyncRobotClient; c = AsyncRobotClient(...)` works # without going through Robot.create_async_client(). The Robot factory # rebinds these from its own tools.available, which is the same source. self._bound_tools: dict[str, ToolSpec] = {} self._bind_default_tools() - # Lifecycle flag self._closed: bool = False def _bind_default_tools(self) -> None: @@ -519,7 +512,6 @@ async def stream_status_shared(self) -> AsyncIterator[StatusBuffer]: yield self._shared_status continue - # Wait for next update await self._status_event.wait() if self._closed: @@ -539,12 +531,11 @@ async def _send(self, cmd: msgspec.Struct) -> int: await self._ensure_endpoint() assert self._transport is not None - # Get command type from struct's tag cmd_type = STRUCT_TO_CMDTYPE.get(type(cmd)) if cmd_type is None: return 0 - # System commands: need stable bytes across await + # System commands need stable bytes across the await, so encode a fresh buffer if cmd_type in SYSTEM_CMD_TYPES: try: await self._request_ok_raw(encode_command(cmd), self.timeout) @@ -552,7 +543,6 @@ async def _send(self, cmd: msgspec.Struct) -> int: except TimeoutError: return 0 - # Motion and other non-query commands if cmd_type not in QUERY_CMD_TYPES: if self._ack_policy.requires_ack(cmd_type): try: @@ -561,12 +551,11 @@ async def _send(self, cmd: msgspec.Struct) -> int: return ok.index if ok.index is not None else 0 except TimeoutError: return -1 - # Fire-and-forget: reuse pre-allocated buffer (sendto copies) + # Fire-and-forget: safe to reuse the shared buffer since sendto copies it encode_command_into(cmd, self._tx_buf) self._transport.sendto(self._tx_buf) return 1 - # Queries via _send: fire-and-forget encode_command_into(cmd, self._tx_buf) self._transport.sendto(self._tx_buf) return 1 @@ -1230,7 +1219,6 @@ async def wait_status( logger.debug("Status predicate raised", exc_info=True) continue - # Wait for next update with timeout remaining = max(0.0, end_time - time.monotonic()) if remaining <= 0: return False diff --git a/parol6/client/dry_run_client.py b/parol6/client/dry_run_client.py index 2ff61b1..a90968f 100644 --- a/parol6/client/dry_run_client.py +++ b/parol6/client/dry_run_client.py @@ -265,7 +265,6 @@ def _dispatch(self, params: Any) -> DryRunResult | None: # to the planner which handles them as inline segments. cmd_cls = self._registry.get_command_for_struct(type(params)) if cmd_cls is not None and issubclass(cmd_cls, (JogJCommand, JogLCommand)): - # Flush blend buffer, sync state, simulate jog self._planner.flush() self._state.Position_in[:] = self._planner.state.Position_in cmd = cmd_cls(params) @@ -434,7 +433,6 @@ def _simulate_cartesian_jog(self, cmd: JogLCommand) -> DryRunResult | None: max(2, int(duration * CONTROL_RATE_HZ)), ) - # Current pose via FK current_se3 = get_fkine_se3(self._state) se3_rpy(current_se3, self._rpy_buf) # pose = [x_m, y_m, z_m, rx_rad, ry_rad, rz_rad] @@ -495,7 +493,6 @@ def _simulate_cartesian_jog(self, cmd: JogLCommand) -> DryRunResult | None: radians[i] = last_valid_q - # Update state to final position rad_to_steps(last_valid_q, steps_buf) self._state.Position_in[:] = steps_buf diff --git a/parol6/client/sync_client.py b/parol6/client/sync_client.py index 0febb7e..de06916 100644 --- a/parol6/client/sync_client.py +++ b/parol6/client/sync_client.py @@ -49,11 +49,9 @@ def _stop_sync_loop() -> None: loop = _SYNC_LOOP async def _shutdown(): - # Cancel all pending tasks tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()] for task in tasks: task.cancel() - # Let cancelled tasks finalize if tasks: await asyncio.gather(*tasks, return_exceptions=True) loop.stop() diff --git a/parol6/commands/base.py b/parol6/commands/base.py index 25b89e8..28512df 100644 --- a/parol6/commands/base.py +++ b/parol6/commands/base.py @@ -29,9 +29,6 @@ class ExecutionStatusCode(Enum): CANCELLED = auto() -# ----- Small utilities ----- - - class Countdown: """Simple count-down timer.""" @@ -108,9 +105,8 @@ def __init__(self, p: P) -> None: self._q_rad_buf: np.ndarray = np.zeros(6, dtype=np.float64) self._steps_buf: np.ndarray = np.zeros(6, dtype=np.int32) - # Ensure command objects are usable as dict keys (identity-based) def __hash__(self) -> int: - # Identity-based hash is appropriate for ephemeral command instances + # Identity-based hash: command instances are ephemeral dict keys. return id(self) @property @@ -247,8 +243,7 @@ def compute(self, state: ControllerState) -> bytes: ... def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - # Queries are dispatched via compute() by the controller. - # This exists only to satisfy the abstract method. + # Queries are dispatched via compute(); execute_step is never used. raise NotImplementedError("Queries use compute(), not execute_step()") diff --git a/parol6/commands/cartesian_commands.py b/parol6/commands/cartesian_commands.py index 54f4169..b58a829 100644 --- a/parol6/commands/cartesian_commands.py +++ b/parol6/commands/cartesian_commands.py @@ -41,7 +41,7 @@ logger = logging.getLogger(__name__) -# Pre-computed Cartesian jog limit constants (avoid per-tick recomputation) +# Pre-computed to avoid per-tick recomputation. _CART_ANG_JOG_MIN_RAD: float = float(np.deg2rad(CART_ANG_JOG_MIN)) _CART_ANG_JOG_MAX_RAD: float = float(LIMITS.cart.jog.velocity.angular) _CART_LIN_JOG_MIN_MS: float = CART_LIN_JOG_MIN / 1000.0 @@ -137,7 +137,7 @@ def execute_step(self, state: "ControllerState") -> ExecutionStatusCode: """Execute one tick of Cartesian jogging.""" cse = state.cartesian_streaming_executor - # Initialize only if not already active (preserve velocity across streaming) + # Initialize only if not already active, to preserve velocity across streaming. if not cse.active: steps_to_rad(state.Position_in, self._q_rad_buf) cse.sync_pose(get_fkine_se3(state)) @@ -146,7 +146,6 @@ def execute_step(self, state: "ControllerState") -> ExecutionStatusCode: self._q_ik_seed[:] = self._q_rad_buf self._vel_ratio = 1.0 - # Handle timer expiry - stop smoothly if self.timer_expired(): cse.set_jog_velocity_1dof(self._axis_index, 0.0, self.is_rotation) smoothed_pose, smoothed_vel, finished = cse.tick() @@ -163,7 +162,6 @@ def execute_step(self, state: "ControllerState") -> ExecutionStatusCode: self.stop_and_idle(state) return ExecutionStatusCode.COMPLETED - # Compute target velocity based on speed fraction from velocity vector vels = self.p.velocities speed_mag = abs(vels[self._axis_index + (3 if self.is_rotation else 0)]) if self.is_rotation: @@ -182,7 +180,7 @@ def execute_step(self, state: "ControllerState") -> ExecutionStatusCode: if self._vel_ratio > 1.0: velocity /= self._vel_ratio - # Set target velocity (WRF transforms to body frame, TRF uses body directly) + # WRF transforms to body frame; TRF uses the body frame directly. if self.p.frame == "WRF": cse.set_jog_velocity_1dof_wrf(self._axis_index, velocity, self.is_rotation) else: @@ -288,7 +286,7 @@ def _precompute_trajectory(self, state: "ControllerState") -> None: if joint_path.is_partial: ik_valid = joint_path.valid assert ik_valid is not None - # Extract TCP poses (x,y,z,rx,ry,rz) in meters+radians from SE3 + # TCP poses are (x,y,z,rx,ry,rz) in meters+radians. n = len(cart_poses) tcp_poses = np.empty((n, 6), dtype=np.float64) _rpy_buf = np.empty(3, dtype=np.float64) @@ -335,8 +333,8 @@ def _compute_target_pose(self, state: "ControllerState") -> None: pose = self.p.pose if self.p.rel: - # Relative move: compute delta SE3, then apply in tool frame (TRF) - # or world frame (WRF) depending on self.p.frame + # Relative move: apply the delta SE3 in tool frame (TRF) or world + # frame (WRF) per self.p.frame. delta_se3 = np.zeros((4, 4), dtype=np.float64) se3_from_rpy( pose[0] / 1000.0, @@ -354,7 +352,6 @@ def _compute_target_pose(self, state: "ControllerState") -> None: # Pre-multiply for world-relative motion self.target_pose = delta_se3 @ cast(np.ndarray, self.initial_pose) else: - # Absolute target pose self.target_pose = np.zeros((4, 4), dtype=np.float64) se3_from_rpy( pose[0] / 1000.0, diff --git a/parol6/commands/joint_commands.py b/parol6/commands/joint_commands.py index 2894127..0c40dc8 100644 --- a/parol6/commands/joint_commands.py +++ b/parol6/commands/joint_commands.py @@ -42,12 +42,9 @@ class JointMoveCommandBase(TrajectoryMoveCommandBase[_MP]): """Base class for joint-space trajectory commands. - Subclasses must implement: - - _get_target_rad(): Return target joint positions in radians - - This base class provides: - - do_setup(): Builds trajectory via JointPath.interpolate + TrajectoryBuilder - - execute_step(): Inherited from TrajectoryMoveCommandBase (uses MotionExecutor) + Subclasses supply the target via _get_target_rad(); this class builds the + trajectory (JointPath.interpolate + TrajectoryBuilder) and inherits + execute_step() from TrajectoryMoveCommandBase. """ __slots__ = ( diff --git a/parol6/commands/servo_commands.py b/parol6/commands/servo_commands.py index 90ac4ff..c5f300f 100644 --- a/parol6/commands/servo_commands.py +++ b/parol6/commands/servo_commands.py @@ -129,7 +129,7 @@ def __init__(self, p: ServoJCmd): self._pos_rad_buf = np.zeros(6, dtype=np.float64) def do_setup(self, state: ControllerState) -> None: - # Convert target from degrees to radians into pre-allocated list + # Target arrives in degrees; convert into pre-allocated radian buffer for i in range(6): self._target_rad[i] = math.radians(self.p.angles[i]) @@ -175,7 +175,7 @@ def do_setup(self, state: ControllerState) -> None: self._target_se3, ) - # Solve IK using current joint angles as seed + # Seed IK with current joint angles for branch continuity steps_to_rad(state.Position_in, self._q_rad_buf) ik_result = solve_ik(PAROL6_ROBOT.robot, self._target_se3, self._q_rad_buf) if not ik_result.success or ik_result.q is None: @@ -251,7 +251,6 @@ def execute_step(self, state: ControllerState) -> ExecutionStatusCode: self._q_ik_seed[:] = self._q_rad_buf self._initialized = True - # CSE drives Cartesian path cse.set_pose_target(self._target_se3) smoothed_pose, vel, finished = cse.tick() @@ -273,7 +272,6 @@ def execute_step(self, state: ControllerState) -> ExecutionStatusCode: else: self._q_ik_seed[:] = ik_result.q - # Compute per-joint delta from commanded position dq = self._dq_buf for i in range(6): dq[i] = float(ik_result.q[i]) - self._q_commanded[i] diff --git a/parol6/commands/system_commands.py b/parol6/commands/system_commands.py index d2c98ef..f9f0d93 100644 --- a/parol6/commands/system_commands.py +++ b/parol6/commands/system_commands.py @@ -43,7 +43,6 @@ class ResumeCommand(SystemCommand[ResumeCmd]): __slots__ = () def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - """Execute resume - set controller to enabled state.""" logger.info("RESUME command executed") state.enabled = True state.disabled_reason = "" @@ -62,7 +61,6 @@ class HaltCommand(SystemCommand[HaltCmd]): __slots__ = () def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - """Execute halt - zero speeds and set controller to disabled state.""" logger.info("HALT command executed") state.Speed_out.fill(0) state.enabled = False @@ -82,7 +80,6 @@ class WriteIOCommand(SystemCommand[WriteIOCmd]): __slots__ = () def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - """Execute set port - update I/O port state.""" logger.info(f"WRITE_IO: Setting port {self.p.port_index} to {self.p.value}") state.InOut_out[self.p.port_index] = self.p.value @@ -120,7 +117,7 @@ class SimulatorCommand(SystemCommand[SimulatorCmd]): __slots__ = () def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - """Execute simulator toggle by setting env var and signaling reconfiguration.""" + """Toggle the env var; controller picks it up and reinitializes transport.""" os.environ["PAROL6_FAKE_SERIAL"] = "1" if self.p.on else "0" logger.info(f"SIMULATOR command executed: {'ON' if self.p.on else 'OFF'}") @@ -129,7 +126,6 @@ def execute_step(self, state: ControllerState) -> ExecutionStatusCode: return ExecutionStatusCode.COMPLETED -# Valid motion profile types VALID_PROFILES = frozenset(("TOPPRA", "RUCKIG", "QUINTIC", "TRAPEZOID", "LINEAR")) @@ -166,7 +162,6 @@ def do_setup(self, state: ControllerState) -> None: raise err def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - """Execute profile change.""" profile = self.p.profile.upper() old_profile = state.motion_profile @@ -194,7 +189,6 @@ class SetTcpOffsetCommand(MotionCommand[SetTcpOffsetCmd]): __slots__ = () def execute_step(self, state: ControllerState) -> ExecutionStatusCode: - """Convert mm to meters and apply TCP offset.""" offset_m = (self.p.x / 1000.0, self.p.y / 1000.0, self.p.z / 1000.0) state.set_tcp_offset(offset_m) diff --git a/parol6/commands/utility_commands.py b/parol6/commands/utility_commands.py index 29914a6..15134b1 100644 --- a/parol6/commands/utility_commands.py +++ b/parol6/commands/utility_commands.py @@ -36,7 +36,6 @@ class DelayCommand(CommandBase[DelayCmd]): __slots__ = () def do_setup(self, state: "ControllerState") -> None: - """Start the delay timer.""" self.start_timer(self.p.seconds) logger.info(f" -> Delay starting for {self.p.seconds} seconds...") @@ -64,7 +63,6 @@ class ResetCommand(SystemCommand[ResetCmd]): __slots__ = () def execute_step(self, state: "ControllerState") -> ExecutionStatusCode: - """Reset state immediately.""" state.reset() self._sync_mock = True self.finish() @@ -85,7 +83,6 @@ class ResetLoopStatsCommand(SystemCommand[ResetLoopStatsCmd]): __slots__ = () def execute_step(self, state: "ControllerState") -> ExecutionStatusCode: - """Signal controller to reset loop stats.""" state.loop_stats_reset_pending = True logger.debug("RESET_LOOP_STATS command executed") self.finish() diff --git a/parol6/config.py b/parol6/config.py index 476b9c6..2f87eae 100644 --- a/parol6/config.py +++ b/parol6/config.py @@ -199,7 +199,6 @@ def get_com_port_with_fallback() -> str: # Standby/home position in degrees - pass-through from robot definition STANDBY_ANGLES_DEG: list[float] = list(PAROL6_ROBOT.joint.standby_deg) -# Alias HOME_ANGLES_DEG: list[float] = STANDBY_ANGLES_DEG diff --git a/parol6/motion/geometry.py b/parol6/motion/geometry.py index be971fe..e36cb0b 100644 --- a/parol6/motion/geometry.py +++ b/parol6/motion/geometry.py @@ -24,7 +24,6 @@ logger = logging.getLogger(__name__) -# Default control rate for geometry sampling DEFAULT_CONTROL_RATE = CONTROL_RATE_HZ @@ -96,7 +95,6 @@ def generate_arc( num_points = max(2, n_samples) - # Vectorized arc position generation using scipy Rotation t_values = np.linspace(0, 1, num_points) if num_points > 1 else np.array([1.0]) angles = t_values * arc_angle @@ -278,7 +276,6 @@ def compute_circle_from_3_points( p2 = np.asarray(p2, dtype=np.float64) p3 = np.asarray(p3, dtype=np.float64) - # Vectors from p1 to p2 and p3 a = p2 - p1 b = p3 - p1 @@ -298,7 +295,6 @@ def compute_circle_from_3_points( normal /= np.linalg.norm(normal) return center, radius, normal - # Normal to the plane normal = np.asarray(np.cross(a, b), dtype=np.float64) normal_len = float(np.linalg.norm(normal)) if normal_len < 1e-12: @@ -409,7 +405,7 @@ def build_composite_cartesian_path( _linear_se3_segment_into(waypoints[0], waypoints[1], out) return out - # Compute segment lengths (mm) + # Segment lengths in mm (FK transforms are in meters) seg_lengths: list[float] = [0.0] * (n - 1) for i in range(n - 1): seg_lengths[i] = ( diff --git a/parol6/motion/streaming_executors.py b/parol6/motion/streaming_executors.py index ceffda9..0adafe4 100644 --- a/parol6/motion/streaming_executors.py +++ b/parol6/motion/streaming_executors.py @@ -84,7 +84,7 @@ def _tangent_to_pose_jit( se3_mul(ref_pose, delta, out) -# Module-level constant for error checking (avoids tuple creation per check) +# Module-level constant avoids tuple creation per error check. _RUCKIG_ERRORS = (Result.Error, Result.ErrorInvalidInput) @@ -119,7 +119,7 @@ def __init__(self, num_dofs: int, dt: float = INTERVAL_S): self._vel_scale: float = 1.0 self._acc_scale: float = 1.0 - # Pre-allocated output buffers (reused every tick to avoid allocations) + # Reused every tick to avoid allocations. self._pos_out = np.zeros(num_dofs) self._vel_out = np.zeros(num_dofs) self._zeros_np = np.zeros(num_dofs) @@ -162,7 +162,7 @@ def _tick_ruckig(self) -> tuple[Result, np.ndarray, np.ndarray]: self.active = False else: self.out.pass_to_input(self.inp) - # Copy into pre-allocated buffers (avoids list() allocation per tick) + # Copy into pre-allocated buffers to avoid a list() allocation per tick. self._pos_out[:] = self.out.new_position self._vel_out[:] = self.out.new_velocity return result, self._pos_out, self._vel_out @@ -203,24 +203,21 @@ def __init__(self, num_dofs: int = 6, dt: float = INTERVAL_S): num_dofs: Number of degrees of freedom (joints) dt: Control cycle time in seconds """ - # Cartesian velocity limit (mm/s), None = no cart limiting - # Must be set before super().__init__ calls _init_limits/_init_state + # Cartesian velocity limit (mm/s), None = no cart limiting. + # Must be set before super().__init__ calls _init_limits/_init_state. self._cart_vel_limit: float | None = None - # Pre-allocated buffers for cart velocity limit calculations (avoids per-call allocations) + # Pre-allocated buffers for cart velocity limit calculations (avoids per-call allocations). self._q_current_buf = np.zeros(num_dofs, dtype=np.float64) self._q_target_buf = np.zeros(num_dofs, dtype=np.float64) self._dq_buf = np.zeros(num_dofs, dtype=np.float64) self._jacob0_buf = np.zeros((6, num_dofs), dtype=np.float64, order="F") - # Pre-allocated buffers for Ruckig parameters - each has ONE semantic purpose - # Position sync (current/target position share same values) + # Each Ruckig-parameter buffer below has ONE semantic purpose, never reused across roles. self._sync_pos_buf: list[float] = [0.0] * num_dofs - # Limit parameters (max_velocity, max_acceleration, max_jerk) self._max_vel_buf: list[float] = [0.0] * num_dofs self._max_acc_buf: list[float] = [0.0] * num_dofs self._max_jerk_buf: list[float] = [0.0] * num_dofs - # Target velocity for jogging self._target_vel_buf: list[float] = [0.0] * num_dofs super().__init__(num_dofs, dt) @@ -293,11 +290,9 @@ def set_position_target(self, q_target: list[float]) -> None: Args: q_target: Target joint positions in radians """ - # Apply Cartesian velocity limiting if enabled if self._cart_vel_limit is not None and self._cart_vel_limit > 0: self._apply_cart_velocity_limit(q_target) else: - # Reset to hardware limits (reuse buffer) self._max_vel_buf[:] = self._hardware_v_max self.inp.max_velocity = self._max_vel_buf @@ -317,7 +312,7 @@ def set_jog_velocity(self, joint_velocities: NDArray[np.float64]) -> None: Args: joint_velocities: Desired velocity for each joint in rad/s (signed) """ - # Use jog-specific velocity limits (~80% of hardware limits) - reuse buffers + # Jog uses its own velocity limits (~80% of hardware) rather than the hardware caps. for i in range(self.num_dofs): self._max_vel_buf[i] = self._jog_v_max[i] * self._vel_scale self._max_acc_buf[i] = self._hardware_a_max[i] * self._acc_scale @@ -338,16 +333,15 @@ def _apply_cart_velocity_limit(self, q_target: list[float]) -> None: ensure TCP velocity along the direction to target stays within the Cartesian velocity limit. """ - # Use pre-allocated buffers to avoid allocations self._q_current_buf[:] = self.inp.current_position self._q_target_buf[:] = q_target np.subtract(self._q_target_buf, self._q_current_buf, out=self._dq_buf) - # Get the linear part of the Jacobian (first 3 rows) + # Linear part of the Jacobian is the first 3 rows. PAROL6_ROBOT.robot.jacob0_into(self._q_current_buf, self._jacob0_buf) J_lin = self._jacob0_buf[:3, :] - # Compute Cartesian velocity per unit "scale" along dq direction + # Cartesian velocity per unit "scale" along the dq direction. cart_vel_per_scale = np.linalg.norm(J_lin @ self._dq_buf) if cart_vel_per_scale > 1e-6: @@ -356,18 +350,17 @@ def _apply_cart_velocity_limit(self, q_target: list[float]) -> None: ) # mm/s to m/s max_scale = v_max_m_s / cart_vel_per_scale - # Reuse pre-allocated buffer for velocity limits for j in range(self.num_dofs): - # Joint velocity = dq[j] * scale, so max joint vel = |dq[j]| * max_scale + # Joint velocity = dq[j] * scale, so max joint vel = |dq[j]| * max_scale. q_dot_max = min( abs(self._dq_buf[j]) * max_scale, self._hardware_v_max[j] ) - # Ensure non-zero minimum to avoid Ruckig issues + # Non-zero minimum avoids Ruckig issues with zero limits. self._max_vel_buf[j] = max(q_dot_max, 1e-6) self.inp.max_velocity = self._max_vel_buf else: - # Near-zero motion, use hardware limits (reuse buffer) + # Near-zero motion: fall back to hardware limits. self._max_vel_buf[:] = self._hardware_v_max self.inp.max_velocity = self._max_vel_buf @@ -384,7 +377,6 @@ def tick(self) -> tuple[np.ndarray, np.ndarray, bool]: - finished: True if target reached (position mode) or velocity reached (velocity mode) """ if not self.active: - # Sync _pos_out with current position for inactive state self._pos_out[:] = self.inp.current_position return self._pos_out, self._zeros_np, True @@ -393,7 +385,6 @@ def tick(self) -> tuple[np.ndarray, np.ndarray, bool]: if result in _RUCKIG_ERRORS: return self._pos_out, self._zeros_np, True - # pos/vel are pre-allocated buffers from _tick_ruckig return pos, vel, result == Result.Finished def reset_limits(self) -> None: @@ -446,13 +437,12 @@ def __init__(self, dt: float = INTERVAL_S): Args: dt: Control cycle time in seconds """ - # Reference pose for tangent space computations - # Must be set before super().__init__ calls _init_limits/_init_state + # Reference pose for tangent space computations. + # Must be set before super().__init__ calls _init_limits/_init_state. self.reference_pose: np.ndarray | None = None - # Pre-allocated arrays for Ruckig parameters (reused to avoid per-tick allocations) - # Ruckig copies values on assignment, so we update in-place then assign same array - # Must be created before super().__init__() since _apply_limits() is called during init + # Ruckig copies values on assignment, so these are updated in-place then re-assigned. + # Must exist before super().__init__(), which calls _apply_limits() during init. self._max_velocity_arr = np.zeros(6, dtype=np.float64) self._max_acceleration_arr = np.zeros(6, dtype=np.float64) self._max_jerk_arr = np.zeros(6, dtype=np.float64) @@ -461,16 +451,14 @@ def __init__(self, dt: float = INTERVAL_S): super().__init__(num_dofs=6, dt=dt) # 6-DOF: [x, y, z, wx, wy, wz] - # Pre-allocated numpy arrays for hot path (avoids allocations per tick) self._tangent_buf = np.zeros(6, dtype=np.float64) self._vel_np_buf = np.zeros(6, dtype=np.float64) self._world_vel_buf = np.zeros(6, dtype=np.float64) - # SE3 workspace buffers for JIT functions (avoids allocations in pose conversions) + # SE3 workspace buffers let the JIT pose conversions run with zero allocation. self._ref_inv_buf = np.zeros((4, 4), dtype=np.float64) self._delta_buf = np.zeros((4, 4), dtype=np.float64) self._result_pose_buf = np.zeros((4, 4), dtype=np.float64) - # Additional workspace for se3_log_ws/se3_exp_ws (zero internal allocation) self._omega_ws = np.zeros(3, dtype=np.float64) self._R_ws = np.zeros((3, 3), dtype=np.float64) self._V_ws = np.zeros((3, 3), dtype=np.float64) # Reused for V and V_inv @@ -502,17 +490,14 @@ def _apply_limits(self) -> None: Uses pre-allocated numpy arrays to avoid per-tick allocations. """ - # Update velocity limits in-place self._max_velocity_arr[:3] = self._v_lin_max * self._vel_scale self._max_velocity_arr[3:] = self._v_ang_max * self._vel_scale self.inp.max_velocity = self._max_velocity_arr - # Update acceleration limits in-place self._max_acceleration_arr[:3] = self._a_lin_max * self._acc_scale self._max_acceleration_arr[3:] = self._a_ang_max * self._acc_scale self.inp.max_acceleration = self._max_acceleration_arr - # Update jerk limits in-place self._max_jerk_arr[:3] = self._j_lin_max self._max_jerk_arr[3:] = self._j_ang_max self.inp.max_jerk = self._max_jerk_arr @@ -551,7 +536,6 @@ def _pose_to_tangent(self, pose: np.ndarray) -> np.ndarray: if self.reference_pose is None: self._tangent_buf.fill(0.0) return self._tangent_buf - # Use JIT function with pre-allocated workspace buffers (zero allocation) _pose_to_tangent_jit( self.reference_pose, pose, @@ -576,7 +560,6 @@ def _tangent_to_pose(self, tangent: np.ndarray) -> np.ndarray: """ if self.reference_pose is None: return np.eye(4, dtype=np.float64) - # Copy tangent to buffer and use JIT function with pre-allocated workspace self._tangent_buf[:] = tangent _tangent_to_pose_jit( self.reference_pose, @@ -625,7 +608,6 @@ def set_jog_velocity_1dof( velocity: Target velocity (m/s for linear, rad/s for rotation) is_rotation: True for rotation axes (RX, RY, RZ) """ - # Update target velocity in-place (zero allocation) self._target_velocity_arr.fill(0.0) if is_rotation: self._target_velocity_arr[3 + axis] = velocity @@ -661,18 +643,15 @@ def set_jog_velocity_1dof_wrf( logger.warning("set_jog_velocity_1dof_wrf called without reference_pose") return - # Reuse pre-allocated buffer for world velocity self._world_vel_buf.fill(0.0) if is_rotation: self._world_vel_buf[3 + axis] = velocity else: self._world_vel_buf[axis] = velocity - # Transform from world frame to body frame (tangent space) - # Body velocity = R^T @ world velocity + # Transform world frame to body frame (tangent space): body velocity = R^T @ world velocity. R = self.reference_pose[:3, :3] - # JIT-compiled transform into target buffer (zero allocation) np.dot(R.T, self._world_vel_buf[:3], self._target_velocity_arr[:3]) np.dot(R.T, self._world_vel_buf[3:], self._target_velocity_arr[3:]) @@ -720,9 +699,7 @@ def tick(self) -> tuple[np.ndarray, NDArray[np.float64], bool]: True, ) - # Convert tangent back to pose smoothed_pose = self._tangent_to_pose(pos) - # Copy velocity into pre-allocated buffer self._vel_np_buf[:] = vel # Don't auto-deactivate in velocity mode - caller controls via set_jog_velocity(0) diff --git a/parol6/motion/trajectory.py b/parol6/motion/trajectory.py index bbcddb0..06fcda5 100644 --- a/parol6/motion/trajectory.py +++ b/parol6/motion/trajectory.py @@ -45,7 +45,6 @@ def _rad_to_steps_alloc(rad: NDArray) -> NDArray[np.int32]: if rad.ndim == 1: rad_to_steps(rad, out) else: - # 2D array (trajectory): convert row by row for i in range(rad.shape[0]): rad_to_steps(rad[i], out[i]) return out @@ -73,7 +72,7 @@ def __call__(self, s_in: float | NDArray, order: int = 0) -> NDArray[np.float64] if order <= 1: result = self._pp(s, order) else: - # Second+ derivative of piecewise linear is zero + # Second+ derivative of a piecewise-linear path is zero result = np.zeros((len(s), self.dof)) return result[0] if scalar and result.ndim > 1 else result @@ -261,7 +260,6 @@ def from_poses( return cls(positions=positions) valid = np.array(result.valid, dtype=np.bool_) - # Count consecutive valid from start first_fail = int(np.argmin(valid)) # first False index if first_fail < 2: if stop_on_failure: @@ -299,7 +297,6 @@ def interpolate( start = np.asarray(start_rad, dtype=np.float64) end = np.asarray(end_rad, dtype=np.float64) - # Vectorized interpolation using broadcasting t = np.linspace(0, 1, n_samples).reshape(-1, 1) positions = start + t * (end - start) @@ -471,27 +468,21 @@ def build(self) -> Trajectory: Trajectory ready for execution """ if len(self.joint_path) < 2: - # Trivial path - single point steps = _rad_to_steps_alloc( self.joint_path.positions[0:1] # Keep 2D shape (1, 6) ) return Trajectory(steps=steps, duration=0.0) - # Route to appropriate trajectory builder based on profile if self.profile == ProfileType.RUCKIG: - # Point-to-point jerk-limited motion (doesn't follow path waypoints) + # Point-to-point jerk-limited motion; ignores intermediate waypoints return self._build_ruckig_trajectory() elif self.profile == ProfileType.LINEAR: - # Simple linear interpolation - no velocity smoothing return self._build_simple_trajectory() elif self.profile == ProfileType.QUINTIC: - # Quintic polynomial timing return self._build_quintic_trajectory() elif self.profile == ProfileType.TRAPEZOID: - # Trapezoidal velocity profile along path return self._build_trapezoid_trajectory() else: - # TOPPRA is default - time-optimal path following return self._build_toppra_trajectory() def _build_toppra_trajectory(self) -> Trajectory: @@ -505,7 +496,6 @@ def _build_toppra_trajectory(self) -> Trajectory: positions = self.joint_path.positions n_points = len(positions) - # Uniform parameterization for path knots ss_waypoints = np.linspace(0.0, 1.0, n_points) # Piecewise linear PPoly — prevents cubic spline overshoot that @@ -519,7 +509,6 @@ def _build_toppra_trajectory(self) -> Trajectory: c[1, i, :] = positions[i] path = _LinearPath(PPoly(c, ss_waypoints), dof) - # Use pre-computed limit arrays for constraints joint_vel_constraint = constraint.JointVelocityConstraint(self._vlim) joint_acc_constraint = constraint.JointAccelerationConstraint(self._alim) constraints: list[constraint.Constraint] = [ @@ -527,7 +516,6 @@ def _build_toppra_trajectory(self) -> Trajectory: joint_acc_constraint, ] - # Add Cartesian velocity constraint if specified if self.cart_vel_limit is not None and self.cart_vel_limit > 0: cart_constraint = self._build_cart_vel_constraint(path, ss_waypoints) if cart_constraint is not None: @@ -540,7 +528,6 @@ def _build_toppra_trajectory(self) -> Trajectory: n_gridpoints = n_points * 3 gridpoints = np.linspace(0.0, 1.0, n_gridpoints) - # Use TOPPRAsd if duration is specified, otherwise time-optimal TOPPRA if self.duration is not None and self.duration > 0: instance = algo.TOPPRAsd(constraints, path, gridpoints=gridpoints) instance.set_desired_duration(self.duration) @@ -572,7 +559,7 @@ def _build_toppra_trajectory(self) -> Trajectory: n_points, ) - # Sample trajectory at control rate, including endpoint (vectorized) + # Sample at control rate, including the exact endpoint n_output = max(2, int(np.floor(duration / self.dt)) + 1) times = np.arange(n_output - 1) * self.dt trajectory_rad = np.empty((n_output, 6), dtype=np.float64) @@ -585,7 +572,6 @@ def _build_toppra_trajectory(self) -> Trajectory: duration, ) - # Convert to motor steps (vectorized) steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=duration) @@ -608,17 +594,14 @@ def _build_simple_trajectory(self) -> Trajectory: else self._compute_joint_duration_linear() ) - # Sample path uniformly n_output = max(2, int(np.ceil(duration / self.dt))) s_values = np.linspace(0.0, 1.0, n_output) trajectory_rad = self.joint_path.sample_many(s_values) - # Enforce velocity limits by stretching segments where needed trajectory_rad, duration = self._enforce_segment_limits( trajectory_rad, duration ) - # Convert to motor steps steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=duration) @@ -693,23 +676,20 @@ def _enforce_segment_limits( if n_points < 2: return trajectory_rad, duration - # Initial uniform time per segment initial_dt = duration / (n_points - 1) - # Compute per-segment joint deltas deltas = np.diff(trajectory_rad, axis=0) # (N-1, 6) - # For each segment, compute minimum time needed to respect velocity limits - # time_needed = max(|delta[j]| / v_max[j]) for all joints + # Minimum time per segment to respect velocity limits: + # max(|delta[j]| / v_max[j]) over joints min_segment_times = np.max(np.abs(deltas) / self.v_max, axis=1) # (N-1,) - # Also check acceleration limits between segments - # This is approximate - we check if velocity change between segments is feasible + # Approximate acceleration check: is the velocity change between + # adjacent segments feasible? if n_points > 2: - velocities = deltas / initial_dt # Approximate velocities per segment + velocities = deltas / initial_dt accel = np.diff(velocities, axis=0) / initial_dt # (N-2, 6) accel_times = np.zeros(n_points - 1) - # Segments that cause high acceleration need more time for i in range(len(accel)): max_accel_ratio = np.max(np.abs(accel[i]) / self.a_max) if max_accel_ratio > 1.0: @@ -721,13 +701,10 @@ def _enforce_segment_limits( ) min_segment_times = np.maximum(min_segment_times, accel_times) - # Ensure minimum dt per segment min_segment_times = np.maximum(min_segment_times, self.dt) - # Compute actual segment times: max of initial_dt and min_segment_times segment_times = np.maximum(min_segment_times, initial_dt) - # Check if any stretching was needed new_duration = float(np.sum(segment_times)) if new_duration <= duration * 1.001: # No significant change return trajectory_rad, duration @@ -739,14 +716,13 @@ def _enforce_segment_limits( (new_duration / duration - 1) * 100, ) - # Resample trajectory at control rate with new timing + # Resample at control rate with the new per-segment timing cumulative_times = np.zeros(n_points) cumulative_times[1:] = np.cumsum(segment_times) n_output = max(2, int(np.ceil(new_duration / self.dt))) output_times = np.linspace(0.0, new_duration, n_output) - # Interpolate each joint new_trajectory = np.empty((n_output, 6), dtype=np.float64) for j in range(6): new_trajectory[:, j] = np.interp( @@ -812,10 +788,8 @@ def _compute_joint_duration_quintic(self) -> float: total_delta = np.abs(positions[-1] - positions[0]) - # Velocity-limited duration: T = 1.875 * delta / v_max time_vel = 1.875 * total_delta / self.v_max - # Acceleration-limited duration: T = sqrt(5.77 * delta / a_max) with np.errstate(divide="ignore", invalid="ignore"): time_acc = np.where( self.a_max > 0, @@ -823,7 +797,6 @@ def _compute_joint_duration_quintic(self) -> float: 0.0, ) - # Take the maximum per joint, then across all joints time_per_joint = np.maximum(time_vel, time_acc) return max(float(np.max(time_per_joint)), self.dt * 2) @@ -843,11 +816,8 @@ def _compute_joint_duration_linear(self) -> float: total_delta = np.abs(positions[-1] - positions[0]) - # Velocity-limited duration time_vel = total_delta / self.v_max - # Acceleration-limited duration (for triangular velocity profile) - # Time to reach target with constant accel then decel: T = 2 * sqrt(2 * d / a) with np.errstate(divide="ignore", invalid="ignore"): time_acc = np.where( self.a_max > 0, @@ -855,7 +825,6 @@ def _compute_joint_duration_linear(self) -> float: 0.0, ) - # Take the maximum per joint, then across all joints time_per_joint = np.maximum(time_vel, time_acc) return max(float(np.max(time_per_joint)), self.dt * 2) @@ -874,11 +843,12 @@ def _compute_cartesian_duration_from_path(self) -> float: if len(positions) < 2: return self.dt * 2 - # Compute per-segment time based on max joint movement in each segment + # Per-segment time from the max joint movement within each segment; + # summing these handles singularities/wrist flips that total + # start-to-end displacement would miss. deltas = np.diff(positions, axis=0) # (N-1, 6) segment_times = np.max(np.abs(deltas) / self.v_max, axis=1) # (N-1,) - # Ensure minimum time per segment segment_times = np.maximum(segment_times, self.dt) return max(float(np.sum(segment_times)), self.dt * 2) @@ -916,15 +886,12 @@ def _build_quintic_trajectory_joint(self) -> Trajectory: times = np.linspace(0.0, duration, n_output) trajectory_rad = np.empty((n_output, 6), dtype=np.float64) - # Generate quintic profile for each joint for j in range(6): delta = end_pos[j] - start_pos[j] if abs(delta) < 1e-9: - # Joint doesn't move trajectory_rad[:, j] = start_pos[j] continue - # Create quintic trajectory for this joint bc_start = BoundaryCondition( position=start_pos[j], velocity=0.0, acceleration=0.0 ) @@ -934,16 +901,13 @@ def _build_quintic_trajectory_joint(self) -> Trajectory: interval = TimeInterval(start=0.0, end=duration) traj = PolynomialTrajectory.order_5_trajectory(bc_start, bc_end, interval) - # Sample the trajectory for i, t in enumerate(times): trajectory_rad[i, j] = traj(t)[0] - # Enforce velocity limits by stretching segments where needed trajectory_rad, duration = self._enforce_segment_limits( trajectory_rad, duration ) - # Convert to motor steps steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=duration) @@ -963,7 +927,7 @@ def _build_quintic_trajectory_cartesian(self) -> Trajectory: # Use per-segment analysis to handle singularities and wrist flips duration = self._compute_cartesian_duration_from_path() - # Create quintic trajectory from s=0 to s=1 over duration + # Quintic profile for the path parameter s, from s=0 to s=1 bc_start = BoundaryCondition(position=0.0, velocity=0.0, acceleration=0.0) bc_end = BoundaryCondition(position=1.0, velocity=0.0, acceleration=0.0) interval = TimeInterval(start=0.0, end=duration) @@ -972,20 +936,16 @@ def _build_quintic_trajectory_cartesian(self) -> Trajectory: n_output = max(2, int(np.ceil(duration / self.dt))) times = np.linspace(0.0, duration, n_output) - # Evaluate quintic trajectory to get profile-shaped s values profile_s = np.empty(n_output, dtype=np.float64) for i in range(n_output): profile_s[i] = traj(float(times[i]))[0] - # Sample path at quintic-shaped s values trajectory_rad = self.joint_path.sample_many(profile_s) - # Enforce velocity limits by stretching segments where needed trajectory_rad, duration = self._enforce_segment_limits( trajectory_rad, duration ) - # Convert to motor steps steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=duration) @@ -1026,15 +986,12 @@ def _build_trapezoid_trajectory_joint(self) -> Trajectory: times = np.linspace(0.0, duration, n_output) trajectory_rad = np.empty((n_output, 6), dtype=np.float64) - # Generate trapezoidal profile for each joint for j in range(6): delta = end_pos[j] - start_pos[j] if abs(delta) < 1e-9: - # Joint doesn't move trajectory_rad[:, j] = start_pos[j] continue - # Create trapezoidal profile for this joint params = TrapParams( q0=start_pos[j], q1=end_pos[j], @@ -1047,19 +1004,16 @@ def _build_trapezoid_trajectory_joint(self) -> Trajectory: params ) - # Scale times to match synchronized duration + # Scale this joint's own profile time onto the synchronized duration time_scale = profile_duration / duration if duration > 0 else 1.0 - # Sample the trajectory for i, t in enumerate(times): trajectory_rad[i, j] = traj_fn(t * time_scale)[0] - # Enforce velocity limits by stretching segments where needed trajectory_rad, duration = self._enforce_segment_limits( trajectory_rad, duration ) - # Convert to motor steps steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=duration) @@ -1082,10 +1036,9 @@ def _build_trapezoid_trajectory_cartesian(self) -> Trajectory: # Use per-segment analysis to handle singularities and wrist flips duration = self._compute_cartesian_duration_from_path() - # Compute s-profile limits from joint limits vmax_s, amax_s, _ = self._compute_s_profile_limits() - # Create trapezoidal profile for path parameter s (0 to 1) + # Trapezoidal profile for the path parameter s, from s=0 to s=1 params = TrapParams( q0=0.0, q1=1.0, @@ -1107,20 +1060,16 @@ def _build_trapezoid_trajectory_cartesian(self) -> Trajectory: n_output = max(2, int(np.ceil(duration / self.dt))) times = np.linspace(0.0, duration, n_output) - # Evaluate profile at scaled times to get profile-shaped s values profile_s = np.array( [traj_fn(t * time_scale)[0] for t in times], dtype=np.float64 ) - # Sample path at trapezoid-shaped s values trajectory_rad = self.joint_path.sample_many(profile_s) - # Enforce velocity limits by stretching segments where needed trajectory_rad, duration = self._enforce_segment_limits( trajectory_rad, duration ) - # Convert to motor steps steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=duration) @@ -1155,10 +1104,10 @@ def _build_cart_vel_constraint( # cart_vel_limit is already in m/s (SI units) v_max_m_s = self.cart_vel_limit - # Use scaled joint limits (respects user's velocity_frac) + # Scaled joint limits respect the user's velocity_frac v_max_joint = self.v_max - # Pre-allocate buffers for velocity limits (avoids per-call allocation) + # Pre-allocate; vlim_func is called once per gridpoint vlim_buffer = np.empty((6, 2), dtype=np.float64) _jac_buf = np.zeros((6, 6), dtype=np.float64, order="F") @@ -1167,11 +1116,10 @@ def vlim_func(s: float) -> NDArray: q = path(s) dq_ds = path(s, 1) # Path tangent (first derivative) - # Get the linear part of the Jacobian (first 3 rows) + # Linear (translational) part of the Jacobian is the first 3 rows robot.jacob0_into(q, _jac_buf) J_lin = _jac_buf[:3, :] - # Cartesian velocity per unit s_dot along path tangent cart_vel_per_sdot = np.linalg.norm(J_lin @ dq_ds) if cart_vel_per_sdot < 1e-6: @@ -1180,7 +1128,6 @@ def vlim_func(s: float) -> NDArray: vlim_buffer[:, 1] = v_max_joint return vlim_buffer.copy() - # Maximum s_dot to satisfy Cartesian velocity constraint max_sdot = v_max_m_s / cart_vel_per_sdot # The Cartesian constraint limits s_dot, not individual joint velocities. @@ -1191,7 +1138,6 @@ def vlim_func(s: float) -> NDArray: # keeping joints at their relative proportions. abs_dq_ds = np.abs(dq_ds) - # Compute s_dot limit from each joint's velocity limit with np.errstate(divide="ignore", invalid="ignore"): s_dot_per_joint = np.where( abs_dq_ds > 1e-9, @@ -1202,12 +1148,11 @@ def vlim_func(s: float) -> NDArray: # The binding joint limit determines max achievable s_dot s_dot_from_joints = float(np.min(s_dot_per_joint)) - # If Cartesian constraint is more restrictive, scale down all limits if max_sdot < s_dot_from_joints and s_dot_from_joints > 0: + # Cartesian constraint is the tighter one; scale all limits down scale = max_sdot / s_dot_from_joints q_dot_max = v_max_joint * scale else: - # Cartesian constraint is not binding, use joint limits q_dot_max = v_max_joint vlim_buffer[:, 0] = -q_dot_max @@ -1247,7 +1192,8 @@ def _build_ruckig_trajectory(self) -> Trajectory: inp.max_acceleration = self.a_max.tolist() inp.max_jerk = self.j_max.tolist() - # Pre-allocate buffer (estimate max iterations from simple duration + margin) + # Pre-size the buffer from an estimated duration; the loop below + # grows it if Ruckig runs longer than expected. est_duration = self._estimate_simple_duration() max_iters = int(est_duration / self.dt) + 500 # generous margin trajectory_rad = np.empty((max_iters, n_dofs), dtype=np.float64) @@ -1270,10 +1216,8 @@ def _build_ruckig_trajectory(self) -> Trajectory: actual_duration = out.trajectory.duration - # Trim to actual size trajectory_rad = trajectory_rad[:count] - # Convert to motor steps (vectorized) steps = _rad_to_steps_alloc(trajectory_rad) return Trajectory(steps=steps, duration=actual_duration) @@ -1288,7 +1232,6 @@ def _estimate_simple_duration(self) -> float: if len(positions) < 2: return self.dt * 2 - # Compute per-segment time based on max joint movement deltas = np.diff(positions, axis=0) # (N-1, 6) segment_times = np.max(np.abs(deltas) / self.v_max, axis=1) # (N-1,) diff --git a/parol6/protocol/wire.py b/parol6/protocol/wire.py index fba49c8..c6df9ba 100644 --- a/parol6/protocol/wire.py +++ b/parol6/protocol/wire.py @@ -45,7 +45,7 @@ def _enc_hook(obj: object) -> object: if isinstance(obj, np.ndarray): return obj.tolist() # type: ignore[no-matching-overload, ty:no-matching-overload] if isinstance(obj, (np.integer, np.floating)): - return obj.item() # Convert numpy scalar to Python native type + return obj.item() raise NotImplementedError(f"Cannot encode {type(obj)}") @@ -843,7 +843,6 @@ def _collect_command_structs() -> list[type]: continue if not issubclass(cls, msgspec.Struct): continue - # Check for tag in struct config config = getattr(cls, "__struct_config__", None) if config is not None and config.tag is not None: structs.append(cls) @@ -1592,18 +1591,15 @@ def pack_tx_frame_into( out[2] = 0xFF out[3] = 52 - # Positions and speeds: JIT-compiled packing + # Positions and speeds packed via JIT-compiled helper _pack_positions(out, position_out, 4) _pack_positions(out, speed_out, 22) - # Command out[40] = command_code - # Bitfields out[41] = _pack_bitfield(affected_joint_out) out[42] = _pack_bitfield(inout_out) - # Timeout out[43] = int(timeout_out) & 0xFF # Gripper: position, speed, current as 2 bytes each (big-endian) @@ -1625,7 +1621,6 @@ def pack_tx_frame_into( # CRC placeholder byte (0xE4) — fixed value, not computed out[53] = 228 - # End bytes out[54] = 0x01 out[55] = 0x02 diff --git a/parol6/robot.py b/parol6/robot.py index 6e3b9e3..09332e1 100644 --- a/parol6/robot.py +++ b/parol6/robot.py @@ -553,7 +553,7 @@ def __init__( ), ) - # Initialize pinokin for FK/IK + # pinokin provides FK/IK self._pinokin = PinokinRobot(self._urdf_path) # Pre-allocated buffers for FK/IK @@ -879,7 +879,6 @@ def create_sync_client(self, **kwargs: Any) -> SyncRobotClient: port: int = kwargs.get("port", self._port) timeout: float = kwargs.get("timeout", 5.0) client = SyncRobotClient(host=host, port=port, timeout=timeout) - # Bind async tools to the inner async client async_bound: dict[str, ToolSpec] = {} for spec in self.tools.available: bound_spec = copy.copy(spec) @@ -887,7 +886,6 @@ def create_sync_client(self, **kwargs: Any) -> SyncRobotClient: bound_spec._get_status = client._inner._tool_status # type: ignore[attr-defined, ty:unresolved-attribute] async_bound[spec.key] = bound_spec client._inner._bound_tools = async_bound - # Wrap async-bound tools in sync adapters bound: dict[str, ToolSpec] = {} for key, async_tool in async_bound.items(): bound[key] = make_sync_tool(async_tool, _run) diff --git a/parol6/server/async_logging.py b/parol6/server/async_logging.py index 4f62358..f7d1839 100644 --- a/parol6/server/async_logging.py +++ b/parol6/server/async_logging.py @@ -34,7 +34,6 @@ def start(self) -> None: if self._started: return - # Get the root logger's handlers if this logger has none if self._logger.handlers: target_handlers = self._logger.handlers[:] else: @@ -50,21 +49,16 @@ def start(self) -> None: current = current.parent if not target_handlers: - # No handlers found, nothing to wrap return - # Store original handlers self._original_handlers = target_handlers - # Create QueueHandler queue_handler = QueueHandler(self._queue) - # For the controller logger specifically, replace its handlers - # and stop propagation so we control all output + # Stop propagation so this handler controls all output for the logger. self._logger.handlers = [queue_handler] self._logger.propagate = False - # Start listener thread to process queued records self._listener = QueueListener( self._queue, *self._original_handlers, respect_handler_level=True ) @@ -83,7 +77,6 @@ def stop(self) -> None: self._listener.stop() self._listener = None - # Restore propagation and clear our handler self._logger.handlers = [] self._logger.propagate = True diff --git a/parol6/server/cli.py b/parol6/server/cli.py index 57f9a7b..cf4af42 100644 --- a/parol6/server/cli.py +++ b/parol6/server/cli.py @@ -21,7 +21,6 @@ def main() -> int: parser.add_argument("--serial", help="Serial port (e.g., /dev/ttyUSB0 or COM3)") parser.add_argument("--baudrate", type=int, default=3000000, help="Serial baudrate") - # Verbose logging options parser.add_argument( "-v", "--verbose", @@ -122,7 +121,6 @@ def main() -> int: serial_baudrate=args.baudrate, ) - # Create and run controller controller = None def handle_sigterm(signum, frame): @@ -132,7 +130,7 @@ def handle_sigterm(signum, frame): controller.stop() raise SystemExit(0) - # Install signal handler for graceful shutdown on SIGTERM + # Graceful shutdown on SIGTERM signal.signal(signal.SIGTERM, handle_sigterm) try: diff --git a/parol6/server/command_executor.py b/parol6/server/command_executor.py index bea8899..915fc0e 100644 --- a/parol6/server/command_executor.py +++ b/parol6/server/command_executor.py @@ -76,7 +76,7 @@ def __init__(self, state_manager: "StateManager"): def _update_queue_state(self, state: "ControllerState") -> None: """Update queue snapshot and next action in state.""" - # Reuse list to avoid allocation (clear + extend pattern) + # Reuse the list to avoid an allocation on the hot path. state.queue_nonstreamable.clear() for qc in self.command_queue: if not (isinstance(qc.command, MotionCommand) and qc.command.streamable): @@ -109,17 +109,17 @@ def try_stream_fast_path( if not (isinstance(active_inst, MotionCommand) and active_inst.streamable): return False - # Decode incoming command try: cmd_struct = decode_command(data) except Exception as e: logger.debug("Stream fast-path decode failed: %s", e) return False - # Check if struct type matches active command's expected type active_params_type = getattr(active_inst, "PARAMS_TYPE", None) if active_params_type is None or type(cmd_struct) is not active_params_type: - return cmd_struct # Return decoded struct for caller to reuse + return ( + cmd_struct # Hand the decoded struct back so the caller can reuse it. + ) logger.log( TRACE, @@ -128,10 +128,9 @@ def try_stream_fast_path( type(cmd_struct).__name__, ) - # Assign new params (validation already done by decode) + # Validation already happened during decode. active_inst.assign_params(cmd_struct) - # Re-setup with new params try: active_inst.setup(state) logger.log(TRACE, "stream_fast_path applied") @@ -165,12 +164,10 @@ def queue_command( logger.warning("Command queue full (max %d)", MAX_COMMAND_QUEUE_SIZE) raise QueueFullError("Queue full") - # Assign monotonic command index state = self._state_manager.get_state() cmd_index = state.next_command_index state.next_command_index += 1 - # Create queued command queued_cmd = QueuedCommand( command=command, command_id=command_id, @@ -180,7 +177,6 @@ def queue_command( self.command_queue.append(queued_cmd) - # Update queue snapshot self._update_queue_state(state) logger.log( @@ -226,7 +222,6 @@ def execute_active_command(self) -> None: ac.command_index, ) - # Execute one tick if not ac.first_tick_logged: logger.log(TRACE, "tick_start name=%s", type(ac.command).__name__) ac.first_tick_logged = True @@ -296,7 +291,7 @@ def _process_tick_result( state.action_params = "" state.action_state = ActionState.IDLE - # Clear queued streamable commands on failure to prevent pileup + # Drop queued streamable commands so they don't pile up after a failure. if isinstance(ac.command, MotionCommand) and ac.command.streamable: removed = self.clear_streamable_commands( f"Active streamable command failed: {ac.command.robot_error}" diff --git a/parol6/server/command_registry.py b/parol6/server/command_registry.py index fe40116..be31240 100644 --- a/parol6/server/command_registry.py +++ b/parol6/server/command_registry.py @@ -56,13 +56,12 @@ class CommandRegistry: _discovered: bool def __new__(cls) -> CommandRegistry: - """Ensure singleton pattern.""" if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): - """Initialize the registry (only runs once due to singleton).""" + # Guarded so the singleton only initializes once across repeated __init__ calls. if not hasattr(self, "_initialized"): self._commands = {} self._class_to_type = {} @@ -101,14 +100,12 @@ def register(self, cmd_type: CmdType, command_class: type[CommandBase]) -> None: ) else: self._commands[cmd_type] = command_class - # Maintain reverse mapping for fast class -> type lookup + # Reverse map for fast class -> type lookup. self._class_to_type[command_class] = cmd_type - # Determine and store category at registration time category = self._determine_category(command_class) self._class_to_category[command_class] = category - # Also register by struct type if PARAMS_TYPE is set params_type = getattr(command_class, "PARAMS_TYPE", None) if params_type is not None: self._struct_to_command[params_type] = command_class @@ -127,7 +124,6 @@ def get_command_class(self, cmd_type: CmdType) -> type[CommandBase] | None: Returns: The command class if found, None otherwise """ - # Ensure commands are discovered if not self._discovered: self.discover_commands() @@ -138,7 +134,6 @@ def get_type_for_class(self, cls: type[CommandBase]) -> CmdType | None: Retrieve the registered CmdType for a given command class. Returns None if the class is not registered. """ - # Ensure commands are discovered if not self._discovered: self.discover_commands() # Prefer explicit reverse map; fall back to class attribute set by decorator @@ -151,7 +146,6 @@ def list_registered_commands(self) -> list[CmdType]: Returns: List of CmdType values (sorted by int value) """ - # Ensure commands are discovered if not self._discovered: self.discover_commands() @@ -169,27 +163,25 @@ def discover_commands(self) -> None: logger.debug("Discovering commands...") - # Import parol6.commands package try: commands_package = import_module("parol6.commands") except ImportError as e: logger.error(f"Failed to import parol6.commands: {e}") return - # Iterate through all modules in the commands package package_path = commands_package.__path__ for _, modname, ispkg in pkgutil.iter_modules(package_path): if ispkg: - continue # Skip subpackages + continue full_module_name = f"parol6.commands.{modname}" - # Skip the base module + # base holds the abstract classes, not a registrable command. if modname == "base": continue try: - # Import the module (this triggers decorators) + # Importing the module triggers its @register_command decorators. import_module(full_module_name) logger.debug(f"Imported command module: {full_module_name}") except ImportError as e: @@ -287,7 +279,6 @@ def clear(self) -> None: logger.debug("Command registry cleared") -# Global registry instance _registry = CommandRegistry() @@ -313,14 +304,12 @@ class MoveJCommand(CommandBase): """ def decorator(cls: type[_CmdT]) -> type[_CmdT]: - # Verify it's a CommandBase subclass if not issubclass(cls, CommandBase): raise TypeError(f"Class {cls.__name__} must inherit from CommandBase") - # Register with the global registry _registry.register(cmd_type, cls) - # Add the command type as a class attribute for reference + # Stash on the class as a fallback for get_type_for_class. cls._cmd_type = cmd_type return cls diff --git a/parol6/server/controller.py b/parol6/server/controller.py index 2d42365..21839cf 100644 --- a/parol6/server/controller.py +++ b/parol6/server/controller.py @@ -120,13 +120,11 @@ def __init__(self, config: ControllerConfig): self.state_manager = StateManager() self.udp_transport: UDPTransport | None = None - # E-stop state tracking (start as released to avoid false positive on first check) + # Start as released to avoid a false positive on the first check self.estop_active: bool = False - # Status multicast broadcaster self._status_broadcaster: Any | None = None - # Helper classes self._timer = LoopTimer(self.config.loop_interval) self._phase_timer = PhaseTimer( [ @@ -139,8 +137,8 @@ def __init__(self, config: ControllerConfig): "sim", # tick_simulation ] ) - self._cmd_rate = EventRateMetrics() # Command reception rate - self._gc_tracker = GCTracker() # GC frequency and duration tracking + self._cmd_rate = EventRateMetrics() + self._gc_tracker = GCTracker() self._ack_policy = AckPolicy() self._async_log = AsyncLogHandler() self._transport_mgr = TransportManager( @@ -163,7 +161,6 @@ def __init__(self, config: ControllerConfig): self._tool_cmd_activated: bool = False self._tool_cmd_index: int = -1 - # Initialize components on construction self._initialize_components() def _initialize_components(self) -> None: @@ -174,10 +171,8 @@ def _initialize_components(self) -> None: RuntimeError: If critical components fail to initialize """ try: - # Discover and register all commands discover_commands() - # Initialize UDP transport logger.debug( f"Starting UDP server on {self.config.udp_host}:{self.config.udp_port}" ) @@ -187,13 +182,10 @@ def _initialize_components(self) -> None: if not self.udp_transport.create_socket(): raise RuntimeError("Failed to create UDP socket") - # Initialize robot state self.state_manager.reset_state() - # Initialize serial transport via TransportManager self._transport_mgr.initialize() - # Create status broadcaster try: logger.debug( f"StatusBroadcaster config: group={MCAST_GROUP} port={MCAST_PORT} ttl={MCAST_TTL} iface={MCAST_IF} rate_hz={STATUS_RATE_HZ} stale_s={STATUS_STALE_S}" @@ -249,7 +241,6 @@ def start(self): # Disable automatic GC — collections are deferred to slack time self._gc_tracker.take_control() - # Start main control loop logger.debug("Starting main control loop") self._timer.metrics.mark_started(time.perf_counter()) logger.info( @@ -267,26 +258,23 @@ def stop(self): self.running = False self.shutdown_event.set() - # Stop motion planner subprocess try: self._planner.stop() except Exception as e: logger.debug("Error stopping motion planner: %s", e) - # Stop IK worker subprocess + # close_cache() also tears down the IK worker subprocess try: close_cache() except Exception as e: logger.debug("Error stopping IK worker: %s", e) - # Close status broadcaster try: if self._status_broadcaster: self._status_broadcaster.close() except Exception as e: logger.debug("Error closing status broadcaster: %s", e) - # Clean up transports if self.udp_transport: self.udp_transport.close_socket() @@ -303,7 +291,6 @@ def stop(self): def _read_from_firmware(self, state: ControllerState) -> None: """Phase 1: Poll serial for data, unpack frames, handle auto-reconnect.""" if self._transport_mgr.is_connected(): - # Poll serial for new data self._transport_mgr.poll_serial() try: mv, ver, ts = self._transport_mgr.get_latest_frame() @@ -450,7 +437,6 @@ def _sync_timer_metrics(self, state: ControllerState) -> None: """Copy timing metrics from LoopTimer and PhaseTimer to controller state.""" m = self._timer.metrics - # Check if loop stats reset was requested if state.loop_stats_reset_pending: m.reset_stats(include_counters=True) state.loop_stats_reset_pending = False @@ -497,7 +483,6 @@ def _log_periodic_status(self, state: ControllerState) -> None: gc_dur, ) - # Rate-limited debug log every 3s if not m.should_log(now, 3.0): return diff --git a/parol6/server/ik_layout.py b/parol6/server/ik_layout.py index ba6727b..3a82936 100644 --- a/parol6/server/ik_layout.py +++ b/parol6/server/ik_layout.py @@ -27,11 +27,9 @@ def unregister_shm(shm: SharedMemory) -> None: - """Unregister a SharedMemory segment from the resource_tracker (pre-3.13). - - On Python 3.13+ with track=False this is a no-op. On older versions, this - prevents the resource_tracker daemon from lingering as an orphan process. - Skipped on Windows where the resource_tracker uses _posixsubprocess. + """Unregister a SharedMemory segment from the resource_tracker so its daemon + doesn't linger as an orphan. No-op on 3.13+ (track=False) and on Windows + (where the resource_tracker uses _posixsubprocess). """ if sys.version_info >= (3, 13) or sys.platform == "win32": return diff --git a/parol6/server/ik_worker.py b/parol6/server/ik_worker.py index d8d91a9..df6f331 100644 --- a/parol6/server/ik_worker.py +++ b/parol6/server/ik_worker.py @@ -1,8 +1,8 @@ """ IK Worker subprocess. -This module runs the computationally expensive IK enablement calculations -in a separate process, communicating with the main process via shared memory. +Runs the computationally expensive IK enablement calculations in a separate +process, communicating with the main process via shared memory. """ import logging @@ -54,7 +54,6 @@ def ik_enablement_worker_main( set_pdeathsig() - # Attach to shared memory input_shm = SharedMemory(name=input_shm_name, create=False, **SHM_EXTRA_KWARGS) output_shm = SharedMemory(name=output_shm_name, create=False, **SHM_EXTRA_KWARGS) unregister_shm(input_shm) @@ -100,7 +99,7 @@ def ik_enablement_worker_main( cart_en_wrf[:] = 1 cart_en_trf[:] = 1 - # Initialize robot model in this process + # Robot model must be built inside this subprocess, not inherited. import parol6.PAROL6_ROBOT as PAROL6_ROBOT from parol6.utils.ik import solve_ik @@ -131,14 +130,10 @@ def ik_enablement_worker_main( robot.set_tool_transform(tool_T.copy()) logger.info("IK worker: tool transform updated") - # Input data is already available via views (q_rad, T_matrix) - - # Compute joint enablement if qlim is not None: _compute_joint_enable(q_rad, qlim, joint_en) # else: joint_en stays all ones (pre-allocated default) - # Compute cartesian enablement for both frames _compute_cart_enable( T_matrix, True, @@ -171,11 +166,10 @@ def ik_enablement_worker_main( except Exception as e: logger.exception("IK worker subprocess error: %s", e) finally: - # Release numpy views before closing shared memory + # Release numpy views before closing shared memory. del q_rad, T_flat, T_matrix, tool_T_flat, tool_T del joint_en, cart_en_wrf, cart_en_trf, version_view - # Release memoryviews try: input_mv.release() except BufferError: @@ -296,7 +290,6 @@ def _compute_cart_enable( # Compute all 12 target poses in one numba call _compute_target_poses(T, t_step, r_step, is_wrf, axis_dirs, targets) - # Check IK for each target for i in range(12): try: ik = solve_ik(robot, targets[i], q_rad, quiet_logging=True) diff --git a/parol6/server/loop_timer.py b/parol6/server/loop_timer.py index adb3581..613a0ec 100644 --- a/parol6/server/loop_timer.py +++ b/parol6/server/loop_timer.py @@ -13,21 +13,13 @@ from typing import Self -# ============================================================================= -# Constants for power-of-2 buffer operations -# ============================================================================= # ~5 seconds of data, rounded up to next power of 2 for fast modulo via bitmask _TARGET_BUFFER_SECONDS = 5.0 _raw_size = int(cfg.CONTROL_RATE_HZ * _TARGET_BUFFER_SECONDS) -BUFFER_SIZE = 1 << (_raw_size - 1).bit_length() # Next power of 2 +BUFFER_SIZE = 1 << (_raw_size - 1).bit_length() BUFFER_MASK = BUFFER_SIZE - 1 -# ============================================================================= -# Numba-accelerated statistics functions (cached to disk) -# ============================================================================= - - @njit(cache=True) def _quickselect_partition(arr: np.ndarray, left: int, right: int) -> int: """Partition array around last element as pivot. Returns pivot index.""" @@ -65,7 +57,6 @@ def _compute_phase_stats( if n == 0: return 0.0, 0.0, 0.0 - # Compute mean and max in single pass total = 0.0 max_val = samples[0] for i in range(n): @@ -90,28 +81,25 @@ def _compute_phase_stats( def _compute_loop_stats( samples: np.ndarray, scratch: np.ndarray, n: int ) -> tuple[float, float, float, float, float, float]: - """Compute loop stats using single-pass Welford's algorithm for mean+std. + """Compute loop stats via single-pass Welford for mean+std. - Uses pre-allocated scratch buffer for percentile computation. - Only one copy to scratch (p99 first, then p95 on same data). + Uses the pre-allocated scratch buffer for percentiles (no hot-path alloc): + one copy to scratch, p99 first then p95 on the same data. """ if n == 0: return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 - # Single-pass Welford's algorithm for mean and variance + min/max mean = 0.0 - m2 = 0.0 # Sum of squared differences + m2 = 0.0 # sum of squared differences min_val = samples[0] max_val = samples[0] for i in range(n): x = samples[i] - # Welford's online algorithm delta = x - mean mean += delta / (i + 1) delta2 = x - mean m2 += delta * delta2 - # Track min/max if x < min_val: min_val = x if x > max_val: @@ -119,17 +107,14 @@ def _compute_loop_stats( std = np.sqrt(m2 / n) if n > 0 else 0.0 - # p95 and p99 via quickselect with single copy if n >= 20: - # Copy to scratch once for i in range(n): scratch[i] = samples[i] - # Compute p99 first (higher index) + # p95 reuses the scratch array post-p99 because k95 < k99 k99 = int(n * 0.99) p99 = _quickselect(scratch[:n], k99) - # Compute p95 on same array (works because k95 < k99) k95 = int(n * 0.95) p95 = _quickselect(scratch[:n], k95) else: @@ -139,11 +124,6 @@ def _compute_loop_stats( return mean, std, min_val, max_val, p95, p99 -# ============================================================================= -# EventRateMetrics - track rate of sporadic events -# ============================================================================= - - @njit(cache=True) def _compute_event_rate( buffer: np.ndarray, count: int, now: float, max_age_s: float @@ -325,7 +305,6 @@ def stats(self, now: float, max_age_s: float = 3.0) -> tuple[float, float]: if now - last_ts > max_age_s: return 0.0, 0.0 - # Compute rate and windowed mean duration cutoff = now - max_age_s total_dur = 0.0 count_in_window = 0 @@ -387,11 +366,6 @@ def shutdown(self) -> None: pass -# ============================================================================= -# PhaseMetrics - regular Python class (no jitclass overhead) -# ============================================================================= - - class PhaseMetrics: """Rolling statistics for a single phase. @@ -521,11 +495,6 @@ def __exit__(self, *args: object) -> None: self._timer.stop() -# ============================================================================= -# LoopMetrics -# ============================================================================= - - class LoopMetrics: """Metrics tracked by the loop timer with rolling statistics. @@ -574,7 +543,6 @@ def __init__(self) -> None: self.max_period_s = 0.0 self.p95_period_s = 0.0 self.p99_period_s = 0.0 - # Overshoot stats self.mean_overshoot_s = 0.0 self.max_overshoot_s = 0.0 self.p99_overshoot_s = 0.0 @@ -676,7 +644,7 @@ def compute_stats(self) -> None: self.p95_period_s = p95 self.p99_period_s = p99 - # Compute overshoot stats using the simpler phase stats function + # overshoot only needs mean/max/p99, so reuse the simpler phase stats if self._overshoot_count > 0: mean, max_val, p99 = _compute_phase_stats( self._overshoot_buffer, self._scratch, self._overshoot_count @@ -700,7 +668,6 @@ def reset_stats(self, include_counters: bool = False) -> None: self.max_period_s = 0.0 self.p95_period_s = 0.0 self.p99_period_s = 0.0 - # Reset overshoot stats self._overshoot_buffer.fill(0.0) self._overshoot_idx = 0 self._overshoot_count = 0 @@ -782,23 +749,20 @@ def wait_for_next_tick(self) -> None: """ self.metrics.loop_count += 1 - # Compute stats periodically (not every loop) if self.metrics.loop_count % self._stats_interval == 0: self.metrics.compute_stats() - # Advance deadline self._next_deadline += self._interval sleep_time = self._next_deadline - time.perf_counter() if sleep_time > self._busy_threshold: - # Sleep for most of the time, leaving headroom for busy-loop + # leave headroom for the busy-loop so the sleep can't overshoot time.sleep(sleep_time - self._busy_threshold) if sleep_time > 0: - # Busy-loop for remaining time (precise timing) + # busy-loop the remainder for precise timing without OS jitter while time.perf_counter() < self._next_deadline: pass - # Track how much we overshot the deadline now = time.perf_counter() self.metrics.record_overshoot(now - self._next_deadline) else: diff --git a/parol6/server/segment_player.py b/parol6/server/segment_player.py index e01b131..12803fd 100644 --- a/parol6/server/segment_player.py +++ b/parol6/server/segment_player.py @@ -89,10 +89,8 @@ def tick(self, state: ControllerState) -> bool: state.queued_duration += seg.duration seg = self._planner.poll_segment() - # Process active segment or activate next max_immediate = 8 # prevent infinite recursion on back-to-back instant commands for _ in range(max_immediate): - # Activate next segment if idle if self._active is None: if not self._buffer: return False @@ -100,7 +98,6 @@ def tick(self, state: ControllerState) -> bool: active = self._active - # --- Trajectory segment: index into waypoints --- if isinstance(active, TrajectorySegment): if self._step < len(active.trajectory_steps): state.Position_out[:] = active.trajectory_steps[self._step] @@ -126,7 +123,6 @@ def tick(self, state: ControllerState) -> bool: state.Command_out = CommandCode.MOVE return True - # --- Inline segment: tick the command --- if isinstance(active, InlineSegment): result = self._tick_inline(active, state) if result is None: @@ -134,7 +130,6 @@ def tick(self, state: ControllerState) -> bool: continue return result - # --- Error segment: halt advance run --- if isinstance(active, ErrorSegment): logger.error( "Command %d failed: %s", active.command_index, active.error @@ -144,13 +139,11 @@ def tick(self, state: ControllerState) -> bool: state.action_current = "" state.action_params = "" self._active = None - # Halt: cancel all remaining planned work self._buffer.clear() self._planner.cancel() self._drain_planner_queue(state) return False - # Unknown segment type logger.error("Unknown segment type: %s", type(active).__name__) self._active = None continue @@ -250,7 +243,6 @@ def cancel(self, state: ControllerState) -> None: self._inline_activated = False self._buffer.clear() self._planner.cancel() - # Drain stale segments from planner output queue self._drain_planner_queue(state) def _drain_planner_queue(self, state: ControllerState) -> None: diff --git a/parol6/server/state.py b/parol6/server/state.py index a45ca5f..a7855da 100644 --- a/parol6/server/state.py +++ b/parol6/server/state.py @@ -141,8 +141,8 @@ def object_detection(self) -> int: class GripperModeResetTracker: """Tracks gripper mode for auto-reset functionality.""" - calibration_sent: bool = False # Flag for calibration mode - error_clear_sent: bool = False # Flag for error clear mode + calibration_sent: bool = False + error_clear_sent: bool = False @dataclass @@ -184,7 +184,6 @@ class ControllerState: # Robot telemetry and command buffers - using ndarray for efficiency Command_out: CommandCode = CommandCode.IDLE # The command code to send to firmware - # int32 joint buffers Position_out: np.ndarray = field( default_factory=lambda: np.zeros((6,), dtype=np.int32) ) @@ -209,7 +208,6 @@ class ControllerState: # Tool teleport: when >= 0, snap gripper to this position (0-255) on next tick tool_teleport_pos: float = -1.0 - # uint8 flag/bitfield buffers Affected_joint_out: np.ndarray = field( default_factory=lambda: np.zeros((8,), dtype=np.uint8) ) diff --git a/parol6/server/status_broadcast.py b/parol6/server/status_broadcast.py index c2828cb..f967ec6 100644 --- a/parol6/server/status_broadcast.py +++ b/parol6/server/status_broadcast.py @@ -60,7 +60,6 @@ def __init__( self._max_send_failures = 3 self._last_fail_log_time = 0.0 - # Setup socket on construction self._setup_socket() def _detect_primary_ip(self) -> str: @@ -179,11 +178,11 @@ def _setup_socket(self) -> None: logger.warning( f"StatusBroadcaster: failed to set IP_MULTICAST_IF: {e2}" ) - # As a last resort, switch to UNICAST try: sock.close() except Exception: pass + # Both multicast-interface attempts failed; fall back to unicast. self._switch_to_unicast() return diff --git a/parol6/server/status_cache.py b/parol6/server/status_cache.py index 0264f0a..bb24b27 100644 --- a/parol6/server/status_cache.py +++ b/parol6/server/status_cache.py @@ -65,7 +65,7 @@ def _unpack_ik_response_into( Returns new version if data was copied, 0 if unchanged. """ - # Version is at offset 36, little-endian uint64 + # Version is a little-endian uint64 at offset 36. version = np.uint64(0) for i in range(8): version |= np.uint64(buf_arr[36 + i]) << np.uint64(i * 8) @@ -130,7 +130,7 @@ class StatusCache: """ def __init__(self) -> None: - # Public snapshots (materialized only when they change) + # Public snapshots, materialized only when they change. self.angles_deg: np.ndarray = np.zeros((6,), dtype=np.float64) self.speeds: np.ndarray = np.zeros((6,), dtype=np.int32) self.speeds_rad_s: np.ndarray = np.zeros((6,), dtype=np.float64) @@ -142,60 +142,52 @@ def __init__(self) -> None: self.tool_status: ToolStatus = ToolStatus() self.last_serial_s: float = 0.0 # last time a fresh serial frame was observed - self._last_tool_name: str = "NONE" # Track tool changes - self._last_tool_variant: str = "" # Track variant changes - self._last_tool_positions: tuple[float, ...] = () # Track tool DOF changes + self._last_tool_name: str = "NONE" + self._last_tool_variant: str = "" + self._last_tool_positions: tuple[float, ...] = () - # Action tracking fields self._action_current: str = "" self._action_params: str = "" self._action_state: ActionState = ActionState.IDLE - # Queue tracking fields self._executing_index: int = -1 self._completed_index: int = -1 self._last_checkpoint: str = "" - # Error tracking field self._error: RobotError | None = None - # Pipeline depth fields self._queued_segments: int = 0 self._queued_duration: float = 0.0 - # Binary cache self._binary_cache: bytes = b"" self._binary_dirty: bool = True - # Change-detection caches to avoid expensive recomputation when inputs unchanged + # Change-detection caches to avoid expensive recomputation when inputs unchanged. self._last_pos_in: np.ndarray = np.zeros((6,), dtype=np.int32) self._last_io_buf: np.ndarray = np.zeros((5,), dtype=np.uint8) - # Pre-allocated buffer for IK request (avoids allocation per position change) + # Pre-allocated buffer for IK request (avoids allocation per position change). self._q_rad_buf: np.ndarray = np.zeros(6, dtype=np.float64) - # Dirty-check: last q_rad submitted to the IK worker + # Dirty-check: last q_rad submitted to the IK worker. self._ik_last_q_rad: np.ndarray = np.full(6, np.nan, dtype=np.float64) - # TCP speed computation state self._prev_tcp_pos: np.ndarray = np.zeros(3, dtype=np.float64) self._tcp_pos_buf: np.ndarray = np.zeros(3, dtype=np.float64) self._tcp_pos_initialized: bool = False self._status_rate_hz: float = _cfg.STATUS_RATE_HZ - # IK enablement results (pre-allocated for zero-alloc reads) + # IK enablement results, pre-allocated for zero-alloc reads. self._joint_en = np.ones(12, dtype=np.uint8) self._cart_en_wrf = np.ones(12, dtype=np.uint8) self._cart_en_trf = np.ones(12, dtype=np.uint8) - # IK worker subprocess state self._ik_stopped = False self._ik_last_version = 0 shm_suffix = f"_{id(self)}" input_name = f"parol6_ik_in{shm_suffix}" output_name = f"parol6_ik_out{shm_suffix}" - # Create shared memory segments self._ik_input_shm = SharedMemory( name=input_name, create=True, size=IK_INPUT_SIZE, **SHM_EXTRA_KWARGS ) @@ -211,15 +203,14 @@ def __init__(self) -> None: assert input_buf is not None assert output_buf is not None - # Initialize with zeros np.frombuffer(input_buf, dtype=np.uint8)[:] = 0 np.frombuffer(output_buf, dtype=np.uint8)[:] = 0 - # Memoryviews for cleanup + # Retained so the buffers can be released before unlinking the segments. self._ik_input_mv = memoryview(input_buf) self._ik_output_mv = memoryview(output_buf) - # Zero-alloc input views: write directly into shared memory + # Zero-alloc input views: write directly into shared memory. self._ik_input_q_view = np.frombuffer( input_buf, dtype=np.float64, @@ -232,20 +223,19 @@ def __init__(self) -> None: count=16, offset=IK_INPUT_T_OFFSET, ) - # Tool transform view for syncing tool changes to IK worker + # Syncs tool changes to the IK worker. self._ik_input_tool_view = np.frombuffer( input_buf, dtype=np.float64, count=16, offset=IK_INPUT_TOOL_OFFSET, ) - # Initialize to identity (no tool) + # Identity = no tool. self._ik_input_tool_view.reshape(4, 4)[:] = np.eye(4) - # Zero-alloc output view for numba reader + # Zero-alloc output view for numba reader. self._ik_output_arr = np.frombuffer(output_buf, dtype=np.uint8) - # Spawn subprocess self._ik_shutdown_event: Event = multiprocessing.Event() self._ik_request_event: Event = multiprocessing.Event() self._ik_process: Process = Process( @@ -268,10 +258,8 @@ def _stop_ik_worker(self) -> None: return self._ik_stopped = True - # Signal shutdown self._ik_shutdown_event.set() - # Wait for process to exit if self._ik_process.is_alive(): self._ik_process.join(timeout=2.0) if self._ik_process.is_alive(): @@ -284,13 +272,12 @@ def _stop_ik_worker(self) -> None: while self._ik_process.exitcode is None and time.time() < deadline: time.sleep(0.01) - # Release numpy views before closing shared memory + # Release numpy views and memoryviews before closing shared memory. del self._ik_input_q_view del self._ik_input_T_view del self._ik_input_tool_view del self._ik_output_arr - # Release memoryviews try: self._ik_input_mv.release() except BufferError: @@ -300,7 +287,6 @@ def _stop_ik_worker(self) -> None: except BufferError: pass - # Clean up shared memory _cleanup_shm(self._ik_input_shm) _cleanup_shm(self._ik_output_shm) logger.debug("IK worker stopped") @@ -351,7 +337,6 @@ def update_from_state(self, state: ControllerState) -> None: - Tool status populated via tool config's populate_status() - IK enablement is computed asynchronously in a subprocess """ - # Do change detection self._last_io_buf[:] = state.InOut_in[:5] pos_changed, io_changed, spd_changed = _update_arrays( state.Position_in, @@ -368,7 +353,6 @@ def update_from_state(self, state: ControllerState) -> None: or state.current_tool_variant != self._last_tool_variant ) - # Convert speeds from steps/s to rad/s when they change if spd_changed: speed_steps_to_rad(self.speeds, self.speeds_rad_s) @@ -378,7 +362,7 @@ def update_from_state(self, state: ControllerState) -> None: self._tcp_pos_initialized = ( False # avoid speed spike from TCP offset change ) - # Sync tool transform to IK worker + # Sync tool transform to IK worker. T_tool = get_tool_transform( state.current_tool, variant_key=state.current_tool_variant ) @@ -402,10 +386,9 @@ def update_from_state(self, state: ControllerState) -> None: self._tcp_pos_initialized = True self._prev_tcp_pos[:] = self._tcp_pos_buf else: - # Robot not moving — reset TCP speed to zero + # Robot not moving — reset TCP speed to zero. self.tcp_speed = 0.0 - # Submit IK request asynchronously try: T_matrix = get_fkine_se3(state) self._submit_ik_request(self._q_rad_buf, T_matrix) @@ -433,7 +416,6 @@ def update_from_state(self, state: ControllerState) -> None: if tool_status_changed: self._last_tool_positions = ts.positions - # Poll for async IK results (non-blocking, zero-alloc) ik_changed = self._poll_ik_results() action_changed = ( @@ -468,7 +450,6 @@ def update_from_state(self, state: ControllerState) -> None: self._queued_segments = state.queued_segments self._queued_duration = state.queued_duration - # Mark binary cache dirty if anything changed if ( pos_changed or tool_changed diff --git a/parol6/server/transport_manager.py b/parol6/server/transport_manager.py index 31877a3..496f74e 100644 --- a/parol6/server/transport_manager.py +++ b/parol6/server/transport_manager.py @@ -60,7 +60,6 @@ def initialize(self) -> bool: except Exception as e: logger.debug("Failed to load persisted COM port: %s", e) - # Create transport if self.serial_port or is_simulation_mode(): self.transport = create_and_connect_transport( port=self.serial_port, @@ -166,21 +165,18 @@ def switch_simulator_mode( # Update env to drive transport_factory.is_simulation_mode() os.environ["PAROL6_FAKE_SERIAL"] = "1" if enable else "0" - # Disconnect existing transport if self.transport: try: self.transport.disconnect() except Exception as e: logger.debug("Transport disconnect: %s", e) - # Recreate transport according to new mode self.transport = create_and_connect_transport( port=self.serial_port, baudrate=self.serial_baudrate, auto_find_port=True, ) - # If enabling simulator, sync to last known controller state if ( enable and sync_state is not None @@ -197,7 +193,6 @@ def switch_simulator_mode( self.first_frame_received = False logger.info("Simulator mode toggled to %s", "on" if enable else "off") - # Wait for first frame with timeout if not self._wait_for_first_frame(timeout=0.5): logger.warning( "Timeout waiting for first frame after SIMULATOR toggle" diff --git a/parol6/server/transports/mock_serial_transport.py b/parol6/server/transports/mock_serial_transport.py index 4434bd2..5f43392 100644 --- a/parol6/server/transports/mock_serial_transport.py +++ b/parol6/server/transports/mock_serial_transport.py @@ -61,11 +61,9 @@ def _simulate_motion_jit( Note: CommandCode enums are used directly inside the function (resolved at compile time). Passing enums as arguments would add ~90µs overhead per call. """ - # Handle homing countdown if homing_countdown > 0: homing_countdown -= 1 if homing_countdown == 0: - # Homing complete homed_in.fill(1) for i in range(6): steps = cfg.deg_to_steps_scalar(home_angles_deg[i], i) @@ -77,7 +75,6 @@ def _simulate_motion_jit( # Ensure E-stop stays released io_in[4] = 1 - # Simulate motion based on command type if command_out == CommandCode.HOME: if homing_countdown == 0: for i in range(6): @@ -189,7 +186,6 @@ def _write_frame_jit( state_position_out[:] = position_out state_speed_out[:] = speed_out - # Simulate gripper state updates if gripper_data_out[4] == 1: # Calibration mode state_gripper_data_in[0] = gripper_data_out[5] state_gripper_data_in[4] = 0x40 @@ -249,7 +245,7 @@ def _simulate_gripper_ramp_jit( gripper_pos_f += pos_delta else: gripper_pos_f -= pos_delta - gripper_data_in[1] = int(gripper_pos_f + 0.5) # round to int + gripper_data_in[1] = int(gripper_pos_f + 0.5) return gripper_pos_f @@ -307,23 +303,19 @@ class MockRobotState: ) # Joint speeds (in steps/sec) speed_in: np.ndarray = field(default_factory=lambda: np.zeros((6,), dtype=np.int32)) - # Homed status per joint homed_in: np.ndarray = field(default_factory=lambda: np.zeros((8,), dtype=np.uint8)) - # I/O states io_in: np.ndarray = field( default_factory=lambda: np.zeros((8,), dtype=np.uint8) ) # E-stop released io_out: np.ndarray = field( default_factory=lambda: np.zeros((8,), dtype=np.uint8) ) # Commanded outputs (echoed from controller) - # Error states temperature_error_in: np.ndarray = field( default_factory=lambda: np.zeros((8,), dtype=np.uint8) ) position_error_in: np.ndarray = field( default_factory=lambda: np.zeros((8,), dtype=np.uint8) ) - # Gripper state gripper_data_in: np.ndarray = field( default_factory=lambda: np.zeros((6,), dtype=np.int32) ) @@ -336,7 +328,6 @@ class MockRobotState: # Generalized tool ramp for binary-activation tools (pneumatic grippers, etc.) tool_ramp_target: float = 0.0 # target normalized position (0..1) tool_ramp_current: float = 0.0 # current ramp progress (0..1) - # Timing data timing_data_in: np.ndarray = field( default_factory=lambda: np.zeros((1,), dtype=np.int32) ) @@ -362,7 +353,6 @@ def __post_init__(self): deg = float(cfg.STANDBY_ANGLES_DEG[i]) steps = int(cfg.deg_to_steps_scalar(deg, i)) self.position_in[i] = steps - # Initialize float accumulator from integer steps self.position_f = self.position_in.astype(np.float64) # Ensure E-stop is not pressed (bit 4 = 1 means released) @@ -394,16 +384,12 @@ def __init__( self.baudrate = baudrate self.timeout = timeout - # Internal robot state self._state = MockRobotState() - # Frame generation tracking self._frame_interval = cfg.INTERVAL_S # match control loop cadence - # Connection state self._connected = False - # Statistics self._frames_received = 0 # Latest-frame infrastructure (simulation publishes into this buffer) @@ -449,7 +435,7 @@ def connect(self, port: str | None = None) -> bool: self.port = port self._connected = True - self._state = MockRobotState() # Reset state on connect + self._state = MockRobotState() self._simulator = None # Force re-resolve on next tick self._simulator_tool_name = "" # Initialize time base to perf_counter for consistent scheduling @@ -457,7 +443,6 @@ def connect(self, port: str | None = None) -> bool: logger.debug(f"MockSerialTransport connected to simulated port: {self.port}") return True - # Allow controller to sync the simulator pose/homing from live controller state def sync_from_controller_state(self, state: ControllerState) -> None: """ Synchronize the mock robot internal state from a controller state snapshot. diff --git a/parol6/server/transports/serial_transport.py b/parol6/server/transports/serial_transport.py index 22ab705..ace4b03 100644 --- a/parol6/server/transports/serial_transport.py +++ b/parol6/server/transports/serial_transport.py @@ -69,7 +69,7 @@ def _parse_frames_njit( if avail < 4: break - # Find start sequence + # Find start sequence (0xFF 0xFF 0xFF) found = False while avail >= 3: if ( @@ -90,7 +90,7 @@ def _parse_frames_njit( if avail < total_needed: break - # Validate end markers + # Validate end markers (0x01 0x02) if length >= 2: e0 = rb[(tail + 4 + length - 2) % cap] e1 = rb[(tail + 4 + length - 1) % cap] @@ -98,7 +98,7 @@ def _parse_frames_njit( tail = (tail + 1) % cap continue - # Extract payload (first 52 bytes) + # Extract payload (fixed 52-byte status frame) payload_len = 52 if length >= 52 else length start = (tail + 4) % cap for i in range(payload_len): @@ -144,10 +144,12 @@ def __init__( self.timeout = timeout self.serial: serial.Serial | None = None self.last_reconnect_attempt = 0.0 - self.reconnect_interval = 1.0 # seconds between reconnect attempts - self._reconnect_failures = 0 # count consecutive failures to reduce log spam + self.reconnect_interval = 1.0 # seconds + self._reconnect_failures = ( + 0 # consecutive failures; throttles reconnect log level + ) - # Reduced-copy latest-frame infrastructure (poll_read will publish here) + # Reduced-copy latest-frame infrastructure; poll_read publishes here self._scratch = np.zeros(4096, dtype=np.uint8) self._scratch_mv = memoryview(self._scratch.data) # Fixed-size ring buffer for RX stream (drop-oldest on overflow) @@ -181,11 +183,10 @@ def connect(self, port: str | None = None) -> bool: Returns: True if connection successful, False otherwise """ - # Reset failure counter on explicit connect call self._reconnect_failures = 0 success = self._connect_internal(port, quiet=False) if not success: - # Mark as failed so auto_reconnect logs at DEBUG level + # so auto_reconnect logs subsequent attempts at DEBUG level self._reconnect_failures = 1 return success @@ -230,7 +231,6 @@ def auto_reconnect(self) -> bool: self.last_reconnect_attempt = now if self.port: - # Log at INFO only on first attempt, DEBUG on subsequent log_level = logging.DEBUG if self._reconnect_failures > 0 else logging.INFO logger.log(log_level, f"Attempting to reconnect to {self.port}...") success = self._connect_internal( @@ -268,11 +268,9 @@ def _connect_internal(self, port: str | None, quiet: bool = False) -> bool: return False try: - # Close existing connection if any if self.serial and self.serial.is_open: self.serial.close() - # Create new connection self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, timeout=self.timeout ) @@ -328,7 +326,6 @@ def write_frame( return False try: - # Write to serial using preallocated buffer and zero-alloc pack ser = self.serial if ser is None: return False @@ -348,7 +345,6 @@ def write_frame( except serial.SerialException as e: logger.error(f"Serial write error: {e}") - # Mark connection as lost self.disconnect() return False except Exception as e: @@ -376,7 +372,6 @@ def poll_read(self) -> bool: if n is None or n <= 0: return False - # Append to ring buffer and parse self._append_to_ring(n) self._parse_ring_for_frames() return True @@ -426,24 +421,15 @@ def get_latest_frame_view(self) -> tuple[memoryview | None, int, float]: return (mv, self._frame_version, self._frame_ts) def _update_hz_tracking(self) -> None: - """ - Update EMA Hz tracking and print debug info periodically. - - This method calculates the instantaneous Hz based on time between messages, - updates the EMA (Exponential Moving Average), tracks min/max values, - and prints debug info every few seconds. - """ + """Tally RX messages and log average Hz every _print_interval seconds.""" current_time = time.perf_counter() - # Increment message counters self._rx_msg_count += 1 self._interval_msg_count += 1 - # Check if it's time to print debug info if self._last_print_time == 0.0: self._last_print_time = current_time elif current_time - self._last_print_time >= self._print_interval: - # Print debug information if self._interval_msg_count > 0: avg_hz = self._interval_msg_count / ( current_time - self._last_print_time @@ -456,6 +442,5 @@ def _update_hz_tracking(self) -> None: f"Serial RX Stats - No messages received in last {self._print_interval:.1f}s" ) - # Reset interval statistics self._last_print_time = current_time self._interval_msg_count = 0 diff --git a/parol6/server/transports/transport_factory.py b/parol6/server/transports/transport_factory.py index 938e1f9..c816efe 100644 --- a/parol6/server/transports/transport_factory.py +++ b/parol6/server/transports/transport_factory.py @@ -1,10 +1,4 @@ -""" -Transport factory for creating appropriate transport instances. - -This module provides a factory pattern for creating transport instances -based on configuration and environment. It automatically selects between -real serial or mock serial (inline simulation) transport types. -""" +"""Factory for selecting between real serial and mock (inline simulation) transports based on configuration and environment.""" import logging import os @@ -18,12 +12,7 @@ def is_simulation_mode() -> bool: - """ - Check if simulation mode is enabled. - - Returns: - True if simulation mode is enabled via environment variable - """ + """Whether simulation mode is enabled via the PAROL6_FAKE_SERIAL environment variable.""" fake_serial = str(os.getenv("PAROL6_FAKE_SERIAL", "0")).lower() return fake_serial in ("1", "true", "yes", "on") @@ -51,15 +40,12 @@ def create_transport( Returns: Transport instance (SerialTransport or MockSerialTransport) """ - # Determine transport type if transport_type is None: - # Auto-detect based on environment if is_simulation_mode(): transport_type = "mock" else: transport_type = "serial" - # Create appropriate transport if transport_type == "mock": logger.debug("Creating MockSerialTransport for simulation") transport: MockSerialTransport | SerialTransport = MockSerialTransport( @@ -110,15 +96,12 @@ def create_and_connect_transport( # For real serial, handle port finding if not port and auto_find_port: - # Try to load saved port port = get_com_port_with_fallback() if port: logger.debug(f"Using saved serial port: {port}") - # Create transport transport = create_transport(transport_type, port=port, baudrate=baudrate, **kwargs) - # Attempt connection if port is known if port: if transport.connect(port): return transport diff --git a/parol6/server/transports/udp_transport.py b/parol6/server/transports/udp_transport.py index 2d0879f..7c2e571 100644 --- a/parol6/server/transports/udp_transport.py +++ b/parol6/server/transports/udp_transport.py @@ -43,7 +43,7 @@ def __init__( self._running = False self._rx = bytearray(self.buffer_size) self._rxv = memoryview(self._rx) - # Pre-allocated buffer for poll_receive_all (avoids list allocation per call) + # Reused by poll_receive_all to avoid a list allocation per call. self._recv_all_buf: list[tuple[bytes, tuple[str, int]]] = [] def create_socket(self) -> bool: @@ -54,10 +54,9 @@ def create_socket(self) -> bool: True if successful, False otherwise """ try: - # Create UDP socket self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Non-blocking mode for polling + # Non-blocking so the control loop can poll instead of blocking. self.socket.setblocking(False) # Allow address/port reuse for fast restarts diff --git a/parol6/tools.py b/parol6/tools.py index 42f0b2f..0574e21 100644 --- a/parol6/tools.py +++ b/parol6/tools.py @@ -117,10 +117,10 @@ class PneumaticGripperConfig(ToolConfig): def populate_status(self, hw: ControllerState, out: ToolStatus) -> None: port_idx = 2 if self.io_port == 1 else 3 - # Simulator writes ramped position into Gripper_data_in[1] (0-255). - # Real hardware has no position feedback — use valve output directly. - # Convention: 0.0 = open, 1.0 = closed. Pneumatic simulator ramps - # toward 255 for open, so we invert. + # Simulator writes the ramped position (0-255) into Gripper_data_in[1]; + # real hardware has no position feedback, so fall back to the valve + # output. Convention: 0.0 = open, 1.0 = closed; the simulator ramps + # toward 255 for open, hence the inversion. pos_byte = hw.Gripper_data_in[1] if pos_byte > 0 or hw.InOut_out[port_idx] == 0: out.positions = (1.0 - float(pos_byte) / 255.0,) @@ -226,7 +226,6 @@ def estimate_duration(self, action: str, params: list) -> float: min_tps, max_tps = self.firmware_speed_range_tps velocity_tps = min_tps + (speed_byte / 255.0) * (max_tps - min_tps) - # Compute tick_range from mechanical params travel_mm = 0.0 for m in self.motions: if isinstance(m, LinearMotion): @@ -275,28 +274,26 @@ def resolve_params(self, cfg: ToolConfig) -> None: if not isinstance(cfg, PneumaticGripperConfig): return - # Find first LinearMotion with estimated speed for m in cfg.motions: if isinstance(m, LinearMotion) and m.estimated_speed_m_s: - # Normalized speed: fraction of full travel per second + # Normalized speed: fraction of full travel per second. self._ramp_speed = m.estimated_speed_m_s / m.travel_m break if self._ramp_speed > 0: - # Map io_port to InOut_out index (port 1 -> index 2, port 2 -> index 3) + # Map io_port to InOut_out index (port 1 -> index 2, port 2 -> index 3). self._io_port = cfg.io_port + 1 def tick(self, state: MockRobotState, dt: float) -> None: if self._io_port < 0: return - # Read commanded I/O output to determine target (0=closed, 1=open) + # Commanded I/O output sets the target: 0 = closed, 1 = open. io_val = float(state.io_out[self._io_port]) target = 1.0 if io_val > 0 else 0.0 if target != state.tool_ramp_target: state.tool_ramp_target = target - # Ramp toward target error = state.tool_ramp_target - state.tool_ramp_current if abs(error) < 1e-6: return @@ -308,10 +305,10 @@ def tick(self, state: MockRobotState, dt: float) -> None: else: state.tool_ramp_current -= step - # Write ramped position as byte into gripper_data_in[1] (same slot electric uses) + # gripper_data_in[1] is the same slot the electric simulator uses. state.gripper_data_in[1] = int(state.tool_ramp_current * 255.0 + 0.5) - # Update part detection input when ramp completes + # Update part-detection input once the ramp reaches the target. if abs(state.tool_ramp_current - state.tool_ramp_target) < 1e-6: det_idx = self._io_port state.io_in[det_idx] = 1 if state.tool_ramp_target < 0.5 else 0 @@ -335,7 +332,6 @@ def __init__(self) -> None: def resolve_params(self, cfg: ToolConfig) -> None: if not isinstance(cfg, ElectricGripperConfig): return - # Find jaw travel from default motion for m in cfg.motions: if isinstance(m, LinearMotion): travel_mm = m.travel_m * 1000.0 @@ -488,8 +484,8 @@ def _make_tcp_transform( _TCP_RPY = (0.0, 0.0, 0.0) # All PAROL6 tool meshes were designed with Rx(π) in the kinematic chain. -# Now that the kinematic transform is pure translation (for correct IK), -# the rotation is applied to the mesh definitions instead. +# The kinematic transform is pure translation (for correct IK), so the +# rotation lives on the mesh definitions instead. _MESH_RPY = (math.pi, 0.0, 0.0) diff --git a/parol6/utils/ik.py b/parol6/utils/ik.py index b856e23..58e102e 100644 --- a/parol6/utils/ik.py +++ b/parol6/utils/ik.py @@ -182,7 +182,6 @@ def solve_ik( result.violations = None if result.success: - # JIT-compiled safety validation ok, is_recovery, idx = _ik_safety_check( result.q, current_q, @@ -216,9 +215,6 @@ def solve_ik( return result -# ----------------------------- -# Fast, vectorized limit checking with edge-triggered logging -# ----------------------------- # Pre-allocated buffers for check_limits (avoid per-call allocation) _cl_viol = np.zeros(6, dtype=np.bool_) _cl_below = np.zeros(6, dtype=np.bool_) @@ -328,7 +324,6 @@ def check_limits( if log: any_viol = not all_ok - # Edge-triggered violation logs if any_viol and ( np.any(_cl_viol != _last_violation_mask) or not _last_any_violation ): diff --git a/parol6/utils/warmup.py b/parol6/utils/warmup.py index b4d3739..6717ff3 100644 --- a/parol6/utils/warmup.py +++ b/parol6/utils/warmup.py @@ -90,8 +90,8 @@ def warmup_jit() -> float: start = time.perf_counter() def _progress(label: str) -> None: - # Only chatter when it's genuinely a slow cold compile, so warm-cache - # starts stay quiet while a cold start visibly shows it isn't frozen. + # Only chatter on a genuinely slow cold compile: warm-cache starts stay + # quiet while a cold start visibly shows it isn't frozen. elapsed = time.perf_counter() - start if elapsed > 1.0: logger.info(" ...JIT warmup: %s ready (%.1fs)", label, elapsed) @@ -156,14 +156,14 @@ def _progress(label: str) -> None: # parol6/server/status_cache.py dummy_5u8 = np.zeros(5, dtype=np.uint8) _update_arrays( - dummy_6i, # pos_in - dummy_5u8, # io_in - dummy_6i, # spd_in - dummy_6i, # pos_last - dummy_6f, # angles_deg - dummy_6f, # q_rad_buf - dummy_5u8, # io_cached - dummy_6i, # spd_cached + dummy_6i, + dummy_5u8, + dummy_6i, + dummy_6i, + dummy_6f, + dummy_6f, + dummy_5u8, + dummy_6i, ) # Dummy SE3 matrices for jit warmups below @@ -191,15 +191,16 @@ def _progress(label: str) -> None: dummy_tx_frame = memoryview(bytearray(64)) dummy_gripper_data = np.zeros(6, dtype=np.int32) dummy_8u8_bitfield = np.zeros(8, dtype=np.uint8) + # bitfield args need 8 elements for _pack_bitfield pack_tx_frame_into( - dummy_tx_frame, # out - dummy_6i, # position_out - dummy_6i, # speed_out - 0, # command_code - dummy_8u8_bitfield, # affected_joint_out (8 elements for _pack_bitfield) - dummy_8u8_bitfield, # inout_out (8 elements for _pack_bitfield) - 0, # timeout_out - dummy_gripper_data, # gripper_data_out + dummy_tx_frame, + dummy_6i, + dummy_6i, + 0, + dummy_8u8_bitfield, + dummy_8u8_bitfield, + 0, + dummy_gripper_data, ) dummy_rx_frame = memoryview(bytearray(64)) dummy_8u8_homed = np.zeros(8, dtype=np.uint8) @@ -209,15 +210,15 @@ def _progress(label: str) -> None: dummy_timing_out = np.zeros(1, dtype=np.int32) dummy_grip_out = np.zeros(6, dtype=np.int32) unpack_rx_frame_into( - dummy_rx_frame, # data - dummy_6i, # pos_out - dummy_6i, # spd_out - dummy_8u8_homed, # homed_out - dummy_8u8_io, # io_out - dummy_8u8_temp, # temp_out - dummy_8u8_poserr, # poserr_out - dummy_timing_out, # timing_out - dummy_grip_out, # grip_out + dummy_rx_frame, + dummy_6i, + dummy_6i, + dummy_8u8_homed, + dummy_8u8_io, + dummy_8u8_temp, + dummy_8u8_poserr, + dummy_timing_out, + dummy_grip_out, ) # parol6/server/transports/serial_transport.py - real-hardware frame I/O. @@ -232,7 +233,7 @@ def _progress(label: str) -> None: # parol6/server/loop_timer.py - stats computation dummy_1000f = np.zeros(1000, dtype=np.float64) dummy_1000f_scratch = np.zeros(1000, dtype=np.float64) - # Fill with some data for realistic warmup + # Fill with realistic timing data so the stats kernels warm a real code path dummy_1000f[:100] = np.linspace(0.004, 0.006, 100) _quickselect_partition(dummy_1000f_scratch[:10].copy(), 0, 9) _quickselect(dummy_1000f_scratch[:100].copy(), 50) @@ -245,54 +246,55 @@ def _progress(label: str) -> None: dummy_8u8 = np.zeros(8, dtype=np.uint8) dummy_gripper_6i = np.zeros(6, dtype=np.int32) _simulate_motion_jit( - dummy_pos_f, # position_f - dummy_6i, # position_in - dummy_6i, # speed_in - dummy_6i, # speed_out - dummy_6i, # position_out - dummy_8u8, # homed_in - dummy_8u8, # io_in - dummy_6f.copy(), # prev_pos_f - dummy_6f.copy(), # vmax_f - dummy_6f.copy(), # jmin_f - dummy_6f.copy(), # jmax_f - dummy_6f.copy(), # home_angles_deg - 0, # command_out - 0.004, # dt - 0, # homing_countdown + dummy_pos_f, + dummy_6i, + dummy_6i, + dummy_6i, + dummy_6i, + dummy_8u8, + dummy_8u8, + dummy_6f.copy(), + dummy_6f.copy(), + dummy_6f.copy(), + dummy_6f.copy(), + dummy_6f.copy(), + 0, + 0.004, + 0, ) dummy_gripper_ramp = np.zeros(3, dtype=np.float64) _write_frame_jit( - dummy_6i, # state_position_out - dummy_6i, # state_speed_out - dummy_gripper_6i, # state_gripper_data_in - dummy_6i, # position_out - dummy_6i, # speed_out - dummy_gripper_6i, # gripper_data_out - dummy_gripper_ramp, # gripper_ramp + dummy_6i, + dummy_6i, + dummy_gripper_6i, + dummy_6i, + dummy_6i, + dummy_gripper_6i, + dummy_gripper_ramp, ) _simulate_gripper_ramp_jit( - dummy_gripper_ramp, # gripper_ramp - dummy_gripper_6i, # gripper_data_in - 0.0, # gripper_pos_f - 0.004, # dt - 10432.0, # tick_range - 40.0, # min_speed - 80000.0, # max_speed + dummy_gripper_ramp, + dummy_gripper_6i, + 0.0, + 0.004, + 10432.0, + 40.0, + 80000.0, ) dummy_payload = memoryview(bytearray(64)) dummy_timing = np.zeros(1, dtype=np.int32) dummy_gripper_in = np.zeros(6, dtype=np.int32) + # io_in needs 8 elements for _pack_bitfield _encode_payload_jit( - dummy_payload, # out - dummy_6i, # position_in - dummy_6i, # speed_in - dummy_8u8, # homed_in - dummy_8u8, # io_in (8 elements for _pack_bitfield) - dummy_8u8, # temp_err_in - dummy_8u8, # pos_err_in - dummy_timing, # timing_in - dummy_gripper_in, # gripper_in + dummy_payload, + dummy_6i, + dummy_6i, + dummy_8u8, + dummy_8u8, + dummy_8u8, + dummy_8u8, + dummy_timing, + dummy_gripper_in, ) _progress("simulator & I/O")