From f4ef937aa137384a677a090f0997b364629cb68e Mon Sep 17 00:00:00 2001 From: JesseMarkowitz Date: Fri, 27 Feb 2026 23:01:15 -0500 Subject: [PATCH] feat: add cache module Co-Authored-By: Claude Sonnet 4.6 --- src/cache.py | 228 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 src/cache.py diff --git a/src/cache.py b/src/cache.py new file mode 100644 index 0000000..0036240 --- /dev/null +++ b/src/cache.py @@ -0,0 +1,228 @@ +"""Local cache manifest for tracking exported conversations.""" + +import json +import logging +import os +import shutil +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +MANIFEST_VERSION = 1 + + +class CacheError(Exception): + """Raised when the cache manifest cannot be loaded or written.""" + + +class Cache: + """Manages the local JSON manifest of exported conversations. + + The manifest is the single source of truth for what has been exported. + Every run compares the provider's full conversation list against this + manifest to determine what is new or updated. + + File security: + - Permissions: 600 (owner read/write only) + - Atomic writes: .tmp → os.replace() + - chmod happens BEFORE os.replace() to prevent permission race windows + """ + + def __init__(self, cache_dir: Path | str) -> None: + self._dir = Path(cache_dir).expanduser() + self._dir.mkdir(parents=True, exist_ok=True) + self._path = self._dir / "manifest.json" + self._data: dict[str, Any] = self._load() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def is_cached(self, provider: str, conv_id: str, updated_at: str) -> bool: + """Return True if this conversation is already exported and up to date. + + A conversation is considered stale if the provider's ``updated_at`` + is newer than the cached ``exported_at``. + """ + entry = self._data.get(provider, {}).get(conv_id) + if entry is None: + logger.debug("[cache] MISS %s/%s", provider, conv_id[:8]) + return False + + cached_updated = entry.get("updated_at", "") + if not cached_updated or not updated_at: + logger.debug("[cache] HIT (no date comparison) %s/%s", provider, conv_id[:8]) + return True + + try: + from src.utils import _parse_dt + cached_dt = _parse_dt(cached_updated) + provider_dt = _parse_dt(updated_at) + if provider_dt > cached_dt: + logger.debug("[cache] STALE %s/%s", provider, conv_id[:8]) + return False + except Exception: + pass + + logger.debug("[cache] HIT %s/%s", provider, conv_id[:8]) + return True + + def mark_exported(self, provider: str, conv_id: str, metadata: dict) -> None: + """Record a successfully exported conversation in the manifest. + + Writes to disk immediately (not batched) so progress is preserved + if the process is interrupted. + """ + if provider not in self._data: + self._data[provider] = {} + + self._data[provider][conv_id] = { + "title": metadata.get("title", ""), + "project": metadata.get("project"), + "updated_at": metadata.get("updated_at", ""), + "exported_at": datetime.now(tz=timezone.utc).isoformat(), + "file_path": metadata.get("file_path", ""), + } + self._data["last_run"] = datetime.now(tz=timezone.utc).isoformat() + self._save() + + def get_new_or_updated(self, provider: str, conversations: list[dict]) -> list[dict]: + """Filter a conversation list to only new or updated conversations. + + Args: + provider: "chatgpt" or "claude" + conversations: List of raw conversation dicts from the provider. + Each must have an ``id``/``uuid`` and ``updated_at``/``update_time``. + + Returns: + Subset that needs to be exported. + """ + result = [] + for conv in conversations: + conv_id = conv.get("id") or conv.get("uuid", "") + updated_at = conv.get("updated_at") or conv.get("update_time") or "" + if conv_id and not self.is_cached(provider, conv_id, updated_at): + result.append(conv) + return result + + def stats(self) -> dict[str, int]: + """Return count of cached conversations per provider.""" + return { + provider: len(entries) + for provider, entries in self._data.items() + if isinstance(entries, dict) and provider not in ("version", "last_run", "tos_acknowledged_at") + } + + def clear(self, provider: str | None = None) -> None: + """Clear cached entries. + + Args: + provider: If given, clear only that provider. If None, clear all. + """ + if provider: + cleared = len(self._data.get(provider, {})) + self._data[provider] = {} + logger.info("[cache] Cleared %d entries for provider '%s'", cleared, provider) + else: + for key in list(self._data.keys()): + if isinstance(self._data[key], dict) and key not in ( + "version", + "last_run", + "tos_acknowledged_at", + ): + self._data[key] = {} + logger.info("[cache] Cleared all provider entries") + self._save() + + def is_tos_acknowledged(self) -> bool: + """Return True if the user has acknowledged the ToS notice.""" + return bool(self._data.get("tos_acknowledged_at")) + + def acknowledge_tos(self) -> None: + """Record ToS acknowledgement with a timestamp.""" + self._data["tos_acknowledged_at"] = datetime.now(tz=timezone.utc).isoformat() + self._save() + logger.info("[cache] ToS acknowledged at %s", self._data["tos_acknowledged_at"]) + + def get_all_entries(self, provider: str) -> dict: + """Return all cached entries for a provider (for --cache --show).""" + return dict(self._data.get(provider, {})) + + def last_run(self) -> str | None: + """Return the ISO8601 timestamp of the last export run, or None.""" + return self._data.get("last_run") + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _load(self) -> dict[str, Any]: + """Load the manifest from disk. Returns a fresh manifest if absent.""" + if not self._path.exists(): + return self._fresh_manifest() + + try: + text = self._path.read_text(encoding="utf-8") + data = json.loads(text) + except (json.JSONDecodeError, OSError) as e: + backup = self._path.with_suffix(".json.bak") + logger.warning( + "[cache] Manifest file is invalid (%s). " + "Backing up to %s and starting fresh.", + e, + backup, + ) + try: + shutil.copy2(self._path, backup) + except OSError: + pass + return self._fresh_manifest() + + # Version check + version = data.get("version") + if version is None: + logger.warning( + "[cache] Manifest has no 'version' field — treating as v1." + ) + data["version"] = MANIFEST_VERSION + elif version > MANIFEST_VERSION: + logger.critical( + "[cache] Manifest version %d is newer than supported version %d. " + "Please update ai-chat-exporter, or delete %s to start fresh.", + version, + MANIFEST_VERSION, + self._path, + ) + raise CacheError( + f"Unsupported manifest version {version}. " + "Update the tool or delete the manifest to start fresh." + ) + + return data + + def _save(self) -> None: + """Atomically write the manifest to disk with 600 permissions.""" + tmp = self._path.with_suffix(".json.tmp") + try: + tmp.write_text( + json.dumps(self._data, indent=2, default=str), + encoding="utf-8", + ) + # chmod BEFORE os.replace() to prevent permission race window + os.chmod(tmp, 0o600) + os.replace(tmp, self._path) + except OSError as e: + logger.error("[cache] Failed to write manifest: %s", e) + raise CacheError(f"Cannot write cache manifest to {self._path}: {e}") from e + + @staticmethod + def _fresh_manifest() -> dict[str, Any]: + return { + "version": MANIFEST_VERSION, + "tos_acknowledged_at": None, + "last_run": None, + "chatgpt": {}, + "claude": {}, + }