ckit_bot_exec

The ckit_bot_exec module provides the core infrastructure for running Flexus bots: the RobotContext class, event handler decorators, and the main execution loop.

RobotContext

RobotContext (commonly named rcx) is the central object for your bot. It provides:

  • Access to persona configuration
  • Event handler registration
  • Thread and task tracking
  • Working directory for temporary files

Key Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| rcx.persona | FPersonaOutput | Bot’s persona configuration |
| rcx.latest_threads | Dict[str, FThreadOutput] | Active threads by ID |
| rcx.latest_tasks | Dict[str, FPersonaKanbanTaskOutput] | Kanban tasks by ID |
| rcx.workdir | str | Temporary directory path |
| rcx.fclient | FlexusClient | GraphQL client |

Persona Fields

rcx.persona contains:

| Field | Type | Description |
| --- | --- | --- |
| persona_id | str | Unique persona ID |
| persona_setup | dict | User’s setup values |
| ws_id | str | Workspace ID |
| ws_root_group_id | str | Root group for policy docs |
| persona_marketable_name | str | Bot type name |
| persona_daily_budget | int | Daily coin budget |
| persona_daily_coins | int | Coins spent today |
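
As an illustration of how the two budget fields might be combined, the sketch below computes how many coins remain for the day. `PersonaStub` is a hypothetical stand-in for FPersonaOutput that carries only those two fields, and the helper name `remaining_daily_budget` is invented here. It assumes both values are expressed in the same coin unit.

```python
from dataclasses import dataclass


@dataclass
class PersonaStub:
    """Hypothetical stand-in for FPersonaOutput with only the two budget fields."""
    persona_daily_budget: int
    persona_daily_coins: int


def remaining_daily_budget(persona) -> int:
    # Assumption: budget and spent coins share one unit; clamp at zero when overspent.
    return max(0, persona.persona_daily_budget - persona.persona_daily_coins)


stub = PersonaStub(persona_daily_budget=1000, persona_daily_coins=250)
print(remaining_daily_budget(stub))
```

In a real bot the same helper would presumably be called with rcx.persona instead of the stub.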

Event Handlers

Register handlers using decorators on rcx:

@rcx.on_tool_call(tool_name)

Called when the LLM invokes one of your tools.

@rcx.on_tool_call("my_tool")
async def handle_my_tool(toolcall: FCloudtoolCall, args: dict) -> str:
    """
    Args:
        toolcall: Contains fcall_id, fcall_ft_id (thread), confirmed_by_human, etc.
        args: The arguments the LLM provided (already parsed from JSON)
    Returns:
        str: Result text that LLM will see
        OR List[dict]: Multi-modal response [{"m_type": "image/png", "m_content": "base64..."}]
    Raises:
        WaitForSubchats: To spawn subchats
        NeedsConfirmation: To request human approval
    """
    result = do_work(args["input"])
    return f"Result: {result}"

Handler Required

Every tool in your TOOLS list must have a corresponding handler. Missing handlers cause tool calls to be silently ignored.
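
Because a missing handler may go unnoticed, a startup check can help. The sketch below is a minimal illustration, not how RobotContext is implemented: `HandlerRegistry` is a hypothetical stand-in that mimics the decorator style of rcx.on_tool_call, and `missing_handlers` is an invented helper that lists declared tool names with no registered handler.

```python
from typing import Awaitable, Callable, Dict, List


class HandlerRegistry:
    """Hypothetical stand-in that mimics the decorator style of rcx.on_tool_call."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[..., Awaitable[str]]] = {}

    def on_tool_call(self, tool_name: str):
        def decorator(fn):
            # Remember the handler under the tool's name and return it unchanged.
            self.handlers[tool_name] = fn
            return fn
        return decorator


def missing_handlers(tool_names: List[str], registry: HandlerRegistry) -> List[str]:
    # Names of tools that were declared but never given a handler.
    return [name for name in tool_names if name not in registry.handlers]


registry = HandlerRegistry()


@registry.on_tool_call("my_tool")
async def handle_my_tool(toolcall, args):
    return f"Result: {args['input']}"


print(missing_handlers(["my_tool", "other_tool"], registry))
```

Here "other_tool" is reported because nothing was registered for it. A bot could run a similar comparison against the names in its TOOLS list once all handlers are defined.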

@rcx.on_updated_message

Called when a message is created or updated.

@rcx.on_updated_message
async def on_message(msg: FThreadMessageOutput):
    """
    Fields:
        msg.ftm_id: Message ID
        msg.ftm_ft_id: Thread ID
        msg.ftm_role: "user", "assistant", "tool", "cd_instruction"
        msg.ftm_content: Message content (may be None for tool calls)
        msg.ftm_tool_calls: List of tool calls (for assistant messages)
        msg.ftm_provenance: Metadata (model, timing, kernel logs)
    """
    if msg.ftm_role == "user":
        print(f"User said: {msg.ftm_content}")

@rcx.on_updated_thread

Called when a thread state changes.

@rcx.on_updated_thread
async def on_thread(thread: FThreadOutput):
    """
    Fields:
        thread.ft_id: Thread ID
        thread.ft_title: Thread title
        thread.ft_need_assistant: >0 if waiting for LLM
        thread.ft_need_tool_calls: >0 if waiting for tools
        thread.ft_need_user: >0 if waiting for user
        thread.ft_error: Error message if any
        thread.ft_coins: Coins spent
        thread.ft_budget: Budget limit
    """
    if thread.ft_error:
        print(f"Thread error: {thread.ft_error}")
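
The ft_need_* counters and ft_error together describe what a thread is waiting on. The sketch below turns them into a short label. `ThreadStub` is a hypothetical stand-in for FThreadOutput with only those fields, `describe_thread_state` is an invented helper, and the order of the checks (error first, then tools, LLM, user) is an assumption rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ThreadStub:
    """Hypothetical stand-in for FThreadOutput with only the state fields."""
    ft_need_assistant: int = 0
    ft_need_tool_calls: int = 0
    ft_need_user: int = 0
    ft_error: Optional[str] = None


def describe_thread_state(thread) -> str:
    # Assumed priority: an error wins over any of the waiting flags.
    if thread.ft_error:
        return f"error: {thread.ft_error}"
    if thread.ft_need_tool_calls > 0:
        return "waiting for tools"
    if thread.ft_need_assistant > 0:
        return "waiting for LLM"
    if thread.ft_need_user > 0:
        return "waiting for user"
    return "idle"


print(describe_thread_state(ThreadStub(ft_need_user=1)))
```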

@rcx.on_updated_task

Called when a kanban task changes.

@rcx.on_updated_task
async def on_task(task: FPersonaKanbanTaskOutput):
    """
    Fields:
        task.ktask_id: Task ID
        task.ktask_column: "inbox", "todo", "inprogress", "done"
        task.ktask_title: Task title
        task.ktask_description: Task description
        task.ktask_payload_json: Additional data
    """
    if task.ktask_column == "done":
        print(f"Task completed: {task.ktask_title}")

@rcx.on_erp_change(table_name)

Called when an ERP table row changes.

@rcx.on_erp_change("crm_contact")
async def on_contact_change(action: str, new_record, old_record):
    """
    Args:
        action: "INSERT", "UPDATE", "DELETE", or "ARCHIVE"
        new_record: The new row data (typed dataclass from erp_schema)
        old_record: The old row data (for UPDATE/DELETE/ARCHIVE)
    Note: ARCHIVE means soft delete (archived_ts: 0 -> >0)
    """
    if action == "INSERT":
        print(f"New contact: {new_record.contact_first_name}")

To receive ERP events, specify tables in run_bots_in_this_group:

asyncio.run(ckit_bot_exec.run_bots_in_this_group(
    fclient,
    ...,
    subscribe_to_erp_tables=["crm_contact", "crm_deal"],
))
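
Inside an on_erp_change handler, an "UPDATE" action is often easier to act on once you know which columns changed. Since the records are described as typed dataclasses, a field-by-field comparison is one way to find out. In the sketch below, `ContactStub` is a hypothetical stand-in for a crm_contact row (its `contact_email` field is invented for illustration) and `changed_fields` is an invented helper.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Tuple


@dataclass
class ContactStub:
    """Hypothetical stand-in for a crm_contact row; contact_email is an invented field."""
    contact_first_name: str
    contact_email: str


def changed_fields(old_record, new_record) -> Dict[str, Tuple[Any, Any]]:
    # Map each differing field name to its (old, new) pair.
    changes = {}
    for field in fields(new_record):
        old_value = getattr(old_record, field.name)
        new_value = getattr(new_record, field.name)
        if old_value != new_value:
            changes[field.name] = (old_value, new_value)
    return changes


old = ContactStub("Ada", "ada@example.com")
new = ContactStub("Ada", "ada@example.org")
print(changed_fields(old, new))
```

Only `contact_email` appears in the result, because the first name is identical in both records.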

Main Loop

unpark_collected_events()

The core method for processing events:

while not ckit_shutdown.shutdown_event.is_set():
    await rcx.unpark_collected_events(sleep_if_no_work=10.0)

This method:

  1. Processes all queued events sequentially
  2. Calls your registered handlers
  3. Catches exceptions (logs but doesn’t crash)
  4. Sleeps for sleep_if_no_work seconds if queue is empty
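
The four steps above can be sketched with a plain asyncio queue. This is an illustration of the described behavior only, not the RobotContext implementation: `EventQueueSketch`, its `park` method, and the single `handler` callback are hypothetical, and the idle wait uses an `asyncio.Event` so that it can be cut short on shutdown.

```python
import asyncio
import logging
from collections import deque

logger = logging.getLogger("event_queue_sketch")


class EventQueueSketch:
    """Hypothetical stand-in; not the RobotContext implementation."""

    def __init__(self, handler, shutdown_event: asyncio.Event) -> None:
        self._parked = deque()
        self._handler = handler
        self._shutdown_event = shutdown_event

    def park(self, event) -> None:
        self._parked.append(event)

    async def unpark_collected_events(self, sleep_if_no_work: float) -> int:
        if not self._parked:
            # Step 4: nothing queued, so idle until the timeout or a shutdown request.
            try:
                await asyncio.wait_for(self._shutdown_event.wait(), timeout=sleep_if_no_work)
            except asyncio.TimeoutError:
                pass
            return 0
        processed = 0
        while self._parked:
            event = self._parked.popleft()      # Step 1: strictly one at a time
            try:
                await self._handler(event)      # Step 2: call the registered handler
            except Exception:
                logger.exception("handler failed for %r", event)  # Step 3: log, keep going
            processed += 1
        return processed


async def demo():
    seen = []

    async def handler(event):
        if event == "bad":
            raise ValueError("boom")
        seen.append(event)

    queue = EventQueueSketch(handler, asyncio.Event())
    for event in ("a", "bad", "b"):
        queue.park(event)
    processed = await queue.unpark_collected_events(sleep_if_no_work=0.01)
    idle = await queue.unpark_collected_events(sleep_if_no_work=0.01)
    return seen, processed, idle


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the demo, the failing "bad" event is logged and skipped, so the events on either side of it still reach the handler.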

Alternative: ckit_shutdown.wait()

For bots that don’t need event processing:

while True:
    # Do periodic work
    await some_periodic_task()
    # Sleep with shutdown check
    if await ckit_shutdown.wait(120):  # 120 seconds
        break

Always Use Shutdown-Aware Sleep

Never use asyncio.sleep() directly. Use ckit_shutdown.wait() or unpark_collected_events() to ensure quick shutdown when needed.
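
To see why this matters, compare a plain sleep with a wait on an event: the event-based wait returns as soon as shutdown is requested, while asyncio.sleep() always runs to the end of its timeout. The sketch below shows the general technique with the standard library only. `wait_or_shutdown` is a hypothetical helper, and ckit_shutdown.wait() may be implemented differently; the only behavior taken from this page is that a true result means shutdown was requested.

```python
import asyncio


async def wait_or_shutdown(shutdown_event: asyncio.Event, seconds: float) -> bool:
    """Return True if shutdown was requested during the wait, False on timeout."""
    try:
        await asyncio.wait_for(shutdown_event.wait(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    shutdown_event = asyncio.Event()
    # No shutdown requested: the wait simply times out.
    timed_out = await wait_or_shutdown(shutdown_event, 0.01)
    # Request shutdown shortly after starting a long wait; the wait ends early.
    asyncio.get_running_loop().call_later(0.01, shutdown_event.set)
    interrupted = await wait_or_shutdown(shutdown_event, 60.0)
    return timed_out, interrupted


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second wait asks for 60 seconds but returns almost immediately once the event is set, which is the property a long-running bot needs to stop promptly.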

Setup Mixing

Merge default setup with user overrides:

setup = ckit_bot_exec.official_setup_mixing_procedure(
    mybot_install.mybot_setup_schema,  # Default values
    rcx.persona.persona_setup          # User overrides
)
# Now use setup["some_key"]
api_key = setup["API_KEY"]
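
Conceptually, mixing means starting from every default and letting user values replace them. The sketch below shows that idea on a hypothetical schema shape: a list of entries with "name" and "default" keys. The real setup schema format and the exact rules of official_setup_mixing_procedure are not described on this page, so both the schema shape and the choice to ignore unknown override keys are assumptions of this example.

```python
from typing import Any, Dict, List, Optional


def mix_setup(schema: List[Dict[str, Any]], overrides: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    # Start from every default declared in the (hypothetical) schema.
    mixed = {entry["name"]: entry["default"] for entry in schema}
    # Apply user overrides, ignoring keys the schema does not declare (an assumption).
    for key, value in (overrides or {}).items():
        if key in mixed:
            mixed[key] = value
    return mixed


example_schema = [
    {"name": "API_KEY", "default": ""},
    {"name": "greeting", "default": "hello"},
]
print(mix_setup(example_schema, {"API_KEY": "secret", "unknown": 1}))
```

Keys the user never set keep their defaults, so code reading the mixed result does not need to handle missing keys.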

Entry Point Functions

parse_bot_args()

Parse command-line arguments:

scenario_fn = ckit_bot_exec.parse_bot_args()

Supported arguments:

  • --scenario path/to/scenario.yaml — Run scenario test
  • --group group_id — Run for specific group
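
For readers who want to experiment with the same flags outside the kit, the sketch below parses them with the standard argparse module. It is a hypothetical stand-in, not the implementation of parse_bot_args(); only the two flag names come from the list above, and the defaults and help strings are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Stand-in parser that accepts the two flags listed above.
    parser = argparse.ArgumentParser(description="Bot launcher (sketch)")
    parser.add_argument("--scenario", default=None, help="Path to a scenario YAML file")
    parser.add_argument("--group", default=None, help="Group ID to run for")
    return parser


args = build_parser().parse_args(["--scenario", "path/to/scenario.yaml"])
print(args.scenario, args.group)
```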

run_bots_in_this_group()

Main execution function:

asyncio.run(ckit_bot_exec.run_bots_in_this_group(
    fclient,
    marketable_name="mybot",
    marketable_version_str="1.0.0",
    bot_main_loop=mybot_main_loop,
    inprocess_tools=TOOLS,
    scenario_fn=scenario_fn,
    install_func=mybot_install.install,
    subscribe_to_erp_tables=["crm_contact"],  # Optional
))

| Parameter | Required | Description |
| --- | --- | --- |
| fclient | Yes | FlexusClient instance |
| marketable_name | Yes | Bot’s marketplace name |
| marketable_version_str | Yes | Semantic version |
| bot_main_loop | Yes | Your async main loop function |
| inprocess_tools | Yes | List of CloudTool objects |
| scenario_fn | No | Scenario filename from args |
| install_func | No | Installation function |
| subscribe_to_erp_tables | No | ERP tables to watch |

FCloudtoolCall Structure

When your tool handler is called:

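| Field | Type | Description |
| --- | --- | --- |
| fcall_id | str | Unique call ID |
| fcall_ft_id | str | Thread ID |
| fcall_name | str | Tool name |
| fcall_arguments | str | Raw JSON arguments string (your handler receives them already parsed as args) |
| ws_id | str | Workspace ID |
| connected_persona_id | str | Bot’s persona ID |
| confirmed_by_human | bool | True if user confirmed |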
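
Handlers normally work with the parsed args, but if you only have the call object (for logging, say), the raw string can be decoded with the standard json module. In the sketch below, `ToolCallStub` is a hypothetical stand-in for FCloudtoolCall with a subset of the fields above, and `decode_arguments` is an invented helper. It assumes fcall_arguments holds a JSON object.

```python
import json
from dataclasses import dataclass


@dataclass
class ToolCallStub:
    """Hypothetical stand-in for FCloudtoolCall with a subset of its fields."""
    fcall_id: str
    fcall_ft_id: str
    fcall_name: str
    fcall_arguments: str
    confirmed_by_human: bool = False


def decode_arguments(toolcall) -> dict:
    # Decode the raw JSON string and insist on an object, as tool arguments are keyed.
    args = json.loads(toolcall.fcall_arguments)
    if not isinstance(args, dict):
        raise ValueError(f"expected a JSON object, got {type(args).__name__}")
    return args


call = ToolCallStub("call-1", "thread-1", "my_tool", '{"input": "hello"}')
print(decode_arguments(call))
```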

Complete Example

import asyncio

from flexus_client_kit import ckit_client, ckit_bot_exec, ckit_cloudtool, ckit_shutdown
from flexus_client_kit import ckit_mongo
from pymongo import AsyncMongoClient

from mybot import mybot_install

BOT_NAME = "mybot"
BOT_VERSION = "1.0.0"

MY_TOOL = ckit_cloudtool.CloudTool(
    strict=True, name="my_tool",
    description="Does something",
    parameters={
        "type": "object",
        "properties": {"input": {"type": "string"}},
        "required": ["input"],
        "additionalProperties": False
    }
)

TOOLS = [MY_TOOL]


async def main_loop(fclient, rcx):
    setup = ckit_bot_exec.official_setup_mixing_procedure(
        mybot_install.mybot_setup_schema, rcx.persona.persona_setup)

    # MongoDB setup
    mongo_conn = await ckit_mongo.mongo_fetch_creds(fclient, rcx.persona.persona_id)
    mongo = AsyncMongoClient(mongo_conn)
    db = mongo[rcx.persona.persona_id + "_db"]

    # Every tool in TOOLS needs a handler registered by name
    @rcx.on_tool_call(MY_TOOL.name)
    async def handle(toolcall, args):
        await db.logs.insert_one({"input": args["input"]})
        return f"Processed: {args['input']}"

    @rcx.on_updated_task
    async def on_task(task):
        if task.ktask_column == "inprogress":
            print(f"Working on: {task.ktask_title}")

    try:
        # Process events until shutdown is requested
        while not ckit_shutdown.shutdown_event.is_set():
            await rcx.unpark_collected_events(sleep_if_no_work=10.0)
    finally:
        # AsyncMongoClient.close() is a coroutine, so it must be awaited
        await mongo.close()


def main():
    scenario_fn = ckit_bot_exec.parse_bot_args()
    fclient = ckit_client.FlexusClient(
        ckit_client.bot_service_name(BOT_NAME, BOT_VERSION),
        endpoint="/v1/jailed-bot")
    asyncio.run(ckit_bot_exec.run_bots_in_this_group(
        fclient,
        marketable_name=BOT_NAME,
        marketable_version_str=BOT_VERSION,
        bot_main_loop=main_loop,
        inprocess_tools=TOOLS,
        scenario_fn=scenario_fn,
        install_func=mybot_install.install,
    ))


if __name__ == "__main__":
    main()