CorpusIQ

CorpusIQ Architecture Overview

Technical architecture documentation for CorpusIQ Apps SDK MCP Server.

High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│                         ChatGPT / AI Client                      │
│                     (MCP Protocol Client)                        │
└────────────────────────────┬────────────────────────────────────┘
                             │ HTTPS / MCP Protocol
                             │
┌────────────────────────────▼────────────────────────────────────┐
│                      CorpusIQ MCP Server                         │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    FastAPI Application                     │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │  │
│  │  │  MCP Routes  │  │  OAuth Routes │  │  API Routes  │   │  │
│  │  └──────┬───────┘  └──────┬────────┘  └──────┬───────┘   │  │
│  │         │                  │                   │            │  │
│  │  ┌──────▼──────────────────▼───────────────────▼───────┐  │  │
│  │  │            Middleware Layer                          │  │  │
│  │  │  • Rate Limiting  • CORS  • Auth  • Logging         │  │  │
│  │  └──────────────────────────────────────────────────────┘  │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    MCP Server Core                         │  │
│  │  ┌────────────┐  ┌────────────┐  ┌──────────────────┐   │  │
│  │  │ Tool       │  │ Resource   │  │  Widget          │   │  │
│  │  │ Handlers   │  │ Management │  │  Rendering       │   │  │
│  │  └────────────┘  └────────────┘  └──────────────────┘   │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                  Connector Layer                           │  │
│  │  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐       │  │
│  │  │Gmail │  │Drive │  │O365  │  │ QB   │  │Slack │  ...  │  │
│  │  └──────┘  └──────┘  └──────┘  └──────┘  └──────┘       │  │
│  └──────────────────────────────────────────────────────────┘  │
└───────────────────────────┬──────────────────────────────────────┘
                            │ OAuth / API Calls
                            │
┌───────────────────────────▼──────────────────────────────────────┐
│                    External Data Sources                         │
│  Gmail • Drive • OneDrive • QuickBooks • Salesforce • Slack...  │
└──────────────────────────────────────────────────────────────────┘

Component Details

1. FastAPI Application Layer

Location: src/corpusiq/app.py

Responsibilities:

  • HTTP request handling
  • Routing to appropriate handlers
  • Middleware coordination
  • Response formatting

Key Components:

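from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# Main application factory
def build_app() -> FastAPI:
    app = FastAPI(title="CorpusIQ Apps SDK")
    
    # Add middleware. Starlette runs the most recently added middleware first,
    # so these calls are in reverse of the execution order listed under
    # "Middleware Layer" (Logging, then SecurityHeaders, CORS, RateLimit).
    app.add_middleware(RateLimitMiddleware, ...)
    app.add_middleware(CORSMiddleware, ...)
    app.add_middleware(SecurityHeadersMiddleware, ...)
    app.add_middleware(LoggingMiddleware, ...)
    
    # Mount the API sub-app and register the MCP endpoint
    # (app.post() is a decorator factory, so use add_api_route() here)
    app.mount("/api", api_app)
    app.add_api_route("/mcp", mcp_endpoint, methods=["POST"])
    
    return app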

Design Patterns:

  • Application Factory: build_app() creates configured instances
  • Dependency Injection: FastAPI’s DI system for shared resources
  • Middleware Pipeline: Layered request/response processing
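
Because Starlette (which FastAPI builds on) wraps the application once per add_middleware() call, the middleware added last is the outermost of the user-added layers and runs first. The standard-library sketch below imitates that wrapping rule with hypothetical middleware classes (make_middleware and build_stack are illustrative names, not Starlette code) to show why registration order is the reverse of execution order.

```python
import asyncio
from typing import Awaitable, Callable, List

# Simplified stand-in for an ASGI app: receives a scope dict only.
App = Callable[[dict], Awaitable[None]]


def make_middleware(name: str, trace: List[str]) -> type:
    """Create a middleware class that records its name when it runs."""

    class Middleware:
        def __init__(self, app: App):
            self.app = app

        async def __call__(self, scope: dict) -> None:
            trace.append(name)      # runs before the wrapped app
            await self.app(scope)

    return Middleware


def build_stack(endpoint: App, added_in_order: List[type]) -> App:
    # Each addition wraps everything added before it, matching how Starlette
    # assembles add_middleware() calls: the last one added is outermost.
    app = endpoint
    for middleware_cls in added_in_order:
        app = middleware_cls(app)
    return app


trace: List[str] = []


async def endpoint(scope: dict) -> None:
    trace.append("endpoint")


added = [make_middleware(n, trace)
         for n in ("RateLimit", "CORS", "SecurityHeaders", "Logging")]
asyncio.run(build_stack(endpoint, added)({"type": "http"}))
print(trace)  # ['Logging', 'SecurityHeaders', 'CORS', 'RateLimit', 'endpoint']
```

Adding RateLimit first and Logging last therefore yields the Logging → SecurityHeaders → CORS → RateLimit execution order.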

2. MCP Server Core

Location: src/corpusiq/mcp_server.py

Responsibilities:

  • MCP protocol implementation
  • Tool definition and registration
  • Tool invocation handling
  • Widget resource management

Architecture:

from mcp.server.fastmcp import FastMCP

# Tool registration
mcp = FastMCP("corpusiq")

@mcp.tool()
def corpus_search(query: str, max_results: int = 5):
    """Search across connected data sources."""
    # Input validation
    args = CorpusSearchArgs(query=query, max_results=max_results)
    
    # Business logic
    results = search_all_connectors(args.query, args.max_results)
    
    # Format response with widget
    return format_results_with_widget(results)

Key Patterns:

  • Decorator Pattern: @mcp.tool() for tool registration
  • Schema-First Design: Pydantic models define inputs
  • Factory Functions: Cached resource generation
  • Separation of Concerns: Tool logic separate from protocol handling

3. Middleware Layer

Location: src/corpusiq/app.py (middleware definitions)

Middleware Stack (execution order):

  1. LoggingMiddleware: Request/response logging
  2. SecurityHeadersMiddleware: Add security headers
  3. CORSMiddleware: Handle CORS preflight and headers
  4. RateLimitMiddleware: Rate limiting per IP
  5. AuthMiddleware (production): Token validation

Rate Limiting Implementation:

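from starlette.responses import JSONResponse

class RateLimitMiddleware:
    """Per-IP rate limiting as pure ASGI middleware.

    The helpers _get_client_ip() and _is_rate_limited() are not shown.
    """

    def __init__(self, app, requests_per_minute: int = 60):
        self.app = app
        self.limit = requests_per_minute
        self.cache = {}  # In-memory: correct only for a single instance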
        
    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            client_ip = self._get_client_ip(scope)
            
            # Check rate limit
            if self._is_rate_limited(client_ip):
                response = JSONResponse(
                    {"error": "Rate limit exceeded"},
                    status_code=429,
                    headers={"Retry-After": "60"}
                )
                await response(scope, receive, send)
                return
        
        await self.app(scope, receive, send)
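
The middleware above leaves _is_rate_limited undefined. One common way to implement it is a per-IP sliding window of request timestamps. The class below is a hypothetical sketch (SlidingWindowLimiter and is_rate_limited are not names from the CorpusIQ code) that _is_rate_limited could delegate to; the clock is injectable so the window can be tested without sleeping.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window` seconds for each key."""

    def __init__(self, limit: int, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so tests can control time
        self.hits: Dict[str, Deque[float]] = defaultdict(deque)

    def is_rate_limited(self, key: str) -> bool:
        now = self.clock()
        hits = self.hits[key]
        # Forget requests that have slid out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return True  # over quota; rejected requests are not recorded
        hits.append(now)
        return False


fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=2, window=60, clock=lambda: fake_now[0])
results = [limiter.is_rate_limited("203.0.113.7") for _ in range(3)]
fake_now[0] = 61.0
after_window = limiter.is_rate_limited("203.0.113.7")
print(results, after_window)  # [False, False, True] False
```

Because the middleware runs on a single event loop thread, no lock is needed here. Deques for IPs that go quiet are never removed in this sketch, so a long-running instance would also need periodic pruning.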

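For multi-instance deployments, keep the counters in Redis so every instance enforces the same limit:

import redis.asyncio as redis  # async client: rate checks don't block the event loop

class RedisRateLimitMiddleware:
    def __init__(self, app, redis_url: str, rpm: int = 60):
        self.app = app
        self.redis = redis.from_url(redis_url)
        self.rpm = rpm

    # __call__ mirrors RateLimitMiddleware, but counts requests in Redis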
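
The Redis class stops at its constructor. A common approach (assumed here, not taken from the CorpusIQ source) is a fixed-window counter: INCR a per-IP, per-minute key and set an expiry on its first increment. redis-py clients expose incr(name) and expire(name, time); with the redis.asyncio client the same calls need await. The function name redis_is_rate_limited and the InMemoryCounter stand-in are hypothetical and exist only so the logic can run locally.

```python
import time
from typing import Callable, Dict


def redis_is_rate_limited(client, client_ip: str, rpm: int,
                          clock: Callable[[], float] = time.time) -> bool:
    """Fixed one-minute window, counted in Redis so all instances share it."""
    window = int(clock() // 60)               # index of the current minute
    key = f"ratelimit:{client_ip}:{window}"
    count = client.incr(key)                  # INCR is atomic in Redis
    if count == 1:
        client.expire(key, 60)                # first hit: let the key expire
    return count > rpm


class InMemoryCounter:
    """Local stand-in exposing the two redis-py methods used above."""

    def __init__(self) -> None:
        self.data: Dict[str, int] = {}
        self.ttl: Dict[str, int] = {}

    def incr(self, name: str) -> int:
        self.data[name] = self.data.get(name, 0) + 1
        return self.data[name]

    def expire(self, name: str, seconds: int) -> bool:
        self.ttl[name] = seconds              # recorded only; never enforced
        return True


store = InMemoryCounter()
checks = [redis_is_rate_limited(store, "198.51.100.4", rpm=2, clock=lambda: 120.0)
          for _ in range(3)]
print(checks)  # [False, False, True]
```

A fixed window can admit up to twice rpm requests around a minute boundary; if that matters, a sliding-window variant is the usual refinement.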

4. Connector Layer

Concept: Abstraction over external data sources

Interface:

from typing import List, Protocol

class Connector(Protocol):
    """Base interface for data source connectors."""
    
    async def connect(self, oauth_token: str) -> bool:
        """Establish connection with OAuth token."""
        ...
    
    async def search(self, query: str, max_results: int) -> List[SearchResult]:
        """Search this data source."""
        ...
    
    async def disconnect(self) -> bool:
        """Disconnect and cleanup."""
        ...

Example Implementation:

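import asyncio
from typing import List

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

class GmailConnector:
    def __init__(self):
        self.service = None
        
    async def connect(self, oauth_token: str) -> bool:
        credentials = Credentials(token=oauth_token)
        self.service = build('gmail', 'v1', credentials=credentials)
        return True
        
    async def search(self, query: str, max_results: int) -> List[SearchResult]:
        request = self.service.users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        )
        # The Google client is synchronous; run execute() in a worker thread
        # so parallel connector searches do not block the event loop.
        results = await asyncio.to_thread(request.execute)
        
        # messages.list returns only message and thread IDs, so _parse_message
        # (not shown) must fetch details via messages().get for a SearchResult.
        return [self._parse_message(msg) for msg in results.get('messages', [])]

    async def disconnect(self) -> bool:
        self.service = None
        return True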

Connector Registry:

CONNECTORS = {
    'gmail': GmailConnector,
    'drive': DriveConnector,
    'onedrive': OneDriveConnector,
    # ...
}

def get_connector(name: str) -> Connector:
    """Factory function for connectors."""
    connector_class = CONNECTORS.get(name)
    if not connector_class:
        raise ValueError(f"Unknown connector: {name}")
    return connector_class()

5. Widget System

Purpose: Rich HTML interface for results display

Architecture:

┌─────────────────────────────────────────────────┐
│            Widget HTML Template                 │
│         (assets/corpusiq.html)                  │
│                                                  │
│  • Embedded CSS (no external dependencies)      │
│  • Vanilla JavaScript (no frameworks)           │
│  • Responsive design                            │
│  • Accessibility features                       │
└─────────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│        Widget Loading (Cached)                  │
│                                                  │
│  @cache                                         │
│  def _load_widget_html() -> str:                │
│      return Path("assets/corpusiq.html")        │
│              .read_text()                       │
└─────────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│         MCP Resource Embedding                  │
│                                                  │
│  EmbeddedResource(                              │
│      type="resource",                           │
│      resource=TextResourceContents(             │
│          uri="ui://widget/corpusiq.html",      │
│          mimeType="text/html+skybridge",       │
│          text=widget_html                       │
│      )                                          │
│  )                                              │
└─────────────────────────────────────────────────┘

Widget Communication:

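// Widget receives data from the host page (ChatGPT), which relays the MCP tool result
window.addEventListener('message', (event) => {
    // Only trust messages posted by the embedding host frame
    if (event.source !== window.parent) return;
    if (event.data && event.data.type === 'search_results') {
        displayResults(event.data.results);
    }
});

// Widget sends actions back to the host
function connectSource(sourceName) {
    // '*' delivers to any parent origin; use the host's exact origin when known
    window.parent.postMessage({
        type: 'connect_source',
        source: sourceName
    }, '*');
}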

Data Flow

Search Request Flow

1. User asks ChatGPT: "Search my corpus for reports"
                │
                ▼
2. ChatGPT sends MCP request:
   POST /mcp
   {
     "jsonrpc": "2.0",
     "id": 1,
     "method": "tools/call",
     "params": {
       "name": "corpus_search",
       "arguments": {"query": "reports", "max_results": 5}
     }
   }
                │
                ▼
3. FastAPI receives request
                │
                ▼
4. Middleware pipeline:
   • Logging: Log request with ID
   • CORS: Validate origin
   • Rate Limit: Check IP quota
   • Auth: Validate token (production)
                │
                ▼
5. MCP Handler processes:
   • Parse JSON-RPC request
   • Validate tool name
   • Extract arguments
                │
                ▼
6. Tool Handler (corpus_search):
   • Validate inputs (Pydantic)
   • Query all connected connectors
   • Aggregate results
   • Rank by relevance
                │
                ▼
7. Connector Layer:
   Gmail.search("reports") ─┐
   Drive.search("reports") ─┼─→ Parallel execution
   Slack.search("reports") ─┘
                │
                ▼
8. Format Response:
   • Combine results
   • Load widget HTML
   • Embed in MCP response
   • Add metadata
                │
                ▼
9. Return to ChatGPT:
   {
     "jsonrpc": "2.0",
     "id": 1,
     "result": {
       "content": [
         {"type": "text", "text": "Found 5 results..."}
       ],
       "isError": false
     }
   }
                │
                ▼
10. ChatGPT displays widget to user
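
Steps 6 and 7 can be sketched with asyncio.gather, assuming each connector exposes the async search(query, max_results) method from the Connector interface. The SearchResult fields (source, title, score), the function name fan_out_search, and ranking by a numeric score are illustrative assumptions; the document does not specify how relevance is computed.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, List

log = logging.getLogger("corpusiq.search")


@dataclass
class SearchResult:
    # Hypothetical fields; the real SearchResult type is not shown here.
    source: str
    title: str
    score: float


async def fan_out_search(connectors: Dict[str, object], query: str,
                         max_results: int) -> List[SearchResult]:
    names = list(connectors)
    # Step 7: start every connector search concurrently and wait for all.
    outcomes = await asyncio.gather(
        *(connectors[name].search(query, max_results) for name in names),
        return_exceptions=True,  # a failing source must not sink the search
    )
    merged: List[SearchResult] = []
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            log.warning("connector %s failed: %r", name, outcome)
            continue
        merged.extend(outcome)
    # Step 6: aggregate, rank by relevance score, keep the overall top N.
    merged.sort(key=lambda r: r.score, reverse=True)
    return merged[:max_results]


class FakeConnector:
    """Test double with the async search() method from the Connector interface."""

    def __init__(self, source: str, scores: List[float], fail: bool = False):
        self.source, self.scores, self.fail = source, scores, fail

    async def search(self, query: str, max_results: int) -> List[SearchResult]:
        await asyncio.sleep(0)  # yield, as a real network call would
        if self.fail:
            raise RuntimeError(f"{self.source} unavailable")
        return [SearchResult(self.source, f"{query} #{i}", s)
                for i, s in enumerate(self.scores[:max_results])]


connectors = {
    "gmail": FakeConnector("gmail", [0.9, 0.4]),
    "drive": FakeConnector("drive", [0.7]),
    "slack": FakeConnector("slack", [], fail=True),
}
top = asyncio.run(fan_out_search(connectors, "reports", max_results=2))
print([(r.source, r.score) for r in top])  # [('gmail', 0.9), ('drive', 0.7)]
```

With return_exceptions=True, a Slack outage is logged and the Gmail and Drive results are still returned, which fits the "aggregate results" step better than failing the whole tool call.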

Security Architecture

Defense in Depth

Layer 1: Network

  • HTTPS/TLS encryption
  • CORS restrictions
  • Rate limiting

Layer 2: Authentication

  • OAuth 2.1 tokens
  • JWT validation
  • Token expiration

Layer 3: Authorization

  • Scope-based access
  • Connector permissions
  • User context

Layer 4: Input Validation

  • Pydantic models
  • Length limits
  • Type checking
  • Sanitization

Layer 5: Output Encoding

  • HTML escaping
  • JSON encoding
  • Safe error messages

Layer 6: Logging & Monitoring

  • Request logging
  • Error tracking
  • Audit trails
  • Anomaly detection
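
Layers 4 and 5 can be illustrated with the standard library alone. CorpusIQ expresses its input rules as Pydantic models; this sketch writes similar checks as plain functions, and the function names and the limits MAX_QUERY_LENGTH and MAX_RESULTS_CAP are hypothetical values, not the project's.

```python
import html
import unicodedata
from typing import Tuple

MAX_QUERY_LENGTH = 500   # hypothetical limit
MAX_RESULTS_CAP = 50     # hypothetical limit


def validate_search_args(query: str, max_results: int) -> Tuple[str, int]:
    """Layer 4: type checks, sanitization and length limits."""
    if not isinstance(query, str):
        raise TypeError("query must be a string")
    if isinstance(max_results, bool) or not isinstance(max_results, int):
        raise TypeError("max_results must be an integer")
    # Replace control characters (Unicode category Cc) with spaces, then trim.
    cleaned = "".join(
        " " if unicodedata.category(ch) == "Cc" else ch for ch in query
    ).strip()
    if not cleaned:
        raise ValueError("query must not be empty")
    if len(cleaned) > MAX_QUERY_LENGTH:
        raise ValueError("Query too long")
    if not 1 <= max_results <= MAX_RESULTS_CAP:
        raise ValueError(f"max_results must be between 1 and {MAX_RESULTS_CAP}")
    return cleaned, max_results


def render_result_title(title: str) -> str:
    """Layer 5: HTML-escape untrusted text before it is placed in markup."""
    return f"<li>{html.escape(title, quote=True)}</li>"
```

For example, render_result_title("<b>Q3</b> & more") produces "<li>&lt;b&gt;Q3&lt;/b&gt; &amp; more</li>", so a crafted email subject cannot inject markup into the widget.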

OAuth Flow

1. User clicks "Connect Gmail"
        │
        ▼
2. ChatGPT fetches the OAuth metadata:
   GET /.well-known/oauth-authorization-server
        │
        ▼
3. CorpusIQ returns OAuth metadata
        │
        ▼
4. ChatGPT constructs authorization URL
        │
        ▼
5. User redirected to OAuth provider (Google)
        │
        ▼
6. User authorizes access
        │
        ▼
7. OAuth provider redirects to ChatGPT with code
        │
        ▼
8. ChatGPT exchanges code for token
        │
        ▼
9. ChatGPT uses token in subsequent MCP requests:
   Authorization: Bearer <token>
        │
        ▼
10. CorpusIQ validates token with OAuth provider
        │
        ▼
11. Request processed with user context
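
Steps 9 and 10 begin with pulling the token out of the request. A minimal standard-library sketch; the function name extract_bearer_token is hypothetical, and AuthenticationError here is a local stand-in for the class listed under Error Handling.

```python
from typing import Mapping


class AuthenticationError(Exception):
    """Stand-in for CorpusIQ's AuthenticationError."""


def extract_bearer_token(headers: Mapping[str, str]) -> str:
    """Return <token> from an 'Authorization: Bearer <token>' header."""
    # Header names are case-insensitive, so normalize them before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    value = lowered.get("authorization")
    if value is None:
        raise AuthenticationError("missing Authorization header")
    scheme, _, token = value.strip().partition(" ")
    token = token.strip()
    # The scheme name is case-insensitive too ("Bearer", "bearer", ...).
    if scheme.lower() != "bearer" or not token:
        raise AuthenticationError("expected a Bearer token")
    return token
```

Extraction is only the first half of step 10: the token must still be validated (signature, issuer, audience and expiry for a JWT, or an introspection call to the provider). Per RFC 6750, a rejected request should receive 401 with a WWW-Authenticate: Bearer header.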

Scalability Patterns

Horizontal Scaling

                    ┌──────────────┐
                    │ Load Balancer│
                    └───────┬──────┘
                            │
            ┌───────────────┼───────────────┐
            │               │               │
      ┌─────▼─────┐   ┌────▼─────┐   ┌────▼─────┐
      │Instance 1 │   │Instance 2│   │Instance 3│
      └───────────┘   └──────────┘   └──────────┘
            │               │               │
            └───────────────┼───────────────┘
                            │
                    ┌───────▼────────┐
                    │  Shared Redis  │
                    │ (Rate Limiting)│
                    └────────────────┘

Configuration:

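import os

# Inside build_app(): use Redis for shared state when it is configured
REDIS_URL = os.getenv("REDIS_URL")
rpm = settings.rate_limit_requests_per_minute
if REDIS_URL:
    app.add_middleware(RedisRateLimitMiddleware, redis_url=REDIS_URL, rpm=rpm)
else:
    # In-memory limiter: correct only for a single instance
    app.add_middleware(RateLimitMiddleware, requests_per_minute=rpm)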

Caching Strategy

from functools import cache
from pathlib import Path
from typing import List

# Widget HTML (static, cache for the lifetime of the process)
@cache
def _load_widget_html() -> str:
    return Path("assets/corpusiq.html").read_text(encoding="utf-8")

# Connector results need a TTL, but functools.lru_cache only evicts by size
# and never expires entries, so use a TTL cache instead (for example
# cachetools.TTLCache(maxsize=1000, ttl=N)).
def get_cached_search_results(query: str, connector: str) -> List[Result]:
    ...
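
Since lru_cache has no expiry, here is a minimal standard-library TTL memoizer as an alternative to a third-party TTL cache. The decorator name ttl_cache and its parameters are hypothetical; it handles positional, hashable arguments only and is not thread-safe.

```python
import time
from functools import wraps
from typing import Any, Callable, Dict, Tuple


def ttl_cache(ttl_seconds: float, maxsize: int = 1000,
              clock: Callable[[], float] = time.monotonic):
    """Memoize positional-argument calls, expiring entries after ttl_seconds."""

    def decorator(func: Callable) -> Callable:
        entries: Dict[Tuple, Tuple[float, Any]] = {}  # args -> (stored_at, value)

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = entries.get(args)
            if hit is not None and now - hit[0] < ttl_seconds:
                return hit[1]                      # still fresh
            value = func(*args)
            entries.pop(args, None)                # re-insert at the end
            if entries and len(entries) >= maxsize:
                entries.pop(next(iter(entries)))   # evict the oldest write
            entries[args] = (now, value)
            return value

        return wrapper

    return decorator


fake_now = [0.0]
calls = []


@ttl_cache(ttl_seconds=30, clock=lambda: fake_now[0])
def search(query: str, connector: str) -> list:
    calls.append((query, connector))   # counts real (uncached) calls
    return [f"{connector}:{query}"]


first = search("reports", "gmail")
search("reports", "gmail")             # within 30 s: served from cache
fake_now[0] = 31.0
search("reports", "gmail")             # expired: recomputed
print(len(calls))  # 2
```

Dicts preserve insertion order, so popping and re-inserting on each write keeps the first key as the oldest entry, which gives simple size-bounded eviction without extra bookkeeping.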

Configuration Management

Settings Hierarchy (lowest to highest precedence)

  1. Defaults (in code)
  2. .env file
  3. Environment variables (override values from .env)
  4. Runtime overrides (keyword arguments to Settings(), e.g. in tests)

Implementation:

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Core settings with defaults
    cors_allow_origins_csv: str = ""
    debug_mode: bool = False
    log_level: str = "INFO"
    rate_limit_requests_per_minute: int = 60
    
    # OAuth settings
    oauth_resource_url: str = "https://example.com"
    oauth_issuer: str = "https://auth.example.com"
    
    # Pydantic v2 style; replaces the deprecated inner "class Config"
    model_config = SettingsConfigDict(
        env_prefix="CORPUSIQ_",
        env_file=".env",
        env_file_encoding="utf-8",
    )

# Singleton instance
settings = Settings()
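
cors_allow_origins_csv is stored as a single string, while Starlette's CORSMiddleware takes a list in its allow_origins argument. A hypothetical helper for the conversion (the name parse_origins_csv and the trailing-slash normalization are assumptions, not project code):

```python
from typing import List


def parse_origins_csv(csv: str) -> List[str]:
    """Turn "https://a.example, https://b.example" into a list of origins."""
    origins: List[str] = []
    for item in csv.split(","):
        # Trim whitespace and any trailing slash: browsers send Origin
        # without one, and CORSMiddleware compares origins exactly.
        origin = item.strip().rstrip("/")
        if origin and origin not in origins:   # skip blanks and duplicates
            origins.append(origin)
    return origins
```

It would be used as CORSMiddleware(..., allow_origins=parse_origins_csv(settings.cors_allow_origins_csv)). With the empty default, the list is empty and no cross-origin requests are allowed.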

Error Handling

Error Hierarchy

Exception
  │
  ├─ CorpusIQError (base)
  │    │
  │    ├─ ValidationError (input validation)
  │    ├─ AuthenticationError (auth failures)
  │    ├─ RateLimitError (rate limit exceeded)
  │    ├─ ConnectorError (connector failures)
  │    └─ ConfigurationError (config issues)
  │
  └─ External errors (caught and wrapped)

Error Response Format

Errors use the JSON-RPC 2.0 error object; the code field carries a standard JSON-RPC error code (-32602 means invalid params):

{
    "jsonrpc": "2.0",
    "id": "request-id",
    "error": {
        "code": -32602,
        "message": "Invalid params",
        "data": {
            "type": "ValidationError",
            "detail": "Query too long",
            "request_id": "abc123"
        }
    }
}
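
The hierarchy and response format can be tied together as follows. The codes -32602 (Invalid params) and -32603 (Internal error) are defined by JSON-RPC 2.0; the codes for AuthenticationError and RateLimitError are hypothetical picks from the -32000 to -32099 range the spec reserves for implementation-defined server errors, and to_jsonrpc_error is an illustrative name. The top-level id echoes the JSON-RPC request, while data.request_id is the server's own log correlation ID.

```python
from typing import Any, Dict, Union


class CorpusIQError(Exception):
    """Base class for errors CorpusIQ raises on purpose."""
    code = -32603              # JSON-RPC 2.0 "Internal error"
    message = "Internal error"


class ValidationError(CorpusIQError):
    # Note: shares its name with pydantic.ValidationError; import carefully.
    code, message = -32602, "Invalid params"       # JSON-RPC 2.0 standard code


class AuthenticationError(CorpusIQError):
    code, message = -32001, "Unauthorized"         # hypothetical server code


class RateLimitError(CorpusIQError):
    code, message = -32002, "Rate limit exceeded"  # hypothetical server code


class ConnectorError(CorpusIQError):
    pass


class ConfigurationError(CorpusIQError):
    pass


def to_jsonrpc_error(exc: Exception, rpc_id: Union[str, int, None],
                     request_id: str) -> Dict[str, Any]:
    """Wrap an exception in a JSON-RPC 2.0 error response."""
    if isinstance(exc, CorpusIQError):
        code, message = exc.code, exc.message
        data = {"type": type(exc).__name__, "detail": str(exc)}
    else:
        # External errors are wrapped so internal details never reach clients.
        code, message = -32603, "Internal error"
        data = {"type": "InternalError", "detail": "Unexpected server error"}
    data["request_id"] = request_id    # correlates with server logs
    return {"jsonrpc": "2.0", "id": rpc_id,
            "error": {"code": code, "message": message, "data": data}}
```

Calling to_jsonrpc_error(ValidationError("Query too long"), "request-id", "abc123") produces an error object with code -32602, message "Invalid params" and the data fields shown in the example response.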

Testing Architecture

Test Pyramid

           ┌──────────┐
           │    E2E   │ (Few, slow)
           └──────────┘
         ┌──────────────┐
         │ Integration  │ (Some, medium)
         └──────────────┘
      ┌────────────────────┐
      │   Unit Tests       │ (Many, fast)
      └────────────────────┘

Test Structure:

tests/
├── unit/
│   ├── test_mcp_server.py      # Tool logic
│   ├── test_validation.py      # Input validation
│   └── test_connectors.py      # Connector logic
├── integration/
│   ├── test_api_endpoints.py   # API integration
│   ├── test_oauth_flow.py      # OAuth integration
│   └── test_widget_loading.py  # Widget integration
└── e2e/
    └── test_full_search.py     # End-to-end scenarios

Deployment Architecture

Production Setup

                 ┌──────────────┐
Internet ────────│   CDN/WAF    │
                 └──────┬───────┘
                        │
                 ┌──────▼───────┐
                 │   Nginx      │ (SSL termination, reverse proxy)
                 └──────┬───────┘
                        │
                 ┌──────▼───────┐
                 │Load Balancer │
                 └──────┬───────┘
                        │
         ┌──────────────┼──────────────┐
         │              │              │
    ┌────▼────┐    ┌───▼─────┐   ┌───▼─────┐
    │CorpusIQ │    │CorpusIQ │   │CorpusIQ │
    │Instance1│    │Instance2│   │Instance3│
    └─────────┘    └─────────┘   └─────────┘
         │              │              │
         └──────────────┼──────────────┘
                        │
              ┌─────────┴─────────┐
              │                   │
         ┌────▼────┐        ┌─────▼──────┐
         │  Redis  │        │ Monitoring │
         └─────────┘        └────────────┘

Performance Optimization

Key Metrics

  • Request latency: p50, p95, p99
  • Throughput: requests/second
  • Error rate: 5xx responses
  • Resource usage: CPU, memory
  • Connector latency: per-source response time
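
For the latency metric, p50, p95 and p99 are percentiles over a window of request durations. A minimal standard-library sketch (latency_percentiles is a hypothetical name); production systems usually compute these from histograms in the monitoring backend rather than from raw samples.

```python
import statistics
from typing import Dict, Sequence


def latency_percentiles(samples_ms: Sequence[float]) -> Dict[str, float]:
    """Summarize request latencies as p50 / p95 / p99 (milliseconds)."""
    if len(samples_ms) < 2:
        raise ValueError("need at least two samples")
    # quantiles(n=100) returns the 99 cut points for percentiles 1..99,
    # so index k-1 holds the k-th percentile (default "exclusive" method).
    cuts = statistics.quantiles(samples_ms, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}
```

For samples 1 through 100 ms this gives p50 = 50.5, p95 = 95.95 and p99 = 99.99; passing method="inclusive" to statistics.quantiles changes the interpolation slightly.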

Optimization Techniques

  1. Async operations: Use async/await for I/O
  2. Connection pooling: Reuse HTTP connections
  3. Caching: Cache static content and frequent queries
  4. Lazy loading: Load resources only when needed
  5. Batch operations: Combine multiple API calls
  6. CDN: Serve static assets from CDN

Future Enhancements

Planned Improvements

  1. Persistent storage: Database for user preferences
  2. Advanced search: Full-text search with Elasticsearch
  3. Real-time updates: WebSocket support
  4. Batch operations: Bulk actions on search results
  5. Analytics: Usage metrics and insights
  6. Plugin system: Third-party connector plugins

For implementation details, see: