Flow Authoring Guide
Create FunnelStory agent definitions (flows) — JSON configurations you edit in the agent builder or export/import that define multi-step data processing and LLM agent workflows.
Before You Start
Clarify these before writing any JSON:
- What's the goal? One-shot batch job, chat-driven flow, or scheduled pipeline?
- Where does data come from? Semantic tables, external connections, or existing datasets?
- Which pattern fits? ETL, multi-query aggregation, or incremental processing? (See Common Patterns)
- Do you need an AGENT step? Not all flows require LLM calls (e.g., pure data pipelines, usage alerts). If yes, use "small" for structured extraction and "large" for complex reasoning.
Workflow
Follow this order when building a flow:
- Define input_schema (if needed) — what the user provides when triggering. Omit if the flow is self-contained.
- Decide how the flow starts — manual/API/MCP only, or add trigger_config for automatic runs (see Triggers).
- Sketch the step graph — entrypoint → ... → terminal ("next": "")
- Choose the right op for each step — see Operation Types
- Wire up variable passing — decide local vs global for each output
- If this is a chat flow — the final AGENT step must store its output in @.response. This is the variable the runtime reads to send a reply to the user. Omitting this is the #1 cause of silent chat flows.
- Add guardrails — error checks after CALL steps, incremental processing for large datasets
Flow Structure
Published flows that should run automatically set "draft": false and include trigger_config. Draft flows are not picked up by the background runner.
{
"name": "My Flow",
"draft": true,
"trigger_config": null,
"input_schema": [...],
"config": {
"entrypoint": "first_step",
"steps": { ... }
}
}
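As a hedged illustration of the published shape, the sketch below sets "draft": false and attaches an interval trigger to a one-step flow. The flow name, step name, and query are placeholders, and the nesting of interval.duration under trigger_config is assumed to mirror the query trigger example in this guide.

```json
{
  "name": "Low Health Digest",
  "draft": false,
  "trigger_config": {
    "type": "interval",
    "interval": { "duration": "6h" }
  },
  "config": {
    "entrypoint": "fetch_accounts",
    "steps": {
      "fetch_accounts": {
        "id": "fetch_accounts",
        "op": "CALL",
        "next": "",
        "out": { "set": "@.accounts" },
        "call": {
          "function_id": "semantic.query",
          "args": {
            "query": "SELECT account_id, name, health_score FROM accounts ORDER BY health_score ASC LIMIT 20"
          }
        }
      }
    }
  }
}
```

input_schema is left out here because the flow takes no user input.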
Triggers (trigger_config)
Optional. When set on a non-draft flow, the agent can run automatically when events occur.
| type | Purpose | Config |
|---|---|---|
| schedule | Cron | schedule.expr (optional schedule.timezone) |
| interval | Fixed repeat | interval.duration (e.g. "6h", "30m") |
| activity | Model activity events | activity.activity_ids (array); optional filter_expr |
| signal | Signal events | signal.rule_ids (array); optional filter_expr |
| needle_mover | Needle mover rows | needle_mover.labels and/or needle_mover.impacts (arrays; need at least one value across both); optional filter_expr |
| conversation | Conversations | conversation.types (array); optional filter_expr |
| query | Semantic DB rows | query.query — SQL against the semantic DB; each result row starts one run |
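A schedule trigger could look like the sketch below. The field names come from the table above; the five-field cron expression (intended as 09:00 every Monday) and the IANA timezone name are assumptions about the accepted formats, so check them against your workspace before publishing.

```json
{
  "trigger_config": {
    "type": "schedule",
    "schedule": {
      "expr": "0 9 * * 1",
      "timezone": "America/New_York"
    }
  }
}
```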
Query trigger
- The SQL uses the same dialect and workspace tables as semantic.query inside the flow (e.g. accounts, dataset_records('my_dataset'), …).
- Each row becomes one run. The row is available at runtime as @.trigger.row.<column> (and in templates as {{ $.trigger.row.<column> }}). Only include columns you need; the idempotency key is derived from the entire row JSON (stable row → deduped runs).
- Cadence: query evaluation runs at most once per UTC day (first successful tick of the flows runner that day for the workspace). Use LIMIT in SQL to cap work per day.
- Prefer narrow WHERE clauses so you do not enqueue more than you need; the runner also caps how many new runs it creates per cycle.
Example:
"trigger_config": {
"type": "query",
"query": {
"query": "SELECT account_id, name FROM accounts WHERE subscription_remaining_days < 90 LIMIT 50"
}
}
Steps can reference {{ $.trigger.row.account_id }}, etc.
Trigger data available in your flow
Each trigger type exposes different fields under @.trigger (and $.trigger in templates). Only one shape applies per run.
| Trigger type | Runtime fields under $.trigger |
|---|---|
| query | row.<column> — columns from your SQL SELECT |
| activity | activity.activity_id, activity.model_id, activity.account_id, activity.user_id, activity.timestamp, activity.count |
| signal | signal.signal_id, signal.rule_id, signal.type, signal.account_id, signal.timestamp, plus optional signal.message, signal.attributes, signal.value, signal.previous_value |
| needle_mover | needle_mover.needle_mover_id, needle_mover.title, needle_mover.description, needle_mover.state, needle_mover.impact, needle_mover.label, needle_mover.created_at |
| conversation | conversation.key, conversation.metadata, conversation.timestamp |
Event-driven runs also include $.trigger.account_ids when account scope is available.
Important: $.trigger.row only exists for query triggers. If a step references $.trigger.row but the run was started by an activity, signal, needle mover, or conversation trigger, the value will be empty or missing. Use the matching path for the trigger type — for example $.trigger.activity.account_id on an activity-triggered run. If you need data beyond what the trigger provides, add a semantic.query step to look it up.
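For instance, an activity-triggered flow that needs account details might begin with a lookup step like this sketch. The step name, output variable, and selected columns are illustrative; the trigger path and table columns are the ones listed in this guide.

```json
{
  "lookup_account": {
    "id": "lookup_account",
    "op": "CALL",
    "next": "",
    "out": { "set": "@.account" },
    "call": {
      "function_id": "semantic.query",
      "args": {
        "query": "SELECT account_id, name, health_score FROM accounts WHERE account_id = '{{ $.trigger.activity.account_id }}' LIMIT 1"
      }
    }
  }
}
```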
Naming differences between saved config and runtime:
| Topic | In the saved trigger JSON | On the run under $.trigger |
|---|---|---|
| Needle mover | impacts (plural array of filters) | needle_mover.impact (singular string for this event) |
| Activity | activity_ids (which activities fire the flow) | activity.activity_id (the specific event) |
| Conversation | types (which conversation kinds fire the flow) | conversation.key, conversation.metadata, conversation.timestamp |
Testing trigger-shaped flows without waiting
When running from the builder or starting a test run, you can supply sample trigger data so the run behaves as though a real trigger started it. The sample JSON must match the trigger type you are building for.
Query-shaped sample (columns from your SQL):
{
"trigger": {
"row": {
"account_id": "acct_123",
"name": "Example Corp"
}
}
}
Activity-shaped sample:
{
"trigger": {
"activity": {
"activity_id": "019bacd1-e737-7bef-a310-c35ff896febd",
"account_id": "acct_456",
"timestamp": "2026-04-01T09:00:00Z",
"count": 1
}
}
}
The same shapes apply when an assistant runs a flow with a simulated trigger via MCP.
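By the same logic, a signal-shaped sample would presumably carry the required signal fields from the runtime table above. Every value below is a made-up placeholder (including the type string), and the optional fields are omitted.

```json
{
  "trigger": {
    "signal": {
      "signal_id": "sig_123",
      "rule_id": "rule_123",
      "type": "example_type",
      "account_id": "acct_789",
      "timestamp": "2026-04-01T09:00:00Z"
    }
  }
}
```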
Input Schema (Optional)
"input_schema": [
{
"id": "account_id",
"type": "string",
"description": "The account ID to process",
"value": null
},
{
"id": "limit",
"type": "number",
"description": "Max records to process",
"value": 100
}
]
- id (required): Variable name (access via {{ $.account_id }})
- type (required): "string", "number", "boolean", "object", "array"
- description (optional): Human-readable label for UI
- value (optional): Default value if input not provided
Note: input_schema is optional. Many flows (scheduled pipelines, usage alerts) don't need user input at all. Only add it when the flow requires parameters at trigger time.
Operation Types
| Op | Purpose | Key Fields |
|---|---|---|
| CALL | Call a function | call.function_id, call.args |
| AGENT | Run LLM agent | agent.system, agent.user, agent.model_type |
| LOOP | Iterate over array | loop.over, loop.var, loop.step |
| CONDITION | Boolean gate (stops if false) | condition.condition |
| BRANCH | Run parallel paths | branch.parallel_paths |
| JOIN | Wait for all branches | (no config) |
| TRANSFORM | Format or extract data | transform.type, transform.input |
| WAIT | Pause execution | wait.duration |
| SPAWN | Start subplan | spawn.plan_id, spawn.input |
Step Structure
Every step has these common fields:
{
"step_name": {
"id": "step_name",
"op": "CALL",
"next": "next_step_name",
"out": { "set": "@.result_var" }
}
}
- id: Must match the key name
- next: Next step to execute. Empty string "" = end of flow.
- out: Where to store the result
Output Configuration
"out": { "set": "@.my_result" } // Global — accessible by any subsequent step
"out": { "set": "my_result" } // Local — only in current scope
"out": { "append": "@.all_results" } // Append to array (useful in loops)
"out": { "merge": "@.summary" } // Merge object fields into existing object
CRITICAL decision rule for @. vs bare name:
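- ALWAYS use @. globals when a downstream step outside the current loop/branch needs the value
- Use bare names only for values consumed by later chained steps in the same scope (for example, the next step of the same loop iteration)
- When in doubt, use @. The one exception is data passed between chained steps inside a loop, which must stay local (see the Loop Scheduling Gotcha under LOOP)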
Variable Syntax
| Syntax | When to use | Example |
|---|---|---|
| "$.var" (quoted, no braces) | Pass an entire object/array as-is | "record": { "data": "$.analysis" } |
| "{{ $.var }}" | Interpolate into a string | "WHERE id = '{{ $.id }}'" |
| @.var | Reference a global variable | "out": { "set": "@.results" } |
| @.trigger.* | Payload from the run’s trigger (event, query row, …) | @.trigger.row.account_id, {{ $.trigger.row.account_id }} |
@.trigger is only present when the run was started with trigger context (automatic triggers or a manual request that supplied trigger).
NEVER use {{ $.var }} to pass an entire object — template interpolation stringifies objects unpredictably. Use "$.var" (quoted, no braces) instead.
Variable Scopes
| Prefix | Scope | Lifecycle |
|---|---|---|
| $.var | Local | Input variables + loop vars. Available to the current step and chained steps within the same scope. |
| @.var | Global | Persisted across the entire flow run. Any step can read/write. |
CALL — Calling Functions
{
"id": "fetch_data",
"op": "CALL",
"next": "process_data",
"out": { "set": "@.data" },
"call": {
"function_id": "semantic.query",
"args": {
"query": "SELECT * FROM accounts WHERE id = '{{ $.account_id }}'"
}
}
}
IMPORTANT: If a CALL step fails (function error, query failure, size limit exceeded), the step returns an error — no result is stored and execution of the current path stops. This is true for all step types, not just CALL.
Available Functions
semantic.query
Query the semantic database (workspace data).
"call": {
"function_id": "semantic.query",
"args": {
"query": "SELECT * FROM accounts LIMIT 10"
}
}
Returns: { "results": [...], "columns": [...], "total_rows": N }
Common tables and columns:
| Table | Columns |
|---|---|
| accounts | account_id TEXT, domain TEXT, name TEXT, amount REAL, created_at TIMESTAMP, properties JSON, expires_at TIMESTAMP, churned BOOLEAN, churned_at TIMESTAMP, prediction TEXT, prediction_score REAL, assignees JSON (array of assignee emails), activity_score REAL, conversation_sentiment REAL, feature_adoption REAL, health_score REAL, license_utilization REAL, product_engagement TEXT, subscription_remaining_days REAL, support_sentiment REAL, total_conversations REAL, total_support_tickets REAL, total_users REAL |
| meetings | id TEXT, source TEXT, title TEXT, link TEXT, timestamp TIMESTAMP, duration_seconds INTEGER, sentiment REAL, summary TEXT, key JSON, data JSON, metadata JSON, participants JSON |
| conversations | id TEXT, parent_conversation_id TEXT, key JSON, metadata JSON, data JSON, timestamp TIMESTAMP |
| tickets | source TEXT, id TEXT, timestamp TIMESTAMP, key JSON, sentiment REAL, link TEXT, title TEXT, text TEXT, contact_email TEXT, contact_name TEXT, assignee_email TEXT, resolved_at TIMESTAMP, status TEXT, priority TEXT, metadata JSON, data JSON, custom_fields JSON, tags JSON |
| topics | reference_type TEXT, reference_id TEXT, account_id TEXT, user_id TEXT, user_email TEXT, topic TEXT, sentiment TEXT, created_at TIMESTAMP, link TEXT |
| notes | id TEXT, title TEXT, content TEXT, link TEXT, note_type TEXT, created_at TIMESTAMP, updated_at TIMESTAMP, created_by_email TEXT, updated_by_email TEXT, account_id TEXT, timestamp TIMESTAMP |
| tasks | id TEXT, title TEXT, body TEXT, link TEXT, status TEXT, created_at TIMESTAMP, updated_at TIMESTAMP, expires_at TIMESTAMP, created_by_email TEXT, assigned_to_email TEXT, account_id TEXT |
| activities | activity_id TEXT, activity_name TEXT, account_id TEXT, user_id TEXT, count INT, timestamp TIMESTAMP, user_email TEXT |
| contacts | id TEXT, name TEXT, email TEXT, domain TEXT |
| workspace_users | user_id TEXT, name TEXT, email TEXT, user_role TEXT, user_designation TEXT, assignable BOOLEAN, last_activity TIMESTAMP, deactivated_at TIMESTAMP |
| account_metrics | account_id TEXT, metric_id TEXT, value REAL |
| dataset_records(name) | key TEXT, record JSON — see Dataset Operations |
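To show how these tables combine, here is a sketch of a semantic.query call that joins tasks to accounts on account_id. The 'open' status value is an assumption, since the stored status strings are not documented here, and the LIMIT is there to keep the result small.

```json
{
  "call": {
    "function_id": "semantic.query",
    "args": {
      "query": "SELECT a.account_id, a.name, COUNT(t.id) AS open_tasks FROM accounts a JOIN tasks t ON t.account_id = a.account_id WHERE t.status = 'open' GROUP BY a.account_id, a.name ORDER BY open_tasks DESC LIMIT 25"
    }
  }
}
```

Selecting only the columns you need, rather than SELECT *, also helps stay under the function output limit listed in Runtime Limits.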
data_connection.query
Query external data connections (CRM, etc.).
"call": {
"function_id": "data_connection.query",
"args": {
"data_connection_id": "019b3c9e-...",
"query": "SELECT * FROM companies WHERE ..."
}
}
Returns: Same shape as semantic.query. On failure, the CALL step fails (no error payload is returned).
dataset.record.upsert
Save or update a record in a dataset.
"call": {
"function_id": "dataset.record.upsert",
"args": {
"dataset": "my_dataset",
"key": "{{ $.item.id }}",
"record": {
"field1": "{{ $.item.name }}",
"field2": "$.analysis"
}
}
}
Note: Use "$.analysis" (quoted, no braces) to store entire objects. Use "{{ $.item.name }}" to interpolate strings.
Returns: { "dataset": "...", "key": "..." }
dataset.record.set_field
Update a single field in a dataset record.
"call": {
"function_id": "dataset.record.set_field",
"args": {
"dataset": "my_dataset",
"key": "{{ $.item.id }}",
"field": "status",
"value": "completed"
}
}
Returns: { "dataset": "...", "key": "...", "field": "..." }
dataset.record.delete
Delete a record from a dataset.
"call": {
"function_id": "dataset.record.delete",
"args": {
"dataset": "my_dataset",
"key": "{{ $.item.id }}"
}
}
Returns: null
tasks.create
Create a task.
"call": {
"function_id": "tasks.create",
"args": {
"title": "{{ $.summary }}",
"body": "{{ $.details }}"
}
}
Returns: { "task_id": "...", "title": "..." }
slack.send_message
Send a Slack message.
"call": {
"function_id": "slack.send_message",
"args": {
"connection_id": "slack_conn_123",
"channel_id": "C01234567",
"text": "{{ $.message }}"
}
}
- connection_id (required): Slack connection ID
- channel_id (required): Slack channel ID
- text or blocks (at least one required): Plain text message or Slack Block Kit blocks. If both are provided, text becomes the notification fallback.
Returns: { "success": true, "response_channel": "...", "response_timestamp": "..." }
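A message that sends both text and blocks might look like the sketch below. The section block follows Slack's Block Kit format; passing blocks as a JSON array and interpolating templates inside block strings are assumptions, and $.account_name and $.health_score are placeholder variables.

```json
{
  "call": {
    "function_id": "slack.send_message",
    "args": {
      "connection_id": "slack_conn_123",
      "channel_id": "C01234567",
      "text": "Health alert for {{ $.account_name }}",
      "blocks": [
        {
          "type": "section",
          "text": {
            "type": "mrkdwn",
            "text": "*Health alert*\n{{ $.account_name }} dropped to {{ $.health_score }}"
          }
        }
      ]
    }
  }
}
```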
email.send
Send an email through FunnelStory or a user-provided data connection.
"call": {
"function_id": "email.send",
"args": {
"to": ["owner@acme.com", "csm@acme.com"],
"cc": ["lead@acme.com"],
"bcc": [],
"send_separately": false,
"subject": "Weekly summary",
"body": "Your weekly summary is ready",
"html": "<p>Your weekly summary is ready</p>"
}
}
Use a specific connected provider when needed:
"call": {
"function_id": "email.send",
"args": {
"data_connection_id": "019b3c9e-...",
"from": "hello@acme.com",
"to": ["owner@acme.com", "csm@acme.com"],
"subject": "Weekly summary",
"body": "Your weekly summary is ready"
}
}
Returns: { "sent": true, "recipients": [...], "recipient_count": N }
Note: Use data_connection_id only when you explicitly want to send from a specific connected provider. When data_connection_id is provided, include from. Connection-backed mode expands trusted-recipient allowlisting to include contacts.
IMPORTANT: to, subject, and body are all mandatory. Even if you provide html, you must still include body with a plain-text version — emails will fail without it.
salesforce.read_record
Read a Salesforce record.
"call": {
"function_id": "salesforce.read_record",
"args": {
"data_connection_id": "sf_conn_123",
"object_type": "Account",
"record_id": "001XXXXXXXXXXXXXXX",
"fields": ["Name", "Industry", "AnnualRevenue"]
}
}
Returns: { "record": { ... } }
salesforce.update_record
Update a Salesforce record.
"call": {
"function_id": "salesforce.update_record",
"args": {
"data_connection_id": "sf_conn_123",
"object_type": "Account",
"record_id": "001XXXXXXXXXXXXXXX",
"fields": {
"Customer_Health__c": "At Risk",
"Renewal_Risk_Score__c": "82"
}
}
}
Returns: { "ok": true }
salesforce.create_record
Create a Salesforce record via the REST API (single sObject insert).
"call": {
"function_id": "salesforce.create_record",
"args": {
"data_connection_id": "sf_conn_123",
"object_type": "Note",
"fields": {
"ParentId": "001XXXXXXXXXXXXXXX",
"Title": "Follow up from QBR"
}
}
}
fields and parent records: Whatever you pass must match the Salesforce REST create shape for that object_type—different sObjects use different fields to relate a new row to an Account or other parent. For example:
- Note — ParentId is the record the note attaches to; when the flow is creating a note on an Account, set ParentId to that account’s Id (001...).
- Task — use WhatId for the related Account, Opportunity, or other allowed object, and set standard fields such as Subject (and Status if your org requires it). Use WhoId when you also relate the task to a Contact or Lead.
Other object types follow their own API field names; use Salesforce’s object reference or metadata for required and writable fields.
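For comparison with the Note example, a Task create could be sketched as follows. The connection ID and record Id are placeholders, and "Not Started" is a common default Task status that your org may have renamed or replaced.

```json
{
  "call": {
    "function_id": "salesforce.create_record",
    "args": {
      "data_connection_id": "sf_conn_123",
      "object_type": "Task",
      "fields": {
        "WhatId": "001XXXXXXXXXXXXXXX",
        "Subject": "Follow up on renewal risk",
        "Status": "Not Started"
      }
    }
  }
}
```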
Args: Same data_connection_id and object_type pattern as salesforce.read_record / salesforce.update_record. fields is the JSON body sent on create: API names to values. Values may be strings, numbers, booleans, or nested objects where Salesforce accepts them (unlike salesforce.update_record, where fields is a string map for PATCH).
Returns: { "id": "<Salesforce Id>", "ok": true }
Test runs: Validates arguments and connection type; does not call Salesforce — returns a synthetic id and test: true in the result (same pattern as other side-effect CALLs in test mode).
hubspot.read_record
Read a HubSpot record.
"call": {
"function_id": "hubspot.read_record",
"args": {
"data_connection_id": "hs_conn_123",
"object_type": "companies",
"record_id": "123456789",
"fields": ["name", "domain", "industry"]
}
}
Returns: { "record": { ... } }
hubspot.update_record
Update a HubSpot record.
"call": {
"function_id": "hubspot.update_record",
"args": {
"data_connection_id": "hs_conn_123",
"object_type": "companies",
"record_id": "123456789",
"fields": {
"funnel_stage": "Expansion",
"health_status": "watch"
}
}
}
Returns: { "ok": true }
search.web
Run a web search.
"call": {
"function_id": "search.web",
"args": {
"query": "latest customer onboarding best practices",
"recency_filter": "30d"
}
}
Returns: [ { "title": "...", "url": "...", "snippet": "...", "date": "..." } ]
recency_filter options: 7d, 30d, 90d, 1y
accounts.select
Select accounts using Funnel filters. Uses the UniversalFilter (FilterGroup) shape.
Filters use boolean logic: and_group contains or_group arrays, each or_group contains individual filter entries.
"call": {
"function_id": "accounts.select",
"args": {
"filter": {
"and_group": [
{
"or_group": [
{
"filter": {
"name": "",
"metric_filter": {
"metric_id": "product_engagement",
"condition": "equal",
"value": "daily_active"
}
}
},
{
"filter": {
"name": "",
"rule_filter": {
"activity_id": "019bacd1-e737-7bef-a310-c35ff896febd",
"condition": "count_is_more_than_or_equal",
"value": "5"
}
}
}
]
}
]
}
}
}
Returns: { "total_count": N, "account_ids": ["acct_1", "acct_2"] }
Note: metric_id and activity_id values are workspace-specific. These must be looked up from the workspace configuration — they cannot be guessed.
template.render
Render a stored template with variables.
"call": {
"function_id": "template.render",
"args": {
"template_id": "my_template",
"vars": {
"name": "{{ $.account_name }}",
"score": "{{ $.health_score }}"
}
}
}
Returns: The rendered template string.
- template_id (required): ID of the template to render
- vars (optional): Key-value map of variables to inject into the template
AGENT — Running an LLM
{
"id": "analyze",
"op": "AGENT",
"next": "",
"out": { "set": "@.response" },
"agent": {
"model_type": "small",
"system": "You are an analyst. Output ONLY raw JSON, no markdown, no backticks.",
"user": "Analyze this data:\n{{ $.data }}",
"tools": [
{ "name": "semantic_query", "function_id": "semantic.query" }
]
}
}
Decision rules:
- small: Use for structured extraction, classification, formatting — any task with a clear expected output shape
- large: Use only for complex reasoning over multiple inputs, nuanced analysis, or open-ended generation
- For JSON output: ALWAYS include "Output ONLY raw JSON, no markdown, no backticks" in the system prompt
- Chat final response: The last AGENT step must use "out": { "set": "@.response" }. This is the variable the chat runtime reads to send a reply. If you omit this, the chat will produce no visible response.
Tools on AGENT steps (vs CALL steps)
CALL steps can invoke the full function catalog — the flow author decides when each function runs and what arguments go in. AGENT steps can also invoke functions, but only when you grant access by attaching tools. The set of functions available as agent tools is currently smaller than the full CALL catalog:
- semantic.query
- email.send
- slack.send_message
- tasks.create
Each tool entry has name (what the model invokes) and function_id (which function runs). Example: { "name": "query_semantic_db", "function_id": "semantic.query" }.
Functions not in this list (CRM read/update, dataset operations, web search, etc.) remain available in ordinary CALL steps.
Fixed arguments on tools
For each tool you can set fixed_args — argument keys the model must not choose. Fixed values are merged at runtime and those keys are hidden from the model's view of the tool schema, so it only sees parameters it is allowed to fill in.
{
"tools": [
{
"name": "post_to_slack",
"function_id": "slack.send_message",
"fixed_args": {
"connection_id": "slack_conn_123",
"channel_id": "C01234567"
}
}
]
}
In this example the model can only compose the message; connection and channel are locked by the flow author.
Advanced: threads, memory, and multi-turn
These are optional add-ons for specialized flows:
- thread_id: Optional. Restores/continues a conversation thread from previous runs, enabling multi-turn agent interactions.
- variable_store: Set to true to give the agent persistent key-value storage tools (variable_get, variable_set, variable_push, variable_pop, variable_clear) for scratchpad memory across tool calls.
- multi_turn: Set to true to allow the agent to pause execution and wait for external input before continuing.
LOOP — Iterating Over Arrays
{
"id": "process_items",
"op": "LOOP",
"next": "",
"loop": {
"over": "items.results",
"var": "item",
"step": "handle_item"
}
}
- over: Path to array (e.g., "items.results")
- var: Current item variable name (access via $.item)
- step: Step to execute for each item — this step can chain to further steps via next
- The loop variable ($.item) and any local variables set within the loop body are available to all chained steps within the same iteration
- Max 1000 iterations — the step fails if the array exceeds this
CRITICAL — Loop Scheduling Gotcha:
Loops use breadth-first scheduling, not depth-first. For a loop over [A, B, C] with steps process → save, the execution order is:
process(A) → process(B) → process(C) → save(A) → save(B) → save(C)
NOT the intuitive process(A) → save(A) → process(B) → save(B) → ...
All first steps in each iteration run before any second steps. This means:
- NEVER write to a @. global in one step and read it in a later step within the same loop — the global will hold the value from the last iteration by the time any later steps run
- ALWAYS use local variables (bare names, no @.) to pass data between chained steps within the same iteration — each iteration has its own isolated scope, so this is safe
- Use @. globals inside loops only for accumulating results (e.g., out.append) that you read after the loop completes
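The accumulation rule can be sketched as a loop whose single body step appends to a global, with a report step reading the array afterwards. This assumes the LOOP step's next runs only once all iterations have finished; the step names, prompts, and the @.labels variable are illustrative.

```json
{
  "entrypoint": "fetch",
  "steps": {
    "fetch": {
      "id": "fetch", "op": "CALL", "next": "loop",
      "out": { "set": "items" },
      "call": { "function_id": "semantic.query", "args": { "query": "SELECT id, title, text FROM tickets LIMIT 50" } }
    },
    "loop": {
      "id": "loop", "op": "LOOP", "next": "report",
      "loop": { "over": "items.results", "var": "item", "step": "classify" }
    },
    "classify": {
      "id": "classify", "op": "AGENT", "next": "",
      "out": { "append": "@.labels" },
      "agent": {
        "model_type": "small",
        "system": "Classify the ticket as bug, question, or feature_request. Output ONLY raw JSON with keys id and label, no markdown, no backticks.",
        "user": "id: {{ $.item.id }}\ntitle: {{ $.item.title }}\ntext: {{ $.item.text }}"
      }
    },
    "report": {
      "id": "report", "op": "AGENT", "next": "",
      "out": { "set": "@.response" },
      "agent": {
        "model_type": "small",
        "system": "Summarize how many tickets fall under each label.",
        "user": "{{ $.labels }}"
      }
    }
  }
}
```

Because each iteration only appends and nothing inside the loop reads @.labels, the breadth-first ordering does not affect the result.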
CONDITION — Boolean Gate
{
"id": "check_data",
"op": "CONDITION",
"next": "process_data",
"condition": {
"condition": "$.has_data"
}
}
- If true: proceeds to next
- If false: execution stops (no next step scheduled)
- There is no if/else. For branching, use BRANCH with separate CONDITION steps on each path.
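One way to approximate if/else along these lines is sketched below. It assumes two mutually exclusive booleans, $.is_at_risk and $.is_healthy, computed by an earlier step (both hypothetical), and that alert_owner and log_ok are steps defined elsewhere in the flow. This guide does not say how a JOIN treats a path stopped by a false CONDITION, so the sketch lets each path end on its own and leaves the BRANCH next empty, which is also an assumption.

```json
{
  "route": {
    "id": "route", "op": "BRANCH", "next": "",
    "branch": { "parallel_paths": ["if_at_risk", "if_healthy"] }
  },
  "if_at_risk": {
    "id": "if_at_risk", "op": "CONDITION", "next": "alert_owner",
    "condition": { "condition": "$.is_at_risk" }
  },
  "if_healthy": {
    "id": "if_healthy", "op": "CONDITION", "next": "log_ok",
    "condition": { "condition": "$.is_healthy" }
  }
}
```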
BRANCH + JOIN — Parallel Execution
{
"id": "parallel_tasks",
"op": "BRANCH",
"next": "join_results",
"branch": {
"parallel_paths": ["task1", "task2", "task3"]
}
}
{
"id": "join_results",
"op": "JOIN",
"next": "aggregate"
}
- All paths in parallel_paths execute concurrently
- JOIN waits for all branches to complete before proceeding
TRANSFORM — Formatting and Extraction
{
"id": "format_output",
"op": "TRANSFORM",
"next": "save",
"out": { "set": "@.formatted" },
"transform": {
"type": "format",
"input": "$.raw_data",
"template": "Summary: {{ $.input.summary }}, Count: {{ $.input.count }}"
}
}
Transform types:
- format: Apply Go template to input
  - input: Path to input variable
  - template: Go template string (access input as $.input)
- regexp_extract: Extract text using regex
  - input: Path to string variable
  - pattern: Regex pattern
  - group: Capture group index (default: 1)
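A regexp_extract step might look like the sketch below, which pulls the domain out of an email address. $.contact_email is a hypothetical input variable, and the pattern assumes Go-style (RE2) regular expressions, which this guide does not confirm.

```json
{
  "extract_domain": {
    "id": "extract_domain",
    "op": "TRANSFORM",
    "next": "",
    "out": { "set": "@.contact_domain" },
    "transform": {
      "type": "regexp_extract",
      "input": "$.contact_email",
      "pattern": "@([^@]+)$",
      "group": 1
    }
  }
}
```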
WAIT — Pause Execution
{ "id": "wait", "op": "WAIT", "next": "continue", "wait": { "duration": "5m" } }
Duration — for example "1s", "5m", "1h".
SPAWN — Start a Subplan
{
"id": "spawn_subflow",
"op": "SPAWN",
"next": "continue",
"spawn": {
"plan_id": "subplan_name",
"input": { "param1": "{{ $.value1 }}" }
}
}
- plan_id: ID of subplan defined in config.subplans
- Subplan runs concurrently with the parent flow
Template Syntax
Flows use Go's text/template syntax for string interpolation.
Basic Interpolation
"SELECT * FROM accounts WHERE id = '{{ $.account_id }}'"
Conditionals
"{{if $.limit}}LIMIT {{ $.limit }}{{else}}LIMIT 100{{end}}"
Accessing Nested Data
Use the index function for array access and complex keys:
"{{ index (index $.data.results 0) \"domain\" }}"
IMPORTANT: Go templates do NOT support bracket syntax like $.results[0]. Always use the index function.
Important Notes
- Missing variables resolve to empty string (not errors)
- Use single quotes for SQL strings: '{{ $.id }}'
- No | json filter available — LLM receives raw object representation
- Template errors are silent — a malformed template produces empty output, not a runtime error
Dataset Operations
Datasets are persistent key-value stores for flow outputs. Records are stored as JSON objects with a string key.
IMPORTANT: Dataset writes (upsert, set_field, delete) are permanent, side-effectful operations. Only use them when the user has explicitly asked to store data, or when the flow's documented purpose is to persist results for later processing. Do not write to a dataset speculatively.
CRITICAL — Dataset Name Rules:
- NEVER invent a dataset name. Only use a dataset if the user has explicitly provided the dataset name in their request.
- ALWAYS verify the dataset exists before using it by querying dataset_records('name') (e.g., SELECT 1 FROM dataset_records('my_dataset') LIMIT 1). If the query fails or returns no schema, the dataset does not exist — stop and inform the user rather than proceeding.
- There is no MCP tool to create a new dataset. If the user asks to create one, explain that dataset creation is not supported as of now.
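Inside a flow, the existence check can also be placed as the first step, as in this sketch. Because a failed CALL stops the current path, a missing dataset would halt the run before any write happens. The step names and the fetch target are placeholders.

```json
{
  "verify_dataset": {
    "id": "verify_dataset",
    "op": "CALL",
    "next": "fetch",
    "out": { "set": "@.dataset_check" },
    "call": {
      "function_id": "semantic.query",
      "args": {
        "query": "SELECT 1 FROM dataset_records('my_dataset') LIMIT 1"
      }
    }
  }
}
```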
Querying Dataset Records
Use dataset_records('name') as a table in semantic.query:
-- Basic query
SELECT * FROM dataset_records('my_dataset') LIMIT 100
-- Filter by record fields (JSONB operators)
SELECT * FROM dataset_records('my_dataset')
WHERE record->>'status' = 'completed'
-- Access nested JSON
SELECT
key,
record->>'name' as name,
record->'analysis'->>'category' as category
FROM dataset_records('my_dataset')
Writing Records
Use the CALL functions in flows:
- dataset.record.upsert — Create or overwrite a full record
- dataset.record.set_field — Update a single field
- dataset.record.delete — Delete a record by key
Common Patterns
1. Fetch → Process → Store (ETL)
{
"entrypoint": "fetch",
"steps": {
"fetch": {
"id": "fetch", "op": "CALL", "next": "loop",
"out": { "set": "items" },
"call": { "function_id": "semantic.query", "args": { "query": "SELECT * FROM tickets LIMIT 100" } }
},
"loop": {
"id": "loop", "op": "LOOP", "next": "",
"loop": { "over": "items.results", "var": "item", "step": "process" }
},
"process": {
"id": "process", "op": "AGENT", "next": "save",
"out": { "set": "analysis" },
"agent": { "model_type": "small", "system": "Extract insights as JSON. Output ONLY raw JSON.", "user": "{{ $.item }}" }
},
"save": {
"id": "save", "op": "CALL", "next": "",
"call": {
"function_id": "dataset.record.upsert",
"args": { "dataset": "my_analysis", "key": "{{ $.item.id }}", "record": { "analysis": "$.analysis" } }
}
}
}
}
2. Multi-Query Aggregation → Report
{
"entrypoint": "query1",
"steps": {
"query1": {
"id": "query1", "op": "CALL", "next": "query2",
"out": { "set": "@.data1" },
"call": { "function_id": "semantic.query", "args": { "query": "..." } }
},
"query2": {
"id": "query2", "op": "CALL", "next": "generate",
"out": { "set": "@.data2" },
"call": { "function_id": "semantic.query", "args": { "query": "..." } }
},
"generate": {
"id": "generate", "op": "AGENT", "next": "",
"out": { "set": "@.response" },
"agent": { "model_type": "large", "system": "Generate a report.", "user": "Data 1:\n{{ $.data1 }}\n\nData 2:\n{{ $.data2 }}" }
}
}
}
3. Incremental Processing (Skip Already Processed)
SELECT t.* FROM tickets t
LEFT JOIN dataset_records('processed') d ON t.id = d.key
WHERE d.key IS NULL
LIMIT {{if $.limit}}{{ $.limit }}{{else}}100{{end}}
This is the standard pattern for flows that run repeatedly and should only process new records.
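Put together with the ETL pattern, an incremental flow could be sketched as follows. It assumes the processed dataset already exists and that $.limit is an optional input; the step names and prompt are illustrative. Each ticket written by mark_processed drops out of the next run's LEFT JOIN result.

```json
{
  "entrypoint": "fetch_new",
  "steps": {
    "fetch_new": {
      "id": "fetch_new", "op": "CALL", "next": "loop",
      "out": { "set": "items" },
      "call": {
        "function_id": "semantic.query",
        "args": {
          "query": "SELECT t.id, t.title, t.text FROM tickets t LEFT JOIN dataset_records('processed') d ON t.id = d.key WHERE d.key IS NULL LIMIT {{if $.limit}}{{ $.limit }}{{else}}100{{end}}"
        }
      }
    },
    "loop": {
      "id": "loop", "op": "LOOP", "next": "",
      "loop": { "over": "items.results", "var": "item", "step": "summarize" }
    },
    "summarize": {
      "id": "summarize", "op": "AGENT", "next": "mark_processed",
      "out": { "set": "analysis" },
      "agent": {
        "model_type": "small",
        "system": "Summarize the ticket as JSON. Output ONLY raw JSON, no markdown, no backticks.",
        "user": "{{ $.item.title }}\n{{ $.item.text }}"
      }
    },
    "mark_processed": {
      "id": "mark_processed", "op": "CALL", "next": "",
      "call": {
        "function_id": "dataset.record.upsert",
        "args": { "dataset": "processed", "key": "{{ $.item.id }}", "record": { "analysis": "$.analysis" } }
      }
    }
  }
}
```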
Common Mistakes
- NEVER use {{ $.var }} to pass an entire object — use "$.var" (quoted, no braces). Template interpolation stringifies objects unpredictably.
- NEVER assume a CALL step always succeeds. semantic.query and data_connection.query fail the step on query errors — the current path stops. Design flows to be resilient to step failures.
- NEVER store loop results as bare local variables if you need them after the loop — use @. globals or out.append.
- NEVER write to a @. global in one loop step and read it in a later step of the same loop — loops are breadth-first, so all first steps run before any second steps. Use local variables to pass data within an iteration.
- NEVER use CONDITION expecting if/else — it's a gate that stops execution on false. Use BRANCH for true branching.
- ALWAYS add "Output ONLY raw JSON, no markdown, no backticks" to AGENT system prompts when you need JSON.
- ALWAYS set "next": "" on the last step — omitting it causes runtime errors.
Runtime Limits
| Limit | Value | Behavior |
|---|---|---|
| Function output (CALL) | 100 KiB | Step fails |
| Global variables | 1 MiB | Run fails |
| Step memory | 10 MiB | Run fails |
| Loop iterations | 1000 | Step fails |
| Events size | 5 MiB | Run fails |
| Chat ticks per request | 1000 | Request stops |
| Background run ticks per execution | 100 | Run pauses until next runner cycle |
| Chat wakeup/wait | 1 minute | Fails with WAIT_TOO_LONG |
| Run timeout | 24 hours | Background/scheduled runs |
Useful Tips
Getting the Current Date/Time
The flow runtime does not expose the current date directly. Use a semantic.query CALL to get it:
"call": {
"function_id": "semantic.query",
"args": {
"query": "SELECT date('now') as today, strftime('%w', 'now') as day_of_week"
}
}
This returns: { "results": [{ "today": "2026-03-02", "day_of_week": "1" }] } (0=Sunday, 6=Saturday). Use this before any step that needs date-aware logic.
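A later step can then read the date out of the result with the index function, as in this sketch. The step names and the @.now variable are illustrative, and comparing the timestamp column against a date string assumes timestamps are stored in a sortable ISO-style format.

```json
{
  "get_today": {
    "id": "get_today", "op": "CALL", "next": "fetch_recent",
    "out": { "set": "@.now" },
    "call": {
      "function_id": "semantic.query",
      "args": { "query": "SELECT date('now') as today" }
    }
  },
  "fetch_recent": {
    "id": "fetch_recent", "op": "CALL", "next": "",
    "out": { "set": "@.recent_tickets" },
    "call": {
      "function_id": "semantic.query",
      "args": {
        "query": "SELECT id, title FROM tickets WHERE timestamp >= '{{ index (index $.now.results 0) \"today\" }}' LIMIT 100"
      }
    }
  }
}
```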
Debugging
- Template not interpolating? Check variable path. Use {{ . }} to see entire context.
- Empty results? Verify query syntax and table/column names.
- Wrong variable scope? Use @.var for global, $.var for local. When in doubt, use @. (except between chained steps inside a loop, where locals are required).
- LLM output malformed? Add explicit formatting instructions in system prompt.
- Object passed as string? You used {{ $.var }} instead of "$.var".
- Chat produces no response? Ensure the final AGENT step stores output in @.response.