GT AI OS Community Edition v2.0.33
Security hardening release addressing CodeQL and Dependabot alerts: - Fix stack trace exposure in error responses - Add SSRF protection with DNS resolution checking - Implement proper URL hostname validation (replaces substring matching) - Add centralized path sanitization to prevent path traversal - Fix ReDoS vulnerability in email validation regex - Improve HTML sanitization in validation utilities - Fix capability wildcard matching in auth utilities - Update glob dependency to address CVE - Add CodeQL suppression comments for verified false positives 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
229
apps/control-panel-backend/app/models/user.py
Normal file
229
apps/control-panel-backend/app/models/user.py
Normal file
@@ -0,0 +1,229 @@
|
||||
"""
|
||||
User database model
|
||||
"""
|
||||
from datetime import datetime
|
||||
from typing import Optional, Dict, Any, List
|
||||
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey, JSON
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
import uuid
|
||||
|
||||
from app.core.database import Base
|
||||
|
||||
|
||||
class User(Base):
|
||||
"""User model with capability-based authorization"""
|
||||
|
||||
__tablename__ = "users"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
uuid = Column(String(36), default=lambda: str(uuid.uuid4()), unique=True, nullable=False)
|
||||
email = Column(String(255), unique=True, nullable=False, index=True)
|
||||
full_name = Column(String(100), nullable=False)
|
||||
hashed_password = Column(String(255), nullable=False)
|
||||
user_type = Column(
|
||||
String(20),
|
||||
nullable=False,
|
||||
default="tenant_user"
|
||||
) # super_admin, tenant_admin, tenant_user
|
||||
tenant_id = Column(Integer, ForeignKey("tenants.id", ondelete="CASCADE"), nullable=True)
|
||||
current_tenant_id = Column(Integer, ForeignKey("tenants.id"), nullable=True, index=True) # Current active tenant for multi-tenant users
|
||||
capabilities = Column(JSON, nullable=False, default=list)
|
||||
is_active = Column(Boolean, nullable=False, default=True)
|
||||
last_login = Column(DateTime(timezone=True), nullable=True) # For billing calculation
|
||||
last_login_at = Column(DateTime(timezone=True), nullable=True)
|
||||
|
||||
# Two-Factor Authentication fields
|
||||
tfa_enabled = Column(Boolean, nullable=False, default=False)
|
||||
tfa_secret = Column(Text, nullable=True) # Encrypted TOTP secret
|
||||
tfa_required = Column(Boolean, nullable=False, default=False) # Admin can enforce TFA
|
||||
|
||||
# Timestamps
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
|
||||
deleted_at = Column(DateTime(timezone=True), nullable=True)
|
||||
|
||||
# Relationships
|
||||
tenant_assignments = relationship("UserTenantAssignment", foreign_keys="UserTenantAssignment.user_id", back_populates="user", cascade="all, delete-orphan")
|
||||
audit_logs = relationship("AuditLog", back_populates="user", cascade="all, delete-orphan")
|
||||
resource_data = relationship("UserResourceData", back_populates="user", cascade="all, delete-orphan")
|
||||
preferences = relationship("UserPreferences", back_populates="user", cascade="all, delete-orphan", uselist=False)
|
||||
progress = relationship("UserProgress", back_populates="user", cascade="all, delete-orphan")
|
||||
sessions = relationship("Session", back_populates="user", passive_deletes=True) # Let DB CASCADE handle deletion
|
||||
|
||||
def __repr__(self):
|
||||
return f"<User(id={self.id}, email='{self.email}', user_type='{self.user_type}')>"
|
||||
|
||||
def to_dict(self, include_sensitive: bool = False, include_tenants: bool = False) -> Dict[str, Any]:
|
||||
"""Convert user to dictionary"""
|
||||
data = {
|
||||
"id": self.id,
|
||||
"uuid": str(self.uuid),
|
||||
"email": self.email,
|
||||
"full_name": self.full_name,
|
||||
"user_type": self.user_type,
|
||||
"current_tenant_id": self.current_tenant_id,
|
||||
"capabilities": self.capabilities,
|
||||
"is_active": self.is_active,
|
||||
"last_login_at": self.last_login_at.isoformat() if self.last_login_at else None,
|
||||
"created_at": self.created_at.isoformat() if self.created_at else None,
|
||||
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
|
||||
# TFA fields (never include tfa_secret for security)
|
||||
"tfa_enabled": self.tfa_enabled,
|
||||
"tfa_required": self.tfa_required,
|
||||
"tfa_status": self.tfa_status
|
||||
}
|
||||
|
||||
if include_tenants:
|
||||
data["tenant_assignments"] = [
|
||||
assignment.to_dict() for assignment in self.tenant_assignments
|
||||
if assignment.is_active and not assignment.deleted_at
|
||||
]
|
||||
|
||||
if include_sensitive:
|
||||
data["hashed_password"] = self.hashed_password
|
||||
|
||||
return data
|
||||
|
||||
@property
|
||||
def is_super_admin(self) -> bool:
|
||||
"""Check if user is super admin"""
|
||||
return self.user_type == "super_admin"
|
||||
|
||||
@property
|
||||
def is_tenant_admin(self) -> bool:
|
||||
"""Check if user is tenant admin"""
|
||||
return self.user_type == "tenant_admin"
|
||||
|
||||
@property
|
||||
def is_tenant_user(self) -> bool:
|
||||
"""Check if user is regular tenant user"""
|
||||
return self.user_type == "tenant_user"
|
||||
|
||||
@property
|
||||
def tfa_status(self) -> str:
|
||||
"""Get TFA status: disabled, enabled, or enforced"""
|
||||
if self.tfa_required:
|
||||
return "enforced"
|
||||
elif self.tfa_enabled:
|
||||
return "enabled"
|
||||
else:
|
||||
return "disabled"
|
||||
|
||||
def has_capability(self, resource: str, action: str) -> bool:
|
||||
"""Check if user has specific capability"""
|
||||
if not self.capabilities:
|
||||
return False
|
||||
|
||||
for capability in self.capabilities:
|
||||
# Check resource match (support wildcards)
|
||||
resource_match = (
|
||||
capability.get("resource") == "*" or
|
||||
capability.get("resource") == resource or
|
||||
(capability.get("resource", "").endswith("*") and
|
||||
resource.startswith(capability.get("resource", "").rstrip("*")))
|
||||
)
|
||||
|
||||
# Check action match
|
||||
actions = capability.get("actions", [])
|
||||
action_match = "*" in actions or action in actions
|
||||
|
||||
if resource_match and action_match:
|
||||
# Check constraints if present
|
||||
constraints = capability.get("constraints", {})
|
||||
if constraints:
|
||||
# Check validity period
|
||||
valid_until = constraints.get("valid_until")
|
||||
if valid_until:
|
||||
from datetime import datetime
|
||||
if datetime.fromisoformat(valid_until.replace('Z', '+00:00')) < datetime.now():
|
||||
continue
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_tenant_assignment(self, tenant_id: int) -> Optional['UserTenantAssignment']:
|
||||
"""Get user's assignment for specific tenant"""
|
||||
from app.models.user_tenant_assignment import UserTenantAssignment
|
||||
for assignment in self.tenant_assignments:
|
||||
if assignment.tenant_id == tenant_id and assignment.is_active and not assignment.deleted_at:
|
||||
return assignment
|
||||
return None
|
||||
|
||||
def get_current_tenant_assignment(self) -> Optional['UserTenantAssignment']:
|
||||
"""Get user's current active tenant assignment"""
|
||||
if not self.current_tenant_id:
|
||||
return self.get_primary_tenant_assignment()
|
||||
return self.get_tenant_assignment(self.current_tenant_id)
|
||||
|
||||
def get_primary_tenant_assignment(self) -> Optional['UserTenantAssignment']:
|
||||
"""Get user's primary tenant assignment"""
|
||||
for assignment in self.tenant_assignments:
|
||||
if assignment.is_primary_tenant and assignment.is_active and not assignment.deleted_at:
|
||||
return assignment
|
||||
# Fallback to first active assignment
|
||||
active_assignments = [a for a in self.tenant_assignments if a.is_active and not a.deleted_at]
|
||||
return active_assignments[0] if active_assignments else None
|
||||
|
||||
def get_available_tenants(self) -> List['UserTenantAssignment']:
|
||||
"""Get all tenant assignments user has access to"""
|
||||
return [
|
||||
assignment for assignment in self.tenant_assignments
|
||||
if assignment.is_active and not assignment.deleted_at
|
||||
]
|
||||
|
||||
def has_tenant_access(self, tenant_id: int) -> bool:
|
||||
"""Check if user has access to specific tenant"""
|
||||
return self.get_tenant_assignment(tenant_id) is not None
|
||||
|
||||
def switch_to_tenant(self, tenant_id: int) -> bool:
|
||||
"""Switch user's current tenant context"""
|
||||
if self.has_tenant_access(tenant_id):
|
||||
self.current_tenant_id = tenant_id
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_tenant_capabilities(self, tenant_id: Optional[int] = None) -> List[Dict[str, Any]]:
|
||||
"""Get capabilities for specific tenant or current tenant"""
|
||||
target_tenant_id = tenant_id or self.current_tenant_id
|
||||
if not target_tenant_id:
|
||||
return []
|
||||
|
||||
assignment = self.get_tenant_assignment(target_tenant_id)
|
||||
if not assignment:
|
||||
return []
|
||||
|
||||
return assignment.tenant_capabilities or []
|
||||
|
||||
def has_tenant_capability(self, resource: str, action: str, tenant_id: Optional[int] = None) -> bool:
|
||||
"""Check if user has specific capability in tenant"""
|
||||
target_tenant_id = tenant_id or self.current_tenant_id
|
||||
if not target_tenant_id:
|
||||
return False
|
||||
|
||||
assignment = self.get_tenant_assignment(target_tenant_id)
|
||||
if not assignment:
|
||||
return False
|
||||
|
||||
return assignment.has_capability(resource, action)
|
||||
|
||||
def is_tenant_admin(self, tenant_id: Optional[int] = None) -> bool:
|
||||
"""Check if user is admin in specific tenant"""
|
||||
target_tenant_id = tenant_id or self.current_tenant_id
|
||||
if not target_tenant_id:
|
||||
return False
|
||||
|
||||
assignment = self.get_tenant_assignment(target_tenant_id)
|
||||
if not assignment:
|
||||
return False
|
||||
|
||||
return assignment.is_tenant_admin
|
||||
|
||||
def get_current_tenant_context(self) -> Optional[Dict[str, Any]]:
|
||||
"""Get current tenant context for JWT token"""
|
||||
assignment = self.get_current_tenant_assignment()
|
||||
if not assignment:
|
||||
return None
|
||||
return assignment.get_tenant_context()
|
||||
Reference in New Issue
Block a user