First real-data export against v0.4.0 surfaced 66 unknown blocks across three content types — captured live and added. Added: - execution_output (Code Interpreter / container.exec / python tool output) → tool_result block. output=content.text, tool_name=author.name, is_error=metadata.aggregate_result.status, summary=metadata.reasoning_title - system_error → error tool_result with tool_name=author.name - tether_browsing_display: spinner placeholders (empty result+summary) skip silently with DEBUG log; defensive populated-case branch maps to tool_result (untested in real data) - tool_result block schema: optional `summary` field rendered as italic line between header and fence - tool_result rendering: tool_name appears in header when present (e.g. `📤 Result: container.exec`); existing tool_name=None calls unchanged - _ROLE_LABELS["tool"] = ("🔧 Tool", "tool") Fixed: - chatgpt.normalize_conversation reads `conversation_id` as fallback for `id`. Live API uses conversation_id; fixtures use id. Pre-fix: empty id in YAML frontmatter and missing context in WARNING logs. Tests: 11 new (192 total, 0 failures). Fixture extended with 4 tool-output cases (execution_output success, empty execution_output that should skip, system_error, tether_browsing_display spinner). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
644 lines
24 KiB
Python
644 lines
24 KiB
Python
"""Unit tests for src/providers/ using fixture files."""
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from src.blocks import (
|
|
BLOCK_TYPE_FILE_PLACEHOLDER,
|
|
BLOCK_TYPE_HIDDEN_CONTEXT_MARKER,
|
|
BLOCK_TYPE_IMAGE_PLACEHOLDER,
|
|
BLOCK_TYPE_TEXT,
|
|
BLOCK_TYPE_THINKING,
|
|
BLOCK_TYPE_TOOL_RESULT,
|
|
BLOCK_TYPE_TOOL_USE,
|
|
BLOCK_TYPE_UNKNOWN,
|
|
render_blocks_to_markdown,
|
|
)
|
|
from src.loss_report import LossReport
|
|
|
|
FIXTURES = Path(__file__).parent / "fixtures"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _block_types(message: dict) -> list[str]:
|
|
return [b.get("type") for b in (message.get("blocks") or [])]
|
|
|
|
|
|
def _first_block(message: dict, block_type: str) -> dict | None:
|
|
for b in message.get("blocks") or []:
|
|
if b.get("type") == block_type:
|
|
return b
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ChatGPT
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestChatGPTNormalization:
|
|
"""ChatGPT normalize_conversation block-extraction behavior."""
|
|
|
|
def _get_provider(self):
|
|
from src.providers.chatgpt import ChatGPTProvider
|
|
p = ChatGPTProvider.__new__(ChatGPTProvider)
|
|
import requests
|
|
p._session = requests.Session()
|
|
p._org_id = None
|
|
p._project_ids = []
|
|
p._project_map = {}
|
|
p._project_name_cache = {}
|
|
return p
|
|
|
|
def test_normalizes_conversation(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assert result["id"] == "chatgpt-conv-001"
|
|
assert result["title"] == "Python Async Tutorial"
|
|
assert result["provider"] == "chatgpt"
|
|
assert result["project"] is None
|
|
assert result["created_at"] != ""
|
|
assert result["updated_at"] != ""
|
|
assert isinstance(result["messages"], list)
|
|
|
|
def test_normalizes_without_project(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_no_project.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assert result["project"] is None
|
|
assert result["id"] == "chatgpt-conv-002"
|
|
|
|
def test_normalizes_with_project_from_map(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
p._project_map["chatgpt-conv-001"] = "My Research Project"
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assert result["project"] == "My Research Project"
|
|
|
|
def test_text_message_emits_text_block(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
user_msgs = [m for m in result["messages"] if m["role"] == "user"]
|
|
# The "How does async/await..." message
|
|
async_msgs = [
|
|
m for m in user_msgs
|
|
if any(
|
|
"async" in (b.get("text") or "").lower()
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert async_msgs, "expected a user message about async/await"
|
|
assert _block_types(async_msgs[0]) == [BLOCK_TYPE_TEXT]
|
|
|
|
def test_code_block_preserved_with_language(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assistant_msgs = [m for m in result["messages"] if m["role"] == "assistant"]
|
|
# The first assistant message is the async/await answer with a python fence
|
|
text_block = _first_block(assistant_msgs[0], BLOCK_TYPE_TEXT)
|
|
assert text_block is not None
|
|
assert "```python" in text_block["text"]
|
|
|
|
def test_multimodal_voice_user_message(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
# node-mm-user: audio_transcription "What is the capital of France?"
|
|
# + real_time_user_audio_video_asset_pointer wrapping a sediment:// URL
|
|
capital_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
"capital of france" in (b.get("text") or "").lower()
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert capital_msgs, "expected the audio_transcription text to surface"
|
|
types = _block_types(capital_msgs[0])
|
|
assert BLOCK_TYPE_TEXT in types
|
|
assert BLOCK_TYPE_FILE_PLACEHOLDER in types
|
|
|
|
file_block = _first_block(capital_msgs[0], BLOCK_TYPE_FILE_PLACEHOLDER)
|
|
assert file_block["ref"].startswith("sediment://")
|
|
assert file_block["mime"] == "audio/wav"
|
|
assert file_block["size_bytes"] == 50000
|
|
assert file_block["duration_seconds"] == pytest.approx(2.5)
|
|
|
|
def test_multimodal_voice_reverse_order_preserved(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
# node-mm-user-rev has parts in REVERSE order: asset first, transcription second.
|
|
rev_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
"tell me more" in (b.get("text") or "").lower()
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert rev_msgs, "expected the reverse-order voice message"
|
|
types = _block_types(rev_msgs[0])
|
|
# Order preserved: file_placeholder before text
|
|
assert types == [BLOCK_TYPE_FILE_PLACEHOLDER, BLOCK_TYPE_TEXT]
|
|
|
|
def test_image_only_user_message_renders(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
image_msgs = [
|
|
m for m in result["messages"]
|
|
if any(b.get("type") == BLOCK_TYPE_IMAGE_PLACEHOLDER for b in (m.get("blocks") or []))
|
|
]
|
|
assert image_msgs, "image-only user message should now render"
|
|
|
|
def test_user_editable_context_emits_blocks(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
# The user_editable_context message has user_profile + user_instructions.
|
|
# It should now appear (was silently dropped pre-v0.4.0).
|
|
uec_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
"Custom Instructions" in (b.get("text") or "")
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert uec_msgs, "user_editable_context should be visible in output"
|
|
# Hidden context marker should be prepended.
|
|
assert uec_msgs[0]["blocks"][0]["type"] == BLOCK_TYPE_HIDDEN_CONTEXT_MARKER
|
|
|
|
def test_user_editable_context_uses_safe_fence(self):
|
|
"""The user_instructions value contains embedded triple-backticks; the rendered
|
|
Markdown must use a fence longer than 3 backticks so embedded fences are inert.
|
|
"""
|
|
from src.blocks import render_blocks_to_markdown
|
|
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
uec_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
"Custom Instructions" in (b.get("text") or "")
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert uec_msgs
|
|
rendered = render_blocks_to_markdown(uec_msgs[0]["blocks"])
|
|
# Content has ``` inside, so the wrap fence must be at least 4 backticks.
|
|
assert "````" in rendered, "expected a 4+ backtick safe-fence wrap"
|
|
|
|
def test_message_roles_are_valid(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
for msg in result["messages"]:
|
|
assert msg["role"] in ("user", "assistant", "system", "tool")
|
|
|
|
def test_message_count_matches(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
assert result["message_count"] == len(result["messages"])
|
|
|
|
def test_loss_report_records_messages(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
report = LossReport()
|
|
result = p.normalize_conversation(raw, report)
|
|
assert report.messages_rendered == len(result["messages"])
|
|
assert report.conversations == 1
|
|
|
|
|
|
class TestChatGPTUnknownContent:
|
|
"""Unrecognised content types should produce visible unknown blocks + WARNING + tally."""
|
|
|
|
def _get_provider(self):
|
|
from src.providers.chatgpt import ChatGPTProvider
|
|
p = ChatGPTProvider.__new__(ChatGPTProvider)
|
|
import requests
|
|
p._session = requests.Session()
|
|
p._org_id = None
|
|
p._project_ids = []
|
|
p._project_map = {}
|
|
p._project_name_cache = {}
|
|
return p
|
|
|
|
def _make_unknown_conv(self):
|
|
return {
|
|
"id": "test-unknown",
|
|
"title": "Test",
|
|
"create_time": 1700000000.0,
|
|
"update_time": 1700000001.0,
|
|
"mapping": {
|
|
"root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
|
|
"msg1": {
|
|
"id": "msg1",
|
|
"message": {
|
|
"id": "msg1",
|
|
"author": {"role": "user"},
|
|
"content": {
|
|
"content_type": "future_unknown_type_xyz",
|
|
"some_field": "value",
|
|
},
|
|
},
|
|
"parent": "root",
|
|
"children": [],
|
|
},
|
|
},
|
|
}
|
|
|
|
def test_unknown_content_type_produces_unknown_block(self):
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(self._make_unknown_conv())
|
|
assert any(
|
|
b.get("type") == BLOCK_TYPE_UNKNOWN
|
|
for m in result["messages"]
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
|
|
def test_unknown_content_type_logs_warning(self, caplog):
|
|
p = self._get_provider()
|
|
with caplog.at_level(logging.WARNING):
|
|
p.normalize_conversation(self._make_unknown_conv())
|
|
assert any("future_unknown_type_xyz" in r.message for r in caplog.records)
|
|
|
|
def test_unknown_content_type_increments_loss_report(self):
|
|
p = self._get_provider()
|
|
report = LossReport()
|
|
p.normalize_conversation(self._make_unknown_conv(), report)
|
|
assert report.unknown_blocks["future_unknown_type_xyz"] == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Claude
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestClaudeNormalization:
|
|
"""Claude normalize_conversation block-extraction behavior."""
|
|
|
|
def _get_provider(self):
|
|
from src.providers.claude import ClaudeProvider
|
|
import requests
|
|
p = ClaudeProvider.__new__(ClaudeProvider)
|
|
p._session = requests.Session()
|
|
p._org_id = None
|
|
return p
|
|
|
|
def test_normalizes_with_project(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assert result["id"] == "claude-conv-001"
|
|
assert result["title"] == "StartOS Service Packaging"
|
|
assert result["provider"] == "claude"
|
|
assert result["project"] == "StarTOS Packaging"
|
|
assert result["created_at"] == "2024-06-10T14:32:00.000Z"
|
|
|
|
def test_normalizes_without_project(self):
|
|
raw = json.loads((FIXTURES / "claude_no_project.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
assert result["project"] is None
|
|
|
|
def test_string_content_emits_text_block(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
thanks_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
"thank you" in (b.get("text") or "").lower()
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert thanks_msgs
|
|
|
|
def test_list_content_emits_blocks_in_order(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assistant_msgs = [m for m in result["messages"] if m["role"] == "assistant"]
|
|
# msg-002 has text + tool_use, in that order.
|
|
assert assistant_msgs
|
|
types = _block_types(assistant_msgs[0])
|
|
assert BLOCK_TYPE_TEXT in types
|
|
assert BLOCK_TYPE_TOOL_USE in types
|
|
# Order preserved
|
|
assert types.index(BLOCK_TYPE_TEXT) < types.index(BLOCK_TYPE_TOOL_USE)
|
|
|
|
def test_tool_use_block_fields(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
assistant_msgs = [m for m in result["messages"] if m["role"] == "assistant"]
|
|
tool_block = _first_block(assistant_msgs[0], BLOCK_TYPE_TOOL_USE)
|
|
assert tool_block["name"] == "search"
|
|
assert tool_block["input"] == {"query": "startOS docs"}
|
|
assert tool_block["tool_id"] == "tool-001"
|
|
|
|
def test_image_block_emits_image_placeholder(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
msg004 = [
|
|
m for m in result["messages"]
|
|
if any(b.get("type") == BLOCK_TYPE_IMAGE_PLACEHOLDER for b in (m.get("blocks") or []))
|
|
]
|
|
assert msg004
|
|
img = _first_block(msg004[0], BLOCK_TYPE_IMAGE_PLACEHOLDER)
|
|
assert img["ref"] == "claude-image-uuid-1"
|
|
|
|
def test_unknown_block_type_records_loss(self):
|
|
from src.blocks import BLOCK_TYPE_UNKNOWN as _UNK
|
|
raw = {
|
|
"uuid": "test-unknown",
|
|
"name": "T",
|
|
"chat_messages": [
|
|
{
|
|
"uuid": "m1",
|
|
"sender": "human",
|
|
"content": [{"type": "future_block_xyz", "data": "..."}],
|
|
}
|
|
],
|
|
}
|
|
p = self._get_provider()
|
|
report = LossReport()
|
|
result = p.normalize_conversation(raw, report)
|
|
assert any(
|
|
b.get("type") == _UNK
|
|
for m in result["messages"]
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
assert report.unknown_blocks["future_block_xyz"] == 1
|
|
|
|
def test_thinking_block(self):
|
|
raw = {
|
|
"uuid": "thinking-test",
|
|
"name": "T",
|
|
"chat_messages": [
|
|
{
|
|
"uuid": "m1",
|
|
"sender": "assistant",
|
|
"content": [
|
|
{"type": "thinking", "thinking": "Let me reason about this."},
|
|
{"type": "text", "text": "Here's the answer."},
|
|
],
|
|
}
|
|
],
|
|
}
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
types = _block_types(result["messages"][0])
|
|
assert BLOCK_TYPE_THINKING in types
|
|
assert BLOCK_TYPE_TEXT in types
|
|
|
|
def test_tool_result_with_nested_text_blocks(self):
|
|
raw = {
|
|
"uuid": "tool-result-test",
|
|
"name": "T",
|
|
"chat_messages": [
|
|
{
|
|
"uuid": "m1",
|
|
"sender": "assistant",
|
|
"content": [
|
|
{
|
|
"type": "tool_result",
|
|
"tool_use_id": "tool-001",
|
|
"content": [
|
|
{"type": "text", "text": "search hit 1"},
|
|
{"type": "text", "text": "search hit 2"},
|
|
],
|
|
"is_error": False,
|
|
}
|
|
],
|
|
}
|
|
],
|
|
}
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
tool_result = _first_block(result["messages"][0], BLOCK_TYPE_TOOL_RESULT)
|
|
assert tool_result is not None
|
|
assert "search hit 1" in tool_result["output"]
|
|
assert "search hit 2" in tool_result["output"]
|
|
assert tool_result["is_error"] is False
|
|
|
|
def test_human_sender_maps_to_user(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
roles = {m["role"] for m in result["messages"]}
|
|
assert "user" in roles
|
|
assert "human" not in roles
|
|
|
|
def test_loss_report_messages_recorded(self):
|
|
raw = json.loads((FIXTURES / "claude_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
report = LossReport()
|
|
result = p.normalize_conversation(raw, report)
|
|
assert report.messages_rendered == len(result["messages"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# v0.4.1 — execution_output, system_error, tether_browsing_display, conv_id
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestChatGPTToolOutputs:
|
|
"""v0.4.1 ChatGPT tool-role content_types map onto tool_result blocks."""
|
|
|
|
def _get_provider(self):
|
|
from src.providers.chatgpt import ChatGPTProvider
|
|
p = ChatGPTProvider.__new__(ChatGPTProvider)
|
|
import requests
|
|
p._session = requests.Session()
|
|
p._org_id = None
|
|
p._project_ids = []
|
|
p._project_map = {}
|
|
p._project_name_cache = {}
|
|
return p
|
|
|
|
def test_execution_output_emits_tool_result_with_metadata(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
exec_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
b.get("type") == BLOCK_TYPE_TOOL_RESULT
|
|
and b.get("tool_name") == "container.exec"
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert exec_msgs, "expected execution_output to render as tool_result"
|
|
block = next(
|
|
b for b in exec_msgs[0]["blocks"] if b.get("type") == BLOCK_TYPE_TOOL_RESULT
|
|
)
|
|
assert block["output"].startswith("Hello from container.exec")
|
|
assert block["is_error"] is False
|
|
assert block["summary"] == "Reading skill documentation"
|
|
|
|
def test_execution_output_message_role_is_tool(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
tool_msgs = [m for m in result["messages"] if m["role"] == "tool"]
|
|
assert tool_msgs, "tool-role messages must pass through (filter lifted in v0.4.0)"
|
|
|
|
def test_empty_execution_output_skipped(self, caplog):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
with caplog.at_level(logging.DEBUG, logger="src.providers.chatgpt"):
|
|
result = p.normalize_conversation(raw)
|
|
|
|
# The empty execution_output (author.name="python") must NOT appear.
|
|
python_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
b.get("type") == BLOCK_TYPE_TOOL_RESULT and b.get("tool_name") == "python"
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert not python_msgs, "empty execution_output should be skipped"
|
|
assert any("Skipping empty execution_output" in r.message for r in caplog.records)
|
|
|
|
def test_system_error_emits_error_tool_result(self):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
|
|
web_err = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
b.get("type") == BLOCK_TYPE_TOOL_RESULT
|
|
and b.get("tool_name") == "web"
|
|
and b.get("is_error") is True
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert web_err, "system_error should render as tool_result with is_error=True"
|
|
block = next(b for b in web_err[0]["blocks"] if b.get("tool_name") == "web")
|
|
assert "503" in block["output"]
|
|
|
|
def test_tether_browsing_display_spinner_skipped(self, caplog):
|
|
raw = json.loads((FIXTURES / "chatgpt_conversation.json").read_text())
|
|
p = self._get_provider()
|
|
with caplog.at_level(logging.DEBUG, logger="src.providers.chatgpt"):
|
|
result = p.normalize_conversation(raw)
|
|
|
|
spinner_msgs = [
|
|
m for m in result["messages"]
|
|
if any(
|
|
b.get("type") == BLOCK_TYPE_TOOL_RESULT and b.get("tool_name") == "file_search"
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
]
|
|
assert not spinner_msgs, "spinner tether_browsing_display should be skipped"
|
|
assert any("tether_browsing_display spinner" in r.message for r in caplog.records)
|
|
|
|
def test_tether_browsing_display_populated_renders_defensively(self):
|
|
"""Defensive case (never observed in real data) — populated browse renders."""
|
|
conv = {
|
|
"id": "test-tether",
|
|
"title": "T",
|
|
"create_time": 1700000000.0,
|
|
"update_time": 1700000001.0,
|
|
"mapping": {
|
|
"root": {"id": "root", "message": None, "parent": None, "children": ["m1"]},
|
|
"m1": {
|
|
"id": "m1",
|
|
"parent": "root",
|
|
"children": [],
|
|
"message": {
|
|
"id": "m1",
|
|
"author": {"role": "tool", "name": "browser"},
|
|
"content": {
|
|
"content_type": "tether_browsing_display",
|
|
"result": "Found 3 results about kubernetes ingress.",
|
|
"summary": "ingress search",
|
|
"assets": None,
|
|
"tether_id": None,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(conv)
|
|
assert any(
|
|
b.get("type") == BLOCK_TYPE_TOOL_RESULT and b.get("tool_name") == "browser"
|
|
for m in result["messages"]
|
|
for b in (m.get("blocks") or [])
|
|
)
|
|
|
|
|
|
class TestChatGPTConvIdFallback:
|
|
"""v0.4.1: live ChatGPT detail responses use conversation_id, not id."""
|
|
|
|
def _get_provider(self):
|
|
from src.providers.chatgpt import ChatGPTProvider
|
|
p = ChatGPTProvider.__new__(ChatGPTProvider)
|
|
import requests
|
|
p._session = requests.Session()
|
|
p._org_id = None
|
|
p._project_ids = []
|
|
p._project_map = {}
|
|
p._project_name_cache = {}
|
|
return p
|
|
|
|
def test_falls_back_to_conversation_id(self):
|
|
raw = {
|
|
"conversation_id": "live-chatgpt-uuid",
|
|
"title": "T",
|
|
"create_time": 1700000000.0,
|
|
"update_time": 1700000001.0,
|
|
"mapping": {
|
|
"root": {"id": "root", "message": None, "parent": None, "children": []},
|
|
},
|
|
}
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
assert result["id"] == "live-chatgpt-uuid"
|
|
|
|
def test_id_takes_precedence_when_both_present(self):
|
|
raw = {
|
|
"id": "from-id",
|
|
"conversation_id": "from-conversation-id",
|
|
"title": "T",
|
|
"create_time": 1700000000.0,
|
|
"update_time": 1700000001.0,
|
|
"mapping": {
|
|
"root": {"id": "root", "message": None, "parent": None, "children": []},
|
|
},
|
|
}
|
|
p = self._get_provider()
|
|
result = p.normalize_conversation(raw)
|
|
assert result["id"] == "from-id"
|