feat: add ChatGPT and Claude providers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
JesseMarkowitz
2026-02-27 22:59:06 -05:00
parent 6073034789
commit 3adb2d2b48
2 changed files with 528 additions and 0 deletions

254
src/providers/chatgpt.py Normal file
View File

@@ -0,0 +1,254 @@
"""ChatGPT provider — accesses chat.openai.com internal web API."""
import logging
import os
from typing import Any
from src.providers.base import BaseProvider, ProviderError
logger = logging.getLogger(__name__)
BASE_URL = "https://chatgpt.com/backend-api"
class ChatGPTProvider(BaseProvider):
"""Provider for ChatGPT conversations via the internal web API.
Authentication: Authorization: Bearer <CHATGPT_SESSION_TOKEN>
Token: __Secure-next-auth.session-token cookie value (a JWT).
Typical validity: ~7 days.
"""
provider_name = "chatgpt"
def __init__(self, session_token: str | None = None) -> None:
super().__init__()
token = session_token or os.getenv("CHATGPT_SESSION_TOKEN", "").strip()
if not token:
raise ProviderError(
self.provider_name,
"init",
RuntimeError(
"CHATGPT_SESSION_TOKEN is not set. "
"Run 'python -m src.main auth' to configure it."
),
)
# Never log the token value
self._session.headers.update(
{
"Authorization": f"Bearer {token}",
"Referer": "https://chatgpt.com/",
"Origin": "https://chatgpt.com",
}
)
logger.debug("[chatgpt] Session initialised (token: [REDACTED])")
def _handle_401(self) -> None:
msg = (
"[chatgpt] Authentication failed (401 Unauthorized). "
"Your __Secure-next-auth.session-token has likely expired (~7 day lifetime). "
"To refresh: open chatgpt.com in Chrome → F12 → Application → Cookies "
"→ find '__Secure-next-auth.session-token' → copy the value. "
"Then run 'python -m src.main auth' or update CHATGPT_SESSION_TOKEN in .env."
)
logger.error(msg)
raise ProviderError(
self.provider_name,
"authentication",
RuntimeError("401 Unauthorized — ChatGPT token expired"),
)
def list_conversations(self, offset: int = 0, limit: int = 100) -> list[dict]:
"""Fetch one page of conversations.
Returns:
List of conversation summary dicts.
"""
url = f"{BASE_URL}/conversations"
params = {"offset": offset, "limit": limit, "order": "updated"}
try:
data = self._make_request("GET", url, params=params)
except ProviderError:
raise
except Exception as e:
raise ProviderError(self.provider_name, "list_conversations", e) from e
if not isinstance(data, dict):
self._warn_unexpected_schema("list_conversations", "root")
return []
items = data.get("items")
if items is None:
self._warn_unexpected_schema("list_conversations", "items")
return []
return items
def get_conversation(self, conv_id: str) -> dict:
"""Fetch full conversation detail for a single ID."""
url = f"{BASE_URL}/conversation/{conv_id}"
try:
data = self._make_request("GET", url)
except ProviderError:
raise
except Exception as e:
raise ProviderError(self.provider_name, "get_conversation", e) from e
if not isinstance(data, dict):
self._warn_unexpected_schema("get_conversation", "root")
return {}
return data
def normalize_conversation(self, raw: dict) -> dict:
"""Transform ChatGPT raw schema to the common normalized schema.
ChatGPT stores messages in a nested ``mapping`` dict where each node
has an ``id``, ``message``, and ``children`` list. We walk the tree
from the root node to build a flat ordered message list.
"""
conv_id = raw.get("id", "")
title = raw.get("title") or "Untitled"
created_at = _ts_to_iso(raw.get("create_time"))
updated_at = _ts_to_iso(raw.get("update_time"))
# Project info — ChatGPT calls it "gizmo_id" or stores project info differently.
# As of 2024, personal projects appear as a separate projects API; conversations
# linked to a project have a non-null `workspace_id` or similar field.
# We use `project_title` if present, else None.
project: str | None = raw.get("project_title") or raw.get("workspace_title") or None
mapping: dict = raw.get("mapping", {})
messages = _extract_messages(mapping, raw, conv_id)
return {
"id": conv_id,
"title": title,
"provider": "chatgpt",
"project": project,
"created_at": created_at,
"updated_at": updated_at,
"message_count": len(messages),
"messages": messages,
}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _ts_to_iso(ts: float | int | str | None) -> str:
"""Convert a Unix timestamp (float) or ISO string to ISO8601."""
if ts is None:
return ""
if isinstance(ts, (int, float)):
from datetime import datetime, timezone
return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
return str(ts)
def _extract_messages(
mapping: dict[str, Any], raw: dict, conv_id: str
) -> list[dict]:
"""Walk the ChatGPT conversation mapping tree to produce an ordered message list."""
if not mapping:
logger.warning("[chatgpt] Conversation %s has empty mapping", conv_id[:8])
return []
# Find the root node (the one that has no parent, or whose parent is None/not in mapping)
root_id = _find_root(mapping)
if root_id is None:
logger.warning(
"[chatgpt] Could not determine root node for conversation %s", conv_id[:8]
)
return []
messages: list[dict] = []
visited: set[str] = set()
def walk(node_id: str) -> None:
if node_id in visited:
return
visited.add(node_id)
node = mapping.get(node_id, {})
msg_data = node.get("message")
if msg_data:
role = msg_data.get("author", {}).get("role", "")
# Skip system/tool messages silently unless they have visible content
if role in ("user", "assistant"):
content_obj = msg_data.get("content", {})
content_type = content_obj.get("content_type", "text")
text = _extract_text(content_obj, conv_id, node_id)
if content_type != "text":
logger.warning(
"[chatgpt] Skipping %s content in conversation %s message %s "
"— rich content not yet supported (see FUTURE.md)",
content_type,
conv_id[:8],
node_id[:8],
)
elif text:
ts = msg_data.get("create_time")
messages.append(
{
"role": role,
"content": text,
"content_type": "text",
"timestamp": _ts_to_iso(ts) if ts else None,
}
)
else:
logger.debug(
"[chatgpt] Skipping empty message in conversation %s", conv_id[:8]
)
# Walk children in order (ChatGPT typically has one child per node in a linear chat)
for child_id in node.get("children", []):
walk(child_id)
walk(root_id)
return messages
def _find_root(mapping: dict[str, Any]) -> str | None:
"""Find the root node ID — the node whose parent is absent or None."""
child_ids: set[str] = set()
for node in mapping.values():
for child in node.get("children", []):
child_ids.add(child)
for node_id in mapping:
if node_id not in child_ids:
return node_id
return None
def _extract_text(content_obj: dict, conv_id: str, node_id: str) -> str:
"""Extract plain text from a ChatGPT content object."""
parts = content_obj.get("parts", [])
if not parts:
return ""
text_parts = []
for part in parts:
if isinstance(part, str):
text_parts.append(part)
elif isinstance(part, dict):
# Could be an image or file reference — skip and warn
part_type = part.get("content_type", "unknown")
if part_type != "text":
logger.warning(
"[chatgpt] Skipping %s attachment in conversation %s "
"— rich content not yet supported (see FUTURE.md)",
part_type,
conv_id[:8],
)
else:
text_parts.append(part.get("text", ""))
return "\n".join(t for t in text_parts if t)

274
src/providers/claude.py Normal file
View File

@@ -0,0 +1,274 @@
"""Claude provider — accesses claude.ai internal web API."""
import logging
import os
from src.providers.base import BaseProvider, ProviderError
logger = logging.getLogger(__name__)
BASE_URL = "https://claude.ai/api"
class ClaudeProvider(BaseProvider):
"""Provider for Claude conversations via the internal web API.
Authentication: Cookie: sessionKey=<CLAUDE_SESSION_KEY>
Token: sessionKey cookie value from claude.ai.
Typical validity: ~30 days (opaque; expiry cannot be decoded client-side).
"""
provider_name = "claude"
def __init__(self, session_key: str | None = None) -> None:
super().__init__()
key = session_key or os.getenv("CLAUDE_SESSION_KEY", "").strip()
if not key:
raise ProviderError(
self.provider_name,
"init",
RuntimeError(
"CLAUDE_SESSION_KEY is not set. "
"Run 'python -m src.main auth' to configure it."
),
)
# Set cookie header; never log the key value
self._session.headers.update(
{
"Cookie": f"sessionKey={key}",
"Referer": "https://claude.ai/",
"Origin": "https://claude.ai",
}
)
self._org_id: str | None = None # cached per session
logger.debug("[claude] Session initialised (key: [REDACTED])")
def _handle_401(self) -> None:
msg = (
"[claude] Authentication failed (401 Unauthorized). "
"Your sessionKey has likely expired (~30 day lifetime). "
"Note: Claude session keys are opaque — a 401 is the only expiry signal. "
"To refresh: open claude.ai in Chrome → F12 → Application → Cookies "
"→ find 'sessionKey' → copy the value. "
"Then run 'python -m src.main auth' or update CLAUDE_SESSION_KEY in .env."
)
logger.error(msg)
raise ProviderError(
self.provider_name,
"authentication",
RuntimeError("401 Unauthorized — Claude session key expired"),
)
def _get_org_id(self) -> str:
"""Fetch and cache the organization ID (required for all Claude API calls)."""
if self._org_id:
return self._org_id
logger.debug("[claude] Bootstrapping: fetching organization ID")
url = f"{BASE_URL}/organizations"
try:
data = self._make_request("GET", url)
except ProviderError:
raise
except Exception as e:
raise ProviderError(self.provider_name, "get_org_id", e) from e
if not isinstance(data, list) or not data:
self._warn_unexpected_schema("get_org_id", "organizations array")
raise ProviderError(
self.provider_name,
"get_org_id",
RuntimeError("organizations endpoint returned empty or unexpected response"),
)
org = data[0]
org_id = org.get("uuid") or org.get("id")
if not org_id:
self._warn_unexpected_schema("get_org_id", "uuid/id")
raise ProviderError(
self.provider_name,
"get_org_id",
RuntimeError("Could not find organization ID in response"),
)
self._org_id = org_id
logger.debug("[claude] Got org_id: %s", org_id)
return org_id
def list_conversations(self, offset: int = 0, limit: int = 100) -> list[dict]:
"""Fetch one page of conversations.
Claude's API may use cursor-based pagination. We attempt offset-based
first (via query param); if the response includes a ``next_cursor``
field, a WARNING is logged indicating manual investigation is needed.
"""
org_id = self._get_org_id()
url = f"{BASE_URL}/organizations/{org_id}/chat_conversations"
params: dict = {"limit": limit}
if offset > 0:
params["offset"] = offset
try:
data = self._make_request("GET", url, params=params)
except ProviderError:
raise
except Exception as e:
raise ProviderError(self.provider_name, "list_conversations", e) from e
# Handle both list and dict responses
if isinstance(data, list):
conversations = data
elif isinstance(data, dict):
conversations = data.get("conversations") or data.get("chats") or []
if not conversations and data:
# Check for unexpected pagination mechanism
if "next_cursor" in data or "cursor" in data or "next" in data:
logger.warning(
"[claude] API returned cursor-based pagination — "
"only first page will be fetched. "
"Please report this at GitHub Issues."
)
else:
self._warn_unexpected_schema("list_conversations", "root")
return []
return conversations
def get_conversation(self, conv_id: str) -> dict:
"""Fetch full conversation detail for a single ID."""
org_id = self._get_org_id()
url = f"{BASE_URL}/organizations/{org_id}/chat_conversations/{conv_id}"
try:
data = self._make_request("GET", url)
except ProviderError:
raise
except Exception as e:
raise ProviderError(self.provider_name, "get_conversation", e) from e
if not isinstance(data, dict):
self._warn_unexpected_schema("get_conversation", "root")
return {}
return data
def normalize_conversation(self, raw: dict) -> dict:
"""Transform Claude raw schema to the common normalized schema."""
conv_id = raw.get("uuid") or raw.get("id", "")
title = raw.get("name") or raw.get("title") or "Untitled"
created_at = raw.get("created_at") or raw.get("create_time") or ""
updated_at = raw.get("updated_at") or raw.get("update_time") or ""
# Project name — Claude may nest project info as project.name
project_data = raw.get("project") or {}
project: str | None = (
project_data.get("name")
if isinstance(project_data, dict)
else project_data
) or None
# Messages
raw_messages = raw.get("chat_messages") or raw.get("messages") or []
messages = []
for msg in raw_messages:
role = _map_role(msg.get("sender") or msg.get("role", ""))
if not role:
continue
# Content can be a string or a list of content blocks
content_raw = msg.get("content") or msg.get("text") or ""
content, skipped_types = _extract_claude_text(content_raw, conv_id)
for ctype in skipped_types:
logger.warning(
"[claude] Skipping %s content in conversation %s "
"— rich content not yet supported (see FUTURE.md)",
ctype,
conv_id[:8],
)
timestamp = msg.get("created_at") or msg.get("timestamp") or None
if content is None:
logger.debug("[claude] Skipping empty message in conversation %s", conv_id[:8])
continue
messages.append(
{
"role": role,
"content": content,
"content_type": "text",
"timestamp": timestamp,
}
)
return {
"id": conv_id,
"title": title,
"provider": "claude",
"project": project,
"created_at": created_at,
"updated_at": updated_at,
"message_count": len(messages),
"messages": messages,
}
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _map_role(sender: str) -> str | None:
"""Map Claude sender strings to normalized roles."""
mapping = {
"human": "user",
"user": "user",
"assistant": "assistant",
"claude": "assistant",
"ai": "assistant",
"system": "system",
}
return mapping.get(sender.lower()) if sender else None
def _extract_claude_text(
content: str | list | dict, conv_id: str
) -> tuple[str | None, list[str]]:
"""Extract plain text from a Claude content field.
Returns:
(text_or_None, list_of_skipped_content_types)
"""
skipped: list[str] = []
if isinstance(content, str):
text = content.strip()
return (text if text else None), skipped
if isinstance(content, list):
parts: list[str] = []
for block in content:
if isinstance(block, str):
parts.append(block)
elif isinstance(block, dict):
btype = block.get("type", "text")
if btype == "text":
t = block.get("text", "").strip()
if t:
parts.append(t)
else:
skipped.append(btype)
text = "\n".join(parts).strip()
return (text if text else None), skipped
if isinstance(content, dict):
btype = content.get("type", "text")
if btype == "text":
text = content.get("text", "").strip()
return (text if text else None), skipped
else:
skipped.append(btype)
return None, skipped
return None, skipped