Security hardening release addressing CodeQL and Dependabot alerts: - Fix stack trace exposure in error responses - Add SSRF protection with DNS resolution checking - Implement proper URL hostname validation (replaces substring matching) - Add centralized path sanitization to prevent path traversal - Fix ReDoS vulnerability in email validation regex - Improve HTML sanitization in validation utilities - Fix capability wildcard matching in auth utilities - Update glob dependency to address CVE - Add CodeQL suppression comments for verified false positives 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
664 lines
21 KiB
Python
664 lines
21 KiB
Python
"""
|
|
Two-Factor Authentication API endpoints
|
|
|
|
Handles TFA enable, disable, verification, and status operations.
|
|
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request, Cookie
|
|
from fastapi.responses import Response
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
import structlog
|
|
import uuid
|
|
import base64
|
|
import io
|
|
|
|
from app.core.database import get_db
|
|
from app.core.auth import get_current_user, JWTHandler
|
|
from app.models.user import User
|
|
from app.models.audit import AuditLog
|
|
from app.models.tfa_rate_limit import TFAVerificationRateLimit
|
|
from app.models.used_temp_token import UsedTempToken
|
|
from app.core.tfa import get_tfa_manager
|
|
|
|
logger = structlog.get_logger()
|
|
router = APIRouter(prefix="/tfa", tags=["tfa"])
|
|
|
|
|
|
# Pydantic models
|
|
class TFAEnableResponse(BaseModel):
|
|
success: bool
|
|
message: str
|
|
qr_code_uri: str
|
|
manual_entry_key: str
|
|
|
|
|
|
class TFAVerifySetupRequest(BaseModel):
|
|
code: str
|
|
|
|
|
|
class TFAVerifySetupResponse(BaseModel):
|
|
success: bool
|
|
message: str
|
|
|
|
|
|
class TFADisableRequest(BaseModel):
|
|
password: str
|
|
|
|
|
|
class TFADisableResponse(BaseModel):
|
|
success: bool
|
|
message: str
|
|
|
|
|
|
class TFAVerifyLoginRequest(BaseModel):
|
|
code: str # Only code needed - temp_token from session cookie
|
|
|
|
|
|
class TFAVerifyLoginResponse(BaseModel):
|
|
success: bool
|
|
access_token: Optional[str] = None
|
|
expires_in: Optional[int] = None
|
|
user: Optional[dict] = None
|
|
message: Optional[str] = None
|
|
|
|
|
|
class TFAStatusResponse(BaseModel):
|
|
tfa_enabled: bool
|
|
tfa_required: bool
|
|
tfa_status: str
|
|
|
|
|
|
class TFASessionDataResponse(BaseModel):
|
|
user_email: str
|
|
tfa_configured: bool
|
|
qr_code_uri: Optional[str] = None
|
|
manual_entry_key: Optional[str] = None
|
|
|
|
|
|
# Endpoints
|
|
@router.get("/session-data", response_model=TFASessionDataResponse)
|
|
async def get_tfa_session_data(
|
|
tfa_session: Optional[str] = Cookie(None),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
"""
|
|
Get TFA setup data from server-side session.
|
|
Session ID from HTTP-only cookie.
|
|
Used by /verify-tfa page to fetch QR code on mount.
|
|
"""
|
|
if not tfa_session:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="No TFA session found"
|
|
)
|
|
|
|
# Get session from database
|
|
result = await db.execute(
|
|
select(UsedTempToken).where(UsedTempToken.token_id == tfa_session)
|
|
)
|
|
session = result.scalar_one_or_none()
|
|
|
|
if not session:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid TFA session"
|
|
)
|
|
|
|
# Check expiry
|
|
if datetime.now(timezone.utc) > session.expires_at:
|
|
await db.delete(session)
|
|
await db.commit()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="TFA session expired"
|
|
)
|
|
|
|
# Check if already used
|
|
if session.used_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="TFA session already used"
|
|
)
|
|
|
|
logger.info(
|
|
"TFA session data retrieved",
|
|
session_id=tfa_session,
|
|
user_id=session.user_id,
|
|
tfa_configured=session.tfa_configured
|
|
)
|
|
|
|
return TFASessionDataResponse(
|
|
user_email=session.user_email,
|
|
tfa_configured=session.tfa_configured,
|
|
qr_code_uri=None, # Security: Don't expose QR code data URI - use blob endpoint
|
|
manual_entry_key=session.manual_entry_key
|
|
)
|
|
|
|
|
|
@router.get("/session-qr-code")
|
|
async def get_tfa_session_qr_code(
|
|
tfa_session: Optional[str] = Cookie(None, alias="tfa_session"),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
"""
|
|
Get TFA QR code as PNG blob (secure: never exposes TOTP secret to JavaScript).
|
|
Session ID from HTTP-only cookie.
|
|
Returns raw PNG bytes with image/png content type.
|
|
"""
|
|
if not tfa_session:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="No TFA session found"
|
|
)
|
|
|
|
# Get session from database
|
|
result = await db.execute(
|
|
select(UsedTempToken).where(UsedTempToken.token_id == tfa_session)
|
|
)
|
|
session = result.scalar_one_or_none()
|
|
|
|
if not session:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid TFA session"
|
|
)
|
|
|
|
# Check expiry
|
|
if datetime.now(timezone.utc) > session.expires_at:
|
|
await db.delete(session)
|
|
await db.commit()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="TFA session expired"
|
|
)
|
|
|
|
# Check if already used
|
|
if session.used_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="TFA session already used"
|
|
)
|
|
|
|
# Check if QR code exists (only for setup flow)
|
|
if not session.qr_code_uri:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="No QR code available for this session"
|
|
)
|
|
|
|
# Extract base64 PNG data from data URI
|
|
# Format: data:image/png;base64,iVBORw0KGgoAAAANS...
|
|
if not session.qr_code_uri.startswith("data:image/png;base64,"):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Invalid QR code format"
|
|
)
|
|
|
|
base64_data = session.qr_code_uri.split(",", 1)[1]
|
|
png_bytes = base64.b64decode(base64_data)
|
|
|
|
logger.info(
|
|
"TFA QR code blob retrieved",
|
|
session_id=tfa_session,
|
|
user_id=session.user_id,
|
|
size_bytes=len(png_bytes)
|
|
)
|
|
|
|
# Return raw PNG bytes
|
|
return Response(
|
|
content=png_bytes,
|
|
media_type="image/png",
|
|
headers={
|
|
"Cache-Control": "no-store, no-cache, must-revalidate",
|
|
"Pragma": "no-cache",
|
|
"Expires": "0"
|
|
}
|
|
)
|
|
|
|
|
|
#
|
|
@router.post("/enable", response_model=TFAEnableResponse)
|
|
async def enable_tfa(
|
|
request: Request,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
"""
|
|
Enable TFA for current user (user-initiated from settings)
|
|
Generates TOTP secret and returns QR code for scanning
|
|
"""
|
|
try:
|
|
# Check if already enabled
|
|
if current_user.tfa_enabled:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="TFA is already enabled for this account"
|
|
)
|
|
|
|
# Get tenant name for QR code branding
|
|
tenant_name = None
|
|
if current_user.tenant_id:
|
|
from app.models.tenant import Tenant
|
|
tenant_result = await db.execute(
|
|
select(Tenant).where(Tenant.id == current_user.tenant_id)
|
|
)
|
|
tenant = tenant_result.scalar_one_or_none()
|
|
if tenant:
|
|
tenant_name = tenant.name
|
|
|
|
# Validate tenant name exists (fail fast - no fallback)
|
|
if not tenant_name:
|
|
logger.error("Tenant name not configured", user_id=current_user.id, tenant_id=current_user.tenant_id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Tenant configuration error: tenant name not set"
|
|
)
|
|
|
|
# Get TFA manager
|
|
tfa_manager = get_tfa_manager()
|
|
|
|
# Setup TFA: generate secret, encrypt, create QR code with tenant branding
|
|
encrypted_secret, qr_code_uri, manual_entry_key = tfa_manager.setup_new_tfa(current_user.email, tenant_name)
|
|
|
|
# Save encrypted secret to user (but don't enable yet - wait for verification)
|
|
current_user.tfa_secret = encrypted_secret
|
|
await db.commit()
|
|
|
|
# Create audit log
|
|
audit_log = AuditLog.create_log(
|
|
action="user.tfa_setup_initiated",
|
|
user_id=current_user.id,
|
|
tenant_id=current_user.tenant_id,
|
|
details={"email": current_user.email},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent")
|
|
)
|
|
db.add(audit_log)
|
|
await db.commit()
|
|
|
|
logger.info("TFA setup initiated", user_id=current_user.id, email=current_user.email)
|
|
|
|
return TFAEnableResponse(
|
|
success=True,
|
|
message="Scan QR code with Google Authenticator and enter the code to complete setup",
|
|
qr_code_uri=qr_code_uri,
|
|
manual_entry_key=manual_entry_key
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("TFA enable error", error=str(e), user_id=current_user.id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to enable TFA"
|
|
)
|
|
|
|
|
|
@router.post("/verify-setup", response_model=TFAVerifySetupResponse)
|
|
async def verify_setup(
|
|
verify_data: TFAVerifySetupRequest,
|
|
request: Request,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
"""
|
|
Verify initial TFA setup code and enable TFA
|
|
"""
|
|
try:
|
|
# Check if TFA secret exists
|
|
if not current_user.tfa_secret:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="TFA setup not initiated. Call /tfa/enable first."
|
|
)
|
|
|
|
# Check if already enabled
|
|
if current_user.tfa_enabled:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="TFA is already enabled"
|
|
)
|
|
|
|
# Get TFA manager
|
|
tfa_manager = get_tfa_manager()
|
|
|
|
# Decrypt secret
|
|
secret = tfa_manager.decrypt_secret(current_user.tfa_secret)
|
|
|
|
# Verify code
|
|
if not tfa_manager.verify_totp(secret, verify_data.code):
|
|
logger.warning("TFA setup verification failed", user_id=current_user.id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid verification code"
|
|
)
|
|
|
|
# Enable TFA
|
|
current_user.tfa_enabled = True
|
|
await db.commit()
|
|
|
|
# Create audit log
|
|
audit_log = AuditLog.create_log(
|
|
action="user.tfa_enabled",
|
|
user_id=current_user.id,
|
|
tenant_id=current_user.tenant_id,
|
|
details={"email": current_user.email},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent")
|
|
)
|
|
db.add(audit_log)
|
|
await db.commit()
|
|
|
|
logger.info("TFA enabled successfully", user_id=current_user.id, email=current_user.email)
|
|
|
|
return TFAVerifySetupResponse(
|
|
success=True,
|
|
message="Two-Factor Authentication enabled successfully"
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("TFA verify setup error", error=str(e), user_id=current_user.id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to verify TFA setup"
|
|
)
|
|
|
|
|
|
@router.post("/disable", response_model=TFADisableResponse)
|
|
async def disable_tfa(
|
|
disable_data: TFADisableRequest,
|
|
request: Request,
|
|
current_user: User = Depends(get_current_user),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
"""
|
|
Disable TFA for current user (requires password confirmation)
|
|
Only allowed if TFA is not required by admin
|
|
"""
|
|
try:
|
|
# Check if TFA is required by admin
|
|
if current_user.tfa_required:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Cannot disable TFA - it is required by your administrator"
|
|
)
|
|
|
|
# Check if TFA is enabled
|
|
if not current_user.tfa_enabled:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="TFA is not enabled"
|
|
)
|
|
|
|
# Verify password
|
|
from passlib.context import CryptContext
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
if not pwd_context.verify(disable_data.password, current_user.hashed_password):
|
|
logger.warning("TFA disable failed - invalid password", user_id=current_user.id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid password"
|
|
)
|
|
|
|
# Disable TFA and clear secret
|
|
current_user.tfa_enabled = False
|
|
current_user.tfa_secret = None
|
|
await db.commit()
|
|
|
|
# Create audit log
|
|
audit_log = AuditLog.create_log(
|
|
action="user.tfa_disabled",
|
|
user_id=current_user.id,
|
|
tenant_id=current_user.tenant_id,
|
|
details={"email": current_user.email},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent")
|
|
)
|
|
db.add(audit_log)
|
|
await db.commit()
|
|
|
|
logger.info("TFA disabled successfully", user_id=current_user.id, email=current_user.email)
|
|
|
|
return TFADisableResponse(
|
|
success=True,
|
|
message="Two-Factor Authentication disabled successfully"
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("TFA disable error", error=str(e), user_id=current_user.id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to disable TFA"
|
|
)
|
|
|
|
|
|
@router.post("/verify-login", response_model=TFAVerifyLoginResponse)
|
|
async def verify_login(
|
|
verify_data: TFAVerifyLoginRequest,
|
|
request: Request,
|
|
tfa_session: Optional[str] = Cookie(None),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
"""
|
|
Verify TFA code during login and issue final JWT
|
|
Handles both setup (State 2) and verification (State 3)
|
|
Uses session cookie to get temp_token (server-side session)
|
|
"""
|
|
try:
|
|
# Get session from cookie
|
|
if not tfa_session:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="No TFA session found"
|
|
)
|
|
|
|
# Get session from database
|
|
result = await db.execute(
|
|
select(UsedTempToken).where(UsedTempToken.token_id == tfa_session)
|
|
)
|
|
session = result.scalar_one_or_none()
|
|
|
|
if not session or not session.temp_token:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid TFA session"
|
|
)
|
|
|
|
# Check expiry
|
|
if datetime.now(timezone.utc) > session.expires_at:
|
|
await db.delete(session)
|
|
await db.commit()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="TFA session expired"
|
|
)
|
|
|
|
# Check if already used
|
|
if session.used_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="TFA session already used"
|
|
)
|
|
|
|
# Get user_id and token_id from session
|
|
user_id = session.user_id
|
|
token_id = session.token_id
|
|
|
|
# Check for replay attack
|
|
if await UsedTempToken.is_token_used(token_id, db):
|
|
logger.warning("Temp token replay attempt detected", user_id=user_id, token_id=token_id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token has already been used"
|
|
)
|
|
|
|
# Check rate limiting
|
|
if await TFAVerificationRateLimit.is_rate_limited(user_id, db):
|
|
logger.warning("TFA verification rate limited", user_id=user_id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
|
detail="Too many attempts. Please wait 60 seconds and try again."
|
|
)
|
|
|
|
# Record attempt for rate limiting
|
|
await TFAVerificationRateLimit.record_attempt(user_id, db)
|
|
|
|
# Get user
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user or not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="User not found or inactive"
|
|
)
|
|
|
|
# Check if TFA secret exists
|
|
if not user.tfa_secret:
|
|
logger.error("TFA secret missing during verification", user_id=user_id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="TFA not properly configured"
|
|
)
|
|
|
|
# Get TFA manager
|
|
tfa_manager = get_tfa_manager()
|
|
|
|
# Decrypt secret
|
|
secret = tfa_manager.decrypt_secret(user.tfa_secret)
|
|
|
|
# Verify TOTP code
|
|
if not tfa_manager.verify_totp(secret, verify_data.code):
|
|
logger.warning("TFA verification failed", user_id=user_id)
|
|
|
|
# Create audit log for failed attempt
|
|
audit_log = AuditLog.create_log(
|
|
action="user.tfa_verification_failed",
|
|
user_id=user_id,
|
|
tenant_id=user.tenant_id,
|
|
details={"email": user.email},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent")
|
|
)
|
|
db.add(audit_log)
|
|
await db.commit()
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid verification code"
|
|
)
|
|
|
|
# If TFA was enforced but not enabled, enable it now
|
|
if user.tfa_required and not user.tfa_enabled:
|
|
user.tfa_enabled = True
|
|
logger.info("TFA auto-enabled after mandatory setup", user_id=user_id)
|
|
|
|
# Mark session as used
|
|
session.used_at = datetime.now(timezone.utc)
|
|
await db.commit()
|
|
|
|
# Update last login
|
|
user.last_login_at = datetime.now(timezone.utc)
|
|
|
|
# Get tenant context
|
|
from app.models.tenant import Tenant
|
|
if user.tenant_id:
|
|
tenant_result = await db.execute(
|
|
select(Tenant).where(Tenant.id == user.tenant_id)
|
|
)
|
|
tenant = tenant_result.scalar_one_or_none()
|
|
|
|
current_tenant_context = {
|
|
"id": str(user.tenant_id),
|
|
"domain": tenant.domain if tenant else f"tenant_{user.tenant_id}",
|
|
"name": tenant.name if tenant else f"Tenant {user.tenant_id}",
|
|
"role": user.user_type,
|
|
"display_name": user.full_name,
|
|
"email": user.email,
|
|
"is_primary": True
|
|
}
|
|
available_tenants = [current_tenant_context]
|
|
else:
|
|
current_tenant_context = {
|
|
"id": None,
|
|
"domain": "none",
|
|
"name": "No Tenant",
|
|
"role": user.user_type
|
|
}
|
|
available_tenants = []
|
|
|
|
# Create final JWT token
|
|
token = JWTHandler.create_access_token(
|
|
user_id=user.id,
|
|
user_email=user.email,
|
|
user_type=user.user_type,
|
|
current_tenant=current_tenant_context,
|
|
available_tenants=available_tenants,
|
|
capabilities=user.capabilities or []
|
|
)
|
|
|
|
# Create audit log for successful verification
|
|
audit_log = AuditLog.create_log(
|
|
action="user.tfa_verification_success",
|
|
user_id=user_id,
|
|
tenant_id=user.tenant_id,
|
|
details={"email": user.email},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent")
|
|
)
|
|
db.add(audit_log)
|
|
await db.commit()
|
|
|
|
logger.info("TFA verification successful", user_id=user_id, email=user.email)
|
|
|
|
# Return response with user object for frontend validation
|
|
from fastapi.responses import JSONResponse
|
|
response = JSONResponse(content={
|
|
"success": True,
|
|
"access_token": token,
|
|
"user": {
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"full_name": user.full_name,
|
|
"user_type": user.user_type,
|
|
"tenant_id": user.tenant_id,
|
|
"capabilities": user.capabilities or [],
|
|
"tfa_setup_pending": False
|
|
}
|
|
})
|
|
|
|
# Delete TFA session cookie
|
|
response.delete_cookie(key="tfa_session")
|
|
|
|
return response
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error("TFA verify login error", error=str(e))
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to verify TFA code"
|
|
)
|
|
|
|
|
|
@router.get("/status", response_model=TFAStatusResponse)
|
|
async def get_tfa_status(
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Get TFA status for current user"""
|
|
return TFAStatusResponse(
|
|
tfa_enabled=current_user.tfa_enabled,
|
|
tfa_required=current_user.tfa_required,
|
|
tfa_status=current_user.tfa_status
|
|
)
|