import asyncio
import html
import json
import re
import time
from typing import Any, Literal
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from pydantic import BaseModel, Field
# Route appended to the configured API base URL for every query.
QUERY_PATH = "/v2/sandp_500/query"
# Base URL used when the API_BASE valve is empty.
DEFAULT_API_BASE = "https://api-beta.ai-baseline.xyz"
# Browser-like User-Agent sent unless the USER_AGENT valve overrides it.
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0.0.0 Safari/537.36"
)
# Accepted values for the `mode` and `effort` tool arguments / valve defaults.
VALID_MODES = {"agentic_research", "basic", "research"}
VALID_EFFORTS = {"high", "low", "medium"}
# Longest server-provided error detail kept before truncating with "...".
MAX_ERROR_DETAIL_CHARS = 4_000
# Status-bar text shown when a query fails; the full error goes to the chat.
FAILURE_STATUS_DESCRIPTION = "Query failed. Details were printed in chat."
class ToolInputError(ValueError):
    """Raised when a tool argument or a valve setting fails validation."""
class ToolExecutionError(RuntimeError):
    """Raised for anticipated failures while calling the AI Baseline API.

    Examples: a missing API key, HTTP error statuses, timeouts, or a response
    body that is not a JSON object.
    """
def normalize_query(query: str) -> str:
    """Collapse every whitespace run in *query* into a single space.

    Raises:
        ToolInputError: if *query* is not a ``str`` or contains only whitespace.
    """
    if isinstance(query, str):
        collapsed = " ".join(query.split())
        if collapsed:
            return collapsed
        raise ToolInputError("query must be non-empty.")
    raise ToolInputError("query must be a string.")
def normalize_choice(
value: str | None,
*,
default: str,
allowed: set[str],
field_name: str,
) -> str:
selected_value = default if value is None or not str(value).strip() else str(value)
normalized_value = selected_value.strip().lower()
if normalized_value not in allowed:
allowed_values = ", ".join(sorted(allowed))
raise ToolInputError(f"{field_name} must be one of: {allowed_values}.")
return normalized_value
def build_api_url(api_base: str) -> str:
    """Return the full query endpoint URL for *api_base*.

    Surrounding whitespace and trailing slashes are removed so the path is not
    doubled. When the base is ``None``, empty, whitespace-only, or just slashes,
    DEFAULT_API_BASE is used instead of producing a relative URL.
    """
    normalized_base = str(api_base or "").strip().rstrip("/")
    if not normalized_base:
        # Checked after normalizing: a value like "  " or "/" is truthy but
        # would otherwise yield "/v2/..." with no scheme or host.
        normalized_base = DEFAULT_API_BASE.strip().rstrip("/")
    return f"{normalized_base}{QUERY_PATH}"
def build_research_payload(query: str, mode: str, effort: str) -> dict[str, Any]:
    """Assemble the JSON request body sent to the query endpoint.

    The query is whitespace-normalized and mode/effort are validated against
    VALID_MODES / VALID_EFFORTS. Streaming is disabled, and only the agent
    context, metadata dashboard and summary sections are requested.

    Raises:
        ToolInputError: if the query is empty or mode/effort are invalid.
    """
    include_flags = {
        "answer": False,
        "evidence": False,
        "evidence_instructions": False,
        "agent_context": True,
        "evidence_format": "rendered",
        "metadata_dashboard": True,
        "summary": True,
    }
    # Built key by key so validation runs in the order query -> mode -> effort.
    payload: dict[str, Any] = {"query": normalize_query(query)}
    payload["mode"] = normalize_choice(
        mode,
        default="research",
        allowed=VALID_MODES,
        field_name="mode",
    )
    payload["effort"] = normalize_choice(
        effort,
        default="medium",
        allowed=VALID_EFFORTS,
        field_name="effort",
    )
    payload["stream"] = False
    payload["include"] = include_flags
    return payload
def redact_secrets(text: Any, secrets: list[str]) -> str:
    """Return ``str(text)`` with every occurrence of each secret replaced by ``[redacted]``.

    Args:
        text: Anything printable (often an exception).
        secrets: Values to hide; ``None``/empty entries are ignored.

    Secrets shorter than 4 characters are skipped (presumably so a tiny value
    cannot shred an entire message). Secrets are applied longest-first, so a
    secret that is a substring of another cannot leave part of the longer one
    exposed.
    """
    redacted_text = str(text)
    candidates = {str(secret or "") for secret in secrets}
    # Longest first; ties broken lexically so the result is deterministic.
    for secret_value in sorted(candidates, key=lambda s: (-len(s), s)):
        if len(secret_value) >= 4:
            redacted_text = redacted_text.replace(secret_value, "[redacted]")
    return redacted_text
def ensure_text_code_fence(text: Any) -> str:
    """Wrap *text* in a Markdown ``text`` code fence followed by a blank line.

    Text that already starts with a ```` ```text ```` fence is passed through as-is.
    Otherwise the fence is made one backtick longer than the longest backtick
    run inside the content, so embedded triple backticks cannot close the
    fence early. ``None`` and other falsy values render as an empty block.
    """
    value = str(text or "").strip()
    if value.startswith("```text"):
        return f"{value}\n\n"
    longest_run = max((len(run) for run in re.findall(r"`+", value)), default=0)
    fence = "`" * max(3, longest_run + 1)
    return f"{fence}text\n{value}\n{fence}\n\n"
def extract_model_context(api_response: dict[str, Any]) -> str:
    """Return the text the model should see for an API response.

    Prefers the stripped ``agent_context["text"]``. When that is missing or
    blank, returns "No data found." and appends any non-blank
    ``summary["warnings"]`` entries.
    """
    context_block = api_response.get("agent_context")
    context_text = ""
    if isinstance(context_block, dict):
        context_text = str(context_block.get("text") or "").strip()
    if context_text:
        return context_text

    summary_block = api_response.get("summary")
    raw_warnings = summary_block.get("warnings") if isinstance(summary_block, dict) else None
    warning_lines: list[str] = []
    if isinstance(raw_warnings, list):
        # Blank entries are dropped, but kept entries are not stripped.
        warning_lines = [str(item) for item in raw_warnings if str(item).strip()]
    if not warning_lines:
        return "No data found."
    return "No data found.\n\nWarnings: " + ", ".join(warning_lines)
def _compact_error_detail(detail: Any) -> str:
if isinstance(detail, dict):
message = detail.get("message")
code = detail.get("code")
detail_payload = detail.get("detail")
parts = [str(value) for value in (code, message, detail_payload) if value]
detail_text = " | ".join(parts)
else:
detail_text = str(detail or "")
normalized_detail = " ".join(detail_text.split())
if len(normalized_detail) <= MAX_ERROR_DETAIL_CHARS:
return normalized_detail
return f"{normalized_detail[:MAX_ERROR_DETAIL_CHARS].rstrip()}..."
def _plain_text_error_body(response_body: str) -> str:
without_scripts = re.sub(
r"<(script|style)\b[^>]*>.*?</\1>",
" ",
response_body,
flags=re.IGNORECASE | re.DOTALL,
)
without_tags = re.sub(r"<[^>]+>", " ", without_scripts)
return html.unescape(without_tags)
def _decode_http_error_detail(exc: HTTPError) -> str:
    """Best-effort summary of the body attached to an HTTP error response.

    Args:
        exc: The ``HTTPError`` raised by ``urlopen``.

    Returns:
        A compact detail string, or ``""`` when the body cannot be read or is
        blank. JSON bodies go straight to ``_compact_error_detail``; anything
        else is first reduced from (possibly HTML) markup to plain text.
        This never raises for read or decode problems, so the caller's
        HTTP-status message is always preserved.
    """
    # Local import: a truncated body (e.g. IncompleteRead) raises
    # http.client.HTTPException, which is not an OSError.
    from http.client import HTTPException

    try:
        charset = exc.headers.get_content_charset() if exc.headers else None
        raw_body = exc.read()
    except (OSError, UnicodeError, HTTPException):
        return ""
    try:
        response_body = raw_body.decode(charset or "utf-8", errors="replace")
    except (LookupError, UnicodeError):
        # Unknown or non-text charset advertised by the server: fall back to
        # UTF-8 instead of letting LookupError escape the caller's HTTPError
        # handler and replace the HTTP status message.
        response_body = raw_body.decode("utf-8", errors="replace")
    if not response_body.strip():
        return ""
    try:
        parsed_body = json.loads(response_body)
    except json.JSONDecodeError:
        return _compact_error_detail(_plain_text_error_body(response_body))
    return _compact_error_detail(parsed_body)
def _http_status_error(status: int, detail: str = "") -> ToolExecutionError:
    """Build (without raising) the user-facing error for a non-success HTTP status.

    Args:
        status: HTTP status code. 401, 403 and 429 get dedicated advice; 5xx is
            reported as temporary unavailability; anything else is generic.
        detail: Optional server-provided detail, appended as " Detail: ...".
    """
    known_statuses = {
        401: "AI Baseline API key was rejected. Update API_KEY in the OpenWebUI tool valves.",
        403: "AI Baseline API returned HTTP 403. Check API_KEY, API_BASE, and USER_AGENT.",
        429: "AI Baseline API rate limit exceeded. Wait and try again.",
    }
    headline = known_statuses.get(status)
    if headline is None:
        if status >= 500:
            headline = f"AI Baseline API is temporarily unavailable (HTTP {status})."
        else:
            headline = f"AI Baseline API request failed (HTTP {status})."
    suffix = "" if not detail else f" Detail: {detail}"
    return ToolExecutionError(headline + suffix)
def _post_json(
    url: str,
    payload: dict[str, Any],
    headers: dict[str, str],
    timeout_seconds: float,
) -> dict[str, Any]:
    """POST *payload* as UTF-8 JSON to *url* and return the decoded JSON object.

    Blocking; the caller runs it in a worker thread.

    Raises:
        ToolExecutionError: on HTTP error statuses, timeouts (at connect or
            while reading), connection/read failures, invalid JSON, or a JSON
            value that is not an object.
    """
    # Local import: errors such as IncompleteRead/BadStatusLine derive from
    # http.client.HTTPException, not OSError, and urlopen does not wrap them.
    from http.client import HTTPException

    request = Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )
    try:
        with urlopen(request, timeout=timeout_seconds) as response:
            status = response.status
            if status < 200 or status >= 300:
                raise _http_status_error(status)
            charset = response.headers.get_content_charset() or "utf-8"
            raw_body = response.read()
    except HTTPError as exc:
        # Must precede URLError/OSError: HTTPError subclasses both.
        raise _http_status_error(exc.code, _decode_http_error_detail(exc)) from exc
    except TimeoutError as exc:
        raise ToolExecutionError("AI Baseline API request timed out.") from exc
    except URLError as exc:
        # urlopen wraps connect-phase socket timeouts in URLError(reason=TimeoutError).
        if isinstance(exc.reason, TimeoutError):
            raise ToolExecutionError("AI Baseline API request timed out.") from exc
        raise ToolExecutionError(f"Could not reach AI Baseline API: {exc.reason}") from exc
    except (OSError, HTTPException) as exc:
        # Failures while reading the body happen outside urlopen's URLError wrapping.
        reason = str(exc) or type(exc).__name__
        raise ToolExecutionError(f"Could not read AI Baseline API response: {reason}") from exc
    try:
        response_body = raw_body.decode(charset, errors="replace")
    except (LookupError, UnicodeError):
        # Unknown or non-text charset in Content-Type: fall back to UTF-8.
        response_body = raw_body.decode("utf-8", errors="replace")
    try:
        parsed_response = json.loads(response_body)
    except json.JSONDecodeError as exc:
        raise ToolExecutionError("AI Baseline API returned invalid JSON.") from exc
    if not isinstance(parsed_response, dict):
        raise ToolExecutionError("AI Baseline API returned an unexpected JSON payload.")
    return parsed_response
async def _emit(event_emitter: Any, event_type: str, data: dict[str, Any]) -> None:
if event_emitter is None:
return
await event_emitter({"type": event_type, "data": data})
async def _emit_failure(event_emitter: Any, message: str) -> None:
    """Report a failed query to the chat.

    First emits a finished, visible status carrying FAILURE_STATUS_DESCRIPTION,
    then a chat message with *message* inside a text code fence.
    """
    failure_status = {
        "description": FAILURE_STATUS_DESCRIPTION,
        "done": True,
        "hidden": False,
    }
    await _emit(event_emitter, "status", failure_status)
    chat_message = {
        "content": "Error calling AI Baseline:\n\n" + ensure_text_code_fence(message),
    }
    await _emit(event_emitter, "message", chat_message)
# OpenWebUI tool entry point: holds the valves and the disclosure-search tool method.
class Tools:
    class Valves(BaseModel):
        # Tool settings; the error messages below direct users to set these
        # in the OpenWebUI tool valves. Field descriptions are user-visible text.
        API_BASE: str = Field(
            default=DEFAULT_API_BASE,
            description="Default: https://api-beta.ai-baseline.xyz.",
        )
        API_KEY: str = Field(
            default="",
            description="Required. Sent only as the X-API-Key header.",
        )
        DEFAULT_MODE: str = Field(
            default="research",
            description="Default: research. Used when a tool call omits mode.",
        )
        DEFAULT_EFFORT: str = Field(
            default="medium",
            description="Default: medium. Used when a tool call omits effort.",
        )
        REQUEST_TIMEOUT_SECONDS: float = Field(
            default=120,
            description="Default: 120 seconds.",
        )
        USER_AGENT: str = Field(
            default=DEFAULT_USER_AGENT,
            description="Default: Chrome-like browser User-Agent.",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def search_sandp500_disclosures(
        self,
        query: str,
        mode: Literal["basic", "research", "agentic_research"] | None = None,
        effort: Literal["low", "medium", "high"] | None = None,
        __event_emitter__=None,
    ) -> str:
        """
        Search AI Baseline's S&P 500 company-disclosure graph through the /v2 API.
        Use this tool for evidence-backed questions about S&P 500 companies, SEC
        filings, disclosures, risk factors, supply chains, management discussion,
        strategy, business segments, financial-reporting context, or comparisons
        across companies.
        CRITICAL: Pass a self-contained query. Resolve pronouns and follow-up
        references from the chat history, but do not add unprompted SEC jargon,
        filing types, company names, topics, or constraints that the user did not
        ask for.
        :param query: Standalone natural-language disclosure research question.
        :param mode: Optional retrieval mode. Use "basic" for fast direct
        retrieval, "research" by default for normal metadata-aware disclosure
        research, and "agentic_research" for comparisons, broad multi-part
        questions, or comprehensive reports.
        :param effort: Optional retrieval depth. Use "low" for quick checks,
        "medium" by default, and "high" when the user asks for deeper, broader,
        more exhaustive, or comparative research.
        :return: Model-ready agent_context text returned by the AI Baseline API.
        """
        # NOTE(review): the docstring above is kept verbatim on purpose; OpenWebUI
        # presumably derives the model-facing tool description from it.
        #
        # Flow: validate inputs -> call the API in a worker thread -> emit status
        # and dashboard events -> return the model context. Expected failures are
        # reported to the chat (with the API key redacted) and returned as text
        # instead of raised.
        api_key = str(self.valves.API_KEY or "").strip()
        secrets = [api_key]
        try:
            if not api_key:
                raise ToolExecutionError(
                    "AI Baseline API key is missing. Set API_KEY in the OpenWebUI tool valves."
                )
            selected_mode = normalize_choice(
                mode,
                default=self.valves.DEFAULT_MODE,
                allowed=VALID_MODES,
                field_name="mode",
            )
            selected_effort = normalize_choice(
                effort,
                default=self.valves.DEFAULT_EFFORT,
                allowed=VALID_EFFORTS,
                field_name="effort",
            )
            payload = build_research_payload(query, selected_mode, selected_effort)
            timeout_seconds = float(self.valves.REQUEST_TIMEOUT_SECONDS)
            # Reject 0, negative, NaN and inf up front. For socket timeouts, 0
            # means non-blocking mode and a negative value raises ValueError, so
            # these would otherwise surface as confusing connection errors.
            # NaN fails both comparisons, so it is rejected too.
            if not 0 < timeout_seconds < float("inf"):
                raise ToolInputError(
                    "REQUEST_TIMEOUT_SECONDS must be a positive, finite number of seconds."
                )
            user_agent = str(self.valves.USER_AGENT or DEFAULT_USER_AGENT).strip()
            headers = {
                "Accept": "application/json",
                "Accept-Language": "en-US,en;q=0.9",
                "Content-Type": "application/json",
                # Whitespace-only USER_AGENT strips to "" and falls back here.
                "User-Agent": user_agent or DEFAULT_USER_AGENT,
                "X-API-Key": api_key,
            }
            await _emit(
                __event_emitter__,
                "status",
                {
                    "description": "Querying AI Baseline S&P 500 disclosure graph...",
                    "done": False,
                    "hidden": False,
                },
            )
            start_time = time.perf_counter()
            # _post_json uses blocking urllib; keep it off the event loop.
            api_response = await asyncio.to_thread(
                _post_json,
                build_api_url(self.valves.API_BASE),
                payload,
                headers,
                timeout_seconds,
            )
        except (ToolInputError, ToolExecutionError) as exc:
            message = redact_secrets(exc, secrets)
            await _emit_failure(__event_emitter__, message)
            return f"Error calling AI Baseline: {message}"
        except Exception as exc:
            # Top-level boundary: report anything unexpected instead of crashing the chat.
            message = redact_secrets(exc, secrets)
            diagnostic = f"Unexpected OpenWebUI tool failure: {message}"
            await _emit_failure(__event_emitter__, diagnostic)
            return f"Error calling AI Baseline: {diagnostic}"
        elapsed_time = time.perf_counter() - start_time
        await _emit(
            __event_emitter__,
            "status",
            {
                "description": f"Query complete after {elapsed_time:.2f} seconds",
                "done": True,
                "hidden": False,
            },
        )
        metadata_dashboard = api_response.get("metadata_dashboard")
        if metadata_dashboard:
            await _emit(
                __event_emitter__,
                "message",
                {"content": ensure_text_code_fence(metadata_dashboard)},
            )
        await _emit(
            __event_emitter__,
            "status",
            {
                "description": "Generating answer from retrieved context...",
                "done": True,
                "hidden": False,
            },
        )
        return extract_model_context(api_response)