"""
Authentication and authorization utilities
"""
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

import jwt
from fastapi import HTTPException, Security, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

from app.core.config import settings
from app.core.database import get_db
from app.models.user import User

# Bearer-token extractor used as the FastAPI security dependency below.
security = HTTPBearer()


class JWTHandler:
    """JWT token handler"""

    @staticmethod
    def create_access_token(
        user_id: int,
        user_email: str,
        user_type: str,
        current_tenant: Optional[dict] = None,
        available_tenants: Optional[list] = None,
        capabilities: Optional[list] = None,
        # For token refresh: preserve original login time and absolute expiry
        original_iat: Optional[datetime] = None,
        original_absolute_exp: Optional[float] = None,
        # Server-side session token (Issue #264)
        session_token: Optional[str] = None
    ) -> str:
        """Create a signed JWT access token carrying tenant context.

        Session-management claims (Issues #242, #264):
        - exp: now + settings.JWT_EXPIRES_MINUTES; re-computed on every call,
          so it moves forward on refresh. Serves as the JWT-level backstop.
        - absolute_exp: POSIX timestamp of the absolute timeout. Computed as
          iat + settings.JWT_ABSOLUTE_TIMEOUT_HOURS on initial login and
          passed through unchanged on refresh (NOT refreshable).
        - iat: original login time, preserved across token refreshes.
        - session_id: server-side session token for authoritative validation.

        Per the original design notes, the idle timeout is enforced by the
        server-side session rather than by this token, which is why exp is
        intended to be long-lived. That enforcement is not visible in this
        module.

        Args:
            user_id: User primary key; stored as the string claim "sub".
            user_email: Stored as the "email" claim.
            user_type: Stored as the "user_type" claim.
            current_tenant: Current tenant context; defaults to {}.
            available_tenants: Tenants available for switching; defaults to [].
            capabilities: Base capabilities; defaults to [].
            original_iat: Login time to preserve when refreshing. When None,
                the current UTC time is used (new login).
            original_absolute_exp: Absolute expiry timestamp to preserve when
                refreshing. When None, it is derived from iat.
            session_token: Server-side session token, stored as "session_id".

        Returns:
            The encoded JWT.
        """
        now = datetime.now(timezone.utc)

        # Use original iat if refreshing, otherwise current time (new login).
        # Explicit None check: the argument is Optional, not a truthiness flag.
        iat = original_iat if original_iat is not None else now

        # Absolute expiry is fixed at initial login and only carried forward
        # on refresh, so refreshing can never extend it.
        if original_absolute_exp is not None:
            absolute_exp = original_absolute_exp
        else:
            absolute_exp = (
                iat + timedelta(hours=settings.JWT_ABSOLUTE_TIMEOUT_HOURS)
            ).timestamp()

        payload = {
            "sub": str(user_id),
            "email": user_email,
            "user_type": user_type,
            # Current tenant context (most important)
            "current_tenant": current_tenant or {},
            # Available tenants for switching
            "available_tenants": available_tenants or [],
            # Base capabilities (rarely used - tenant-specific capabilities
            # are in current_tenant)
            "capabilities": capabilities or [],
            # NIST/OWASP Session Timeouts (Issues #242, #264)
            # exp: JWT-level expiry, settings.JWT_EXPIRES_MINUTES from now
            # (moves forward each time a token is issued).
            # NOTE(review): on refresh this may land later than absolute_exp;
            # confirm absolute_exp is enforced wherever tokens are validated.
            "exp": now + timedelta(minutes=settings.JWT_EXPIRES_MINUTES),
            # iat: Original login time (preserved across refreshes)
            "iat": iat,
            # absolute_exp: Absolute timeout from original login (NOT refreshable)
            "absolute_exp": absolute_exp,
            # session_id: Server-side session token for authoritative
            # validation (Issue #264). The server-side session is the source
            # of truth - JWT expiry is secondary.
            "session_id": session_token
        }

        # Sign with JWT_SECRET / JWT_ALGORITHM from settings
        return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)

    @staticmethod
    def decode_token(token: str) -> Dict[str, Any]:
        """Decode and validate a JWT token.

        Args:
            token: The encoded JWT.

        Returns:
            The decoded claims.

        Raises:
            HTTPException: 401 "Token has expired" when the token's exp has
                passed; 401 "Invalid token" for any other validation failure.
        """
        try:
            # Verify with JWT_SECRET, accepting only the configured algorithm
            return jwt.decode(
                token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
            )
        except jwt.ExpiredSignatureError as exc:
            # Must be caught before InvalidTokenError, which is its base class.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has expired"
            ) from exc
        except jwt.InvalidTokenError as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token"
            ) from exc


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Get the current authenticated user.

    Decodes the bearer token, reads the user id from the "sub" claim and
    loads the matching User row.

    Raises:
        HTTPException: 401 when the token is expired, invalid, or carries a
            missing/non-numeric "sub"; 404 when no user has that id; 403 when
            the user is not active.
    """
    token = credentials.credentials
    payload = JWTHandler.decode_token(token)

    # A correctly signed token can still lack a usable "sub"; treat that as
    # an invalid token rather than letting it surface as a server error.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token"
        ) from exc

    # Get user from database
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is inactive"
        )

    return user


def _ensure_super_admin(user: User) -> User:
    """Return *user* if its user_type is "super_admin", else raise 403."""
    if user.user_type != "super_admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Super admin access required"
        )
    return user


async def require_admin(current_user: User = Depends(get_current_user)) -> User:
    """Require the current user to be a super admin (control panel access).

    Raises:
        HTTPException: 403 when the user's user_type is not "super_admin".
    """
    return _ensure_super_admin(current_user)


async def require_super_admin(current_user: User = Depends(get_current_user)) -> User:
    """Require the current user to be a super admin.

    Raises:
        HTTPException: 403 when the user's user_type is not "super_admin".
    """
    return _ensure_super_admin(current_user)