# --- src/providers/chatgpt.py (new file, mode 100644) ---
"""ChatGPT provider — accesses the chatgpt.com internal web API."""

import logging
import os
from datetime import datetime, timezone
from typing import Any

from src.providers.base import BaseProvider, ProviderError

logger = logging.getLogger(__name__)

BASE_URL = "https://chatgpt.com/backend-api"

# Author roles that are exported; messages from any other role are skipped silently.
_EXPORTED_ROLES = ("user", "assistant")


class ChatGPTProvider(BaseProvider):
    """Provider for ChatGPT conversations via the internal web API.

    Authentication: ``Authorization: Bearer <token>``
    Token: ``__Secure-next-auth.session-token`` cookie value (a JWT).
    Typical validity: ~7 days.
    """

    provider_name = "chatgpt"

    def __init__(self, session_token: str | None = None) -> None:
        """Configure the HTTP session with the ChatGPT bearer token.

        Args:
            session_token: Token to use. When omitted or blank, the
                ``CHATGPT_SESSION_TOKEN`` environment variable is read instead.

        Raises:
            ProviderError: If neither source yields a non-blank token.
        """
        super().__init__()
        # Strip BOTH sources: a whitespace-only argument must fall back to the
        # environment (or be rejected) rather than produce a blank bearer header.
        token = (session_token or "").strip() or os.getenv(
            "CHATGPT_SESSION_TOKEN", ""
        ).strip()
        if not token:
            raise ProviderError(
                self.provider_name,
                "init",
                RuntimeError(
                    "CHATGPT_SESSION_TOKEN is not set. "
                    "Run 'python -m src.main auth' to configure it."
                ),
            )
        # Never log the token value.
        # NOTE(review): self._session is expected to be created by
        # BaseProvider.__init__ — confirm against the base class.
        self._session.headers.update(
            {
                "Authorization": f"Bearer {token}",
                "Referer": "https://chatgpt.com/",
                "Origin": "https://chatgpt.com",
            }
        )
        logger.debug("[chatgpt] Session initialised (token: [REDACTED])")

    def _handle_401(self) -> None:
        """Log token-refresh instructions and raise an authentication error.

        NOTE(review): presumably invoked by the BaseProvider request layer when
        a 401 response is received — confirm against the base class.

        Raises:
            ProviderError: Always, with operation ``"authentication"``.
        """
        msg = (
            "[chatgpt] Authentication failed (401 Unauthorized). "
            "Your __Secure-next-auth.session-token has likely expired (~7 day lifetime). "
            "To refresh: open chatgpt.com in Chrome → F12 → Application → Cookies "
            "→ find '__Secure-next-auth.session-token' → copy the value. "
            "Then run 'python -m src.main auth' or update CHATGPT_SESSION_TOKEN in .env."
        )
        logger.error(msg)
        raise ProviderError(
            self.provider_name,
            "authentication",
            RuntimeError("401 Unauthorized — ChatGPT token expired"),
        )

    def list_conversations(self, offset: int = 0, limit: int = 100) -> list[dict]:
        """Fetch one page of conversations.

        Args:
            offset: Sent as the ``offset`` query parameter.
            limit: Sent as the ``limit`` query parameter.

        Returns:
            List of conversation summary dicts. An empty list is returned (and
            a schema warning emitted) when the response is not a dict holding
            a list under ``"items"``.

        Raises:
            ProviderError: If the request fails.
        """
        url = f"{BASE_URL}/conversations"
        params = {"offset": offset, "limit": limit, "order": "updated"}
        try:
            data = self._make_request("GET", url, params=params)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "list_conversations", e) from e

        if not isinstance(data, dict):
            self._warn_unexpected_schema("list_conversations", "root")
            return []

        items = data.get("items")
        # Reject anything that is not a list so the declared return type holds
        # even when the API changes shape.
        if not isinstance(items, list):
            self._warn_unexpected_schema("list_conversations", "items")
            return []

        return items

    def get_conversation(self, conv_id: str) -> dict:
        """Fetch full conversation detail for a single ID.

        Args:
            conv_id: Conversation identifier, interpolated into the request URL.

        Returns:
            The decoded response dict, or ``{}`` (with a schema warning) when
            the response is not a dict.

        Raises:
            ProviderError: If the request fails.
        """
        url = f"{BASE_URL}/conversation/{conv_id}"
        try:
            data = self._make_request("GET", url)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "get_conversation", e) from e

        if not isinstance(data, dict):
            self._warn_unexpected_schema("get_conversation", "root")
            return {}

        return data

    def normalize_conversation(self, raw: dict) -> dict:
        """Transform ChatGPT raw schema to the common normalized schema.

        ChatGPT stores messages in a nested ``mapping`` dict where each node
        has an ``id``, ``message``, and ``children`` list. We walk the tree
        from the root node to build a flat ordered message list.

        Args:
            raw: Conversation detail dict as returned by ``get_conversation``.

        Returns:
            Dict with keys ``id``, ``title``, ``provider``, ``project``,
            ``created_at``, ``updated_at``, ``message_count`` and ``messages``.
        """
        # ``or ""`` (not a .get default) so an explicit null id cannot reach
        # the ``conv_id[:8]`` slices used in log messages.
        conv_id = raw.get("id") or ""
        title = raw.get("title") or "Untitled"
        created_at = _ts_to_iso(raw.get("create_time"))
        updated_at = _ts_to_iso(raw.get("update_time"))

        # Project info: use ``project_title`` if present, then
        # ``workspace_title``, else None.
        # NOTE(review): the exact field ChatGPT uses for projects is not
        # confirmed here — verify against a real response.
        project: str | None = raw.get("project_title") or raw.get("workspace_title") or None

        mapping: dict = raw.get("mapping") or {}
        messages = _extract_messages(mapping, raw, conv_id)

        return {
            "id": conv_id,
            "title": title,
            "provider": "chatgpt",
            "project": project,
            "created_at": created_at,
            "updated_at": updated_at,
            "message_count": len(messages),
            "messages": messages,
        }


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _ts_to_iso(ts: float | int | str | None) -> str:
    """Convert a Unix timestamp or an ISO string to an ISO 8601 string.

    Args:
        ts: Seconds since the epoch (int/float), an already formatted string,
            or ``None``.

    Returns:
        A UTC ISO 8601 string for numeric input, ``str(ts)`` for any other
        non-None input, and ``""`` for ``None`` or for a numeric value that
        cannot be represented as a datetime.
    """
    if ts is None:
        return ""
    if isinstance(ts, (int, float)):
        try:
            return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
        except (OverflowError, OSError, ValueError):
            # One corrupt timestamp must not abort normalization of the
            # whole conversation.
            logger.warning("[chatgpt] Ignoring unrepresentable timestamp %r", ts)
            return ""
    return str(ts)


def _message_from_node(msg_data: Any, conv_id: str, node_id: str) -> dict | None:
    """Build a normalized message from one mapping node's ``message`` payload.

    Returns ``None`` when the node carries no message, the author role is not
    exported, the content is not plain text, or the text is empty.
    """
    if not msg_data:
        return None

    # ``or {}`` guards against keys that are present but null.
    role = (msg_data.get("author") or {}).get("role", "")
    if role not in _EXPORTED_ROLES:
        return None

    content_obj = msg_data.get("content") or {}
    content_type = content_obj.get("content_type", "text")
    # Check the content type before extracting text so a skipped message
    # produces a single warning instead of one per part plus this one.
    if content_type != "text":
        logger.warning(
            "[chatgpt] Skipping %s content in conversation %s message %s "
            "— rich content not yet supported (see FUTURE.md)",
            content_type,
            conv_id[:8],
            node_id[:8],
        )
        return None

    text = _extract_text(content_obj, conv_id, node_id)
    if not text:
        logger.debug(
            "[chatgpt] Skipping empty message in conversation %s", conv_id[:8]
        )
        return None

    ts = msg_data.get("create_time")
    return {
        "role": role,
        "content": text,
        "content_type": "text",
        "timestamp": (_ts_to_iso(ts) or None) if ts else None,
    }


def _extract_messages(
    mapping: dict[str, Any], raw: dict, conv_id: str
) -> list[dict]:
    """Walk the ChatGPT conversation mapping tree to produce an ordered message list.

    The tree is traversed depth-first in pre-order, visiting children in their
    listed order; every branch is included. The traversal is iterative so that
    very long linear conversations cannot exceed the interpreter's recursion
    limit.

    Args:
        mapping: Node ID -> node dict (``message`` and ``children`` keys are read).
        raw: Full raw conversation; currently unused, kept for interface
            compatibility.
        conv_id: Conversation ID, used only in log messages.

    Returns:
        Normalized user/assistant text messages in traversal order.
    """
    if not isinstance(mapping, dict) or not mapping:
        logger.warning("[chatgpt] Conversation %s has empty mapping", conv_id[:8])
        return []

    # Find the root node (the one that no other node lists as a child).
    root_id = _find_root(mapping)
    if root_id is None:
        logger.warning(
            "[chatgpt] Could not determine root node for conversation %s", conv_id[:8]
        )
        return []

    messages: list[dict] = []
    visited: set[str] = set()
    stack: list[str] = [root_id]

    while stack:
        node_id = stack.pop()
        if node_id in visited:
            # Defensive: malformed data could reference a node twice.
            continue
        visited.add(node_id)

        node = mapping.get(node_id) or {}
        message = _message_from_node(node.get("message"), conv_id, node_id)
        if message is not None:
            messages.append(message)

        # Push children reversed so they are popped in their listed order
        # (ChatGPT typically has one child per node in a linear chat).
        stack.extend(reversed(node.get("children") or []))

    return messages


def _find_root(mapping: dict[str, Any]) -> str | None:
    """Find the root node ID — the first node not listed as any node's child.

    Returns:
        The first such node ID in ``mapping`` order, or ``None`` when every
        node is referenced as a child.
    """
    child_ids: set[str] = set()
    for node in mapping.values():
        child_ids.update((node or {}).get("children") or [])

    return next((node_id for node_id in mapping if node_id not in child_ids), None)


def _extract_text(content_obj: dict, conv_id: str, node_id: str) -> str:
    """Extract plain text from a ChatGPT content object.

    Args:
        content_obj: Content dict whose ``parts`` list is read.
        conv_id: Conversation ID, used only in log messages.
        node_id: Node ID; currently unused, kept for interface compatibility.

    Returns:
        The non-empty string parts joined with newlines, or ``""`` when there
        are none.
    """
    parts = content_obj.get("parts") or []

    text_parts: list[str] = []
    for part in parts:
        if isinstance(part, str):
            text_parts.append(part)
        elif isinstance(part, dict):
            # Could be an image or file reference — skip and warn
            part_type = part.get("content_type", "unknown")
            if part_type != "text":
                logger.warning(
                    "[chatgpt] Skipping %s attachment in conversation %s "
                    "— rich content not yet supported (see FUTURE.md)",
                    part_type,
                    conv_id[:8],
                )
                continue
            text = part.get("text")
            # Ignore null / non-string text so the join below cannot fail.
            if isinstance(text, str):
                text_parts.append(text)

    return "\n".join(t for t in text_parts if t)


# --- src/providers/claude.py (new file, mode 100644) ---
"""Claude provider — accesses claude.ai internal web API."""
import logging
import os

from src.providers.base import BaseProvider, ProviderError

logger = logging.getLogger(__name__)

BASE_URL = "https://claude.ai/api"

# Response keys whose presence indicates cursor-based pagination.
_CURSOR_KEYS = ("next_cursor", "cursor", "next")

# Claude sender strings (lower-cased) mapped to normalized roles.
_ROLE_MAP = {
    "human": "user",
    "user": "user",
    "assistant": "assistant",
    "claude": "assistant",
    "ai": "assistant",
    "system": "system",
}


class ClaudeProvider(BaseProvider):
    """Provider for Claude conversations via the internal web API.

    Authentication: ``Cookie: sessionKey=<key>``
    Token: sessionKey cookie value from claude.ai.
    Typical validity: ~30 days (opaque; expiry cannot be decoded client-side).
    """

    provider_name = "claude"

    def __init__(self, session_key: str | None = None) -> None:
        """Configure the HTTP session with the Claude session cookie.

        Args:
            session_key: Key to use. When omitted or blank, the
                ``CLAUDE_SESSION_KEY`` environment variable is read instead.

        Raises:
            ProviderError: If neither source yields a non-blank key.
        """
        super().__init__()
        # Strip BOTH sources: a whitespace-only argument must fall back to the
        # environment (or be rejected) rather than produce a blank cookie.
        key = (session_key or "").strip() or os.getenv(
            "CLAUDE_SESSION_KEY", ""
        ).strip()
        if not key:
            raise ProviderError(
                self.provider_name,
                "init",
                RuntimeError(
                    "CLAUDE_SESSION_KEY is not set. "
                    "Run 'python -m src.main auth' to configure it."
                ),
            )
        # Set cookie header; never log the key value.
        # NOTE(review): self._session is expected to be created by
        # BaseProvider.__init__ — confirm against the base class.
        self._session.headers.update(
            {
                "Cookie": f"sessionKey={key}",
                "Referer": "https://claude.ai/",
                "Origin": "https://claude.ai",
            }
        )
        self._org_id: str | None = None  # cached per session
        logger.debug("[claude] Session initialised (key: [REDACTED])")

    def _handle_401(self) -> None:
        """Log key-refresh instructions and raise an authentication error.

        NOTE(review): presumably invoked by the BaseProvider request layer when
        a 401 response is received — confirm against the base class.

        Raises:
            ProviderError: Always, with operation ``"authentication"``.
        """
        msg = (
            "[claude] Authentication failed (401 Unauthorized). "
            "Your sessionKey has likely expired (~30 day lifetime). "
            "Note: Claude session keys are opaque — a 401 is the only expiry signal. "
            "To refresh: open claude.ai in Chrome → F12 → Application → Cookies "
            "→ find 'sessionKey' → copy the value. "
            "Then run 'python -m src.main auth' or update CLAUDE_SESSION_KEY in .env."
        )
        logger.error(msg)
        raise ProviderError(
            self.provider_name,
            "authentication",
            RuntimeError("401 Unauthorized — Claude session key expired"),
        )

    def _get_org_id(self) -> str:
        """Fetch and cache the organization ID used to build the other request URLs.

        The first entry of the ``/organizations`` response is used; its
        ``uuid`` (or, failing that, ``id``) is cached on the instance.

        Returns:
            The organization ID.

        Raises:
            ProviderError: If the request fails, the response is not a
                non-empty list, or no ID can be found in the first entry.
        """
        if self._org_id:
            return self._org_id

        logger.debug("[claude] Bootstrapping: fetching organization ID")
        url = f"{BASE_URL}/organizations"
        try:
            data = self._make_request("GET", url)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "get_org_id", e) from e

        if not isinstance(data, list) or not data:
            self._warn_unexpected_schema("get_org_id", "organizations array")
            raise ProviderError(
                self.provider_name,
                "get_org_id",
                RuntimeError("organizations endpoint returned empty or unexpected response"),
            )

        org = data[0]
        # A non-dict entry is reported as a schema problem rather than
        # surfacing as an AttributeError.
        org_id = (org.get("uuid") or org.get("id")) if isinstance(org, dict) else None
        if not org_id:
            self._warn_unexpected_schema("get_org_id", "uuid/id")
            raise ProviderError(
                self.provider_name,
                "get_org_id",
                RuntimeError("Could not find organization ID in response"),
            )

        self._org_id = org_id
        logger.debug("[claude] Got org_id: %s", org_id)
        return org_id

    def list_conversations(self, offset: int = 0, limit: int = 100) -> list[dict]:
        """Fetch one page of conversations.

        Claude's API may use cursor-based pagination. We attempt offset-based
        first (via query param); if the response includes a cursor field
        (``next_cursor``, ``cursor`` or ``next``), a WARNING is logged
        indicating manual investigation is needed.

        Args:
            offset: Sent as the ``offset`` query parameter when greater than 0.
            limit: Sent as the ``limit`` query parameter.

        Returns:
            The response itself when it is a list; otherwise the list stored
            under ``"conversations"`` or ``"chats"``. An empty list is returned
            for any other shape.

        Raises:
            ProviderError: If the organization lookup or the request fails.
        """
        org_id = self._get_org_id()
        url = f"{BASE_URL}/organizations/{org_id}/chat_conversations"
        params: dict = {"limit": limit}
        if offset > 0:
            params["offset"] = offset

        try:
            data = self._make_request("GET", url, params=params)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "list_conversations", e) from e

        # Handle both list and dict responses
        if isinstance(data, list):
            return data

        if not isinstance(data, dict):
            # Neither list nor dict (e.g. None or a string): report it
            # instead of failing on an unassigned local.
            self._warn_unexpected_schema("list_conversations", "root")
            return []

        # Warn whenever a cursor field is present, not only on empty pages.
        has_cursor = any(key in data for key in _CURSOR_KEYS)
        if has_cursor:
            logger.warning(
                "[claude] API returned cursor-based pagination — "
                "only first page will be fetched. "
                "Please report this at GitHub Issues."
            )

        conversations = data.get("conversations") or data.get("chats") or []
        if not isinstance(conversations, list):
            self._warn_unexpected_schema("list_conversations", "root")
            return []

        # An empty page under a known key is a normal end of pagination;
        # only a non-empty dict with no recognised key is unexpected.
        has_known_key = "conversations" in data or "chats" in data
        if data and not has_known_key and not has_cursor:
            self._warn_unexpected_schema("list_conversations", "root")

        return conversations

    def get_conversation(self, conv_id: str) -> dict:
        """Fetch full conversation detail for a single ID.

        Args:
            conv_id: Conversation identifier, interpolated into the request URL.

        Returns:
            The decoded response dict, or ``{}`` (with a schema warning) when
            the response is not a dict.

        Raises:
            ProviderError: If the organization lookup or the request fails.
        """
        org_id = self._get_org_id()
        url = f"{BASE_URL}/organizations/{org_id}/chat_conversations/{conv_id}"
        try:
            data = self._make_request("GET", url)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "get_conversation", e) from e

        if not isinstance(data, dict):
            self._warn_unexpected_schema("get_conversation", "root")
            return {}

        return data

    def normalize_conversation(self, raw: dict) -> dict:
        """Transform Claude raw schema to the common normalized schema.

        Args:
            raw: Conversation detail dict as returned by ``get_conversation``.

        Returns:
            Dict with keys ``id``, ``title``, ``provider``, ``project``,
            ``created_at``, ``updated_at``, ``message_count`` and ``messages``.
            Messages with an unrecognised sender or no text are omitted.
        """
        # Trailing ``or ""`` so an explicit null id cannot reach the
        # ``conv_id[:8]`` slices used in log messages.
        conv_id = raw.get("uuid") or raw.get("id") or ""
        title = raw.get("name") or raw.get("title") or "Untitled"
        created_at = raw.get("created_at") or raw.get("create_time") or ""
        updated_at = raw.get("updated_at") or raw.get("update_time") or ""

        # Project name — Claude may nest project info as project.name
        project_data = raw.get("project") or {}
        project: str | None
        if isinstance(project_data, dict):
            project = project_data.get("name") or None
        else:
            project = project_data or None

        # Messages
        raw_messages = raw.get("chat_messages") or raw.get("messages") or []
        messages = []

        for msg in raw_messages:
            if not isinstance(msg, dict):
                # Malformed entry; nothing can be extracted from it.
                continue

            role = _map_role(msg.get("sender") or msg.get("role", ""))
            if not role:
                continue

            # Content can be a string or a list of content blocks
            content_raw = msg.get("content") or msg.get("text") or ""
            content, skipped_types = _extract_claude_text(content_raw, conv_id)

            for ctype in skipped_types:
                logger.warning(
                    "[claude] Skipping %s content in conversation %s "
                    "— rich content not yet supported (see FUTURE.md)",
                    ctype,
                    conv_id[:8],
                )

            if content is None:
                logger.debug("[claude] Skipping empty message in conversation %s", conv_id[:8])
                continue

            messages.append(
                {
                    "role": role,
                    "content": content,
                    "content_type": "text",
                    "timestamp": msg.get("created_at") or msg.get("timestamp") or None,
                }
            )

        return {
            "id": conv_id,
            "title": title,
            "provider": "claude",
            "project": project,
            "created_at": created_at,
            "updated_at": updated_at,
            "message_count": len(messages),
            "messages": messages,
        }


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _map_role(sender: str) -> str | None:
    """Map Claude sender strings to normalized roles.

    Args:
        sender: Raw sender/role value; matched case-insensitively.

    Returns:
        ``"user"``, ``"assistant"`` or ``"system"``, or ``None`` when the
        sender is empty, not a string, or not recognised.
    """
    if not sender or not isinstance(sender, str):
        return None
    return _ROLE_MAP.get(sender.lower())


def _extract_claude_text(
    content: str | list | dict, conv_id: str
) -> tuple[str | None, list[str]]:
    """Extract plain text from a Claude content field.

    Args:
        content: A plain string, a single content block dict, or a list of
            strings and/or content block dicts.
        conv_id: Conversation ID; currently unused, kept for interface
            compatibility.

    Returns:
        (text_or_None, list_of_skipped_content_types)
    """
    # Treat a bare string or a single block as a one-element list so every
    # shape goes through the same block handling.
    if isinstance(content, (str, dict)):
        blocks: list = [content]
    elif isinstance(content, list):
        blocks = content
    else:
        return None, []

    skipped: list[str] = []
    parts: list[str] = []
    for block in blocks:
        if isinstance(block, str):
            piece = block.strip()
        elif isinstance(block, dict):
            # A missing or null type is treated as text.
            btype = block.get("type") or "text"
            if btype != "text":
                skipped.append(btype)
                continue
            raw_text = block.get("text")
            # ``text`` may be present but null or non-string.
            piece = raw_text.strip() if isinstance(raw_text, str) else ""
        else:
            continue
        if piece:
            parts.append(piece)

    text = "\n".join(parts)
    return (text if text else None), skipped