Pattern: Pipeline Bot
The Pipeline Bot pattern executes a series of dependent steps, where each step is handled by a specialized expert. This is useful for complex workflows where order matters and earlier results inform later steps.
Reference implementation: Owl Strategist (marketing strategy pipeline)
When to Use
Use this pattern when you need:
- Sequential execution with dependencies
- Different prompts/tools per step
- Results from step N used in step N+1
- User ability to re-run specific steps
Architecture
┌─────────────────────────────────────────────────────────────┐│ PIPELINE BOT ││ ││ Step Dependencies: ││ input ──► diagnostic ──► metrics ──┬──► segment ││ └──► channels ││ └──► messaging ││ └──► tactics││ ││ User: "Run diagnostic step" ││ │ ││ ▼ ││ ┌──────────────────────────────────────────────────────┐ ││ │ Tool: run_agent │ ││ │ 1. Check dependencies (input exists?) │ ││ │ 2. Load context from prior steps │ ││ │ 3. Create subchat with step-specific expert │ ││ └──────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ ┌──────────────────────────────────────────────────────┐ ││ │ Subchat: diagnostic expert │ ││ │ - Specialized prompt for diagnostic analysis │ ││ │ - Saves result to policy document │ ││ │ - Kernel detects AGENT_COMPLETE marker │ ││ └──────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ Result saved, user can run next step │└─────────────────────────────────────────────────────────────┘Complete Implementation
pipeline_bot.py
import asyncioimport jsonfrom typing import Dict, Any, List
from flexus_client_kit import ckit_clientfrom flexus_client_kit import ckit_cloudtoolfrom flexus_client_kit import ckit_bot_execfrom flexus_client_kit import ckit_shutdownfrom flexus_client_kit import ckit_ask_modelfrom flexus_client_kit.integrations import fi_pdocfrom pipeline import pipeline_install
BOT_NAME = "pipeline"BOT_VERSION = "1.0.0"
# Step dependencies: each step requires these prior stepsSTEP_DEPENDENCIES = { "diagnostic": ["input"], "metrics": ["input", "diagnostic"], "segment": ["input", "diagnostic", "metrics"], "channels": ["input", "diagnostic", "metrics"], "messaging": ["input", "diagnostic", "segment", "channels"], "tactics": ["input", "diagnostic", "segment", "channels", "messaging"],}
# All steps in execution orderALL_STEPS = ["input", "diagnostic", "metrics", "segment", "channels", "messaging", "tactics"]
# --- Tool Definitions ---
RUN_AGENT_TOOL = ckit_cloudtool.CloudTool( strict=True, name="run_agent", description="Run a specific step in the pipeline.", parameters={ "type": "object", "properties": { "step": { "type": "string", "enum": ALL_STEPS[1:], # All except "input" "description": "Which step to run", }, "experiment_id": { "type": "string", "description": "Experiment/project ID for storing results", }, }, "required": ["step", "experiment_id"], "additionalProperties": False, },)
RERUN_AGENT_TOOL = ckit_cloudtool.CloudTool( strict=True, name="rerun_agent", description="Re-run a step with new parameters or context.", parameters={ "type": "object", "properties": { "step": { "type": "string", "enum": ALL_STEPS[1:], "description": "Which step to re-run", }, "experiment_id": { "type": "string", "description": "Experiment ID", }, "additional_context": { "type": ["string", "null"], "description": "Additional instructions or context", }, }, "required": ["step", "experiment_id", "additional_context"], "additionalProperties": False, },)
TOOLS = [ RUN_AGENT_TOOL, RERUN_AGENT_TOOL, fi_pdoc.POLICY_DOCUMENT_TOOL,]
async def pipeline_main_loop(fclient: ckit_client.FlexusClient, rcx: ckit_bot_exec.RobotContext): setup = ckit_bot_exec.official_setup_mixing_procedure( pipeline_install.pipeline_setup_schema, rcx.persona.persona_setup )
pdoc = fi_pdoc.IntegrationPdoc(rcx, rcx.persona.ws_root_group_id)
async def check_dependencies(experiment_id: str, step: str) -> tuple[bool, list]: """Check if all dependencies are satisfied.""" deps = STEP_DEPENDENCIES.get(step, []) missing = []
for dep in deps: path = f"/experiments/{experiment_id}/{dep}" result = await pdoc.pdoc_read(path) if not result or "error" in result.lower(): missing.append(dep)
return len(missing) == 0, missing
async def load_context(experiment_id: str, steps: List[str]) -> str: """Load results from prior steps as context.""" context_parts = []
for step in steps: path = f"/experiments/{experiment_id}/{step}" result = await pdoc.pdoc_read(path) if result and "error" not in result.lower(): context_parts.append(f"## {step.title()} Results\n\n{result}")
return "\n\n---\n\n".join(context_parts)
@rcx.on_tool_call(RUN_AGENT_TOOL.name) async def handle_run_agent(toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str: step = args["step"] experiment_id = args["experiment_id"]
# Check dependencies deps_ok, missing = await check_dependencies(experiment_id, step) if not deps_ok: return f"Error: Missing required steps: {', '.join(missing)}. Run them first."
# Load context from prior steps context = await load_context(experiment_id, STEP_DEPENDENCIES.get(step, []))
# Build the question for this step question = f"""Execute the {step} step for experiment {experiment_id}.
## Prior Context
{context}
## Your Task
Complete the {step} analysis. Save your results to /experiments/{experiment_id}/{step}
When done, end your response with AGENT_COMPLETE."""
# Create subchat with step-specific expert subchats = await ckit_ask_model.bot_subchat_create_multiple( client=fclient, who_is_asking=f"pipeline_{step}", persona_id=rcx.persona.persona_id, first_question=[question], first_calls=["null"], title=[f"{step.title()} Analysis"], fcall_id=toolcall.fcall_id, fexp_name=step, # Each step has its own expert )
raise ckit_cloudtool.WaitForSubchats(subchats)
@rcx.on_tool_call(RERUN_AGENT_TOOL.name) async def handle_rerun_agent(toolcall: ckit_cloudtool.FCloudtoolCall, args: Dict[str, Any]) -> str: step = args["step"] experiment_id = args["experiment_id"] additional_context = args.get("additional_context", "")
# Same as run_agent but with additional context context = await load_context(experiment_id, STEP_DEPENDENCIES.get(step, []))
question = f"""Re-execute the {step} step for experiment {experiment_id}.
## Prior Context
{context}
## Additional Instructions
{additional_context}
## Your Task
Redo the {step} analysis considering the additional instructions. Save to /experiments/{experiment_id}/{step}
When done, end with AGENT_COMPLETE."""
subchats = await ckit_ask_model.bot_subchat_create_multiple( client=fclient, who_is_asking=f"pipeline_{step}_rerun", persona_id=rcx.persona.persona_id, first_question=[question], first_calls=["null"], title=[f"{step.title()} Re-analysis"], fcall_id=toolcall.fcall_id, fexp_name=step, )
raise ckit_cloudtool.WaitForSubchats(subchats)
@rcx.on_tool_call(fi_pdoc.POLICY_DOCUMENT_TOOL.name) async def handle_pdoc(toolcall, args): return await pdoc.called_by_model(toolcall, args)
try: while not ckit_shutdown.shutdown_event.is_set(): await rcx.unpark_collected_events(sleep_if_no_work=10.0) finally: pass
def main(): scenario_fn = ckit_bot_exec.parse_bot_args() fclient = ckit_client.FlexusClient( ckit_client.bot_service_name(BOT_NAME, BOT_VERSION), endpoint="/v1/jailed-bot" )
asyncio.run(ckit_bot_exec.run_bots_in_this_group( fclient, marketable_name=BOT_NAME, marketable_version_str=BOT_VERSION, bot_main_loop=pipeline_main_loop, inprocess_tools=TOOLS, scenario_fn=scenario_fn, install_func=pipeline_install.install, ))
if __name__ == "__main__": main()pipeline_install.py
import jsonfrom flexus_client_kit import ckit_bot_installfrom flexus_client_kit.ckit_bot_install import FMarketplaceExpertInputfrom pipeline import pipeline_bot, pipeline_prompts
pipeline_setup_schema = {}
# Tools for the orchestratororchestrator_tools = json.dumps([t.openai_style_tool() for t in pipeline_bot.TOOLS])
# Tools for step agents (just need pdoc to save results)agent_tools = json.dumps([ fi_pdoc.POLICY_DOCUMENT_TOOL.openai_style_tool()])
# Kernel for step agentsAGENT_KERNEL = """msg = messages[-1]if msg["role"] == "assistant": content = str(msg.get("content", "")) if "AGENT_COMPLETE" in content: subchat_result = content elif not msg.get("tool_calls"): post_cd_instruction = "Save your results and end with AGENT_COMPLETE.""""
def install(ws_id: str): # Build experts list: one orchestrator + one per step experts = [ ("default", FMarketplaceExpertInput( fexp_system_prompt=pipeline_prompts.SYSTEM_PROMPT_ORCHESTRATOR, fexp_app_capture_tools=orchestrator_tools, )), ]
# Add an expert for each step for step in pipeline_bot.ALL_STEPS[1:]: # Skip "input" prompt = getattr(pipeline_prompts, f"SYSTEM_PROMPT_{step.upper()}", pipeline_prompts.SYSTEM_PROMPT_GENERIC_STEP) experts.append((step, FMarketplaceExpertInput( fexp_system_prompt=prompt, fexp_python_kernel=AGENT_KERNEL, fexp_app_capture_tools=agent_tools, )))
return ckit_bot_install.marketplace_upsert_dev_bot( ws_id=ws_id, marketable_name="pipeline", marketable_version="1.0.0", marketable_title1="Pipeline Bot", marketable_title2="Execute multi-step workflows with dependencies", marketable_occupation="Workflow Manager", marketable_author="Your Name", marketable_description="""Pipeline Bot executes complex multi-step workflows.
## Steps1. **input** - User provides initial data2. **diagnostic** - Analyze current state3. **metrics** - Define success metrics4. **segment** - Identify target segments5. **channels** - Select distribution channels6. **messaging** - Craft messaging7. **tactics** - Plan tactical execution
Each step can be run or re-run independently.""", marketable_typical_group="Business / Strategy", marketable_tags=["pipeline", "workflow", "strategy"], marketable_setup_schema=json.dumps(pipeline_setup_schema), marketable_picture_big_b64="", marketable_picture_small_b64="", marketable_experts=experts, marketable_schedule=[], )
if __name__ == "__main__": ckit_bot_install.main_install_dev_bot(install)Key Concepts
Dependency Management
Define which steps must complete before others:
STEP_DEPENDENCIES = { "diagnostic": ["input"], "metrics": ["input", "diagnostic"], # ...}Check dependencies before running a step:
async def check_dependencies(experiment_id, step): for dep in STEP_DEPENDENCIES[step]: result = await pdoc.pdoc_read(f"/experiments/{experiment_id}/{dep}") if not result: return False, [dep] return True, []Context Accumulation
Load results from prior steps to inform current step:
async def load_context(experiment_id, steps): context = [] for step in steps: result = await pdoc.pdoc_read(f"/experiments/{experiment_id}/{step}") context.append(f"## {step}\n{result}") return "\n\n".join(context)Per-Step Experts
Each step has its own expert with:
- Specialized prompt — Focused on that step’s task
- Shared kernel — Detects
AGENT_COMPLETEmarker - Minimal tools — Just what’s needed (usually just pdoc)
Having separate experts allows different prompts, tools, and behaviors for each step. The orchestrator routes to the right expert based on the step being executed.
Variations
Automatic Pipeline
Run all steps automatically in sequence:
RUN_ALL_TOOL = ckit_cloudtool.CloudTool( name="run_all", description="Run all pipeline steps in order", ...)
@rcx.on_tool_call("run_all")async def handle_run_all(toolcall, args): for step in ALL_STEPS[1:]: # Run each step, wait for completion # Check if previous step succeeded before continuingParallel Steps
For steps without dependencies on each other:
# metrics and segment can run in parallel after diagnosticif step in ["metrics", "segment"]: # Create both subchats at onceBranching Pipelines
Different paths based on intermediate results:
if diagnostic_result["category"] == "B2B": next_steps = ["segment_b2b", "channels_b2b"]else: next_steps = ["segment_b2c", "channels_b2c"]