Skip to content

Architecture Overview

Flexus is built on a modern, scalable microservices architecture designed for high performance, reliability, and extensibility. This guide provides a comprehensive overview of how all components work together.

High-Level Architecture

Flexus Platform Architecture

HTTP/WS GraphQL Bot Control Task Ops MCP Calls Persist Events Integrations Pub/Sub Web UI (Nuxt.js) API Gateway GraphQL Server Bot Engine Task Manager MCP Server Notifications PostgreSQL Redis External APIs
Core components and their interactions in the Flexus ecosystem

Components

Service
Database
External
UI
API

Connections

Data Flow
API Call
Event
WebSocket

Core Components

Frontend Layer

Technology Stack: Nuxt.js 3 + TypeScript + PrimeVue

  • Purpose: Provides the user interface for all Flexus interactions
  • Features: Real-time updates, responsive design, rich interactions
  • Communication: GraphQL over HTTP/WebSocket with the backend
<!-- KanbanBoard.vue -->
<template>
<div class="kanban-board">
  <KanbanColumn
    v-for="column in columns"
    :key="column.id"
    :column="column"
    @task-moved="handleTaskMove"
  />
</div>
</template>

<script setup lang="ts">
import { useSubscription } from 'villus';

// Real-time task updates via GraphQL subscription
const { data: taskUpdates } = useSubscription(`
subscription TaskUpdates($boardId: ID!) {
  taskUpdates(boardId: $boardId) {
    task_id
    task_status
    task_assignee_id
  }
}
`);
</script>
// stores/tasks.ts - Pinia store
export const useTasksStore = defineStore('tasks', {
state: () => ({
  tasks: [] as Task[],
  loading: false,
  error: null
}),

actions: {
  async fetchTasks(boardId: string) {
    this.loading = true;
    try {
      const { data } = await execute(GetTasksQuery, { boardId });
      this.tasks = data.tasks;
    } catch (error) {
      this.error = error;
      console.error('TASKS FETCH FAILED:', error);
    } finally {
      this.loading = false;
    }
  }
}
});

Backend Services

Technology Stack: Python + Strawberry GraphQL + Prisma ORM

GraphQL API Server

  • Responsibility: Central API gateway for all client requests
  • Features: Query/Mutation/Subscription support, real-time updates
  • Scalability: Horizontally scalable, stateless design
@strawberry.type
class TaskMutation:
  @strawberry.mutation
  async def create_task(
      self, 
      info: Info, 
      input: TaskInput
  ) -> TaskOutput:
      # Authorization check
      fuser_id, sid, flags = await dig_out_sesh(info)
      
      # Create task in database
      task = await prisma.task.create({
          "task_title": input.task_title,
          "task_description": input.task_description,
          "owner_fuser_id": fuser_id,
          "located_fgroup_id": input.fgroup_id
      })
      
      # Publish event for real-time updates
      await publish_task_event("task_created", task)
      
      return TaskOutput.from_prisma(task)
@strawberry.type  
class TaskSubscription:
  @strawberry.subscription
  async def task_updates(
      self, 
      info: Info, 
      board_id: str
  ) -> AsyncGenerator[TaskUpdate, None]:
      # Subscribe to Redis pubsub
      channel = f"board:{board_id}:tasks"
      
      async with redis_client.pubsub() as pubsub:
          await pubsub.subscribe(channel)
          
          async for message in pubsub.listen():
              if message["type"] == "message":
                  data = json.loads(message["data"])
                  yield TaskUpdate.from_dict(data)

Bot Engine

  • Purpose: Executes automated workflows and bots
  • Architecture: Event-driven with Redis pub/sub
  • Scalability: Multiple workers for parallel execution
Bot Execution Model

Bots in Flexus follow an event-driven architecture. When events occur (like task creation or user actions), theyโ€™re published to Redis. Bot workers subscribe to relevant events and execute appropriate actions.

Data Layer

PostgreSQL Database

  • Schema Management: Prisma ORM with migration support
  • Features: ACID compliance, complex queries, triggers
  • Optimization: Connection pooling, query optimization
// prisma/schema.prisma
model flexus_task {
task_id              String   @id @default(cuid())
task_title           String
task_description     String?
task_status          String   @default("todo")
task_priority        Int      @default(0)
task_created_ts      DateTime @default(now())
task_updated_ts      DateTime @updatedAt

// Tangible object fields
owner_fuser_id       String
owner_shared         Boolean  @default(false)
located_fgroup_id    String

// Relations
owner_user           flexus_user  @relation(fields: [owner_fuser_id], references: [fuser_id])
group                flexus_group @relation(fields: [located_fgroup_id], references: [fgroup_id])

@@index([located_fgroup_id])
@@index([task_status])
}
-- SQL trigger for news generation
CREATE OR REPLACE FUNCTION news_from_task_update()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO flexus_workspace_news (
  fws_id,
  news_type,
  news_payload_json,
  news_created_ts
) VALUES (
  (SELECT ws_id FROM flexus_group WHERE fgroup_id = NEW.located_fgroup_id),
  'task_updated',
  json_build_object(
    'task_id', NEW.task_id,
    'task_title', NEW.task_title,
    'old_status', OLD.task_status,
    'new_status', NEW.task_status
  ),
  NOW()
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Redis Cache & Pub/Sub

  • Caching: Session data, frequently accessed objects
  • Pub/Sub: Real-time event distribution
  • Performance: Sub-millisecond response times

Data Flow Patterns

Request-Response Pattern

Synchronous Operations

Used for operations that require immediate confirmation, like creating tasks or updating user profiles.

  1. Client Request: Frontend sends GraphQL mutation
  2. Authentication: API gateway validates user session
  3. Business Logic: GraphQL resolver processes request
  4. Data Persistence: Changes saved to PostgreSQL
  5. Response: Confirmation sent back to client

Event-Driven Pattern

Asynchronous Operations

Used for operations that can be processed in the background, like bot executions or notifications.

  1. Event Trigger: Database trigger or explicit event
  2. Event Publishing: Event published to Redis
  3. Event Processing: Background workers consume events
  4. Side Effects: Bots execute, notifications sent
  5. State Updates: Results published for real-time updates

Subscription Pattern

Real-time Updates

Provides live updates to connected clients without polling.

  1. Client Subscribe: Frontend opens WebSocket subscription
  2. Event Listening: GraphQL subscription listens to Redis
  3. Event Broadcast: Changes published to relevant channels
  4. Client Update: Frontend receives and applies updates

Security Architecture

Authentication & Authorization

async def dig_out_sesh(info: Info) -> tuple[str, str, int]:
  """Extract and validate user session"""
  # Get session token from headers
  auth_header = info.context.get("authorization")
  if not auth_header:
      raise PermissionError("No authorization header")
  
  token = auth_header.replace("Bearer ", "")
  
  # Validate session in Redis
  session_data = await redis_client.get(f"session:{token}")
  if not session_data:
      raise PermissionError("Invalid or expired session")
  
  session = json.loads(session_data)
  return session["fuser_id"], token, session["flags"]
// The actual roles are defined as bitmasks
ROLE_READ      = 0x0001
ROLE_WRITE     = 0x0002
ROLE_MODERATOR = 0x0004

async def validate_user_can_access_group(
  fuser_id: str,
  fgroup_id: str,
  need_role: int, // e.g., ROLE_WRITE
  want_403: bool,
) -> Optional[prisma_stubs.models.flexus_permissions_unrecursive]:
  """Checks if a user has the required role in a group using bitmasks."""
  
  # Look up the user's permission record for the group
  perm_rec = await my_prisma.flexus_permissions_unrecursive.find_first(
      where={"fgroup_id": fgroup_id, "fuser_id": fuser_id}
  )

  # Check if the user has the required role using a bitwise AND
  if not perm_rec or not (perm_rec.perm_roles & need_role):
      if want_403:
          raise HTTPException(status_code=403, detail="Permission denied.")
      return None
      
  return perm_rec

Data Protection

  • Encryption: TLS for all communications
  • Secrets Management: Environment-based configuration
  • Audit Trail: Complete audit log for all operations
  • Access Control: Role-based permissions (RBAC)

Integration Points

External APIs

  • REST APIs: Standard HTTP integration
  • GraphQL APIs: Native GraphQL federation
  • WebHooks: Event-driven integrations
  • MCP Servers: Model Context Protocol support

Marketplace Architecture

  • Plugin System: Sandboxed execution environment
  • Revenue Sharing: Automated payment processing
  • Version Management: Semantic versioning support
  • Security Scanning: Automated vulnerability detection

This architecture provides a solid foundation for scalable, maintainable, and secure operations. For implementation details, explore our API Reference.