"""Local cache manifest for tracking exported and Joplin-synced conversations.""" import json import logging import os import shutil from datetime import datetime, timezone from pathlib import Path from typing import Any logger = logging.getLogger(__name__) MANIFEST_VERSION = 1 class CacheError(Exception): """Raised when the cache manifest cannot be loaded or written.""" class Cache: """Manages the local JSON manifest of exported and Joplin-synced conversations. The manifest is the single source of truth for what has been exported and synced. Every export run compares the provider's full conversation list against this manifest to determine what is new or updated. The Joplin sync run reads it to find conversations not yet pushed to Joplin (or re-exported since the last sync). Each entry tracks: title, project, updated_at, exported_at, file_path, joplin_note_id (after first sync), joplin_synced_at (after first sync) File security: - Permissions: 600 (owner read/write only) - Atomic writes: .tmp → os.replace() - chmod happens BEFORE os.replace() to prevent permission race windows """ def __init__(self, cache_dir: Path | str) -> None: self._dir = Path(cache_dir).expanduser() self._dir.mkdir(parents=True, exist_ok=True) self._path = self._dir / "manifest.json" self._data: dict[str, Any] = self._load() # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def is_cached(self, provider: str, conv_id: str, updated_at: str) -> bool: """Return True if this conversation is already exported and up to date. A conversation is considered stale if the provider's ``updated_at`` is newer than the cached ``exported_at``. """ entry = self._data.get(provider, {}).get(conv_id) if entry is None: logger.debug("[cache] MISS %s/%s", provider, conv_id[:8]) return False cached_updated = entry.get("updated_at", "") if not cached_updated or not updated_at: logger.debug("[cache] HIT (no date comparison) %s/%s", provider, conv_id[:8]) return True try: from src.utils import _parse_dt cached_dt = _parse_dt(cached_updated) provider_dt = _parse_dt(updated_at) if provider_dt > cached_dt: logger.debug("[cache] STALE %s/%s", provider, conv_id[:8]) return False except Exception: pass logger.debug("[cache] HIT %s/%s", provider, conv_id[:8]) return True def mark_exported(self, provider: str, conv_id: str, metadata: dict) -> None: """Record a successfully exported conversation in the manifest. Writes to disk immediately (not batched) so progress is preserved if the process is interrupted. """ if provider not in self._data: self._data[provider] = {} self._data[provider][conv_id] = { "title": metadata.get("title", ""), "project": metadata.get("project"), "updated_at": metadata.get("updated_at", ""), "exported_at": datetime.now(tz=timezone.utc).isoformat(), "file_path": metadata.get("file_path", ""), } self._data["last_run"] = datetime.now(tz=timezone.utc).isoformat() self._save() def get_new_or_updated(self, provider: str, conversations: list[dict]) -> list[dict]: """Filter a conversation list to only new or updated conversations. Args: provider: "chatgpt" or "claude" conversations: List of raw conversation dicts from the provider. Each must have an ``id``/``uuid`` and ``updated_at``/``update_time``. Returns: Subset that needs to be exported. """ result = [] for conv in conversations: conv_id = conv.get("id") or conv.get("uuid", "") updated_at = conv.get("updated_at") or conv.get("update_time") or "" if conv_id and not self.is_cached(provider, conv_id, updated_at): result.append(conv) return result def stats(self) -> dict[str, int]: """Return count of cached conversations per provider.""" return { provider: len(entries) for provider, entries in self._data.items() if isinstance(entries, dict) and provider not in ("version", "last_run", "tos_acknowledged_at") } def clear(self, provider: str | None = None) -> None: """Clear cached entries. Args: provider: If given, clear only that provider. If None, clear all. """ if provider: cleared = len(self._data.get(provider, {})) self._data[provider] = {} logger.info("[cache] Cleared %d entries for provider '%s'", cleared, provider) else: for key in list(self._data.keys()): if isinstance(self._data[key], dict) and key not in ( "version", "last_run", "tos_acknowledged_at", ): self._data[key] = {} logger.info("[cache] Cleared all provider entries") self._save() def is_tos_acknowledged(self) -> bool: """Return True if the user has acknowledged the ToS notice.""" return bool(self._data.get("tos_acknowledged_at")) def acknowledge_tos(self) -> None: """Record ToS acknowledgement with a timestamp.""" self._data["tos_acknowledged_at"] = datetime.now(tz=timezone.utc).isoformat() self._save() logger.info("[cache] ToS acknowledged at %s", self._data["tos_acknowledged_at"]) def get_all_entries(self, provider: str) -> dict: """Return all cached entries for a provider (for --cache --show).""" return dict(self._data.get(provider, {})) def mark_joplin_synced(self, provider: str, conv_id: str, note_id: str) -> None: """Record a successful Joplin sync for a conversation. Adds ``joplin_note_id`` and ``joplin_synced_at`` to the manifest entry and writes atomically to disk. """ entry = self._data.get(provider, {}).get(conv_id) if entry is None: logger.warning( "[cache] mark_joplin_synced: no cache entry for %s/%s", provider, conv_id[:8] ) return entry["joplin_note_id"] = note_id entry["joplin_synced_at"] = datetime.now(tz=timezone.utc).isoformat() self._save() def get_joplin_pending(self, provider: str) -> list[tuple[str, dict]]: """Return (conv_id, entry) pairs that need to be synced to Joplin. A conversation is pending when: - It has never been synced (no ``joplin_note_id``), OR - It was re-exported after the last Joplin sync (``exported_at`` > ``joplin_synced_at``). Returns: List of (conv_id, entry_dict) tuples, where entry_dict includes ``file_path``, ``title``, ``project``, and optionally ``joplin_note_id``. """ pending = [] for conv_id, entry in self._data.get(provider, {}).items(): if not isinstance(entry, dict): continue if not entry.get("file_path"): continue note_id = entry.get("joplin_note_id") if not note_id: pending.append((conv_id, entry)) continue # Re-sync if the file was re-exported after the last Joplin sync exported_at = entry.get("exported_at", "") synced_at = entry.get("joplin_synced_at", "") if exported_at and synced_at: try: from src.utils import _parse_dt if _parse_dt(exported_at) > _parse_dt(synced_at): pending.append((conv_id, entry)) except Exception: pass return pending def last_run(self) -> str | None: """Return the ISO8601 timestamp of the last export run, or None.""" return self._data.get("last_run") # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _load(self) -> dict[str, Any]: """Load the manifest from disk. Returns a fresh manifest if absent.""" if not self._path.exists(): return self._fresh_manifest() try: text = self._path.read_text(encoding="utf-8") data = json.loads(text) except (json.JSONDecodeError, OSError) as e: backup = self._path.with_suffix(".json.bak") logger.warning( "[cache] Manifest file is invalid (%s). " "Backing up to %s and starting fresh.", e, backup, ) try: shutil.copy2(self._path, backup) except OSError: pass return self._fresh_manifest() # Version check version = data.get("version") if version is None: logger.warning( "[cache] Manifest has no 'version' field — treating as v1." ) data["version"] = MANIFEST_VERSION elif version > MANIFEST_VERSION: logger.critical( "[cache] Manifest version %d is newer than supported version %d. " "Please update ai-chat-exporter, or delete %s to start fresh.", version, MANIFEST_VERSION, self._path, ) raise CacheError( f"Unsupported manifest version {version}. " "Update the tool or delete the manifest to start fresh." ) return data def _save(self) -> None: """Atomically write the manifest to disk with 600 permissions.""" tmp = self._path.with_suffix(".json.tmp") try: tmp.write_text( json.dumps(self._data, indent=2, default=str), encoding="utf-8", ) # chmod BEFORE os.replace() to prevent permission race window os.chmod(tmp, 0o600) os.replace(tmp, self._path) except OSError as e: logger.error("[cache] Failed to write manifest: %s", e) raise CacheError(f"Cannot write cache manifest to {self._path}: {e}") from e @staticmethod def _fresh_manifest() -> dict[str, Any]: return { "version": MANIFEST_VERSION, "tos_acknowledged_at": None, "last_run": None, "chatgpt": {}, "claude": {}, }