Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 7 additions & 30 deletions parol6/PAROL6_ROBOT.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,18 @@

logger = logging.getLogger(__name__)

# -----------------------------
# Typing aliases
# -----------------------------
Vec6f = NDArray[np.float64]
Vec6i = NDArray[np.int32]
Limits2f = NDArray[np.float64] # shape (6,2)

# -----------------------------
# Kinematics and conversion constants
# -----------------------------
Microstep = 32
steps_per_revolution = 200

# Conversion constants
degree_per_step_constant: float = 360.0 / (Microstep * steps_per_revolution)
radian_per_step_constant: float = (2.0 * np.pi) / (Microstep * steps_per_revolution)
radian_per_sec_2_deg_per_sec_const: float = 360.0 / (2.0 * np.pi)
deg_per_sec_2_radian_per_sec_const: float = (2.0 * np.pi) / 360.0

# -----------------------------
# Joint limits
# -----------------------------
# Limits (deg) you get after homing and moving to extremes
_joint_limits_degree: Limits2f = np.array(
[
Expand All @@ -52,12 +42,12 @@
_joint_limits_radian: Limits2f = np.deg2rad(_joint_limits_degree)


# URDF path for pinokin Robot
# URDF consumed by the pinokin Robot below.
_urdf_path = str(
Path(__file__).resolve().parent / "urdf_model" / "urdf" / "PAROL6.urdf"
)

# Current robot instance (tool transform applied in-place)
# Tool transform is applied in-place on this shared instance.
robot: Robot = Robot(_urdf_path)


Expand All @@ -66,17 +56,10 @@ def apply_tool(
variant_key: str = "",
tcp_offset_m: tuple[float, float, float] | None = None,
) -> None:
"""
Apply tool transform to the robot model.

Parameters
----------
tool_name : str
Name of the tool from the tool registry
variant_key : str
Optional variant key for the tool
tcp_offset_m : tuple, optional
Additional (x, y, z) offset in meters, composed in the tool's local frame.
"""Apply tool transform to the robot model.

``tcp_offset_m`` is an additional (x, y, z) offset in meters, composed in
the tool's local frame.
"""
T_tool = get_tool_transform(tool_name, variant_key=variant_key or None)

Expand All @@ -96,7 +79,6 @@ def apply_tool(
logger.info(f"Applied tool {label} (identity)")


# Initialize with no tool
apply_tool("NONE")


Expand All @@ -106,9 +88,6 @@ def _cleanup_robot() -> None:
del robot


# -----------------------------
# Additional raw parameter arrays
# -----------------------------
# Reduction ratio per joint
_joint_ratio: NDArray[np.float64] = np.array(
[6.4, 20.0, 20.0 * (38.0 / 42.0), 4.0, 4.0, 10.0], dtype=np.float64
Expand All @@ -120,10 +99,9 @@ def _cleanup_robot() -> None:
)
_joint_min_speed: Vec6i = np.array([100, 100, 100, 100, 100, 100], dtype=np.int32)

# Effective max speeds with scaling applied
_joint_max_speed: Vec6i = _joint_max_speed_hw.copy()

# Jog speeds (steps/s) - 80% of scaled max for safety margin during jogging
# 80% of scaled max for safety margin during jogging
_joint_max_jog_speed: Vec6i = (_joint_max_speed * 0.8).astype(np.int32)
_joint_min_jog_speed: Vec6i = np.array([100, 100, 100, 100, 100, 100], dtype=np.int32)

Expand All @@ -135,7 +113,6 @@ def _cleanup_robot() -> None:
# Derived: j_max = a_max * 10 (reach max accel in ~0.1s)
_joint_max_jerk: Vec6i = (_joint_max_acc * 10).astype(np.int32)

# Compute joint angular velocities/accelerations in rad/s
_joint_speed_rad = (
_joint_max_speed.astype(float) * radian_per_step_constant / _joint_ratio
)
Expand Down
8 changes: 3 additions & 5 deletions parol6/ack_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,23 @@ def __init__(

def requires_ack(self, cmd_type: CmdType) -> bool:
"""Check if a command type requires an ACK response."""
# Forced override (e.g., diagnostics)
# Forced override (e.g., diagnostics) takes precedence over everything
if self._force_ack is not None:
return bool(self._force_ack)

# System commands always require ACKs
if cmd_type in SYSTEM_CMD_TYPES:
return True

# Query commands use request/response, not ACKs
# Queries use request/response, not ACKs
if cmd_type in QUERY_CMD_TYPES:
return False

# Streaming commands are fire-and-forget
if cmd_type in FIRE_AND_FORGET:
return False

# Queued motion commands require ACK (returns command index)
# Queued motion commands ACK to return the command index
if cmd_type in QUEUED_CMD_TYPES:
return True

# Default: no ACK
return False
30 changes: 9 additions & 21 deletions parol6/client/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
try:
self.rx_queue.put_nowait((data, addr))
except asyncio.QueueFull:
pass # Drop packet when queue is full (expected under load)
pass # Dropping under backpressure is expected

def error_received(self, exc: Exception) -> None:
pass
Expand All @@ -133,21 +133,18 @@ def _create_multicast_socket(group: str, port: int, iface_ip: str) -> socket.soc
"""Create and configure a multicast socket with loopback-first semantics."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

# Allow multiple listeners on same port
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20)

# Bind to port
try:
sock.bind(("", port))
except OSError:
sock.bind((iface_ip, port))

# Helper to detect primary NIC IP
def _detect_primary_ip() -> str:
tmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
Expand All @@ -159,7 +156,7 @@ def _detect_primary_ip() -> str:
with contextlib.suppress(Exception):
tmp.close()

# Join multicast group with fallbacks
# Join the multicast group, falling back through the primary NIC then INADDR_ANY.
try:
mreq = struct.pack("=4s4s", socket.inet_aton(group), socket.inet_aton(iface_ip))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
Expand Down Expand Up @@ -221,7 +218,7 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
# Zero-allocation decode directly into shared buffer
if decode_status_bin_into(data, self._client._shared_status):
self._client._status_generation += 1
# Event.set() is synchronous and wakes all waiters
# Event.set() is synchronous, so it's safe to wake waiters from this callback
self._client._status_event.set()

def error_received(self, exc: Exception) -> None:
Expand All @@ -246,7 +243,7 @@ def __init__(
timeout: float = 1.0,
retries: int = 1,
) -> None:
# Endpoint configuration (host/port immutable after endpoint creation)
# host/port are immutable after endpoint creation
self._host = host
self._port = port
self.timeout = timeout
Expand All @@ -259,21 +256,19 @@ def __init__(
# Pre-allocated TX buffer for fire-and-forget command encoding
self._tx_buf = bytearray(256)

# Persistent asyncio datagram endpoint
self._transport: asyncio.DatagramTransport | None = None
self._protocol: _UDPClientProtocol | None = None
self._rx_queue: asyncio.Queue[tuple[bytes, tuple[str, int]]] = asyncio.Queue(
maxsize=256
)
self._ep_lock = asyncio.Lock()

# Serialize request/response
# Serializes request/response so concurrent queries don't steal each other's replies
self._req_lock = asyncio.Lock()

# ACK policy (category-based, no stream_mode dependency)
self._ack_policy = AckPolicy()

# Shared status state (single buffer, event-based notification)
# Single shared buffer with event-based notification
self._status_transport: asyncio.DatagramTransport | None = None
self._status_sock: socket.socket | None = None
self._shared_status: StatusBuffer = StatusBuffer()
Expand All @@ -283,18 +278,16 @@ def __init__(
# Last command index returned by server for queued commands
self._last_command_index: int | None = None

# Active tool key (set by select_tool)
self._active_tool_key: str | None = None
self._active_variant_key: str = ""

# Bound tool specs. Populated from the parol6 tool registry so that
# Populated from the parol6 tool registry so that
# `from parol6 import AsyncRobotClient; c = AsyncRobotClient(...)` works
# without going through Robot.create_async_client(). The Robot factory
# rebinds these from its own tools.available, which is the same source.
self._bound_tools: dict[str, ToolSpec] = {}
self._bind_default_tools()

# Lifecycle flag
self._closed: bool = False

def _bind_default_tools(self) -> None:
Expand Down Expand Up @@ -519,7 +512,6 @@ async def stream_status_shared(self) -> AsyncIterator[StatusBuffer]:
yield self._shared_status
continue

# Wait for next update
await self._status_event.wait()

if self._closed:
Expand All @@ -539,20 +531,18 @@ async def _send(self, cmd: msgspec.Struct) -> int:
await self._ensure_endpoint()
assert self._transport is not None

# Get command type from struct's tag
cmd_type = STRUCT_TO_CMDTYPE.get(type(cmd))
if cmd_type is None:
return 0

# System commands: need stable bytes across await
# System commands need stable bytes across the await, so encode a fresh buffer
if cmd_type in SYSTEM_CMD_TYPES:
try:
await self._request_ok_raw(encode_command(cmd), self.timeout)
return 1
except TimeoutError:
return 0

# Motion and other non-query commands
if cmd_type not in QUERY_CMD_TYPES:
if self._ack_policy.requires_ack(cmd_type):
try:
Expand All @@ -561,12 +551,11 @@ async def _send(self, cmd: msgspec.Struct) -> int:
return ok.index if ok.index is not None else 0
except TimeoutError:
return -1
# Fire-and-forget: reuse pre-allocated buffer (sendto copies)
# Fire-and-forget: safe to reuse the shared buffer since sendto copies it
encode_command_into(cmd, self._tx_buf)
self._transport.sendto(self._tx_buf)
return 1

# Queries via _send: fire-and-forget
encode_command_into(cmd, self._tx_buf)
self._transport.sendto(self._tx_buf)
return 1
Expand Down Expand Up @@ -1230,7 +1219,6 @@ async def wait_status(
logger.debug("Status predicate raised", exc_info=True)
continue

# Wait for next update with timeout
remaining = max(0.0, end_time - time.monotonic())
if remaining <= 0:
return False
Expand Down
3 changes: 0 additions & 3 deletions parol6/client/dry_run_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ def _dispatch(self, params: Any) -> DryRunResult | None:
# to the planner which handles them as inline segments.
cmd_cls = self._registry.get_command_for_struct(type(params))
if cmd_cls is not None and issubclass(cmd_cls, (JogJCommand, JogLCommand)):
# Flush blend buffer, sync state, simulate jog
self._planner.flush()
self._state.Position_in[:] = self._planner.state.Position_in
cmd = cmd_cls(params)
Expand Down Expand Up @@ -434,7 +433,6 @@ def _simulate_cartesian_jog(self, cmd: JogLCommand) -> DryRunResult | None:
max(2, int(duration * CONTROL_RATE_HZ)),
)

# Current pose via FK
current_se3 = get_fkine_se3(self._state)
se3_rpy(current_se3, self._rpy_buf)
# pose = [x_m, y_m, z_m, rx_rad, ry_rad, rz_rad]
Expand Down Expand Up @@ -495,7 +493,6 @@ def _simulate_cartesian_jog(self, cmd: JogLCommand) -> DryRunResult | None:

radians[i] = last_valid_q

# Update state to final position
rad_to_steps(last_valid_q, steps_buf)
self._state.Position_in[:] = steps_buf

Expand Down
2 changes: 0 additions & 2 deletions parol6/client/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ def _stop_sync_loop() -> None:
loop = _SYNC_LOOP

async def _shutdown():
# Cancel all pending tasks
tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
# Let cancelled tasks finalize
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
Expand Down
9 changes: 2 additions & 7 deletions parol6/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ class ExecutionStatusCode(Enum):
CANCELLED = auto()


# ----- Small utilities -----


class Countdown:
"""Simple count-down timer."""

Expand Down Expand Up @@ -108,9 +105,8 @@ def __init__(self, p: P) -> None:
self._q_rad_buf: np.ndarray = np.zeros(6, dtype=np.float64)
self._steps_buf: np.ndarray = np.zeros(6, dtype=np.int32)

# Ensure command objects are usable as dict keys (identity-based)
def __hash__(self) -> int:
# Identity-based hash is appropriate for ephemeral command instances
# Identity-based hash: command instances are ephemeral dict keys.
return id(self)

@property
Expand Down Expand Up @@ -247,8 +243,7 @@ def compute(self, state: ControllerState) -> bytes:
...

def execute_step(self, state: ControllerState) -> ExecutionStatusCode:
# Queries are dispatched via compute() by the controller.
# This exists only to satisfy the abstract method.
# Queries are dispatched via compute(); execute_step is never used.
raise NotImplementedError("Queries use compute(), not execute_step()")


Expand Down
Loading
Loading