The __Secure-next-auth.session-token cannot be used directly as a Bearer token. It must first be exchanged via GET /api/auth/session (with the token sent as a Cookie) to obtain a short-lived accessToken. This accessToken is then used as the Authorization: Bearer header for all backend-api calls. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
299 lines
11 KiB
Python
299 lines
11 KiB
Python
"""ChatGPT provider — accesses chat.openai.com internal web API."""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from src.providers.base import BaseProvider, ProviderError, REQUEST_TIMEOUT
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Root of the backend API; the conversation list/detail URLs are built from it.
BASE_URL = "https://chatgpt.com/backend-api"
|
|
# Endpoint queried with the session cookie to obtain the short-lived accessToken.
AUTH_SESSION_URL = "https://chatgpt.com/api/auth/session"
|
|
|
|
|
|
class ChatGPTProvider(BaseProvider):
    """Provider for ChatGPT conversations via the internal web API.

    Authentication is a two-step process:

    1. Send __Secure-next-auth.session-token as a Cookie header to
       /api/auth/session to obtain a short-lived accessToken.
    2. Use that accessToken as the Bearer token for all backend-api calls.

    Token: __Secure-next-auth.session-token cookie (~7 day lifetime).
    """

    provider_name = "chatgpt"

    def __init__(self, session_token: str | None = None) -> None:
        """Resolve the session token and exchange it for an access token.

        Args:
            session_token: Value of the __Secure-next-auth.session-token cookie.
                When omitted or blank, the CHATGPT_SESSION_TOKEN environment
                variable is used instead.

        Raises:
            ProviderError: If no token is available, or if the exchange for an
                access token fails.
        """
        super().__init__()
        # Strip the explicit argument as well as the environment value so that a
        # whitespace-only argument falls back to the environment instead of
        # being sent as a (guaranteed invalid) cookie.
        token = (session_token or "").strip() or os.getenv(
            "CHATGPT_SESSION_TOKEN", ""
        ).strip()
        if not token:
            raise ProviderError(
                self.provider_name,
                "init",
                RuntimeError(
                    "CHATGPT_SESSION_TOKEN is not set. "
                    "Run 'python -m src.main auth' to configure it."
                ),
            )
        self._session_token = token
        # NOTE(review): self._session is presumably created by
        # BaseProvider.__init__ (it is not defined in this class) — confirm.
        self._session.headers.update(
            {
                "Referer": "https://chatgpt.com/",
                "Origin": "https://chatgpt.com",
            }
        )
        # Exchange the session cookie for an access token immediately
        self._access_token: str = self._fetch_access_token(token)
        self._session.headers["Authorization"] = f"Bearer {self._access_token}"
        logger.debug("[chatgpt] Session initialised — access token obtained (token: [REDACTED])")

    def _fetch_access_token(self, session_token: str) -> str:
        """Exchange the session cookie for a Bearer access token.

        Calls GET /api/auth/session with the session cookie, which returns
        {"accessToken": "...", "user": {...}}.

        Args:
            session_token: Value of the __Secure-next-auth.session-token cookie.

        Returns:
            The ``accessToken`` value from the response.

        Raises:
            ProviderError: If the request fails, the body is not valid JSON, or
                the response carries no ``accessToken``.
        """
        logger.debug("[chatgpt] Fetching access token from %s", AUTH_SESSION_URL)
        try:
            resp = self._session.get(
                AUTH_SESSION_URL,
                headers={"Cookie": f"__Secure-next-auth.session-token={session_token}"},
                timeout=REQUEST_TIMEOUT,
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as e:
            raise ProviderError(
                self.provider_name,
                "fetch_access_token",
                RuntimeError(
                    f"Could not exchange session token for access token: {e}. "
                    "Check that your CHATGPT_SESSION_TOKEN is current."
                ),
            ) from e

        # A JSON body that is not an object (e.g. ``null`` or a list) has no
        # ``.get``; treat it the same as a missing accessToken rather than
        # letting an AttributeError escape unwrapped.
        access_token = data.get("accessToken") if isinstance(data, dict) else None
        if not access_token:
            raise ProviderError(
                self.provider_name,
                "fetch_access_token",
                RuntimeError(
                    "No accessToken in /api/auth/session response. "
                    "Your session token may be expired — run 'python -m src.main auth' to refresh."
                ),
            )
        return access_token

    def _handle_401(self) -> None:
        """Log refresh instructions and raise an authentication ProviderError.

        NOTE(review): presumably invoked by the base-class request machinery on
        a 401 response — the call site is not in this file; confirm.

        Raises:
            ProviderError: Always.
        """
        msg = (
            "[chatgpt] Authentication failed (401 Unauthorized). "
            "Your __Secure-next-auth.session-token has likely expired (~7 day lifetime). "
            "The session token is used to obtain a short-lived access token via /api/auth/session. "
            "To refresh: open chatgpt.com in Chrome → F12 → Application → Cookies "
            "→ find '__Secure-next-auth.session-token' → copy the value. "
            "Then run 'python -m src.main auth' or update CHATGPT_SESSION_TOKEN in .env."
        )
        logger.error(msg)
        raise ProviderError(
            self.provider_name,
            "authentication",
            RuntimeError("401 Unauthorized — ChatGPT token expired"),
        )

    def list_conversations(self, offset: int = 0, limit: int = 100) -> list[dict]:
        """Fetch one page of conversations.

        Args:
            offset: Index of the first conversation to return.
            limit: Maximum number of conversations in the page.

        Returns:
            List of conversation summary dicts. Empty when the response does
            not have the expected ``{"items": [...]}`` shape.

        Raises:
            ProviderError: If the underlying request fails.
        """
        url = f"{BASE_URL}/conversations"
        params = {"offset": offset, "limit": limit, "order": "updated"}
        try:
            data = self._make_request("GET", url, params=params)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "list_conversations", e) from e

        if not isinstance(data, dict):
            self._warn_unexpected_schema("list_conversations", "root")
            return []

        items = data.get("items")
        # Covers both a missing key and a value of the wrong type, so callers
        # always receive a list as the signature promises.
        if not isinstance(items, list):
            self._warn_unexpected_schema("list_conversations", "items")
            return []

        return items

    def get_conversation(self, conv_id: str) -> dict:
        """Fetch full conversation detail for a single ID.

        Args:
            conv_id: Identifier of the conversation to fetch.

        Returns:
            The raw conversation dict, or ``{}`` when the response is not a dict.

        Raises:
            ProviderError: If the underlying request fails.
        """
        url = f"{BASE_URL}/conversation/{conv_id}"
        try:
            data = self._make_request("GET", url)
        except ProviderError:
            raise
        except Exception as e:
            raise ProviderError(self.provider_name, "get_conversation", e) from e

        if not isinstance(data, dict):
            self._warn_unexpected_schema("get_conversation", "root")
            return {}

        return data

    def normalize_conversation(self, raw: dict) -> dict:
        """Transform ChatGPT raw schema to the common normalized schema.

        ChatGPT stores messages in a nested ``mapping`` dict where each node
        has an ``id``, ``message``, and ``children`` list. We walk the tree
        from the root node to build a flat ordered message list.

        Args:
            raw: Raw conversation dict as returned by ``get_conversation``.

        Returns:
            Dict with keys ``id``, ``title``, ``provider``, ``project``,
            ``created_at``, ``updated_at``, ``message_count`` and ``messages``.
        """
        # ``or`` (rather than a .get default) also covers an explicit null id,
        # which would otherwise break the ``conv_id[:8]`` slices used in logs.
        # NOTE(review): detail payloads appear to carry the identifier under
        # ``conversation_id`` when ``id`` is absent — confirm against the API.
        conv_id = raw.get("id") or raw.get("conversation_id") or ""
        title = raw.get("title") or "Untitled"
        created_at = _ts_to_iso(raw.get("create_time"))
        updated_at = _ts_to_iso(raw.get("update_time"))

        # Project info — ChatGPT calls it "gizmo_id" or stores project info differently.
        # As of 2024, personal projects appear as a separate projects API; conversations
        # linked to a project have a non-null `workspace_id` or similar field.
        # We use `project_title` if present, else None.
        project: str | None = raw.get("project_title") or raw.get("workspace_title") or None

        # An explicit ``"mapping": null`` is treated like a missing mapping.
        mapping: dict = raw.get("mapping") or {}
        messages = _extract_messages(mapping, raw, conv_id)

        return {
            "id": conv_id,
            "title": title,
            "provider": "chatgpt",
            "project": project,
            "created_at": created_at,
            "updated_at": updated_at,
            "message_count": len(messages),
            "messages": messages,
        }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _ts_to_iso(ts: float | int | str | None) -> str:
|
|
"""Convert a Unix timestamp (float) or ISO string to ISO8601."""
|
|
if ts is None:
|
|
return ""
|
|
if isinstance(ts, (int, float)):
|
|
from datetime import datetime, timezone
|
|
return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
|
|
return str(ts)
|
|
|
|
|
|
def _extract_messages(
    mapping: dict[str, Any], raw: dict, conv_id: str
) -> list[dict]:
    """Walk the ChatGPT conversation mapping tree to produce an ordered message list.

    The tree is traversed depth-first in pre-order from the root returned by
    ``_find_root``; children are visited in the order they are listed and each
    node is visited at most once. An explicit stack is used instead of
    recursion because every message adds one level of depth in a linear chat,
    so long conversations could otherwise exceed the interpreter's recursion
    limit.

    NOTE(review): every branch of the tree is emitted, so if a conversation
    contains alternative branches they all end up in the flat list — confirm
    this is intended.

    Args:
        mapping: Node id -> node dict; nodes may carry ``message`` and
            ``children`` entries.
        raw: The full raw conversation. Currently unused; kept so the call
            signature stays stable.
        conv_id: Conversation id, truncated to 8 characters in log messages.

    Returns:
        Normalized message dicts (``role``, ``content``, ``content_type``,
        ``timestamp``) in traversal order; empty when the mapping is empty or
        no root can be determined.
    """
    if not mapping:
        logger.warning("[chatgpt] Conversation %s has empty mapping", conv_id[:8])
        return []

    # Find the root node (the one that is not listed as a child of any node)
    root_id = _find_root(mapping)
    if root_id is None:
        logger.warning(
            "[chatgpt] Could not determine root node for conversation %s", conv_id[:8]
        )
        return []

    messages: list[dict] = []
    visited: set[str] = set()
    pending: list[str] = [root_id]

    while pending:
        node_id = pending.pop()
        if node_id in visited:
            continue
        visited.add(node_id)

        node = mapping.get(node_id)
        if not isinstance(node, dict):
            # Dangling child reference or malformed node: nothing to emit.
            continue

        message = _message_from_node(node, conv_id, node_id)
        if message is not None:
            messages.append(message)

        # Push children reversed so the first child is popped (visited) first,
        # which reproduces the order of a recursive pre-order walk.
        pending.extend(reversed(node.get("children") or []))

    return messages


def _message_from_node(
    node: dict[str, Any], conv_id: str, node_id: str
) -> dict | None:
    """Return the normalized message held by one mapping node, or None if skipped."""
    msg_data = node.get("message")
    if not msg_data:
        return None

    # ``or {}`` guards against explicit nulls, which a .get default would not.
    role = (msg_data.get("author") or {}).get("role", "")
    # Skip system/tool messages silently
    if role not in ("user", "assistant"):
        return None

    content_obj = msg_data.get("content") or {}
    content_type = content_obj.get("content_type", "text")
    if content_type != "text":
        logger.warning(
            "[chatgpt] Skipping %s content in conversation %s message %s "
            "— rich content not yet supported (see FUTURE.md)",
            content_type,
            conv_id[:8],
            node_id[:8],
        )
        return None

    # Extract text only for messages that are kept, so a skipped message does
    # not additionally produce per-attachment warnings.
    text = _extract_text(content_obj, conv_id, node_id)
    if not text:
        logger.debug(
            "[chatgpt] Skipping empty message in conversation %s", conv_id[:8]
        )
        return None

    ts = msg_data.get("create_time")
    return {
        "role": role,
        "content": text,
        "content_type": "text",
        "timestamp": _ts_to_iso(ts) if ts else None,
    }
|
|
|
|
|
|
def _find_root(mapping: dict[str, Any]) -> str | None:
|
|
"""Find the root node ID — the node whose parent is absent or None."""
|
|
child_ids: set[str] = set()
|
|
for node in mapping.values():
|
|
for child in node.get("children", []):
|
|
child_ids.add(child)
|
|
|
|
for node_id in mapping:
|
|
if node_id not in child_ids:
|
|
return node_id
|
|
|
|
return None
|
|
|
|
|
|
def _extract_text(content_obj: dict, conv_id: str, node_id: str) -> str:
|
|
"""Extract plain text from a ChatGPT content object."""
|
|
parts = content_obj.get("parts", [])
|
|
if not parts:
|
|
return ""
|
|
|
|
text_parts = []
|
|
for part in parts:
|
|
if isinstance(part, str):
|
|
text_parts.append(part)
|
|
elif isinstance(part, dict):
|
|
# Could be an image or file reference — skip and warn
|
|
part_type = part.get("content_type", "unknown")
|
|
if part_type != "text":
|
|
logger.warning(
|
|
"[chatgpt] Skipping %s attachment in conversation %s "
|
|
"— rich content not yet supported (see FUTURE.md)",
|
|
part_type,
|
|
conv_id[:8],
|
|
)
|
|
else:
|
|
text_parts.append(part.get("text", ""))
|
|
|
|
return "\n".join(t for t in text_parts if t)
|