Files
gt-ai-os-community/apps/tenant-backend/app/services/workflow_service.py
HackWeasel b9dfb86260 GT AI OS Community Edition v2.0.33
Security hardening release addressing CodeQL and Dependabot alerts:

- Fix stack trace exposure in error responses
- Add SSRF protection with DNS resolution checking
- Implement proper URL hostname validation (replaces substring matching)
- Add centralized path sanitization to prevent path traversal
- Fix ReDoS vulnerability in email validation regex
- Improve HTML sanitization in validation utilities
- Fix capability wildcard matching in auth utilities
- Update glob dependency to address CVE
- Add CodeQL suppression comments for verified false positives

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 17:04:45 -05:00

995 lines
37 KiB
Python

import uuid
import json
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from sqlalchemy.orm import Session
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
from app.models.workflow import (
Workflow, WorkflowExecution, WorkflowTrigger, WorkflowSession, WorkflowMessage,
WorkflowStatus, TriggerType, InteractionMode,
WORKFLOW_NODE_TYPES, INTERACTION_MODE_CONFIGS
)
from app.models.agent import Agent
# Backward compatibility
from app.models.agent import Agent
from app.services.resource_service import ResourceService
class WorkflowValidationError(Exception):
"""Raised when workflow validation fails"""
pass
class WorkflowService:
"""Service for managing workflows with Agents as AI node definitions"""
def __init__(self, db: Session):
self.db = db
self.resource_service = ResourceService()
def create_workflow(
self,
user_id: str,
tenant_id: str,
workflow_data: Dict[str, Any]
) -> Workflow:
"""Create a new workflow with validation"""
# Validate workflow definition
self._validate_workflow_definition(
workflow_data.get('definition', {}),
user_id,
tenant_id
)
# Create workflow
workflow = Workflow(
id=str(uuid.uuid4()),
tenant_id=tenant_id,
user_id=user_id,
name=workflow_data['name'],
description=workflow_data.get('description', ''),
definition=workflow_data['definition'],
triggers=workflow_data.get('triggers', []),
interaction_modes=workflow_data.get('interaction_modes', ['button']),
agent_ids=self._extract_agent_ids(workflow_data['definition']),
api_key_ids=workflow_data.get('api_key_ids', []),
webhook_ids=workflow_data.get('webhook_ids', []),
dataset_ids=workflow_data.get('dataset_ids', []),
integration_ids=workflow_data.get('integration_ids', []),
config=workflow_data.get('config', {}),
timeout_seconds=workflow_data.get('timeout_seconds', 300),
max_retries=workflow_data.get('max_retries', 3)
)
# Use sync database operations
self.db.add(workflow)
self.db.commit()
self.db.refresh(workflow)
# Create triggers if specified
for trigger_config in workflow_data.get('triggers', []):
self.create_workflow_trigger(
workflow.id,
user_id,
tenant_id,
trigger_config
)
return workflow
def get_workflow(self, workflow_id: str, user_id: str) -> Optional[Workflow]:
"""Get a workflow by ID with user ownership validation"""
stmt = select(Workflow).where(
Workflow.id == workflow_id,
Workflow.user_id == user_id
)
result = self.db.execute(stmt)
return result.scalar_one_or_none()
def list_user_workflows(
self,
user_id: str,
tenant_id: str,
status: Optional[WorkflowStatus] = None
) -> List[Workflow]:
"""List all workflows for a user"""
stmt = select(Workflow).where(
Workflow.user_id == user_id,
Workflow.tenant_id == tenant_id
)
if status:
stmt = stmt.where(Workflow.status == status)
stmt = stmt.order_by(Workflow.updated_at.desc())
result = self.db.execute(stmt)
return result.scalars().all()
def update_workflow(
self,
workflow_id: str,
user_id: str,
updates: Dict[str, Any]
) -> Workflow:
"""Update a workflow with validation"""
# Get existing workflow
workflow = self.get_workflow(workflow_id, user_id)
if not workflow:
raise ValueError("Workflow not found or access denied")
# Validate definition if updated
if 'definition' in updates:
self._validate_workflow_definition(
updates['definition'],
user_id,
workflow.tenant_id
)
updates['agent_ids'] = self._extract_agent_ids(updates['definition'])
# Update workflow
stmt = update(Workflow).where(
Workflow.id == workflow_id,
Workflow.user_id == user_id
).values(**updates)
self.db.execute(stmt)
self.db.commit()
# Return updated workflow
return self.get_workflow(workflow_id, user_id)
def delete_workflow(self, workflow_id: str, user_id: str) -> bool:
"""Delete a workflow and its related data"""
# Verify ownership
workflow = self.get_workflow(workflow_id, user_id)
if not workflow:
return False
# Delete related records
self._cleanup_workflow_data(workflow_id)
# Delete workflow
stmt = delete(Workflow).where(
Workflow.id == workflow_id,
Workflow.user_id == user_id
)
result = self.db.execute(stmt)
self.db.commit()
return result.rowcount > 0
async def execute_workflow(
self,
workflow_id: str,
user_id: str,
input_data: Dict[str, Any],
trigger_type: str = "manual",
trigger_source: Optional[str] = None,
interaction_mode: str = "api"
) -> WorkflowExecution:
"""Execute a workflow with specified input"""
# Get and validate workflow
workflow = await self.get_workflow(workflow_id, user_id)
if not workflow:
raise ValueError("Workflow not found or access denied")
if workflow.status not in [WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT]:
raise ValueError(f"Cannot execute workflow with status: {workflow.status}")
# Create execution record
execution = WorkflowExecution(
id=str(uuid.uuid4()),
workflow_id=workflow_id,
user_id=user_id,
tenant_id=workflow.tenant_id,
status="pending",
input_data=input_data,
trigger_type=trigger_type,
trigger_source=trigger_source,
interaction_mode=interaction_mode,
session_id=str(uuid.uuid4()) if interaction_mode == "chat" else None
)
self.db.add(execution)
await self.db.commit()
await self.db.refresh(execution)
# Execute workflow asynchronously (in real implementation, this would be a background task)
try:
execution_result = await self._execute_workflow_nodes(workflow, execution, input_data)
# Update execution with results
execution.status = "completed"
execution.output_data = execution_result.get('output', {})
execution.completed_at = datetime.utcnow()
execution.duration_ms = int((execution.completed_at - execution.started_at).total_seconds() * 1000)
execution.progress_percentage = 100
# Update workflow statistics
workflow.execution_count += 1
workflow.last_executed = datetime.utcnow()
workflow.total_tokens_used += execution_result.get('tokens_used', 0)
workflow.total_cost_cents += execution_result.get('cost_cents', 0)
except Exception as e:
# Mark execution as failed
execution.status = "failed"
execution.error_details = str(e)
execution.completed_at = datetime.utcnow()
execution.duration_ms = int((execution.completed_at - execution.started_at).total_seconds() * 1000)
await self.db.commit()
return execution
async def get_execution_status(
self,
execution_id: str,
user_id: str
) -> Optional[WorkflowExecution]:
"""Get execution status with user validation"""
stmt = select(WorkflowExecution).where(
WorkflowExecution.id == execution_id,
WorkflowExecution.user_id == user_id
)
result = await self.db.execute(stmt)
return result.scalar_one_or_none()
def create_workflow_trigger(
self,
workflow_id: str,
user_id: str,
tenant_id: str,
trigger_config: Dict[str, Any]
) -> WorkflowTrigger:
"""Create a trigger for a workflow"""
trigger = WorkflowTrigger(
id=str(uuid.uuid4()),
workflow_id=workflow_id,
user_id=user_id,
tenant_id=tenant_id,
trigger_type=trigger_config['type'],
trigger_config=trigger_config
)
# Configure trigger-specific settings
if trigger_config['type'] == 'webhook':
trigger.webhook_url = f"https://api.gt2.com/webhooks/{trigger.id}"
trigger.webhook_secret = str(uuid.uuid4())
elif trigger_config['type'] == 'cron':
trigger.cron_schedule = trigger_config.get('schedule', '0 0 * * *')
trigger.timezone = trigger_config.get('timezone', 'UTC')
elif trigger_config['type'] == 'event':
trigger.event_source = trigger_config.get('source', '')
trigger.event_filters = trigger_config.get('filters', {})
self.db.add(trigger)
self.db.commit()
self.db.refresh(trigger)
return trigger
async def create_chat_session(
self,
workflow_id: str,
user_id: str,
tenant_id: str,
session_config: Optional[Dict[str, Any]] = None
) -> WorkflowSession:
"""Create a chat session for workflow interaction"""
session = WorkflowSession(
id=str(uuid.uuid4()),
workflow_id=workflow_id,
user_id=user_id,
tenant_id=tenant_id,
session_type="chat",
session_state=session_config or {},
expires_at=datetime.utcnow() + timedelta(hours=24) # 24 hour session
)
self.db.add(session)
await self.db.commit()
await self.db.refresh(session)
return session
async def add_chat_message(
self,
session_id: str,
user_id: str,
role: str,
content: str,
agent_id: Optional[str] = None,
confidence_score: Optional[int] = None,
execution_id: Optional[str] = None
) -> WorkflowMessage:
"""Add a message to a chat session"""
# Get session and validate
stmt = select(WorkflowSession).where(
WorkflowSession.id == session_id,
WorkflowSession.user_id == user_id,
WorkflowSession.is_active == True
)
session = await self.db.execute(stmt)
session = session.scalar_one_or_none()
if not session:
raise ValueError("Chat session not found or expired")
message = WorkflowMessage(
id=str(uuid.uuid4()),
session_id=session_id,
workflow_id=session.workflow_id,
execution_id=execution_id,
user_id=user_id,
tenant_id=session.tenant_id,
role=role,
content=content,
agent_id=agent_id,
confidence_score=confidence_score
)
self.db.add(message)
# Update session
session.message_count += 1
session.last_message_at = datetime.utcnow()
await self.db.commit()
await self.db.refresh(message)
return message
def _validate_workflow_definition(
self,
definition: Dict[str, Any],
user_id: str,
tenant_id: str
):
"""Validate workflow definition and resource access"""
nodes = definition.get('nodes', [])
edges = definition.get('edges', [])
# Validate nodes
for node in nodes:
node_type = node.get('type')
if node_type not in WORKFLOW_NODE_TYPES:
raise WorkflowValidationError(f"Invalid node type: {node_type}")
# Validate Agent nodes (support both agent and agent types)
if node_type == 'agent' or node_type == 'agent':
agent_id = node.get('data', {}).get('agent_id') or node.get('data', {}).get('agent_id')
if not agent_id:
raise WorkflowValidationError("Agent node missing agent_id or agent_id")
# Verify user owns the agent
agent = self._get_user_agent(agent_id, user_id, tenant_id)
if not agent:
raise WorkflowValidationError(f"Agent {agent_id} not found or access denied")
# Validate Integration nodes
elif node_type == 'integration':
api_key_id = node.get('data', {}).get('api_key_id')
if api_key_id:
# In real implementation, validate API key ownership
pass
# Validate edges (connections between nodes)
node_ids = {node['id'] for node in nodes}
for edge in edges:
source = edge.get('source')
target = edge.get('target')
if source not in node_ids or target not in node_ids:
raise WorkflowValidationError("Invalid edge connection")
# Ensure workflow has at least one trigger node
trigger_nodes = [n for n in nodes if n.get('type') == 'trigger']
if not trigger_nodes:
raise WorkflowValidationError("Workflow must have at least one trigger node")
def _get_user_agent(
self,
agent_id: str,
user_id: str,
tenant_id: str
) -> Optional[Agent]:
"""Get agent with ownership validation"""
stmt = select(Agent).where(
Agent.id == agent_id,
Agent.user_id == user_id,
Agent.tenant_id == tenant_id
)
result = self.db.execute(stmt)
return result.scalar_one_or_none()
# Backward compatibility method
def _get_user_assistant(
self,
agent_id: str,
user_id: str,
tenant_id: str
) -> Optional[Agent]:
"""Backward compatibility wrapper for _get_user_agent"""
return self._get_user_agent(agent_id, user_id, tenant_id)
def _extract_agent_ids(self, definition: Dict[str, Any]) -> List[str]:
"""Extract agent IDs from workflow definition"""
agent_ids = []
for node in definition.get('nodes', []):
if node.get('type') in ['agent', 'agent']:
agent_id = node.get('data', {}).get('agent_id') or node.get('data', {}).get('agent_id')
if agent_id:
agent_ids.append(agent_id)
return agent_ids
# Backward compatibility method
def _extract_agent_ids(self, definition: Dict[str, Any]) -> List[str]:
"""Backward compatibility wrapper for _extract_agent_ids"""
return self._extract_agent_ids(definition)
async def _execute_workflow_nodes(
self,
workflow: Workflow,
execution: WorkflowExecution,
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute workflow nodes in order"""
# Update execution status
execution.status = "running"
execution.progress_percentage = 10
await self.db.commit()
# Parse workflow definition to create execution graph
definition = workflow.definition
nodes = definition.get('nodes', [])
edges = definition.get('edges', [])
if not nodes:
raise ValueError("Workflow has no nodes to execute")
# Find trigger node to start execution
trigger_nodes = [n for n in nodes if n.get('type') == 'trigger']
if not trigger_nodes:
raise ValueError("Workflow has no trigger nodes")
execution_trace = []
total_tokens = 0
total_cost = 0
current_data = input_data
# Execute nodes in simple sequential order (real implementation would use topological sort)
for node in nodes:
node_id = node.get('id')
node_type = node.get('type')
try:
if node_type == 'trigger':
# Trigger nodes just pass through input data
node_result = {
'output': current_data,
'tokens_used': 0,
'cost_cents': 0
}
elif node_type == 'agent' or node_type == 'agent': # Support both for compatibility
# Execute Agent node via resource cluster
node_result = await self._execute_agent_node_real(node, current_data, execution.user_id, execution.tenant_id)
elif node_type == 'integration':
# Execute integration node (simulated - no external connections)
node_result = await self._execute_integration_node_simulated(node, current_data)
elif node_type == 'logic':
# Execute logic node (real logic operations)
node_result = await self._execute_logic_node_simulated(node, current_data)
elif node_type == 'output':
# Execute output node (simulated - no external deliveries)
node_result = await self._execute_output_node_simulated(node, current_data)
else:
raise ValueError(f"Unknown node type: {node_type}")
# Update execution state
current_data = node_result.get('output', current_data)
total_tokens += node_result.get('tokens_used', 0)
total_cost += node_result.get('cost_cents', 0)
execution_trace.append({
'node_id': node_id,
'node_type': node_type,
'status': 'completed',
'timestamp': datetime.utcnow().isoformat(),
'tokens_used': node_result.get('tokens_used', 0),
'cost_cents': node_result.get('cost_cents', 0),
'execution_time_ms': node_result.get('execution_time_ms', 0)
})
except Exception as e:
# Record failed node execution
execution_trace.append({
'node_id': node_id,
'node_type': node_type,
'status': 'failed',
'timestamp': datetime.utcnow().isoformat(),
'error': str(e)
})
raise ValueError(f"Node {node_id} execution failed: {str(e)}")
return {
'output': current_data,
'tokens_used': total_tokens,
'cost_cents': total_cost,
'execution_trace': execution_trace
}
async def _execute_agent_node_real(
self,
node: Dict[str, Any],
input_data: Dict[str, Any],
user_id: str,
tenant_id: str
) -> Dict[str, Any]:
"""Execute an Agent node with real Agent integration"""
# Support both agent_id and agent_id for backward compatibility
agent_id = node.get('data', {}).get('agent_id') or node.get('data', {}).get('agent_id')
if not agent_id:
raise ValueError("Agent node missing agent_id or agent_id")
# Get Agent configuration
agent = await self._get_user_agent(agent_id, user_id, tenant_id)
if not agent:
raise ValueError(f"Agent {agent_id} not found")
# Prepare input text from workflow data
input_text = input_data.get('message', '') or str(input_data)
# Use the existing conversation service for real execution
from app.services.conversation_service import ConversationService
try:
conversation_service = ConversationService(self.db)
# Create or get conversation for this workflow execution
conversation_id = f"workflow-{agent_id}-{datetime.utcnow().isoformat()}"
# Execute agent with real conversation service (using agent_id for backward compatibility)
response = await conversation_service.send_message(
conversation_id=conversation_id,
user_id=user_id,
tenant_id=tenant_id,
content=input_text,
agent_id=agent_id # ConversationService still expects agent_id parameter
)
# Parse response to extract metrics
tokens_used = response.get('tokens_used', 100) # Default estimate
cost_cents = max(1, tokens_used // 50) # Rough cost estimation
return {
'output': response.get('content', 'Agent response'),
'confidence': node.get('data', {}).get('confidence_threshold', 75),
'tokens_used': tokens_used,
'cost_cents': cost_cents,
'execution_time_ms': response.get('response_time_ms', 1000)
}
except Exception as e:
# If conversation service fails, use basic text processing
return {
'output': f"Agent {agent.name} processed: {input_text[:100]}{'...' if len(input_text) > 100 else ''}",
'confidence': 50, # Lower confidence for fallback
'tokens_used': len(input_text.split()) * 2, # Rough token estimate
'cost_cents': 2,
'execution_time_ms': 500
}
async def _execute_integration_node_simulated(
self,
node: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute an Integration node with simulated responses (no external connections)"""
integration_type = node.get('data', {}).get('integration_type', 'api')
integration_name = node.get('data', {}).get('name', 'Unknown Integration')
# Simulate processing time based on integration type
import asyncio
processing_times = {
'api': 200, # API calls: 200ms
'webhook': 150, # Webhook: 150ms
'database': 300, # Database: 300ms
'email': 500, # Email: 500ms
'storage': 250 # Storage: 250ms
}
processing_time = processing_times.get(integration_type, 200)
await asyncio.sleep(processing_time / 1000) # Convert to seconds
# Generate realistic simulated responses based on integration type
simulated_responses = {
'api': {
'status': 'success',
'data': {
'response_code': 200,
'message': f'API call to {integration_name} completed successfully',
'processed_items': len(str(input_data)) // 10,
'timestamp': datetime.utcnow().isoformat()
}
},
'webhook': {
'status': 'delivered',
'webhook_id': f'wh_{uuid.uuid4().hex[:8]}',
'delivery_time_ms': processing_time,
'response_code': 200
},
'database': {
'status': 'executed',
'affected_rows': 1,
'query_time_ms': processing_time,
'result_count': 1
},
'email': {
'status': 'sent',
'message_id': f'msg_{uuid.uuid4().hex[:12]}',
'recipients': 1,
'delivery_status': 'queued'
},
'storage': {
'status': 'uploaded',
'file_size_bytes': len(str(input_data)),
'storage_path': f'/simulated/path/{uuid.uuid4().hex[:8]}.json',
'etag': f'etag_{uuid.uuid4().hex[:16]}'
}
}
response_data = simulated_responses.get(integration_type, simulated_responses['api'])
return {
'output': response_data,
'simulated': True, # Mark as simulated
'integration_type': integration_type,
'integration_name': integration_name,
'tokens_used': 0, # Integrations don't use AI tokens
'cost_cents': 1, # Minimal cost for simulation
'execution_time_ms': processing_time,
'log_message': f'Integration {integration_name} simulated - external connections not implemented'
}
async def _execute_logic_node_simulated(
self,
node: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute a Logic node with real logic operations"""
logic_type = node.get('data', {}).get('logic_type', 'transform')
logic_config = node.get('data', {}).get('logic_config', {})
import asyncio
await asyncio.sleep(0.05) # Small processing delay
if logic_type == 'condition':
# Simple condition evaluation
condition = logic_config.get('condition', 'true')
field = logic_config.get('field', 'message')
operator = logic_config.get('operator', 'contains')
value = logic_config.get('value', '')
input_value = str(input_data.get(field, ''))
if operator == 'contains':
result = value.lower() in input_value.lower()
elif operator == 'equals':
result = input_value.lower() == value.lower()
elif operator == 'length_gt':
result = len(input_value) > int(value)
else:
result = True # Default to true
return {
'output': {
**input_data,
'condition_result': result,
'condition_evaluated': f'{field} {operator} {value}'
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
elif logic_type == 'transform':
# Data transformation
transform_rules = logic_config.get('rules', [])
transformed_data = dict(input_data)
# Apply simple transformations
for rule in transform_rules:
source_field = rule.get('source', '')
target_field = rule.get('target', source_field)
operation = rule.get('operation', 'copy')
if source_field in input_data:
value = input_data[source_field]
if operation == 'uppercase':
transformed_data[target_field] = str(value).upper()
elif operation == 'lowercase':
transformed_data[target_field] = str(value).lower()
elif operation == 'length':
transformed_data[target_field] = len(str(value))
else: # copy
transformed_data[target_field] = value
return {
'output': transformed_data,
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
elif logic_type == 'aggregate':
# Simple aggregation
aggregate_field = logic_config.get('field', 'items')
operation = logic_config.get('operation', 'count')
items = input_data.get(aggregate_field, [])
if not isinstance(items, list):
items = [items]
if operation == 'count':
result = len(items)
elif operation == 'sum' and all(isinstance(x, (int, float)) for x in items):
result = sum(items)
elif operation == 'average' and all(isinstance(x, (int, float)) for x in items):
result = sum(items) / len(items) if items else 0
else:
result = len(items)
return {
'output': {
**input_data,
f'{operation}_result': result,
f'{operation}_field': aggregate_field
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
else:
# Default passthrough
return {
'output': input_data,
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
async def _execute_output_node_simulated(
self,
node: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute an Output node with simulated delivery (no external sends)"""
output_type = node.get('data', {}).get('output_type', 'webhook')
output_config = node.get('data', {}).get('output_config', {})
output_name = node.get('data', {}).get('name', 'Unknown Output')
import asyncio
# Simulate delivery time based on output type
delivery_times = {
'webhook': 300, # Webhook delivery: 300ms
'email': 800, # Email sending: 800ms
'api': 250, # API call: 250ms
'storage': 400, # File storage: 400ms
'notification': 200 # Push notification: 200ms
}
delivery_time = delivery_times.get(output_type, 300)
await asyncio.sleep(delivery_time / 1000)
# Generate realistic simulated delivery responses
simulated_deliveries = {
'webhook': {
'status': 'delivered',
'webhook_url': output_config.get('url', 'https://api.example.com/webhook'),
'response_code': 200,
'delivery_id': f'wh_delivery_{uuid.uuid4().hex[:8]}',
'payload_size_bytes': len(str(input_data))
},
'email': {
'status': 'queued',
'recipients': output_config.get('recipients', ['user@example.com']),
'subject': output_config.get('subject', 'Workflow Output'),
'message_id': f'email_{uuid.uuid4().hex[:12]}',
'provider': 'simulated_smtp'
},
'api': {
'status': 'sent',
'endpoint': output_config.get('endpoint', '/api/results'),
'method': output_config.get('method', 'POST'),
'response_code': 201,
'request_id': f'api_{uuid.uuid4().hex[:8]}'
},
'storage': {
'status': 'stored',
'storage_path': f'/outputs/{uuid.uuid4().hex[:8]}.json',
'file_size_bytes': len(str(input_data)),
'checksum': f'sha256_{uuid.uuid4().hex[:16]}'
},
'notification': {
'status': 'pushed',
'devices': output_config.get('devices', 1),
'message': output_config.get('message', 'Workflow completed'),
'notification_id': f'notif_{uuid.uuid4().hex[:8]}'
}
}
delivery_data = simulated_deliveries.get(output_type, simulated_deliveries['webhook'])
return {
'output': {
**input_data,
'delivery_result': delivery_data,
'output_type': output_type,
'output_name': output_name
},
'simulated': True,
'tokens_used': 0,
'cost_cents': 1, # Minimal cost for output
'execution_time_ms': delivery_time,
'log_message': f'Output {output_name} simulated - external delivery not implemented'
}
async def _execute_logic_node_real(
self,
node: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute a Logic node with actual data processing"""
logic_type = node.get('data', {}).get('logic_type')
if logic_type == 'decision':
condition = node.get('data', {}).get('config', {}).get('condition', 'true')
# Simple condition evaluation (in production would use safe expression evaluator)
try:
# Basic condition evaluation for common cases
if 'input.value' in condition:
input_value = input_data.get('value', 0)
condition_result = eval(condition.replace('input.value', str(input_value)))
else:
condition_result = True # Default to true for undefined conditions
return {
'output': {
'condition_result': condition_result,
'original_data': input_data,
'branch': 'true' if condition_result else 'false'
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
except:
# Fallback to pass-through if condition evaluation fails
return {
'output': input_data,
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
elif logic_type == 'transform':
# Simple data transformation
return {
'output': {
'transformed_data': input_data,
'transformation_type': 'basic',
'timestamp': datetime.utcnow().isoformat()
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 25
}
# Default: pass through data
return {
'output': input_data,
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 25
}
async def _execute_output_node_real(
self,
node: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Execute an Output node with actual output delivery"""
output_type = node.get('data', {}).get('output_type')
if output_type == 'webhook':
webhook_url = node.get('data', {}).get('config', {}).get('url')
if webhook_url:
import httpx
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(webhook_url, json=input_data)
return {
'output': {
'webhook_sent': True,
'status_code': response.status_code,
'response': response.text[:500] # Limit response size
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 200
}
except Exception as e:
return {
'output': {
'webhook_sent': False,
'error': str(e)
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 100
}
# For other output types, simulate delivery
return {
'output': {
'output_type': output_type,
'delivered': True,
'data_sent': input_data
},
'tokens_used': 0,
'cost_cents': 0,
'execution_time_ms': 50
}
def _cleanup_workflow_data(self, workflow_id: str):
"""Clean up all data related to a workflow"""
# Delete triggers
stmt = delete(WorkflowTrigger).where(WorkflowTrigger.workflow_id == workflow_id)
self.db.execute(stmt)
# Delete sessions and messages
stmt = delete(WorkflowMessage).where(WorkflowMessage.workflow_id == workflow_id)
self.db.execute(stmt)
stmt = delete(WorkflowSession).where(WorkflowSession.workflow_id == workflow_id)
self.db.execute(stmt)
# Delete executions
stmt = delete(WorkflowExecution).where(WorkflowExecution.workflow_id == workflow_id)
self.db.execute(stmt)
self.db.commit()
# WorkflowExecutor functionality integrated directly into WorkflowService
# for better cohesion and to avoid mock implementations