Overview

Station’s workflow engine enables you to chain agents together into durable, multi-step automated procedures. Workflows support:
  • Parallel execution - Run multiple agents concurrently
  • Conditional routing - Branch based on agent outputs
  • Human approval gates - Block until a human approves
  • Data transformation - Reshape data between steps
  • Loop iteration - Process arrays with concurrency control
┌─────────────────────────────────────────────────────────────────────┐
│                        WORKFLOW EXECUTION                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  [Workflow Input]                                                   │
│       │                                                             │
│       ▼                                                             │
│  ┌─────────────────┐                                                │
│  │  parallel-step  │  ← Run 3 agents concurrently                   │
│  └────────┬────────┘                                                │
│     ┌─────┼─────┐                                                   │
│     ▼     ▼     ▼                                                   │
│  [agent] [agent] [agent]                                            │
│     │     │     │                                                   │
│     └─────┼─────┘                                                   │
│           ▼                                                         │
│  ┌─────────────────┐                                                │
│  │  synthesize     │  ← Combine results                             │
│  └────────┬────────┘                                                │
│           ▼                                                         │
│  ┌─────────────────┐                                                │
│  │  switch         │  ← Route based on severity                     │
│  └────────┬────────┘                                                │
│     ┌─────┴─────┐                                                   │
│     ▼           ▼                                                   │
│  [approval]  [report]                                               │
│     │           │                                                   │
│     └─────┬─────┘                                                   │
│           ▼                                                         │
│  [Workflow Output]                                                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Quick Start

1. Create a Workflow File

Create ~/.config/station/environments/default/workflows/my-workflow.workflow.yaml:
id: hello-workflow
name: "Hello Workflow"
version: "1.0.0"
description: "A simple workflow example"

start: greet

states:
  - id: greet
    type: inject
    data:
      message: "Hello from Station!"
    output:
      greeting: "$.data"
    transition: analyze

  - id: analyze
    type: agent
    agent: my-analyzer           # Your agent name
    input:
      text: "$.greeting.message"
    output:
      analysis: "$.result"
    end: true

2. Sync and Run

stn sync                                          # Load the workflow
stn workflow run hello-workflow --wait            # Execute it

3. View Results

stn workflow runs hello-workflow                  # List runs
stn workflow runs <run-id> --steps                # View step details

State Types

Workflows are composed of states. Each state has a type that determines its behavior.

Agent

Execute a Station agent by name
type: agent
agent: my-agent

Inject

Insert static data into context
type: inject
data: { key: "value" }

Switch

Conditional branching
type: switch
conditions:
  - if: "expr"
    next: state

Transform

Reshape data with Starlark
type: transform
expression: |
  { "key": value }

Parallel

Run branches concurrently
type: parallel
branches:
  - id: branch1
    states: [...]

Foreach

Iterate over arrays
type: foreach
itemsPath: "$.items"
maxConcurrency: 5

Agent State

Execute an AI agent and capture its output.
- id: analyze-logs
  name: "Analyze Logs"
  type: agent
  agent: log-analyzer                    # Agent name (must exist)
  input:
    userInput: "${service_name}"         # Template variable (recommended)
    variables:
      time_range: 60                     # Static value passed to agent
  output:
    log_analysis: "$.result"             # Store full result
    severity: "$.result.severity"        # Store specific field
  timeout: 2m
  transition: next-step

Agent Input Patterns

When passing data to agent steps, use template variables (${var}) for the task/userInput field:
- id: process-data
  type: agent
  agent: my-agent
  input:
    userInput: "${query}"              # Template variable - WORKS!
JSONPath ($.path) does NOT work in input.task or input.userInput fields - only template variables (${var}) are resolved. JSONPath works correctly in:
  • output mappings
  • input.variables
  • dataPath in switch states
Workaround: Always use template variables like ${my_var} instead of $.my_var for task/userInput fields.
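
The difference between the two syntaxes can be illustrated with a toy model in Python. This is only a sketch of the behavior described above, not Station's implementation: `render_user_input` is a hypothetical helper, and the choice to leave an unresolved `$.path` as literal text is an assumption made for illustration (this page does not say what Station does with it).

```python
from string import Template


def render_user_input(template: str, context: dict) -> str:
    """Toy stand-in for ${var} resolution in userInput (hypothetical helper)."""
    # safe_substitute fills in ${name} placeholders and leaves anything it
    # cannot interpret, such as "$.path", untouched.
    return Template(template).safe_substitute(context)


context = {"user_id": "u-123", "user_email": "ops@example.com"}

# Template variable: replaced with the value from the workflow context.
resolved = render_user_input("Send alert to ${user_email}", context)

# JSONPath-style reference: not a template variable, so it stays literal here.
unresolved = render_user_input("Send alert to $.user_email", context)

print(resolved)
print(unresolved)
```

In this model the agent would receive the literal text `$.user_email` in the second case, which is why the template form is the one to use in task/userInput fields.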

Multi-Step Data Flow

Pass data between workflow steps using output mappings and template variables:
states:
  # Step 1: Get user data
  - id: get-user
    type: agent
    agent: user-lookup
    input:
      userInput: "Get email for user ${user_id}"
    output:
      user_email: "$.response"           # Store response at $.user_email
    transition: send-notification

  # Step 2: Use Step 1's output
  - id: send-notification
    type: agent
    agent: notifier
    input:
      userInput: "Send alert to ${user_email}"  # Uses Step 1 output!
    end: true

| Syntax | Where It Works | Example |
|--------|----------------|---------|
| ${var} | task, userInput, anywhere | "Process ${query}" |
| $.path | output mappings, variables, dataPath | output: { result: "$.response" } |
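
As a mental model for output mappings, the following Python sketch copies values from a step's output into the shared workflow context under the mapped names. It assumes only simple dotted paths such as `$.result.severity`; `resolve_path` and `apply_output_mapping` are hypothetical names, and Station's actual JSONPath evaluation may support more than this.

```python
def resolve_path(path, scope):
    """Resolve a simple dotted path like '$.result.severity' against a dict."""
    if not path.startswith("$"):
        raise ValueError(f"not a path expression: {path!r}")
    value = scope
    # Skip the leading "$" and walk each dotted segment in turn.
    for key in (part for part in path[1:].split(".") if part):
        value = value[key]
    return value


def apply_output_mapping(context, step_scope, mapping):
    """Return a copy of the context with each mapped path stored under its name."""
    updated = dict(context)
    for name, path in mapping.items():
        updated[name] = resolve_path(path, step_scope)
    return updated


# Hypothetical output of an agent step.
step_scope = {"result": {"severity": "critical", "summary": "OOMKilled pods"}}

context = apply_output_mapping(
    {"service_name": "payment-service"},
    step_scope,
    {"log_analysis": "$.result", "severity": "$.result.severity"},
)
print(context["severity"])
```

Later steps can then refer to `severity` or `log_analysis` by name, for example as `${severity}` in a userInput template.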

Switch State

Route execution based on conditions using Starlark expressions.
Always use hasattr() for safe field access. Agent outputs may not include all expected fields.
- id: check-severity
  type: switch
  dataPath: "$.analysis_result"
  conditions:
    # ALWAYS use hasattr() to check field existence
    - if: "hasattr(analysis_result, 'severity') and analysis_result.severity == 'critical'"
      next: escalate
    - if: "hasattr(analysis_result, 'rollback_needed') and analysis_result.rollback_needed"
      next: request-approval
  defaultNext: complete
Built-in Functions:
| Function | Description | Example |
|----------|-------------|---------|
| hasattr(obj, "field") | Check if field exists | hasattr(result, "error") |
| getattr(obj, "field", default) | Get field with default | getattr(result, "count", 0) |
| len(collection) | Get length | len(pods) > 0 |
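
The guard pattern can be tried out in plain Python, whose `hasattr` and `getattr` built-ins share names and, for this purpose, behavior with the Starlark ones. The sketch below mirrors the check-severity switch; it is Python rather than Starlark, and `next_state` is a hypothetical function standing in for the switch evaluation.

```python
from types import SimpleNamespace


def next_state(analysis_result):
    """Mirror of the check-severity switch: first matching condition wins."""
    # hasattr() runs first, so the field is only read when it exists.
    if hasattr(analysis_result, "severity") and analysis_result.severity == "critical":
        return "escalate"
    if hasattr(analysis_result, "rollback_needed") and analysis_result.rollback_needed:
        return "request-approval"
    return "complete"  # plays the role of defaultNext


critical = SimpleNamespace(severity="critical")
rollback = SimpleNamespace(rollback_needed=True)
empty = SimpleNamespace()  # agent returned none of the expected fields

print(next_state(critical), next_state(rollback), next_state(empty))

# getattr() with a default covers the same case without branching.
count = getattr(empty, "count", 0)
```

Without the `hasattr()` guard, reading `empty.severity` would raise an error instead of falling through to the default.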

Transform State

Reshape data using Starlark expressions. Use getattr() for safe access with defaults.
- id: build-report
  type: transform
  expression: |
    # All context variables available as globals
    # Use getattr for safe access with defaults
    
    report = {
      "service": getattr(input, "service_name", "unknown"),
      "root_cause": getattr(analysis, "root_cause", {}),
      "actions": getattr(analysis, "recommended_actions", []),
      "severity": getattr(analysis, "severity", "unknown")
    }
    
    # Last expression is the output
    report
  output:
    final_report: "$.result"
  end: true

Parallel State

Execute multiple branches concurrently and wait for all to complete.
- id: gather-data
  type: parallel
  branches:
    - id: check-k8s
      start: k8s-step
      states:
        - id: k8s-step
          type: agent
          agent: k8s-investigator
          input:
            namespace: "$.input.namespace"
          output:
            k8s_data: "$.result"
          end: true

    - id: check-logs
      start: logs-step
      states:
        - id: logs-step
          type: agent
          agent: log-analyzer
          input:
            service: "$.input.service"
          output:
            log_data: "$.result"
          end: true

  output:
    all_data: "$.branches"
  transition: synthesize

Foreach State

Iterate over an array with optional concurrency.
- id: check-all-services
  type: foreach
  itemsPath: "$.input.services"      # Path to array
  itemName: "service"                 # Variable name for current item
  maxConcurrency: 5                   # Run up to 5 in parallel
  iterator:
    start: check-service
    states:
      - id: check-service
        type: agent
        agent: health-checker
        input:
          name: "$.service.name"
          namespace: "$.service.namespace"
        output:
          health: "$.result"
        end: true
  output:
    all_results: "$.iterations"
  transition: summarize
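
Conceptually, maxConcurrency bounds how many iterations are in flight at once. The Python sketch below shows that idea with an asyncio semaphore. It is a model of bounded concurrency in general, not of Station's scheduler; `run_foreach` and `check_service` are hypothetical names.

```python
import asyncio


async def run_foreach(items, worker, max_concurrency):
    """Run worker(item) for every item, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:  # wait here until a slot is free
            return await worker(item)

    # gather() returns results in the same order as the input items.
    return await asyncio.gather(*(run_one(item) for item in items))


in_flight = 0
peak_in_flight = 0


async def check_service(service):
    """Hypothetical stand-in for one health-checker agent call."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulate the agent doing work
    in_flight -= 1
    return {"name": service["name"], "health": "ok"}


services = [{"name": f"svc-{i}"} for i in range(6)]
all_results = asyncio.run(run_foreach(services, check_service, max_concurrency=2))
print(len(all_results), peak_in_flight)
```

A lower limit trades total run time for less load on whatever the iterated agent talks to.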

Human Approval State

Block workflow until a human approves or rejects.
- id: request-approval
  type: human_approval
  approval_title: "Production Deployment"
  message: |
    **Service:** {{input.service_name}}
    **Changes:** {{build_result.changelog}}
    
    Do you approve this deployment?
  approvers:
    - oncall-sre
    - platform-leads
  timeout: 30m
  transition: deploy

Complete Example

Here’s a full incident RCA workflow that demonstrates parallel execution, conditional routing, and human approval:
id: incident-rca
name: "Incident Root Cause Analysis"
version: "1.0.0"
description: "Parallel investigation with synthesis and approval"

inputSchema:
  type: object
  properties:
    service_name:
      type: string
  required:
    - service_name

start: parallel-investigation

states:
  # Step 1: Gather data in parallel
  - id: parallel-investigation
    type: parallel
    branches:
      - id: k8s-branch
        start: check-k8s
        states:
          - id: check-k8s
            type: agent
            agent: k8s-investigator
            input:
              service: "$.input.service_name"
            output:
              k8s_data: "$.result"
            timeout: 2m
            end: true

      - id: logs-branch
        start: check-logs
        states:
          - id: check-logs
            type: agent
            agent: log-analyzer
            input:
              service: "$.input.service_name"
            output:
              log_data: "$.result"
            timeout: 2m
            end: true
    output:
      investigation: "$.branches"
    transition: synthesize

  # Step 2: Synthesize findings
  - id: synthesize
    type: agent
    agent: rca-synthesizer
    input:
      k8s_data: "$.investigation.k8s_branch.k8s_data"
      log_data: "$.investigation.logs_branch.log_data"
    output:
      rca_result: "$.result"
    timeout: 2m
    transition: check-rollback

  # Step 3: Route based on analysis
  - id: check-rollback
    type: switch
    dataPath: "$.rca_result"
    conditions:
      - if: "hasattr(rca_result, 'rollback_recommended') and rca_result.rollback_recommended == True"
        next: request-approval
    defaultNext: generate-report

  # Step 4: Human approval (if rollback needed)
  - id: request-approval
    type: human_approval
    message: |
      **Root Cause Analysis Complete**
      
      Service: {{input.service_name}}
      Cause: {{rca_result.root_cause.description}}
      
      Rollback recommended to: {{rca_result.rollback_target}}
      
      Do you approve?
    timeout: 30m
    transition: generate-report

  # Step 5: Build final output
  - id: generate-report
    type: transform
    expression: |
      {
        "service": getattr(input, "service_name", "unknown"),
        "root_cause": getattr(rca_result, "root_cause", {}),
        "actions": getattr(rca_result, "recommended_actions", []),
        "rollback_needed": getattr(rca_result, "rollback_recommended", False)
      }
    output:
      final_report: "$.result"
    end: true

MCP Integration

Workflows are available through Station’s MCP server. When authoring workflows via MCP tools, always read the DSL reference first:
station://docs/workflow-dsl
This resource contains the complete DSL specification, all state types, and Starlark expression reference.

Available Tools

| Tool | Description |
|------|-------------|
| create_workflow | Create a new workflow definition |
| update_workflow | Update existing workflow (creates new version) |
| validate_workflow | Validate definition without saving |
| start_workflow_run | Start a new workflow run |
| get_workflow_run | Get run status and details |

Best Practices

Agent outputs may not include all expected fields. Always check existence:
# BAD - will fail if field missing
- if: "result.rollback == True"

# GOOD - checks existence first
- if: "hasattr(result, 'rollback') and result.rollback == True"
Set a timeout on agent states to prevent workflows from hanging indefinitely:
- id: long-analysis
  type: agent
  agent: deep-analyzer
  timeout: 5m              # Always set a timeout
Use descriptive state IDs; they make debugging much easier:
# BAD
- id: step1

# GOOD
- id: gather-pod-metrics
Always include defaultNext in switch states to handle unexpected values gracefully:
- id: route
  type: switch
  conditions:
    - if: "..."
      next: known-path
  defaultNext: fallback-path   # Always include

Troubleshooting

If a condition fails on a missing field, the agent didn’t output the expected field. Use hasattr():
if: "hasattr(result, 'field') and result.field == 'value'"
If runs do not start or make no progress, check that NATS is running and that the workflow consumer has started:
stn status
# Look for "Workflow consumer started" in logs
If an expression fails to evaluate, check the Starlark syntax:
  • Use = for assignment, == for comparison
  • Strings need quotes: "value" not value
  • Dict syntax: {"key": value}

REST API Reference

Station exposes a REST API for programmatic workflow management.

Workflow Definitions

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/v1/workflows | Create a new workflow |
| GET | /api/v1/workflows | List all workflows |
| GET | /api/v1/workflows/:id | Get workflow by ID |
| PUT | /api/v1/workflows/:id | Update workflow (creates new version) |
| DELETE | /api/v1/workflows/:id | Disable workflow |
| POST | /api/v1/workflows/validate | Validate definition without saving |
| GET | /api/v1/workflows/:id/versions | List all versions |
| GET | /api/v1/workflows/:id/versions/:v | Get specific version |
Create Workflow Request:
{
  "workflowId": "incident-rca",
  "name": "Incident Root Cause Analysis",
  "description": "Parallel investigation with synthesis",
  "definition": {
    "id": "incident-rca",
    "start": "investigate",
    "states": [...]
  }
}

Workflow Runs

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/v1/workflow-runs | Start a new run |
| GET | /api/v1/workflow-runs | List runs (filter by workflow_id, status) |
| GET | /api/v1/workflow-runs/:id | Get run status and output |
| GET | /api/v1/workflow-runs/:id/stream | SSE stream of run updates |
| GET | /api/v1/workflow-runs/:id/steps | List completed steps |
| POST | /api/v1/workflow-runs/:id/pause | Pause execution |
| POST | /api/v1/workflow-runs/:id/resume | Resume paused run |
| POST | /api/v1/workflow-runs/:id/cancel | Cancel run |
Start Run Request:
{
  "workflowId": "incident-rca",
  "input": {
    "service_name": "payment-service",
    "namespace": "production"
  }
}
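
A client could assemble the start-run request roughly as follows, using only the Python standard library. The endpoint path and body shape come from the table and example above; the base URL is a placeholder, `build_start_run_request` is a hypothetical helper, and any authentication your deployment requires is not shown.

```python
import json
import urllib.request

# Placeholder address: replace with wherever your Station API is reachable.
BASE_URL = "http://localhost:8080"


def build_start_run_request(workflow_id, workflow_input, base_url=BASE_URL):
    """Build (but do not send) a POST /api/v1/workflow-runs request."""
    body = json.dumps({"workflowId": workflow_id, "input": workflow_input})
    return urllib.request.Request(
        url=f"{base_url}/api/v1/workflow-runs",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_start_run_request(
    "incident-rca",
    {"service_name": "payment-service", "namespace": "production"},
)
print(request.get_method(), request.full_url)

# To actually send it against a running server:
# with urllib.request.urlopen(request) as response:
#     run = json.load(response)
```

Building the request separately from sending it keeps the payload easy to inspect or log before anything reaches the server.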

Approvals

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /api/v1/workflow-approvals | List pending approvals |
| GET | /api/v1/workflow-approvals/:id | Get approval details |
| POST | /api/v1/workflow-approvals/:id/approve | Approve with optional comment |
| POST | /api/v1/workflow-approvals/:id/reject | Reject with reason |
Approve Request:
{
  "comment": "Reviewed and approved for production rollout"
}
Reject Request:
{
  "reason": "Risk too high - needs additional review"
}
Set the X-Approver-ID header to identify the approver. It defaults to api-user if not provided.
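
An approval call with an explicit approver might be built like this in Python. The endpoint, body field, and header name are taken from this section; the base URL, the approval ID `apr-123`, and the helper name `build_approve_request` are placeholders for illustration.

```python
import json
import urllib.request

# Placeholder address: replace with wherever your Station API is reachable.
BASE_URL = "http://localhost:8080"


def build_approve_request(approval_id, approver_id, comment="", base_url=BASE_URL):
    """Build (but do not send) a POST .../workflow-approvals/:id/approve request."""
    body = json.dumps({"comment": comment})
    return urllib.request.Request(
        url=f"{base_url}/api/v1/workflow-approvals/{approval_id}/approve",
        data=body.encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Approver-ID": approver_id,  # identifies who approved
        },
        method="POST",
    )


approve = build_approve_request(
    "apr-123",  # hypothetical approval ID
    "oncall-sre",
    comment="Reviewed and approved for production rollout",
)
print(approve.get_method(), approve.full_url)

# To actually send it against a running server:
# urllib.request.urlopen(approve)
```

A reject call would follow the same shape, posting a "reason" field to the /reject endpoint instead.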