Pattern: Pipeline Bot

The Pipeline Bot pattern executes a series of dependent steps, where each step is handled by a specialized expert. This is useful for complex workflows where order matters and earlier results inform later steps.

Reference implementation: Owl Strategist (marketing strategy pipeline)

When to Use

Use this pattern when you need:

  • Sequential execution with dependencies
  • Different prompts/tools per step
  • Results from step N used in step N+1
  • User ability to re-run specific steps

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        PIPELINE BOT                         │
│                                                             │
│  Step Dependencies:                                         │
│  input ──► diagnostic ──► metrics ──┬──► segment            │
│                                     └──► channels           │
│                                          └──► messaging     │
│                                               └──► tactics  │
│                                                             │
│  User: "Run diagnostic step"                                │
│        │                                                    │
│        ▼                                                    │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Tool: run_agent                                      │   │
│  │ 1. Check dependencies (input exists?)                │   │
│  │ 2. Load context from prior steps                     │   │
│  │ 3. Create subchat with step-specific expert          │   │
│  └──────────────────────────────────────────────────────┘   │
│        │                                                    │
│        ▼                                                    │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Subchat: diagnostic expert                           │   │
│  │ - Specialized prompt for diagnostic analysis         │   │
│  │ - Saves result to policy document                    │   │
│  │ - Kernel detects AGENT_COMPLETE marker               │   │
│  └──────────────────────────────────────────────────────┘   │
│        │                                                    │
│        ▼                                                    │
│  Result saved, user can run next step                       │
└─────────────────────────────────────────────────────────────┘
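
The flow in the diagram can be traced without a Flexus connection. The sketch below is a simplified simulation, not the kit's API: `store` is a hypothetical in-memory dict standing in for the policy-document store, `fake_expert` is a hypothetical stand-in for the subchat, and `run_step` folds the completion check and the save into one function, whereas in the real bot the expert saves its own result and the kernel detects the marker.

```python
from typing import Callable, Dict

# Subset of the dependency table used on this page.
STEP_DEPENDENCIES = {
    "diagnostic": ["input"],
    "metrics": ["input", "diagnostic"],
}
MARKER = "AGENT_COMPLETE"


def run_step(store: Dict[str, str], step: str, expert: Callable[[str], str]) -> str:
    """Simulate one run_agent call against an in-memory result store (hypothetical)."""
    deps = STEP_DEPENDENCIES.get(step, [])
    # 1. Check dependencies
    missing = [d for d in deps if d not in store]
    if missing:
        return f"Error: Missing required steps: {', '.join(missing)}. Run them first."
    # 2. Load context from prior steps
    context = "\n\n".join(f"## {d}\n{store[d]}" for d in deps)
    # 3. Hand the context to the step expert (stands in for the subchat)
    reply = expert(context)
    if MARKER not in reply:
        return "Error: expert did not finish"
    # Stands in for the expert saving its result to the policy document
    store[step] = reply.replace(MARKER, "").strip()
    return f"{step} saved"


def fake_expert(context: str) -> str:
    """Hypothetical expert: reports how many prior results it received."""
    return f"Analysis based on {context.count('## ')} prior result(s).\n{MARKER}"
```

Starting from a store that holds only `input`, calling `run_step` for `metrics` returns the missing-steps error, while running `diagnostic` first makes `metrics` runnable.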

Complete Implementation

pipeline_bot.py

import asyncio
import json
from typing import Dict, Any, List

from flexus_client_kit import ckit_client
from flexus_client_kit import ckit_cloudtool
from flexus_client_kit import ckit_bot_exec
from flexus_client_kit import ckit_shutdown
from flexus_client_kit import ckit_ask_model
from flexus_client_kit.integrations import fi_pdoc
from pipeline import pipeline_install

BOT_NAME = "pipeline"
BOT_VERSION = "1.0.0"

# Step dependencies: each step requires these prior steps
STEP_DEPENDENCIES = {
    "diagnostic": ["input"],
    "metrics": ["input", "diagnostic"],
    "segment": ["input", "diagnostic", "metrics"],
    "channels": ["input", "diagnostic", "metrics"],
    "messaging": ["input", "diagnostic", "segment", "channels"],
    "tactics": ["input", "diagnostic", "segment", "channels", "messaging"],
}

# All steps in execution order
ALL_STEPS = ["input", "diagnostic", "metrics", "segment", "channels", "messaging", "tactics"]

# --- Tool Definitions ---
RUN_AGENT_TOOL = ckit_cloudtool.CloudTool(
    strict=True,
    name="run_agent",
    description="Run a specific step in the pipeline.",
    parameters={
        "type": "object",
        "properties": {
            "step": {
                "type": "string",
                "enum": ALL_STEPS[1:],  # All except "input"
                "description": "Which step to run",
            },
            "experiment_id": {
                "type": "string",
                "description": "Experiment/project ID for storing results",
            },
        },
        "required": ["step", "experiment_id"],
        "additionalProperties": False,
    },
)

RERUN_AGENT_TOOL = ckit_cloudtool.CloudTool(
    strict=True,
    name="rerun_agent",
    description="Re-run a step with new parameters or context.",
    parameters={
        "type": "object",
        "properties": {
            "step": {
                "type": "string",
                "enum": ALL_STEPS[1:],
                "description": "Which step to re-run",
            },
            "experiment_id": {
                "type": "string",
                "description": "Experiment ID",
            },
            "additional_context": {
                "type": ["string", "null"],
                "description": "Additional instructions or context",
            },
        },
        "required": ["step", "experiment_id", "additional_context"],
        "additionalProperties": False,
    },
)

TOOLS = [
    RUN_AGENT_TOOL,
    RERUN_AGENT_TOOL,
    fi_pdoc.POLICY_DOCUMENT_TOOL,
]


async def pipeline_main_loop(fclient: ckit_client.FlexusClient, rcx: ckit_bot_exec.RobotContext):
    setup = ckit_bot_exec.official_setup_mixing_procedure(
        pipeline_install.pipeline_setup_schema,
        rcx.persona.persona_setup
    )
    pdoc = fi_pdoc.IntegrationPdoc(rcx, rcx.persona.ws_root_group_id)

    async def check_dependencies(experiment_id: str, step: str) -> tuple[bool, list]:
        """Check if all dependencies are satisfied."""
        deps = STEP_DEPENDENCIES.get(step, [])
        missing = []
        for dep in deps:
            path = f"/experiments/{experiment_id}/{dep}"
            result = await pdoc.pdoc_read(path)
            # Heuristic: an empty read or any text containing "error" counts as missing
            if not result or "error" in result.lower():
                missing.append(dep)
        return len(missing) == 0, missing

    async def load_context(experiment_id: str, steps: List[str]) -> str:
        """Load results from prior steps as context."""
        context_parts = []
        for step in steps:
            path = f"/experiments/{experiment_id}/{step}"
            result = await pdoc.pdoc_read(path)
            if result and "error" not in result.lower():
                context_parts.append(f"## {step.title()} Results\n\n{result}")
        return "\n\n---\n\n".join(context_parts)

    @rcx.on_tool_call(RUN_AGENT_TOOL.name)
    async def handle_run_agent(toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str:
        step = args["step"]
        experiment_id = args["experiment_id"]

        # Check dependencies
        deps_ok, missing = await check_dependencies(experiment_id, step)
        if not deps_ok:
            return f"Error: Missing required steps: {', '.join(missing)}. Run them first."

        # Load context from prior steps
        context = await load_context(experiment_id, STEP_DEPENDENCIES.get(step, []))

        # Build the question for this step
        question = f"""Execute the {step} step for experiment {experiment_id}.

## Prior Context
{context}

## Your Task
Complete the {step} analysis. Save your results to /experiments/{experiment_id}/{step}
When done, end your response with AGENT_COMPLETE."""

        # Create subchat with step-specific expert
        subchats = await ckit_ask_model.bot_subchat_create_multiple(
            client=fclient,
            who_is_asking=f"pipeline_{step}",
            persona_id=rcx.persona.persona_id,
            first_question=[question],
            first_calls=["null"],
            title=[f"{step.title()} Analysis"],
            fcall_id=toolcall.fcall_id,
            fexp_name=step,  # Each step has its own expert
        )
        raise ckit_cloudtool.WaitForSubchats(subchats)

    @rcx.on_tool_call(RERUN_AGENT_TOOL.name)
    async def handle_rerun_agent(toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str:
        step = args["step"]
        experiment_id = args["experiment_id"]
        # The schema allows null here, so normalize None to an empty string
        additional_context = args.get("additional_context") or ""

        # Same as run_agent but with additional context
        deps_ok, missing = await check_dependencies(experiment_id, step)
        if not deps_ok:
            return f"Error: Missing required steps: {', '.join(missing)}. Run them first."
        context = await load_context(experiment_id, STEP_DEPENDENCIES.get(step, []))

        question = f"""Re-execute the {step} step for experiment {experiment_id}.

## Prior Context
{context}

## Additional Instructions
{additional_context}

## Your Task
Redo the {step} analysis considering the additional instructions. Save to /experiments/{experiment_id}/{step}
When done, end with AGENT_COMPLETE."""

        subchats = await ckit_ask_model.bot_subchat_create_multiple(
            client=fclient,
            who_is_asking=f"pipeline_{step}_rerun",
            persona_id=rcx.persona.persona_id,
            first_question=[question],
            first_calls=["null"],
            title=[f"{step.title()} Re-analysis"],
            fcall_id=toolcall.fcall_id,
            fexp_name=step,
        )
        raise ckit_cloudtool.WaitForSubchats(subchats)

    @rcx.on_tool_call(fi_pdoc.POLICY_DOCUMENT_TOOL.name)
    async def handle_pdoc(toolcall, args):
        return await pdoc.called_by_model(toolcall, args)

    try:
        while not ckit_shutdown.shutdown_event.is_set():
            await rcx.unpark_collected_events(sleep_if_no_work=10.0)
    finally:
        pass


def main():
    scenario_fn = ckit_bot_exec.parse_bot_args()
    fclient = ckit_client.FlexusClient(
        ckit_client.bot_service_name(BOT_NAME, BOT_VERSION),
        endpoint="/v1/jailed-bot"
    )
    asyncio.run(ckit_bot_exec.run_bots_in_this_group(
        fclient,
        marketable_name=BOT_NAME,
        marketable_version_str=BOT_VERSION,
        bot_main_loop=pipeline_main_loop,
        inprocess_tools=TOOLS,
        scenario_fn=scenario_fn,
        install_func=pipeline_install.install,
    ))


if __name__ == "__main__":
    main()

pipeline_install.py

import json

from flexus_client_kit import ckit_bot_install
from flexus_client_kit.ckit_bot_install import FMarketplaceExpertInput
from flexus_client_kit.integrations import fi_pdoc  # needed for agent_tools below
from pipeline import pipeline_bot, pipeline_prompts

pipeline_setup_schema = {}

# Tools for the orchestrator
orchestrator_tools = json.dumps([t.openai_style_tool() for t in pipeline_bot.TOOLS])

# Tools for step agents (just need pdoc to save results)
agent_tools = json.dumps([
    fi_pdoc.POLICY_DOCUMENT_TOOL.openai_style_tool()
])

# Kernel for step agents
AGENT_KERNEL = """
msg = messages[-1]
if msg["role"] == "assistant":
    content = str(msg.get("content", ""))
    if "AGENT_COMPLETE" in content:
        subchat_result = content
    elif not msg.get("tool_calls"):
        post_cd_instruction = "Save your results and end with AGENT_COMPLETE."
"""


def install(ws_id: str):
    # Build experts list: one orchestrator + one per step
    experts = [
        ("default", FMarketplaceExpertInput(
            fexp_system_prompt=pipeline_prompts.SYSTEM_PROMPT_ORCHESTRATOR,
            fexp_app_capture_tools=orchestrator_tools,
        )),
    ]

    # Add an expert for each step
    for step in pipeline_bot.ALL_STEPS[1:]:  # Skip "input"
        # Use SYSTEM_PROMPT_<STEP> if defined, otherwise the generic step prompt
        prompt = getattr(pipeline_prompts, f"SYSTEM_PROMPT_{step.upper()}",
                         pipeline_prompts.SYSTEM_PROMPT_GENERIC_STEP)
        experts.append((step, FMarketplaceExpertInput(
            fexp_system_prompt=prompt,
            fexp_python_kernel=AGENT_KERNEL,
            fexp_app_capture_tools=agent_tools,
        )))

    return ckit_bot_install.marketplace_upsert_dev_bot(
        ws_id=ws_id,
        marketable_name="pipeline",
        marketable_version="1.0.0",
        marketable_title1="Pipeline Bot",
        marketable_title2="Execute multi-step workflows with dependencies",
        marketable_occupation="Workflow Manager",
        marketable_author="Your Name",
        marketable_description="""
Pipeline Bot executes complex multi-step workflows.

## Steps
1. **input** - User provides initial data
2. **diagnostic** - Analyze current state
3. **metrics** - Define success metrics
4. **segment** - Identify target segments
5. **channels** - Select distribution channels
6. **messaging** - Craft messaging
7. **tactics** - Plan tactical execution

Each step can be run or re-run independently.
""",
        marketable_typical_group="Business / Strategy",
        marketable_tags=["pipeline", "workflow", "strategy"],
        marketable_setup_schema=json.dumps(pipeline_setup_schema),
        marketable_picture_big_b64="",
        marketable_picture_small_b64="",
        marketable_experts=experts,
        marketable_schedule=[],
    )


if __name__ == "__main__":
    ckit_bot_install.main_install_dev_bot(install)
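
The behaviour of the AGENT_KERNEL text can be checked locally before installing the bot. The sketch below runs the same kernel text with `exec` against a hand-built `messages` list. It assumes, based only on the variable names in the kernel, that Flexus injects `messages` and reads `subchat_result` and `post_cd_instruction` afterwards; `run_kernel` is a hypothetical test helper, not part of flexus_client_kit.

```python
from typing import Any, Dict, List

AGENT_KERNEL = """
msg = messages[-1]
if msg["role"] == "assistant":
    content = str(msg.get("content", ""))
    if "AGENT_COMPLETE" in content:
        subchat_result = content
    elif not msg.get("tool_calls"):
        post_cd_instruction = "Save your results and end with AGENT_COMPLETE."
"""


def run_kernel(messages: List[Dict[str, Any]]) -> Dict[str, str]:
    """Execute the kernel text locally and return the output variables it set."""
    namespace: Dict[str, Any] = {"messages": messages}
    exec(AGENT_KERNEL, namespace)
    outputs = ("subchat_result", "post_cd_instruction")
    return {name: namespace[name] for name in outputs if name in namespace}


# Marker present: the kernel reports the final result
finished = run_kernel([{"role": "assistant", "content": "Saved. AGENT_COMPLETE"}])

# No marker and no tool call: the kernel asks the model to finish properly
nudged = run_kernel([{"role": "assistant", "content": "Still thinking"}])

# Tool call in flight: the kernel sets nothing
waiting = run_kernel([{"role": "assistant", "content": "", "tool_calls": [{"id": "1"}]}])
```

The three cases cover every branch of the kernel: a finished step, a step that stopped without the marker, and a step that is still calling tools.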

Key Concepts

Dependency Management

Define which steps must complete before others:

STEP_DEPENDENCIES = {
    "diagnostic": ["input"],
    "metrics": ["input", "diagnostic"],
    # ...
}
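
A dependency table like this is easy to get out of sync with the step order. As a hedged sketch, the hypothetical helper `validate_dependencies` below (not part of the original bot) checks that every dependency is a known step and appears earlier in ALL_STEPS than the step that needs it.

```python
from typing import Dict, List

ALL_STEPS = ["input", "diagnostic", "metrics", "segment", "channels", "messaging", "tactics"]

STEP_DEPENDENCIES = {
    "diagnostic": ["input"],
    "metrics": ["input", "diagnostic"],
    "segment": ["input", "diagnostic", "metrics"],
    "channels": ["input", "diagnostic", "metrics"],
    "messaging": ["input", "diagnostic", "segment", "channels"],
    "tactics": ["input", "diagnostic", "segment", "channels", "messaging"],
}


def validate_dependencies(all_steps: List[str], deps: Dict[str, List[str]]) -> List[str]:
    """Return a list of problems; an empty list means the table matches the order."""
    position = {step: index for index, step in enumerate(all_steps)}
    problems = []
    for step, required in deps.items():
        if step not in position:
            problems.append(f"unknown step: {step}")
            continue
        for dep in required:
            if dep not in position:
                problems.append(f"{step}: unknown dependency {dep}")
            elif position[dep] >= position[step]:
                problems.append(f"{step}: dependency {dep} does not come earlier")
    return problems
```

Running this once at import time or in a unit test catches typos in step names before a user hits them at runtime.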

Check dependencies before running a step:

async def check_dependencies(experiment_id, step):
    for dep in STEP_DEPENDENCIES[step]:
        result = await pdoc.pdoc_read(f"/experiments/{experiment_id}/{dep}")
        if not result:
            return False, [dep]
    return True, []
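
The check can be exercised in isolation with a stub in place of the policy-document integration. In this sketch `FakePdoc` is a hypothetical in-memory stand-in whose `pdoc_read` returns None for unknown paths (an assumption about the real read behaviour), and the function collects every missing dependency, as the full implementation does, rather than stopping at the first.

```python
import asyncio
from typing import Dict, List, Optional, Tuple

STEP_DEPENDENCIES = {
    "diagnostic": ["input"],
    "metrics": ["input", "diagnostic"],
}


class FakePdoc:
    """Hypothetical in-memory stand-in for the policy-document integration."""

    def __init__(self, docs: Dict[str, str]):
        self.docs = docs

    async def pdoc_read(self, path: str) -> Optional[str]:
        return self.docs.get(path)


async def check_dependencies(pdoc: FakePdoc, experiment_id: str, step: str) -> Tuple[bool, List[str]]:
    """Return (all_present, missing_steps) for the given step."""
    missing = []
    for dep in STEP_DEPENDENCIES.get(step, []):
        result = await pdoc.pdoc_read(f"/experiments/{experiment_id}/{dep}")
        if not result:
            missing.append(dep)
    return not missing, missing


empty = FakePdoc({})
with_input = FakePdoc({"/experiments/exp1/input": "Product brief"})
print(asyncio.run(check_dependencies(empty, "exp1", "metrics")))
print(asyncio.run(check_dependencies(with_input, "exp1", "diagnostic")))
```

Reporting all missing steps at once lets the orchestrator tell the user everything that still has to run, instead of one step per attempt.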

Context Accumulation

Load results from prior steps to inform current step:

async def load_context(experiment_id, steps):
    context = []
    for step in steps:
        result = await pdoc.pdoc_read(f"/experiments/{experiment_id}/{step}")
        if result:  # skip steps that have no saved result
            context.append(f"## {step}\n{result}")
    return "\n\n".join(context)
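
To make the accumulated format concrete, the following sketch builds the same kind of context string from a plain dict. `build_context` is a hypothetical pure function written for illustration; it uses the heading and separator layout of the full implementation and skips steps with no saved result.

```python
from typing import Dict, List


def build_context(results: Dict[str, str], steps: List[str]) -> str:
    """Join saved step results into one Markdown context block."""
    parts = [
        f"## {step.title()} Results\n\n{results[step]}"
        for step in steps
        if results.get(step)  # skip missing or empty results
    ]
    return "\n\n---\n\n".join(parts)


saved = {"input": "Tea subscription brief", "diagnostic": "Churn is the main issue"}
print(build_context(saved, ["input", "diagnostic", "metrics"]))
```

Because each section carries its own heading, the step expert can tell which prior step produced which piece of context.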

Per-Step Experts

Each step has its own expert with:

  • Specialized prompt: focused on that step's task
  • Shared kernel: detects the AGENT_COMPLETE marker
  • Minimal tools: only what the step needs (usually just pdoc)

Expert Per Step

Having separate experts allows different prompts, tools, and behaviors for each step. The orchestrator routes to the right expert based on the step being executed.
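
The install script picks each step's prompt by name and falls back to a generic one. The sketch below isolates that lookup; `prompts` is a hypothetical stand-in for the pipeline_prompts module, and the prompt texts are invented for illustration.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the pipeline_prompts module.
prompts = SimpleNamespace(
    SYSTEM_PROMPT_GENERIC_STEP="You complete one pipeline step.",
    SYSTEM_PROMPT_DIAGNOSTIC="You diagnose the current marketing situation.",
)


def prompt_for(step: str) -> str:
    """Return SYSTEM_PROMPT_<STEP> if it exists, else the generic step prompt."""
    return getattr(prompts, f"SYSTEM_PROMPT_{step.upper()}", prompts.SYSTEM_PROMPT_GENERIC_STEP)


print(prompt_for("diagnostic"))  # step-specific prompt
print(prompt_for("tactics"))     # falls back to the generic prompt
```

With this convention, adding a specialized expert for a step only requires defining one more constant in the prompts module.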

Variations

Automatic Pipeline

Run all steps automatically in sequence (outline only: the tool parameters and the loop body are elided):

RUN_ALL_TOOL = ckit_cloudtool.CloudTool(
    name="run_all",
    description="Run all pipeline steps in order",
    ...
)

@rcx.on_tool_call("run_all")
async def handle_run_all(toolcall, args):
    for step in ALL_STEPS[1:]:
        # Run each step, wait for completion
        # Check if previous step succeeded before continuing
        ...
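
The control flow of an automatic run can be sketched independently of Flexus. Here `run_step` is a hypothetical coroutine supplied by the caller that returns True on success; how a real handler would wait for each subchat inside a single tool call is not shown in the source and is left out.

```python
import asyncio
from typing import Awaitable, Callable, List

ALL_STEPS = ["input", "diagnostic", "metrics", "segment", "channels", "messaging", "tactics"]


async def run_all(run_step: Callable[[str], Awaitable[bool]]) -> List[str]:
    """Run every step after "input" in order and stop at the first failure."""
    completed: List[str] = []
    for step in ALL_STEPS[1:]:
        succeeded = await run_step(step)
        if not succeeded:
            break  # later steps depend on this one, so do not continue
        completed.append(step)
    return completed


async def fails_on_segment(step: str) -> bool:
    """Hypothetical step runner that fails at the segment step."""
    return step != "segment"


print(asyncio.run(run_all(fails_on_segment)))
```

Returning the list of completed steps gives the orchestrator enough information to report where the run stopped and which step to retry.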

Parallel Steps

For steps without dependencies on each other:

# segment and channels share the same dependencies (input, diagnostic, metrics),
# so they can run in parallel once metrics is done
if step in ["segment", "channels"]:
    # Create both subchats at once
    ...
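
Which steps may run together follows directly from the dependency table. As a minimal sketch, the hypothetical helper `ready_steps` below returns every step that has not run yet and whose dependencies are all complete; any steps returned together are independent of each other.

```python
from typing import List, Set

STEP_DEPENDENCIES = {
    "diagnostic": ["input"],
    "metrics": ["input", "diagnostic"],
    "segment": ["input", "diagnostic", "metrics"],
    "channels": ["input", "diagnostic", "metrics"],
    "messaging": ["input", "diagnostic", "segment", "channels"],
    "tactics": ["input", "diagnostic", "segment", "channels", "messaging"],
}


def ready_steps(completed: Set[str]) -> List[str]:
    """Return steps not yet completed whose dependencies are all satisfied."""
    return [
        step
        for step, deps in STEP_DEPENDENCIES.items()
        if step not in completed and all(dep in completed for dep in deps)
    ]


print(ready_steps({"input", "diagnostic", "metrics"}))
```

With this table, segment and channels become ready at the same time once metrics has finished, while every other stage has a single ready step.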

Branching Pipelines

Different paths based on intermediate results:

if diagnostic_result["category"] == "B2B":
    next_steps = ["segment_b2b", "channels_b2b"]
else:
    next_steps = ["segment_b2c", "channels_b2c"]
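
Branching needs a machine-readable diagnostic result. The sketch below assumes, purely for illustration, that the diagnostic expert saves a JSON object with a `category` field; `next_steps_for` is a hypothetical helper, and anything that is not clearly B2B (including unparseable output) takes the B2C path, mirroring the `else` branch above.

```python
import json
from typing import List


def next_steps_for(diagnostic_json: str) -> List[str]:
    """Choose the next steps from a saved diagnostic result (assumed to be JSON)."""
    try:
        parsed = json.loads(diagnostic_json)
        category = parsed.get("category") if isinstance(parsed, dict) else None
    except json.JSONDecodeError:
        category = None  # free-text or malformed result: no category available

    if category == "B2B":
        return ["segment_b2b", "channels_b2b"]
    return ["segment_b2c", "channels_b2c"]


print(next_steps_for('{"category": "B2B", "summary": "Sells to retailers"}'))
print(next_steps_for("free-text diagnostic without structure"))
```

Each branch name would also need its own entry in the dependency table and its own expert, since the orchestrator routes by step name.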