CorpusIQ Architecture Overview
Technical architecture documentation for CorpusIQ Apps SDK MCP Server.
High-Level Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│                       ChatGPT / AI Client                       │
│                      (MCP Protocol Client)                      │
└────────────────────────────────┬────────────────────────────────┘
                                 │ HTTPS / MCP Protocol
                                 │
┌────────────────────────────────▼────────────────────────────────┐
│                       CorpusIQ MCP Server                       │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                    FastAPI Application                    │  │
│  │  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐   │  │
│  │  │  MCP Routes  │   │ OAuth Routes │   │  API Routes  │   │  │
│  │  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘   │  │
│  │         │                  │                  │           │  │
│  │  ┌──────▼──────────────────▼──────────────────▼───────┐   │  │
│  │  │                  Middleware Layer                  │   │  │
│  │  │   • Rate Limiting   • CORS   • Auth   • Logging    │   │  │
│  │  └────────────────────────────────────────────────────┘   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                      MCP Server Core                      │  │
│  │  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐   │  │
│  │  │     Tool     │   │   Resource   │   │    Widget    │   │  │
│  │  │   Handlers   │   │  Management  │   │  Rendering   │   │  │
│  │  └──────────────┘   └──────────────┘   └──────────────┘   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                      Connector Layer                      │  │
│  │  ┌─────┐  ┌─────┐  ┌─────┐  ┌─────┐  ┌─────┐              │  │
│  │  │Gmail│  │Drive│  │O365 │  │ QB  │  │Slack│  ...         │  │
│  │  └─────┘  └─────┘  └─────┘  └─────┘  └─────┘              │  │
│  └───────────────────────────────────────────────────────────┘  │
└────────────────────────────────┬────────────────────────────────┘
                                 │ OAuth / API Calls
                                 │
┌────────────────────────────────▼────────────────────────────────┐
│                      External Data Sources                      │
│ Gmail • Drive • OneDrive • QuickBooks • Salesforce • Slack ...  │
└─────────────────────────────────────────────────────────────────┘
```
Component Details
1. FastAPI Application Layer
Location: src/corpusiq/app.py
Responsibilities:
- HTTP request handling
- Routing to appropriate handlers
- Middleware coordination
- Response formatting
Key Components:
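```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# Main application factory
def build_app() -> FastAPI:
    app = FastAPI(title="CorpusIQ Apps SDK")

    # Add middleware. Starlette wraps each new middleware around the ones
    # added before it, so the LAST one added runs FIRST. Adding them in this
    # order gives the execution order listed under Middleware Layer:
    # Logging -> SecurityHeaders -> CORS -> RateLimit.
    app.add_middleware(RateLimitMiddleware, ...)
    app.add_middleware(CORSMiddleware, ...)
    app.add_middleware(SecurityHeadersMiddleware, ...)
    app.add_middleware(LoggingMiddleware, ...)

    # Mount the REST sub-application and register the MCP endpoint
    app.mount("/api", api_app)
    app.add_api_route("/mcp", mcp_endpoint, methods=["POST"])
    return app
```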
Design Patterns:
- Application Factory: build_app() creates configured instances
- Dependency Injection: FastAPI’s DI system for shared resources
- Middleware Pipeline: Layered request/response processing
2. MCP Server Core
Location: src/corpusiq/mcp_server.py
Responsibilities:
- MCP protocol implementation
- Tool definition and registration
- Tool invocation handling
- Widget resource management
Architecture:
```python
from mcp.server.fastmcp import FastMCP

# Tool registration
mcp = FastMCP("corpusiq")

@mcp.tool()
def corpus_search(query: str, max_results: int = 5):
    """Search across connected data sources."""
    # Input validation (CorpusSearchArgs is the project's Pydantic model)
    args = CorpusSearchArgs(query=query, max_results=max_results)
    # Business logic
    results = search_all_connectors(args.query, args.max_results)
    # Format response with widget
    return format_results_with_widget(results)
```
Key Patterns:
- Decorator Pattern: @mcp.tool() for tool registration
- Schema-First Design: Pydantic models define inputs
- Factory Functions: Cached resource generation
- Separation of Concerns: Tool logic separate from protocol handling
3. Middleware Layer
Location: src/corpusiq/app.py (middleware definitions)
Middleware Stack (execution order):
- LoggingMiddleware: Request/response logging
- SecurityHeadersMiddleware: Add security headers
- CORSMiddleware: Handle CORS preflight and headers
- RateLimitMiddleware: Rate limiting per IP
- AuthMiddleware (production): Token validation
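The execution order depends on how the middleware is registered. Starlette wraps each middleware passed to add_middleware around everything added before it, so the last call ends up outermost and runs first. The toy sketch below uses plain asyncio functions rather than Starlette itself (make_middleware and build_stack are illustrative names). It shows that registering RateLimit, CORS, SecurityHeaders, and Logging in that order produces the execution order listed above.

```python
import asyncio

# Toy stand-ins for ASGI middleware classes: each records its name when a
# request passes through it, then calls the app it wraps.
def make_middleware(name, trace):
    def wrap(app):
        async def middleware(scope, receive, send):
            trace.append(name)
            await app(scope, receive, send)
        return middleware
    return wrap

async def endpoint(scope, receive, send):
    """Innermost app (the route handler)."""

def build_stack(app, layers_in_add_order):
    # Same rule Starlette applies to user middleware: each layer added later
    # wraps the stack built so far, so the last one added is outermost.
    for layer in layers_in_add_order:
        app = layer(app)
    return app

trace = []
added = ["RateLimit", "CORS", "SecurityHeaders", "Logging"]  # add_middleware call order
stack = build_stack(endpoint, [make_middleware(name, trace) for name in added])
asyncio.run(stack({"type": "http"}, None, None))
print(trace)  # ['Logging', 'SecurityHeaders', 'CORS', 'RateLimit']
```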
Rate Limiting Implementation:
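```python
import time
from starlette.responses import JSONResponse

class RateLimitMiddleware:
    """Fixed one-minute window per client IP (pure ASGI middleware)."""
    def __init__(self, app, requests_per_minute: int = 60):
        self.app = app
        self.limit = requests_per_minute
        self.cache = {}  # In-memory, single instance: ip -> (window_start, count)

    def _get_client_ip(self, scope) -> str:
        client = scope.get("client")  # (host, port) or None; behind a proxy, enable proxy headers
        return client[0] if client else "unknown"

    def _is_rate_limited(self, client_ip: str) -> bool:
        now = time.monotonic()
        window_start, count = self.cache.get(client_ip, (now, 0))
        if now - window_start >= 60:  # Window over: start a fresh one
            window_start, count = now, 0
        self.cache[client_ip] = (window_start, count + 1)
        return count + 1 > self.limit

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            client_ip = self._get_client_ip(scope)
            # Check rate limit
            if self._is_rate_limited(client_ip):
                response = JSONResponse(
                    {"error": "Rate limit exceeded"},
                    status_code=429,
                    headers={"Retry-After": "60"},
                )
                await response(scope, receive, send)
                return
        await self.app(scope, receive, send)
```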
For multi-instance deployments, use Redis:
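```python
import redis.asyncio as redis  # asyncio client, so Redis calls don't block the event loop

class RedisRateLimitMiddleware:
    def __init__(self, app, redis_url: str, rpm: int):
        self.app = app
        self.redis = redis.from_url(redis_url)
        self.rpm = rpm
    # __call__ follows RateLimitMiddleware, but the per-IP counters live in
    # Redis so every instance enforces the same shared limit
```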
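One common approach for the Redis variant (an assumption here, not necessarily what CorpusIQ ships) is a fixed window keyed by client IP and minute. The limiter INCRs the key, sets an EXPIRE on the first hit, and rejects requests once the count passes the limit. The sketch calls only incr and expire, which the redis.asyncio client provides. FakeAsyncRedis is a hypothetical in-memory stand-in so the example runs without a server, and the key format is illustrative.

```python
import asyncio
import time
from typing import Optional

class FakeAsyncRedis:
    """In-memory stand-in for the two redis.asyncio client calls used below."""

    def __init__(self):
        self.counts = {}

    async def incr(self, key: str) -> int:
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]

    async def expire(self, key: str, seconds: int) -> bool:
        return True  # Real Redis deletes the key after `seconds`; not simulated

async def is_rate_limited(redis, client_ip: str, rpm: int, now: Optional[float] = None) -> bool:
    """Fixed one-minute window shared by every instance using the same Redis."""
    now = time.time() if now is None else now
    # One key per client IP per minute window (key format is illustrative)
    key = f"ratelimit:{client_ip}:{int(now // 60)}"
    count = await redis.incr(key)  # INCR is atomic across all instances
    if count == 1:
        await redis.expire(key, 60)  # First hit opens the window; key ages out later
    return count > rpm

async def demo():
    r = FakeAsyncRedis()
    return [await is_rate_limited(r, "203.0.113.7", rpm=3, now=0.0) for _ in range(4)]

print(asyncio.run(demo()))  # [False, False, False, True]
```

Because INCR is atomic, concurrent instances never lose a count. INCR and EXPIRE are still two separate commands, though. Wrapping them in a MULTI/EXEC pipeline or a Lua script closes the small gap where a crash could leave a key without an expiry.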
4. Connector Layer
Concept: Abstraction over external data sources
Interface:
```python
from __future__ import annotations  # SearchResult is defined elsewhere in the project

from typing import List, Protocol

class Connector(Protocol):
    """Base interface for data source connectors."""

    async def connect(self, oauth_token: str) -> bool:
        """Establish connection with OAuth token."""
        ...

    async def search(self, query: str, max_results: int) -> List[SearchResult]:
        """Search this data source."""
        ...

    async def disconnect(self) -> bool:
        """Disconnect and cleanup."""
        ...
```
Example Implementation:
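```python
import asyncio
from typing import List

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

class GmailConnector:
    def __init__(self):
        self.service = None

    async def connect(self, oauth_token: str) -> bool:
        credentials = Credentials(token=oauth_token)
        self.service = build('gmail', 'v1', credentials=credentials)
        return True

    async def search(self, query: str, max_results: int) -> List["SearchResult"]:
        request = self.service.users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        )
        # The Google client is synchronous: run the HTTP call off the event loop
        results = await asyncio.to_thread(request.execute)
        # messages.list returns only message/thread IDs; _parse_message (not
        # shown) must fetch each message's details via messages().get()
        return [self._parse_message(msg) for msg in results.get('messages', [])]
```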
Connector Registry:
```python
CONNECTORS = {
    'gmail': GmailConnector,
    'drive': DriveConnector,
    'onedrive': OneDriveConnector,
    # ...
}

def get_connector(name: str) -> Connector:
    """Factory function for connectors."""
    connector_class = CONNECTORS.get(name)
    if not connector_class:
        raise ValueError(f"Unknown connector: {name}")
    return connector_class()
```
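The tool handler's search_all_connectors call and the "Parallel execution" step in the Data Flow section can be sketched with asyncio.gather. This is a hypothetical implementation. Its signature takes the connector instances explicitly, and SearchResult and StaticConnector are minimal stand-ins rather than the project's real types.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Sequence

@dataclass
class SearchResult:
    """Hypothetical minimal result shape; the real model may carry more fields."""
    source: str
    title: str
    score: float  # relevance, higher is better

async def search_all_connectors(connectors: Sequence, query: str, max_results: int) -> List[SearchResult]:
    # Fan out to every connector at once; return_exceptions=True means a
    # failing source yields an exception object instead of cancelling the rest
    outcomes = await asyncio.gather(
        *(c.search(query, max_results) for c in connectors),
        return_exceptions=True,
    )
    # Drop failed sources (a real server would log them), then flatten
    merged = [r for o in outcomes if not isinstance(o, BaseException) for r in o]
    # Rank by relevance across sources and keep the overall top N
    merged.sort(key=lambda r: r.score, reverse=True)
    return merged[:max_results]

class StaticConnector:
    """Test double that returns canned scores or raises on demand."""

    def __init__(self, source: str, scores: List[float], fail: bool = False):
        self.source, self.scores, self.fail = source, scores, fail

    async def search(self, query: str, max_results: int) -> List[SearchResult]:
        if self.fail:
            raise ConnectionError(f"{self.source} unavailable")
        return [SearchResult(self.source, f"{query} #{i}", s) for i, s in enumerate(self.scores)]

connectors = [
    StaticConnector("gmail", [0.9, 0.2]),
    StaticConnector("drive", [0.7]),
    StaticConnector("slack", [], fail=True),
]
results = asyncio.run(search_all_connectors(connectors, "reports", max_results=2))
print([(r.source, r.score) for r in results])  # [('gmail', 0.9), ('drive', 0.7)]
```

With return_exceptions=True, one slow or broken source cannot fail the whole search. The trade-off is that errors become values, so they must be logged explicitly or they vanish silently.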
5. Widget System
Purpose: Rich HTML interface for results display
Architecture:
```
┌───────────────────────────────────────────────────────┐
│  Widget HTML Template                                 │
│  (assets/corpusiq.html)                               │
│                                                       │
│  • Embedded CSS (no external dependencies)            │
│  • Vanilla JavaScript (no frameworks)                 │
│  • Responsive design                                  │
│  • Accessibility features                             │
└───────────────────────────────────────────────────────┘
                            │
                            ▼
┌───────────────────────────────────────────────────────┐
│  Widget Loading (Cached)                              │
│                                                       │
│  @cache                                               │
│  def _load_widget_html() -> str:                      │
│      return Path("assets/corpusiq.html").read_text()  │
└───────────────────────────────────────────────────────┘
                            │
                            ▼
┌───────────────────────────────────────────────────────┐
│  MCP Resource Embedding                               │
│                                                       │
│  EmbeddedResource(                                    │
│      type="resource",                                 │
│      resource=TextResourceContents(                   │
│          uri="ui://widget/corpusiq.html",             │
│          mimeType="text/html+skybridge",              │
│          text=widget_html,                            │
│      ),                                               │
│  )                                                    │
└───────────────────────────────────────────────────────┘
```
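Serialized over the wire, the EmbeddedResource in the diagram above becomes a content item of type "resource" inside the tools/call result, following the MCP specification's embedded-resource shape. The sketch below builds that result as plain dicts. build_tool_result is a hypothetical helper; a real server would more likely construct the mcp package's typed models.

```python
import json

WIDGET_URI = "ui://widget/corpusiq.html"
WIDGET_MIME_TYPE = "text/html+skybridge"

def build_tool_result(summary: str, widget_html: str) -> dict:
    """Hypothetical helper: a tools/call result with a text summary plus the
    widget as an embedded resource, written as JSON-ready dicts."""
    return {
        "content": [
            {"type": "text", "text": summary},
            {
                "type": "resource",  # MCP embedded-resource content item
                "resource": {
                    "uri": WIDGET_URI,
                    "mimeType": WIDGET_MIME_TYPE,
                    "text": widget_html,
                },
            },
        ],
        "isError": False,
    }

result = build_tool_result("Found 2 results", "<div id='root'></div>")
print(json.dumps(result, indent=2))
```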
Widget Communication:
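```javascript
// Widget receives data from MCP server
window.addEventListener('message', (event) => {
  // In production, check event.origin against the expected host first
  if (event.data?.type === 'search_results') {
    displayResults(event.data.results);
  }
});

// Widget sends actions back
function connectSource(sourceName) {
  window.parent.postMessage({
    type: 'connect_source',
    source: sourceName
  }, '*'); // '*' lets any parent origin receive this; prefer the host's exact origin
}
```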
Data Flow
Search Request Flow
```
1. User asks ChatGPT: "Search my corpus for reports"
   │
   ▼
2. ChatGPT sends an MCP request (JSON-RPC 2.0):
   POST /mcp
   {
     "jsonrpc": "2.0",
     "id": 1,
     "method": "tools/call",
     "params": {
       "name": "corpus_search",
       "arguments": {"query": "reports", "max_results": 5}
     }
   }
   │
   ▼
3. FastAPI receives the request
   │
   ▼
4. Middleware pipeline:
   • Logging: Log request with ID
   • CORS: Validate origin
   • Rate Limit: Check IP quota
   • Auth: Validate token (production)
   │
   ▼
5. MCP handler processes:
   • Parse JSON-RPC request
   • Validate tool name
   • Extract arguments
   │
   ▼
6. Tool handler (corpus_search):
   • Validate inputs (Pydantic)
   • Query all connected connectors
   • Aggregate results
   • Rank by relevance
   │
   ▼
7. Connector layer:
   Gmail.search("reports") ─┐
   Drive.search("reports") ─┼─→ Parallel execution
   Slack.search("reports") ─┘
   │
   ▼
8. Format response:
   • Combine results
   • Load widget HTML
   • Embed in MCP response
   • Add metadata
   │
   ▼
9. Return to ChatGPT:
   {
     "jsonrpc": "2.0",
     "id": 1,
     "result": {
       "content": [
         {"type": "text", "text": "Found 5 results..."}
       ],
       "isError": false
     }
   }
   │
   ▼
10. ChatGPT displays widget to user
```
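Steps 5 and 6 come down to a small amount of JSON-RPC plumbing. The sketch below is a hypothetical, stdlib-only dispatcher; the TOOLS table and handle_mcp_request are illustrative names, and the real server delegates this work to the MCP SDK. It uses the standard JSON-RPC 2.0 error codes and reports an unknown tool with -32602, as the MCP specification's tools example does.

```python
import json

# Hypothetical tool table; the real server registers tools with @mcp.tool()
TOOLS = {
    "corpus_search": lambda query, max_results=5: f"Found results for {query!r}",
}

def _error(req_id, code: int, message: str) -> dict:
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}

def handle_mcp_request(raw: bytes) -> dict:
    """Minimal JSON-RPC 2.0 handling of a tools/call request."""
    try:
        req = json.loads(raw)
    except ValueError:  # covers JSONDecodeError and undecodable bytes
        return _error(None, -32700, "Parse error")
    if not isinstance(req, dict) or req.get("jsonrpc") != "2.0" or "method" not in req:
        req_id = req.get("id") if isinstance(req, dict) else None
        return _error(req_id, -32600, "Invalid Request")
    req_id = req.get("id")
    if req["method"] != "tools/call":
        return _error(req_id, -32601, "Method not found")
    params = req.get("params") or {}
    name = params.get("name") if isinstance(params, dict) else None
    if not isinstance(name, str) or name not in TOOLS:
        return _error(req_id, -32602, f"Unknown tool: {name}")
    try:
        text = TOOLS[name](**(params.get("arguments") or {}))
    except TypeError as exc:  # missing, unexpected, or non-mapping arguments
        return _error(req_id, -32602, f"Invalid params: {exc}")
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {"content": [{"type": "text", "text": text}], "isError": False},
    }

request = {
    "jsonrpc": "2.0", "id": 1, "method": "tools/call",
    "params": {"name": "corpus_search", "arguments": {"query": "reports", "max_results": 5}},
}
response = handle_mcp_request(json.dumps(request).encode())
print(response["result"]["content"][0]["text"])  # Found results for 'reports'
```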
Security Architecture
Defense in Depth
Layer 1: Network
- HTTPS/TLS encryption
- CORS restrictions
- Rate limiting
Layer 2: Authentication
- OAuth 2.1 tokens
- JWT validation
- Token expiration
Layer 3: Authorization
- Scope-based access
- Connector permissions
- User context
Layer 4: Input Validation
- Pydantic models
- Length limits
- Type checking
- Sanitization
Layer 5: Output Encoding
- HTML escaping
- JSON encoding
- Safe error messages
Layer 6: Logging & Monitoring
- Request logging
- Error tracking
- Audit trails
- Anomaly detection
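Layers 4 and 5 can be illustrated in a few lines. The project uses Pydantic models. To stay dependency-free, this sketch swaps in a frozen dataclass with __post_init__ checks, and the limits (MAX_QUERY_LENGTH, MAX_RESULTS_CAP) are hypothetical values. Output encoding uses the standard library's html.escape.

```python
import html
from dataclasses import dataclass

MAX_QUERY_LENGTH = 500  # Hypothetical limit
MAX_RESULTS_CAP = 50    # Hypothetical cap

@dataclass(frozen=True)
class SearchArgs:
    """Stdlib stand-in for a Pydantic input model such as CorpusSearchArgs."""
    query: str
    max_results: int = 5

    def __post_init__(self):
        # Type checking (bool is a subclass of int, so reject it explicitly)
        if not isinstance(self.query, str):
            raise TypeError("query must be a string")
        if isinstance(self.max_results, bool) or not isinstance(self.max_results, int):
            raise TypeError("max_results must be an integer")
        # Length limits
        if not self.query.strip():
            raise ValueError("Query must not be empty")
        if len(self.query) > MAX_QUERY_LENGTH:
            raise ValueError("Query too long")
        if not 1 <= self.max_results <= MAX_RESULTS_CAP:
            raise ValueError(f"max_results must be between 1 and {MAX_RESULTS_CAP}")

def render_result_title(title: str) -> str:
    """Output encoding: escape untrusted text before inserting it into widget HTML."""
    return f"<li>{html.escape(title)}</li>"

print(render_result_title("<script>alert(1)</script>"))
# <li>&lt;script&gt;alert(1)&lt;/script&gt;</li>
```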
OAuth Flow
```
1. User clicks "Connect Gmail"
   │
   ▼
2. ChatGPT fetches the OAuth metadata:
   GET /.well-known/oauth-authorization-server
   │
   ▼
3. CorpusIQ returns the OAuth metadata
   │
   ▼
4. ChatGPT constructs the authorization URL
   │
   ▼
5. User is redirected to the OAuth provider (Google)
   │
   ▼
6. User authorizes access
   │
   ▼
7. OAuth provider redirects to ChatGPT with an authorization code
   │
   ▼
8. ChatGPT exchanges the code for a token
   │
   ▼
9. ChatGPT uses the token in subsequent MCP requests:
   Authorization: Bearer <token>
   │
   ▼
10. CorpusIQ validates the token with the OAuth provider
   │
   ▼
11. Request is processed with the user's context
```
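Steps 3 and 9 can be sketched in Python. The metadata dict uses field names defined by RFC 8414 (OAuth 2.0 Authorization Server Metadata). The /authorize and /token paths and the listed supported values are assumptions. extract_bearer_token only parses the header sent in step 9, so the validation in step 10 still has to happen afterwards.

```python
from typing import Optional

def authorization_server_metadata(issuer: str) -> dict:
    """Body for GET /.well-known/oauth-authorization-server (RFC 8414 field names).
    The /authorize and /token paths are hypothetical placeholders."""
    issuer = issuer.rstrip("/")
    return {
        "issuer": issuer,
        "authorization_endpoint": f"{issuer}/authorize",
        "token_endpoint": f"{issuer}/token",
        "response_types_supported": ["code"],
        "grant_types_supported": ["authorization_code", "refresh_token"],
        "code_challenge_methods_supported": ["S256"],  # PKCE with SHA-256 challenges
    }

def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return <token> from 'Authorization: Bearer <token>', or None if absent or
    malformed. The token must still be validated (signature, expiry, issuer,
    audience) before use."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    # Auth scheme names are case-insensitive (RFC 7235)
    if scheme.lower() != "bearer" or not token:
        return None
    return token

meta = authorization_server_metadata("https://auth.example.com")
print(meta["token_endpoint"])              # https://auth.example.com/token
print(extract_bearer_token("Bearer abc"))  # abc
```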
Scalability Patterns
Horizontal Scaling
```
                ┌───────────────┐
                │ Load Balancer │
                └───────┬───────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
  ┌─────▼─────┐   ┌─────▼─────┐   ┌─────▼─────┐
  │Instance 1 │   │Instance 2 │   │Instance 3 │
  └─────┬─────┘   └─────┬─────┘   └─────┬─────┘
        │               │               │
        └───────────────┼───────────────┘
                        │
                ┌───────▼───────┐
                │ Shared Redis  │
                │(Rate Limiting)│
                └───────────────┘
```
Configuration:
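```python
import os

# Use Redis for shared state when REDIS_URL is set
REDIS_URL = os.getenv("REDIS_URL")
RPM = int(os.getenv("CORPUSIQ_RATE_LIMIT_REQUESTS_PER_MINUTE", "60"))

if REDIS_URL:
    app.add_middleware(RedisRateLimitMiddleware, redis_url=REDIS_URL, rpm=RPM)
else:
    # In-memory counters: correct only for a single instance
    app.add_middleware(RateLimitMiddleware, requests_per_minute=RPM)
```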
Caching Strategy
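```python
from functools import cache, lru_cache
from pathlib import Path

# Widget HTML (static, cache indefinitely)
@cache
def _load_widget_html() -> str:
    return Path("assets/corpusiq.html").read_text(encoding="utf-8")

# Connector results. lru_cache bounds the cache by size only: it has no TTL,
# so entries never expire on their own and stale results need a time-based
# cache. Don't apply it to async functions either; it would cache the
# coroutine object, not the result.
@lru_cache(maxsize=1000)
def get_cached_search_results(query: str, connector: str) -> list:
    ...
```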
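Because lru_cache never expires entries, time-based expiry needs a different tool (the third-party cachetools package provides a TTLCache, for example). A minimal stdlib TTL memoizer is sketched below. ttl_cache is a hypothetical name. It handles synchronous functions with hashable positional arguments only and evicts in insertion order when full.

```python
import time
from functools import wraps

def ttl_cache(seconds: float, maxsize: int = 1000, clock=time.monotonic):
    """Memoize a synchronous function on its positional args; each entry
    expires `seconds` after it was stored. Not thread-safe. maxsize >= 1."""
    def decorator(fn):
        store = {}  # args -> (expires_at, value); dict preserves insertion order

        @wraps(fn)
        def wrapper(*args):
            now = clock()
            entry = store.get(args)
            if entry is not None and entry[0] > now:
                return entry[1]  # fresh hit
            value = fn(*args)
            store.pop(args, None)  # drop the stale entry, if any
            if len(store) >= maxsize:
                store.pop(next(iter(store)))  # evict the oldest-inserted entry
            store[args] = (now + seconds, value)
            return value

        return wrapper
    return decorator

# Usage with a controllable clock
fake_now = [0.0]
calls = []

@ttl_cache(seconds=30, clock=lambda: fake_now[0])
def search_connector(query: str, connector: str) -> str:
    calls.append((query, connector))
    return f"{connector}:{query}"

search_connector("reports", "gmail")
search_connector("reports", "gmail")  # served from cache
fake_now[0] = 31.0
search_connector("reports", "gmail")  # expired, recomputed
print(len(calls))  # 2
```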
Configuration Management
Settings Hierarchy
- Defaults (in code)
- Environment variables (.env file)
- Runtime overrides (for testing)
Implementation:
```python
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # pydantic-settings v2 style; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(
        env_prefix="CORPUSIQ_",
        env_file=".env",
        env_file_encoding="utf-8",
    )

    # Core settings with defaults
    cors_allow_origins_csv: str = ""
    debug_mode: bool = False
    log_level: str = "INFO"
    rate_limit_requests_per_minute: int = 60

    # OAuth settings
    oauth_resource_url: str = "https://example.com"
    oauth_issuer: str = "https://auth.example.com"

# Singleton instance. In tests, Settings(debug_mode=True) overrides
# both environment variables and .env values.
settings = Settings()
```
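Starlette's CORSMiddleware takes a list for its allow_origins parameter, so cors_allow_origins_csv has to be split first. A possible helper follows, assuming comma-separated values. parse_allowed_origins is a hypothetical name.

```python
from typing import List

def parse_allowed_origins(csv: str) -> List[str]:
    """Turn cors_allow_origins_csv into a list of origins. Browsers send the
    Origin header without a trailing slash, so one is stripped here."""
    return [origin.strip().rstrip("/") for origin in csv.split(",") if origin.strip()]

print(parse_allowed_origins("https://a.example, https://b.example/,"))
# ['https://a.example', 'https://b.example']

# Wiring (commented out; needs the app and settings objects):
# app.add_middleware(CORSMiddleware,
#                    allow_origins=parse_allowed_origins(settings.cors_allow_origins_csv))
```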
Error Handling
Error Hierarchy
Exception
│
├─ CorpusIQError (base)
│ │
│ ├─ ValidationError (input validation)
│ ├─ AuthenticationError (auth failures)
│ ├─ RateLimitError (rate limit exceeded)
│ ├─ ConnectorError (connector failures)
│ └─ ConfigurationError (config issues)
│
└─ External errors (caught and wrapped)
Error Response Format
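```json
{
  "jsonrpc": "2.0",
  "id": "request-id",
  "error": {
    "code": -32602,
    "message": "Invalid params",
    "data": {
      "type": "ValidationError",
      "detail": "Query too long",
      "request_id": "abc123"
    }
  }
}
```
The code field is a standard JSON-RPC 2.0 error code. -32602 (Invalid params) fits input-validation failures like this one, while -32603 (Internal error) is for unexpected server faults.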
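The hierarchy and the response format fit together if each exception class carries its JSON-RPC code. This sketch shows one way that mapping could look; it is an assumption, not the project's code. The code assignments follow JSON-RPC 2.0 (-32602 for invalid params, -32603 otherwise), and to_jsonrpc_error is a hypothetical name. If Pydantic is also imported, alias one of the two ValidationError classes to avoid shadowing.

```python
class CorpusIQError(Exception):
    """Base class; `code` is the JSON-RPC error code reported to the client."""
    code = -32603  # Internal error

class ValidationError(CorpusIQError):
    code = -32602  # Invalid params

class AuthenticationError(CorpusIQError):
    pass

class RateLimitError(CorpusIQError):
    pass

class ConnectorError(CorpusIQError):
    pass

class ConfigurationError(CorpusIQError):
    pass

MESSAGES = {-32602: "Invalid params", -32603: "Internal error"}

def to_jsonrpc_error(exc: Exception, rpc_id, request_id: str) -> dict:
    """Map any exception to the error response format shown above."""
    if isinstance(exc, CorpusIQError):
        code, error_type, detail = exc.code, type(exc).__name__, str(exc)
    else:
        # External errors are wrapped; their text may leak internals, so hide it
        code, error_type, detail = -32603, "InternalError", "Unexpected server error"
    return {
        "jsonrpc": "2.0",
        "id": rpc_id,
        "error": {
            "code": code,
            "message": MESSAGES.get(code, "Server error"),
            "data": {"type": error_type, "detail": detail, "request_id": request_id},
        },
    }

err = to_jsonrpc_error(ValidationError("Query too long"), "request-id", "abc123")
print(err["error"]["code"], err["error"]["data"]["type"])  # -32602 ValidationError
```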
Testing Architecture
Test Pyramid
```
          ┌──────────┐
          │   E2E    │        (Few, slow)
          └──────────┘
        ┌──────────────┐
        │ Integration  │      (Some, medium)
        └──────────────┘
     ┌────────────────────┐
     │     Unit Tests     │   (Many, fast)
     └────────────────────┘
```
Test Structure:
tests/
├── unit/
│ ├── test_mcp_server.py # Tool logic
│ ├── test_validation.py # Input validation
│ └── test_connectors.py # Connector logic
├── integration/
│ ├── test_api_endpoints.py # API integration
│ ├── test_oauth_flow.py # OAuth integration
│ └── test_widget_loading.py # Widget integration
└── e2e/
└── test_full_search.py # End-to-end scenarios
Deployment Architecture
Production Setup
```
                    ┌──────────────┐
  Internet ─────────│   CDN/WAF    │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │    Nginx     │  (SSL termination, reverse proxy)
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │Load Balancer │
                    └──────┬───────┘
                           │
           ┌───────────────┼───────────────┐
           │               │               │
      ┌────▼────┐     ┌────▼────┐     ┌────▼────┐
      │CorpusIQ │     │CorpusIQ │     │CorpusIQ │
      │Instance1│     │Instance2│     │Instance3│
      └────┬────┘     └────┬────┘     └────┬────┘
           │               │               │
           └───────────────┼───────────────┘
                           │
                 ┌─────────┴─────────┐
                 │                   │
            ┌────▼────┐        ┌─────▼──────┐
            │  Redis  │        │ Monitoring │
            └─────────┘        └────────────┘
```
Performance Optimization
Key Metrics
- Request latency: p50, p95, p99
- Throughput: requests/second
- Error rate: 5xx responses
- Resource usage: CPU, memory
- Connector latency: per-source response time
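The p50, p95, and p99 figures are percentiles of the latency samples collected over a time window. A nearest-rank sketch follows. Monitoring systems often interpolate or work from histograms instead, so their values can differ slightly from this method.

```python
import math
from typing import Sequence

def percentile(samples: Sequence[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample such that at least
    p percent of all samples are less than or equal to it."""
    if not samples or not 0 < p <= 100:
        raise ValueError("need samples and 0 < p <= 100")
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]

latencies_ms = list(range(1, 101))  # 1 ms .. 100 ms
print([percentile(latencies_ms, p) for p in (50, 95, 99)])  # [50, 95, 99]
```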
Optimization Techniques
- Async operations: Use async/await for I/O
- Connection pooling: Reuse HTTP connections
- Caching: Cache static content and frequent queries
- Lazy loading: Load resources only when needed
- Batch operations: Combine multiple API calls
- CDN: Serve static assets from CDN
Future Enhancements
Planned Improvements
- Persistent storage: Database for user preferences
- Advanced search: Full-text search with Elasticsearch
- Real-time updates: WebSocket support
- Batch operations: Bulk actions on search results
- Analytics: Usage metrics and insights
- Plugin system: Third-party connector plugins
For implementation details, see: