Skip to Content
WhitepaperOrchestration EnginePart 1

Orchestration Engine

(Part 1 of 2 — same chapter in the PDF; split for the web site.)

Chapter 6 Orchestration Engine Strategic Takeaway Our fractal async pattern prevents the orchestration failures that kill other agent frameworks at scale—every step, whether synchronous or long-running, resolves through the same shadow-graph state machine with automatic retry and fault isolation. The orchestration engine turns a recipe invocation into a completed job. It resolves the DAG, dispatches each step to the appropriate execution mode, coordinates with external providers, and reports progress to clients. This chapter specifies the lifecycle precisely enough that an independent operator could reimplement it. The design follows a single governing principle: one action, one commit. Every database mutation is atomic; complex nested transactions are avoided entirely. This replaced earlier lock-contention patterns and eliminated an entire class of race conditions. 6.1 Job lifecycle A job is the execution instance of a recipe invocation. Its status follows this state machine: Status Meaning PENDING Created; awaiting dispatch. QUEUED Accepted into a Celery task queue. DISPATCHING Recipe executor is resolving the DAG and beginning step dispatch. PROCESSING At least one step is actively executing. PREVIEW Partial results available (e.g., an intermediate frame). COMPLETED All steps finished successfully; outputs available. FAILED A step failed beyond retry limits; the job is terminal. CANCELLED Marked for cancellation (see §6.8 for current limitations). EXPIRED TTL exceeded without completion. DELETED Soft-deleted from user view. Transitions are enforced by the recipe executor and worker tasks. Each transition triggers a job_events publication (see §6.9). 6.2 Recipe executor The RecipeExecutorV2 is the core engine. On receiving a job dispatch, it parses the recipe’s plan_definition JSON into a step graph, creates the shadow graph (see §6.3), resolves ready 27

PENDING QUEUED DISPATCHING PROCESSING PREVIEW COMPLETED FAILED CANCELLED EXPIRED partial continue TTL Figure 6.1: Job lifecycle (simplified). DELETED is a soft-delete visibility state omitted here. Terminal states are shaded. steps, dispatches each to the appropriate execution mode, and waits for completion events. On each event, it updates the shadow graph (one commit), re-resolves, and dispatches newly-ready steps. When all steps have completed (or one has failed beyond retry policy), the job reaches terminal state. 6.3 The shadow graph Strategic Takeaway The shadow graph is the durable, per-job execution state machine that makes crash recovery, parallel coordination, and external job management possible without holding any state in worker memory. Workers are expensive; waiting is free when the graph holds the state. 6.3.1 Evolution The shadow graph evolved through three generations, each motivated by a production failure in Delula (Chapter 10): Generation 1: in-memory queue. Steps tracked in a process-local map. Process restart lost all in-flight state—users saw generations vanish mid-processing with no error and no recovery path. Death: routine deployment restarts. Generation 2: database-backed queue. PostgreSQL FOR UPDATE SKIP LOCKED for atomic dequeue. Step results persisted between restarts. But step relationships (the DAG structure) were reconstructed from the recipe JSON on resume, not stored. Complex multi-step recipes with parallel branches could not reliably resume. Generation 3: shadow graph (current). The full DAG structure—nodes, edges, incoming dependency counts, ready queue—is persisted in PostgreSQL alongside step results. Any worker can load the graph, see exactly which steps completed, which failed, which are running, and which are waiting for external events. Resume from any point without re-parsing or re-executing. 28

6.3.2 Data model The shadow graph is stored in JobExecutionState (one row per job) with two JSONB columns: • step_results: accumulated outputs from completed steps, structured as {“steps”: {“<step_id>”: {“outputs”: {…}}}}. • step_metadata.workflow_instance: the graph structure—nodes (with complete, running, failed flags, retry count, timing), edges, incoming dependency counts, and the ready queue. Additional state tracks parallel operations (pending_operations, completed_operations), external dependencies (waiting_for: provider, external job ID, polling task ID per operation), workflow status (RUNNING, WAITING_SINGLE, WAITING_PARALLEL, READY_TO_RESUME), and a resume_token that changes on each resume to prevent duplicate processing. Every state transition is logged as a WorkflowEvent with event type, source, payload, correlation IDs, and an idempotency key—providing complete event-sourced audit trails. 6.3.3 Operations All shadow graph mutations follow the one-action-one-commit discipline, using with_for_update(nowait=True) row locking: init_instance(): Parse recipe → build nodes, edges, incoming counts, initial ready queue → commit. mark_running(): Flag node as running, set started_at → commit. mark_complete(): Flag node as complete → decrement incoming counts on successors → return newly-ready steps → commit. mark_failed(): Flag node as failed, store error → commit. pop_ready(): Dequeue next ready step(s) → commit. is_complete(): All nodes complete/failed AND ready queue empty. 6.3.4 What the shadow graph enables Crash recovery without re-execution. Worker dies mid-recipe → another worker loads the shadow graph → sees which steps completed, which timed out, which are waiting → re-dispatches only the interrupted step. acks_late ensures the broker redelivers. Parallel fan-out with partial completion. Steps B, C, D depend on A. A completes → mark_complete(“A”) decrements incoming counts → B, C, D enter the ready queue → dispatched in parallel. If D fails, E (which depends on B, C, D) never becomes ready. The graph knew which steps to wait for and which to abandon. External job coordination without blocking workers. Step invokes FAL image generation → returns ExternalJobSubmitted → shadow graph records in waiting_for → workflow enters WAITING_SINGLE → worker is released. Minutes later, webhook or polling discovers the result → shadow graph updated → newly-ready steps dispatched. No worker was blocked. Conditional skip propagation. When a condition evaluates to false, the step is stored as {“skipped”: true} and mark_complete is called—downstream dependents see it as done and proceed. The graph handles skipped steps as completed nodes. The shadow graph is inspectable via API (GET /jobs/{id}/graph), enabling developers to debug complex recipes and trace execution paths. Each nested recipe invocation (fractal async) creates its own JobExecutionState with its own shadow graph; the parent sees one async step while the child manages its own DAG independently. 29

6.4 Worker architecture Execution is distributed across Celery workers connected via Redis as the message broker: Service Queue Role Recipe executors recipe_executors DAG resolution, step dispatch, template resolution Background processors background_processing CPU-intensive work (image/video processing) Webhook listeners webhook_listeners Process inbound provider webhook events Polling watchdogs polling_watchdogs Fallback polling for providers without webhooks Main workers main_processing Job initialization, routing, general tasks Clients / agents (HTTP, future MCP/A2A) API layer (FastAPI) — auth, x402, job CRUD Orchestration (RecipeExecutorV2, shadow graph) PostgreSQL (jobs, registry, ledger) Redis (Celery broker, pub/sub, concurrency) Celery workers (recipe, background, webhooks, polling) Provider adapters (BaseProviderAdapter) External model / media APIs Artifact storage (S3 / CloudFront) SSE Figure 6.2: As-built execution stack (single-operator deployment). Federated cells would partition or replicate the Redis/Postgres roles (Chapter 15). All workers set task_acks_late = True globally, and many tasks redeclare it explicitly. This means a task is acknowledged only after completion, not on receipt. If a worker crashes mid-task, the broker redelivers the message to another worker. The trade-off is that steps must tolerate duplicate execution — a design constraint that propagates to webhook handlers, provider calls, and billing settlement. 6.4.1 The fractal async pattern Some ingredients (notably the video orchestrator) are themselves long-running orchestrators: they manage multiple sub-jobs internally (scene generation, stitching, transitions) while appearing as a single async step to the parent recipe. This is the fractal async pattern: the parent recipe executor treats the step as async (waiting for a webhook or polling event), while the ingredient internally manages its own synchronous or async sub-task sequence. This avoids “async coordination hell” — the parent sees one step with one completion event, regardless of internal complexity. 30

6.5 External job coordination: dual-path convergence External providers complete work out-of-band. Rather than treating polling as a fallback for webhooks, the system runs a dual-path race: both webhook and polling are always active for every external job. Whichever arrives first writes the result and cancels the other. Neither can stomp on the other. 6.5.1 The two paths Inbound webhooks (fast path): Providers POST completion events to dedicated endpoints. The handler verifies the HMAC-SHA256 signature, persists a ProviderWebhookEvent, and queues a Celery task to update the shadow graph. Polling watchdog (reliability path): A periodic task checks external job status and synthesizes the equivalent completion event. Uses sinusoidal backoff (30s → 60s → 120s). Catches anything the webhook missed—network failures, provider bugs, DNS issues, lost callbacks. 6.5.2 Three-layer convergence guarantee Layer 1: Shadow graph collision detection. Before doing any work, both paths check whether the step is already complete in the shadow graph. The second arrival finds the node marked complete and returns immediately as a no-op. Layer 2: Active cancellation. When the webhook succeeds, it revokes the polling task via celery_app.control.revoke() with terminate=True. When polling succeeds, the webhook arriving later hits Layer 1. Cancellation is an optimization—the system is safe without it. Layer 3: Atomic write. Both paths write through the same atomic_store_step_result function with with_for_update(nowait=True) row locking. The first writer commits; the second either finds the step complete (Layer 1) or fails to acquire the lock and backs off. 6.5.3 Failure scenarios Scenario Outcome Webhook arrives normally Result written, polling revoked. Fast path. Webhook lost Polling discovers result on next cycle. No data loss. Webhook delayed (arrives after polling) Polling writes first. Webhook is no-op via Layer 1. Simultaneous arrival Row lock serializes. First writer wins. Both fail Job TTL expires via type-aware recovery (24-hour safety net). This is defense in depth for external coordination: two independent paths racing, with three layers of convergence protection. The design was extracted from Delula’s production incident history, where early implementations without these guarantees produced “ghost steps”—steps that appeared complete in one view but were still processing in another. 6.6 Concurrency and provider slots The ConcurrencyManager enforces per-provider slot limits using Redis. Each provider has a configured maximum concurrency (default: 1). When a step attempts to use a provider:

  1. WATCH the provider’s concurrency key (concurrency:{provider}). 31

Source: transcribed from the compiled Scrypted Network Design whitepaper PDF for web reading. Layout, figures, and pagination may differ from the PDF.

Last updated on