Flow Authoring Guide

Create FunnelStory agent definitions (flows) — JSON configurations you edit in the agent builder or export/import that define multi-step data processing and LLM agent workflows.

Before You Start

Clarify these before writing any JSON:

  1. What's the goal? One-shot batch job, chat-driven flow, or scheduled pipeline?
  2. Where does data come from? Semantic tables, external connections, or existing datasets?
  3. Which pattern fits? ETL, multi-query aggregation, or incremental processing? (See Common Patterns)
  4. Do you need an AGENT step? Not all flows require LLM calls (e.g., pure data pipelines, usage alerts). If yes, choose model_type small for structured extraction or large for complex reasoning.

Workflow

Follow this order when building a flow:

  1. Define input_schema (if needed) — what the user provides when triggering. Omit if the flow is self-contained.
  2. Decide how the flow starts — manual/API/MCP only, or add trigger_config for automatic runs (see Triggers).
  3. Sketch the step graph — entrypoint → ... → terminal ("next": "")
  4. Choose the right op for each step — see Operation Types
  5. Wire up variable passing — decide local vs global for each output
  6. If this is a chat flow — the final AGENT step must store its output in @.response. This is the variable the runtime reads to send a reply to the user. Omitting this is the #1 cause of silent chat flows.
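  7. Add guardrails: CONDITION gates after CALL steps whose results may be empty (a failed CALL stops its path, so there is no error payload to inspect), and incremental processing for large datasets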

Flow Structure

Published flows that should run automatically set "draft": false and include trigger_config. Draft flows are not picked up by the background runner.

{
"name": "My Flow",
"draft": true,
"trigger_config": null,
"input_schema": [...],
"config": {
"entrypoint": "first_step",
"steps": { ... }
}
}

Triggers (trigger_config)

Optional. When set on a non-draft flow, the agent can run automatically when events occur.

type | Purpose | Config
schedule | Cron | schedule.expr (optional schedule.timezone)
interval | Fixed repeat | interval.duration (e.g. "6h", "30m")
activity | Model activity events | activity.activity_ids (array); optional filter_expr
signal | Signal events | signal.rule_ids (array); optional filter_expr
needle_mover | Needle mover rows | needle_mover.labels and/or needle_mover.impacts (arrays; need at least one value across both); optional filter_expr
conversation | Conversations | conversation.types (array); optional filter_expr
query | Semantic DB rows | query.query: SQL against the semantic DB; each result row starts one run

Query trigger

  • The SQL uses the same dialect and workspace tables as semantic.query inside the flow (e.g. accounts, dataset_records('my_dataset'), …).
  • Each row becomes one run. The row is available at runtime as @.trigger.row.<column> (and in templates as {{ $.trigger.row.<column> }}). Only include columns you need; the idempotency key is derived from the entire row JSON (stable row → deduped runs).
  • Cadence: query evaluation runs at most once per UTC day (first successful tick of the flows runner that day for the workspace). Use LIMIT in SQL to cap work per day.
  • Prefer narrow WHERE clauses so you do not enqueue more than you need; the runner also caps how many new runs it creates per cycle.
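
The dedup behavior described in these bullets can be pictured with a small offline sketch. The real key derivation is not documented here, so this assumes a SHA-256 hash over canonical JSON; row_key is a hypothetical helper, not part of the product. It only illustrates why a column that changes every day defeats deduplication.

```python
import hashlib
import json


def row_key(row: dict) -> str:
    """Hypothetical stand-in for the runner's idempotency key: a hash of the whole row."""
    # Assumption: keys are sorted so column order does not matter.
    canonical = json.dumps(row, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


stable = {"account_id": "acct_123", "name": "Example Corp"}
# The same row seen again later yields the same key, so the run would be deduped.
same_again = {"name": "Example Corp", "account_id": "acct_123"}
# A column whose value changes daily yields a new key (and a new run) every day.
volatile = {**stable, "subscription_remaining_days": 89}
volatile_next_day = {**stable, "subscription_remaining_days": 88}
```

Under these assumptions, selecting only account_id and name keeps the key stable, while also selecting subscription_remaining_days would start a fresh run for the same account each day.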

Example:

"trigger_config": {
"type": "query",
"query": {
"query": "SELECT account_id, name FROM accounts WHERE subscription_remaining_days < 90 LIMIT 50"
}
}

Steps can reference {{ $.trigger.row.account_id }}, etc.

Trigger data available in your flow

Each trigger type exposes different fields under @.trigger (and $.trigger in templates). Only one shape applies per run.

Trigger type | Runtime fields under $.trigger
query | row.<column> (columns from your SQL SELECT)
activity | activity.activity_id, activity.model_id, activity.account_id, activity.user_id, activity.timestamp, activity.count
signal | signal.signal_id, signal.rule_id, signal.type, signal.account_id, signal.timestamp, plus optional signal.message, signal.attributes, signal.value, signal.previous_value
needle_mover | needle_mover.needle_mover_id, needle_mover.title, needle_mover.description, needle_mover.state, needle_mover.impact, needle_mover.label, needle_mover.created_at
conversation | conversation.key, conversation.metadata, conversation.timestamp

Event-driven runs also include $.trigger.account_ids when account scope is available.

Important: $.trigger.row only exists for query triggers. If a step references $.trigger.row but the run was started by an activity, signal, needle mover, or conversation trigger, the value will be empty or missing. Use the matching path for the trigger type — for example $.trigger.activity.account_id on an activity-triggered run. If you need data beyond what the trigger provides, add a semantic.query step to look it up.
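
A minimal sketch of why the wrong trigger path comes back empty. The lookup function is a hypothetical illustration of dotted-path resolution, not the runtime's actual resolver, and the sample context is invented for the example.

```python
def lookup(context: dict, path: str):
    """Resolve a dotted path such as '$.trigger.row.account_id'; missing parts give ''."""
    if path.startswith("$."):
        path = path[2:]
    current = context
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return ""  # assumed to mirror "the value will be empty or missing"
        current = current[part]
    return current


# Context of a run started by an activity trigger: there is no "row" key at all.
activity_run = {"trigger": {"activity": {"account_id": "acct_456", "count": 1}}}

wrong = lookup(activity_run, "$.trigger.row.account_id")
right = lookup(activity_run, "$.trigger.activity.account_id")
```

Here wrong is an empty string and right is "acct_456", which is the situation the note above warns about.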

Naming differences between saved config and runtime:

Topic | In the saved trigger JSON | On the run under $.trigger
Needle mover | impacts (plural array of filters) | needle_mover.impact (singular string for this event)
Activity | activity_ids (which activities fire the flow) | activity.activity_id (the specific event)
Conversation | types (which conversation kinds fire the flow) | conversation.key, conversation.metadata, conversation.timestamp

Testing trigger-shaped flows without waiting

When running from the builder or starting a test run, you can supply sample trigger data so the run behaves as though a real trigger started it. The sample JSON must match the trigger type you are building for.

Query-shaped sample (columns from your SQL):

{
"trigger": {
"row": {
"account_id": "acct_123",
"name": "Example Corp"
}
}
}

Activity-shaped sample:

{
"trigger": {
"activity": {
"activity_id": "019bacd1-e737-7bef-a310-c35ff896febd",
"account_id": "acct_456",
"timestamp": "2026-04-01T09:00:00Z",
"count": 1
}
}
}

The same shapes apply when an assistant runs a flow with a simulated trigger via MCP.

Input Schema (Optional)

"input_schema": [
{
"id": "account_id",
"type": "string",
"description": "The account ID to process",
"value": null
},
{
"id": "limit",
"type": "number",
"description": "Max records to process",
"value": 100
}
]
  • id (required): Variable name (access via {{ $.account_id }})
  • type (required): "string", "number", "boolean", "object", "array"
  • description (optional): Human-readable label for UI
  • value (optional): Default value if input not provided

Note: input_schema is optional. Many flows (scheduled pipelines, usage alerts) don't need user input at all. Only add it when the flow requires parameters at trigger time.

Operation Types

Op | Purpose | Key Fields
CALL | Call a function | call.function_id, call.args
AGENT | Run LLM agent | agent.system, agent.user, agent.model_type
LOOP | Iterate over array | loop.over, loop.var, loop.step
CONDITION | Boolean gate (stops if false) | condition.condition
BRANCH | Run parallel paths | branch.parallel_paths
JOIN | Wait for all branches | (no config)
TRANSFORM | Format or extract data | transform.type, transform.input
WAIT | Pause execution | wait.duration
SPAWN | Start subplan | spawn.plan_id, spawn.input

Step Structure

Every step has these common fields:

{
"step_name": {
"id": "step_name",
"op": "CALL",
"next": "next_step_name",
"out": { "set": "@.result_var" }
}
}
  • id: Must match the key name
  • next: Next step to execute. Empty string "" = end of flow.
  • out: Where to store the result
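
The id and next rules above lend themselves to an offline check before importing a flow. The following is a sketch only: lint_steps is a hypothetical helper, and it inspects just entrypoint, id, and next (it does not follow loop.step or branch.parallel_paths references).

```python
def lint_steps(config: dict) -> list:
    """Return a list of problems found in a flow's config (hypothetical offline check)."""
    steps = config.get("steps", {})
    problems = []
    if config.get("entrypoint") not in steps:
        problems.append("entrypoint does not name a step")
    for key, step in steps.items():
        if step.get("id") != key:
            problems.append(f"{key}: id must match the key name")
        if "next" not in step:
            problems.append(f'{key}: missing next (use "" to end the flow)')
        elif step["next"] != "" and step["next"] not in steps:
            problems.append(f"{key}: next points at unknown step {step['next']!r}")
    return problems


good = {"entrypoint": "fetch",
        "steps": {"fetch": {"id": "fetch", "op": "CALL", "next": ""}}}
bad = {"entrypoint": "start",
       "steps": {"fetch": {"id": "fetch_data", "op": "CALL", "next": "missing"}}}
```

lint_steps(good) returns an empty list, while lint_steps(bad) reports three problems: the entrypoint, the mismatched id, and the dangling next.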

Output Configuration

"out": { "set": "@.my_result" } // Global — accessible by any subsequent step
"out": { "set": "my_result" } // Local — only in current scope
"out": { "append": "@.all_results" } // Append to array (useful in loops)
"out": { "merge": "@.summary" } // Merge object fields into existing object

CRITICAL decision rule for @. vs bare name:

  • ALWAYS use @. globals when a downstream step outside the current loop/branch needs the value
  • Use bare names only for values consumed by the immediately next step in the same scope
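  • When in doubt outside a loop, use @. globals. Inside a loop, pass values between chained steps with bare names instead (see the loop scheduling gotcha under LOOP).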
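
The three out modes listed under Output Configuration (set, append, merge) can be modeled with a few lines of Python. This is an illustrative sketch under assumptions: the runtime's handling of missing targets and nested merges is not specified in this guide, so the sketch assumes append creates the array on first use and merge is shallow. apply_out is a hypothetical name.

```python
def apply_out(store: dict, out: dict, value) -> None:
    """Apply one out directive to a variable store (illustrative semantics only)."""
    if "set" in out:
        store[out["set"]] = value                          # overwrite the variable
    elif "append" in out:
        store.setdefault(out["append"], []).append(value)  # grow an array
    elif "merge" in out:
        store.setdefault(out["merge"], {}).update(value)   # shallow merge of fields


store = {}
apply_out(store, {"set": "@.my_result"}, 1)
apply_out(store, {"append": "@.all_results"}, "a")
apply_out(store, {"append": "@.all_results"}, "b")
apply_out(store, {"merge": "@.summary"}, {"x": 1})
apply_out(store, {"merge": "@.summary"}, {"y": 2})
```

After these calls the store holds 1 under @.my_result, ["a", "b"] under @.all_results, and {"x": 1, "y": 2} under @.summary.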

Variable Syntax

Syntax | When to use | Example
"$.var" (quoted, no braces) | Pass an entire object/array as-is | "record": { "data": "$.analysis" }
"{{ $.var }}" | Interpolate into a string | "WHERE id = '{{ $.id }}'"
@.var | Reference a global variable | "out": { "set": "@.results" }
@.trigger.* | Payload from the run’s trigger (event, query row, …) | @.trigger.row.account_id, {{ $.trigger.row.account_id }}

@.trigger is only present when the run was started with trigger context (automatic triggers or a manual request that supplied trigger).

NEVER use {{ $.var }} to pass an entire object — template interpolation stringifies objects unpredictably. Use "$.var" (quoted, no braces) instead.

Variable Scopes

Prefix | Scope | Lifecycle
$.var | Local | Input variables + loop vars. Available to the current step and chained steps within the same scope.
@.var | Global | Persisted across the entire flow run. Any step can read/write.

CALL — Calling Functions

{
"id": "fetch_data",
"op": "CALL",
"next": "process_data",
"out": { "set": "@.data" },
"call": {
"function_id": "semantic.query",
"args": {
"query": "SELECT * FROM accounts WHERE id = '{{ $.account_id }}'"
}
}
}

IMPORTANT: If a CALL step fails (function error, query failure, size limit exceeded), the step returns an error — no result is stored and execution of the current path stops. This is true for all step types, not just CALL.

Available Functions

semantic.query

Query the semantic database (workspace data).

"call": {
"function_id": "semantic.query",
"args": {
"query": "SELECT * FROM accounts LIMIT 10"
}
}

Returns: { "results": [...], "columns": [...], "total_rows": N }

Common tables and columns:

Table | Columns
accounts | account_id TEXT, domain TEXT, name TEXT, amount REAL, created_at TIMESTAMP, properties JSON, expires_at TIMESTAMP, churned BOOLEAN, churned_at TIMESTAMP, prediction TEXT, prediction_score REAL, assignees JSON (array of assignee emails), activity_score REAL, conversation_sentiment REAL, feature_adoption REAL, health_score REAL, license_utilization REAL, product_engagement TEXT, subscription_remaining_days REAL, support_sentiment REAL, total_conversations REAL, total_support_tickets REAL, total_users REAL
meetings | id TEXT, source TEXT, title TEXT, link TEXT, timestamp TIMESTAMP, duration_seconds INTEGER, sentiment REAL, summary TEXT, key JSON, data JSON, metadata JSON, participants JSON
conversations | id TEXT, parent_conversation_id TEXT, key JSON, metadata JSON, data JSON, timestamp TIMESTAMP
tickets | source TEXT, id TEXT, timestamp TIMESTAMP, key JSON, sentiment REAL, link TEXT, title TEXT, text TEXT, contact_email TEXT, contact_name TEXT, assignee_email TEXT, resolved_at TIMESTAMP, status TEXT, priority TEXT, metadata JSON, data JSON, custom_fields JSON, tags JSON
topics | reference_type TEXT, reference_id TEXT, account_id TEXT, user_id TEXT, user_email TEXT, topic TEXT, sentiment TEXT, created_at TIMESTAMP, link TEXT
notes | id TEXT, title TEXT, content TEXT, link TEXT, note_type TEXT, created_at TIMESTAMP, updated_at TIMESTAMP, created_by_email TEXT, updated_by_email TEXT, account_id TEXT, timestamp TIMESTAMP
tasks | id TEXT, title TEXT, body TEXT, link TEXT, status TEXT, created_at TIMESTAMP, updated_at TIMESTAMP, expires_at TIMESTAMP, created_by_email TEXT, assigned_to_email TEXT, account_id TEXT
activities | activity_id TEXT, activity_name TEXT, account_id TEXT, user_id TEXT, count INT, timestamp TIMESTAMP, user_email TEXT
contacts | id TEXT, name TEXT, email TEXT, domain TEXT
workspace_users | user_id TEXT, name TEXT, email TEXT, user_role TEXT, user_designation TEXT, assignable BOOLEAN, last_activity TIMESTAMP, deactivated_at TIMESTAMP
account_metrics | account_id TEXT, metric_id TEXT, value REAL
dataset_records(name) | key TEXT, record JSON (see Dataset Operations)

data_connection.query

Query external data connections (CRM, etc.).

"call": {
"function_id": "data_connection.query",
"args": {
"data_connection_id": "019b3c9e-...",
"query": "SELECT * FROM companies WHERE ..."
}
}

Returns: Same shape as semantic.query. On failure, the CALL step fails (no error payload is returned).

dataset.record.upsert

Save or update a record in a dataset.

"call": {
"function_id": "dataset.record.upsert",
"args": {
"dataset": "my_dataset",
"key": "{{ $.item.id }}",
"record": {
"field1": "{{ $.item.name }}",
"field2": "$.analysis"
}
}
}

Note: Use "$.analysis" (quoted, no braces) to store entire objects. Use "{{ $.item.name }}" to interpolate strings.

Returns: { "dataset": "...", "key": "..." }

dataset.record.set_field

Update a single field in a dataset record.

"call": {
"function_id": "dataset.record.set_field",
"args": {
"dataset": "my_dataset",
"key": "{{ $.item.id }}",
"field": "status",
"value": "completed"
}
}

Returns: { "dataset": "...", "key": "...", "field": "..." }

dataset.record.delete

Delete a record from a dataset.

"call": {
"function_id": "dataset.record.delete",
"args": {
"dataset": "my_dataset",
"key": "{{ $.item.id }}"
}
}

Returns: null

tasks.create

Create a task.

"call": {
"function_id": "tasks.create",
"args": {
"title": "{{ $.summary }}",
"body": "{{ $.details }}"
}
}

Returns: { "task_id": "...", "title": "..." }

slack.send_message

Send a Slack message.

"call": {
"function_id": "slack.send_message",
"args": {
"connection_id": "slack_conn_123",
"channel_id": "C01234567",
"text": "{{ $.message }}"
}
}
  • connection_id (required): Slack connection ID
  • channel_id (required): Slack channel ID
  • text or blocks (at least one required): Plain text message or Slack Block Kit blocks. If both are provided, text becomes the notification fallback.

Returns: { "success": true, "response_channel": "...", "response_timestamp": "..." }

email.send

Send an email through FunnelStory or a user-provided data connection.

"call": {
"function_id": "email.send",
"args": {
"to": ["owner@acme.com", "csm@acme.com"],
"cc": ["lead@acme.com"],
"bcc": [],
"send_separately": false,
"subject": "Weekly summary",
"body": "Your weekly summary is ready",
"html": "<p>Your weekly summary is ready</p>"
}
}

Use a specific connected provider when needed:

"call": {
"function_id": "email.send",
"args": {
"data_connection_id": "019b3c9e-...",
"from": "hello@acme.com",
"to": ["owner@acme.com", "csm@acme.com"],
"subject": "Weekly summary",
"body": "Your weekly summary is ready"
}
}

Returns: { "sent": true, "recipients": [...], "recipient_count": N }

Note: Use data_connection_id only when you explicitly want to send from a specific connected provider. When data_connection_id is provided, include from. Connection-backed mode expands trusted-recipient allowlisting to include contacts.

IMPORTANT: to, subject, and body are all mandatory. Even if you provide html, you must still include body with a plain-text version — emails will fail without it.
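
The field requirements stated in the two notes above can be checked before a flow is saved. A minimal sketch, assuming nothing beyond those notes; check_email_args is a hypothetical helper and does not call email.send.

```python
def check_email_args(args: dict) -> list:
    """List problems with email.send args, based on the rules stated in this guide."""
    problems = [f"missing required field: {name}"
                for name in ("to", "subject", "body") if not args.get(name)]
    # When a specific provider connection is used, the guide says to include from.
    if args.get("data_connection_id") and not args.get("from"):
        problems.append("data_connection_id is set, so from is also expected")
    return problems


html_only = {"to": ["owner@acme.com"], "subject": "Weekly summary",
             "html": "<p>Your weekly summary is ready</p>"}
complete = {**html_only, "body": "Your weekly summary is ready"}
```

check_email_args(html_only) flags the missing body, which is the failure mode called out above; check_email_args(complete) returns an empty list.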

salesforce.read_record

Read a Salesforce record.

"call": {
"function_id": "salesforce.read_record",
"args": {
"data_connection_id": "sf_conn_123",
"object_type": "Account",
"record_id": "001XXXXXXXXXXXXXXX",
"fields": ["Name", "Industry", "AnnualRevenue"]
}
}

Returns: { "record": { ... } }

salesforce.update_record

Update a Salesforce record.

"call": {
"function_id": "salesforce.update_record",
"args": {
"data_connection_id": "sf_conn_123",
"object_type": "Account",
"record_id": "001XXXXXXXXXXXXXXX",
"fields": {
"Customer_Health__c": "At Risk",
"Renewal_Risk_Score__c": "82"
}
}
}

Returns: { "ok": true }

salesforce.create_record

Create a Salesforce record via the REST API (single sObject insert).

"call": {
"function_id": "salesforce.create_record",
"args": {
"data_connection_id": "sf_conn_123",
"object_type": "Note",
"fields": {
"ParentId": "001XXXXXXXXXXXXXXX",
"Title": "Follow up from QBR"
}
}
}

fields and parent records: Whatever you pass must match the Salesforce REST create shape for that object_type—different sObjects use different fields to relate a new row to an Account or other parent. For example:

  • Note: ParentId is the record the note attaches to; when the flow is creating a note on an Account, set ParentId to that account’s Id (001...).
  • Task: use WhatId for the related Account, Opportunity, or other allowed object, and set standard fields such as Subject (and Status if your org requires it). Use WhoId when you also relate the task to a Contact or Lead.

Other object types follow their own API field names; use Salesforce’s object reference or metadata for required and writable fields.

Args: Same data_connection_id and object_type pattern as salesforce.read_record / salesforce.update_record. fields is the JSON body sent on create: API names to values. Values may be strings, numbers, booleans, or nested objects where Salesforce accepts them (unlike salesforce.update_record, where fields is a string map for PATCH).

Returns: { "id": "<Salesforce Id>", "ok": true }

Test runs: Validates arguments and connection type; does not call Salesforce — returns a synthetic id and test: true in the result (same pattern as other side-effect CALLs in test mode).

hubspot.read_record

Read a HubSpot record.

"call": {
"function_id": "hubspot.read_record",
"args": {
"data_connection_id": "hs_conn_123",
"object_type": "companies",
"record_id": "123456789",
"fields": ["name", "domain", "industry"]
}
}

Returns: { "record": { ... } }

hubspot.update_record

Update a HubSpot record.

"call": {
"function_id": "hubspot.update_record",
"args": {
"data_connection_id": "hs_conn_123",
"object_type": "companies",
"record_id": "123456789",
"fields": {
"funnel_stage": "Expansion",
"health_status": "watch"
}
}
}

Returns: { "ok": true }

search.web

Run a web search.

"call": {
"function_id": "search.web",
"args": {
"query": "latest customer onboarding best practices",
"recency_filter": "30d"
}
}

Returns: [ { "title": "...", "url": "...", "snippet": "...", "date": "..." } ]

recency_filter options: 7d, 30d, 90d, 1y

accounts.select

Select accounts using Funnel filters. Uses the UniversalFilter (FilterGroup) shape.

Filters use boolean logic: and_group contains or_group arrays, each or_group contains individual filter entries.

"call": {
"function_id": "accounts.select",
"args": {
"filter": {
"and_group": [
{
"or_group": [
{
"filter": {
"name": "",
"metric_filter": {
"metric_id": "product_engagement",
"condition": "equal",
"value": "daily_active"
}
}
},
{
"filter": {
"name": "",
"rule_filter": {
"activity_id": "019bacd1-e737-7bef-a310-c35ff896febd",
"condition": "count_is_more_than_or_equal",
"value": "5"
}
}
}
]
}
]
}
}
}

Returns: { "total_count": N, "account_ids": ["acct_1", "acct_2"] }

Note: metric_id and activity_id values are workspace-specific. These must be looked up from the workspace configuration — they cannot be guessed.
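
One way to read the and_group / or_group nesting is as an AND of ORs. The sketch below evaluates that reading against in-memory account data. It is an assumption-heavy illustration: the semantics are inferred from the field names, only the two conditions from the example above are handled, and the account dictionaries are invented.

```python
ACTIVITY = "019bacd1-e737-7bef-a310-c35ff896febd"
EXAMPLE_FILTER = {"and_group": [{"or_group": [
    {"filter": {"name": "", "metric_filter": {
        "metric_id": "product_engagement", "condition": "equal", "value": "daily_active"}}},
    {"filter": {"name": "", "rule_filter": {
        "activity_id": ACTIVITY, "condition": "count_is_more_than_or_equal", "value": "5"}}},
]}]}


def leaf_matches_for(account: dict):
    """Build a toy evaluator for one account (handles only the two example conditions)."""
    def leaf(f: dict) -> bool:
        if "metric_filter" in f:
            m = f["metric_filter"]
            return account["metrics"].get(m["metric_id"]) == m["value"]
        if "rule_filter" in f:
            r = f["rule_filter"]
            return account["activity_counts"].get(r["activity_id"], 0) >= int(r["value"])
        return False
    return leaf


def matches(filter_group: dict, leaf) -> bool:
    """Every and_group entry must hold; within an or_group, one matching filter is enough."""
    return all(any(leaf(entry["filter"]) for entry in group["or_group"])
               for group in filter_group["and_group"])


active = {"metrics": {"product_engagement": "weekly_active"}, "activity_counts": {ACTIVITY: 7}}
quiet = {"metrics": {"product_engagement": "inactive"}, "activity_counts": {}}
```

Under this reading, the active account matches through the rule_filter even though its metric differs, and the quiet account matches neither entry.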

template.render

Render a stored template with variables.

"call": {
"function_id": "template.render",
"args": {
"template_id": "my_template",
"vars": {
"name": "{{ $.account_name }}",
"score": "{{ $.health_score }}"
}
}
}

Returns: The rendered template string.

  • template_id (required): ID of the template to render
  • vars (optional): Key-value map of variables to inject into the template

AGENT — Running an LLM

{
"id": "analyze",
"op": "AGENT",
"next": "",
"out": { "set": "@.response" },
"agent": {
"model_type": "small",
"system": "You are an analyst. Output ONLY raw JSON, no markdown, no backticks.",
"user": "Analyze this data:\n{{ $.data }}",
"tools": [
{ "name": "semantic_query", "function_id": "semantic.query" }
]
}
}

Decision rules:

  • small: Use for structured extraction, classification, formatting — any task with a clear expected output shape
  • large: Use only for complex reasoning over multiple inputs, nuanced analysis, or open-ended generation
  • For JSON output: ALWAYS include "Output ONLY raw JSON, no markdown, no backticks" in the system prompt
  • Chat final response: The last AGENT step must use "out": { "set": "@.response" }. This is the variable the chat runtime reads to send a reply. If you omit this, the chat will produce no visible response.

Tools on AGENT steps (vs CALL steps)

CALL steps can invoke the full function catalog — the flow author decides when each function runs and what arguments go in. AGENT steps can also invoke functions, but only when you grant access by attaching tools. The set of functions available as agent tools is currently smaller than the full CALL catalog:

  • semantic.query
  • email.send
  • slack.send_message
  • tasks.create

Each tool entry has name (what the model invokes) and function_id (which function runs). Example: { "name": "query_semantic_db", "function_id": "semantic.query" }.

Functions not in this list (CRM read/update, dataset operations, web search, etc.) remain available in ordinary CALL steps.

Fixed arguments on tools

For each tool you can set fixed_args — argument keys the model must not choose. Fixed values are merged at runtime and those keys are hidden from the model's view of the tool schema, so it only sees parameters it is allowed to fill in.

{
"tools": [
{
"name": "post_to_slack",
"function_id": "slack.send_message",
"fixed_args": {
"connection_id": "slack_conn_123",
"channel_id": "C01234567"
}
}
]
}

In this example the model can only compose the message; connection and channel are locked by the flow author.
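
The hide-then-merge behavior of fixed_args can be sketched as two small functions. The schema shape and both function names are hypothetical; the sketch also assumes that a fixed value overrides anything the model tries to supply for the same key.

```python
def visible_schema(schema: dict, fixed_args: dict) -> dict:
    """Parameters the model may fill in: everything not locked by fixed_args."""
    return {name: spec for name, spec in schema.items() if name not in fixed_args}


def final_args(model_args: dict, fixed_args: dict) -> dict:
    """Merge at call time; fixed values win over anything the model supplied."""
    return {**model_args, **fixed_args}


# Hypothetical parameter schema for slack.send_message, for illustration only.
schema = {"connection_id": "string", "channel_id": "string", "text": "string"}
fixed = {"connection_id": "slack_conn_123", "channel_id": "C01234567"}

seen_by_model = visible_schema(schema, fixed)
sent = final_args({"text": "Renewal risk detected", "channel_id": "C99999999"}, fixed)
```

With these inputs the model only sees text, and the call still goes to channel C01234567 even if the model tries to pass a different channel_id.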

Advanced: threads, memory, and multi-turn

These are optional add-ons for specialized flows:

  • thread_id: Optional. Restores/continues a conversation thread from previous runs, enabling multi-turn agent interactions.
  • variable_store: Set to true to give the agent persistent key-value storage tools (variable_get, variable_set, variable_push, variable_pop, variable_clear) for scratchpad memory across tool calls.
  • multi_turn: Set to true to allow the agent to pause execution and wait for external input before continuing.

LOOP — Iterating Over Arrays

{
"id": "process_items",
"op": "LOOP",
"next": "",
"loop": {
"over": "items.results",
"var": "item",
"step": "handle_item"
}
}
  • over: Path to array (e.g., "items.results")
  • var: Current item variable name (access via $.item)
  • step: Step to execute for each item — this step can chain to further steps via next
  • The loop variable ($.item) and any local variables set within the loop body are available to all chained steps within the same iteration
  • Max 1000 iterations — the step fails if the array exceeds this

CRITICAL — Loop Scheduling Gotcha:

Loops use breadth-first scheduling, not depth-first. For a loop over [A, B, C] with steps process → save, the execution order is:

process(A) → process(B) → process(C) → save(A) → save(B) → save(C)

NOT the intuitive process(A) → save(A) → process(B) → save(B) → ...

All first steps in each iteration run before any second steps. This means:

  • NEVER write to a @. global in one step and read it in a later step within the same loop — the global will hold the value from the last iteration by the time any later steps run
  • ALWAYS use local variables (bare names, no @.) to pass data between chained steps within the same iteration — each iteration has its own isolated scope, so this is safe
  • Use @. globals inside loops only for accumulating results (e.g., out.append) that you read after the loop completes
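
The breadth-first order and its effect on globals can be reproduced with a short simulation. This is a toy model of the scheduling described above, not the runtime itself; run_loop and the lowercase "processing" are invented for the illustration.

```python
def run_loop(items, use_global):
    """Simulate a two-step loop body (process -> save) with breadth-first scheduling."""
    order, saved = [], {}
    globals_ = {}
    scopes = [{"item": item} for item in items]  # one isolated local scope per iteration

    for scope in scopes:                         # every process(...) runs first
        order.append(f"process({scope['item']})")
        result = scope["item"].lower()
        if use_global:
            globals_["result"] = result          # like "@.result": shared and overwritten
        else:
            scope["result"] = result             # like "result": local to this iteration

    for scope in scopes:                         # only then does every save(...) run
        order.append(f"save({scope['item']})")
        saved[scope["item"]] = globals_["result"] if use_global else scope["result"]
    return order, saved


order, clobbered = run_loop(["A", "B", "C"], use_global=True)
_, isolated = run_loop(["A", "B", "C"], use_global=False)
```

With a global, every save sees the value from the last iteration (clobbered maps A, B, and C all to "c"); with a local, each iteration keeps its own value (isolated maps A to "a", B to "b", C to "c").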

CONDITION — Boolean Gate

{
"id": "check_data",
"op": "CONDITION",
"next": "process_data",
"condition": {
"condition": "$.has_data"
}
}
  • If true: proceeds to next
  • If false: execution stops (no next step scheduled)
  • There is no if/else. For branching, use BRANCH with separate CONDITION steps on each path.

BRANCH + JOIN — Parallel Execution

{
"id": "parallel_tasks",
"op": "BRANCH",
"next": "join_results",
"branch": {
"parallel_paths": ["task1", "task2", "task3"]
}
}
{
"id": "join_results",
"op": "JOIN",
"next": "aggregate"
}
  • All paths in parallel_paths execute concurrently
  • JOIN waits for all branches to complete before proceeding

TRANSFORM — Formatting and Extraction

{
"id": "format_output",
"op": "TRANSFORM",
"next": "save",
"out": { "set": "@.formatted" },
"transform": {
"type": "format",
"input": "$.raw_data",
"template": "Summary: {{ $.input.summary }}, Count: {{ $.input.count }}"
}
}

Transform types:

  1. format: Apply Go template to input

    • input: Path to input variable
    • template: Go template string (access input as $.input)
  2. regexp_extract: Extract text using regex

    • input: Path to string variable
    • pattern: Regex pattern
    • group: Capture group index (default: 1)
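
To preview what a regexp_extract step might return, the pattern can be tried offline. This sketch uses Python's re module as a stand-in; the flow runtime's regex engine may differ in syntax details, and returning an empty string on no match is an assumption. The function name simply mirrors the transform type.

```python
import re


def regexp_extract(text: str, pattern: str, group: int = 1) -> str:
    """Return the requested capture group, or '' when the pattern does not match."""
    match = re.search(pattern, text)
    return match.group(group) if match else ""


subject = "Ticket ACME-1042 escalated"
prefix = regexp_extract(subject, r"([A-Z]+)-(\d+)")            # group defaults to 1
number = regexp_extract(subject, r"([A-Z]+)-(\d+)", group=2)
```

Here prefix is "ACME" and number is "1042", showing how the group index selects which parenthesized part of the pattern is kept.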

WAIT — Pause Execution

{ "id": "wait", "op": "WAIT", "next": "continue", "wait": { "duration": "5m" } }

Duration — for example "1s", "5m", "1h".


SPAWN — Start a Subplan

{
"id": "spawn_subflow",
"op": "SPAWN",
"next": "continue",
"spawn": {
"plan_id": "subplan_name",
"input": { "param1": "{{ $.value1 }}" }
}
}
  • plan_id: ID of subplan defined in config.subplans
  • Subplan runs concurrently with the parent flow

Template Syntax

Flows use Go's text/template syntax for string interpolation.

Basic Interpolation

"SELECT * FROM accounts WHERE id = '{{ $.account_id }}'"

Conditionals

"{{if $.limit}}LIMIT {{ $.limit }}{{else}}LIMIT 100{{end}}"

Accessing Nested Data

Use the index function for array access and complex keys:

"{{ index (index $.data.results 0) \"domain\" }}"

IMPORTANT: Go templates do NOT support bracket syntax like $.results[0]. Always use the index function.

Important Notes

  • Missing variables resolve to empty string (not errors)
  • Use single quotes for SQL strings: '{{ $.id }}'
  • No | json filter available — LLM receives raw object representation
  • Template errors are silent — a malformed template produces empty output, not a runtime error

Dataset Operations

Datasets are persistent key-value stores for flow outputs. Records are stored as JSON objects with a string key.

IMPORTANT: Dataset writes (upsert, set_field, delete) are permanent, side-effectful operations. Only use them when the user has explicitly asked to store data, or when the flow's documented purpose is to persist results for later processing. Do not write to a dataset speculatively.

CRITICAL — Dataset Name Rules:

  • NEVER invent a dataset name. Only use a dataset if the user has explicitly provided the dataset name in their request.
  • ALWAYS verify the dataset exists before using it by querying dataset_records('name') (e.g., SELECT 1 FROM dataset_records('my_dataset') LIMIT 1). If the query fails or returns no schema, the dataset does not exist — stop and inform the user rather than proceeding.
  • There is no MCP tool to create a new dataset. If the user asks to create one, explain that dataset creation is not supported as of now.

Querying Dataset Records

Use dataset_records('name') as a table in semantic.query:

-- Basic query
SELECT * FROM dataset_records('my_dataset') LIMIT 100

-- Filter by record fields (JSONB operators)
SELECT * FROM dataset_records('my_dataset')
WHERE record->>'status' = 'completed'

-- Access nested JSON
SELECT
key,
record->>'name' as name,
record->'analysis'->>'category' as category
FROM dataset_records('my_dataset')

Writing Records

Use the CALL functions in flows:

  • dataset.record.upsert — Create or overwrite a full record
  • dataset.record.set_field — Update a single field
  • dataset.record.delete — Delete a record by key

Common Patterns

1. Fetch → Process → Store (ETL)

{
"entrypoint": "fetch",
"steps": {
"fetch": {
"id": "fetch", "op": "CALL", "next": "loop",
"out": { "set": "items" },
"call": { "function_id": "semantic.query", "args": { "query": "SELECT * FROM tickets LIMIT 100" } }
},
"loop": {
"id": "loop", "op": "LOOP", "next": "",
"loop": { "over": "items.results", "var": "item", "step": "process" }
},
"process": {
"id": "process", "op": "AGENT", "next": "save",
"out": { "set": "analysis" },
"agent": { "model_type": "small", "system": "Extract insights as JSON. Output ONLY raw JSON.", "user": "{{ $.item }}" }
},
"save": {
"id": "save", "op": "CALL", "next": "",
"call": {
"function_id": "dataset.record.upsert",
"args": { "dataset": "my_analysis", "key": "{{ $.item.id }}", "record": { "analysis": "$.analysis" } }
}
}
}
}

2. Multi-Query Aggregation → Report

{
"entrypoint": "query1",
"steps": {
"query1": {
"id": "query1", "op": "CALL", "next": "query2",
"out": { "set": "@.data1" },
"call": { "function_id": "semantic.query", "args": { "query": "..." } }
},
"query2": {
"id": "query2", "op": "CALL", "next": "generate",
"out": { "set": "@.data2" },
"call": { "function_id": "semantic.query", "args": { "query": "..." } }
},
"generate": {
"id": "generate", "op": "AGENT", "next": "",
"out": { "set": "@.response" },
"agent": { "model_type": "large", "system": "Generate a report.", "user": "Data 1:\n{{ $.data1 }}\n\nData 2:\n{{ $.data2 }}" }
}
}
}

3. Incremental Processing (Skip Already Processed)

SELECT t.* FROM tickets t
LEFT JOIN dataset_records('processed') d ON t.id = d.key
WHERE d.key IS NULL
LIMIT {{if $.limit}}{{ $.limit }}{{else}}100{{end}}

This is the standard pattern for flows that run repeatedly and should only process new records.
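
The anti-join can be tried locally with SQLite to see how the result shrinks as records are marked processed. This is a sketch under assumptions: a plain processed table stands in for dataset_records('processed'), which exists only in the semantic DB, and the sample tickets are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tickets (id TEXT, title TEXT);
    -- Stand-in for dataset_records('processed').
    CREATE TABLE processed (key TEXT, record TEXT);
    INSERT INTO tickets VALUES ('t1', 'Login fails'), ('t2', 'Slow export'), ('t3', 'Billing');
    INSERT INTO processed VALUES ('t1', '{}');
""")


def unprocessed(limit: int = 100) -> list:
    """Tickets with no matching key in the processed table (LEFT JOIN anti-join)."""
    rows = conn.execute(
        "SELECT t.id FROM tickets t "
        "LEFT JOIN processed d ON t.id = d.key "
        "WHERE d.key IS NULL ORDER BY t.id LIMIT ?",
        (limit,),
    ).fetchall()
    return [row[0] for row in rows]
```

Initially unprocessed() returns t2 and t3; once a run upserts t2 into the processed set, the next call returns only t3, so each run picks up where the previous one stopped.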


Common Mistakes

  • NEVER use {{ $.var }} to pass an entire object — use "$.var" (quoted, no braces). Template interpolation stringifies objects unpredictably.
  • NEVER assume a CALL step always succeeds. semantic.query and data_connection.query fail the step on query errors — the current path stops. Design flows to be resilient to step failures.
  • NEVER store loop results as bare local variables if you need them after the loop — use @. globals or out.append.
  • NEVER write to a @. global in one loop step and read it in a later step of the same loop — loops are breadth-first, so all first steps run before any second steps. Use local variables to pass data within an iteration.
  • NEVER use CONDITION expecting if/else — it's a gate that stops execution on false. Use BRANCH for true branching.
  • ALWAYS add "Output ONLY raw JSON, no markdown, no backticks" to AGENT system prompts when you need JSON.
  • ALWAYS set "next": "" on the last step — omitting it causes runtime errors.

Runtime Limits

Limit | Value | Behavior
Function output (CALL) | 100 KiB | Step fails
Global variables | 1 MiB | Run fails
Step memory | 10 MiB | Run fails
Loop iterations | 1000 | Step fails
Events size | 5 MiB | Run fails
Chat ticks per request | 1000 | Request stops
Background run ticks per execution | 100 | Run pauses until next runner cycle
Chat wakeup/wait | 1 minute | Fails with WAIT_TOO_LONG
Run timeout | 24 hours | Background/scheduled runs
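
A rough way to estimate whether a query result is likely to stay under the 100 KiB function output limit is to measure its JSON-encoded size offline. This is only an approximation: how the runtime measures size is not documented here, and encoded_size and fits_call_output are hypothetical helpers.

```python
import json

FUNCTION_OUTPUT_LIMIT = 100 * 1024  # 100 KiB, the Function output (CALL) limit


def encoded_size(value) -> int:
    """Size of a value as compact UTF-8 JSON (an assumed approximation)."""
    return len(json.dumps(value, separators=(",", ":")).encode("utf-8"))


def fits_call_output(value) -> bool:
    """True when the encoded value is within the assumed CALL output limit."""
    return encoded_size(value) <= FUNCTION_OUTPUT_LIMIT


small_result = {"results": [{"id": i} for i in range(10)]}
large_result = {"results": ["x" * 1024] * 200}  # roughly 200 KiB of text
```

If an estimate like this comes out close to the limit, selecting fewer columns or adding a LIMIT to the query is the usual way to shrink the output.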

Useful Tips

Getting the Current Date/Time

The flow runtime does not expose the current date directly. Use a semantic.query CALL to get it:

"call": {
"function_id": "semantic.query",
"args": {
"query": "SELECT date('now') as today, strftime('%w', 'now') as day_of_week"
}
}

This returns: { "results": [{ "today": "2026-03-02", "day_of_week": "1" }] } (0=Sunday, 6=Saturday). Use this before any step that needs date-aware logic.
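
The date expressions can be checked locally with SQLite, which supports the same date() and strftime() functions used in the query above. Whether the semantic DB matches SQLite in every detail is an assumption; date_parts is a hypothetical helper that takes a fixed date so the result is reproducible.

```python
import sqlite3


def date_parts(moment: str = "now") -> dict:
    """Evaluate date(...) and strftime('%w', ...) in an in-memory SQLite database."""
    row = sqlite3.connect(":memory:").execute(
        "SELECT date(?) AS today, strftime('%w', ?) AS day_of_week",
        (moment, moment),
    ).fetchone()
    return {"today": row[0], "day_of_week": row[1]}


monday = date_parts("2026-03-02")
```

For 2026-03-02 this yields today "2026-03-02" and day_of_week "1" (a Monday). Note that day_of_week comes back as a string, so compare it against "1" rather than the number 1.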

Debugging

  1. Template not interpolating? Check variable path. Use {{ . }} to see entire context.
  2. Empty results? Verify query syntax and table/column names.
  3. Wrong variable scope? Use @.var for global, $.var for local. When in doubt, use @. globals, except between chained steps inside a loop, where bare local names are the safe choice.
  4. LLM output malformed? Add explicit formatting instructions in system prompt.
  5. Object passed as string? You used {{ $.var }} instead of "$.var".
  6. Chat produces no response? Ensure the final AGENT step stores output in @.response.