Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
os:
- macos-15-intel # intel
- macos-latest # arm64
# - windows-latest
- windows-latest
- ubuntu-latest
python-version:
- '3.10'
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ To resolve this, manually link the upstream and pull all tags::
git remote add upstream https://github.com/con/duct
git fetch upstream



Testing
-------
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
requires = [
"setuptools >= 46.4.0",
"versioningit",
"psutil; sys_platform == 'win32'",
]
build-backend = "setuptools.build_meta"

Expand Down Expand Up @@ -43,6 +44,7 @@ classifiers = [
"Topic :: System :: Systems Administration",
"Operating System :: Unix",
"Operating System :: MacOS",
"Operating System :: Microsoft :: Windows",
]

# Version is managed by versioningit (see [tool.versioningit.*])
Expand Down
146 changes: 134 additions & 12 deletions src/con_duct/duct_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

SYSTEM = platform.system()

if SYSTEM == "Windows":
import psutil # Perform globally at time of import to avoid repeated imports later

lgr = logging.getLogger("con-duct")
DEFAULT_LOG_LEVEL = os.environ.get("DUCT_LOG_LEVEL", "INFO").upper()

Expand Down Expand Up @@ -359,7 +362,7 @@ def _get_sample_linux(session_id: int) -> Sample:
return sample


def _try_to_get_sid(pid: int) -> int:
def _try_to_get_sid_mac(pid: int) -> int:
"""
It is possible that the `pid` returned by the top `ps` call no longer exists at time of `getsid` request.
"""
Expand Down Expand Up @@ -405,13 +408,19 @@ def _add_pid_to_sample_from_line_mac(


def _get_sample_mac(session_id: int) -> Optional[Sample]:
"""
The following are Mac-specific implementation details, as different from other OS.

The `ps` command on Mac does not support the `-s` option to filter by session ID.
Therefore, we retrieve all processes and filter them manually by checking their session IDs.
"""
sample = Sample()

lines = _get_ps_lines_mac()
pid_to_matching_sid = {
pid: sid
for line in lines
if (sid := _try_to_get_sid(pid=(pid := int(line.split(maxsplit=1)[0]))))
if (sid := _try_to_get_sid_mac(pid=(pid := int(line.split(maxsplit=1)[0]))))
== session_id
}

Expand All @@ -436,9 +445,99 @@ def _get_sample_mac(session_id: int) -> Optional[Sample]:
return sample


def _format_etime_windows(elapsed_seconds: float) -> str:
    """Render an elapsed duration in the ``[[DD-]HH:]MM:SS`` layout used by ``ps``."""
    # Clamp at zero: a creation time slightly "in the future" (clock
    # adjustment, rounding) must not yield a negative duration.
    total_seconds = max(int(elapsed_seconds), 0)
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)

    formatted = f"{minutes:02d}:{seconds:02d}"
    if days or hours:
        formatted = f"{hours:02d}:{formatted}"
    if days:
        formatted = f"{days}-{formatted}"
    return formatted


def _add_pid_to_sample_from_line_windows(
    process_info: dict[str, str], sample: Sample
) -> None:
    """Add one Windows process record to ``sample``.

    ``process_info`` must provide the keys ``pid``, ``pcpu``, ``pmem``,
    ``rss``, ``vsz``, ``ctime``, ``stat`` and ``cmd``.

    ``ctime`` is the process creation time and may be either a POSIX timestamp
    (int/float seconds since the epoch) or a string beginning with
    ``YYYYmmddHHMMSS.ffffff`` (only the first 21 characters are parsed).
    Elapsed time is derived from it, because no ready-made etime value is
    supplied in ``process_info``.

    NOTE(review): the annotation promises ``str`` values, but the Windows
    sampler in this module passes psutil-native values (numbers); consider
    widening the annotation once confirmed.
    """
    current_time = datetime.now().astimezone()

    raw_ctime = process_info["ctime"]
    if isinstance(raw_ctime, (int, float)):
        # Epoch seconds: slicing/strptime would raise TypeError on a number.
        creation_time = datetime.fromtimestamp(raw_ctime).astimezone()
    else:
        creation_time = datetime.strptime(
            str(raw_ctime)[:21],
            "%Y%m%d%H%M%S.%f",
        ).astimezone()

    # timedelta has no isoformat(); format the duration explicitly instead.
    etime = _format_etime_windows((current_time - creation_time).total_seconds())

    sample.add_pid(
        pid=process_info["pid"],
        stats=ProcessStats(
            pcpu=process_info["pcpu"],
            pmem=process_info["pmem"],
            rss=process_info["rss"],
            vsz=process_info["vsz"],
            timestamp=current_time.isoformat(),
            etime=etime,
            stat=Counter([process_info["stat"]]),
            cmd=process_info["cmd"],
        ),
    )


def _get_sample_windows(ppid: int) -> Optional[Sample]:
    """Collect a sample of the direct children of process ``ppid`` on Windows.

    The following are Windows-specific implementation details, as different from other OS.

    Elapsed time must be calculated on-the-fly from process creation datetime, as there is no etime field.
    Status values are full text (e.g., "running", "sleeping") rather than standardized single-letter codes.

    Parent PID must be used in place of session IDs.
      - There is no analog to session IDs in Windows.
      - "Session #" is a global ID for Terminal Services sessions.
      - Child processes are not adopted if the parent ends first, unlike Unix-like systems.
      - This does, however, currently only support first-order children (not the full tree of descendants).

    Returns ``None`` when no child of ``ppid`` could be sampled.
    """
    sample = Sample()

    # Ask psutil only for the fields that are used.  With `attrs` given,
    # process_iter attaches the values as `proc.info`; fields it cannot read
    # are reported as None instead of raising.
    wanted_attrs = [
        "pid",
        "ppid",
        "cpu_percent",
        "memory_percent",
        "memory_info",
        "create_time",
        "status",
        "cmdline",
    ]

    all_child_info = []
    for proc in psutil.process_iter(attrs=wanted_attrs):
        proc_info = proc.info
        if proc_info["ppid"] != ppid:
            continue

        metrics = (
            proc_info["cpu_percent"],
            proc_info["memory_percent"],
            proc_info["memory_info"],
            proc_info["create_time"],
        )
        if any(value is None for value in metrics):
            lgr.debug(
                "Skipping child process %s of %s: metrics unavailable",
                proc_info["pid"],
                ppid,
            )
            continue

        all_child_info.append(
            {
                # `pid` is required by _add_pid_to_sample_from_line_windows.
                "pid": proc_info["pid"],
                "pcpu": proc_info["cpu_percent"],
                "pmem": proc_info["memory_percent"],
                "rss": proc_info["memory_info"].rss,
                "vsz": proc_info["memory_info"].vms,
                "ctime": proc_info["create_time"],
                "stat": proc_info["status"],
                # cmdline is None when unreadable; fall back to an empty command.
                "cmd": " ".join(proc_info["cmdline"] or []),
            }
        )

    if not all_child_info:
        lgr.debug("No processes found with parent ID %s.", ppid)
        return None

    for process_info in all_child_info:
        _add_pid_to_sample_from_line_windows(process_info=process_info, sample=sample)

    sample.averages = Averages.from_sample(sample=sample)
    return sample


# Dispatch table from the `platform.system()` name to that platform's sampler.
# Note that the Windows sampler is keyed by a parent PID (`ppid`) rather than
# by a session ID like the Linux and Mac samplers.
_get_sample_per_system = {
"Linux": _get_sample_linux,
"Darwin": _get_sample_mac,
"Windows": _get_sample_windows,
}
# Resolved once at import time; a SYSTEM value missing from the table raises KeyError here.
_get_sample: Callable[[int], Optional[Sample]] = _get_sample_per_system[SYSTEM] # type: ignore[assignment]

Expand Down Expand Up @@ -508,11 +607,22 @@ def collect_environment(self) -> None:

def get_system_info(self) -> None:
"""Gathers system information related to CPU, GPU, memory, and environment variables."""
if SYSTEM == "Windows":
import getpass

cpu_total = os.cpu_count()
memory_total = psutil.virtual_memory().total
uid = hash(getpass.getuser())
else:
cpu_total = os.sysconf("SC_NPROCESSORS_CONF")
memory_total = os.sysconf("SC_PAGESIZE") * os.sysconf("SC_PHYS_PAGES")
uid = os.getuid()

self.system_info = SystemInfo(
cpu_total=os.sysconf("SC_NPROCESSORS_CONF"),
memory_total=os.sysconf("SC_PAGESIZE") * os.sysconf("SC_PHYS_PAGES"),
cpu_total=cpu_total,
memory_total=memory_total,
hostname=socket.gethostname(),
uid=os.getuid(),
uid=uid,
user=os.environ.get("USER"),
)
# GPU information
Expand Down Expand Up @@ -552,13 +662,23 @@ def get_system_info(self) -> None:
self.gpus = None

def collect_sample(self) -> Optional[Sample]:
assert self.session_id is not None
try:
sample = _get_sample(self.session_id)
if SYSTEM == "Windows":
# Windows does not have session IDs
# But parent-child relationships are more stable
# So use parent PID instead
assert self.process is not None
sample = _get_sample(ppid=self.process.pid)
return sample
except subprocess.CalledProcessError as exc: # when session_id has no processes
lgr.debug("Error collecting sample: %s", str(exc))
return None
else:
assert self.session_id is not None
try:
sample = _get_sample(self.session_id)
return sample
except (
subprocess.CalledProcessError
) as exc: # when session_id has no processes
lgr.debug("Error collecting sample: %s", str(exc))
return None

def update_from_sample(self, sample: Sample) -> None:
self.full_run_stats = self.full_run_stats.aggregate(sample)
Expand Down Expand Up @@ -1036,10 +1156,12 @@ def execute(
lgr.info("duct %s is executing %r...", __version__, full_command)
lgr.info("Log files will be written to %s", log_paths.prefix)
try:
if mode == SessionMode.NEW_SESSION:
if mode == SessionMode.NEW_SESSION and SYSTEM != "Windows":
report.session_id = os.getsid(
process.pid
) # Get session ID of the new process
elif SYSTEM == "Windows":
report.session_id = process.pid # Use parent PID as session identifier
else: # CURRENT_SESSION mode
report.session_id = os.getsid(
os.getpid()
Expand Down
17 changes: 17 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Generator
import pytest
from con_duct.duct_main import SYSTEM


@pytest.fixture(scope="session", autouse=True)
Expand All @@ -23,6 +24,22 @@ def set_test_config() -> Generator:
del os.environ[k]


@pytest.fixture(scope="session")
def echo_command() -> list[str]:
    """Return the argv prefix used to echo text on the current platform."""
    if SYSTEM == "Windows":
        # On Windows the echo is routed through `cmd /c`.
        return ["cmd", "/c", "echo"]
    return ["echo"]


@pytest.fixture(scope="session")
def sleep_command() -> list[str]:
    """Return the argv prefix used to sleep on the current platform.

    The caller appends the duration (in seconds) as the final argument.
    """
    if SYSTEM == "Windows":
        windows_prefix = ["powershell", "-command", "Start-Sleep", "-Seconds"]
        return windows_prefix
    return ["sleep"]


@pytest.fixture(autouse=True)
def reset_logger_state() -> Generator:
"""Automatically reset logger state after each test.
Expand Down
6 changes: 2 additions & 4 deletions test/duct_main/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
import json
import os
from pathlib import Path
import platform
import subprocess
import time
import pytest
from con_duct.duct_main import SUFFIXES

SYSTEM = platform.system()
TEST_SCRIPT_DIR = Path(__file__).parent.parent / "data"
# Allow overriding the duct executable for testing external builds (e.g., PyInstaller)
_DUCT_EXECUTABLES = [
Expand All @@ -23,8 +21,8 @@ def duct_cmd(request: pytest.FixtureRequest) -> str:
return str(request.param)


def test_sanity(temp_output_dir: str, duct_cmd: str) -> None:
command = f"{duct_cmd} -p {temp_output_dir}log_ sleep 0.1"
def test_sanity(
    temp_output_dir: str, duct_cmd: str, sleep_command: list[str]
) -> None:
    """Smoke test: duct can wrap a short sleep command and exit successfully."""
    # The `sleep_command` fixture yields an argv-style list.  Join it before
    # interpolating; formatting the list directly would put its repr
    # (e.g. "['sleep'] 0.1") on the shell command line.
    sleep_invocation = " ".join(sleep_command)
    # IDEA (from review): we may be better off making a sleep.py fixture file
    # that handles these complexities and calling duct python on that script.
    command = f"{duct_cmd} -p {temp_output_dir}log_ {sleep_invocation} 0.1"
    subprocess.check_output(command, shell=True)


Expand Down
Loading
Loading