apflow v2 is repositioned from "task orchestration framework" to AI Agent Production Middleware. This is a major rewrite. The v1 codebase is preserved on the `v1-task-orchestration` branch.
- Self-built protocol layers: `api/a2a/`, `api/mcp/`, `api/graphql/`, `api/docs/`, `api/routes/` — replaced by the apcore ecosystem
- CLI: `cli/` — replaced by apcore-cli
- AI framework wrappers: `extensions/crewai/`, `extensions/llm/`, `extensions/generate/` — out of scope for middleware
- Other executors: `extensions/grpc/`, `extensions/tools/` — dropped
- DuckDB storage: replaced by SQLite
- Dependencies removed: duckdb-engine, pytz, fastapi, uvicorn, a2a-sdk, rich, typer, crewai, litellm, anthropic, strawberry-graphql, grpclib, protobuf, beautifulsoup4, trafilatura
- Entry points removed: `apflow-server` command (replaced by `apflow serve`)
- 25+ stale v1 documentation files
- **apcore Module Bridge** (`bridge/`)
  - `ExecutableTaskModuleAdapter`: wraps any executor as an apcore Module
  - `discover_executor_modules()`: auto-discovers executors via the AST scanner
  - 5 task management modules: create, execute, list, get, delete
  - `create_apflow_registry()`: one call to register all apflow capabilities
  - Zero config: `pip install apflow` → MCP + A2A + CLI ready
- **Durable Execution** (`durability/`)
  - `CheckpointManager`: save/load/delete execution state in the `task_checkpoints` table
  - `RetryManager`: fixed/exponential/linear backoff with jitter
  - `CircuitBreaker`: CLOSED/OPEN/HALF_OPEN state machine per executor, thread-safe
  - `CircuitBreakerRegistry`: thread-safe registry of per-executor breakers
  - `ExecutableTask` interface: 3 new optional methods (`supports_checkpoint`, `get_checkpoint`, `resume_from_checkpoint`)
  - Full TaskManager integration: circuit breaker pre-check, checkpoint loading, retry wrapping, post-execution cleanup
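The retry side of this can be pictured with a small sketch. `compute_backoff` below is a hypothetical helper (not the actual `RetryManager` API) showing how fixed/linear/exponential delays with jitter are typically derived:

```python
import random

def compute_backoff(strategy: str, attempt: int, base: float = 1.0,
                    jitter: float = 0.1) -> float:
    """Delay in seconds before retry `attempt` (1-based), plus random jitter."""
    if strategy == "fixed":
        delay = base
    elif strategy == "linear":
        delay = base * attempt
    elif strategy == "exponential":
        delay = base * (2 ** (attempt - 1))
    else:
        raise ValueError(f"unknown backoff strategy: {strategy}")
    # Jitter spreads retries out so failing tasks don't retry in lockstep.
    return delay + random.uniform(0, jitter * delay)
```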
- **Cost Governance** (`governance/`)
  - `BudgetManager`: per-task token budget tracking with check/update
  - `PolicyEngine`: register named policies with block/downgrade/notify/continue actions
  - `ProviderRouter`: model downgrade chains (e.g., opus → sonnet → haiku)
  - `UsageReporter`: aggregated usage reporting with JSON export
  - Full TaskManager integration: pre-execution budget check with policy evaluation, post-execution token usage update
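As an illustration of the check/update cycle above, here is a minimal budget tracker sketch; the class and method names are assumptions, not the real `BudgetManager` interface:

```python
class TokenBudgetExceeded(Exception):
    """Raised when a task would exceed its configured token budget."""

class SimpleBudgetTracker:
    """Per-task token budget: check before execution, update after."""

    def __init__(self) -> None:
        self._used: dict[str, int] = {}
        self._budgets: dict[str, int] = {}

    def set_budget(self, task_id: str, max_tokens: int) -> None:
        self._budgets[task_id] = max_tokens

    def check(self, task_id: str, estimated_tokens: int) -> None:
        # Pre-execution: refuse to run if the estimate would bust the budget.
        budget = self._budgets.get(task_id)
        if budget is not None and self._used.get(task_id, 0) + estimated_tokens > budget:
            raise TokenBudgetExceeded(task_id)

    def update(self, task_id: str, tokens_used: int) -> None:
        # Post-execution: record actual usage.
        self._used[task_id] = self._used.get(task_id, 0) + tokens_used
```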
- **SQLite Storage** (replacing DuckDB)
  - `SQLiteDialect`: WAL mode, 5 PRAGMAs for optimal performance
  - Memory mode (`sqlite:///:memory:`) for testing
  - File mode with WAL for standalone production
  - Zero external dependencies (Python stdlib)
  - All 3 existing migrations ported from `information_schema` to `sqlalchemy.inspect()`
- Database migration 004: durability fields (6) + governance fields (5) + `task_checkpoints` table
- TaskModel extensions: 11 new fields (`checkpoint_at`, `resume_from`, `attempt_count`, `max_attempts`, `backoff_strategy`, `backoff_base_seconds`, `token_usage`, `token_budget`, `estimated_cost_usd`, `actual_cost_usd`, `cost_policy`)
- `TASK_TABLE_NAME` validation: regex guard against SQL injection from the environment variable
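A regex guard of this kind is typically just a strict identifier pattern; the helper below is a hypothetical sketch of the idea, not the actual implementation:

```python
import re

# Conservative identifier pattern: letters, digits, underscores, must not
# start with a digit. Anything else in TASK_TABLE_NAME is rejected before
# the value can reach an SQL statement.
_TABLE_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def validate_table_name(name: str) -> str:
    if not _TABLE_NAME_RE.fullmatch(name):
        raise ValueError(f"invalid table name from environment: {name!r}")
    return name
```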
- **CLI entry points** (`cli.py` + `app.py`)
  - `apflow serve`: Start A2A HTTP server with optional Explorer UI
  - `apflow mcp`: Start MCP server (stdio, streamable-http, or SSE)
  - `apflow info`: Show version, configuration, and registered modules
  - `create_app()`: Factory function that bootstraps the full stack with durability + governance
- **ConfigManager with YAML support** (`config_manager.py`)
  - Load from: defaults → `apflow.yaml` → environment variables
  - Env override: `api.server_url` → `APFLOW_API_SERVER_URL`
  - Type coercion for int/float/bool/list from env strings
  - `get(key, default)` interface for future apcore Config migration
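The dotted-key-to-env-var mapping and type coercion can be sketched as follows (hypothetical helpers, not the actual `ConfigManager` API):

```python
def env_key(dotted: str, prefix: str = "APFLOW") -> str:
    """Map a dotted config key to its environment variable name."""
    return prefix + "_" + dotted.replace(".", "_").upper()

def coerce(raw: str):
    """Best-effort coercion of an env string to bool/int/float/list."""
    lowered = raw.lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            pass
    if "," in raw:
        return [item.strip() for item in raw.split(",")]
    return raw
```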
- Version: 0.18.2 → 0.20.0
- Python: >= 3.10 → >= 3.11 (aligned with apcore)
- Positioning: "Distributed Task Orchestration Framework" → "AI-Perceivable Distributed Orchestration"
- Core dependencies: added apcore >= 0.14.0, apcore-mcp >= 0.11.0, apcore-a2a >= 0.3.0, apcore-cli >= 0.3.0
- TaskCreator: relaxed single-root constraint, allows multi-root task forests
- README.md: completely rewritten for v2
- CLAUDE.md: updated with v2 architecture overview
- Rebrand: aipartnerup → aiperceivable
- **Docker Support**
  - Docker Compose configuration (`docker-compose.yml`) for easy deployment
  - Docker image definition (`Dockerfile`) for containerized apflow
  - `.dockerignore` file for optimized image builds
- **Enhanced GraphQL Testing**
  - New `test_loaders.py` for GraphQL DataLoader testing (task loading by ID and children)
  - Additional test coverage in `test_mutations.py` for the execute_task mutation
  - Expanded `test_queries.py` with task tree and running tasks queries
  - Enhanced `test_schema.py` to verify TaskTreeType and TaskProgressEvent in the schema
  - Added task progress subscription tests in `test_subscriptions.py`
  - Improved tests for distributed configuration in `test_distributed_config.py` and `test_leader_election.py`
  - Enhanced lease management tests with concurrent acquisition and validation
- `pyproject.toml`: Updated `apdev[dev]` dependency to version 0.1.6 for improved compatibility
- Documentation workflow: Updated GitHub Actions versions for consistency
- Shared `make_task_dict()` helper function in `conftest.py` for consistent task dictionary creation in GraphQL tests
- **PostgreSQL Driver Selection**
  - Fixed `normalize_postgresql_url()` to correctly select the async/sync driver based on the `async_mode` parameter
  - Ensures proper database connection handling for both sync and async contexts
- **Configuration Management**
  - Fixed `test_scheduler.py` to properly patch `load_cli_config` in fixtures, preventing local config loading during tests
  - Enhanced runtime tests to handle cases where no task executor is configured
- **Documentation Workflow**
  - Added `workflow_dispatch` trigger for manual documentation deployment
  - Fixed GitHub Actions workflow compatibility for documentation site generation
- **Distributed cluster support** (`apflow.core.distributed`)
  - Leader election with PostgreSQL advisory locks
  - Lease-based task ownership with automatic expiration and renewal
  - Node registry with heartbeat-based health monitoring
  - Task placement strategies: round-robin, least-loaded, random, affinity-based
  - Distributed worker with configurable concurrency and graceful shutdown
  - Distributed runtime orchestrating node lifecycle, leader election, and task distribution
  - Idempotency store for at-least-once delivery with deduplication
  - Event bus for inter-node communication (task assigned/completed/failed, node join/leave, leader change)
  - Database migration `003_add_distributed_support` adding node, lease, and idempotency tables
  - Configuration via `DistributedConfig` with environment variable support (`APFLOW_DISTRIBUTED_*`)
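The placement strategies can be illustrated with a small self-contained sketch (`TaskPlacer` is a hypothetical name; affinity-based placement is omitted):

```python
import itertools
import random

class TaskPlacer:
    """Minimal sketch of round-robin / least-loaded / random task placement."""

    def __init__(self, nodes: list[dict]):
        self.nodes = nodes          # each node: {"id": ..., "load": int}
        self._rr = itertools.count()

    def choose(self, strategy: str) -> dict:
        if not self.nodes:
            raise ValueError("no nodes available")
        if strategy == "round-robin":
            # Cycle deterministically through the node list.
            return self.nodes[next(self._rr) % len(self.nodes)]
        if strategy == "least-loaded":
            return min(self.nodes, key=lambda n: n["load"])
        if strategy == "random":
            return random.choice(self.nodes)
        raise ValueError(f"unknown placement strategy: {strategy}")
```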
- **GraphQL API adapter** (`apflow.api.graphql`)
  - Strawberry GraphQL schema with queries, mutations, and subscriptions
  - Task CRUD operations: create, get, list, cancel, retry
  - Real-time task status subscriptions via WebSocket
  - GraphQL server mountable as a FastAPI sub-application
  - New `[graphql]` optional dependency group (`strawberry-graphql>=0.220.0`)
- **Protocol abstraction layer** (`apflow.api.protocols`)
  - `ProtocolAdapter` base class for multi-protocol support
  - A2A protocol adapter (`apflow.api.a2a.protocol_adapter`)
  - MCP protocol adapter (`apflow.api.mcp.protocol_adapter`)
  - Unified protocol types (`apflow.api.protocol_types`)
  - Protocol discovery and capability reporting in `capabilities.py`
  - Refactored `app.py` to use the protocol registry for dynamic protocol mounting
- **Documentation**
  - GraphQL API guide with schema reference and usage examples
  - Distributed cluster guide covering setup, configuration, and operation
  - Reorganized architecture docs: `exception-handling.md` and `task-manager.md` moved to `docs/architecture/`
  - Consolidated documentation from the `apflow-docs` repo into `apflow` for single-repo deployment
  - Protocol specification (10 files): overview, core concepts, data model, execution lifecycle, examples, interfaces, conformance, errors, validation
  - Rich homepage (`docs/index.md`) with Material grid cards, learning paths, and quick navigation
  - Site assets: logo (`docs/assets/logo.svg`) and favicon (`docs/assets/favicon.ico`)
  - MkDocs configuration (`mkdocs.yml`) with Material theme, mermaid2 diagrams, code highlighting, and an explicit nav structure
  - GitHub Actions workflow (`.github/workflows/docs.yml`) for automatic GitHub Pages deployment on push to `main`
  - New `[docs]` optional dependency group: `mkdocs`, `mkdocs-material`, `mkdocs-mermaid2-plugin`, `mkdocs-minify-plugin`, `pymdown-extensions`
- `pyproject.toml`: Added `graphql` optional dependency group, added `greenlet>=3.0.0` to `postgres` extras, added `apdev[dev]>=0.1.2` to dev dependencies, included `graphql` in `[all]` extras, added `[docs]` optional dependency group
- `docs/README.md`: Simplified to point to the live documentation site with local development instructions
- `.gitignore`: Added `site/` for MkDocs build output
- `api/app.py`: Refactored to the protocol registry pattern for mounting A2A, MCP, and GraphQL adapters
- `api/main.py`: Updated startup to support distributed mode with the `--distributed` flag
- `core/execution/task_executor.py`: Added distributed-aware task execution with lease acquisition
- `core/storage/sqlalchemy/models.py`: Extended with distributed node, lease, and idempotency models
- `core/config/registry.py`: Added distributed configuration support
- Test mocking in `test_multi_phase_crew.py`: Added missing `crewai.Agent` and `crewai.Task` patches to prevent `OPENAI_API_KEY` errors in unit tests
- Integration test `test_scrape_crewai_chain_schema_based`: Added `api_keys_available` fixture to skip when API keys are not set
- `scripts/` directory: Removed legacy utility scripts (`release.sh`, `check_allowed_chars.py`, `check_heavy_imports.py`, `check_import_performance.sh`, `detect_circular_imports.py`, `quick_import_check.py`)
- `docs/development/extending.md`: Removed in favor of reorganized architecture docs
- **CLI monitoring commands** (`apflow monitor`)
  - `apflow monitor status` - Display current task status with rich tables
  - `apflow monitor watch <task-id>` - Real-time task monitoring with live updates
  - `apflow monitor trace <task-id>` - Execution timeline with dependency tree visualization
  - `apflow monitor db` - Database connection status and statistics
  - Supports both API and local database modes
  - Rich terminal UI with tables, trees, panels, and live updates
- **Structured error handling with enhanced context**
  - `ApflowError` base class now supports a structured format with `what`, `why`, `how_to_fix`, and `context` fields
  - Error messages include emoji indicators and formatted troubleshooting guidance
  - Backward compatible with existing error handling code
  - `NetworkError` and `AuthenticationError` classes with detailed context
- **Memory leak testing infrastructure**
  - Performance test suite in `tests/performance/test_memory_leaks.py`
  - Tests for memory stability over 1000+ task executions
  - Executor instance cleanup verification
  - Database file growth monitoring
  - Added `memory-profiler>=0.61.0` and `psutil>=5.9.0` to dev dependencies
- **Comprehensive documentation and examples**
  - Hello World Guide - 5-minute quick start tutorial
  - Executor Selection Guide - decision tree, comparison matrix, and detailed scenarios for all 8 executor types
  - Example code files in `examples/hello_world/`:
    - `simple.py` - basic custom executor example
    - `with_dependencies.py` - task dependencies example
    - `built_in_executors.py` - REST executor examples
  - Example code files in `examples/executor_examples/`:
    - `rest_executor_example.py` - HTTP API integration patterns
    - `ssh_executor_example.py` - remote command execution examples
    - `docker_executor_example.py` - container execution examples
  - Updated documentation indexes to reference the new guides and examples
- **Enhanced error handling in executors**
  - REST executor: Improved exception handling for timeouts, connection errors, SSL errors, and HTTP errors with structured context
  - SSH executor: Enhanced authentication error handling with key permission checks and troubleshooting guidance
- **TaskRepository improvements**
  - Added `query_tasks_by_statuses()` method for efficient multi-status queries using a SQLAlchemy IN clause
  - Optimized status filtering in monitoring commands
- **CLI `tasks scheduled complete` passing unsupported `result` parameter to repository**
  - `complete_scheduled_run()` does not accept a `result` parameter; removed the incorrect kwarg from the CLI direct-path call
- **CLI `tasks count` inefficient in-memory counting**
  - Replaced the `query_tasks()` + `len()` per-status loop (which loaded all records into memory) with the new `TaskRepository.count_tasks_by_status()` using `SELECT COUNT(*)` SQL queries, matching the API's efficient implementation
- **`--error-message` / `-m` option for `apflow tasks cancel`**
  - Allows specifying a custom cancellation error message, consistent with the API `error_message` parameter
  - Defaults to `"Cancelled by user"` / `"Force cancelled by user"` when not provided
- **`--calculate-next-run` / `--no-calculate-next-run` option for `apflow tasks scheduled complete`**
  - Controls whether `next_run_at` is recalculated after completing a scheduled run (default: `True`), consistent with the API `calculate_next_run` parameter
- **`--before` / `-b` option for `apflow tasks scheduled due`**
  - Allows specifying a custom cutoff time (ISO datetime) to query due tasks, consistent with the API `before` parameter
  - Defaults to the current UTC time when not provided
- **`TaskRepository.count_tasks_by_status()` method**
  - Efficient SQL `COUNT(*)` grouped-by-status query with `user_id` and `root_only` filters
- **`apflow tasks history` CLI command**
  - Removed the non-functional stub command; the underlying `TaskRepository.get_task_history()` and `TaskModel.status_log` were never implemented
  - Task history is managed at the A2A Protocol TaskStore layer, not TaskModel
- Enhanced executor security and validation; added tests for JWT secret generation and TaskManager locking
- Updated token claims to use `roles` as a list; added tests for role-claim handling
- **Scheduler webhook trigger admin permission bypass**
  - `handle_webhook_trigger` now checks `_is_admin(request)` and sets `user_id=None` for admin users, matching the pattern used by all other handlers (`handle_tasks_list`, `handle_tasks_count`, etc.)
  - Previously, admin JWT tokens passed to the webhook trigger were treated as regular users, causing "Permission denied" when the scheduler triggered tasks owned by other users
- **Scheduled task re-execution not resetting child tasks**
  - `complete_scheduled_run()` now clears `result` and `progress`, and resets all child tasks to `pending` before the next scheduled run via `reset_task_tree_for_reexecution()`
  - Previously, a scheduled parent task would re-run but its children retained their `completed` status and stale results from the previous cycle, causing incorrect or skipped execution
- **ISO datetime 'Z' suffix parsing inconsistency**
  - Added a `parse_iso_datetime()` helper in `core/utils/helpers.py` that normalises the `Z` suffix before calling `fromisoformat()`, fixing compatibility with Python 3.10 and below
  - Replaced scattered `datetime.fromisoformat(value.replace('Z', '+00:00'))` calls across `schedule_calculator.py`, `ical.py`, `tasks.py` routes, and CLI commands with the centralised helper
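Such a helper is small; this sketch shows the usual normalisation (the real `parse_iso_datetime()` may differ in details):

```python
from datetime import datetime

def parse_iso_datetime(value: str) -> datetime:
    # Python 3.10 and below reject a trailing 'Z' in fromisoformat();
    # normalise it to an explicit UTC offset first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)
```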
- **Scheduler auto-generates admin JWT for API access**
  - `InternalScheduler._get_auth_token()` auto-generates an admin JWT using the local `jwt_secret` from `config.cli.yaml` when `admin_auth_token` is not explicitly configured
  - Token is cached per scheduler session (generated once, reused for all API calls)
  - Falls back gracefully to unauthenticated access if JWT generation fails
  - Eliminates the need for manual token configuration when running `apflow scheduler start` locally
- **Scheduler startup auth identity logging**
  - `_log_auth_identity()` logs the auth source (configured token vs auto-generated) and JWT subject at startup for troubleshooting
- **Webhook verify hook for custom authentication** (`register_webhook_verify_hook`)
  - New `@register_webhook_verify_hook` decorator in the public API (`apflow.register_webhook_verify_hook`) for tenant-level custom webhook verification
  - Hook receives a `WebhookVerifyContext` (task_id, signature, timestamp, client_ip) and returns a `WebhookVerifyResult` (valid, user_id, error)
  - Supports both sync and async hook functions
  - Added `WebhookVerifyContext` and `WebhookVerifyResult` dataclasses, and the `WebhookVerifyHook` type alias, in `core/types.py`
- **`apflow scheduler list` CLI command**
  - Lists scheduled tasks with filters: `--enabled-only`/`--all`, `--user-id`, `--type`, `--status`, `--format`
  - Supports both API and direct DB modes via the standard API gateway helper
  - Example: `apflow scheduler list --status running`, `apflow scheduler list --type daily -f json`
- **`apflow scheduler start --verbose` option**
  - New `--verbose`/`-v` flag enables DEBUG-level logging for the scheduler process
  - Works in both foreground and background modes
- **`tasks.scheduled.export-ical` API endpoint**
  - Exports scheduled tasks in iCalendar format via JSON-RPC
  - Supports `user_id`, `schedule_type`, `enabled_only`, `limit`, `calendar_name`, and `base_url` parameters
  - Non-admin users can only export their own tasks; admin users can export all
- **Status filter for scheduled task queries**
  - The `tasks.scheduled.list` API endpoint and CLI commands (`apflow tasks scheduled list`, `apflow scheduler list`) now accept a `--status`/`status` parameter to filter by task status (e.g. `pending`, `running`, `completed`, `failed`)
  - `TaskRepository.get_scheduled_tasks()` updated with a new `status` parameter
- **`reset_task_tree_for_reexecution()` repository method**
  - Resets all child tasks in a tree to a clean pending state (status, result, error, timestamps, progress) before scheduled re-execution
  - Called automatically by `complete_scheduled_run()` for parent tasks with children
- Comprehensive test coverage for admin webhook permissions (`TestWebhookTriggerAdminPermission`: 3 tests)
- Comprehensive test coverage for the iCal export API endpoint (`TestScheduledTasksExportIcal`: 4 tests)
- Additional scheduler tests for auto-generated auth token and auth identity logging
- **Webhook trigger authentication redesigned — three-layer priority chain**
  - Authentication priority: (1) JWT via middleware → (2) `webhook_verify_hook` for tenant-level custom verification → (3) `APFLOW_WEBHOOK_SECRET` for internal HMAC
  - IP whitelist and rate limit are now applied as an additional protection layer after authentication, instead of being bundled with signature validation
  - Replaces the previous single-`WebhookGateway` approach where all validation was handled in one pass
- **Unified task tree execution model for scheduler**
  - Scheduler (`internal.py`) and webhook gateway (`webhook.py`) now always load the task tree from the DB via `get_task_tree_for_api()`, removing the `has_children`-based auto-detection of single vs tree mode
  - Dependency cascade is handled uniformly by `execute_after_task` regardless of task structure
- **Scheduled tasks table now shows a Status column**
  - `_print_scheduled_tasks_table()` includes task status with color-coded display (yellow=pending, blue=running, green=completed, red=failed)
- **Refined log levels across scheduler and API routes**
  - Scheduler (`internal.py`): Demoted per-poll-cycle and per-task-execution detail logs from INFO to DEBUG (`Found N due tasks`, `Executing scheduled task`, `Loaded task tree`, `Task not completed (status=pending)`, etc.)
  - Scheduler: Task failures without an explicit error (e.g., `status=pending`) are now logged at DEBUG instead of WARNING
  - API routes (`tasks.py`): Demoted per-request method/params/duration logs from INFO to DEBUG, reducing noise under the default INFO level
  - API routes: Demoted CRUD operation detail logs from INFO to DEBUG (`Updated task`, `Creating task tree from N tasks`, `Generated task copy preview`)
  - Removed emoji characters from API route log messages
- Code formatting applied across the codebase via the Black formatter
- **Executor Schema Mechanism — Pydantic BaseModel with ClassVar**
  - All built-in executors now declare input/output schemas as Pydantic `BaseModel` classes assigned via `inputs_schema: ClassVar[type[BaseModel]]` and `outputs_schema: ClassVar[type[BaseModel]]`, replacing inline `get_input_schema()`/`get_output_schema()` method overrides
  - `BaseTask.get_input_schema()` and `get_output_schema()` are now implemented in the base class: they call `model_json_schema()` on the Pydantic model and resolve `$ref` references inline via `resolve_schema_refs()`
  - Subclasses no longer need to override `get_input_schema()`/`get_output_schema()` — just set the `ClassVar`
  - Both Pydantic `BaseModel` and legacy `dict` schemas are still supported (backward compatible)
  - Added a `resolve_schema_refs()` helper in `core/utils/helpers.py` to inline `$defs`/`$ref` from Pydantic-generated JSON Schema
  - Updated all 16 built-in executors: `AggregateResultsExecutor`, `ApiExecutor`, `BatchCrewaiExecutor`, `CrewaiExecutor`, `DockerExecutor`, `SendEmailExecutor`, `GenerateExecutor`, `GrpcExecutor`, `RestExecutor`, `LLMExecutor`, `McpExecutor`, `ScrapeExecutor`, `SshExecutor`, `CommandExecutor`, `SystemInfoExecutor`, `WebSocketExecutor`
  - Updated documentation across 8 files: `extending.md`, `custom-tasks.md`, `quick-reference.md`, `python.md`, `basic_task.md`, `quick-start.md`, `faq.md`, `best-practices.md`
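A `$ref`-inlining helper of this kind can be written in a few lines over the plain JSON Schema dict. This is an illustrative sketch (no cycle handling for self-referential schemas), not the actual `resolve_schema_refs()`:

```python
def resolve_schema_refs(schema: dict) -> dict:
    """Inline local '#/$defs/...' references in a JSON Schema dict."""
    defs = schema.get("$defs", {})

    def _resolve(node):
        if isinstance(node, dict):
            ref = node.get("$ref", "")
            if ref.startswith("#/$defs/"):
                # Replace the pointer with the referenced definition itself.
                return _resolve(defs.get(ref.split("/")[-1], {}))
            return {k: _resolve(v) for k, v in node.items() if k != "$defs"}
        if isinstance(node, list):
            return [_resolve(v) for v in node]
        return node

    return _resolve(schema)
```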
- **API Architecture Refactoring — Three-Layer Design**
  - Introduced a unified capabilities registry (`api/capabilities.py`) as the single source of truth for all 15 task operations, replacing separate inline definitions in the A2A and MCP modules
  - A2A `POST /` now only handles agent-level actions (`tasks.execute`, `tasks.generate`, `tasks.cancel`); CRUD operations return an error directing clients to `POST /tasks`
  - MCP adapter auto-generates all 15 tools from the capabilities registry via `call_by_tool_name` routing
  - Simplified A2A adapter: removed ~300 lines of CRUD bridging code from `agent_executor.py` and `task_routes_adapter.py`
- **Email Executor** (`send_email_executor`)
  - Send emails via the Resend HTTP API or SMTP protocol
  - Provider dispatch: `inputs["provider"]` selects `"resend"` or `"smtp"`
  - Resend provider: POST to `https://api.resend.com/emails` via `httpx.AsyncClient`, returns message ID and status code
  - SMTP provider: uses `aiosmtplib.send()` (optional dependency, guarded import)
  - Supports plain text and HTML emails, cc/bcc recipients, reply-to address
  - Input validation for required fields (provider, to, subject, from_email, body/html)
  - Schema methods (`get_input_schema`, `get_output_schema`) and demo mode support
  - Added `[email]` optional dependency with `aiosmtplib>=3.0.0`
  - Added `email` to the `[all]` installation extra
  - Comprehensive test coverage: 26 tests for validation, Resend, SMTP, schemas, and demo results
- **Method Discovery Endpoint** (`GET /tasks/methods`)
  - Returns all available task methods grouped by category (`agent_action`, `crud`, `query`, `monitoring`) with input schemas, descriptions, and examples
  - Driven by the capabilities registry for automatic consistency
- **Task Scheduling System**
  - Added `ScheduleType` enum supporting six schedule types: `once`, `interval`, `cron`, `daily`, `weekly`, `monthly`
  - Added 9 scheduling fields to `TaskModel`: `schedule_type`, `schedule_expression`, `schedule_enabled`, `schedule_start_at`, `schedule_end_at`, `next_run_at`, `last_run_at`, `max_runs`, `run_count`
  - Created `ScheduleCalculator` module for next-execution-time calculation across all schedule types
  - Database migration `002_add_scheduling_fields.py` for adding scheduling columns and indexes
  - Automatic execution mode detection: tasks with children execute in tree mode, tasks without children execute in single mode
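For the simpler schedule types, next-run calculation is plain datetime arithmetic. A hypothetical sketch (cron/weekly/monthly, handled by croniter and calendar logic in practice, are omitted):

```python
from datetime import datetime, timedelta

def next_run_at(schedule_type: str, expression: str, last_run: datetime) -> datetime:
    """Next execution time for the simpler schedule types.

    interval: expression is seconds, e.g. "300"
    daily:    expression is "HH:MM"
    """
    if schedule_type == "interval":
        return last_run + timedelta(seconds=int(expression))
    if schedule_type == "daily":
        hour, minute = map(int, expression.split(":"))
        candidate = last_run.replace(hour=hour, minute=minute,
                                     second=0, microsecond=0)
        if candidate <= last_run:      # today's slot already passed
            candidate += timedelta(days=1)
        return candidate
    raise NotImplementedError(schedule_type)
```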
- **Internal Scheduler**
  - Built-in asyncio-based scheduler (`InternalScheduler`) for standalone deployment
  - Configurable poll interval, max concurrent tasks, task timeout, and user filtering
  - Scheduler lifecycle management: start, stop, pause, resume
  - Task completion callbacks for monitoring and integration
- **External Scheduler Gateway**
  - `WebhookGateway` for HTTP webhook integration with external schedulers
  - HMAC signature validation, IP filtering, and rate limiting
  - Helper functions for generating cron entries and Kubernetes CronJob manifests
  - Support for cron, Kubernetes CronJob, APScheduler, and Temporal integration
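HMAC validation for webhook calls follows the standard pattern with `hmac.compare_digest`; the payload layout here is an assumption, not the gateway's actual wire format:

```python
import hashlib
import hmac

def sign_payload(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the request body."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    # compare_digest avoids leaking timing information to an attacker.
    return hmac.compare_digest(sign_payload(secret, body), signature)
```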
- **Calendar Integration**
  - `ICalExporter` for exporting scheduled tasks in iCalendar format
  - RRULE support for recurring events (daily, weekly, monthly, interval)
  - Calendar feed URL generation for subscription in Google Calendar, Outlook, etc.
- **Scheduler CLI Commands**
  - `apflow scheduler start` - Start internal scheduler (foreground or background)
  - `apflow scheduler stop` - Stop running scheduler
  - `apflow scheduler status` - Show scheduler status
  - `apflow scheduler trigger` - Manually trigger a task
  - `apflow scheduler export-ical` - Export tasks to iCal format
  - `apflow scheduler webhook-url` - Generate webhook URL for external schedulers
- **Scheduling API Endpoints**
  - `tasks.scheduled.list` - List all scheduled tasks
  - `tasks.scheduled.due` - Get tasks due for execution
  - `tasks.scheduled.init` - Initialize/recalculate `next_run_at`
  - `tasks.scheduled.complete` - Complete a scheduled run and calculate the next execution
  - `tasks.webhook.trigger` - Trigger task execution via webhook (JSON-RPC)
  - `POST /webhook/trigger/{task_id}` - REST endpoint for external schedulers
- **Repository Methods for Scheduling**
  - `get_due_scheduled_tasks()` - Query tasks ready for execution
  - `get_scheduled_tasks()` - List scheduled tasks with filters
  - `mark_scheduled_task_running()` - Mark a task as in progress
  - `complete_scheduled_run()` - Complete a run and update scheduling state
  - `initialize_schedule()` - Calculate the initial `next_run_at`
- Added `scheduling` optional dependency with `croniter>=1.0.0`
- Added `scheduling` to the `standard` and `all` installation extras
- Comprehensive test coverage: 118 tests for scheduling functionality
- **GenerateExecutor Enhancements — Advanced Task Tree Generation System**
  - Dual-mode generation system: `single_shot` (fast) and `multi_phase` (high-quality) modes
  - Multi-phase CrewAI-based task generation with 4-phase specialization:
    - Phase 1: Requirement analysis — understand goals and constraints
    - Phase 2: Structure design — design correct task hierarchies with proper parent-child relationships
    - Phase 3: Input generation — fill task inputs matching executor schemas exactly
    - Phase 4: Review and validation — validate and auto-fix common issues
  - Schema formatter module (`schema_formatter.py`) - provides accurate runtime executor schemas instead of outdated static documentation
  - Principles extractor module (`principles_extractor.py`) - extracts framework principles without code examples to guide LLM generation
  - Enhanced validation system addressing three critical issues:
    - Schema compliance: validates that all task inputs match executor schema definitions (required fields, types, constraints)
    - Root task patterns: enforces correct task hierarchy (multi-executor trees must have an aggregator root)
    - Auto-fix mechanisms: automatically corrects multiple roots, invalid parent IDs, and schema mismatches
  - Comprehensive test coverage: 359 tests for GenerateExecutor features, multi-phase crew, schema formatting, and principles extraction
  - Documentation: detailed development guide (`docs/development/generate-executor-improvements.md`) and enhanced examples
- Integration tests for schema-based task tree execution and generation, covering scrape_executor + llm_executor, scrape_executor + crewai_executor, and generate_executor flows (`tests/integration/test_schema_based_execution.py`)
- Improved demo mode test coverage, including custom demo results, sleep scaling, and fallback logic (`tests/core/test_demo_mode.py`)
- Enhanced TaskBuilder tests for input/output schema handling and multi-level dependencies (`tests/core/test_task_builder.py`)
- More comprehensive failure handling tests for TaskManager, including all error types and re-execution (`tests/core/execution/test_task_failure_handling.py`)
- Tests for hook context modification and persistence in pre_hook and post_hook scenarios (`tests/core/storage/test_hook_modify_task_with_context.py`)
- Integration tests for TaskExecutor with custom and built-in tools (`tests/core/execution/test_task_executor_tools_integration.py`)
- New `ScrapeExecutor` (`scrape_executor`) for web content extraction and data scraping capabilities
- Extension scanner module (`src/apflow/core/extensions/scanner.py`) for AST-based scanning of extension files
- Test coverage for extension scanner functionality (`tests/core/test_extension_scanner.py`)
- New `APFLOW_EXTENSIONS_IDS` environment variable for fine-grained control over extension ID-based access restrictions
- Documentation section on task data fields to clarify the distinction between inputs, params, and result fields
- **CrewAI Dependency Data Injection Improvements**
  - `CrewaiExecutor` now injects dependency outputs directly into CrewAI task templates and input variables, preventing missing-template-variable errors in complex workflows
  - The TaskManager executor instantiation path has been extended to pass structured dependency data into CrewAI-based executors for more reliable multi-phase generation
- **GenerateExecutor Improvements**
  - Refactored task tree generation to use runtime executor schemas instead of static documentation, ensuring schema accuracy
  - Moved from single-shot generation to intelligent prompt building with principle extraction and schema formatting
  - Enhanced post-processing to ensure correct user_id handling and UUID validation
  - Improved error handling and validation with detailed logging for debugging
  - Added a `generation_mode` parameter to executor inputs for selecting between fast (`single_shot`) and high-quality (`multi_phase`) generation
- Refactored WebSocketExecutor to provide full input/output schema, error handling, and cancellation support (`src/apflow/extensions/websocket/websocket_executor.py`)
- Refactored and cleaned up JSON-RPC API tests for clarity, conciseness, and modern style (`tests/api/a2a/test_http_json_rpc.py`)
- Improved dummy executor and related test coverage for TaskBuilder (`tests/core/test_task_builder.py`)
- Enhanced FailingExecutor and related tests for more robust error simulation (`tests/core/execution/test_task_failure_handling.py`)
- Improved hook context usage and persistence in hook modification tests (`tests/core/storage/test_hook_modify_task_with_context.py`)
- Expanded demo mode tests to cover more edge cases and sleep scaling (`tests/core/test_demo_mode.py`)
- Improved schema-based execution tests for dependency mapping and fallback logic (`tests/integration/test_schema_based_execution.py`)
- Enhanced extension management system with AST-based scanning and detailed configuration options
- Improved extension loading control with executor ID-based restrictions for enhanced security
- Enhanced documentation for CLI command extensions, overrides, and library usage patterns
- Improved CrewAI-based generation dependency handling in TaskManager and `CrewaiExecutor`, ensuring dependency data is consistently available for CrewAI tasks and multi-phase generation flows
- Enhanced `GenerateExecutor` and `SchemaFormatter` with executor exclusion support and lazy loading of multi-phase CrewAI components, reducing overhead when advanced generation features are not used
- Fixed LLM-generated task trees not respecting executor schemas - now validates all inputs match schema definitions
- Fixed poor task tree structure issues - ensures correct root task patterns for multi-executor scenarios
- Fixed outdated documentation in prompts - now uses runtime schemas instead of static markdown docs
- Fixed single-shot LLM quality inconsistencies - added multi-phase mode with specialized crews for complex requirements
- Task failure handling and demo mode functionality improvements with enhanced error tracking and recovery mechanisms
- Improved `ExtensionScanner` path handling for nested project layouts and ensured multi-phase CrewAI tests are skipped cleanly when CrewAI is not installed, including corrected import paths for `MultiPhaseGenerationCrew`
- Implemented dependency validation and resolution utilities with corresponding unit tests
- Implemented user ownership validation for task linking and copying
- Added assertions for task_tree_id in TaskCreator tests
- Added project URLs for homepage and source in `pyproject.toml`
- Standardized parameter names in dependency validation functions and enhanced tests for task inclusion validation
- Updated circular dependency detection function signature and tests for consistency
- Refactored dependency validation and resolution logic for clarity and maintainability
- Removed unused imports from test files
- Updated directory structure documentation for clarity and organization
- Simplified 'apflow serve' command usage in documentation
- Improved dependency cycle and edge case handling in task linking/copying
- Task Model Enhancements
  - Added `task_tree_id` field to TaskModel for managing task hierarchy and relationships
  - Introduced `origin_type` field to track task origin (own, link, copy, reference, archive)
  - Implemented `SchemaMigration` model to track migration history for schema evolution
  - Added `has_references` flag validation in task creation tests
- Executor Access Control System
  - Implemented executor access control via the `APFLOW_EXTENSIONS` environment variable
  - Added functions to retrieve allowed executor IDs and filter available executors
  - Created new API endpoint (`/system/executors`) to list available executors with restriction support
  - Introduced CLI commands to list executors with various output formats (table, json, yaml)
  - Added permission checks in TaskManager to enforce executor access control during task execution
  - Comprehensive test coverage for executor permissions and API functionality
- CLI Extension System Enhancements
  - Automatic Root Command Detection: All single functions registered via `@cli_register` are now automatically treated as root commands (e.g., `apflow version`, `apflow server --port 8000`)
    - Removed `root_command` parameter - no longer needed
    - Functions are always root commands, classes are always groups
    - Simplified API: `@cli_register(name="hello")` creates a root command automatically
  - Group Extension Support: Added `group` parameter to the `@cli_register` decorator for extending existing CLI groups
    - Extend custom groups: `@cli_register(group="my-group", name="new-command")`
    - Extend built-in groups: `@cli_register(group="tasks", name="custom-action")`
    - Override subcommands: `@cli_register(group="my-group", name="existing", override=True)`
  - New `get_cli_group()` Function: Convenient API for accessing and extending CLI groups
    - Supports both registered extensions and built-in groups (tasks, config, etc.)
    - Usage: `group = get_cli_group("my-group"); @group.command() def new_cmd(): ...`
    - Clear error messages when the group doesn't exist
  - Improved Override Behavior: Clear distinction between different override scenarios
    - `name="my-group", override=True` - Override entire group
    - `group="my-group", name="cmd", override=True` - Override subcommand in group
    - `name="my-cmd", override=True` - Override root command
  - Comprehensive test coverage (8 tests) for all extension scenarios
- CLI Documentation
  - Added comprehensive CLI documentation with configuration management guide (`cli/configuration.md`)
  - Created practical usage examples (`cli/examples.md`)
  - Developed detailed CLI usage guide covering installation, modes, and best practices (`cli.md`)
  - Introduced CLI documentation index for improved navigation
- TaskCreator Handling of Different Origin Types
  - Added comprehensive tests for `TaskCreator` methods handling different origin types: `from_link`, `from_copy`, `from_archive`, and `from_mixed`
  - Removed the `create_task_copy*` methods
  - Includes test cases for single tasks, recursive trees, field overrides, dependency handling, and edge cases (error conditions, immutability)
- TaskCreator Logic Refactoring
  - Refactored and clarified the logic for the `from_link`, `from_copy`, `from_archive`, and `from_mixed` methods in `TaskCreator`
  - Improved type annotations, docstrings, and error handling for all task creation methods
  - Enhanced dependency and subtree validation logic for recursive copy/link/archive operations
  - Improved handling of parent/child relationships and task tree construction
  - Updated internal helper methods for better maintainability and testability
- Extension Management Refactoring
  - Moved executor-related functions from `apflow.api.extensions` to `apflow.core.extensions.manager` for better code organization
  - Updated import paths across the entire codebase to reflect the new structure
  - Enhanced TaskManager to load executors dynamically with permission checks
  - Improved on-demand extension loading for better performance and security
- Decorator Override Logic
  - All `decorators.py` `override` logic has been changed to `force` logic for consistency and clarity across the codebase
- Task Execution Improvements
  - Enhanced task execution logic with improved error handling
  - Implemented priority grouping for better task scheduling
  - Optimized task tree retrieval using `task_tree_id` with a fallback mechanism
- TaskRepository Refactoring
  - Refactored task update methods in `TaskRepository` for improved maintainability and clarity
  - Use the method `task_update(self, task_id: str, **kwargs)` instead of the `update_task_*` series of functions
  - Refactored `TaskModel` to `TaskModelType` and enhanced task creation methods for better type safety and extensibility
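A minimal sketch of the kwargs-based update pattern that replaces many `update_task_*` helpers. The in-memory store and field whitelist here are assumptions for illustration; the real `TaskRepository` is database-backed.

```python
# Hypothetical sketch: one generic update path instead of one method per field.
class TaskRepository:
    def __init__(self):
        self._tasks: dict = {}

    def task_update(self, task_id: str, **kwargs) -> dict:
        task = self._tasks[task_id]  # raises KeyError for unknown ids
        allowed = {"name", "status", "priority", "inputs", "result"}
        bad = set(kwargs) - allowed
        if bad:
            raise ValueError(f"Unknown task fields: {sorted(bad)}")
        task.update(kwargs)  # single generic path for all whitelisted fields
        return task

repo = TaskRepository()
repo._tasks["t1"] = {"id": "t1", "name": "demo", "status": "pending"}
updated = repo.task_update("t1", status="completed", priority=5)
```

The whitelist keeps the flexibility of `**kwargs` while still rejecting typos that a per-field method would have caught at the signature level.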
- Database Schema Management
  - Simplified TaskRepository initialization by removing custom column checks
  - Added migration tests to ensure proper schema evolution and idempotent execution
  - Improved database schema handling for more reliable operations
- Fixed edge cases in recursive copy/link/archive logic where tasks with external dependencies or disconnected subtrees could cause errors.
- Improved error messages for invalid task trees and dependency cycles.
- Added documentation for the fluent API (TaskBuilder) covering streamlined task creation and execution
- Removed tasks command aliases from CLI documentation and implementation
- Distributed core enablement for multi-node orchestration (experimental)
- CLI: implement `task history` command and add short aliases for all major tasks subcommands: `list` → `ls`, `status` → `st`, `cancel` → `c`, `watch` → `w`, `history` → `hi`
  - All aliases are documented and tested
- gRPC executor: support multiple backends and improved channel management
- Updated gRPC dependencies and test references from grpcio to grpclib
- Standard Installation Profile - New `[standard]` extra providing the recommended feature set
  - Combines A2A server, CLI tools, CrewAI, and LLM support in a single installation command
  - Installation: `pip install apflow[standard]` or `pip install -e ".[standard,dev]"` for development
  - Simplifies setup for the most common use cases without requiring manual feature selection
- Test Infrastructure Improvements
  - Fixed event loop conflicts in the pytest-asyncio test environment via ThreadPoolExecutor-based async isolation
  - New test fixtures: `disable_api_for_tests` for API gateway testing and a `run_async` helper for sync test contexts
  - All 59 CLI tests now passing without hangs (fixed exit code 130 issues)
  - A2A tests excluded by default (added to the pytest.ini ignore list since a2a is an optional dependency)
  - Full test suite stable: 861 tests passing
- Security Enhancement: Token Masking
  - Improved token masking in CLI config display from an 8-character preview to a 3-character preview
  - Prevents sensitive words from leaking in the masked token display
  - Example: "very-secret-token-12345" is now masked as "ver..." instead of "very-sec..."
- Documentation Updates
  - Updated README.md to highlight `[standard]` as the recommended installation option
  - Updated docs/development/setup.md to recommend `[standard,dev]` for development environments
  - Enhanced installation documentation to explain standard profile benefits
  - Improved test documentation with a note about A2A tests being optional
  - Added detailed `[standard]` configuration documentation in setup.md
- API Client Enhancement
  - `APIClient.list_tasks()` now supports an `offset` parameter for pagination beyond the initial limit
- Event Loop Conflict Resolution
  - Fixed hangs in async CLI tests by implementing ThreadPoolExecutor-based event loop isolation in conftest.py
  - Root cause: `run_async_safe()` was attempting nested event loop execution in a pytest-asyncio context
  - Solution: Patched `run_async_safe()` to use a separate thread with an independent event loop
  - Resolves an issue where `@pytest.mark.asyncio` + `runner.invoke()` caused tests to hang with exit code 130
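The isolation technique can be sketched as follows. The real `run_async_safe()` lives in apflow's conftest.py; this standalone version only illustrates the approach of running the coroutine on a fresh loop in a worker thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_async_safe(coro):
    """Run a coroutine on a fresh event loop in a worker thread.

    Avoids "cannot be called from a running event loop" errors when sync
    code (e.g. a CLI test runner) is invoked from inside a pytest-asyncio
    test that already owns the current thread's loop.
    """
    def _run_in_fresh_loop():
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()

    # The worker thread has no running loop, so creating one is always safe.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run_in_fresh_loop).result()

async def fetch_status():
    await asyncio.sleep(0)
    return "completed"

result = run_async_safe(fetch_status())  # works even under a running loop
```

Because the new loop lives entirely in the worker thread, the caller's loop (if any) is never re-entered, which is the nesting that caused the hangs.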
- Added tests for TaskBuilder handling multiple dependencies and multi-level dependency chains to prevent regressions when wiring dependent tasks.
- ConfigManager Integration Validation
  - New integration test verifies `.env` loading and dynamic hook registration work end-to-end via `distribute_task_tree`
- CLI → API Gateway Architecture (Priority 2)
  - New `APIClient` class (src/apflow/cli/api_client.py) for the CLI to communicate with the API server via HTTP
  - ConfigManager extended with API configuration: `api_server_url`, `api_auth_token`, `use_local_db`, `api_timeout`, `api_retry_attempts`, `api_retry_backoff`
  - APIClient supports exponential backoff retry (configurable attempts and initial backoff)
  - APIClient supports auth token injection via Bearer header
  - Comprehensive error handling for connection failures, timeouts, and API errors with custom exception types
  - Added `httpx>=0.27.0` to CLI optional dependencies for HTTP communication
  - Integration tests for APIClient initialization, context manager, and ConfigManager API methods (8 tests)
  - Enables a single source of truth (the API) for task data when both CLI and API are running
  - Solves the DuckDB concurrent write limitation (all writes go through the API)
  - Foundation for future protocol adapters (GraphQL, MQTT, WebSocket)
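The retry behavior can be illustrated with a generic backoff helper. APIClient's actual signature and exception types are not shown here; `flaky_request` is a stand-in for an HTTP call, and the parameter names are assumptions modeled on the config keys above.

```python
import time

def retry_with_backoff(fn, attempts=3, initial_backoff=0.1, backoff_factor=2.0):
    """Call fn(), retrying transient failures with exponential backoff."""
    delay = initial_backoff
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except ConnectionError:
            if attempt == attempts:
                raise  # out of attempts: surface the last failure
            time.sleep(delay)
            delay *= backoff_factor  # 0.1s, 0.2s, 0.4s, ...

calls = {"n": 0}
def flaky_request():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("server unavailable")
    return {"status": "ok"}

result = retry_with_backoff(flaky_request, attempts=4, initial_backoff=0.01)
```

Capping total attempts and growing the delay geometrically keeps a briefly unavailable API from turning every CLI command into a tight retry loop.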
- CLI Command Layer Refactoring for API Gateway Integration
  - Created `api_gateway_helper.py` with helper functions for transparent API usage and fallback to the local database
    - `should_use_api()`: Check if the API is configured
    - `get_api_client_if_configured()`: Async context manager yielding an APIClient when configured, None otherwise
    - `api_with_fallback_decorator()`: Try the API first, fall back to the local database if unavailable
    - `log_api_usage()`: Log whether a command is using the API or the local database
  - Refactored CLI task commands to use the API gateway when configured:
    - `tasks list`: Supports API via `client.list_tasks()` with status and user filters
    - `tasks get`: Supports API via `client.get_task()`
    - `tasks status`: Supports API via `client.get_task_status()`
    - `tasks cancel`: Supports API via `client.cancel_task()`
  - All commands automatically fall back to the local database if the API is not configured
  - Graceful error handling with warning logs when the API is unavailable but fallback is enabled
  - Added property accessors to ConfigManager for convenience: `api_server_url`, `api_auth_token`, `use_local_db`, `api_timeout`, `api_retry_attempts`, `api_retry_backoff`
  - Integration tests for the CLI API gateway with fallback scenarios (13 tests)
  - All existing CLI tests pass without modification (59 tests, backward compatible)
- CLI Configuration Management System with Multi-Location & Separated Secrets
  - New `apflow config` command with 13 subcommands for comprehensive configuration management
    - Basic config: `set`, `get`, `unset`, `list`, `reset` - full CRUD operations with alias support
    - Token management: `gen-token`, `verify-token` - generate and verify JWT tokens with role-based claims
    - Quick setup: `init`, `init-server` - interactive wizard and one-command server setup
    - Utilities: `path`, `show-path`, `edit`, `validate` - config file management and validation
  - Multi-location configuration support:
    - Priority 1 (highest): `APFLOW_CONFIG_DIR` environment variable
    - Priority 2: Project-local `.data/` directory (if in a project with pyproject.toml/.git)
    - Priority 3: User-global `~/.aiperceivable/apflow/` (default fallback)
  - Separated secrets architecture:
    - `config.cli.yaml` (600 permissions) - unified configuration (all settings: api_server_url, timeouts, tokens, jwt_secret, etc.)
    - Single file with multi-location priority system (project-local and user-global)
    - The API server can also read from the same multi-location config structure
  - `gen-token --save` parameter to save tokens to config.cli.yaml
  - Token security: automatic masking in all outputs, expiry validation, signature verification
  - Alias support for convenience: `api-server`/`api-url` → `api_server_url`, `api-token`/`token` → `admin_auth_token`
  - `apflow config validate` checks YAML syntax, API server connectivity, and token validity
  - `apflow config verify-token` displays token details (subject, issuer, expiry, role) with status indicators
  - `apflow config init` provides an interactive wizard for first-time setup
  - Added `PyJWT>=2.8.0` to CLI dependencies for JWT token support
  - 19 new tests for config persistence and CLI commands (all passing)
- Refactored import paths from aiperceivableflow to apflow across test files
  - Updated import statements in various test files to reflect the new module structure under apflow
  - Ensured all references to aiperceivableflow are replaced with apflow in test modules related to extensions, including but not limited to:
    - crewai
    - docker
    - generate
    - grpc
    - http
    - llm
    - mcp
    - ssh
    - stdio
    - websocket
  - Adjusted integration tests to align with the new import paths
- Hook Execution Context for Database Access
  - New `get_hook_repository()` function allows hooks to access the database within the task execution context
  - New `get_hook_session()` function provides direct access to the database session in hooks
  - Hooks now share the same database session/transaction as TaskManager (no need for separate sessions)
  - Auto-persistence for `task.inputs` modifications in pre-hooks (detected and saved automatically)
  - Explicit repository methods available for other field modifications (name, priority, status, etc.)
  - Thread-safe context isolation using Python's `ContextVar` (similar to Flask/Celery patterns)
  - Added `set_hook_context()` and `clear_hook_context()` internal functions for context management
  - Exported to public API: `apflow.get_hook_repository` and `apflow.get_hook_session`
  - Added comprehensive test coverage (16 tests):
    - Hook context basic operations and lifecycle
    - Multiple hooks sharing the same session instance
    - Hooks sharing transaction context and seeing uncommitted changes
    - Hooks cooperating via shared session
    - Auto-persistence of task.inputs modifications
    - Explicit field updates via repository methods
- Database Session Safety Enhancements
  - Added `flag_modified()` calls for all JSON fields (result, inputs, dependencies, params, schemas) to ensure SQLAlchemy detects in-place modifications
  - Added `db.refresh()` after critical status updates to ensure fresh data from the database
  - Added concurrent execution protection: the same task tree cannot run multiple times simultaneously
  - Returns `{"status": "already_running"}` when attempting concurrent execution of the same task tree
  - Added 12 new tests for concurrent protection and JSON field persistence
- CLI Extension Decorator (`cli_register`)
  - New `@cli_register()` decorator for registering CLI extensions, similar to `@executor_register()`
  - Decorator supports `name`, `help`, and `override` parameters
  - Auto-derives command name from the class name (converts `_` to `-`)
  - New `get_cli_registry()` function to access registered CLI extensions
  - CLI extensions are loaded from the decorator registry before entry_points discovery
  - Added comprehensive test coverage (18 tests) for CLI extension decorators
- Exception Handling Architecture
  - New exception hierarchy based on FastAPI/production framework best practices
    - `ApflowError` base exception for all framework-specific errors
    - `BusinessError` for expected user/configuration errors (logged without stack traces)
      - `ValidationError` for input validation failures
      - `ConfigurationError` for missing configuration/dependencies
    - `SystemError` for unexpected system-level errors (logged with stack traces)
      - `ExecutorError` for executor runtime failures
      - `StorageError` for database/storage failures
  - Created `core/execution/errors.py` with comprehensive exception documentation
  - Created `docs/development/exception.md` with implementation guidelines
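The hierarchy can be sketched directly from the bullet list. Docstrings summarize the stated logging policy; the `classify` helper is illustrative and not part of apflow, and note that the changelog's `SystemError` name shadows Python's builtin within the defining module.

```python
class ApflowError(Exception):
    """Base for all framework-specific errors."""

class BusinessError(ApflowError):
    """Expected user/configuration errors - logged without stack traces."""

class ValidationError(BusinessError):
    """Input validation failures."""

class ConfigurationError(BusinessError):
    """Missing configuration or dependencies."""

class SystemError(ApflowError):  # shadows the builtin SystemError here
    """Unexpected system-level errors - logged with stack traces."""

class ExecutorError(SystemError):
    """Executor runtime failures."""

class StorageError(SystemError):
    """Database/storage failures."""

def classify(exc: ApflowError) -> str:
    # Hypothetical helper showing the logging split the changelog describes.
    return "no-stack-trace" if isinstance(exc, BusinessError) else "full-stack-trace"

policy = classify(ValidationError("bad input"))
```

Splitting expected failures (`BusinessError`) from unexpected ones lets a single `except ApflowError` handler decide log verbosity with one `isinstance` check.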
- Executor Error Handling Refactoring
  - All executors now raise exceptions instead of returning error dictionaries
  - Technical exceptions (TimeoutError, ConnectionError, etc.) now propagate naturally to TaskManager
  - Executors validate inputs and raise `ValidationError` or `ConfigurationError` for expected failures
  - Updated executors: `DockerExecutor`, `GrpcExecutor`, `RestExecutor`, `SshExecutor`, `CommandExecutor`, `LLMExecutor`
  - TaskManager now catches all exceptions, marks tasks as failed, and logs appropriately based on exception type
  - `BusinessError` logged without stack trace (clean logs), other exceptions logged with full stack trace
- CrewAI Executor/Batch Executor Renaming and Test Fixes
  - `crewai/crew_manager.py` → `crewai/crewai_executor.py`, with the class name updated to `CrewaiExecutor`
  - `crewai/batch_manager.py` → `crewai/batch_crewai_executor.py`, with the class name updated to `BatchCrewaiExecutor`
  - All related test cases (test_crewai_executor.py, test_batch_crewai_executor.py, etc.) have been batch-updated with corrected patch paths, mocks, imports, and class names to align with the new naming
  - Resolved `AttributeError: module 'apflow.extensions.crewai' has no attribute 'crew_manager'` and similar test failures caused by the renaming
- LLM Executor Integration
  - Added `LLMExecutor` (`llm_executor`) for direct LLM interaction via LiteLLM
  - Supports a unified `model` parameter for 100+ providers (OpenAI, Anthropic, Gemini, etc.)
  - Support for `stream=True` in inputs or context metadata for Server-Sent Events (SSE)
  - Automatic API key handling via `LLMKeyConfigManager` or environment variables
  - Auto-registration via the extensions mechanism
  - Added `[llm]` optional dependency including `litellm`
- CLI: Plugin Mechanism for Extensions
  - Added `CLIExtension` class to facilitate creating CLI subcommands in external projects
  - Implemented dynamic subcommand discovery using Python `entry_points` (`apflow.cli_plugins`)
  - Allows projects like `apflow-demo` to register commands (e.g., `apflow users stat`) without modifying the core library
  - Supports both full `typer.Typer` apps and single-command callables as plugins
- CLI: Improved Task Count Output
  - Changed the default output format of `apflow tasks count` from `json` to `table` for better terminal readability
- CLI: Simplified `apflow tasks` commands
  - `apflow tasks count` now defaults to providing comprehensive database statistics grouped by status
  - Removed redundant `--all` and `--status` flags from the `count` command (database statistics are now the default)
  - Renamed the `apflow tasks all` command to `apflow tasks list` for better alignment with API naming conventions
  - Removed the legacy `apflow tasks list` command (which only showed running tasks)
  - The new `apflow tasks list` command now lists all tasks from the database with support for filtering and pagination
- Tests: Infrastructure and LLM Integration
  - Updated `tests/conftest.py` to automatically load `.env` file environment variables at the start of the test session
  - Added auto-registration for `LLMExecutor` in the test `conftest.py` fixture
  - Fixed `LLMExecutor` integration tests to correctly use real API keys from `.env` when available
- CLI: Event-loop handling for async database operations
  - Ensured async database sessions and repositories are created and closed inside the same event loop to avoid "no running event loop" and "Event loop is closed" errors
  - Updated `apflow tasks` commands to run async work in a safe context
  - Added `nest_asyncio` support for nested event loops in test environments
- Logging: Clean CLI output by default
  - Default log level for the library is now `ERROR` to keep CLI output clean
  - Support `LOG_LEVEL` and `DEBUG` environment variables to override logging when needed
  - Debug logs can be enabled with `LOG_LEVEL=DEBUG apflow ...`
- Extensions registration noise reduced
  - Demoted expected registration instantiation messages to `DEBUG` (no longer printed by default)
  - This prevents benign initialization messages from appearing during normal CLI runs
- Miscellaneous
  - Added `nest_asyncio` to CLI optional dependencies to improve compatibility in nested-loop contexts
- Documentation Corrections for the `schemas.method` Field
  - Clarified that `schemas.method` is a required field when `schemas` is provided
  - Updated documentation to explicitly state that `schemas.method` must match an executor ID from the extensions registry
  - Fixed all documentation examples to use real executor IDs instead of placeholder values
  - Updated examples across all documentation files:
    - `docs/api/http.md`: Replaced generic `"executor_id"` with concrete IDs like `"system_info_executor"`, `"rest_executor"`, `"command_executor"`
    - `docs/getting-started/quick-start.md`: Updated all task examples to use valid executor IDs
    - `docs/guides/cli.md`: Fixed CLI command examples with correct executor IDs
    - `docs/development/design/cli-design.md`: Updated design documentation examples
    - `docs/development/setup.md`: Fixed setup guide examples
  - Fixed the `generate_executor.py` LLM prompt to correctly instruct the LLM to use `schemas.method` (not `name`) as the executor ID
  - Updated task structure examples in the LLM prompt to reflect correct usage
- API Endpoint Test Coverage
  - Added missing test cases for API endpoints:
    - `test_jsonrpc_tasks_list`: Tests the `tasks.list` endpoint with pagination
    - `test_jsonrpc_tasks_running_status`: Tests the `tasks.running.status` endpoint with array format
    - `test_jsonrpc_tasks_running_count`: Tests the `tasks.running.count` endpoint
    - `test_jsonrpc_tasks_cancel`: Tests the `tasks.cancel` endpoint with array format
    - `test_jsonrpc_tasks_generate`: Tests the `tasks.generate` endpoint for task tree generation
  - Fixed test parameter format issues:
    - `tasks.running.status` and `tasks.cancel` now correctly use a `task_ids` array parameter instead of a single `task_id`
    - Tests now expect array responses instead of single object responses
  - All API endpoints now have comprehensive test coverage
- CLI Command Test Coverage
  - Added `test_tasks_watch` test cases for the `tasks watch` CLI command
    - Uses mocks to avoid interactive `Live` display component issues in automated tests
    - Tests parameter validation and basic functionality
    - Properly handles error messages in stderr
- API Documentation Completeness
  - Added missing response example for the `tasks.running.status` endpoint
    - Includes the complete response format with all fields (task_id, context_id, status, progress, error, is_running, timestamps)
    - Documents error cases (not_found, permission_denied)
    - Clarifies that the method returns array format even for single task queries
- Comprehensive Documentation Review
  - Verified all documentation examples use valid executor IDs
  - Ensured all examples are functional and can be parsed correctly
  - Validated that all CLI commands have corresponding test cases
  - Confirmed API endpoint documentation matches the actual implementation
- DuckDB Custom Path Directory Creation
  - Fixed an issue where DuckDB would fail when using custom directory paths that don't exist
  - Added `_ensure_database_directory_exists()` function to automatically create parent directories before creating DuckDB connections
  - Directory creation is now handled automatically in `create_session()`, `SessionPoolManager.initialize()`, and `PooledSessionContext.__init__()`
  - Skips directory creation for in-memory databases (`:memory:`) and handles errors gracefully with appropriate logging
  - Users can now specify custom DuckDB file paths without manually creating directories first
- Missing Return Type Annotations
  - Added missing return type annotation `-> None` to the `check_input_schema()` function in `core/utils/helpers.py`
  - Added missing return type annotation `-> ParseResult` to the `validate_url()` function in `core/utils/helpers.py`
  - Fixed type checker errors and ensured 100% type annotation compliance as required by code quality rules
- Module-Level Resource Creation
  - Refactored `core/storage/factory.py` to eliminate module-level global variables for database sessions
  - Replaced the `_default_session` and `_session_pool_manager` module-level globals with a `SessionRegistry` class
  - Session state is now encapsulated in the `SessionRegistry` class following dependency injection principles
  - All session management functions (`get_default_session()`, `set_default_session()`, `reset_default_session()`, `get_session_pool_manager()`, `reset_session_pool_manager()`) now use `SessionRegistry` class methods
  - Maintains full backward compatibility - all public APIs remain unchanged
  - Follows code quality rules requiring dependency injection instead of module-level resource creation
- Task Context Sharing and LLM Key Management
  - Task Context Sharing: TaskManager now passes the entire `task` object (TaskModel instance) to executors
    - Executors can access all task fields, including custom TaskModel fields, via `self.task`
    - Supports custom TaskModel classes with additional fields
    - Enables executors to modify task context (e.g., update status, progress, custom fields)
    - BaseTask uses weak references (`weakref.ref`) to store task objects, preventing memory leaks
    - Task context is automatically cleared after execution or cancellation
    - `task_id` is stored separately for future extension (e.g., Redis-based task storage)
  - Unified user_id Access: BaseTask provides a `user_id` property that automatically retrieves from `task.user_id`
    - Executors can use `self.user_id` instead of `inputs.get("user_id")`
    - Falls back to `_user_id` when the task is not available (for backward compatibility and testing)
    - All LLM executors (`generate_executor`, `crewai_executor`) now use `self.user_id`
  - Unified LLM Key Retrieval: Centralized LLM key management with context-aware priority order
    - New `get_llm_key()` function with unified priority logic for API and CLI contexts
    - API context priority: header → LLMKeyConfigManager → environment variables
    - CLI context priority: params → LLMKeyConfigManager → environment variables
    - Auto-detection mode (`context="auto"`) automatically detects API or CLI context
    - All LLM executors now proactively retrieve keys using the unified mechanism
    - Removed hardcoded LLM key injection logic from TaskManager (separation of concerns)
  - LLM Key Context Optimization: Refactored `llm_key_context.py` to eliminate code duplication
    - Extracted `_get_key_from_user_config()` helper function for user config lookup
    - Extracted `_get_key_from_source()` helper function for header/CLI params retrieval
    - Reduced code duplication by ~40%, improved maintainability
    - All functionality preserved, backward compatible
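The priority order can be sketched as a small lookup function. `get_llm_key`'s real signature and the config-manager integration are assumptions; only the header → config → env (API) and params → config → env (CLI) ordering comes from the notes above.

```python
import os

def get_llm_key(provider, *, header_key=None, param_key=None,
                config_lookup=lambda p: None, context="cli"):
    """Return the first available key for a provider, in priority order.

    API context:  header -> config manager -> environment variables
    CLI context:  params -> config manager -> environment variables
    `config_lookup` stands in for the LLMKeyConfigManager lookup.
    """
    caller_key = header_key if context == "api" else param_key
    if caller_key:
        return caller_key
    configured = config_lookup(provider)
    if configured:
        return configured
    # Last resort: conventional environment variable (assumed naming scheme)
    return os.environ.get(f"{provider.upper()}_API_KEY")

key = get_llm_key("openai", param_key="sk-from-cli",
                  config_lookup=lambda p: "sk-from-config")
```

Here the CLI-supplied key wins over the configured one, matching the stated CLI priority; drop `param_key` and the config value is used instead.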
- Enhanced Task Copy Functionality
  - UUID Generation for Task IDs: Task copy now always generates new UUIDs for copied tasks, regardless of the `save` parameter value
    - Ensures clear task tree relationships and prevents ID conflicts
    - All copied tasks receive unique IDs for proper dependency mapping
    - Compatible with the `tasks.create` API when `save=False` (returns a task array with complete data)
  - Save Parameter Support: New `save` parameter for the `create_task_copy()` method and `tasks.copy` API
    - `save=True` (default): Saves copied tasks to the database and returns a TaskTreeNode
    - `save=False`: Returns a task array without saving to the database, suitable for preview or direct use with `tasks.create`
    - The task array format includes all required fields (id, name, parent_id, dependencies) with new UUIDs
    - Dependencies correctly reference new task IDs within the copied tree
  - Parameter Renaming for Clarity: Renamed parameters in custom copy mode for better clarity
    - `task_ids` → `custom_task_ids` (required when `copy_mode="custom"`)
    - `include_children` → `custom_include_children` (used when `copy_mode="custom"`)
    - Old parameter names removed (no backward compatibility)
    - CLI: `--task-ids` → `--custom-task-ids`, `--include-children` → `--custom-include-children`
  - Improved Dependency Mapping: Fixed dependency resolution in copied task trees
    - Dependencies now correctly reference new task IDs within the copied tree
    - Original task IDs properly mapped to new IDs for all tasks in the tree
    - Circular dependency detection works correctly with new task IDs
    - `original_task_id` correctly points to each task's direct original counterpart (not the root)
  - Comprehensive Test Coverage: Added extensive test cases for API and CLI
    - API tests: 11 test cases covering all copy modes, save parameter, error handling
    - CLI tests: 7 test cases covering all copy modes, dry-run, reset_fields
    - Tests verify UUID generation, dependency mapping, and database interaction
- API Module Refactoring for Better Library Usage
  - Split `api/main.py` into modular components for improved code organization
  - New `api/extensions.py`: Extension management module with `initialize_extensions()` and extension configuration
  - New `api/protocols.py`: Protocol management module with protocol selection and dependency checking
  - New `api/app.py`: Application creation module with `create_app_by_protocol()` and protocol-specific server creation functions
  - `api/main.py` now contains library-friendly entry points (`main()` and `create_runnable_app()` functions)
  - Benefits: Better separation of concerns, easier to use in external projects like apflow-demo
  - Migration: Import paths updated:
    - `from apflow.api.extensions import initialize_extensions`
    - `from apflow.api.protocols import get_protocol_from_env, check_protocol_dependency`
    - `from apflow.api.app import create_app_by_protocol, create_a2a_server, create_mcp_server`
  - All existing imports from `api/main` continue to work via re-exports for backward compatibility
- Enhanced Library Usage Support in `api/main.py`
  - New `create_runnable_app()` function: Replaces `create_app()` with clearer naming
    - Returns a fully initialized, runnable application instance
    - Handles all initialization steps: .env loading, extension initialization, custom TaskModel loading, examples initialization
    - Supports custom routes, middleware, and a TaskRoutes class via `**kwargs`
    - Can be used when you need the app object but want to run the server yourself
    - Usage: `from apflow.api.main import create_runnable_app; app = create_runnable_app()`
  - Enhanced `main()` function: Now fully supports library usage
    - Can be called directly from external projects with custom configuration
    - Separates application configuration (passed to `create_runnable_app()`) from server configuration (uvicorn parameters)
    - Supports all uvicorn parameters: `host`, `port`, `workers`, `loop`, `limit_concurrency`, `limit_max_requests`, `access_log`
    - Usage: `from apflow.api.main import main; main(custom_routes=[...], port=8080)`
  - Smart .env File Loading: New `_load_env_file()` function with priority-based discovery
    - Priority order: 1) current working directory, 2) main script's directory, 3) library's own directory (development only)
    - Ensures that when used as a library, it loads `.env` from the consuming project, not from the library's installation directory
    - Respects existing environment variables (`override=False`)
    - Gracefully handles a missing `python-dotenv` package
  - Development Environment Setup: New `_setup_development_environment()` function
    - Only runs when executing the library's own `main.py` directly (not when installed as a package)
    - Suppresses specific warnings for cleaner output
    - Adds the project root to the Python path for development mode
    - Does not affect library usage in external projects
  - Backward Compatibility: All existing code continues to work
    - The `create_app()` name is deprecated but still available via an alias
    - All initialization steps remain the same, just better organized
- Enhanced API Server Creation Functions
  - Added `auto_initialize_extensions` parameter to `create_a2a_server()` in `api/a2a/server.py`
    - Matches the behavior of `create_app_by_protocol()` for a consistent API
    - Default: `False` (backward compatible)
  - Added `task_routes_class` parameter to `create_app_by_protocol()` and server creation functions
    - Supports custom `TaskRoutes` class injection throughout the server creation chain
    - Enables apflow-demo to use standard API functions directly without workarounds
  - All new parameters are optional with safe defaults for backward compatibility
-
Executor Metadata API
- New
get_executor_metadata(executor_id)function to query executor metadata - New
validate_task_format(task, executor_id)function to validate tasks against executor schemas - New
get_all_executor_metadata()function to get metadata for all executors - Located in
apflow.core.extensions.executor_metadata - Used by demo applications to generate accurate demo tasks
- Returns: id, name, description, input_schema, examples, tags
- New
- Examples Module Deprecation
  - Removed the `apflow.examples` module from the core library
  - Removed the `examples` CLI command (`apflow examples init`)
  - Removed the `examples = []` optional dependency from `pyproject.toml`
  - Migration: Demo task initialization has been moved to the apflow-demo project
  - Demo task definitions are now managed separately from the core library
  - This keeps the core library focused on orchestration functionality
  - For demo tasks, please use apflow-demo
- Examples API Methods
  - Removed the `examples.init` and `examples.status` API methods from the system routes
  - These methods are no longer available in the API
  - Migration: Use apflow-demo for demo task initialization
- Session Management Refactoring
  - Replaced `get_default_session()` with the `create_pooled_session()` context manager in all API routes
  - Renamed `create_task_tree_session` to `create_pooled_session` in `storage/factory.py`
  - Updated `TaskExecutor` to use `create_pooled_session` as a fallback
  - Improved concurrency safety for API requests
  - Breaking Change: `get_default_session()` is now deprecated for route handlers
- LLM Key Management Architecture
  - Executors now proactively retrieve LLM keys instead of receiving them via inputs
  - TaskManager no longer handles LLM key injection for specific executors
  - LLM key retrieval is now the executor's responsibility, following separation of concerns
  - All executors use the unified `get_llm_key()` function with a consistent priority order
- JWT Token Generation Support
  - New `generate_token()` function in `apflow.api.a2a.server` for generating JWT tokens
  - Supports custom payload, secret key, algorithm (default: HS256), and expiration (default: 30 days)
  - Uses `python-jose[cryptography]` for token generation and verification
  - Complements the existing `verify_token()` function for complete JWT token lifecycle management
  - Usage: `from apflow.api.a2a.server import generate_token; token = generate_token({"user_id": "user123"}, secret_key)`
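`generate_token()` is built on `python-jose`, which isn't reproduced here; as a rough stdlib-only illustration of what an HS256 token of this shape contains (a sketch, not apflow's implementation):

```python
import base64
import hashlib
import hmac
import json
import time

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def make_jwt(payload: dict, secret: str, expires_in: int = 30 * 24 * 3600) -> str:
    """Sign an HS256 JWT: header.payload.signature, each segment base64url-encoded.
    Defaults to a 30-day expiration, matching the entry above."""
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {**payload, "exp": int(time.time()) + expires_in}
    signing_input = b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(claims).encode())
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)
```

In practice, prefer `jose.jwt.encode()`/`jose.jwt.decode()` from `python-jose`, which also handle claim validation.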
- Cookie-based JWT Authentication
  - Support for JWT token extraction from `request.cookies.get("Authorization")` in addition to the Authorization header
  - Priority: the Authorization header is checked first, falling back to the cookie if the header is not present
  - Enables cookie-based authentication for web applications and browser-based clients
  - Maintains security: only JWT tokens are trusted (no fallback to HTTP headers for user identification)
  - Updated the `_extract_user_id_from_request()` method in `BaseRouteHandler` to support both header and cookie sources
- Dependency Updates
  - Added `python-jose[cryptography]>=3.3.0` to the `[a2a]` optional dependencies in `pyproject.toml`
  - Required for JWT token generation and verification functionality
- TaskRoutes Extension Mechanism
  - Added `task_routes_class` parameter to `create_a2a_server()` and `_create_request_handler()` for custom TaskRoutes injection
  - Eliminates the need for monkey patching when extending TaskRoutes functionality
  - Supports custom routes via the `custom_routes` parameter in `CustomA2AStarletteApplication`
  - Backward compatible: optional parameter defaulting to the standard `TaskRoutes` class
  - Usage: `create_a2a_server(task_routes_class=CustomTaskRoutes, custom_routes=[...])`
- Task Tree Lifecycle Hooks
  - New `register_task_tree_hook()` decorator for task tree lifecycle events
  - Four hook types: `on_tree_created`, `on_tree_started`, `on_tree_completed`, `on_tree_failed`
  - Explicit lifecycle tracking without manual root task detection
  - Hooks receive the root task and relevant context (status, error message)
  - Usage: `@register_task_tree_hook("on_tree_completed") async def on_completed(root_task, status): ...`
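apflow's hook registry internals aren't shown here; a generic sketch of the same pattern — a decorator that records async callbacks per event name, and a dispatcher that awaits them — could look like this (all names below are hypothetical):

```python
import asyncio
from collections import defaultdict

# event name -> list of async callbacks (hypothetical stand-in for apflow's registry)
_hooks = defaultdict(list)

def register_hook(event: str):
    """Decorator: register an async callback under a lifecycle event name."""
    def decorator(func):
        _hooks[event].append(func)
        return func
    return decorator

async def fire(event: str, *args, **kwargs):
    """Await every callback registered for the event, in registration order."""
    for callback in _hooks[event]:
        await callback(*args, **kwargs)
```

With this shape, firing `on_tree_completed` with the root task and its status reaches every registered listener without the caller knowing who subscribed.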
- Executor-Specific Hooks
  - Added `pre_hook` and `post_hook` parameters to the `@executor_register()` decorator
  - Runtime hook registration via `add_executor_hook(executor_id, hook_type, hook_func)`
  - Inject custom logic (e.g., quota checks, demo data fallback) for specific executors
  - `pre_hook` can return a result to skip executor execution (useful for demo mode)
  - `post_hook` receives the executor, task, inputs, and result for post-processing
  - Supports both decorator-based and runtime registration for existing executors
- Automatic user_id Extraction
  - Automatic `user_id` extraction from the JWT token in `TaskRoutes.handle_task_generate` and `handle_task_create`
  - Only extracts from the JWT token payload for security (HTTP headers can be spoofed)
  - Supports a `user_id` field or the standard JWT `sub` claim in the token payload
  - The extracted `user_id` is automatically set on the task data
  - Simplifies custom route implementations and ensures consistent user identification
  - Security: only trusted JWT tokens are used, with no fallback to HTTP headers
- Demo Mode Support
  - Built-in demo mode via the `use_demo` parameter in task inputs
  - CLI support: `--use-demo` flag for the `apflow run flow` command
  - API support: `use_demo` parameter in task creation and execution
  - Executors can override the `get_demo_result()` method in `BaseTask` for custom demo data
  - Default demo data format: `{"result": "Demo execution result", "demo_mode": True}`
  - All built-in executors now implement the `get_demo_result()` method:
    - `SystemInfoExecutor`, `CommandExecutor`, `AggregateResultsExecutor`
    - `RestExecutor`, `GenerateExecutor`, `ApiExecutor`
    - `SshExecutor`, `GrpcExecutor`, `WebSocketExecutor`
    - `McpExecutor`, `DockerExecutor`
    - `CrewaiExecutor`, `BatchCrewaiExecutor` (CrewAI executors)
  - Realistic Demo Execution Timing: all executors include `_demo_sleep` values to simulate real execution time:
    - Network operations (HTTP, SSH, API): 0.2-0.5 seconds
    - Container operations (Docker): 1.0 second
    - LLM operations (CrewAI, Generate): 1.0-1.5 seconds
    - Local operations (SystemInfo, Command, Aggregate): 0.05-0.1 seconds
  - Global Demo Sleep Scale: configurable via the `APFLOW_DEMO_SLEEP_SCALE` environment variable (default: 1.0)
    - Allows adjusting demo execution speed globally (e.g., `0.5` for faster, `2.0` for slower)
    - API: `set_demo_sleep_scale(scale)` and `get_demo_sleep_scale()` functions
  - CrewAI Demo Support: `CrewaiExecutor` and `BatchCrewaiExecutor` generate realistic demo results:
    - Based on the `works` definition (agents and tasks) from task params or schemas
    - Includes simulated `token_usage` matching real LLM execution patterns
    - `BatchCrewaiExecutor` aggregates token usage across multiple works
  - Demo mode helps developers test workflows without external dependencies
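A minimal sketch of how a global sleep scale like `APFLOW_DEMO_SLEEP_SCALE` can be applied to an executor's `_demo_sleep` value (the helper names here are illustrative, not apflow's internals):

```python
import os

def get_demo_sleep_scale() -> float:
    """Read APFLOW_DEMO_SLEEP_SCALE (default 1.0), falling back on bad input."""
    try:
        return float(os.environ.get("APFLOW_DEMO_SLEEP_SCALE", "1.0"))
    except ValueError:
        return 1.0

def scaled_demo_sleep(base_seconds: float) -> float:
    """An executor's _demo_sleep value multiplied by the global scale."""
    return base_seconds * get_demo_sleep_scale()
```

Setting the variable to `2.0` doubles every simulated delay; `0.5` halves them, which is handy for fast demo runs.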
- TaskModel Customization Improvements
  - Enhanced `set_task_model_class()` with improved validation and error messages
  - New `@task_model_register()` decorator for convenient TaskModel registration
  - Validation ensures custom classes inherit from `TaskModel`, with helpful error messages
  - Supports `__table_args__ = {'extend_existing': True}` for extending existing table definitions
  - Better support for user-defined `MyTaskModel(TaskModel)` with additional fields
- Documentation for Hook Types
  - Added comprehensive documentation explaining the differences between hook types
  - `pre_hook`/`post_hook`: task-level hooks for individual task execution
  - `task_tree_hook`: task tree-level hooks for tree lifecycle events
  - Clear usage scenarios and examples in `docs/development/extending.md`
- LLM Model Parameter Naming: Unified the LLM model parameter name to `model` across all components
  - Breaking Change: the `llm_model` parameter in `generate_executor` has been renamed to `model`
    - GenerateExecutor: `inputs["llm_model"]` → `inputs["model"]`
    - API Routes: `params["llm_model"]` → `params["model"]`
    - CLI: the `--model` parameter remains unchanged (internal mapping updated)
  - New Feature: support for `schemas["model"]` configuration for the CrewAI executor
    - Model configuration can now be specified in task schemas and will be passed to CrewaiExecutor
    - Priority: `schemas["model"]` > `params.works.agents[].llm` (CrewAI standard)
  - Impact: only affects the generate functionality introduced in 0.5.0, so the breaking change is minimal
  - Migration: update any code using the `llm_model` parameter to use `model` instead
- Redundant decorators.py file
  - Removed `src/apflow/decorators.py` as it was no longer used
  - Functionality superseded by `src/apflow/core/decorators.py`
  - No impact on existing code (the file was not imported by any other modules)
- Extended Executor Framework with Mainstream Execution Methods
  - HTTP/REST API Executor (`rest_executor`)
    - Support for all HTTP methods (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS)
    - Authentication support (Bearer token, Basic auth, API keys)
    - Configurable timeout and retry mechanisms
    - Request/response headers and body handling
    - JSON and form data support
    - Comprehensive error handling for HTTP status codes
    - Full test coverage with 15+ test cases
  - SSH Remote Executor (`ssh_executor`)
    - Execute commands on remote servers via SSH
    - Support for password and key-based authentication
    - Key file validation with security checks (permissions, existence)
    - Environment variable injection
    - Custom SSH port configuration
    - Timeout and cancellation support
    - Comprehensive error handling for connection and execution failures
    - Full test coverage with 12+ test cases
  - Docker Container Executor (`docker_executor`)
    - Execute commands in isolated Docker containers
    - Support for custom Docker images
    - Volume mounts for data persistence
    - Environment variable configuration
    - Resource limits (CPU, memory)
    - Container lifecycle management (create, start, wait, remove)
    - Timeout handling with automatic container cleanup
    - Option to keep containers after execution
    - Comprehensive error handling for image-not-found and execution failures
    - Full test coverage with 13+ test cases
  - gRPC Executor (`grpc_executor`)
    - Call gRPC services and microservices
    - Support for dynamic proto file loading
    - Method invocation with parameter serialization
    - Metadata and timeout configuration
    - Error handling for gRPC status codes
    - Support for unary, server streaming, client streaming, and bidirectional streaming
    - Full test coverage with 10+ test cases
  - WebSocket Executor (`websocket_executor`)
    - Bidirectional WebSocket communication
    - Send and receive messages in real time
    - Support for JSON and text messages
    - Custom headers for authentication
    - Optional response waiting with timeout
    - Connection error handling (invalid URI, connection closed, timeout)
    - Cancellation support
    - Full test coverage with 13+ test cases
  - apflow API Executor (`apflow_api_executor`)
    - Call other apflow API instances for distributed execution
    - Support for all task management methods (tasks.execute, tasks.create, tasks.get, etc.)
    - Authentication via JWT tokens
    - Task completion polling with production-grade retry logic:
      - Exponential backoff on failures (1s → 2s → 4s → 8s → 30s max)
      - Circuit breaker pattern (stops after 10 consecutive failures)
      - Error classification (retryable vs non-retryable)
      - Total failure threshold (20 failures across all polls)
      - Detailed logging for debugging
    - Timeout protection and cancellation support
    - Comprehensive error handling for network, server, and client errors
    - Full test coverage with 12+ test cases
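The polling retry behavior described above can be sketched independently of apflow; the schedule and breaker below are illustrative stand-ins, not the executor's actual code:

```python
def backoff_delays(base: float = 1.0, cap: float = 30.0, attempts: int = 6):
    """Exponential backoff schedule: base * 2**n per failure, clamped to cap."""
    return [min(base * (2 ** n), cap) for n in range(attempts)]

class PollingBreaker:
    """Stop polling after too many consecutive failures (circuit breaker)."""

    def __init__(self, max_consecutive: int = 10):
        self.max_consecutive = max_consecutive
        self.consecutive = 0

    def record(self, success: bool) -> bool:
        """Record one poll outcome; return True while polling may continue.
        Any success resets the consecutive-failure counter."""
        self.consecutive = 0 if success else self.consecutive + 1
        return self.consecutive < self.max_consecutive
```

For example, `backoff_delays()` yields `[1.0, 2.0, 4.0, 8.0, 16.0, 30.0]`: delays double per failure until the 30-second cap.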
- Dependency Management
  - Optional dependencies for the new executors:
    - `[ssh]`: asyncssh for the SSH executor
    - `[docker]`: docker for the Docker executor
    - `[grpc]`: grpcio, grpcio-tools for the gRPC executor
    - `[all]`: includes all optional dependencies
  - Graceful handling when optional dependencies are not installed
  - Clear error messages with installation instructions
- Documentation
  - Comprehensive usage examples for all new executors in `docs/guides/custom-tasks.md`
  - Configuration parameters and examples for each executor
  - Best practices and common patterns
  - Error handling guidelines
- Auto-discovery
  - All new executors automatically registered via the extension system
  - Auto-imported on API service startup
  - Available immediately after installation
- MCP (Model Context Protocol) Executor (`mcp_executor`)
  - Interact with MCP servers to access external tools and data sources
  - Support for stdio and HTTP transport modes
  - Operations: list_tools, call_tool, list_resources, read_resource
  - JSON-RPC 2.0 protocol compliance
  - Environment variable injection for stdio mode
  - Custom headers support for HTTP mode
  - Timeout and cancellation support
  - Comprehensive error handling for MCP protocol errors
  - Full test coverage with 20+ test cases
- MCP (Model Context Protocol) Server (`api/mcp/`)
  - Expose apflow task orchestration capabilities as MCP tools and resources
  - Support for stdio and HTTP/SSE transport modes
  - MCP Tools (8 tools):
    - `execute_task` - Execute tasks or task trees
    - `create_task` - Create new tasks or task trees
    - `get_task` - Get task details by ID
    - `update_task` - Update existing tasks
    - `delete_task` - Delete tasks (if all pending)
    - `list_tasks` - List tasks with filtering
    - `get_task_status` - Get the status of running tasks
    - `cancel_task` - Cancel running tasks
  - MCP Resources:
    - `task://{task_id}` - Access individual task data
    - `tasks://` - Access the task list with query parameters
  - JSON-RPC 2.0 protocol compliance
  - Integration with the existing TaskRoutes for a protocol-agnostic design
  - HTTP mode: FastAPI/Starlette integration with a `/mcp` endpoint
  - stdio mode: standalone process for local integration
  - Comprehensive error handling with proper HTTP status codes
  - Full test coverage with 45+ test cases across all components
  - Protocol selection via the `APFLOW_API_PROTOCOL=mcp` environment variable
  - CLI protocol selection: `--protocol` parameter for the `serve` and `daemon` commands
    - Default protocol: `a2a`
    - Supported protocols: `a2a`, `mcp`
    - Usage: `apflow serve --protocol mcp` or `apflow daemon start --protocol mcp`
- Task Tree Generator Executor (`generate_executor`)
  - Generate valid task tree JSON arrays from natural language requirements using an LLM
  - Automatically collects available executors and their input schemas for LLM context
  - Loads framework documentation (task orchestration, examples, concepts) as LLM context
  - Supports multiple LLM providers (OpenAI, Anthropic) via a configurable backend
  - Comprehensive validation ensures generated tasks conform to TaskCreator requirements:
    - Validates task structure (name, id consistency, parent_id, dependencies)
    - Detects circular dependencies
    - Ensures a single root task
    - Validates that all references exist in the array
  - LLM prompt engineering with framework context, executor information, and examples
  - JSON response parsing with markdown code block support
  - Can be used through both the API and CLI as a standard executor
  - API Endpoint: `tasks.generate` method via the JSON-RPC `/tasks` endpoint
    - Supports all LLM configuration parameters (provider, model, temperature, max_tokens)
    - Optional `save` parameter to automatically save generated tasks to the database
    - Returns the generated task tree JSON array with a count and status message
    - Full test coverage with 8 API endpoint test cases
  - CLI Command: `apflow generate task-tree` for direct task tree generation
    - Supports output to file or stdout
    - Optional database persistence with the `--save` flag
    - Comprehensive test command examples in the documentation
  - Configuration via environment variables or input parameters:
    - `OPENAI_API_KEY` or `ANTHROPIC_API_KEY` for LLM authentication
    - `APFLOW_LLM_PROVIDER` for provider selection (default: openai)
    - `OPENAI_MODEL` or `ANTHROPIC_MODEL` for model selection
  - Full test coverage with 28+ executor test cases and 8 API endpoint test cases
  - Usage examples:

    ```python
    # Python API
    task = await task_manager.task_repository.create_task(
        name="generate_executor",
        inputs={"requirement": "Fetch data from API, process it, and save to database"}
    )
    ```

    ```json
    // JSON-RPC API
    {
      "jsonrpc": "2.0",
      "method": "tasks.generate",
      "params": {
        "requirement": "Fetch data from API and process it",
        "save": true
      }
    }
    ```

    ```bash
    # CLI
    apflow generate task-tree "Fetch data from API and process it" --save
    ```
- CLI Task Commands Synchronization with API
  - Complete synchronization of CLI task commands with the API task routes
  - Unified data format: CLI commands now return the same data structure as the API endpoints, using `task.to_dict()`
  - New CLI commands matching API functionality:
    - `tasks get <task_id>` - Get task details (equivalent to the `tasks.get` API)
    - `tasks create --file <file>` | `--stdin` - Create a task tree from a JSON file or stdin (equivalent to the `tasks.create` API)
    - `tasks update <task_id> [options]` - Update task fields (equivalent to the `tasks.update` API)
    - `tasks delete <task_id> [--force]` - Delete a task with validation (equivalent to the `tasks.delete` API)
    - `tasks tree <task_id>` - Get the task tree structure (equivalent to the `tasks.tree` API)
    - `tasks children --parent-id <id>` | `--task-id <id>` - Get child tasks (equivalent to the `tasks.children` API)
    - `tasks all [options]` - List all tasks from the database with filters (equivalent to the `tasks.list` API)
  - Enhanced existing commands:
    - `tasks list` - Now returns the full `task.to_dict()` format matching the API's `tasks.running.list`
    - `tasks status` - Now includes `context_id`, `started_at`, and `updated_at` fields matching the API format
  - All CLI commands maintain API compatibility for consistent data formats
  - Comprehensive test coverage with 43 test cases covering all CLI task commands
- Unified A2A Protocol Task Management
  - All task management operations are now fully supported through the A2A Protocol `/` route
  - Standardized method naming: `tasks.execute`, `tasks.create`, `tasks.get`, `tasks.update`, `tasks.delete`, `tasks.detail`, `tasks.tree`, `tasks.children`, `tasks.running.list`, `tasks.running.status`, `tasks.running.count`, `tasks.cancel`, `tasks.copy`, `tasks.generate`
  - `TaskRoutesAdapter` component bridges the A2A Protocol `RequestContext`/`EventQueue` with the existing `TaskRoutes` handlers
  - Automatic conversion between the A2A Protocol format and the internal task representation
  - Real-time task status updates via `TaskStatusUpdateEvent` for all task management operations
  - Backward compatibility: the `execute_task_tree` skill ID is still supported (maps to `tasks.execute`)
  - All 14 task management skills registered in the Agent Card for protocol compliance
  - Comprehensive test coverage with 17+ test cases for all task management methods
  - Fixed `MessageSendConfiguration` access error: properly handle Pydantic model attributes instead of dictionary methods
- A2A Protocol Configuration Access
  - Fixed `AttributeError: 'MessageSendConfiguration' object has no attribute 'get'` in `TaskRoutesAdapter`
  - Properly handle Pydantic model attributes using `getattr()` and `model_dump()` instead of dictionary methods
  - Compatible with both Pydantic v1 and v2
  - All 5 previously failing integration tests now pass
- A2A Protocol Cancel Method Implementation
  - Complete implementation of the `AgentExecutor.cancel()` method for the A2A Protocol
  - Task ID extraction with priority: `task_id` > `context_id` > `metadata.task_id` > `metadata.context_id`
  - Support for custom error messages via `metadata.error_message`
  - Graceful cancellation by calling the executor's `cancel()` method if supported
  - Token usage and partial results preserved during cancellation
  - Proper `TaskStatusUpdateEvent` generation with A2A Protocol compliance
  - Comprehensive error handling for task-not-found, already-completed, and exception scenarios
  - Complete test coverage with 13 test cases covering all scenarios
  - Documentation updates in the HTTP API and Python API references
- Enhanced Task Copy Functionality
  - `children` parameter for `create_task_copy()`: when `True`, also copies each direct child task with its dependencies
  - Deduplication ensures tasks depending on multiple copied tasks are only copied once
  - `children` parameter for the `tasks.copy` API endpoint
  - `--children` flag for the CLI `tasks copy` command
- Copy Before Execution
  - `copy_execution` parameter for the `tasks.execute` API: copies the task before execution to preserve the original task history
  - `copy_children` parameter: when `True` together with `copy_execution=True`, also copies each direct child task with its dependencies
  - Response includes both `task_id` (the copied task) and `original_task_id` (the original task) when `copy_execution=True`
  - Combines `tasks.copy` and `tasks.execute` into a single API call for a better user experience
- Enhanced Task Deletion with Validation
  - Physical deletion: tasks are now physically removed from the database (not soft-deleted)
  - Conditional deletion: a task can only be deleted if all tasks (the task itself + all children) are in `pending` status
  - Recursive child deletion: when deletion is allowed, all child tasks (including grandchildren) are automatically deleted
  - Dependency validation: deletion is prevented if other tasks depend on the task being deleted
  - Detailed error messages: returns specific error information when deletion fails:
    - Lists non-pending children with their statuses
    - Lists tasks that depend on the task being deleted
    - Indicates if the task itself is not pending
  - New TaskRepository methods:
    - `get_all_children_recursive()`: recursively get all child tasks
    - `find_dependent_tasks()`: find all tasks that depend on a given task (reverse dependencies)
    - `delete_task()`: physically delete a task from the database
  - Comprehensive test coverage with 16 test cases covering all scenarios
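The deletion rules above (the whole subtree must be pending, and no outside task may depend on anything inside it) can be sketched on a plain in-memory task list; this is an illustration, not the TaskRepository implementation:

```python
def collect_subtree(task_id, tasks):
    """Return the task plus all recursive children (following parent_id links)."""
    subtree = [t for t in tasks if t["id"] == task_id]
    for child in (t for t in tasks if t.get("parent_id") == task_id):
        subtree.extend(collect_subtree(child["id"], tasks))
    return subtree

def can_delete(task_id, tasks):
    """Allow deletion only if the whole subtree is pending and no task
    outside the subtree depends on anything inside it."""
    subtree_ids = {t["id"] for t in collect_subtree(task_id, tasks)}
    all_pending = all(t["status"] == "pending"
                      for t in tasks if t["id"] in subtree_ids)
    has_dependents = any(subtree_ids & set(t.get("dependencies", []))
                         for t in tasks if t["id"] not in subtree_ids)
    return all_pending and not has_dependents
```

When `can_delete` returns `False`, the real API goes further and reports which check failed (non-pending children, dependents, or the task itself).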
- Enhanced Task Update with Critical Field Validation
  - Critical field protection: three critical fields (`parent_id`, `user_id`, `dependencies`) are now strictly validated to prevent fatal errors
    - `parent_id` and `user_id`: always rejected - these fields cannot be modified after task creation (task hierarchy and ownership are fixed)
    - `dependencies`: conditional validation with four critical checks:
      - Status check: can only be updated when the task is in `pending` status
      - Reference validation: all dependency references must exist in the same task tree
      - Circular dependency detection: uses a DFS algorithm to detect and prevent circular dependencies
      - Execution check: prevents updates if any dependent tasks are currently executing
  - Other fields: can be updated freely without status restrictions (inputs, name, priority, params, schemas, status, result, error, progress, timestamps)
  - Comprehensive error reporting: all validation errors are collected and returned in a single response
  - New TaskRepository methods:
    - `update_task()`: update task dependencies with validation
    - `update_task_name()`: update the task name
    - `update_task_priority()`: update the task priority
    - `update_task_params()`: update executor parameters
    - `update_task_schemas()`: update validation schemas
  - New utility module: `dependency_validator.py` with reusable validation functions
  - Comprehensive test coverage with 23 test cases covering all validation scenarios
- Docker Executor Exit Code Extraction
  - Fixed incorrect exit code handling: `container.wait()` returns a `{"StatusCode": 0}` dict, not an integer
  - Properly extract `StatusCode` from the wait result dictionary
  - Fixed container removal logic to prevent duplicate cleanup calls
  - Improved cancellation handling before container start
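docker-py's `container.wait()` does return a dict such as `{"StatusCode": 0}`; a defensive extraction helper in the spirit of this fix (a sketch, not apflow's code) might be:

```python
def extract_exit_code(wait_result) -> int:
    """Normalize the result of docker-py's container.wait().
    Modern docker-py returns {"StatusCode": N}; also accept a bare int
    defensively, and -1 when the key is missing."""
    if isinstance(wait_result, dict):
        return int(wait_result.get("StatusCode", -1))
    return int(wait_result)
```

Treating the dict value as an int directly (the original bug) would make every non-zero exit look truthy and break success checks like `exit_code == 0`.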
- WebSocket Executor Exception Handling
  - Fixed `ConnectionClosed` exception construction in tests
  - Fixed `InvalidURI` exception construction in tests
  - Added the missing `asyncio` import for timeout error handling
  - Improved error handling for WebSocket connection failures
- SSH Executor Key File Validation
  - Fixed key file validation in tests by properly mocking file system operations
  - Added proper handling for key file permission and existence checks
  - Improved error messages for authentication failures
- API Executor Infinite Polling Loop
  - Fixed an infinite polling loop when API calls fail repeatedly
  - Implemented production-grade retry logic with exponential backoff
  - Added a circuit breaker pattern to stop polling after consecutive failures
  - Added a total failure threshold to prevent resource waste
  - Improved error classification (retryable vs non-retryable errors)
  - Enhanced logging for better debugging in production environments
- Webhook Support for Task Execution
  - Webhook callbacks for the `tasks.execute` JSON-RPC endpoint (similar to A2A Protocol push notifications)
  - `WebhookStreamingContext` class for sending HTTP callbacks during task execution
  - Real-time progress updates sent to the configured webhook URL
  - Configurable webhook settings:
    - `url` (required): webhook callback URL
    - `headers` (optional): custom HTTP headers (e.g., Authorization)
    - `method` (optional): HTTP method (default: POST)
    - `timeout` (optional): request timeout in seconds (default: 30.0)
    - `max_retries` (optional): maximum retry attempts (default: 3)
  - Automatic retry mechanism with exponential backoff for failed requests
  - Error handling: client errors (4xx) are not retried; server errors (5xx) and network errors are retried
  - Webhook payload includes: protocol identifier, task status, progress, result, error, and timestamp
  - Update types: `task_start`, `progress`, `task_completed`, `task_failed`, `final`
  - Comprehensive test coverage for webhook functionality
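The retry classification described above is easy to state as a predicate; a sketch (not `WebhookStreamingContext` itself):

```python
from typing import Optional

def is_retryable(status_code: Optional[int]) -> bool:
    """Webhook delivery retry rule: network errors (no HTTP response)
    and 5xx server errors are retried; 4xx client errors and
    2xx/3xx outcomes are not."""
    if status_code is None:      # network error: no response at all
        return True
    if status_code >= 500:       # server error: likely transient
        return True
    return False                 # success, redirect, or client error
```

The rationale: a 4xx means the request itself is wrong, so repeating it with backoff only wastes the delivery budget.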
- Protocol Identification
  - Added a `protocol` field to all API responses to distinguish between execution modes
    - JSON-RPC endpoints return `"protocol": "jsonrpc"` in the response
    - A2A Protocol endpoints include `"protocol": "a2a"` in metadata and event data
  - Enables clients to identify which protocol was used for task execution
  - Consistent protocol identification across streaming updates and webhook callbacks
- Streaming Mode for JSON-RPC
  - Streaming mode support for the `tasks.execute` endpoint via the `use_streaming` parameter
  - Real-time progress updates via Server-Sent Events (SSE) - returns a `StreamingResponse` directly when `use_streaming=true`
  - `TaskStreamingContext` class for in-memory event storage
  - Consistent behavior with the A2A Protocol streaming mode
  - Asynchronous task execution with immediate response
- Examples Module
  - New `examples` module for initializing example task data to help beginners get started
  - CLI command `examples init` for initializing example tasks in the database
  - `--force` flag to re-initialize examples even if they already exist
  - Example tasks demonstrate various features:
    - Tasks with different statuses (completed, failed, pending, in_progress)
    - Task trees with parent-child relationships
    - Tasks with different priorities
    - Tasks with dependencies
    - A CrewAI task example (requires an LLM key)
  - Auto-initialization: the API server automatically initializes examples if the database is empty on startup
  - Example user ID: `example_user` for demo tasks
- LLM API Key Management
  - Request header support: `X-LLM-API-KEY` header for demo/one-time usage
    - Simple format: `X-LLM-API-KEY: <api-key>` (provider auto-detected from the model name)
    - Provider-specific format: `X-LLM-API-KEY: <provider>:<api-key>` (e.g., `openai:sk-xxx`, `anthropic:sk-ant-xxx`)
  - User configuration support via the `llm-key-config` extension for multi-user scenarios
    - In-memory storage of user-specific LLM keys (never stored in the database)
    - Support for multiple providers per user (provider-specific keys)
    - `LLMKeyConfigManager` singleton for managing user keys
  - LLM Key Context Manager: thread-local context for LLM keys during task execution
  - LLM Key Injector: automatic provider detection from model names and works configuration
  - Priority order: request header > user config > environment variables
  - Supported providers: OpenAI, Anthropic, Google/Gemini, Mistral, Groq, Cohere, Together, and more
  - Automatic environment variable injection for CrewAI/LiteLLM compatibility
  - Keys are never stored in the database, only used during task execution
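The header format and priority order above can be sketched as two small helpers (hypothetical names, not apflow's API):

```python
from typing import Optional, Tuple

def resolve_llm_key(header_key: Optional[str],
                    user_config_key: Optional[str],
                    env_key: Optional[str]) -> Optional[str]:
    """Priority order from the entry above: request header > user config > env."""
    for key in (header_key, user_config_key, env_key):
        if key:
            return key
    return None

def parse_llm_key_header(value: str) -> Tuple[Optional[str], str]:
    """Parse an X-LLM-API-KEY value: either '<api-key>' or '<provider>:<api-key>'.
    Returns (provider_or_None, key)."""
    provider, sep, key = value.partition(":")
    return (provider, key) if sep else (None, value)
```

A `None` provider signals "auto-detect from the model name", which is the simple-format behavior described above.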
- Documentation Updates
  - Added `examples` command documentation in the CLI guide
  - Added LLM API key management documentation in the API server guide
  - Added LLM key header documentation in the HTTP API reference
  - Updated the examples documentation with a quick start guide and example task structure
  - Comprehensive examples for using LLM keys with CrewAI tasks
  - Added webhook configuration documentation in the HTTP API reference
  - Added protocol identification documentation
- Unified Task Execution Architecture
  - Refactored task execution logic to unify behavior across the A2A Protocol and JSON-RPC endpoints
  - Added the `TaskExecutor.execute_task_by_id()` method for executing tasks by ID with automatic dependency handling
  - Moved dependency collection and subtree building logic from the protocol handlers to the core `TaskExecutor` layer
  - Unified execution modes:
    - Root task execution: executes the entire task tree when a root task is specified
    - Child task execution: automatically collects all dependencies (including transitive ones) and executes the task with its required dependencies
  - Both the A2A Protocol and JSON-RPC now use the same core execution methods for consistent behavior
  - Protocol handlers are now thin wrappers around the core execution logic, making the system more library-friendly
  - Improved code reusability and maintainability by centralizing execution logic in `TaskExecutor`
- Task Re-execution Support
  - Added support for re-executing failed tasks via the `tasks.execute` endpoint
  - Failed tasks can now be re-executed by calling `tasks.execute` with the failed task's ID
  - When re-executing a task, all of its dependency tasks are also re-executed (even if they are completed) to ensure consistency
  - Only tasks in `pending` and `failed` status are executed; `completed` and `in_progress` tasks are skipped unless marked for re-execution
  - Dependency satisfaction logic updated so that completed tasks marked for re-execution can satisfy dependencies (their results are available)
  - Newly created tasks (status: `pending`) are not marked for re-execution and execute normally
- Task Execution Order Clarification
  - Important: parent-child relationships (`parent_id`) are now explicitly documented as organizational only and do NOT affect execution order
  - Only dependencies (`dependencies`) determine execution order - a task executes when its dependencies are satisfied
  - This clarification ensures developers understand that task tree structure (parent-child) is separate from execution order (dependencies)
  - Updated all documentation to clearly distinguish between organizational relationships and execution dependencies
- CLI Command Improvements
  - The `serve` command now accepts options directly (e.g., `apflow serve --port 8000`) without requiring the `start` subcommand
  - The `serve start` subcommand still works for backward compatibility
  - Improved command structure and user experience
- CORS Support
  - Added CORS middleware to the API server for cross-origin requests
  - Default configuration allows `localhost:3000`, `localhost:3001`, and common development ports
  - Configurable via the `APFLOW_CORS_ORIGINS` environment variable (comma-separated list)
  - Development mode: `APFLOW_CORS_ALLOW_ALL=true` to allow all origins
  - Supports credentials, all HTTP methods, and all headers
-
API Architecture Refactoring
- Moved documentation routes (
/docs,/openapi.json) toapi/routes/docs.pyfor better code organization - Consolidated OpenAPI schema generation logic into
DocsRoutesclass - Improved separation of concerns: route handlers in
api/routes/, documentation tools inapi/docs/ - All custom routes (tasks, system, docs) are now defined in a unified structure
- Moved documentation routes (
- SSE Streaming Simplification
  - SSE streaming is now handled directly by `tasks.execute` with `use_streaming=true`
  - When `use_streaming=true`, `tasks.execute` returns a `StreamingResponse` with Server-Sent Events
  - Simplified API design: one endpoint (`tasks.execute`) handles both regular POST and SSE streaming modes
  - Webhook callbacks remain independent and can be used with either response mode
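Server-Sent Events arrive as `data:`-prefixed lines, with a blank line separating events, so a client consuming the streaming mode splits the stream on those boundaries. A minimal parser sketch (the event payloads shown are made up, not apflow's actual event schema):

```python
def parse_sse(raw: str):
    """Collect the `data:` payload of each event in an SSE stream."""
    events = []
    for block in raw.split("\n\n"):          # blank line ends an event
        data_lines = [line[len("data:"):].strip()
                      for line in block.splitlines()
                      if line.startswith("data:")]
        if data_lines:
            events.append("\n".join(data_lines))
    return events

stream = (
    'data: {"status": "running"}\n\n'
    'data: {"status": "completed"}\n\n'
)
print(parse_sse(stream))
# ['{"status": "running"}', '{"status": "completed"}']
```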
- Test Infrastructure
  - Consolidated database session management in `conftest.py` with a global `use_test_db_session` fixture
  - All tests now use isolated in-memory DuckDB databases to prevent data pollution
  - Removed dependency on persistent database files for testing
  - Improved test isolation and reliability
  - Fixed `test_webhook_config_validation` test by using `patch.object()` for the TaskTracker singleton
  - Updated `test_jsonrpc_tasks_execute_with_streaming` to correctly parse SSE stream responses
  - Added comprehensive test coverage for documentation routes (`/docs` and `/openapi.json`) with 14 test cases
- Documentation Corrections
  - Fixed incorrect command examples in README.md and docs:
    - Corrected `run my_batch` to the proper `run flow --tasks` or `run flow executor_id` format
    - Corrected `run flow example_flow` to the proper executor ID format
    - Removed non-existent `list-flows` command from documentation
  - Ensured all command examples in documentation are accurate and testable
  - Updated SSE streaming documentation to reflect direct integration with `tasks.execute`
- Task Tree Validation
  - Circular dependency detection using a DFS algorithm in `TaskCreator.create_task_tree_from_array()`
  - Single task tree validation ensuring all tasks are in the same tree structure
  - Validation that only one root task exists
  - Verification that all tasks are reachable from the root task via the `parent_id` chain
  - Dependent task inclusion validation (ensures all tasks that depend on tasks in the tree are included)
  - Comprehensive test coverage for circular dependency scenarios
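DFS-based cycle detection is commonly done with a three-color marking scheme; a self-contained sketch of the technique (not apflow's actual implementation inside `TaskCreator`):

```python
def has_cycle(dependencies: dict[str, list[str]]) -> bool:
    """Return True if the dependency graph contains a cycle.

    Three-color DFS: WHITE = unvisited, GRAY = on the current DFS
    stack, BLACK = fully explored. Hitting a GRAY node again means
    we followed a back edge, i.e. a circular dependency.
    """
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in dependencies}

    def visit(node: str) -> bool:
        color[node] = GRAY
        for dep in dependencies.get(node, []):
            if color.get(dep, WHITE) == GRAY:      # back edge found
                return True
            if color.get(dep, WHITE) == WHITE and visit(dep):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in dependencies)

print(has_cycle({"a": ["b"], "b": ["c"], "c": ["a"]}))  # True
print(has_cycle({"a": ["b"], "b": [], "c": ["a", "b"]}))  # False
```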
- Task Copy Functionality
  - `TaskCreator.create_task_copy()` method for creating executable copies of task trees
  - Automatic inclusion of dependent tasks (including transitive dependencies) when copying
  - Special handling for failed leaf nodes (filters out pending dependents)
  - Minimal subtree construction to include only required tasks
  - Task copy fields in `TaskModel`: `original_task_id` (links copy to original), `task_tree_id` (for tree grouping), `origin_type` (tracks origin type), and `has_references` (indicates if a task has copies)
  - API endpoint `tasks.copy` via the JSON-RPC `/tasks` endpoint
  - CLI command `tasks copy <task_id>` for copying task trees
  - Comprehensive test coverage for task copy functionality
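Pulling in transitive dependents when copying is a closure computation over the reversed dependency edges; a sketch with hypothetical data (function and field names are illustrative):

```python
def dependent_closure(task_ids, depends_on):
    """All tasks that (transitively) depend on the selected tasks.

    `depends_on` maps task id -> list of its dependency ids. We invert
    those edges and walk outward from `task_ids`.
    """
    # Invert the edges: dependency -> set of tasks that depend on it
    dependents = {}
    for task, deps in depends_on.items():
        for dep in deps:
            dependents.setdefault(dep, set()).add(task)

    closure, frontier = set(task_ids), list(task_ids)
    while frontier:
        current = frontier.pop()
        for t in dependents.get(current, ()):
            if t not in closure:
                closure.add(t)
                frontier.append(t)
    return closure

graph = {"fetch": [], "parse": ["fetch"], "report": ["parse"], "other": []}
# Copying "fetch" must also copy "parse" and "report", but not "other":
print(sorted(dependent_closure({"fetch"}, graph)))  # ['fetch', 'parse', 'report']
```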
- Task Orchestration Engine
  - `TaskManager`: Core task orchestration with dependency management and tree execution
  - `TaskRepository`: Data access layer for task CRUD operations
  - `TaskModel`: Task definition model with support for custom fields via inheritance
  - Task tree structure with parent-child relationships
  - Priority-based task execution
  - Dependency resolution and satisfaction checking
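Combining the last two points, the next runnable task is the highest-priority task whose dependencies are all complete; a minimal sketch of that selection rule (field names and the function are illustrative, not apflow's `TaskManager` API):

```python
def next_runnable(tasks, completed):
    """Pick the highest-priority task whose dependencies are satisfied."""
    ready = [t for t in tasks
             if t["id"] not in completed
             and all(dep in completed for dep in t["dependencies"])]
    return max(ready, key=lambda t: t["priority"])["id"] if ready else None

tasks = [
    {"id": "a", "priority": 1, "dependencies": []},
    {"id": "b", "priority": 5, "dependencies": []},
    {"id": "c", "priority": 9, "dependencies": ["a"]},
]
print(next_runnable(tasks, completed=set()))        # 'b' (c is blocked on a)
print(next_runnable(tasks, completed={"a", "b"}))   # 'c'
```

Priority only breaks ties among *ready* tasks; it never overrides an unsatisfied dependency.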
- A2A Protocol Integration
  - A2A Protocol as the standard communication protocol
  - Task definition (`TaskModel`) vs execution instance (A2A Task) separation
  - Context ID mapping: `TaskModel.id` → A2A `Task.context_id`
  - Uses the A2A SDK's `InMemoryTaskStore` for task execution instances
  - `TaskModel` persistence handled by `TaskRepository` (separate from the A2A TaskStore)
- API Service Layer
  - A2A server implementation with Starlette
  - Custom A2A Starlette application with system routes
  - Task management APIs: `/system/task_create`, `/system/task_get`, `/system/task_update`, `/system/task_delete`
  - Optional JWT authentication middleware
  - Support for custom `TaskModel` via the `task_model_class` parameter
  - Environment variable support for custom `TaskModel` loading (`APFLOW_TASK_MODEL_CLASS`)
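Loading a class from an environment variable is typically a dotted-path import; a generic sketch of the pattern (not apflow's actual loader, and the `module:Class` path format is an assumption):

```python
import importlib
import os

def load_class_from_env(var_name: str, default=None):
    """Resolve a 'package.module:ClassName' path stored in an env var."""
    path = os.environ.get(var_name)
    if not path:
        return default
    module_path, _, class_name = path.partition(":")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# Demonstrate with a stdlib class standing in for a real TaskModel subclass:
os.environ["APFLOW_TASK_MODEL_CLASS"] = "collections:OrderedDict"
cls = load_class_from_env("APFLOW_TASK_MODEL_CLASS")
print(cls.__name__)  # OrderedDict
```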
- Storage Module
  - SQLAlchemy-based storage with DuckDB (default) and PostgreSQL support
  - Automatic table creation on first use
  - Configurable table name via `APFLOW_TASK_TABLE_NAME` (default: `apflow_tasks`)
  - Session factory with sync/async support
- Custom TaskModel Support
  - Users can define custom `TaskModel` subclasses with additional fields
  - `TaskRepository` supports custom `TaskModel` classes
  - Custom fields automatically handled in task creation APIs
  - Example: `MyTaskModel(TaskModel)` with `project_id` and `department` fields
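The subclassing pattern looks roughly like this; `TaskModel` below is a stand-in dataclass, since the real base class's fields are not shown in this changelog (only `project_id` and `department` come from the example above):

```python
from dataclasses import dataclass, field

@dataclass
class TaskModel:                      # stand-in for apflow's TaskModel
    id: str
    name: str = ""
    dependencies: list = field(default_factory=list)

@dataclass
class MyTaskModel(TaskModel):         # custom fields added via inheritance
    project_id: str = ""
    department: str = ""

task = MyTaskModel(id="t1", name="ingest", project_id="p42", department="data")
print(task.project_id, task.department)  # p42 data
```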
- Event Queue Bridge
  - Streaming callbacks integration with the A2A Protocol `EventQueue`
  - Real-time task execution progress updates
- Base Task Infrastructure
  - `BaseTask`: Optional base class for executable tasks with common implementations
  - Input validation utilities (`get_input_schema`, `validate_input_schema`, `check_input_schema`)
  - Cancellation support via the `cancellation_checker` callback
  - Streaming context support for progress updates
  - Support for Pydantic `BaseModel` or JSON schema dict for input validation
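A JSON-schema-style input check boils down to verifying required keys and value types; a toy version of what such a utility might do (not the actual `validate_input_schema` implementation, and the schema shown is made up):

```python
def validate_input(schema: dict, payload: dict) -> list[str]:
    """Return a list of validation errors (empty list means valid)."""
    errors = []
    type_map = {"string": str, "integer": int, "object": dict}
    for key in schema.get("required", []):
        if key not in payload:
            errors.append(f"missing required field: {key}")
    for key, spec in schema.get("properties", {}).items():
        expected = type_map.get(spec.get("type"))
        if key in payload and expected and not isinstance(payload[key], expected):
            errors.append(f"{key}: expected {spec['type']}")
    return errors

schema = {
    "required": ["command"],
    "properties": {"command": {"type": "string"}, "timeout": {"type": "integer"}},
}
print(validate_input(schema, {"command": "ls", "timeout": 5}))  # []
print(validate_input(schema, {"timeout": "soon"}))
# ['missing required field: command', 'timeout: expected integer']
```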
- Executor Extension System
  - Unified `ExtensionRegistry` for executor registration and discovery
  - `@executor_register()` decorator for automatic registration
  - Category and type-based executor discovery
  - Extension system supporting executors, storage, hooks, and tools
  - Globally unique ID-based extension lookup
- Built-in Executors
  - `AggregateResultsExecutor`: Aggregates dependency task results into structured format
  - `SystemInfoExecutor`: Safe system resource queries (CPU, memory, disk) with predefined commands
  - `CommandExecutor`: Shell command execution (stdio extension)
  - `CrewaiExecutor`: LLM-based agent crew execution via CrewAI with token usage tracking
  - `BatchCrewaiExecutor`: Atomic batch execution of multiple crews with result merging
  - Project structure with src-layout
  - `pyproject.toml` with optional dependencies (`[a2a]`, `[crewai]`, `[cli]`, `[postgres]`, `[all]`)
  - Comprehensive documentation in the `docs/` directory
  - Test suite with pytest fixtures
  - Modular architecture separating core from optional features