Lattice Architecture
This document provides a technical deep-dive into Station Lattice’s architecture, including message flows, NATS subjects, and internal components.
System Components
┌────────────────────────────────────────────────────────────────────────┐
│                          ORCHESTRATOR STATION                          │
├────────────────────────────────────────────────────────────────────────┤
│    ┌──────────────┐        ┌──────────────┐        ┌──────────────┐    │
│    │   Registry   │        │   Presence   │        │  Work Store  │    │
│    │ (JetStream)  │        │   Manager    │        │ (JetStream)  │    │
│    └──────┬───────┘        └──────┬───────┘        └──────┬───────┘    │
│           │                       │                       │            │
│           └───────────────────────┼───────────────────────┘            │
│                                   │                                    │
│                          ┌────────┴────────┐                           │
│                          │  Embedded NATS  │                           │
│                          │   + JetStream   │                           │
│                          └────────┬────────┘                           │
│                                   │                                    │
└───────────────────────────────────┬────────────────────────────────────┘
                                    │
             ┌──────────────────────┼──────────────────────┐
             │                      │                      │
             ▼                      ▼                      ▼
     ┌───────────────┐      ┌───────────────┐      ┌───────────────┐
     │ MEMBER STATION│      │ MEMBER STATION│      │ MEMBER STATION│
     ├───────────────┤      ├───────────────┤      ├───────────────┤
     │ ┌───────────┐ │      │ ┌───────────┐ │      │ ┌───────────┐ │
     │ │  Client   │ │      │ │  Client   │ │      │ │  Client   │ │
     │ └───────────┘ │      │ └───────────┘ │      │ └───────────┘ │
     │ ┌───────────┐ │      │ ┌───────────┐ │      │ ┌───────────┐ │
     │ │ Presence  │ │      │ │ Presence  │ │      │ │ Presence  │ │
     │ └───────────┘ │      │ └───────────┘ │      │ └───────────┘ │
     │ ┌───────────┐ │      │ ┌───────────┐ │      │ ┌───────────┐ │
     │ │ Executor  │ │      │ │ Executor  │ │      │ │ Executor  │ │
     │ │  Adapter  │ │      │ │  Adapter  │ │      │ │  Adapter  │ │
     │ └───────────┘ │      │ └───────────┘ │      │ └───────────┘ │
     │ ┌───────────┐ │      │ ┌───────────┐ │      │ ┌───────────┐ │
     │ │  Router   │ │      │ │  Router   │ │      │ │  Router   │ │
     │ └───────────┘ │      │ └───────────┘ │      │ └───────────┘ │
     └───────────────┘      └───────────────┘      └───────────────┘
Core Components
Client (internal/lattice/client.go)
The NATS connection wrapper that handles:
- Connection lifecycle management
- Automatic reconnection with backoff
- TLS and NKey authentication
- Subscription management
type Client struct {
	nc        *nats.Conn
	js        nats.JetStreamContext
	config    *Config
	stationID string
}
Registry (internal/lattice/registry.go)
JetStream KV-backed registry for stations and agents:
| Bucket | Key Pattern | Value |
|---|---|---|
| stations (LATTICE_STATIONS) | {station_id} | Station metadata JSON |
| agents (LATTICE_AGENTS) | {station_id}.{agent_name} | Agent metadata JSON |
type Registry struct {
	stationsBucket nats.KeyValue
	agentsBucket   nats.KeyValue
}

// Station metadata
type StationInfo struct {
	ID       string            `json:"id"`
	Name     string            `json:"name"`
	Status   StationStatus     `json:"status"`
	Metadata map[string]string `json:"metadata"`
	LastSeen time.Time         `json:"last_seen"`
	Version  string            `json:"version"`
}

// Agent metadata
type AgentInfo struct {
	Name         string            `json:"name"`
	StationID    string            `json:"station_id"`
	Description  string            `json:"description"`
	Capabilities []string          `json:"capabilities"`
	Tags         map[string]string `json:"tags"`
	Status       AgentStatus       `json:"status"`
}
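The following stdlib-only sketch shows how the key patterns in the table above could be built and parsed, and how an AgentInfo value serializes to JSON. The `agentKey` and `splitAgentKey` helpers are hypothetical and are not taken from internal/lattice. AgentStatus is assumed to be a string-based type. Keys join the station ID and agent name with a dot, so the split is only unambiguous if station IDs never contain a dot. This sketch cuts at the first dot, which means agent names may contain dots but station IDs may not.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// AgentStatus is assumed to be a string-based enum for this sketch.
type AgentStatus string

// AgentInfo mirrors the documented struct.
type AgentInfo struct {
	Name         string            `json:"name"`
	StationID    string            `json:"station_id"`
	Description  string            `json:"description"`
	Capabilities []string          `json:"capabilities"`
	Tags         map[string]string `json:"tags"`
	Status       AgentStatus       `json:"status"`
}

// agentKey builds the "{station_id}.{agent_name}" key for the agents bucket
// (hypothetical helper).
func agentKey(stationID, agentName string) string {
	return stationID + "." + agentName
}

// splitAgentKey reverses agentKey by cutting at the first dot, which is only
// unambiguous if station IDs never contain a dot.
func splitAgentKey(key string) (stationID, agentName string, ok bool) {
	stationID, agentName, ok = strings.Cut(key, ".")
	return stationID, agentName, ok && stationID != "" && agentName != ""
}

func main() {
	a := AgentInfo{
		Name:         "summarizer",
		StationID:    "station-eu-1",
		Capabilities: []string{"summarize"},
		Status:       "available", // hypothetical status value
	}
	value, err := json.Marshal(a)
	if err != nil {
		panic(err)
	}
	key := agentKey(a.StationID, a.Name)
	fmt.Println(key, string(value))

	st, name, ok := splitAgentKey(key)
	fmt.Println(st, name, ok)
}
```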
Presence (internal/lattice/presence.go)
Heartbeat system for station health monitoring:
type PresenceManager struct {
	client   *Client
	registry *Registry
	interval time.Duration
	timeout  time.Duration
}

// Heartbeat message
type Heartbeat struct {
	StationID  string    `json:"station_id"`
	Timestamp  time.Time `json:"timestamp"`
	AgentCount int       `json:"agent_count"`
	Load       float64   `json:"load"`
}
Router (internal/lattice/router.go)
Capability-based agent routing:
type Router struct {
	registry *Registry
}

// Route request to best matching agent
func (r *Router) Route(capability string) (*AgentInfo, error)

// Route with preferences
func (r *Router) RouteWithPrefs(capability string, prefs RoutePrefs) (*AgentInfo, error)

type RoutePrefs struct {
	PreferredStation string
	RequiredTags     map[string]string
	ExcludeStations  []string
}
Invoker (internal/lattice/invoker.go)
Remote agent invocation via request-reply:
type Invoker struct {
	client *Client
	router *Router
}

// Invoke agent synchronously
func (i *Invoker) Invoke(ctx context.Context, req *InvokeRequest) (*InvokeResponse, error)

// Invoke agent asynchronously (returns work ID)
func (i *Invoker) InvokeAsync(ctx context.Context, req *InvokeRequest) (string, error)
Work Store (internal/lattice/work/store.go)
JetStream-backed async work tracking:
type WorkStore struct {
	js     nats.JetStreamContext
	stream string
	kv     nats.KeyValue
}

type WorkItem struct {
	ID        string     `json:"id"`
	AgentName string     `json:"agent_name"`
	StationID string     `json:"station_id"`
	Task      string     `json:"task"`
	Status    WorkStatus `json:"status"`
	Progress  int        `json:"progress"`
	Result    string     `json:"result"`
	Error     string     `json:"error"`
	CreatedAt time.Time  `json:"created_at"`
	UpdatedAt time.Time  `json:"updated_at"`
}

type WorkStatus string

const (
	WorkStatusPending   WorkStatus = "pending"
	WorkStatusRunning   WorkStatus = "running"
	WorkStatusCompleted WorkStatus = "completed"
	WorkStatusFailed    WorkStatus = "failed"
	WorkStatusCancelled WorkStatus = "cancelled"
)
NATS Subject Conventions
Core Subjects
| Subject | Type | Purpose |
|---|---|---|
| lattice.station.{id}.heartbeat | Pub | Station heartbeat |
| lattice.station.{id}.status | Pub | Station status changes |
| lattice.agent.register | Pub | Agent registration |
| lattice.agent.deregister | Pub | Agent deregistration |
Request-Reply Subjects
| Subject | Purpose |
|---|---|
| lattice.invoke.{station}.{agent} | Direct agent invocation |
| lattice.invoke.capability.{cap} | Capability-based invocation |
Work Queue Subjects
| Subject | Purpose |
|---|---|
| lattice.work.assign.{station} | Work assignment to station |
| lattice.work.{id}.status | Work status updates |
| lattice.work.{id}.result | Work completion result |
| lattice.work.{id}.cancel | Work cancellation request |
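Building subjects in one place avoids typos in the dotted names. This sketch has one helper per subject tabulated above (the function names are hypothetical). It also includes a parser that recovers the work ID from a status, result or cancel subject, and a validator for IDs.

IDs are assumed to contain no '.', '*', '>' or whitespace, because NATS treats '.' as a token separator and '*'/'>' as wildcards. There is also a shape collision to watch for: lattice.invoke.capability.{cap} has the same four-token form as lattice.invoke.{station}.{agent}. A station literally named "capability" would therefore be ambiguous, so validating station IDs up front is worthwhile.

```go
package main

import (
	"fmt"
	"strings"
)

// Subject builders for the tables above (hypothetical helper names).
func heartbeatSubject(stationID string) string {
	return "lattice.station." + stationID + ".heartbeat"
}

func stationStatusSubject(stationID string) string {
	return "lattice.station." + stationID + ".status"
}

func invokeSubject(stationID, agent string) string {
	return "lattice.invoke." + stationID + "." + agent
}

func capabilitySubject(capability string) string {
	return "lattice.invoke.capability." + capability
}

func workAssignSubject(stationID string) string {
	return "lattice.work.assign." + stationID
}

// workSubject builds lattice.work.{id}.{event}, where event is status, result or cancel.
func workSubject(workID, event string) string {
	return "lattice.work." + workID + "." + event
}

// validToken rejects values that would add subject tokens ('.'), act as
// wildcards ('*', '>'), or contain whitespace.
func validToken(s string) bool {
	return s != "" && !strings.ContainsAny(s, ".*> \t\r\n")
}

// parseWorkSubject extracts the work ID and event from a work update subject.
func parseWorkSubject(subject string) (workID, event string, ok bool) {
	parts := strings.Split(subject, ".")
	if len(parts) != 4 || parts[0] != "lattice" || parts[1] != "work" || parts[2] == "assign" {
		return "", "", false
	}
	switch parts[3] {
	case "status", "result", "cancel":
		return parts[2], parts[3], true
	}
	return "", "", false
}

func main() {
	fmt.Println(heartbeatSubject("s1"), stationStatusSubject("s1"))
	fmt.Println(invokeSubject("s1", "summarizer"), capabilitySubject("summarize"))
	fmt.Println(workAssignSubject("s1"), workSubject("w-42", "status"))
	fmt.Println(parseWorkSubject("lattice.work.w-42.result"))
	fmt.Println(validToken("station-1"), validToken("eu.station"))
}
```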
Message Flows
Station Registration
┌──────────┐                     ┌──────────────┐                ┌──────────────┐
│  Member  │                     │     NATS     │                │ Orchestrator │
└────┬─────┘                     └──────┬───────┘                └──────┬───────┘
     │                                  │                               │
     │ 1. Connect                       │                               │
     │─────────────────────────────────>│                               │
     │                                  │                               │
     │ 2. Publish station info          │                               │
     │    lattice.station.{id}.register │                               │
     │─────────────────────────────────>│──────────────────────────────>│
     │                                  │                               │
     │                                  │ 3. Store in KV                │
     │                                  │<──────────────────────────────│
     │                                  │                               │
     │ 4. Start heartbeat loop          │                               │
     │    lattice.station.{id}.heartbeat│                               │
     │─────────────────────────────────>│──────────────────────────────>│
     │                                  │                               │
     │ 5. Register agents               │                               │
     │    lattice.agent.register        │                               │
     │─────────────────────────────────>│──────────────────────────────>│
     │                                  │                               │
Synchronous Invocation
┌──────────┐        ┌──────────┐           ┌──────────────┐        ┌──────────┐
│ Invoker  │        │  Router  │           │     NATS     │        │  Target  │
└────┬─────┘        └────┬─────┘           └──────┬───────┘        └────┬─────┘
     │                   │                        │                     │
     │ 1. Invoke(cap)    │                        │                     │
     │──────────────────>│                        │                     │
     │                   │                        │                     │
     │ 2. Route          │                        │                     │
     │   (agent, station)│                        │                     │
     │<──────────────────│                        │                     │
     │                   │                        │                     │
     │ 3. Request        │                        │                     │
     │    lattice.invoke.{station}.{agent}        │                     │
     │───────────────────────────────────────────>│                     │
     │                   │                        │                     │
     │                   │                        │ 4. Deliver          │
     │                   │                        │────────────────────>│
     │                   │                        │                     │
     │                   │                        │ 5. Execute agent    │
     │                   │                        │                     │
     │                   │                        │ 6. Reply            │
     │                   │                        │<────────────────────│
     │                   │                        │                     │
     │ 7. Response       │                        │                     │
     │<───────────────────────────────────────────│                     │
     │                   │                        │                     │
Async Work Assignment
┌──────────┐    ┌───────────┐                      ┌──────────────┐                ┌──────────┐
│ Invoker  │    │ WorkStore │                      │     NATS     │                │  Target  │
└────┬─────┘    └─────┬─────┘                      └──────┬───────┘                └────┬─────┘
     │                │                                   │                             │
     │ 1. InvokeAsync │                                   │                             │
     │───────────────>│                                   │                             │
     │                │                                   │                             │
     │ 2. Create work │                                   │                             │
     │    item in KV  │                                   │                             │
     │    (work_id)   │                                   │                             │
     │<───────────────│                                   │                             │
     │                │                                   │                             │
     │                │ 3. Publish                        │                             │
     │                │    lattice.work.assign.{station}  │                             │
     │                │──────────────────────────────────>│                             │
     │                │                                   │                             │
     │                │                                   │ 4. Deliver                  │
     │                │                                   │────────────────────────────>│
     │                │                                   │                             │
     │                │                                   │ 5. Execute (async)          │
     │                │                                   │                             │
     │                │                                   │ 6. Status update            │
     │                │                                   │    lattice.work.{id}.status │
     │                │                                   │<────────────────────────────│
     │                │ 7. Update KV                      │                             │
     │                │<──────────────────────────────────│                             │
     │                │                                   │                             │
     │                │                                   │ 8. Result                   │
     │                │                                   │    lattice.work.{id}.result │
     │                │                                   │<────────────────────────────│
     │                │ 9. Store result                   │                             │
     │                │<──────────────────────────────────│                             │
     │                │                                   │                             │
State Machines
Station Lifecycle
           ┌─────────────┐
           │ Disconnected│
           └──────┬──────┘
                  │ connect()
                  ▼
           ┌─────────────┐
     ┌────>│ Connecting  │
     │     └──────┬──────┘
     │            │ connected
     │            ▼
     │     ┌─────────────┐
     │     │ Registering │
     │     └──────┬──────┘
     │            │ registered
     │            ▼
     │     ┌─────────────┐
     │     │             │───┐
     │     │   Online    │   │ reconnected
     │     │             │<──┘
     │     └──────┬──────┘
     │            │ timeout
     │            ▼
     │     ┌─────────────┐
     │     │    Stale    │
     │     └──────┬──────┘
     │            │ timeout
     │            ▼
     │     ┌─────────────┐
     └─────│   Offline   │
           └─────────────┘
Work Item Lifecycle
           ┌─────────────┐
           │   Pending   │
           └──────┬──────┘
                  │
     ┌────────────┼────────────┐
     │            │            │
     ▼            ▼            ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Cancelled │ │ Running  │ │  Failed  │
└──────────┘ └────┬─────┘ └──────────┘
                  │
     ┌────────────┼────────────┐
     │            │            │
     ▼            ▼            ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Completed │ │  Failed  │ │Cancelled │
└──────────┘ └──────────┘ └──────────┘
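The diagram's edges translate directly into a transition table. This sketch rejects any move the diagram does not draw: for example, Completed back to Running, or Pending straight to Completed. The `canTransition` and `transition` helpers are hypothetical. Whether the real WorkStore enforces transitions this way is not documented here.

```go
package main

import "fmt"

type WorkStatus string

const (
	WorkStatusPending   WorkStatus = "pending"
	WorkStatusRunning   WorkStatus = "running"
	WorkStatusCompleted WorkStatus = "completed"
	WorkStatusFailed    WorkStatus = "failed"
	WorkStatusCancelled WorkStatus = "cancelled"
)

// allowed lists the edges from the work item lifecycle diagram.
// Completed, Failed and Cancelled are terminal and have no outgoing edges.
var allowed = map[WorkStatus][]WorkStatus{
	WorkStatusPending: {WorkStatusRunning, WorkStatusFailed, WorkStatusCancelled},
	WorkStatusRunning: {WorkStatusCompleted, WorkStatusFailed, WorkStatusCancelled},
}

func canTransition(from, to WorkStatus) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

// transition moves *current to the new status, or returns an error if the edge is not allowed.
func transition(current *WorkStatus, to WorkStatus) error {
	if !canTransition(*current, to) {
		return fmt.Errorf("invalid work transition %s -> %s", *current, to)
	}
	*current = to
	return nil
}

func main() {
	s := WorkStatusPending
	fmt.Println(transition(&s, WorkStatusRunning), s)
	fmt.Println(transition(&s, WorkStatusCompleted), s)
	fmt.Println(transition(&s, WorkStatusRunning), s)
}
```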
JetStream Configuration
Streams
// Work stream configuration
&nats.StreamConfig{
	Name:      "LATTICE_WORK",
	Subjects:  []string{"lattice.work.>"},
	Retention: nats.WorkQueuePolicy,
	MaxAge:    24 * time.Hour,
	MaxMsgs:   100000,
	MaxBytes:  1 << 30, // 1GB
	Replicas:  1,       // Increase for HA (replicas > 1 require a clustered JetStream)
	Storage:   nats.FileStorage,
}
KV Buckets
// Stations bucket
&nats.KeyValueConfig{
	Bucket:   "LATTICE_STATIONS",
	TTL:      30 * time.Second, // Auto-expire stale stations
	MaxBytes: 100 << 20,        // 100MB
	Replicas: 1,
}

// Agents bucket
&nats.KeyValueConfig{
	Bucket:   "LATTICE_AGENTS",
	MaxBytes: 100 << 20,
	Replicas: 1,
}

// Work items bucket
&nats.KeyValueConfig{
	Bucket:   "LATTICE_WORK_ITEMS",
	TTL:      24 * time.Hour,
	MaxBytes: 500 << 20, // 500MB
	Replicas: 1,
}
Error Handling
Retry Policies
| Operation | Max Retries | Backoff |
|---|---|---|
| NATS connect | Infinite | Exponential (1s-30s) |
| KV operations | 3 | Linear (100ms) |
| Invocations | 0 (caller decides) | N/A |
| Work assignment | 3 | Exponential (1s-10s) |
Error Types
var (
	ErrNotConnected      = errors.New("not connected to lattice")
	ErrStationNotFound   = errors.New("station not found")
	ErrAgentNotFound     = errors.New("agent not found")
	ErrAgentBusy         = errors.New("agent is busy")
	ErrInvocationTimeout = errors.New("invocation timed out")
	ErrWorkNotFound      = errors.New("work item not found")
	ErrWorkCancelled     = errors.New("work was cancelled")
)
Scaling Limits
| Component | Soft Limit | Hard Limit |
|---|---|---|
| Stations per mesh | 100 | 1000 |
| Agents per station | 50 | 500 |
| Concurrent invocations | 1000 | 10000 |
| Work items (pending) | 10000 | 100000 |
| Message size | 1MB | 8MB |
Optimization Tips
- Use capability routing instead of direct station targeting when possible
- Batch heartbeats if you run many stations on the same host
- Set timeouts that match the operation: don't rely on the 60s default for fast operations
- Monitor JetStream storage and purge old work items regularly
- Use async invocation for long-running tasks (>5s)