Files
gt-ai-os-community/apps/tenant-backend/app/core/logging_config.py
HackWeasel b9dfb86260 GT AI OS Community Edition v2.0.33
Security hardening release addressing CodeQL and Dependabot alerts:

- Fix stack trace exposure in error responses
- Add SSRF protection with DNS resolution checking
- Implement proper URL hostname validation (replaces substring matching)
- Add centralized path sanitization to prevent path traversal
- Fix ReDoS vulnerability in email validation regex
- Improve HTML sanitization in validation utilities
- Fix capability wildcard matching in auth utilities
- Update glob dependency to address CVE
- Add CodeQL suppression comments for verified false positives

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 17:04:45 -05:00

169 lines
5.7 KiB
Python

"""
GT 2.0 Tenant Backend Logging Configuration
Structured logging with tenant isolation and security considerations.
"""
import logging
import logging.config
import sys
from typing import Dict, Any
from app.core.config import get_settings
def setup_logging() -> None:
"""Setup logging configuration for the tenant backend"""
settings = get_settings()
# Determine log directory based on environment
if settings.environment == "test":
log_dir = f"/tmp/gt2-data/{settings.tenant_domain}/logs"
else:
log_dir = f"/data/{settings.tenant_domain}/logs"
# Create logging configuration
log_config: Dict[str, Any] = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
},
"json": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s - %(pathname)s:%(lineno)d",
"datefmt": "%Y-%m-%d %H:%M:%S",
},
"detailed": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(funcName)s() - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": settings.log_level,
"formatter": "json" if settings.log_format == "json" else "default",
"stream": sys.stdout,
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "json" if settings.log_format == "json" else "detailed",
"filename": f"{log_dir}/tenant-backend.log",
"maxBytes": 10485760, # 10MB
"backupCount": 5,
"encoding": "utf-8",
},
},
"loggers": {
"": { # Root logger
"level": settings.log_level,
"handlers": ["console"],
"propagate": False,
},
"app": {
"level": settings.log_level,
"handlers": ["console", "file"] if settings.environment == "production" else ["console"],
"propagate": False,
},
"sqlalchemy.engine": {
"level": "INFO" if settings.debug else "WARNING",
"handlers": ["console"],
"propagate": False,
},
"uvicorn.access": {
"level": "WARNING", # Suppress INFO level access logs (operational endpoints)
"handlers": ["console"],
"propagate": False,
},
"uvicorn.error": {
"level": "INFO",
"handlers": ["console"],
"propagate": False,
},
},
}
# Create log directory if it doesn't exist
import os
os.makedirs(log_dir, exist_ok=True, mode=0o700)
# Apply logging configuration
logging.config.dictConfig(log_config)
# Add tenant context to all logs
class TenantContextFilter(logging.Filter):
def filter(self, record):
record.tenant_id = settings.tenant_id
record.tenant_domain = settings.tenant_domain
return True
tenant_filter = TenantContextFilter()
# Add tenant filter to all handlers
for handler in logging.getLogger().handlers:
handler.addFilter(tenant_filter)
# Log startup information
logger = logging.getLogger("app.startup")
logger.info(
"Tenant backend logging initialized",
extra={
"tenant_id": settings.tenant_id,
"tenant_domain": settings.tenant_domain,
"environment": settings.environment,
"log_level": settings.log_level,
"log_format": settings.log_format,
}
)
def get_logger(name: str) -> logging.Logger:
"""Get logger with consistent naming and formatting"""
return logging.getLogger(f"app.{name}")
class SecurityRedactionFilter(logging.Filter):
"""Filter to redact sensitive information from logs"""
SENSITIVE_FIELDS = [
"password", "token", "secret", "key", "authorization",
"cookie", "session", "csrf", "api_key", "jwt"
]
def filter(self, record):
if hasattr(record, 'args') and record.args:
# Redact sensitive information from log messages
record.args = self._redact_sensitive_data(record.args)
if hasattr(record, 'msg') and isinstance(record.msg, str):
for field in self.SENSITIVE_FIELDS:
if field.lower() in record.msg.lower():
record.msg = record.msg.replace(field, "[REDACTED]")
return True
def _redact_sensitive_data(self, data):
"""Recursively redact sensitive data from log arguments"""
if isinstance(data, dict):
return {
key: "[REDACTED]" if any(sensitive in key.lower() for sensitive in self.SENSITIVE_FIELDS)
else self._redact_sensitive_data(value)
for key, value in data.items()
}
elif isinstance(data, (list, tuple)):
return type(data)(self._redact_sensitive_data(item) for item in data)
return data
def setup_security_logging():
"""Setup security-focused logging with redaction"""
security_filter = SecurityRedactionFilter()
# Add security filter to all loggers
for name in ["app", "uvicorn", "sqlalchemy"]:
logger = logging.getLogger(name)
logger.addFilter(security_filter)