feat: v0.2.0 — Joplin import, ChatGPT Projects, --project filter

Core features:
- Add `joplin` command: syncs exported Markdown to Joplin via local REST API
- Notebooks auto-created per provider+project (e.g. "ChatGPT - My Project")
- Idempotent: notes updated (not duplicated) on re-run; note ID tracked in manifest
- Add `--project` filter to `export` and `list` commands (substring or 'none')
- Add ChatGPT Projects support via CHATGPT_PROJECT_IDS env var

Config:
- Add JOPLIN_API_TOKEN, JOPLIN_API_URL, JOPLIN_REQUEST_TIMEOUT
- Version now read from importlib.metadata (single source of truth: pyproject.toml)
- Bump version to 0.2.0

Quality:
- Explicit Timeout handling in JoplinClient with actionable error messages
- Token validation (validate_token) kept separate from connectivity checks (ping)
- Remove debug_auth.py, debug_claude.py, and untracked .har file
- Add *.har to .gitignore (may contain auth cookies/session tokens)
- Update README, CHANGELOG, FUTURE.md to reflect v0.2.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
JesseMarkowitz
2026-03-01 06:04:03 -05:00
parent 23d7c17255
commit 304cf4fde4
16 changed files with 1795 additions and 133 deletions

View File

@@ -1,4 +1,4 @@
"""Local cache manifest for tracking exported conversations."""
"""Local cache manifest for tracking exported and Joplin-synced conversations."""
import json
import logging
@@ -18,11 +18,17 @@ class CacheError(Exception):
class Cache:
"""Manages the local JSON manifest of exported conversations.
"""Manages the local JSON manifest of exported and Joplin-synced conversations.
The manifest is the single source of truth for what has been exported.
Every run compares the provider's full conversation list against this
manifest to determine what is new or updated.
The manifest is the single source of truth for what has been exported and
synced. Every export run compares the provider's full conversation list
against this manifest to determine what is new or updated. The Joplin sync
run reads it to find conversations not yet pushed to Joplin (or re-exported
since the last sync).
Each entry tracks:
title, project, updated_at, exported_at, file_path,
joplin_note_id (after first sync), joplin_synced_at (after first sync)
File security:
- Permissions: 600 (owner read/write only)
@@ -150,6 +156,59 @@ class Cache:
"""Return all cached entries for a provider (for --cache --show)."""
return dict(self._data.get(provider, {}))
def mark_joplin_synced(self, provider: str, conv_id: str, note_id: str) -> None:
    """Record a successful Joplin sync for a conversation.

    Adds ``joplin_note_id`` and ``joplin_synced_at`` to the manifest entry
    and writes atomically to disk. If the conversation has no manifest
    entry (never exported), a warning is logged and nothing is written.
    """
    entry = self._data.get(provider, {}).get(conv_id)
    if entry is not None:
        entry["joplin_note_id"] = note_id
        # Timezone-aware UTC timestamp, consistent with exported_at entries.
        entry["joplin_synced_at"] = datetime.now(tz=timezone.utc).isoformat()
        self._save()
        return
    logger.warning(
        "[cache] mark_joplin_synced: no cache entry for %s/%s", provider, conv_id[:8]
    )
def get_joplin_pending(self, provider: str) -> list[tuple[str, dict]]:
    """Return (conv_id, entry) pairs that need to be synced to Joplin.

    A conversation is pending when:
      - It has never been synced (no ``joplin_note_id``), OR
      - It was re-exported after the last Joplin sync
        (``exported_at`` > ``joplin_synced_at``).

    Returns:
        List of (conv_id, entry_dict) tuples, where entry_dict includes
        ``file_path``, ``title``, ``project``, and optionally ``joplin_note_id``.
    """
    # Hoisted out of the loop: previously this import lived inside the
    # per-entry try/except, so a broken src.utils would be silently
    # swallowed and re-sync detection would be disabled without a trace.
    from src.utils import _parse_dt

    pending: list[tuple[str, dict]] = []
    for conv_id, entry in self._data.get(provider, {}).items():
        if not isinstance(entry, dict):
            continue  # skip non-entry values stored alongside conversations
        if not entry.get("file_path"):
            continue  # never exported to disk — nothing to push
        if not entry.get("joplin_note_id"):
            # Never synced to Joplin at all.
            pending.append((conv_id, entry))
            continue
        # Re-sync if the file was re-exported after the last Joplin sync
        exported_at = entry.get("exported_at", "")
        synced_at = entry.get("joplin_synced_at", "")
        if exported_at and synced_at:
            try:
                if _parse_dt(exported_at) > _parse_dt(synced_at):
                    pending.append((conv_id, entry))
            except Exception:
                # Best-effort: unparseable timestamps mean we cannot prove
                # the entry is stale, so skip it rather than crash the sync.
                logger.debug(
                    "[cache] get_joplin_pending: could not compare timestamps "
                    "for %s/%s (exported_at=%r, joplin_synced_at=%r)",
                    provider, conv_id[:8], exported_at, synced_at,
                )
    return pending
def last_run(self) -> str | None:
    """Return the ISO8601 timestamp of the last export run, or None.

    The timestamp is stored as a top-level ``last_run`` key in the manifest,
    alongside the per-provider conversation maps.
    """
    return self._data.get("last_run", None)

View File

@@ -35,6 +35,13 @@ class Config:
log_file: str
# Decoded ChatGPT JWT expiry (None if token absent or not a JWT)
chatgpt_token_expiry: datetime | None = field(default=None, repr=False)
# ChatGPT Project gizmo IDs (g-p-xxx) — project conversations are not
# included in the default /conversations listing; they must be fetched
# separately via /backend-api/gizmos/{id}/conversations.
chatgpt_project_ids: list[str] = field(default_factory=list)
# Joplin local REST API settings (Web Clipper service)
joplin_api_token: str | None = None
joplin_api_url: str = "http://localhost:41184"
def load_config() -> Config:
@@ -54,6 +61,24 @@ def load_config() -> Config:
cache_dir = Path(os.getenv("CACHE_DIR", "~/.ai-chat-exporter")).expanduser()
log_file = os.getenv("LOG_FILE", "~/.ai-chat-exporter/logs/exporter.log").strip()
# Joplin
joplin_token = os.getenv("JOPLIN_API_TOKEN", "").strip() or None
joplin_url = os.getenv("JOPLIN_API_URL", "http://localhost:41184").strip()
# Parse CHATGPT_PROJECT_IDS — comma-separated list of gizmo IDs (g-p-xxx)
_project_ids_raw = os.getenv("CHATGPT_PROJECT_IDS", "").strip()
chatgpt_project_ids = [
pid.strip()
for pid in _project_ids_raw.split(",")
if pid.strip() and pid.strip().startswith("g-p-")
] if _project_ids_raw else []
if _project_ids_raw and not chatgpt_project_ids:
logger.warning(
"CHATGPT_PROJECT_IDS is set but contains no valid project IDs. "
"Each ID should start with 'g-p-' (e.g. g-p-68c2b2b3037c8191890036fb4ae3ed9f). "
"Find your project ID in the browser URL when viewing a project."
)
errors: list[str] = []
# Validate output structure
@@ -108,6 +133,9 @@ def load_config() -> Config:
cache_dir=cache_dir,
log_file=log_file,
chatgpt_token_expiry=chatgpt_expiry,
chatgpt_project_ids=chatgpt_project_ids,
joplin_api_token=joplin_token,
joplin_api_url=joplin_url,
)
_log_startup_summary(config)
@@ -182,16 +210,21 @@ def _log_startup_summary(cfg: Config) -> None:
"""Log a single INFO line summarising the active configuration."""
chatgpt_status = format_token_status(cfg.chatgpt_session_token, cfg.chatgpt_token_expiry)
claude_status = format_token_status(cfg.claude_session_key)
joplin_status = "configured" if cfg.joplin_api_token else "not configured"
logger.info(
"Config loaded | "
"ChatGPT: %s | "
"Claude: %s | "
"chatgpt_projects: %d | "
"Joplin: %s | "
"export_dir=%s | "
"structure=%s | "
"cache_dir=%s",
chatgpt_status,
claude_status,
len(cfg.chatgpt_project_ids),
joplin_status,
cfg.export_dir,
cfg.output_structure,
cfg.cache_dir,

303
src/joplin.py Normal file
View File

@@ -0,0 +1,303 @@
"""Joplin Data API client for importing notes into Joplin desktop."""
import logging
import os
from typing import Any
import requests
logger = logging.getLogger(__name__)
# HTTP timeout for regular API calls (seconds). Notes can be large Markdown
# files so we allow more time than a typical JSON API call.
# Override with JOPLIN_REQUEST_TIMEOUT env var if you have very large conversations.
_REQUEST_TIMEOUT: int = int(os.getenv("JOPLIN_REQUEST_TIMEOUT", "30"))
class JoplinError(Exception):
    """Raised when the Joplin API returns an error or is unreachable.

    ``JoplinClient`` normalises every transport-level failure (connection
    refused, timeout, HTTP 4xx/5xx) into this single exception type, so
    callers only need one ``except JoplinError`` clause.
    """
class JoplinClient:
    """HTTP client for the Joplin local REST API (Web Clipper service).

    Requires Joplin desktop to be running with the Web Clipper service enabled.
    Get your API token from: Joplin → Tools → Options → Web Clipper.

    Args:
        base_url: Joplin API base URL (default: http://localhost:41184).
        token: API authorization token from Joplin Web Clipper settings.
    """

    def __init__(self, base_url: str, token: str) -> None:
        self._base_url = base_url.rstrip("/")
        self._token = token
        # In-memory cache of notebook title → ID to avoid repeated GET /folders
        self._notebook_cache: dict[str, str] = {}
        self._notebooks_loaded = False
        logger.debug("[joplin] Client initialised with base_url=%s", self._base_url)

    # ------------------------------------------------------------------
    # Connectivity
    # ------------------------------------------------------------------

    def ping(self) -> bool:
        """Return True if the Joplin API is reachable and responding.

        Note: /ping does not require authentication. A successful ping only
        confirms Joplin is running — not that the token is valid. Call
        ``validate_token()`` to confirm authentication separately.

        Raises:
            JoplinError: If the API returns an unexpected non-connection error.
        """
        url = f"{self._base_url}/ping"
        logger.debug("[joplin] GET %s", url)
        try:
            resp = requests.get(url, timeout=5)
            resp.raise_for_status()
            # Joplin's /ping endpoint replies with the literal service name.
            ok = "JoplinClipperServer" in resp.text
            logger.debug("[joplin] ping → %s (body: %r)", "OK" if ok else "unexpected response", resp.text[:80])
            return ok
        except requests.exceptions.ConnectionError:
            logger.debug("[joplin] ping → connection refused at %s", url)
            return False
        except requests.exceptions.Timeout:
            logger.debug("[joplin] ping → timed out after 5s at %s", url)
            return False
        except requests.exceptions.RequestException as e:
            raise JoplinError(f"Joplin ping failed: {e}") from e

    def validate_token(self) -> None:
        """Verify the API token is accepted by Joplin.

        Does a minimal authenticated call (GET /folders?limit=1) and raises
        ``JoplinError`` if authentication fails.

        Raises:
            JoplinError: If the token is rejected (401) or Joplin is unreachable.
        """
        logger.debug("[joplin] Validating API token…")
        self._get("/folders", params={"limit": 1, "fields": "id"})
        logger.debug("[joplin] Token validated OK")

    # ------------------------------------------------------------------
    # Notebooks (folders)
    # ------------------------------------------------------------------

    def list_notebooks(self) -> list[dict]:
        """Return all Joplin notebooks (folders), handling pagination.

        Returns:
            List of folder dicts with at least ``id`` and ``title`` keys.
        """
        results: list[dict] = []
        page = 1
        while True:
            logger.debug("[joplin] GET /folders page=%d", page)
            resp = self._get("/folders", params={"page": page, "fields": "id,title"})
            items = resp.get("items", [])
            results.extend(items)
            # Fixed format string: "%d%d" ran the page number into the count.
            logger.debug("[joplin] /folders page=%d → %d items, has_more=%s", page, len(items), resp.get("has_more"))
            if not resp.get("has_more"):
                break
            page += 1
        return results

    def get_or_create_notebook(self, title: str) -> str:
        """Return the Joplin folder ID for ``title``, creating it if needed.

        Args:
            title: Notebook display name (e.g. "ChatGPT - My Project").

        Returns:
            Joplin folder ID string.
        """
        # Lazily populate the title → ID cache on first use.
        if not self._notebooks_loaded:
            self._load_notebook_cache()

        if title in self._notebook_cache:
            folder_id = self._notebook_cache[title]
            # Fixed format string: "%r%s" ran the title into the folder ID.
            logger.debug("[joplin] Notebook cache hit: %r → %s", title, folder_id)
            return folder_id

        # Not found — create it
        logger.info("[joplin] Creating notebook: %r", title)
        resp = self._post("/folders", {"title": title})
        folder_id = resp["id"]
        self._notebook_cache[title] = folder_id
        logger.debug("[joplin] Notebook created: %r → %s", title, folder_id)
        return folder_id

    # ------------------------------------------------------------------
    # Notes
    # ------------------------------------------------------------------

    def create_note(self, title: str, body: str, parent_id: str) -> str:
        """Create a new note in the specified notebook.

        Args:
            title: Note title.
            body: Note body (Markdown).
            parent_id: Notebook (folder) ID.

        Returns:
            ID of the created note.
        """
        logger.debug(
            "[joplin] Creating note: %r in notebook %s (%d chars)",
            title, parent_id, len(body),
        )
        resp = self._post("/notes", {"title": title, "body": body, "parent_id": parent_id})
        note_id = resp["id"]
        # Fixed format string: "%r%s" ran the title into the note ID.
        logger.info("[joplin] Note created: %r → %s", title, note_id)
        return note_id

    def update_note(self, note_id: str, title: str, body: str) -> None:
        """Update the title and body of an existing note.

        Args:
            note_id: Joplin note ID.
            title: New note title.
            body: New note body (Markdown).
        """
        logger.debug(
            "[joplin] Updating note %s: %r (%d chars)",
            note_id, title, len(body),
        )
        self._put(f"/notes/{note_id}", {"title": title, "body": body})
        logger.info("[joplin] Note updated: %r (%s)", title, note_id)

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------

    def _get(self, path: str, params: dict | None = None) -> dict[str, Any]:
        url = f"{self._base_url}{path}"
        # The token is passed as a query parameter but deliberately kept out
        # of the debug log below.
        query = {"token": self._token, **(params or {})}
        logger.debug("[joplin] GET %s params=%s", path, {k: v for k, v in (params or {}).items()})
        try:
            resp = requests.get(url, params=query, timeout=_REQUEST_TIMEOUT)
            logger.debug("[joplin] GET %s → HTTP %d", path, resp.status_code)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.ConnectionError as e:
            raise JoplinError(
                "Cannot connect to Joplin. Is Joplin desktop running with Web Clipper enabled?"
            ) from e
        except requests.exceptions.Timeout as e:
            raise JoplinError(_timeout_message("GET", path)) from e
        except requests.exceptions.HTTPError as e:
            raise JoplinError(_http_error_message("GET", path, e)) from e
        except requests.exceptions.RequestException as e:
            raise JoplinError(f"Joplin GET {path} failed: {e}") from e

    def _post(self, path: str, data: dict) -> dict[str, Any]:
        url = f"{self._base_url}{path}"
        logger.debug("[joplin] POST %s", path)
        try:
            resp = requests.post(url, params={"token": self._token}, json=data, timeout=_REQUEST_TIMEOUT)
            logger.debug("[joplin] POST %s → HTTP %d", path, resp.status_code)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.ConnectionError as e:
            raise JoplinError(
                "Cannot connect to Joplin. Is Joplin desktop running with Web Clipper enabled?"
            ) from e
        except requests.exceptions.Timeout as e:
            raise JoplinError(_timeout_message("POST", path)) from e
        except requests.exceptions.HTTPError as e:
            raise JoplinError(_http_error_message("POST", path, e)) from e
        except requests.exceptions.RequestException as e:
            raise JoplinError(f"Joplin POST {path} failed: {e}") from e

    def _put(self, path: str, data: dict) -> dict[str, Any]:
        url = f"{self._base_url}{path}"
        logger.debug("[joplin] PUT %s", path)
        try:
            resp = requests.put(url, params={"token": self._token}, json=data, timeout=_REQUEST_TIMEOUT)
            logger.debug("[joplin] PUT %s → HTTP %d", path, resp.status_code)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.ConnectionError as e:
            raise JoplinError(
                "Cannot connect to Joplin. Is Joplin desktop running with Web Clipper enabled?"
            ) from e
        except requests.exceptions.Timeout as e:
            raise JoplinError(_timeout_message("PUT", path)) from e
        except requests.exceptions.HTTPError as e:
            raise JoplinError(_http_error_message("PUT", path, e)) from e
        except requests.exceptions.RequestException as e:
            raise JoplinError(f"Joplin PUT {path} failed: {e}") from e

    def _load_notebook_cache(self) -> None:
        """Fetch all notebooks once and build the title → ID lookup."""
        logger.debug("[joplin] Loading notebook list from Joplin…")
        notebooks = self.list_notebooks()
        self._notebook_cache = {nb["title"]: nb["id"] for nb in notebooks}
        self._notebooks_loaded = True
        logger.debug("[joplin] Notebook cache loaded: %d notebooks", len(self._notebook_cache))
        for title, folder_id in self._notebook_cache.items():
            # Fixed format string: "%r%s" ran the title into the folder ID.
            logger.debug("[joplin]   %r → %s", title, folder_id)
# ------------------------------------------------------------------
# Error message helper
# ------------------------------------------------------------------
def _timeout_message(method: str, path: str) -> str:
    """Build a clear timeout error message with actionable suggestions.

    Mentions the active ``_REQUEST_TIMEOUT`` so the user knows both what
    was exceeded and what to raise via JOPLIN_REQUEST_TIMEOUT.
    """
    parts = [
        f"Joplin {method} {path} timed out after {_REQUEST_TIMEOUT}s. Possible causes:\n",
        " • The note body is very large and Joplin is slow to process it.\n",
        " • Joplin is busy (syncing, indexing, or loading a large library).\n",
        " • Joplin has frozen — try restarting it.\n",
        "If this happens repeatedly, increase JOPLIN_REQUEST_TIMEOUT in your .env ",
        f"(currently {_REQUEST_TIMEOUT}s).",
    ]
    return "".join(parts)
def _http_error_message(method: str, path: str, e: requests.exceptions.HTTPError) -> str:
    """Build a human-friendly error message from an HTTP error, with auth hint on 401.

    Args:
        method: HTTP verb of the failed call ("GET", "POST", "PUT").
        path: API path of the failed call.
        e: The raised HTTPError; its ``response`` may be None.

    Returns:
        A single-line message suitable for wrapping in ``JoplinError``.
    """
    resp = e.response
    status = resp.status_code if resp is not None else "?"
    if status == 401:
        return (
            f"Joplin rejected the API token (HTTP 401 on {method} {path}). "
            "Check that JOPLIN_API_TOKEN is correct: "
            "Joplin → Tools → Options → Web Clipper → Authorization token."
        )
    if status == 404:
        return f"Joplin resource not found (HTTP 404 on {method} {path}). The note may have been deleted in Joplin."
    body_snippet = ""
    if resp is not None:
        try:
            # Separator added: previously the snippet ran straight into the
            # status code (e.g. "HTTP 500Internal error").
            body_snippet = f" — {resp.text[:120]}"
        except Exception:
            pass  # response body may be unreadable; status alone is enough
    return f"Joplin {method} {path} failed: HTTP {status}{body_snippet}"
# ------------------------------------------------------------------
# Notebook naming helper
# ------------------------------------------------------------------
_PROVIDER_DISPLAY = {
"chatgpt": "ChatGPT",
"claude": "Claude",
}
def notebook_title(provider: str, project: str | None) -> str:
"""Derive a flat Joplin notebook title from provider and project name.
Examples:
notebook_title("chatgpt", "no-project") → "ChatGPT - No Project"
notebook_title("claude", "budget-tracker") → "Claude - Budget Tracker"
notebook_title("chatgpt", None) → "ChatGPT - No Project"
"""
prov_display = _PROVIDER_DISPLAY.get(provider, provider.capitalize())
proj = (project or "no-project").replace("-", " ").title()
return f"{prov_display} - {proj}"

View File

@@ -1,5 +1,6 @@
"""CLI entry point for ai-chat-exporter."""
import importlib.metadata
import logging
import platform
import shutil
@@ -19,6 +20,7 @@ from src.providers.base import ProviderError
console = Console()
err_console = Console(stderr=True)
logger = logging.getLogger(__name__)
TOS_NOTICE = """\
⚠️ IMPORTANT — TERMS OF SERVICE NOTICE
@@ -45,7 +47,10 @@ Type 'yes' to acknowledge and continue, or Ctrl+C to exit: \
@click.group()
@click.version_option(version="0.1.0", prog_name="ai-chat-exporter")
@click.version_option(
version=importlib.metadata.version("ai-chat-exporter"),
prog_name="ai-chat-exporter",
)
@click.option("--verbose", "-v", is_flag=True, help="Enable DEBUG output to console.")
@click.option("--quiet", "-q", is_flag=True, help="Show WARNING and above only.")
@click.option("--debug", is_flag=True, help="DEBUG + full tracebacks + redacted API bodies.")
@@ -175,6 +180,39 @@ def _auth_chatgpt(os_name: str) -> None:
_write_token_to_env("CHATGPT_SESSION_TOKEN", token)
# --- ChatGPT Projects ---
console.print("\n[bold]ChatGPT Projects (optional)[/bold]")
console.print(
"Project conversations are stored separately and are not included in the\n"
"default conversation listing. To export them, you need each project's ID.\n"
)
console.print("How to find a project ID:")
console.print(" 1. Open ChatGPT and click into a Project in the left sidebar.")
console.print(" 2. Look at the browser URL — it will look like:")
console.print(" [dim]https://chatgpt.com/g/[bold]g-p-68c2b2b3037c8191890036fb4ae3ed9f[/bold]-my-project/project[/dim]")
console.print(" 3. Copy the part starting with [bold]g-p-[/bold] up to (but not including) the slug.")
console.print(" Enter multiple IDs separated by commas. Leave blank to skip.\n")
project_ids_raw = click.prompt(
"ChatGPT project IDs (comma-separated, e.g. g-p-xxx,g-p-yyy)",
default="",
show_default=False,
).strip()
if project_ids_raw:
ids = [pid.strip() for pid in project_ids_raw.split(",") if pid.strip()]
valid = [pid for pid in ids if pid.startswith("g-p-")]
invalid = [pid for pid in ids if not pid.startswith("g-p-")]
if invalid:
console.print(f"[yellow]Warning: skipping IDs that don't start with 'g-p-': {invalid}[/yellow]")
if valid:
_write_token_to_env("CHATGPT_PROJECT_IDS", ",".join(valid))
console.print(f"[green]Saved {len(valid)} project ID(s).[/green]")
else:
console.print("[yellow]No valid project IDs — skipping.[/yellow]")
else:
console.print("[dim]Skipped project IDs.[/dim]")
def _auth_claude(os_name: str) -> None:
console.print("\n[bold]─── Claude ───[/bold]")
@@ -395,6 +433,15 @@ def _print_doctor_table(checks: list[dict]) -> None:
default=None,
help="Only export conversations updated after this date (YYYY-MM-DD).",
)
@click.option(
"--project",
"project_filter",
default=None,
help=(
"Only export conversations in a matching project (case-insensitive substring). "
"Use 'none' for conversations outside any project."
),
)
@click.option("--dry-run", is_flag=True, help="Show what would be exported without writing anything.")
@click.pass_context
def export(
@@ -403,6 +450,7 @@ def export(
fmt: str,
output_dir: str | None,
since: str | None,
project_filter: str | None,
dry_run: bool,
) -> None:
"""Export new and updated conversations to Markdown or JSON.
@@ -474,6 +522,12 @@ def export(
summary[prov_name]["failed"] += len(all_convs) if "all_convs" in dir() else 0
continue
if project_filter is not None:
all_convs = _filter_by_project(all_convs, project_filter)
console.print(
f" [dim]--project filter '{project_filter}': {len(all_convs)} matching conversations.[/dim]"
)
to_export = cache.get_new_or_updated(prov_name, all_convs)
skipped = len(all_convs) - len(to_export)
summary[prov_name]["skipped"] = skipped
@@ -522,13 +576,11 @@ def export(
progress.advance(task)
except ProviderError as e:
logger = logging.getLogger(__name__)
logger.error("Failed to export conversation %s: %s", conv_id[:8], e)
summary[prov_name]["failed"] += 1
progress.advance(task)
continue
except OSError as e:
logger = logging.getLogger(__name__)
logger.error("File write failed for conversation %s: %s", conv_id[:8], e)
summary[prov_name]["failed"] += 1
progress.advance(task)
@@ -560,7 +612,21 @@ def _resolve_providers(provider: str, cfg) -> list[tuple[str, object]]:
from src.providers.claude import ClaudeProvider
if provider in ("chatgpt", "all"):
try_add("chatgpt", cfg.chatgpt_session_token, ChatGPTProvider)
if cfg.chatgpt_session_token:
try:
result.append((
"chatgpt",
ChatGPTProvider(
session_token=cfg.chatgpt_session_token,
project_ids=cfg.chatgpt_project_ids,
),
))
except ProviderError as e:
logging.getLogger(__name__).warning(
"[chatgpt] Could not initialise provider: %s", e
)
elif provider == "chatgpt" or provider == "all":
logging.getLogger(__name__).warning("[chatgpt] Skipping — token not configured.")
if provider in ("claude", "all"):
try_add("claude", cfg.claude_session_key, ClaudeProvider)
@@ -596,6 +662,44 @@ def _print_dry_run_table(prov_name, to_export, prov_instance, export_base, struc
console.print(f" [dim]{skipped} conversations already cached (would be skipped).[/dim]")
def _raw_project_name(conv: dict) -> str | None:
"""Extract the project name from a raw conversation summary dict.
Handles both ChatGPT (annotated _project_name) and Claude (project dict).
"""
# ChatGPT: annotated during fetch_all_conversations
if "_project_name" in conv:
return conv["_project_name"] or None
# Claude: project is a dict with a 'name' key, or a plain string
project = conv.get("project")
if isinstance(project, dict):
return project.get("name") or None
if isinstance(project, str):
return project or None
return None
def _filter_by_project(convs: list[dict], project_filter: str) -> list[dict]:
"""Filter conversations by project name.
project_filter='none' → keep only conversations with no project.
Otherwise → case-insensitive substring match on the project name.
"""
want_none = project_filter.lower() == "none"
needle = project_filter.lower()
result = []
for conv in convs:
name = _raw_project_name(conv)
if want_none:
if name is None:
result.append(conv)
else:
if name and needle in name.lower():
result.append(conv)
return result
def _print_export_summary(summary: dict[str, dict[str, int]]) -> None:
table = Table(title="Export Summary")
table.add_column("Provider", style="bold")
@@ -626,8 +730,17 @@ def _print_export_summary(summary: dict[str, dict[str, int]]) -> None:
default="all",
show_default=True,
)
@click.option(
"--project",
"project_filter",
default=None,
help=(
"Only list conversations in a matching project (case-insensitive substring). "
"Use 'none' for conversations outside any project."
),
)
@click.pass_context
def list_conversations(ctx: click.Context, provider: str) -> None:
def list_conversations(ctx: click.Context, provider: str, project_filter: str | None) -> None:
"""List conversations without exporting them."""
debug = ctx.obj.get("debug", False)
cfg = _load_config_or_exit(debug)
@@ -641,6 +754,9 @@ def list_conversations(ctx: click.Context, provider: str) -> None:
_handle_provider_error(e, debug)
continue
if project_filter is not None:
all_convs = _filter_by_project(all_convs, project_filter)
table = Table()
table.add_column("Title")
table.add_column("Project")
@@ -649,9 +765,7 @@ def list_conversations(ctx: click.Context, provider: str) -> None:
for conv in all_convs:
title = conv.get("title") or "Untitled"
project = conv.get("project_title") or ""
if isinstance(conv.get("project"), dict):
project = conv["project"].get("name", "")
project = _raw_project_name(conv) or ""
updated = (conv.get("updated_at") or conv.get("update_time") or "")[:10]
conv_id = (conv.get("id") or conv.get("uuid") or "")[:8]
table.add_row(title[:60], project[:30], updated, conv_id)
@@ -700,6 +814,240 @@ def cache(ctx: click.Context, show: bool, clear: bool, provider: str) -> None:
console.print("Specify --show or --clear. Use --help for options.")
# ──────────────────────────────────────────────────────────────────────────────
# joplin command
# ──────────────────────────────────────────────────────────────────────────────
@cli.command()
@click.option(
    "--provider",
    type=click.Choice(["chatgpt", "claude", "all"], case_sensitive=False),
    default="all",
    show_default=True,
    help="Which provider's conversations to sync to Joplin.",
)
@click.option(
    "--project",
    "project_filter",
    default=None,
    help=(
        "Only sync conversations in a matching project (case-insensitive substring). "
        "Use 'none' for conversations outside any project."
    ),
)
@click.option("--dry-run", is_flag=True, help="Show what would be synced without sending anything to Joplin.")
@click.pass_context
def joplin(ctx: click.Context, provider: str, project_filter: str | None, dry_run: bool) -> None:
    """Sync exported conversations to Joplin as notes.

    Reads the local export cache and pushes exported Markdown files to Joplin
    via its local REST API. Requires Joplin desktop to be running with the
    Web Clipper service enabled.

    Notebooks are created automatically based on provider and project:

        exports/chatgpt/my-project/ → "ChatGPT - My Project" notebook
        exports/claude/no-project/ → "Claude - No Project" notebook

    Re-running is safe: notes are updated (not duplicated) on subsequent runs.

    Setup:
      1. Open Joplin desktop.
      2. Go to Tools → Options → Web Clipper.
      3. Enable the Web Clipper service.
      4. Copy the Authorization token.
      5. Set JOPLIN_API_TOKEN=<token> in your .env file.
    """
    debug = ctx.obj.get("debug", False)
    cache_obj: Cache = ctx.obj["cache"]
    cfg = _load_config_or_exit(debug)

    if not cfg.joplin_api_token:
        err_console.print(
            "[red]JOPLIN_API_TOKEN is not set.[/red]\n"
            " 1. Open Joplin → Tools → Options → Web Clipper.\n"
            " 2. Enable the Web Clipper service.\n"
            " 3. Copy the Authorization token.\n"
            " 4. Add [bold]JOPLIN_API_TOKEN=<token>[/bold] to your .env file."
        )
        sys.exit(1)

    from src.joplin import JoplinClient, JoplinError, notebook_title

    client = JoplinClient(cfg.joplin_api_url, cfg.joplin_api_token)

    # Connectivity/auth checks are skipped in dry-run mode: nothing is sent.
    if not dry_run:
        console.print(f"[dim]Connecting to Joplin at {cfg.joplin_api_url}…[/dim]")
        try:
            if not client.ping():
                err_console.print(
                    "[red]Joplin is not responding.[/red] "
                    "Make sure Joplin desktop is open and Web Clipper is enabled."
                )
                sys.exit(1)
            # Ping succeeded but doesn't validate the token — check auth separately
            client.validate_token()
        except JoplinError as e:
            err_console.print(f"[red]Joplin connection error:[/red] {e}")
            sys.exit(1)
        console.print("[green]Joplin connected and token validated.[/green]")

    # Determine which providers to process
    providers_to_sync: list[str] = []
    if provider in ("chatgpt", "all"):
        providers_to_sync.append("chatgpt")
    if provider in ("claude", "all"):
        providers_to_sync.append("claude")

    # Hoisted out of the per-provider loop: previously re-imported each pass.
    from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn

    summary: dict[str, dict[str, int]] = {}
    for prov_name in providers_to_sync:
        summary[prov_name] = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}

        pending = cache_obj.get_joplin_pending(prov_name)
        logger.debug("[joplin] %s: %d pending before filter", prov_name, len(pending))

        # Apply --project filter against the cached entry's project field
        if project_filter is not None:
            want_none = project_filter.lower() == "none"
            needle = project_filter.lower()
            filtered = []
            for conv_id, entry in pending:
                proj = entry.get("project") or None
                if want_none:
                    # Cached entries store missing projects as "no-project".
                    if proj is None or proj == "no-project":
                        filtered.append((conv_id, entry))
                else:
                    if proj and needle in proj.lower():
                        filtered.append((conv_id, entry))
            # Fixed format string: "%d%d" ran the before/after counts together.
            logger.debug(
                "[joplin] %s: --project %r filtered %d → %d",
                prov_name, project_filter, len(pending), len(filtered),
            )
            pending = filtered

        if not pending:
            console.print(f"\n[bold cyan][{prov_name.upper()}][/bold cyan] All up to date — nothing to sync.")
            continue

        console.print(
            f"\n[bold cyan][{prov_name.upper()}][/bold cyan] "
            f"{len(pending)} conversation(s) to sync to Joplin."
        )

        if dry_run:
            _print_joplin_dry_run_table(prov_name, pending)
            continue

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            console=console,
        ) as progress:
            task = progress.add_task(f"Syncing {prov_name}", total=len(pending))
            for conv_id, entry in pending:
                file_path = entry.get("file_path", "")
                title = entry.get("title") or "Untitled"
                project = entry.get("project") or None
                existing_note_id = entry.get("joplin_note_id")
                action = "update" if existing_note_id else "create"
                logger.debug(
                    "[joplin] %s %s/%s: %s (file=%s)",
                    action, prov_name, conv_id[:8], title[:60], file_path,
                )
                try:
                    # Read the exported Markdown file
                    body = Path(file_path).read_text(encoding="utf-8")
                    logger.debug("[joplin] Read %d chars from %s", len(body), file_path)

                    # Get or create the notebook
                    nb_title = notebook_title(prov_name, project)
                    notebook_id = client.get_or_create_notebook(nb_title)

                    if existing_note_id:
                        client.update_note(existing_note_id, title, body)
                        cache_obj.mark_joplin_synced(prov_name, conv_id, existing_note_id)
                        summary[prov_name]["updated"] += 1
                    else:
                        note_id = client.create_note(title, body, notebook_id)
                        cache_obj.mark_joplin_synced(prov_name, conv_id, note_id)
                        summary[prov_name]["created"] += 1
                # FileNotFoundError is an OSError subclass — must be caught first.
                except FileNotFoundError:
                    logger.warning(
                        "[joplin] Skipping %s/%s — exported file not found: %s",
                        prov_name, conv_id[:8], file_path,
                    )
                    summary[prov_name]["skipped"] += 1
                except JoplinError as e:
                    logger.error(
                        "[joplin] Failed to %s note for %s/%s: %s",
                        action, prov_name, conv_id[:8], e,
                    )
                    summary[prov_name]["failed"] += 1
                except OSError as e:
                    logger.error(
                        "[joplin] File read error for %s/%s (%s): %s",
                        prov_name, conv_id[:8], file_path, e,
                    )
                    summary[prov_name]["failed"] += 1
                finally:
                    progress.advance(task)

    if not dry_run:
        _print_joplin_summary(summary)
def _print_joplin_dry_run_table(prov_name: str, pending: list[tuple[str, dict]]) -> None:
    """Render a preview of what a real run would sync (first 50 rows)."""
    from src.joplin import notebook_title

    table = Table(title=f"[DRY RUN] {prov_name.upper()} — Would sync {len(pending)} conversation(s)")
    for heading in ("Title", "Project", "Notebook", "Action"):
        table.add_column(heading)
    for conv_id, entry in pending[:50]:
        table.add_row(
            (entry.get("title") or "Untitled")[:50],
            (entry.get("project") or "no-project")[:30],
            notebook_title(prov_name, entry.get("project")),
            "update" if entry.get("joplin_note_id") else "create",
        )
    overflow = len(pending) - 50
    if overflow > 0:
        table.add_row(f"… and {overflow} more", "", "", "")
    console.print(table)
def _print_joplin_summary(summary: dict[str, dict[str, int]]) -> None:
    """Print per-provider created/updated/skipped/failed counts for a sync run."""
    report = Table(title="Joplin Sync Summary")
    report.add_column("Provider", style="bold")
    for heading in ("Created", "Updated", "Skipped", "Failed"):
        report.add_column(heading, justify="right")

    for provider, tally in summary.items():
        failures = tally["failed"]
        report.add_row(
            provider.capitalize(),
            str(tally["created"]),
            str(tally["updated"]),
            str(tally["skipped"]),
            # Highlight any non-zero failure count in red.
            f"[red]{failures}[/red]" if failures else "0",
        )
    console.print(report)
# ──────────────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────────────

View File

@@ -11,6 +11,21 @@ import requests
from src.utils import redact_secrets
# curl_cffi has its own exception hierarchy (rooted at CurlError → OSError),
# completely separate from requests.exceptions. Import them so _make_request
# can catch both when a curl_cffi session is in use.
try:
from curl_cffi.requests.exceptions import (
HTTPError as _CurlHTTPError,
ConnectionError as _CurlConnectionError,
Timeout as _CurlTimeout,
)
except ImportError:
# Fall back to requests types — catching them twice is harmless.
_CurlHTTPError = requests.HTTPError # type: ignore[misc,assignment]
_CurlConnectionError = requests.ConnectionError # type: ignore[misc,assignment]
_CurlTimeout = requests.Timeout # type: ignore[misc,assignment]
logger = logging.getLogger(__name__)
# Request timeouts (connect, read) in seconds
@@ -271,7 +286,7 @@ class BaseProvider(ABC):
except ProviderError:
raise
except (requests.ConnectionError, requests.Timeout) as e:
except (requests.ConnectionError, requests.Timeout, _CurlConnectionError, _CurlTimeout) as e:
last_exc = e
if attempt > MAX_RETRIES:
raise ProviderError(
@@ -293,7 +308,7 @@ class BaseProvider(ABC):
)
time.sleep(wait)
except requests.HTTPError as e:
except (requests.HTTPError, _CurlHTTPError) as e:
raise ProviderError(
self.provider_name, f"{method} {url}", e
) from e

View File

@@ -1,4 +1,23 @@
"""ChatGPT provider — accesses chat.openai.com internal web API."""
"""ChatGPT provider — accesses chat.openai.com internal web API.
ChatGPT Projects discovery
--------------------------
ChatGPT Projects are internally implemented as "snorlax"-type gizmos with IDs
starting with "g-p-". They are *not* returned by any gizmo listing endpoint
(/gizmos/mine, /gizmos/pinned, /gizmos/discovery, /gizmos/search). The
frontend appears to load project IDs from page-level state, not a dedicated
listing API.
Therefore, project IDs must be supplied by the user via CHATGPT_PROJECT_IDS.
Each project gizmo ID looks like "g-p-68c2b2b3037c8191890036fb4ae3ed9f" and
can be read from the browser URL when viewing a project:
https://chatgpt.com/g/{project-gizmo-id}-{slug}/project
Project conversations are fetched via cursor-based pagination at:
GET /backend-api/gizmos/{project_gizmo_id}/conversations?cursor=0
Response: {"items": [...], "cursor": "<opaque_base64_or_null>"}
Pagination ends when cursor is null or an empty string.
"""
import logging
import os
@@ -34,17 +53,22 @@ class ChatGPTProvider(BaseProvider):
provider_name = "chatgpt"
def __init__(self, session_token: str | None = None) -> None:
def __init__(
self,
session_token: str | None = None,
project_ids: list[str] | None = None,
) -> None:
# Pass a curl_cffi session to the base class instead of a requests.Session.
# curl_cffi.requests.Session is API-compatible with requests.Session.
cf_session = curl_requests.Session(impersonate=IMPERSONATE)
super().__init__(session=cf_session) # type: ignore[arg-type]
# Remove the User-Agent set by BaseProvider. curl_cffi sets a UA that is
# consistent with its TLS JA3 fingerprint for chrome120. If we leave a
# mismatched UA (e.g. Chrome/121 header with Chrome/120 TLS), Cloudflare's
# bot detection flags it. Removing it lets curl_cffi manage its own UA.
# Remove headers that curl_cffi manages as part of its Chrome fingerprint.
# Overriding User-Agent, Accept, or Accept-Language with non-Chrome values
# creates header/TLS inconsistencies that Cloudflare's bot detection flags.
self._session.headers.pop("User-Agent", None)
self._session.headers.pop("Accept", None)
self._session.headers.pop("Accept-Language", None)
token = session_token or os.getenv("CHATGPT_SESSION_TOKEN", "").strip()
if not token:
@@ -58,6 +82,17 @@ class ChatGPTProvider(BaseProvider):
)
self._session_token = token
# Project gizmo IDs (g-p-xxx) whose conversations we'll fetch.
# ChatGPT project conversations do not appear in the default
# /conversations listing — they require explicit project IDs.
self._project_ids: list[str] = project_ids or []
# Maps conv_id → project_name; populated by fetch_all_conversations()
self._project_map: dict[str, str] = {}
# Cache of project_id → display name (avoids re-fetching gizmo details)
self._project_name_cache: dict[str, str] = {}
# Set the session cookie in the cookie jar
self._session.cookies.set(
"__Secure-next-auth.session-token",
@@ -66,10 +101,13 @@ class ChatGPTProvider(BaseProvider):
path="/",
)
# Set only Referer and sec-fetch-* headers for the auth exchange.
# Origin is intentionally omitted: Chrome does not send Origin on
# same-origin GET requests, and its presence alongside
# sec-fetch-site: same-origin contradicts the browser fingerprint.
self._session.headers.update(
{
"Referer": "https://chatgpt.com/",
"Origin": "https://chatgpt.com",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
@@ -78,8 +116,16 @@ class ChatGPTProvider(BaseProvider):
# Exchange the session cookie for an access token
self._access_token: str = self._fetch_access_token()
# Now set backend-api headers (after auth, so they don't interfere with
# the auth exchange which expects a browser-style request).
self._session.headers["Authorization"] = f"Bearer {self._access_token}"
logger.debug("[chatgpt] Session initialised with Chrome TLS impersonation (token: [REDACTED])")
self._session.headers["Accept"] = "application/json"
self._session.headers["Origin"] = "https://chatgpt.com"
logger.debug(
"[chatgpt] Session initialised (Chrome TLS impersonation, %d project ID(s) configured)",
len(self._project_ids),
)
def _fetch_access_token(self) -> str:
"""Exchange the session cookie for a Bearer access token.
@@ -132,14 +178,22 @@ class ChatGPTProvider(BaseProvider):
RuntimeError("401 Unauthorized — ChatGPT token expired"),
)
# ------------------------------------------------------------------
# Default workspace conversations (offset-based pagination)
# ------------------------------------------------------------------
def list_conversations(self, offset: int = 0, limit: int = 100) -> list[dict]:
"""Fetch one page of conversations.
"""Fetch one page of conversations from the default workspace.
Note: Project conversations are NOT included here. They require
separate fetching via list_project_conversations().
Returns:
List of conversation summary dicts.
"""
url = f"{BASE_URL}/conversations"
params = {"offset": offset, "limit": limit, "order": "updated"}
logger.debug("[chatgpt] list_conversations: GET %s params=%s", url, params)
try:
data = self._make_request("GET", url, params=params)
except ProviderError:
@@ -149,18 +203,315 @@ class ChatGPTProvider(BaseProvider):
if not isinstance(data, dict):
self._warn_unexpected_schema("list_conversations", "root")
logger.debug("[chatgpt] list_conversations: unexpected root type %s", type(data))
return []
items = data.get("items")
if items is None:
self._warn_unexpected_schema("list_conversations", "items")
logger.debug("[chatgpt] list_conversations: response keys = %s", list(data.keys()))
return []
logger.debug("[chatgpt] list_conversations: got %d items (offset=%d)", len(items), offset)
return items
# ------------------------------------------------------------------
# Project conversations (cursor-based pagination)
# ------------------------------------------------------------------
def _fetch_project_name(self, project_id: str) -> str:
    """Resolve a project gizmo ID to its human-readable display name.

    Performs GET /backend-api/gizmos/{project_id}, preferring
    ``gizmo.display.name``, then ``gizmo.name``. When the request fails or
    no name is present, the project_id itself is returned so callers always
    receive a usable label. Results are memoised in
    ``self._project_name_cache``.
    """
    cached = self._project_name_cache.get(project_id)
    if cached is not None:
        return cached

    url = f"{BASE_URL}/gizmos/{project_id}"
    logger.debug("[chatgpt] _fetch_project_name: GET %s", url)
    try:
        payload = self._make_request("GET", url)
        gizmo = payload.get("gizmo", {}) if isinstance(payload, dict) else {}
        raw_name = (gizmo.get("display") or {}).get("name") or gizmo.get("name") or ""
        name = raw_name.strip() or project_id
        logger.debug(
            "[chatgpt] _fetch_project_name[%s]: name=%r gizmo_type=%r",
            project_id[:12],
            name,
            gizmo.get("gizmo_type", "?"),
        )
    except ProviderError as e:
        # Fall back to the raw ID — a readable notebook title matters less
        # than completing the sync.
        logger.warning(
            "[chatgpt] Could not fetch project name for %s: %s — using ID as name",
            project_id,
            e,
        )
        name = project_id

    self._project_name_cache[project_id] = name
    return name
def list_project_conversations(
    self, project_id: str, cursor: str = "0"
) -> tuple[list[dict], str | None]:
    """Fetch one page of conversations for a project gizmo.

    Uses cursor-based pagination (not offset). The initial cursor is "0";
    subsequent cursors come from the response's "cursor" field.

    Endpoint: GET /backend-api/gizmos/{project_id}/conversations?cursor=<cursor>

    Args:
        project_id: Project gizmo ID (g-p-…).
        cursor: Opaque pagination cursor; "0" requests the first page.

    Returns:
        (items, next_cursor) — next_cursor is None when the listing is
        exhausted (an empty-string cursor from the API is normalised to None).

    Raises:
        ProviderError: If the underlying request fails.
    """
    url = f"{BASE_URL}/gizmos/{project_id}/conversations"
    params = {"cursor": cursor}
    logger.debug(
        "[chatgpt] list_project_conversations[%s]: GET %s cursor=%r",
        project_id[:12],
        url,
        cursor,
    )
    try:
        data = self._make_request("GET", url, params=params)
    except ProviderError:
        raise
    except Exception as e:
        raise ProviderError(self.provider_name, "list_project_conversations", e) from e
    logger.debug(
        "[chatgpt] list_project_conversations[%s]: response type=%s",
        project_id[:12],
        type(data).__name__,
    )
    if isinstance(data, list):
        # Bare list — no next cursor available
        logger.debug(
            "[chatgpt] list_project_conversations[%s]: bare list with %d items",
            project_id[:12],
            len(data),
        )
        return data, None
    if not isinstance(data, dict):
        self._warn_unexpected_schema("list_project_conversations", "root")
        logger.debug(
            "[chatgpt] list_project_conversations[%s]: unexpected type %s value=%r",
            project_id[:12],
            type(data),
            data,
        )
        return [], None
    logger.debug(
        "[chatgpt] list_project_conversations[%s]: response keys=%s",
        project_id[:12],
        list(data.keys()),
    )
    items = data.get("items") or data.get("conversations") or []
    next_cursor = data.get("cursor") or None  # empty string → treat as None
    if not items and data:
        logger.debug(
            "[chatgpt] list_project_conversations[%s]: no items found; full response=%r",
            project_id[:12],
            data,
        )
    # Truncate long opaque cursors in the debug log, marking the cut with "…"
    # (previously the marker was an empty string, so truncated cursors were
    # indistinguishable from complete ones).
    logger.debug(
        "[chatgpt] list_project_conversations[%s]: %d items, next_cursor=%r",
        project_id[:12],
        len(items),
        next_cursor[:20] + "…" if next_cursor and len(next_cursor) > 20 else next_cursor,
    )
    return items, next_cursor
# ------------------------------------------------------------------
# Combined fetch (default workspace + all configured projects)
# ------------------------------------------------------------------
def fetch_all_conversations(self, since=None) -> list[dict]:
    """Fetch all conversations: default workspace + every configured project.

    ChatGPT project conversations are not included in the default
    /conversations listing. They must be fetched separately via the
    gizmos conversations endpoint using project IDs from CHATGPT_PROJECT_IDS.

    Builds self._project_map (conv_id → project_name) as a side effect so
    that normalize_conversation() can attach the project name without an
    additional API call.

    Args:
        since: Optional datetime — only return conversations updated at or
            after this time (client-side filter, same as base class).

    Returns:
        Combined list of raw conversation summary dicts.
    """
    # Reset maps so a fresh fetch always rebuilds them cleanly
    self._project_map = {}

    # --- Default workspace (base class handles offset-based pagination) ---
    logger.info("[chatgpt] Fetching default workspace conversations…")
    # since=None here on purpose: the --since filter is applied once at the
    # end, after project conversations have been merged in.
    default_convs = super().fetch_all_conversations(since=None)
    logger.info("[chatgpt] Default workspace: %d conversations", len(default_convs))

    # --- Project conversations ---
    if not self._project_ids:
        logger.info(
            "[chatgpt] No project IDs configured — skipping project conversations. "
            "To include projects, set CHATGPT_PROJECT_IDS in .env "
            "(see 'python -m src.main auth' for instructions)."
        )
        return self._apply_since_filter(default_convs, since)

    logger.info(
        "[chatgpt] Fetching conversations for %d project(s): %s",
        len(self._project_ids),
        self._project_ids,
    )
    project_convs: list[dict] = []
    for project_id in self._project_ids:
        project_name = self._fetch_project_name(project_id)
        logger.info(
            "[chatgpt] Project '%s' (%s): fetching conversations…",
            project_name,
            project_id,
        )
        cursor: str = "0"
        page = 0
        project_total = 0
        while True:
            page += 1
            logger.debug(
                "[chatgpt] Project '%s': page %d cursor=%r",
                project_name,
                page,
                # Truncate long opaque cursors; "…" marks the truncation
                # (previously an empty string was appended, hiding the cut).
                cursor[:20] + "…" if len(cursor) > 20 else cursor,
            )
            try:
                batch, next_cursor = self.list_project_conversations(
                    project_id, cursor=cursor
                )
            except ProviderError as e:
                # Best-effort: keep whatever pages were already fetched.
                logger.warning(
                    "[chatgpt] Project '%s': failed to fetch page %d: %s — stopping pagination",
                    project_name,
                    page,
                    e,
                )
                break
            if not batch:
                logger.debug(
                    "[chatgpt] Project '%s': empty batch on page %d — done",
                    project_name,
                    page,
                )
                break
            for conv in batch:
                conv_id = conv.get("id")
                if conv_id:
                    self._project_map[conv_id] = project_name
                else:
                    logger.debug(
                        "[chatgpt] Project '%s': conversation with no id: %r",
                        project_name,
                        conv,
                    )
                # Annotate so callers can filter by project without the map
                conv["_project_name"] = project_name
            project_convs.extend(batch)
            project_total += len(batch)
            # Fixed log format: the previous "page %d%d items" ran the page
            # number and batch size together (e.g. "page 12" for page 1, 2 items).
            logger.debug(
                "[chatgpt] Project '%s': page %d — %d items (project total: %d)",
                project_name,
                page,
                len(batch),
                project_total,
            )
            if not next_cursor:
                logger.debug(
                    "[chatgpt] Project '%s': no next cursor — pagination complete",
                    project_name,
                )
                break
            cursor = next_cursor
        logger.info(
            "[chatgpt] Project '%s': %d conversations fetched",
            project_name,
            project_total,
        )

    all_convs = default_convs + project_convs
    logger.info(
        "[chatgpt] Total: %d conversations (%d default + %d from %d project(s))",
        len(all_convs),
        len(default_convs),
        len(project_convs),
        len(self._project_ids),
    )
    logger.debug(
        "[chatgpt] _project_map: %d entries → %s",
        len(self._project_map),
        {k[:8]: v for k, v in self._project_map.items()},
    )
    return self._apply_since_filter(all_convs, since)
def _apply_since_filter(self, convs: list[dict], since) -> list[dict]:
"""Filter conversations to those updated at or after `since`."""
if since is None:
return convs
since_naive = since.replace(tzinfo=None)
filtered = []
for c in convs:
raw_ts = c.get("updated_at") or c.get("update_time") or ""
if raw_ts:
try:
from src.utils import _parse_dt
updated = _parse_dt(str(raw_ts)).replace(tzinfo=None)
if updated >= since_naive:
filtered.append(c)
except Exception:
filtered.append(c) # include if date unparseable
else:
filtered.append(c)
logger.info(
"[chatgpt] After --since filter: %d/%d conversations",
len(filtered),
len(convs),
)
return filtered
# ------------------------------------------------------------------
# Single conversation detail
# ------------------------------------------------------------------
def get_conversation(self, conv_id: str) -> dict:
"""Fetch full conversation detail for a single ID."""
url = f"{BASE_URL}/conversation/{conv_id}"
logger.debug("[chatgpt] get_conversation: GET %s", url)
try:
data = self._make_request("GET", url)
except ProviderError:
@@ -172,25 +523,41 @@ class ChatGPTProvider(BaseProvider):
self._warn_unexpected_schema("get_conversation", "root")
return {}
logger.debug(
"[chatgpt] get_conversation[%s]: keys=%s mapping_size=%d",
conv_id[:8],
list(data.keys()),
len(data.get("mapping", {})),
)
return data
# ------------------------------------------------------------------
# Normalization
# ------------------------------------------------------------------
def normalize_conversation(self, raw: dict) -> dict:
"""Transform ChatGPT raw schema to the common normalized schema.
ChatGPT stores messages in a nested ``mapping`` dict where each node
has an ``id``, ``message``, and ``children`` list. We walk the tree
from the root node to build a flat ordered message list.
Project name is looked up from self._project_map (populated by
fetch_all_conversations). The conversation detail endpoint does not
include project information.
"""
conv_id = raw.get("id", "")
title = raw.get("title") or "Untitled"
created_at = _ts_to_iso(raw.get("create_time"))
updated_at = _ts_to_iso(raw.get("update_time"))
# Project info — ChatGPT calls it "gizmo_id" or stores project info differently.
# As of 2024, personal projects appear as a separate projects API; conversations
# linked to a project have a non-null `workspace_id` or similar field.
# We use `project_title` if present, else None.
project: str | None = raw.get("project_title") or raw.get("workspace_title") or None
# Look up project name from the map built during fetch_all_conversations.
project = self._project_map.get(conv_id) if conv_id else None
logger.debug(
"[chatgpt] normalize_conversation[%s]: project_map lookup → %r",
conv_id[:8] if conv_id else "?",
project,
)
mapping: dict = raw.get("mapping", {})
messages = _extract_messages(mapping, raw, conv_id)