Courier MFT

Job Engine Design

Multi-step job pipelines — define, schedule, and execute file transfer workflows.

The Job Engine is the core runtime of Courier. It is responsible for scheduling, executing, pausing, resuming, and monitoring multi-step file transfer workflows. The engine supports both standalone jobs and ordered execution chains with configurable dependency behavior.

5.1 Core Concepts

Job: A named, versioned pipeline of one or more Steps that execute sequentially. A Job has configuration (connections, paths, encryption keys), a schedule, a failure policy, and an execution history. When a Job definition is edited, previous runs retain their original configuration — new runs use the updated definition.

Step: A single unit of work within a Job. Each Step has a type (e.g., SftpDownload, PgpEncrypt, Zip), its own configuration, a timeout, and independent state tracking. Steps pass data to downstream steps via a shared JobContext.

Job Chain: A named, ordered group of Jobs that must execute in sequence. Chains define execution order and dependency relationships. A Chain can be scheduled or triggered on demand, and it manages the lifecycle of all its member Jobs as a coordinated unit.

Job Context: A key-value dictionary that accumulates outputs as Steps execute. For example, an SftpDownload step writes { "downloaded_file": "/tmp/courier/abc123/invoice.pgp" } and the subsequent PgpDecrypt step reads that path as its input. The context is persisted to the database alongside step state so that resumed jobs have access to all prior outputs.

5.2 Step Type Registry

Each step type is a class that implements the IJobStep interface. This gives the engine a plugin-style architecture: new step types can be added without modifying the engine core.

public interface IJobStep
{
    string TypeKey { get; }                          // e.g., "sftp.download", "pgp.encrypt"
    Task<StepResult> ExecuteAsync(
        StepConfiguration config,
        JobContext context,
        CancellationToken cancellationToken);
    Task<ValidationResult> ValidateAsync(
        StepConfiguration config);
    Task RollbackAsync(
        StepConfiguration config,
        JobContext context);                          // Best-effort cleanup on failure
}

V1 Step Types:

| Step Type Key | Description |
|---|---|
| sftp.upload | Upload file(s) to an SFTP server |
| sftp.download | Download file(s) from an SFTP server |
| ftp.upload | Upload file(s) to an FTP/FTPS server |
| ftp.download | Download file(s) from an FTP/FTPS server |
| pgp.encrypt | Encrypt file(s) using a PGP public key |
| pgp.decrypt | Decrypt file(s) using a PGP private key |
| file.zip | Compress file(s) into an archive |
| file.unzip | Extract file(s) from an archive |
| file.move | Move file(s) to a destination path |
| file.copy | Copy file(s) to a destination path |
| file.delete | Delete file(s) from a path |
| azure_function.execute | Trigger an Azure Function and poll for completion via Application Insights |

Each step type is registered in a StepTypeRegistry at startup via dependency injection. The registry resolves the correct IJobStep implementation by TypeKey at runtime.
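As a rough illustration of the lookup described above, the sketch below resolves an implementation by its TypeKey. It is not the actual Courier registry: IStepType is a hypothetical, trimmed-down stand-in for IJobStep, the two step classes are placeholders, and the constructor parameter stands in for whatever collection the dependency injection container would supply.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IJobStep; only TypeKey matters for the lookup.
public interface IStepType
{
    string TypeKey { get; }
}

public sealed class SftpDownloadStep : IStepType
{
    public string TypeKey => "sftp.download";
}

public sealed class PgpDecryptStep : IStepType
{
    public string TypeKey => "pgp.decrypt";
}

public sealed class StepTypeRegistry
{
    private readonly Dictionary<string, IStepType> _byKey = new(StringComparer.Ordinal);

    // Assumption: the host passes in every registered step implementation at startup.
    public StepTypeRegistry(IEnumerable<IStepType> steps)
    {
        foreach (var step in steps)
        {
            // Reject two implementations that claim the same key.
            if (!_byKey.TryAdd(step.TypeKey, step))
                throw new InvalidOperationException($"Duplicate step type key '{step.TypeKey}'.");
        }
    }

    // Returns the implementation for a key, or throws if none is registered.
    public IStepType Resolve(string typeKey) =>
        _byKey.TryGetValue(typeKey, out var step)
            ? step
            : throw new KeyNotFoundException($"No step type registered for '{typeKey}'.");
}
```

Failing on duplicate keys at startup is a design choice of this sketch; it surfaces a misconfigured plugin before any job runs.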

5.3 Job State Machine

Jobs follow a strict state machine with defined valid transitions:

                  ┌──────────┐
                  │ Created  │
                  └────┬─────┘
                       │ (enqueue)
                  ┌────▼─────┐
            ┌─────│  Queued   │
            │     └────┬─────┘
            │          │ (slot available)
            │     ┌────▼─────┐
            │     │ Running  │◄──── (resume)
            │     └──┬──┬──┬─┘
            │        │  │  │
            │        │  │  └──────────┐
            │        │  │             │ (pause)
            │        │  │        ┌────▼─────┐
            │        │  │        │  Paused   │
            │        │  │        └───────────┘
            │        │  │
            │        │  └─────┐ (failure, policy = stop)
            │        │        │
   (cancel) │   ┌────▼────┐ ┌─▼───────┐
            └──►│Cancelled│ │ Failed  │
                └─────────┘ └─────────┘
                       │
                  ┌────▼─────┐
                  │Completed │
                  └──────────┘

Valid transitions are enforced in code. Any invalid transition throws an InvalidJobStateTransitionException.
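One way such enforcement could look is a lookup table of allowed target states per source state. The table below is read off the diagram and is an assumption in places: Paused to Cancelled and Failed to Running (resuming a failed job, per section 5.5) are not drawn explicitly, and the JobState enum and JobStateMachine class are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public enum JobState { Created, Queued, Running, Paused, Completed, Failed, Cancelled }

public sealed class InvalidJobStateTransitionException : Exception
{
    public InvalidJobStateTransitionException(JobState from, JobState to)
        : base($"Invalid job state transition: {from} -> {to}") { }
}

public static class JobStateMachine
{
    // Assumed transition table; terminal states (Completed, Cancelled) have no entry.
    private static readonly Dictionary<JobState, JobState[]> Allowed = new()
    {
        [JobState.Created] = new[] { JobState.Queued },
        [JobState.Queued] = new[] { JobState.Running, JobState.Cancelled },
        [JobState.Running] = new[] { JobState.Paused, JobState.Completed, JobState.Failed, JobState.Cancelled },
        [JobState.Paused] = new[] { JobState.Running, JobState.Cancelled },
        [JobState.Failed] = new[] { JobState.Running },
    };

    // Throws unless 'to' is listed as a valid target for 'from'.
    public static void EnsureTransition(JobState from, JobState to)
    {
        if (!Allowed.TryGetValue(from, out var targets) || Array.IndexOf(targets, to) < 0)
            throw new InvalidJobStateTransitionException(from, to);
    }
}
```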

5.4 Step State Machine

Each step within a job tracks its own state independently:

  • Pending → Step has not yet started
  • Running → Step is currently executing
  • Completed → Step finished successfully (output written to JobContext)
  • Failed → Step failed (error details persisted)
  • Skipped → Step was skipped due to failure policy configuration

After each step completes (or fails), its state and any context outputs are persisted to the database in a single transaction. This is the foundation of the checkpoint/resume system.

5.5 Checkpoint & Resume

When a Job is paused or fails at Step N, the engine persists:

  • The Job's current state (Paused or Failed)
  • Each Step's state (Completed, Failed, or Pending)
  • The full JobContext dictionary at the point of interruption
  • Error details for the failed step (if applicable)

On resume, the engine:

  1. Loads the Job and its step states from the database
  2. Restores the JobContext from the persisted snapshot
  3. Identifies the first step in Pending or Failed state
  4. Begins execution from that step, skipping all Completed steps

This means a 10-step job that failed at step 7 will resume from step 7 with all outputs from steps 1–6 intact. File outputs from completed steps must still exist on disk — the engine validates this before resuming and fails fast if intermediate files are missing.
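The "identify the first step in Pending or Failed state" rule can be sketched as a simple scan over the persisted step states. StepState mirrors the states listed in 5.4; ResumePlanner and FindResumeIndex are hypothetical names, and the file-existence validation mentioned above is left out.

```csharp
using System;
using System.Collections.Generic;

public enum StepState { Pending, Running, Completed, Failed, Skipped }

public static class ResumePlanner
{
    // Returns the zero-based index of the first step that is Pending or Failed,
    // or -1 when every step has already finished.
    public static int FindResumeIndex(IReadOnlyList<StepState> stepStates)
    {
        for (var i = 0; i < stepStates.Count; i++)
        {
            if (stepStates[i] is StepState.Pending or StepState.Failed)
                return i;
        }
        return -1;
    }
}
```

For the 10-step example, six Completed states followed by one Failed and three Pending yield index 6, which is step 7 in one-based numbering.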

5.6 Step Context & Data Passing

Steps communicate via the JobContext, a typed dictionary scoped to a single job execution:

public class JobContext
{
    private readonly Dictionary<string, object> _data = new();

    public void Set<T>(string key, T value);
    public T Get<T>(string key);
    public bool TryGet<T>(string key, out T value);
    public IReadOnlyDictionary<string, object> Snapshot();  // For persistence
}

Convention for keys: Steps write outputs using the pattern {stepIndex}.{outputName}, e.g., "0.downloaded_files", "1.decrypted_file". Step configuration references upstream outputs using the same keys, which the engine resolves at runtime before passing config to the step.

The entire context is serialized to JSON and stored in the job_executions table after each step, enabling checkpoint/resume.
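A minimal sketch of reference resolution and the JSON round trip follows, assuming the "context:" prefix used in the control flow examples of section 5.20 and System.Text.Json for serialization. ContextReferences and its methods are hypothetical helpers, not the engine's actual API.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class ContextReferences
{
    private const string Prefix = "context:";

    // Replaces a "context:{key}" config value with the stored output;
    // any other value passes through unchanged.
    public static object Resolve(string configValue, IReadOnlyDictionary<string, object> context)
    {
        if (!configValue.StartsWith(Prefix, StringComparison.Ordinal))
            return configValue;

        var key = configValue.Substring(Prefix.Length);
        return context.TryGetValue(key, out var value)
            ? value
            : throw new KeyNotFoundException($"Context key '{key}' not found.");
    }

    // Serializes a context snapshot for storage.
    public static string ToJson(Dictionary<string, object> snapshot) =>
        JsonSerializer.Serialize(snapshot);

    // Restores a snapshot; values come back as JsonElement, not their original CLR types.
    public static Dictionary<string, JsonElement> FromJson(string json) =>
        JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(json)
        ?? new Dictionary<string, JsonElement>();
}
```

Note that a restored context holds JsonElement values, so a typed Get<T> on a resumed job would need to convert them; how Courier handles that is not specified here.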

5.7 Scheduling

Courier uses Quartz.NET as its scheduling backbone, with schedule definitions stored in PostgreSQL so they survive application restarts and container re-deployments.

Cron scheduling: Jobs and Chains can be assigned a cron expression (e.g., 0 0 3 ? * TUE for every Tuesday at 3:00 AM). Quartz manages trigger firing and misfire handling.

One-shot scheduling: Jobs can be scheduled to run once at a specific future datetime. Implemented as a Quartz SimpleTrigger with a repeat count of zero.

On-demand execution: An API endpoint (POST /api/jobs/{id}/trigger) enqueues a job for immediate execution, bypassing the scheduler. This creates a new JobExecution record and places it in the queue.

All three mechanisms feed into the same execution queue, ensuring consistent concurrency management.

5.8 Concurrency Management

The engine enforces a configurable maximum of concurrent job executions (default: 5). This is implemented as a persistent semaphore backed by the database, not an in-memory SemaphoreSlim, so it works correctly across container restarts.

Queue dequeue pattern (runs on a 5-second poll in the Worker host):

-- Atomic dequeue: claim the next queued execution if a slot is available
WITH running AS (
    SELECT COUNT(*) AS cnt
    FROM job_executions
    WHERE state = 'running'
),
next_job AS (
    SELECT je.id
    FROM job_executions je, running r
    WHERE je.state = 'queued'
      AND r.cnt < (SELECT value::int FROM system_settings WHERE key = 'job.concurrency_limit')
    ORDER BY je.queued_at ASC
    LIMIT 1
    FOR UPDATE OF je SKIP LOCKED   -- lock only the job_executions row; prevents duplicate pickup across concurrent polls
)
UPDATE job_executions
SET state = 'running', started_at = now()
FROM next_job
WHERE job_executions.id = next_job.id
RETURNING job_executions.id, job_executions.job_id;

FOR UPDATE SKIP LOCKED is critical: if two poll cycles overlap (e.g., Quartz fires a trigger while the queue poll is running), the second query skips the row already locked by the first, preventing double-execution. This pattern is used consistently wherever Courier claims "one worker picks it up" — queue dequeue, monitor event processing, and partition maintenance.

Queue behavior: When all slots are occupied, new job executions enter Queued state and are picked up in FIFO order as slots free up. The queue is polled on a short interval (default: 5 seconds) by a background service.

Concurrency limit is global, not per-job. A single job definition can have multiple concurrent executions if the schedule overlaps and slots are available. If this needs to be prevented, a "max instances" setting per job can be added (not in V1 scope, but the schema supports it).

5.9 Job Dependencies

Jobs can declare dependencies on other Jobs. A dependency means "do not start this Job until the specified upstream Job's most recent execution has completed."

Dependency configuration per edge:

  • required_status: The upstream job must reach this status. Default: Completed. Can be set to Any to allow the downstream job to run even if the upstream failed.
  • run_on_failure: Boolean, default false. If true, the downstream job starts even when the upstream job fails. This is configured per-dependency, giving fine-grained control.

Validation: At job creation/edit time, the system performs a topological sort to detect circular dependencies. If a cycle is detected, the save is rejected with a descriptive error.
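The cycle check can be sketched with Kahn's algorithm, one common way to perform a topological sort: if not every job can be ordered, the graph contains a cycle. DependencyValidator and the edge tuple shape are hypothetical; jobs are identified by plain strings for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DependencyValidator
{
    // edges: (upstream, downstream) pairs. Returns true if the graph contains a cycle.
    public static bool HasCycle(IEnumerable<(string Upstream, string Downstream)> edges)
    {
        var downstream = new Dictionary<string, List<string>>();
        var inDegree = new Dictionary<string, int>();

        foreach (var (up, down) in edges)
        {
            if (!downstream.TryGetValue(up, out var list))
                downstream[up] = list = new List<string>();
            list.Add(down);

            inDegree.TryAdd(up, 0);
            inDegree.TryGetValue(down, out var current);   // current is 0 when the key is absent
            inDegree[down] = current + 1;
        }

        // Start with jobs that have no upstream dependencies.
        var ready = new Queue<string>(inDegree.Where(kv => kv.Value == 0).Select(kv => kv.Key));
        var sorted = 0;

        while (ready.Count > 0)
        {
            var job = ready.Dequeue();
            sorted++;
            if (!downstream.TryGetValue(job, out var children)) continue;
            foreach (var child in children)
            {
                if (--inDegree[child] == 0)
                    ready.Enqueue(child);
            }
        }

        // Jobs left unsorted are part of, or downstream of, a cycle.
        return sorted != inDegree.Count;
    }
}
```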

5.10 Job Chains (Execution Groups)

A Job Chain is a first-class entity that represents an ordered sequence of Jobs:

Chain: "Daily Partner Invoice Processing"
  ├── Job 1: Download invoices from Partner SFTP
  ├── Job 2: Decrypt PGP files (depends on Job 1)
  ├── Job 3: Unzip archives (depends on Job 2)
  └── Job 4: Move to processing folder (depends on Job 3)

Chain properties:

  • Name & description: Human-readable identification
  • Member jobs: Ordered list of Job references with dependency edges
  • Schedule: A Chain can have its own cron/one-shot schedules via the separate chain_schedules table. When triggered, it starts the first Job(s) with no upstream dependencies
  • Chain-level state: Pending → Running → Completed | Failed | Paused | Cancelled
  • Propagation behavior: As each Job completes, the Chain evaluates which downstream Jobs are unblocked and enqueues them

Chain Scheduling: Chains support the same scheduling types as Jobs — cron (recurring via Quartz cron expressions) and one_shot (single future execution that auto-disables after firing). Chain schedules are stored in a separate chain_schedules table (not the job_schedules table) to maintain clean separation. The Worker runs a ChainScheduleManager that registers chain schedules with Quartz.NET under the "courier-chains" group (separate from job schedules in the "courier" group). The ScheduleStartupSync background service syncs both job and chain schedules on a 30-second interval. When a chain schedule fires, QuartzChainAdapter calls ChainExecutionService.TriggerAsync() with triggeredBy: "schedule".

Chain vs. standalone dependencies: Jobs can have dependencies both within a Chain and as standalone relationships. Chains are a convenience for defining and scheduling a related group. Standalone dependencies allow cross-chain relationships.

Chain failure behavior: When a Job within a Chain fails and the dependency is configured with run_on_failure: false, all downstream Jobs in the chain are marked Skipped and the Chain transitions to Failed. If run_on_failure: true, execution continues.

5.11 Failure Handling

Each Job has a configurable failure policy that determines what happens when a Step fails:

| Policy | Behavior |
|---|---|
| stop | Mark the Job as Failed. No further steps execute. |
| retry_step | Retry the failed step up to N times with exponential backoff. If all retries fail, mark Failed. |
| retry_job | Re-run the entire Job from step 1, up to N times. If all retries fail, mark Failed. |
| skip_and_continue | Mark the failed step as Skipped, continue to the next step. |

Retry configuration:

  • max_retries: Maximum number of retry attempts (default: 3)
  • backoff_base_seconds: Base delay for exponential backoff (default: 1)
  • backoff_max_seconds: Ceiling for backoff delay (default: 60)
  • Backoff formula: min(backoff_base * 2^attempt, backoff_max)
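The backoff formula can be written out as a small helper. This is a sketch under one assumption the text does not settle: attempt is treated as zero-based, so the first retry waits exactly backoff_base seconds. RetryBackoff is a hypothetical name.

```csharp
using System;

public static class RetryBackoff
{
    // delay = min(base * 2^attempt, max), with the documented defaults of 1 s and 60 s.
    public static TimeSpan Delay(int attempt, double baseSeconds = 1, double maxSeconds = 60)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));
        var seconds = Math.Min(baseSeconds * Math.Pow(2, attempt), maxSeconds);
        return TimeSpan.FromSeconds(seconds);
    }
}
```

With the defaults, attempts 0, 1, and 2 wait 1, 2, and 4 seconds, and any attempt from 6 onward is capped at 60 seconds.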

The failure policy is set at the Job level. A future enhancement could allow per-step failure policy overrides.

5.12 Idempotency Rules by Step Type

Retries without strict idempotency turn into duplicate uploads, partial overwrites, or "successful but wrong" downstream states. Each step type declares its idempotency strategy, detection mechanism, and rollback behavior:

| Step Type | Idempotency Strategy | Detection Key | Rollback on Retry | Notes |
|---|---|---|---|---|
| sftp.upload | Overwrite: re-upload overwrites the remote file | Remote path (full destination path) | Delete partial remote file before re-upload | Uses atomic rename (upload to .courier-tmp, rename on completion). If retry finds the .courier-tmp file, it deletes and restarts. If it finds the final file from a previous attempt, it overwrites. |
| sftp.download | Resume: continue from last byte offset | Local file path + size | Delete partial local file and restart if resume fails | SSH.NET supports offset-based reads. If the local file exists with size < remote size, resume. If sizes match, skip (already complete). If local > remote, delete and restart. |
| ftp.upload | Overwrite: same as SFTP | Remote path | Delete and re-upload | Atomic rename via FTP RNFR/RNTO. Resume not reliable across FTP servers. |
| ftp.download | Resume: same as SFTP | Local file path + size | Delete and re-download | FluentFTP FtpLocalExists.Resume handles offset. |
| pgp.encrypt | Overwrite: re-encrypt to same output path | Output file path | Delete output file | PGP encryption is not byte-for-byte deterministic: each run generates a new session key, so the ciphertext differs. Re-encryption still produces functionally equivalent output (same plaintext after decryption). |
| pgp.decrypt | Overwrite: re-decrypt to same output path | Output file path | Delete output file | Deterministic given same input + key. |
| file.zip | Overwrite: recreate archive | Output archive path | Delete partial archive | Archive creation is atomic (write to .tmp, rename). |
| file.unzip | Clean and re-extract | Output directory | Delete entire output directory and re-extract | Cannot safely resume partial extraction. Directory is wiped and re-extracted from scratch. |
| file.rename | Check-and-skip | Destination path exists with expected size | No rollback needed | If destination already exists with correct size, skip. If source still exists, perform rename. If neither exists, fail. |
| file.copy | Overwrite | Destination path | Delete destination | Copy is always safe to repeat. |
| file.delete | Check-and-skip | File existence | No rollback | If file is already gone, succeed silently. |
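As an illustration of the size-based detection described for sftp.download, the decision can be reduced to a pure function over the two file sizes. DownloadAction and DownloadResume are hypothetical names, and the sketch deliberately leaves out the actual transfer and the fallback when a resume attempt fails.

```csharp
public enum DownloadAction { Start, Resume, Skip, Restart }

public static class DownloadResume
{
    // localSize is null when no local file exists yet.
    public static DownloadAction Decide(long? localSize, long remoteSize)
    {
        if (localSize is null) return DownloadAction.Start;        // nothing downloaded so far
        if (localSize < remoteSize) return DownloadAction.Resume;  // continue from the last byte offset
        if (localSize == remoteSize) return DownloadAction.Skip;   // already complete
        return DownloadAction.Restart;                             // local is larger: delete and restart
    }
}
```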

Retry safety contract: Every step implementation must satisfy this contract:

/// <summary>
/// Step implementations must be safe to retry. Specifically:
/// 1. If the step partially completed, retry must not produce duplicates
/// 2. If the step fully completed, retry must detect this and either
///    skip (no-op) or overwrite with an equivalent result
/// 3. The step must clean up its own partial outputs before retrying
/// </summary>
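public interface IJobStep
{
    // Called before retry to clean up partial state from the failed attempt.
    // Default implementation: delete all files this step wrote to the temp directory.
    Task CleanupBeforeRetryAsync(
        StepConfiguration config,
        JobContext context,
        CancellationToken cancellationToken);

    // Same signature as in 5.2; the remaining members are omitted here.
    Task<StepResult> ExecuteAsync(
        StepConfiguration config,
        JobContext context,
        CancellationToken cancellationToken);
}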

Detection on resume: When a paused or failed job resumes from a checkpoint, the engine validates the state before continuing:

  1. All output files from completed steps must still exist on disk (checked via path + size from the context snapshot)
  2. The step being resumed has its CleanupBeforeRetryAsync() called to clear any partial state
  3. The JobContext is restored from the database snapshot, so downstream steps see the same inputs as the original run

Upload duplicate prevention: For sftp.upload and ftp.upload, the atomic rename pattern is the primary defense. The file is uploaded as {filename}.courier-tmp, and only renamed to the final name on successful completion. If a retry finds {filename}.courier-tmp, it knows the previous attempt was incomplete and deletes it before restarting. If it finds {filename} (no .courier-tmp), the previous attempt succeeded and the step can skip or overwrite. Partner systems polling for files never see partial uploads because the .courier-tmp suffix doesn't match their expected patterns.

5.13 Step Timeouts

Each Step has a configurable timeout_seconds (default: 300 — 5 minutes). The timeout is enforced via a CancellationTokenSource linked to the step's execution. When the timeout fires:

  1. The CancellationToken is cancelled
  2. The step implementation is expected to observe the token and terminate gracefully
  3. If the step does not terminate within a grace period (10 seconds), the engine forcibly marks it as Failed with a timeout error
  4. The Job's failure policy then determines what happens next
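The timeout sequence above could be sketched roughly as follows, using a linked CancellationTokenSource with CancelAfter. StepTimeoutRunner and StepOutcome are hypothetical names, and the sketch only reports the outcome; marking the step Failed and applying the failure policy would happen in the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum StepOutcome { Completed, TimedOut }

public static class StepTimeoutRunner
{
    public static async Task<StepOutcome> RunAsync(
        Func<CancellationToken, Task> step,
        TimeSpan timeout,
        TimeSpan gracePeriod,
        CancellationToken jobToken = default)
    {
        // Linked source: cancelled by the job-level token or by the step timeout.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(jobToken);
        cts.CancelAfter(timeout);

        var work = step(cts.Token);
        var deadline = Task.Delay(timeout + gracePeriod);

        // A step that ignores the token past timeout + grace is abandoned, not awaited.
        if (await Task.WhenAny(work, deadline) != work)
            return StepOutcome.TimedOut;

        try
        {
            await work;
            return StepOutcome.Completed;
        }
        catch (OperationCanceledException) when (!jobToken.IsCancellationRequested)
        {
            // Cancelled by the timeout rather than by the job-level token.
            return StepOutcome.TimedOut;
        }
    }
}
```

A cancellation that originates from the job-level token is deliberately not caught here, so user-initiated cancellation and shutdown propagate to the caller as OperationCanceledException.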

5.14 Cancellation Support

A CancellationToken is threaded through the entire execution pipeline: Job → Step → underlying library calls. This allows:

  • User-initiated cancellation: Via POST /api/jobs/{executionId}/cancel
  • Timeout-initiated cancellation: Per-step timeout (see above)
  • Application shutdown: Aspire host shutdown triggers graceful cancellation of all running jobs, which then persist their checkpoint state

All step implementations are required to observe the cancellation token at regular intervals (e.g., between file chunks during transfer) and throw OperationCanceledException when triggered.
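Observing the token between file chunks might look like the loop below. It is a generic stream copy, not any particular step's implementation; ChunkedCopy is a hypothetical name and the chunk size is left to the caller.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class ChunkedCopy
{
    // Copies source to destination in fixed-size chunks and returns the byte count.
    public static async Task<long> CopyAsync(
        Stream source, Stream destination, int chunkSize, CancellationToken ct)
    {
        var buffer = new byte[chunkSize];
        long total = 0;

        while (true)
        {
            // Checked before every chunk; throws OperationCanceledException once cancelled.
            ct.ThrowIfCancellationRequested();

            var read = await source.ReadAsync(buffer, 0, buffer.Length, ct);
            if (read == 0) break;   // end of stream

            await destination.WriteAsync(buffer, 0, read, ct);
            total += read;
        }

        return total;
    }
}
```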

5.15 Temp File Management

Each Job execution is assigned a unique working directory:

/data/courier/temp/{executionId}/

Steps write all intermediate files to this directory. The JobContext stores relative paths so they resolve correctly. On job completion (success or failure after all retries exhausted), the engine:

  1. Moves final output files to their configured destinations
  2. Deletes the temp directory
  3. Logs total temp disk usage in the execution audit record

For paused or resumable-failed jobs, the temp directory is retained until the job is either resumed and completed, or manually cancelled/cleaned up. A background cleanup service purges orphaned temp directories older than a configurable threshold (default: 7 days).
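The orphan purge could be sketched as a scan over the temp root. This version uses a directory's last-write time as its age, which is an assumption; a real cleanup service would presumably also confirm that the execution is not paused or awaiting resume before deleting anything. TempDirectoryCleanup is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class TempDirectoryCleanup
{
    // Deletes execution directories under tempRoot whose last write is older than maxAge.
    // Returns the paths that were removed.
    public static IReadOnlyList<string> PurgeOrphans(string tempRoot, TimeSpan maxAge, DateTime utcNow)
    {
        var deleted = new List<string>();
        if (!Directory.Exists(tempRoot)) return deleted;

        foreach (var dir in Directory.GetDirectories(tempRoot))
        {
            // Keep anything written within the threshold.
            if (utcNow - Directory.GetLastWriteTimeUtc(dir) <= maxAge) continue;

            Directory.Delete(dir, recursive: true);
            deleted.Add(dir);
        }

        return deleted;
    }
}
```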

5.16 Job Audit Trail

Every state transition at both the Job and Step level is recorded in the job_audit_log table:

  • Timestamp of the transition
  • From state → To state
  • Duration of the previous state
  • Bytes transferred (for file transfer steps)
  • Error details (message, stack trace, retry attempt number)
  • User who initiated the action (for manual triggers, cancellations, pauses)

This audit log is append-only and never modified. It serves as the foundation for the V2 metrics dashboard and SLA monitoring system.

5.17 Job Versioning

Job definitions are versioned using a simple incrementing version number. When a Job is edited:

  1. The current definition is snapshotted as version N
  2. The updated definition becomes version N+1
  3. Any in-progress or paused executions continue using the version they were started with
  4. New executions use the latest version

The job_definitions table stores the current version, and job_definition_versions stores the full configuration for each historical version as a JSON column. This ensures auditability and prevents mid-execution configuration changes.

5.18 Notification Hooks (V2 Preparation)

The engine emits domain events at key lifecycle points:

  • JobStarted, JobCompleted, JobFailed, JobPaused, JobResumed, JobCancelled
  • StepStarted, StepCompleted, StepFailed, StepSkipped
  • ChainStarted, ChainCompleted, ChainFailed

In V1, these events are used only for the audit log. In V2, subscribers can be registered to send email notifications (via SMTP), call webhooks (via REST), or trigger other jobs. The event infrastructure is built in V1 so V2 doesn't require engine changes — only new subscribers.

5.19 Dry Run Mode (Future Consideration)

Not in V1 scope, but the IJobStep interface includes ValidateAsync specifically to support a future dry-run mode. A dry run would execute ValidateAsync on each step (testing connections, verifying paths, checking key availability) without performing actual transfers. This can be implemented in V2 without modifying the step interface.

5.20 Control Flow (If/Else + ForEach)

The job engine supports branching and iteration through four special step types that use the existing flat step model with explicit block markers.

5.20.1 Control Flow Step Types

| Step Type | Purpose | Required Config |
|---|---|---|
| flow.foreach | Iterate over a collection | source: context reference or JSON array |
| flow.if | Conditional branch | left, operator, right (right optional for exists) |
| flow.else | Alternate branch (must follow flow.if body) | none |
| flow.end | Closes a flow.foreach or flow.if block | none |

5.20.2 Block Structure

Control flow steps are regular entries in the flat step list. Blocks are delimited by flow.foreach/flow.if at the start and flow.end at the end. The engine parses the flat list into a tree before execution.

Step 0: sftp.list         { "connection_id": "...", "remote_path": "/incoming" }
Step 1: flow.foreach      { "source": "context:0.file_list" }
Step 2:   sftp.download   { "connection_id": "...", "remote_path": "context:loop.current_item.name" }
Step 3:   flow.if         { "left": "context:loop.current_item.size", "operator": "greater_than", "right": "1048576" }
Step 4:     pgp.encrypt   { "input_path": "context:2.downloaded_file", "recipient_key_ids": ["..."] }
Step 5:   flow.else
Step 6:     file.copy     { "source_path": "context:2.downloaded_file", "destination_path": "/archive/" }
Step 7:   flow.end        {}   ← closes flow.if
Step 8: flow.end          {}   ← closes flow.foreach
Step 9: file.delete       { "path": "/staging/*" }

The ExecutionPlanParser converts this flat list into a tree:

Root (Sequence)
  ├── StepNode(sftp.list)
  ├── ForEachNode(flow.foreach)
  │     └── Body:
  │           ├── StepNode(sftp.download)
  │           └── IfElseNode(flow.if)
  │                 ├── Then: StepNode(pgp.encrypt)
  │                 └── Else: StepNode(file.copy)
  └── StepNode(file.delete)

Parsing rules:

  • Every flow.foreach and flow.if must have a matching flow.end
  • flow.else can only appear inside a flow.if block (at most once)
  • Blocks can be nested (foreach inside foreach, if inside foreach, etc.)
  • Malformed block structure causes the parser to throw before execution begins
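A stack-based pass is one plausible way to apply these rules while building the tree. The sketch below works on step type keys only and uses hypothetical node classes; it is not the actual ExecutionPlanParser, and it ignores step configuration entirely.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public abstract class PlanNode { }

public sealed class StepNode : PlanNode
{
    public string TypeKey { get; }
    public StepNode(string typeKey) => TypeKey = typeKey;
}

public sealed class ForEachNode : PlanNode
{
    public List<PlanNode> Body { get; } = new();
}

public sealed class IfElseNode : PlanNode
{
    public List<PlanNode> Then { get; } = new();
    public List<PlanNode> Else { get; } = new();
    public bool HasElse { get; set; }
}

public static class PlanParser
{
    public static List<PlanNode> Parse(IReadOnlyList<string> stepTypes)
    {
        var root = new List<PlanNode>();
        // Each frame: the open block (null for the root) and the list receiving children.
        var stack = new Stack<(PlanNode? Block, List<PlanNode> Target)>();
        stack.Push((null, root));

        for (var i = 0; i < stepTypes.Count; i++)
        {
            var (block, target) = stack.Peek();
            switch (stepTypes[i])
            {
                case "flow.foreach":
                    var loop = new ForEachNode();
                    target.Add(loop);
                    stack.Push((loop, loop.Body));
                    break;
                case "flow.if":
                    var branch = new IfElseNode();
                    target.Add(branch);
                    stack.Push((branch, branch.Then));
                    break;
                case "flow.else":
                    var open = block as IfElseNode;
                    if (open is null || open.HasElse)
                        throw new FormatException($"Step {i}: flow.else without a matching flow.if.");
                    open.HasElse = true;
                    stack.Pop();
                    stack.Push((open, open.Else));   // later children go to the else branch
                    break;
                case "flow.end":
                    if (block is null)
                        throw new FormatException($"Step {i}: flow.end without an open block.");
                    stack.Pop();
                    break;
                default:
                    target.Add(new StepNode(stepTypes[i]));
                    break;
            }
        }

        if (stack.Count != 1)
            throw new FormatException("Unclosed flow.foreach or flow.if block.");
        return root;
    }
}
```

Because all structural errors throw during parsing, a malformed step list never reaches execution, which matches the last parsing rule.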

5.20.3 Loop Context Variables

When iterating inside a flow.foreach, the engine injects magic context keys:

| Key | Value |
|---|---|
| loop.current_item | The current item from the collection (JsonElement) |
| loop.current_item.{prop} | Property access on the current item (for objects) |
| loop.index | Zero-based iteration index |

These are injected directly into the JobContext data dictionary, so existing context: resolution works unchanged. Step handlers don't need any modifications to work inside loops.

Nested loops: Inner loops shadow loop.current_item and loop.index. Outer loop values are preserved at loop.{depth}.current_item and loop.{depth}.index (zero-based depth), and restored when the inner loop exits.

5.20.4 Condition Operators

The flow.if step evaluates left {operator} right. Operands are compared as case-insensitive strings, except that greater_than and less_than compare as decimals when both operands are numeric and fall back to string comparison otherwise.

| Operator | Behavior |
|---|---|
| equals | Case-insensitive string equality |
| not_equals | Negation of equals |
| contains | Left contains right (case-insensitive) |
| greater_than | Decimal comparison (string fallback if non-numeric) |
| less_than | Decimal comparison (string fallback if non-numeric) |
| exists | True if left is non-null and non-empty (right is ignored) |
| regex | Right is a regex pattern matched against left |

Both left and right support context: references, which are resolved before evaluation.
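The operator table can be sketched as a single evaluation function over already-resolved operands. ConditionEvaluator is a hypothetical name, and two details are assumptions of the sketch: numbers are parsed with the invariant culture, and regex matching uses the default (case-sensitive) options, since the table does not say otherwise.

```csharp
#nullable enable
using System;
using System.Globalization;
using System.Text.RegularExpressions;

public static class ConditionEvaluator
{
    public static bool Evaluate(string? left, string op, string? right = null)
    {
        var l = left ?? string.Empty;
        var r = right ?? string.Empty;

        switch (op)
        {
            case "equals": return string.Equals(l, r, StringComparison.OrdinalIgnoreCase);
            case "not_equals": return !string.Equals(l, r, StringComparison.OrdinalIgnoreCase);
            case "contains": return l.Contains(r, StringComparison.OrdinalIgnoreCase);
            case "greater_than": return Compare(l, r) > 0;
            case "less_than": return Compare(l, r) < 0;
            case "exists": return !string.IsNullOrEmpty(left);   // right is ignored
            case "regex": return Regex.IsMatch(l, r);            // right is the pattern
            default: throw new ArgumentException($"Unknown operator '{op}'.", nameof(op));
        }
    }

    // Decimal comparison when both sides are numeric, otherwise case-insensitive string comparison.
    private static int Compare(string left, string right)
    {
        if (decimal.TryParse(left, NumberStyles.Number, CultureInfo.InvariantCulture, out var a) &&
            decimal.TryParse(right, NumberStyles.Number, CultureInfo.InvariantCulture, out var b))
            return a.CompareTo(b);

        return string.Compare(left, right, StringComparison.OrdinalIgnoreCase);
    }
}
```

The numeric path matters for values such as "9" and "10": as strings "9" sorts after "10", but as decimals 9 is smaller.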

5.20.5 Edge Cases

| Scenario | Behavior |
|---|---|
| Empty collection in foreach | Body skipped entirely. Execution continues with next step after flow.end. |
| Step outputs inside loops | Keyed as "{stepOrder}.{key}"; each iteration overwrites previous. Last iteration's values persist after loop. |
| Pause mid-loop | Pause checked before each step including inside loops. On resume, containing foreach restarts from iteration 0 with restored context. |
| Failure + Stop policy | Job fails immediately; abort signal propagates up through the loop. |
| Failure + SkipAndContinue | Failed step skipped; iteration continues with next step in body. |
| Malformed block structure | Parser throws descriptive error; job fails before execution begins. |

5.20.6 Step Executions for Loop Bodies

Steps executed inside a flow.foreach record their iteration_index in the step_executions table. This allows tracking which iteration produced which output or error. Non-loop steps have iteration_index = NULL.