Temporal Workflows
Torale uses Temporal Cloud for reliable task scheduling and execution.
Why Temporal?
- Durable execution - Survives failures and restarts
- Automatic retries - Configurable retry policies
- Cron scheduling - Native support for periodic execution
- Observability - Full execution history and debugging
Workflow Architecture
Task Creation → Temporal Schedule → Workflow Execution → Activity → ResultMonitoring Workflow
python
@workflow.defn
class MonitoringWorkflow:
@workflow.run
async def run(self, task_config: TaskConfig) -> dict:
# Fetch task
task = await workflow.execute_activity(
get_task,
task_config.task_id,
start_to_close_timeout=timedelta(seconds=30)
)
# Execute grounded search
result = await workflow.execute_activity(
execute_grounded_search,
task,
start_to_close_timeout=timedelta(seconds=60)
)
# Compare with previous state
has_changed = await workflow.execute_activity(
compare_states,
task.id,
result,
start_to_close_timeout=timedelta(seconds=10)
)
# Notify if needed
if result.condition_met and has_changed:
await workflow.execute_activity(
send_notification,
task.user_id,
result,
start_to_close_timeout=timedelta(seconds=30)
)
return resultSchedule Management
Creating Schedules
python
async def create_task_schedule(task_id: str, cron: str):
schedule = await temporal_client.create_schedule(
id=f"task-{task_id}",
schedule=Schedule(
spec=ScheduleSpec(
cron_expressions=[cron]
),
action=ScheduleActionStartWorkflow(
MonitoringWorkflow.run,
TaskConfig(task_id=task_id),
id=f"task-{task_id}-{uuid4()}",
task_queue="torale-workers"
)
)
)Updating Schedules
python
async def update_task_schedule(task_id: str, new_cron: str):
handle = temporal_client.get_schedule_handle(f"task-{task_id}")
await handle.update(
lambda schedule: Schedule(
**{**schedule.__dict__, "spec": ScheduleSpec(cron_expressions=[new_cron])}
)
)Deleting Schedules
python
async def delete_task_schedule(task_id: str):
handle = temporal_client.get_schedule_handle(f"task-{task_id}")
await handle.delete()Retry Configuration
python
@activity.defn
async def execute_grounded_search(task: Task) -> dict:
# Activity automatically retries on failure
pass
# Retry policy defined at worker level
worker = Worker(
client,
task_queue="torale-workers",
workflows=[MonitoringWorkflow],
activities=[execute_grounded_search, send_notification],
activity_executor=ThreadPoolExecutor(max_workers=10),
)Default retry policy:
- Initial interval: 1 second
- Maximum interval: 30 seconds
- Maximum attempts: 3
- Backoff coefficient: 2.0
Temporal Cloud Configuration
python
TEMPORAL_HOST = "us-central1.gcp.api.temporal.io:7233"
TEMPORAL_NAMESPACE = "torale.abc123"
TEMPORAL_API_KEY = os.getenv("TEMPORAL_API_KEY")
client = await Client.connect(
TEMPORAL_HOST,
namespace=TEMPORAL_NAMESPACE,
tls=True,
rpc_metadata={"temporal-namespace": TEMPORAL_NAMESPACE},
api_key=TEMPORAL_API_KEY
)Monitoring
Temporal UI
Access workflows at: https://cloud.temporal.io/namespaces/torale.abc123/workflows
Features:
- View workflow history
- Inspect activity results
- Debug failures
- Retry failed workflows
Admin Endpoints
bash
# View recent workflows
GET /admin/temporal/workflows
# View active schedules
GET /admin/temporal/schedulesNext Steps
- Learn about State Tracking
- View System Overview
- Read Deployment Guide