Architecture Overview
Flexus is built on a modern, scalable microservices architecture designed for high performance, reliability, and extensibility. This guide provides a comprehensive overview of how all components work together.
High-Level Architecture
[Diagram: Flexus Platform Architecture, showing components and connections]
Core Components
Frontend Layer
Technology Stack: Nuxt.js 3 + TypeScript + PrimeVue
- Purpose: Provides the user interface for all Flexus interactions
- Features: Real-time updates, responsive design, rich interactions
- Communication: GraphQL over HTTP/WebSocket with the backend
<!-- KanbanBoard.vue -->
<template>
  <div class="kanban-board">
    <KanbanColumn
      v-for="column in columns"
      :key="column.id"
      :column="column"
      @task-moved="handleTaskMove"
    />
  </div>
</template>
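<script setup lang="ts">
import { ref } from 'vue';
import { useSubscription } from 'villus';
// Assumed props and local state; the template needs them but the snippet did not define them
const props = defineProps<{ boardId: string }>();
const columns = ref<{ id: string }[]>([]);
const handleTaskMove = (event: unknown) => { /* persist the move, e.g. with a mutation */ };
// Real-time task updates via GraphQL subscription
const { data: taskUpdates } = useSubscription({
  query: `
    subscription TaskUpdates($boardId: ID!) {
      taskUpdates(boardId: $boardId) {
        task_id
        task_status
        task_assignee_id
      }
    }
  `,
  variables: { boardId: props.boardId },
});
</script>
// stores/tasks.ts - Pinia store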
import { defineStore } from 'pinia';
// `Task`, `execute`, and `GetTasksQuery` are assumed to come from the app's GraphQL client setup
export const useTasksStore = defineStore('tasks', {
  state: () => ({
    tasks: [] as Task[],
    loading: false,
    error: null as unknown
  }),
  actions: {
    async fetchTasks(boardId: string) {
      this.loading = true;
      this.error = null;
      try {
        const { data } = await execute(GetTasksQuery, { boardId });
        this.tasks = data.tasks;
      } catch (error) {
        this.error = error;
        console.error('TASKS FETCH FAILED:', error);
      } finally {
        this.loading = false;
      }
    }
  }
});
Backend Services
Technology Stack: Python + Strawberry GraphQL + Prisma ORM
GraphQL API Server
- Responsibility: Central API gateway for all client requests
- Features: Query/Mutation/Subscription support, real-time updates
- Scalability: Horizontally scalable, stateless design
import json
from typing import AsyncGenerator

import strawberry
from strawberry.types import Info

# TaskInput, TaskOutput, TaskUpdate, prisma, redis_client, dig_out_sesh, and
# publish_task_event are assumed to be defined elsewhere in the backend.

@strawberry.type
class TaskMutation:
    @strawberry.mutation
    async def create_task(
        self,
        info: Info,
        input: TaskInput
    ) -> TaskOutput:
        # Authenticate the caller from the session
        fuser_id, sid, flags = await dig_out_sesh(info)
        # Create task in database (Prisma Client Python takes the row as `data=`)
        task = await prisma.flexus_task.create(data={
            "task_title": input.task_title,
            "task_description": input.task_description,
            "owner_fuser_id": fuser_id,
            "located_fgroup_id": input.fgroup_id
        })
        # Publish event for real-time updates
        await publish_task_event("task_created", task)
        return TaskOutput.from_prisma(task)

@strawberry.type
class TaskSubscription:
    @strawberry.subscription
    async def task_updates(
        self,
        info: Info,
        board_id: str
    ) -> AsyncGenerator[TaskUpdate, None]:
        # Subscribe to Redis pubsub
        channel = f"board:{board_id}:tasks"
        async with redis_client.pubsub() as pubsub:
            await pubsub.subscribe(channel)
            async for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations; only forward payloads
                if message["type"] == "message":
                    data = json.loads(message["data"])
                    yield TaskUpdate.from_dict(data)
Bot Engine
- Purpose: Executes automated workflows and bots
- Architecture: Event-driven with Redis pub/sub
- Scalability: Multiple workers for parallel execution
Bots in Flexus follow an event-driven architecture. When events occur (like task creation or user actions), they're published to Redis. Bot workers subscribe to relevant events and execute appropriate actions.
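To make the routing step concrete, here is a minimal sketch of how a bot worker might map incoming events to handlers. It is not the Flexus bot engine: the `on_event` decorator, the `dispatch` function, and the `{"type": ..., "payload": ...}` message shape are all assumptions made for illustration, and the Redis subscription itself is left out so the example runs on its own.

```python
import json
from collections import defaultdict
from typing import Callable

# Hypothetical registry: event type -> bot handlers interested in it.
Handler = Callable[[dict], str]
_handlers: dict[str, list[Handler]] = defaultdict(list)


def on_event(event_type: str) -> Callable[[Handler], Handler]:
    """Register a bot handler for one event type."""
    def register(handler: Handler) -> Handler:
        _handlers[event_type].append(handler)
        return handler
    return register


@on_event("task_created")
def triage_new_task(payload: dict) -> str:
    # A real bot would act on the task here; this one only reports what it saw.
    return f"triage bot saw task {payload['task_id']}"


def dispatch(raw_message: str) -> list[str]:
    """Decode one pub/sub message and run every handler subscribed to its type."""
    event = json.loads(raw_message)
    handlers = _handlers.get(event["type"], [])
    return [handler(event["payload"]) for handler in handlers]


results = dispatch(json.dumps({"type": "task_created", "payload": {"task_id": "t1"}}))
ignored = dispatch(json.dumps({"type": "user_login", "payload": {}}))
```

In a worker process, `dispatch` would be called for each message received from the Redis channel; events with no registered handler are simply ignored.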
Data Layer
PostgreSQL Database
- Schema Management: Prisma ORM with migration support
- Features: ACID compliance, complex queries, triggers
- Optimization: Connection pooling, query optimization
// prisma/schema.prisma
model flexus_task {
  task_id           String   @id @default(cuid())
  task_title        String
  task_description  String?
  task_status       String   @default("todo")
  task_priority     Int      @default(0)
  task_created_ts   DateTime @default(now())
  task_updated_ts   DateTime @updatedAt
  // Tangible object fields
  owner_fuser_id    String
  owner_shared      Boolean  @default(false)
  located_fgroup_id String
  // Relations
  owner_user flexus_user  @relation(fields: [owner_fuser_id], references: [fuser_id])
  group      flexus_group @relation(fields: [located_fgroup_id], references: [fgroup_id])
  @@index([located_fgroup_id])
  @@index([task_status])
}
-- SQL trigger for news generation
CREATE OR REPLACE FUNCTION news_from_task_update()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO flexus_workspace_news (
        fws_id,
        news_type,
        news_payload_json,
        news_created_ts
    ) VALUES (
        (SELECT ws_id FROM flexus_group WHERE fgroup_id = NEW.located_fgroup_id),
        'task_updated',
        json_build_object(
            'task_id', NEW.task_id,
            'task_title', NEW.task_title,
            'old_status', OLD.task_status,
            'new_status', NEW.task_status
        ),
        NOW()
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Redis Cache & Pub/Sub
- Caching: Session data, frequently accessed objects
- Pub/Sub: Real-time event distribution
- Performance: Sub-millisecond response times for typical in-memory operations
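The caching role listed above is commonly implemented with a cache-aside pattern: read from the cache, fall back to the database on a miss, then store the result with an expiry. The sketch below illustrates that pattern only. `TTLCache` is an in-memory stand-in for Redis, and `get_task_title`, the key format, and the 30-second TTL are assumptions rather than Flexus code.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """In-memory stand-in for a Redis cache with per-key expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Expired entries behave like misses and are dropped.
            del self._items[key]
            return None
        return value

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._items[key] = (self._clock() + ttl, value)


def get_task_title(task_id: str, cache: TTLCache,
                   load_from_db: Callable[[str], str], ttl: float = 30.0) -> str:
    """Cache-aside read: try the cache first, fall back to the database on a miss."""
    key = f"task:{task_id}:title"  # hypothetical key format
    cached = cache.get(key)
    if cached is not None:
        return cached
    value = load_from_db(task_id)
    cache.set(key, value, ttl)
    return value


# Usage with a fake clock and a loader that records database hits.
now = [0.0]
db_calls: list[str] = []


def load_title(task_id: str) -> str:
    db_calls.append(task_id)
    return f"title of {task_id}"


cache = TTLCache(clock=lambda: now[0])
first = get_task_title("t1", cache, load_title)   # miss: loads from the database
second = get_task_title("t1", cache, load_title)  # hit: served from the cache
now[0] = 31.0                                     # advance past the 30 s TTL
third = get_task_title("t1", cache, load_title)   # expired: loads again
```

Injecting the clock keeps expiry testable without sleeping; with Redis the expiry would normally be delegated to the server through a key TTL.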
Data Flow Patterns
Request-Response Pattern
Used for operations that require immediate confirmation, like creating tasks or updating user profiles.
- Client Request: Frontend sends GraphQL mutation
- Authentication: API gateway validates user session
- Business Logic: GraphQL resolver processes request
- Data Persistence: Changes saved to PostgreSQL
- Response: Confirmation sent back to client
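The five steps above can be sketched as a single handler. This is an illustration under stated assumptions, not the actual resolver: `SESSIONS` and `TASKS` are in-memory stand-ins for the Redis session store and PostgreSQL, and `handle_create_task` and the response shape are hypothetical.

```python
from itertools import count

# In-memory stand-ins (assumptions): session store and task table.
SESSIONS = {"token-abc": "user-1"}
TASKS: dict[str, dict] = {}
_task_ids = count(1)


def handle_create_task(headers: dict, variables: dict) -> dict:
    """Walk one mutation through authentication, logic, persistence, and response."""
    # Authentication: resolve the session token to a user.
    token = headers.get("authorization", "").removeprefix("Bearer ")
    fuser_id = SESSIONS.get(token)
    if fuser_id is None:
        return {"errors": [{"message": "Invalid or expired session"}]}

    # Business logic: validate the input before touching storage.
    title = variables.get("task_title", "").strip()
    if not title:
        return {"errors": [{"message": "task_title must not be empty"}]}

    # Data persistence: store the new row.
    task_id = f"task-{next(_task_ids)}"
    TASKS[task_id] = {"task_title": title, "owner_fuser_id": fuser_id}

    # Response: confirm the result to the client.
    return {"data": {"createTask": {"task_id": task_id, "task_title": title}}}


ok = handle_create_task({"authorization": "Bearer token-abc"}, {"task_title": "Write docs"})
denied = handle_create_task({}, {"task_title": "Write docs"})
```

The client gets either a `data` payload or an `errors` list in the same round trip, which is what makes this pattern suitable for operations that need immediate confirmation.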
Event-Driven Pattern
Used for operations that can be processed in the background, like bot executions or notifications.
- Event Trigger: Database trigger or explicit event
- Event Publishing: Event published to Redis
- Event Processing: Background workers consume events
- Side Effects: Bots execute, notifications sent
- State Updates: Results published for real-time updates
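A rough sketch of this flow, with several workers consuming events in parallel and publishing their results, is shown below. It uses `asyncio.Queue` as a stand-in for Redis so that it runs without external services; `worker`, `run_pipeline`, and the event fields are hypothetical names chosen for the example.

```python
import asyncio


async def worker(name: str, events: asyncio.Queue, results: asyncio.Queue) -> None:
    """Consume events until cancelled and publish one result per event."""
    while True:
        event = await events.get()
        try:
            # Side-effect placeholder: a real worker would run a bot or send a notification.
            await results.put({"event_id": event["id"], "handled_by": name, "status": "done"})
        finally:
            events.task_done()


async def run_pipeline(event_ids: list[int], n_workers: int = 3) -> list[dict]:
    events: asyncio.Queue = asyncio.Queue()
    results: asyncio.Queue = asyncio.Queue()
    workers = [
        asyncio.create_task(worker(f"worker-{i}", events, results))
        for i in range(n_workers)
    ]
    # Event publishing: enqueue one event per id.
    for event_id in event_ids:
        await events.put({"id": event_id, "type": "task_created"})
    # Wait until every event has been processed, then stop the workers.
    await events.join()
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    # State updates: collect what the workers published.
    return [results.get_nowait() for _ in range(results.qsize())]


published = asyncio.run(run_pipeline([1, 2, 3, 4, 5]))
```

Because the producer only enqueues events and never waits on a specific worker, adding workers increases throughput without changing the publishing side.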
Subscription Pattern
Provides live updates to connected clients without polling.
- Client Subscribe: Frontend opens WebSocket subscription
- Event Listening: GraphQL subscription listens to Redis
- Event Broadcast: Changes published to relevant channels
- Client Update: Frontend receives and applies updates
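The fan-out at the heart of this pattern can be sketched with an in-memory hub: each subscriber gets its own queue, and a publish delivers the message to every queue on that channel. `ChannelHub` is a hypothetical stand-in for Redis pub/sub, used here only to show the delivery semantics; the channel name follows the `board:{board_id}:tasks` format from the subscription resolver.

```python
import asyncio
from collections import defaultdict


class ChannelHub:
    """In-memory stand-in for pub/sub: one queue per subscriber, grouped by channel."""

    def __init__(self) -> None:
        self._subscribers: dict[str, set[asyncio.Queue]] = defaultdict(set)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].add(queue)
        return queue

    def unsubscribe(self, channel: str, queue: asyncio.Queue) -> None:
        self._subscribers[channel].discard(queue)

    def publish(self, channel: str, message: dict) -> int:
        """Deliver the message to every subscriber of the channel; return the count."""
        queues = self._subscribers.get(channel, set())
        for queue in queues:
            queue.put_nowait(message)
        return len(queues)


async def demo() -> tuple:
    hub = ChannelHub()
    board = "board:b1:tasks"
    alice = hub.subscribe(board)
    bob = hub.subscribe(board)
    other = hub.subscribe("board:b2:tasks")  # different board, should see nothing
    delivered = hub.publish(board, {"task_id": "t1", "task_status": "done"})
    return delivered, await alice.get(), await bob.get(), other.qsize()


delivered, alice_msg, bob_msg, other_pending = asyncio.run(demo())
```

Subscribers register before anything is published, which mirrors a property of pub/sub worth keeping in mind: messages sent while no one is listening are not replayed later.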
Security Architecture
Authentication & Authorization
import json
from typing import Optional

from strawberry.types import Info

# redis_client, my_prisma, and prisma_stubs are assumed to be defined elsewhere;
# HTTPException is assumed to come from the web framework in use.

async def dig_out_sesh(info: Info) -> tuple[str, str, int]:
    """Extract and validate user session"""
    # Get session token from headers
    auth_header = info.context.get("authorization")
    if not auth_header:
        raise PermissionError("No authorization header")
    # Strip only a leading "Bearer " scheme prefix
    token = auth_header.removeprefix("Bearer ")
    # Validate session in Redis
    session_data = await redis_client.get(f"session:{token}")
    if not session_data:
        raise PermissionError("Invalid or expired session")
    session = json.loads(session_data)
    return session["fuser_id"], token, session["flags"]

# The actual roles are defined as bitmasks
ROLE_READ = 0x0001
ROLE_WRITE = 0x0002
ROLE_MODERATOR = 0x0004

async def validate_user_can_access_group(
    fuser_id: str,
    fgroup_id: str,
    need_role: int,  # e.g., ROLE_WRITE
    want_403: bool,
) -> Optional[prisma_stubs.models.flexus_permissions_unrecursive]:
    """Checks if a user has the required role in a group using bitmasks."""
    # Look up the user's permission record for the group
    perm_rec = await my_prisma.flexus_permissions_unrecursive.find_first(
        where={"fgroup_id": fgroup_id, "fuser_id": fuser_id}
    )
    # Check if the user has the required role using a bitwise AND
    if not perm_rec or not (perm_rec.perm_roles & need_role):
        if want_403:
            raise HTTPException(status_code=403, detail="Permission denied.")
        return None
    return perm_rec
Data Protection
- Encryption: TLS for all communications
- Secrets Management: Environment-based configuration
- Audit Trail: Complete audit log for all operations
- Access Control: Role-based permissions (RBAC)
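The role-based access control above rests on the role bitmasks from the Authentication & Authorization code. The short sketch below shows how such masks combine and how the bitwise AND check behaves. The `ROLE_*` values are taken from that code; `has_role` and `has_all_roles` are hypothetical helpers added for illustration.

```python
ROLE_READ = 0x0001
ROLE_WRITE = 0x0002
ROLE_MODERATOR = 0x0004


def has_role(perm_roles: int, need_role: int) -> bool:
    """True if any bit of need_role is set in perm_roles (same test as the bitwise AND check)."""
    return bool(perm_roles & need_role)


def has_all_roles(perm_roles: int, need_roles: int) -> bool:
    """Stricter variant: every bit of need_roles must be set."""
    return (perm_roles & need_roles) == need_roles


# Roles are combined with bitwise OR into a single integer.
editor = ROLE_READ | ROLE_WRITE

can_write = has_role(editor, ROLE_WRITE)
can_moderate = has_role(editor, ROLE_MODERATOR)
any_of = has_role(editor, ROLE_WRITE | ROLE_MODERATOR)
all_of = has_all_roles(editor, ROLE_WRITE | ROLE_MODERATOR)
```

One design point to note: when `need_role` combines several bits, a plain AND check passes if the user holds any one of them, so a caller that needs all of them should compare the masked value against the full mask, as `has_all_roles` does.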
Integration Points
External APIs
- REST APIs: Standard HTTP integration
- GraphQL APIs: Native GraphQL federation
- WebHooks: Event-driven integrations
- MCP Servers: Model Context Protocol support
Marketplace Architecture
- Plugin System: Sandboxed execution environment
- Revenue Sharing: Automated payment processing
- Version Management: Semantic versioning support
- Security Scanning: Automated vulnerability detection
This architecture provides a solid foundation for scalable, maintainable, and secure operations. For implementation details, explore our API Reference.