Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 304 additions & 21 deletions plugins/alive/scripts/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,30 @@ def _today():
return datetime.now().strftime("%Y-%m-%d")


def _read_json(path, key):
"""Read a JSON file. Create with {key: []} if missing."""
def _read_json(path, key, strict=True):
"""Read a JSON file. Create with {key: []} if missing.

If strict=False, return None on malformed files instead of exiting.
This allows callers like _collect_all_tasks to skip bad files gracefully.
"""
if not os.path.exists(path):
return {key: []}
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict) or key not in data:
print("Error: malformed {}".format(path), file=sys.stderr)
sys.exit(1)
if strict:
print("Error: malformed {}".format(path), file=sys.stderr)
sys.exit(1)
print("Warning: skipping malformed {}".format(path), file=sys.stderr)
return None
return data
except json.JSONDecodeError:
print("Error: malformed JSON in {}".format(path), file=sys.stderr)
sys.exit(1)
if strict:
print("Error: malformed JSON in {}".format(path), file=sys.stderr)
sys.exit(1)
print("Warning: skipping malformed JSON in {}".format(path), file=sys.stderr)
return None


def _atomic_write(path, data):
Expand All @@ -59,6 +69,262 @@ def _next_id(tasks):
return "t{:03d}".format(highest + 1)


# ---------------------------------------------------------------------------
# v2 tasks.md → v3 tasks.json migration
# ---------------------------------------------------------------------------

# Section headers map to priority values.
# Keys are the lower-cased header text (after stripping leading '#'); values
# are the v3 priority bucket assigned to tasks found under that header.
_SECTION_PRIORITY = {
    "urgent": "urgent",
    "active": "active",
    "to do": "todo",
    "todo": "todo",
    "done": "done",
    "done (recent)": "done",
    "completed": "done",
}


def _parse_task_line(line, priority, counter):
"""Parse a single markdown task line into a v3 task dict.

Handles:
- [ ] Task text @session_id (by DATE)
- [x] Task text @session_id (DATE)
"""
# Determine done status from checkbox
done_match = re.match(r"^\s*-\s*\[([ xX])\]\s*(.*)", line)
if not done_match:
return None
is_done = done_match.group(1).lower() == "x"
text = done_match.group(2).strip()
if not text:
return None

# Extract session/assignee (@hexid)
session = None
session_match = re.search(r"@([a-f0-9]{6,12})\b", text)
if session_match:
session = session_match.group(1)
text = text[:session_match.start()].rstrip() + text[session_match.end():]
text = text.strip()

# Extract @urgent tag (not a session id)
if "@urgent" in text:
priority = "urgent"
text = text.replace("@urgent", "").strip()

# Extract due date: (by DATE) or (by EOD DATE)
due = None
due_match = re.search(r"\(by\s+(?:EOD\s+)?(\d{4}-\d{2}-\d{2})\)", text)
if due_match:
due = due_match.group(1)
text = text[:due_match.start()].rstrip() + text[due_match.end():]
text = text.strip()

# Extract completion date: (DATE) at end
completed_date = None
if is_done:
comp_match = re.search(r"\((\d{4}-\d{2}-\d{2})\)\s*$", text)
if comp_match:
completed_date = comp_match.group(1)
text = text[:comp_match.start()].strip()

# Clean up trailing/leading punctuation artifacts
text = text.strip(" ,;-")

task = {
"id": "t{:03d}".format(counter),
"title": text,
"status": "done" if is_done else ("active" if priority == "active" else "todo"),
"priority": priority if not is_done else "todo",
"assignee": None,
"due": due,
"tags": [],
"created": completed_date or _today(),
"session": session or "migrated",
}

if is_done and completed_date:
task["completed"] = completed_date

return task


def _migrate_tasks_md(md_path, json_path):
    """Migrate a v2 tasks.md to v3 tasks.json + completed.json.

    - Parses section headers (## Urgent, ## Active, ## To Do, ## Done)
    - Parses checkbox lines with session IDs, dates, tags
    - Writes tasks.json (open tasks) and completed.json (done tasks)
    - Renames tasks.md → tasks.md.v2-backup
    - Returns the path to the new tasks.json
    """
    with open(md_path, "r", encoding="utf-8") as f:
        content = f.read()

    lines = content.split("\n")
    current_priority = "todo"
    counter = 1
    open_tasks = []
    done_tasks = []

    # Skip YAML frontmatter if present: an opening "---" on the first line
    # up to the next "---". If unterminated, body_start stays 0 and the
    # whole file is parsed (the stray "---" lines are harmless).
    body_start = 0
    if lines and lines[0].strip() == "---":
        for i, line in enumerate(lines[1:], 1):
            if line.strip() == "---":
                body_start = i + 1
                break

    for line in lines[body_start:]:
        stripped = line.strip()

        # Section headers set the priority bucket for subsequent tasks.
        if stripped.startswith("## ") or stripped.startswith("# "):
            header = stripped.lstrip("#").strip().lower()
            if header in _SECTION_PRIORITY:
                current_priority = _SECTION_PRIORITY[header]
            continue

        # Checkbox task line.
        if re.match(r"^\s*-\s*\[", stripped):
            task = _parse_task_line(stripped, current_priority, counter)
            if task:
                counter += 1
                if task["status"] == "done":
                    done_tasks.append(task)
                else:
                    open_tasks.append(task)

    # Write tasks.json with the open tasks.
    _atomic_write(json_path, {"tasks": open_tasks})

    # Write completed.json alongside (in _kernel/ if kernel path, else same dir).
    parent = os.path.dirname(json_path)
    completed_path = os.path.join(parent, "completed.json")
    if not os.path.exists(completed_path):
        _atomic_write(completed_path, {"completed": done_tasks})
    elif done_tasks:
        # Append to an existing completed.json; skip it if malformed.
        existing = _read_json(completed_path, "completed", strict=False)
        if existing is not None:
            existing["completed"].extend(done_tasks)
            _atomic_write(completed_path, existing)

    # Back up the original markdown file.
    backup_path = md_path + ".v2-backup"
    if not os.path.exists(backup_path):
        os.rename(md_path, backup_path)
    else:
        # Backup already exists (edge case), just remove the md.
        os.remove(md_path)

    print(
        "Migrated {} → {} ({} open, {} done)".format(
            os.path.basename(md_path), os.path.basename(json_path),
            len(open_tasks), len(done_tasks)
        ),
        file=sys.stderr,
    )
    return json_path


def _upgrade_v2_json(json_path):
    """Upgrade a v2-format tasks.json in place.

    v2 format: {"tasks": [{"text": "...", "status": "...", "priority": "normal"}]}
    v3 format: {"tasks": [{"id": "t001", "title": "...", "status": "...", "priority": "todo", ...}]}

    Detection: any task with a "text" key and no "id" key is v2. Tasks
    already in v3 form (mixed files) are kept as-is, and newly assigned ids
    skip any id already present so upgrades never collide with them.
    """
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return  # Let _read_json handle the error

    if not isinstance(data, dict) or "tasks" not in data:
        return

    tasks = data["tasks"]
    if not tasks:
        return

    def _is_v2(t):
        # v2 records carry "text" instead of "title" and have no id yet.
        return "text" in t and "id" not in t

    if not any(_is_v2(t) for t in tasks):
        return

    # Map v2 priority values to v3; unknown values pass through unchanged.
    priority_map = {"normal": "todo", "urgent": "urgent", "active": "active"}

    # Collect ids already taken by v3 tasks so new ids never collide.
    used_ids = {t.get("id") for t in tasks if isinstance(t, dict) and t.get("id")}

    upgraded = []
    migrated = 0
    counter = 1
    for t in tasks:
        if _is_v2(t):
            # Find the next free t### id.
            while "t{:03d}".format(counter) in used_ids:
                counter += 1
            new_id = "t{:03d}".format(counter)
            used_ids.add(new_id)
            counter += 1
            v2_priority = t.get("priority", "todo")
            upgraded.append({
                "id": new_id,
                "title": t["text"],
                "status": t.get("status", "todo"),
                "priority": priority_map.get(v2_priority, v2_priority),
                "assignee": None,
                "due": None,
                "tags": t.get("tags", []),
                "created": _today(),
                "session": t.get("session", "migrated"),
            })
            migrated += 1
        else:
            # Already v3 format (mixed file), keep as-is.
            upgraded.append(t)

    data["tasks"] = upgraded
    _atomic_write(json_path, data)

    print(
        "Upgraded {} v2 tasks in {} to v3 format".format(
            migrated, os.path.basename(json_path)
        ),
        file=sys.stderr,
    )


def _ensure_tasks_json(json_path):
    """Ensure tasks.json exists and is v3 format. Migrate if needed.

    Called before any read/write to a tasks.json path.
    Handles four cases:
    1. tasks.json exists in v3 format — no-op
    2. tasks.json exists in v2 format (text, no id) — upgrade in place
    3. No tasks.json, but tasks.md exists — migrate from markdown
    4. Neither exists — will be created by _read_json on first access

    Returns the tasks.json path in every case.
    """
    if os.path.exists(json_path):
        # Cases 1 and 2: _upgrade_v2_json is a no-op on v3-format files.
        _upgrade_v2_json(json_path)
        return json_path

    # Case 3: check for v2 tasks.md at the same location
    parent = os.path.dirname(json_path)
    md_path = os.path.join(parent, "tasks.md")
    if os.path.isfile(md_path):
        return _migrate_tasks_md(md_path, json_path)

    # Case 4: neither exists — will be created by _read_json on first access
    return json_path


def _all_task_files(walnut):
"""Return absolute paths of every tasks.json under walnut, recursively.

Expand All @@ -67,24 +333,36 @@ def _all_task_files(walnut):
"""
results = []
walnut_abs = os.path.abspath(walnut)
skip_dirs = {".git", "node_modules", "__pycache__", "dist", "build", ".next", "target"}
skip_dirs = {
".git", "node_modules", "__pycache__", "dist", "build", ".next", "target",
# Archive and reference directories contain legacy files that may not
# conform to the v3 tasks.json schema. Never scan into them.
"_archive", "_references", "01_Archive", "raw",
}
for root, dirs, files in os.walk(walnut):
# Skip hidden dirs and known non-content dirs
dirs[:] = [d for d in dirs if not d.startswith(".") and d not in skip_dirs]
# Skip hidden dirs, known non-content dirs, and anything with "archive" in name
dirs[:] = [
d for d in dirs
if not d.startswith(".")
and d not in skip_dirs
and "archive" not in d.lower()
]
# Stop at nested walnut boundaries (but not the root walnut itself)
if os.path.abspath(root) != walnut_abs:
kernel_key = os.path.join(root, "_kernel", "key.md")
if os.path.isfile(kernel_key):
dirs[:] = [] # don't descend into nested walnut
continue
if "tasks.json" in files:
results.append(os.path.join(root, "tasks.json"))
if "tasks.md" in files and "tasks.json" not in files:
print(
"Warning: {}/tasks.md found (v2 format). "
"Run system-upgrade to migrate.".format(root),
file=sys.stderr,
)
json_path = os.path.join(root, "tasks.json")
_ensure_tasks_json(json_path) # upgrade v2 format if needed
results.append(json_path)
elif "tasks.md" in files:
# Auto-migrate v2 tasks.md → tasks.json on first touch
md_path = os.path.join(root, "tasks.md")
json_path = os.path.join(root, "tasks.json")
_migrate_tasks_md(md_path, json_path)
results.append(json_path)
return results


Expand All @@ -94,7 +372,9 @@ def _find_task(walnut, task_id):
Returns (file_path, task_dict, data_dict) or exits with error.
"""
for tf in _all_task_files(walnut):
data = _read_json(tf, "tasks")
data = _read_json(tf, "tasks", strict=False)
if data is None:
continue
for task in data["tasks"]:
if task.get("id") == task_id:
return tf, task, data
Expand Down Expand Up @@ -125,16 +405,19 @@ def _resolve_bundle_path(walnut, bundle):
def _tasks_path_for_bundle(walnut, bundle):
    """Return the tasks.json path for *bundle*, or the kernel default.

    The path is run through _ensure_tasks_json first, so any v2-format
    file found there is migrated before the caller touches it.
    """
    if bundle:
        target = os.path.join(_resolve_bundle_path(walnut, bundle), "tasks.json")
    else:
        target = os.path.join(walnut, "_kernel", "tasks.json")
    return _ensure_tasks_json(target)


def _collect_all_tasks(walnut):
    """Return every task from every tasks.json under walnut.

    Malformed task files are skipped (strict=False) rather than aborting
    the whole collection.
    """
    collected = []
    for task_file in _all_task_files(walnut):
        payload = _read_json(task_file, "tasks", strict=False)
        if payload is None:
            continue
        collected.extend(payload["tasks"])
    return collected


Expand Down
Loading