Providers and Pipelines
Torale uses the Provider Pattern and Pipeline Architecture to separate concerns and enable flexible monitoring implementations.
Architecture Overview
Workflow (orchestrator)
↓
Activities (thin wrappers)
↓
Pipeline (coordinator)
↓
Providers (implementations)Provider Pattern
Providers abstract implementation details for different aspects of monitoring.
SchemaProvider
Generates task-specific extraction schemas:
class SchemaProvider(ABC):
async def generate_schema(self, task: dict) -> dict:
"""Generate extraction schema for task"""
pass
async def get_or_create_schema(self, task: dict) -> dict:
"""Get cached or generate new schema"""
passExample schema:
{
"release_date": {
"type": "date",
"description": "Official release date"
},
"confirmed": {
"type": "bool",
"description": "Whether officially confirmed"
}
}ExtractionProvider
Extracts structured data from search results:
class ExtractionProvider(ABC):
async def extract(self, search_result: dict, schema: dict) -> dict:
"""Extract data matching schema"""
passComparisonProvider
Compares states semantically:
class ComparisonProvider(ABC):
async def compare(
self,
previous_state: dict,
current_state: dict,
schema: dict
) -> dict:
"""Semantic comparison with StateChange result"""
passImplementation: Gemini Providers
Current implementation uses Gemini for all providers:
- GeminiSchemaProvider - Generates schemas using Gemini
- GeminiExtractionProvider - Extracts data with Gemini
- GeminiComparisonProvider - Semantic comparison with Gemini
- GeminiSearchProvider - Grounded search with temporal context
Future flexibility: Easy to swap providers (OpenAI, Claude, etc.)
Monitoring Pipeline
The pipeline coordinates execution using providers:
class MonitoringPipeline:
def __init__(
self,
schema_provider: SchemaProvider,
extraction_provider: ExtractionProvider,
comparison_provider: ComparisonProvider,
):
self.schema_provider = schema_provider
self.extraction_provider = extraction_provider
self.comparison_provider = comparison_provider
async def execute(
self,
task: dict,
search_result: dict,
previous_state: dict | None
) -> MonitoringResult:
# 1. Get or generate schema
schema = await self.schema_provider.get_or_create_schema(task)
# 2. Extract current state
current_state = await self.extraction_provider.extract(
search_result, schema
)
# 3. Fast hash pre-filter
if previous_state:
from torale.core.state_utils import compute_state_hash
prev_hash = compute_state_hash(previous_state)
curr_hash = compute_state_hash(current_state)
if prev_hash == curr_hash:
return MonitoringResult(changed=False)
# 4. Semantic comparison (LLM)
change = await self.comparison_provider.compare(
previous_state, current_state, schema
)
# 5. Generate summary
return MonitoringResult(
summary=self._generate_summary(search_result, change),
sources=search_result["sources"],
metadata={"changed": change["changed"]}
)Structured Agency Approach
The system uses structured agency - agents work with structure they design:
- Agent designs schema (one-time per task)
- Agent extracts to schema (each execution)
- Fast hash comparison (deterministic, no LLM)
- Agent semantic comparison (if hash differs)
This solves Issue #111 by using a single extraction source for all decisions.
State Hash Utility
Deterministic state hashing for fast comparison:
from torale.core.state_utils import compute_state_hash
# Fast pre-filter before expensive LLM comparison
prev_hash = compute_state_hash(previous_state)
curr_hash = compute_state_hash(current_state)
if prev_hash == curr_hash:
# Identical - skip LLM comparison
return StateChange(changed=False)Minimal Schema Results
Results use natural language summaries over rigid fields:
class MonitoringResult(BaseModel):
summary: str # Natural language for user
sources: list[dict] # Attribution
actions: list[str] # ["searched", "extracted", "compared"]
metadata: dict # Provider-specific dataExample result:
{
"summary": "**What Changed:** Release date announced: Sept 12, 2024. Now officially confirmed.\n\n**Latest Info:** Apple announced iPhone 16 will be released on September 12, 2024.",
"sources": [
{"url": "https://apple.com", "title": "apple.com"}
],
"actions": ["searched", "extracted", "compared"],
"metadata": {
"changed": true,
"current_state": {"release_date": "2024-09-12", "confirmed": true},
"comparison_method": "semantic"
}
}Benefits
- Swappable implementations - Change providers without touching workflow
- Clear separation - Workflow orchestrates, pipeline coordinates, providers implement
- Single extraction source - Prevents contradictory notifications
- Deterministic optimization - Hash pre-filter reduces LLM calls
- Semantic understanding - LLM handles "Sept" == "September"
- Natural communication - Summaries over booleans
Future: Autonomous Agents
The provider pattern supports future autonomous agents:
class AutonomousAgentProvider(MonitoringProvider):
async def execute_monitoring(self, task, context):
# Single agent call with tools
return await self.agent.run(
task=task,
tools=[SearchTool(), NotificationTool(), ComparisonTool()]
)The agent could handle search, comparison, and even send notifications autonomously.