ckit_bot_exec
The ckit_bot_exec module provides the core infrastructure for running Flexus bots: the RobotContext class, event handler decorators, and the main execution loop.
RobotContext
RobotContext (commonly named rcx) is the central object for your bot. It provides:
- Access to persona configuration
- Event handler registration
- Thread and task tracking
- Working directory for temporary files
Key Attributes
| Attribute | Type | Description |
|---|---|---|
rcx.persona | FPersonaOutput | Bot’s persona configuration |
rcx.latest_threads | Dict[str, FThreadOutput] | Active threads by ID |
rcx.latest_tasks | Dict[str, FPersonaKanbanTaskOutput] | Kanban tasks by ID |
rcx.workdir | str | Temporary directory path |
rcx.fclient | FlexusClient | GraphQL client |
Persona Fields
rcx.persona contains:
| Field | Type | Description |
|---|---|---|
persona_id | str | Unique persona ID |
persona_setup | dict | User’s setup values |
ws_id | str | Workspace ID |
ws_root_group_id | str | Root group for policy docs |
persona_marketable_name | str | Bot type name |
persona_daily_budget | int | Daily coin budget |
persona_daily_coins | int | Coins spent today |
Event Handlers
Register handlers using decorators on rcx:
@rcx.on_tool_call(tool_name)
Called when the LLM invokes one of your tools.
@rcx.on_tool_call("my_tool")async def handle_my_tool(toolcall: FCloudtoolCall, args: dict) -> str: """ Args: toolcall: Contains fcall_id, fcall_ft_id (thread), confirmed_by_human, etc. args: The arguments the LLM provided (already parsed from JSON)
Returns: str: Result text that LLM will see OR List[dict]: Multi-modal response [{m_type: "image/png", m_content: "base64..."}]
Raises: WaitForSubchats: To spawn subchats NeedsConfirmation: To request human approval """ result = do_work(args["input"]) return f"Result: {result}"Every tool in your TOOLS list must have a corresponding handler. Missing handlers cause tool calls to be silently ignored.
@rcx.on_updated_message
Called when a message is created or updated.
@rcx.on_updated_messageasync def on_message(msg: FThreadMessageOutput): """ Fields: msg.ftm_id: Message ID msg.ftm_ft_id: Thread ID msg.ftm_role: "user", "assistant", "tool", "cd_instruction" msg.ftm_content: Message content (may be None for tool calls) msg.ftm_tool_calls: List of tool calls (for assistant messages) msg.ftm_provenance: Metadata (model, timing, kernel logs) """ if msg.ftm_role == "user": print(f"User said: {msg.ftm_content}")@rcx.on_updated_thread
Called when a thread state changes.
@rcx.on_updated_threadasync def on_thread(thread: FThreadOutput): """ Fields: thread.ft_id: Thread ID thread.ft_title: Thread title thread.ft_need_assistant: >0 if waiting for LLM thread.ft_need_tool_calls: >0 if waiting for tools thread.ft_need_user: >0 if waiting for user thread.ft_error: Error message if any thread.ft_coins: Coins spent thread.ft_budget: Budget limit """ if thread.ft_error: print(f"Thread error: {thread.ft_error}")@rcx.on_updated_task
Called when a kanban task changes.
@rcx.on_updated_taskasync def on_task(task: FPersonaKanbanTaskOutput): """ Fields: task.ktask_id: Task ID task.ktask_column: "inbox", "todo", "inprogress", "done" task.ktask_title: Task title task.ktask_description: Task description task.ktask_payload_json: Additional data """ if task.ktask_column == "done": print(f"Task completed: {task.ktask_title}")@rcx.on_erp_change(table_name)
Called when an ERP table row changes.
@rcx.on_erp_change("crm_contact")async def on_contact_change(action: str, new_record, old_record): """ Args: action: "INSERT", "UPDATE", "DELETE", or "ARCHIVE" new_record: The new row data (typed dataclass from erp_schema) old_record: The old row data (for UPDATE/DELETE/ARCHIVE)
Note: ARCHIVE means soft delete (archived_ts: 0 -> >0) """ if action == "INSERT": print(f"New contact: {new_record.contact_first_name}")To receive ERP events, specify tables in run_bots_in_this_group:
asyncio.run(ckit_bot_exec.run_bots_in_this_group( fclient, ..., subscribe_to_erp_tables=["crm_contact", "crm_deal"],))Main Loop
unpark_collected_events()
The core method for processing events:
while not ckit_shutdown.shutdown_event.is_set(): await rcx.unpark_collected_events(sleep_if_no_work=10.0)This method:
- Processes all queued events sequentially
- Calls your registered handlers
- Catches exceptions (logs but doesn’t crash)
- Sleeps for
sleep_if_no_workseconds if queue is empty
Alternative: ckit_shutdown.wait()
For bots that don’t need event processing:
while True: # Do periodic work await some_periodic_task()
# Sleep with shutdown check if await ckit_shutdown.wait(120): # 120 seconds breakNever use asyncio.sleep() directly. Use ckit_shutdown.wait() or unpark_collected_events() to ensure quick shutdown when needed.
Setup Mixing
Merge default setup with user overrides:
setup = ckit_bot_exec.official_setup_mixing_procedure( mybot_install.mybot_setup_schema, # Default values rcx.persona.persona_setup # User overrides)
# Now use setup["some_key"]api_key = setup["API_KEY"]Entry Point Functions
parse_bot_args()
Parse command-line arguments:
scenario_fn = ckit_bot_exec.parse_bot_args()Supported arguments:
--scenario path/to/scenario.yaml— Run scenario test--group group_id— Run for specific group
run_bots_in_this_group()
Main execution function:
asyncio.run(ckit_bot_exec.run_bots_in_this_group( fclient, marketable_name="mybot", marketable_version_str="1.0.0", bot_main_loop=mybot_main_loop, inprocess_tools=TOOLS, scenario_fn=scenario_fn, install_func=mybot_install.install, subscribe_to_erp_tables=["crm_contact"], # Optional))| Parameter | Required | Description |
|---|---|---|
fclient | Yes | FlexusClient instance |
marketable_name | Yes | Bot’s marketplace name |
marketable_version_str | Yes | Semantic version |
bot_main_loop | Yes | Your async main loop function |
inprocess_tools | Yes | List of CloudTool objects |
scenario_fn | No | Scenario filename from args |
install_func | No | Installation function |
subscribe_to_erp_tables | No | ERP tables to watch |
FCloudtoolCall Structure
When your tool handler is called:
| Field | Type | Description |
|---|---|---|
fcall_id | str | Unique call ID |
fcall_ft_id | str | Thread ID |
fcall_name | str | Tool name |
fcall_arguments | str | JSON arguments (already parsed for you) |
ws_id | str | Workspace ID |
connected_persona_id | str | Bot’s persona ID |
confirmed_by_human | bool | True if user confirmed |
Complete Example
import asynciofrom flexus_client_kit import ckit_client, ckit_bot_exec, ckit_cloudtool, ckit_shutdownfrom flexus_client_kit import ckit_mongofrom pymongo import AsyncMongoClientfrom mybot import mybot_install
BOT_NAME = "mybot"BOT_VERSION = "1.0.0"
MY_TOOL = ckit_cloudtool.CloudTool( strict=True, name="my_tool", description="Does something", parameters={ "type": "object", "properties": {"input": {"type": "string"}}, "required": ["input"], "additionalProperties": False })
TOOLS = [MY_TOOL]
async def main_loop(fclient, rcx): setup = ckit_bot_exec.official_setup_mixing_procedure( mybot_install.mybot_setup_schema, rcx.persona.persona_setup)
# MongoDB setup mongo_conn = await ckit_mongo.mongo_fetch_creds(fclient, rcx.persona.persona_id) mongo = AsyncMongoClient(mongo_conn) db = mongo[rcx.persona.persona_id + "_db"]
@rcx.on_tool_call(MY_TOOL.name) async def handle(toolcall, args): await db.logs.insert_one({"input": args["input"]}) return f"Processed: {args['input']}"
@rcx.on_updated_task async def on_task(task): if task.ktask_column == "inprogress": print(f"Working on: {task.ktask_title}")
try: while not ckit_shutdown.shutdown_event.is_set(): await rcx.unpark_collected_events(sleep_if_no_work=10.0) finally: mongo.close()
def main(): scenario_fn = ckit_bot_exec.parse_bot_args() fclient = ckit_client.FlexusClient( ckit_client.bot_service_name(BOT_NAME, BOT_VERSION), endpoint="/v1/jailed-bot")
asyncio.run(ckit_bot_exec.run_bots_in_this_group( fclient, marketable_name=BOT_NAME, marketable_version_str=BOT_VERSION, bot_main_loop=main_loop, inprocess_tools=TOOLS, scenario_fn=scenario_fn, install_func=mybot_install.install, ))
if __name__ == "__main__": main()