feat: add cache module

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
JesseMarkowitz
2026-02-27 23:01:15 -05:00
parent 3adb2d2b48
commit f4ef937aa1

228
src/cache.py Normal file
View File

@@ -0,0 +1,228 @@
"""Local cache manifest for tracking exported conversations."""
import json
import logging
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
MANIFEST_VERSION = 1
class CacheError(Exception):
    """Error raised for an unreadable or unwritable cache manifest."""
class Cache:
    """Manages the local JSON manifest of exported conversations.

    The manifest is the single source of truth for what has been exported.
    Every run compares the provider's full conversation list against this
    manifest to determine what is new or updated.

    File security:
    - Permissions: 600 (owner read/write only)
    - Atomic writes: .tmp → os.replace()
    - chmod happens BEFORE os.replace() to prevent permission race windows
    """

    # Manifest keys that hold metadata rather than per-provider entry maps.
    # Single definition so stats() and clear() cannot drift apart.
    _RESERVED_KEYS = ("version", "last_run", "tos_acknowledged_at")

    def __init__(self, cache_dir: Path | str) -> None:
        """Open (creating if needed) the manifest under *cache_dir*.

        Raises:
            CacheError: if an existing manifest has an unsupported version.
        """
        self._dir = Path(cache_dir).expanduser()
        self._dir.mkdir(parents=True, exist_ok=True)
        self._path = self._dir / "manifest.json"
        self._data: dict[str, Any] = self._load()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def is_cached(self, provider: str, conv_id: str, updated_at: str) -> bool:
        """Return True if this conversation is already exported and up to date.

        A conversation is considered stale if the provider's ``updated_at``
        is newer than the cached ``updated_at``. If either timestamp is
        missing or unparseable, the cached entry is trusted (HIT).
        """
        entry = self._data.get(provider, {}).get(conv_id)
        if entry is None:
            logger.debug("[cache] MISS %s/%s", provider, conv_id[:8])
            return False
        cached_updated = entry.get("updated_at", "")
        if not cached_updated or not updated_at:
            logger.debug("[cache] HIT (no date comparison) %s/%s", provider, conv_id[:8])
            return True
        try:
            # Local import — presumably avoids a circular import at module
            # load time; confirm against src.utils before hoisting.
            from src.utils import _parse_dt

            cached_dt = _parse_dt(cached_updated)
            provider_dt = _parse_dt(updated_at)
        except Exception as e:
            # Was a silent `pass`: still treat as HIT, but record why the
            # freshness comparison was skipped so bad dates are diagnosable.
            logger.debug(
                "[cache] Date comparison failed for %s/%s: %s",
                provider,
                conv_id[:8],
                e,
            )
        else:
            if provider_dt > cached_dt:
                logger.debug("[cache] STALE %s/%s", provider, conv_id[:8])
                return False
        logger.debug("[cache] HIT %s/%s", provider, conv_id[:8])
        return True

    def mark_exported(self, provider: str, conv_id: str, metadata: dict) -> None:
        """Record a successfully exported conversation in the manifest.

        Writes to disk immediately (not batched) so progress is preserved
        if the process is interrupted.

        Args:
            provider: Provider key (e.g. "chatgpt" or "claude").
            conv_id: The conversation's unique id.
            metadata: Export metadata; title/project/updated_at/file_path
                are read with safe defaults.
        """
        # One timestamp so exported_at and last_run agree exactly
        # (previously two separate datetime.now() calls could differ).
        now = datetime.now(tz=timezone.utc).isoformat()
        self._data.setdefault(provider, {})[conv_id] = {
            "title": metadata.get("title", ""),
            "project": metadata.get("project"),
            "updated_at": metadata.get("updated_at", ""),
            "exported_at": now,
            "file_path": metadata.get("file_path", ""),
        }
        self._data["last_run"] = now
        self._save()

    def get_new_or_updated(self, provider: str, conversations: list[dict]) -> list[dict]:
        """Filter a conversation list to only new or updated conversations.

        Args:
            provider: "chatgpt" or "claude"
            conversations: List of raw conversation dicts from the provider.
                Each must have an ``id``/``uuid`` and ``updated_at``/``update_time``.

        Returns:
            Subset that needs to be exported. Entries without any id are
            skipped entirely.
        """
        result = []
        for conv in conversations:
            conv_id = conv.get("id") or conv.get("uuid", "")
            updated_at = conv.get("updated_at") or conv.get("update_time") or ""
            if conv_id and not self.is_cached(provider, conv_id, updated_at):
                result.append(conv)
        return result

    def stats(self) -> dict[str, int]:
        """Return count of cached conversations per provider."""
        return {
            provider: len(entries)
            for provider, entries in self._data.items()
            if isinstance(entries, dict) and provider not in self._RESERVED_KEYS
        }

    def clear(self, provider: str | None = None) -> None:
        """Clear cached entries.

        Args:
            provider: If given, clear only that provider. If None, clear all.
        """
        if provider:
            cleared = len(self._data.get(provider, {}))
            self._data[provider] = {}
            logger.info("[cache] Cleared %d entries for provider '%s'", cleared, provider)
        else:
            # Reset every provider map while preserving manifest metadata.
            for key in list(self._data.keys()):
                if isinstance(self._data[key], dict) and key not in self._RESERVED_KEYS:
                    self._data[key] = {}
            logger.info("[cache] Cleared all provider entries")
        self._save()

    def is_tos_acknowledged(self) -> bool:
        """Return True if the user has acknowledged the ToS notice."""
        return bool(self._data.get("tos_acknowledged_at"))

    def acknowledge_tos(self) -> None:
        """Record ToS acknowledgement with a timestamp."""
        self._data["tos_acknowledged_at"] = datetime.now(tz=timezone.utc).isoformat()
        self._save()
        logger.info("[cache] ToS acknowledged at %s", self._data["tos_acknowledged_at"])

    def get_all_entries(self, provider: str) -> dict:
        """Return all cached entries for a provider (for --cache --show)."""
        # Shallow copy so callers cannot mutate the live manifest by accident.
        return dict(self._data.get(provider, {}))

    def last_run(self) -> str | None:
        """Return the ISO8601 timestamp of the last export run, or None."""
        return self._data.get("last_run")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _load(self) -> dict[str, Any]:
        """Load the manifest from disk. Returns a fresh manifest if absent.

        A corrupt manifest is backed up to ``manifest.json.bak`` (best
        effort) and replaced with a fresh one rather than crashing the run.

        Raises:
            CacheError: if the manifest's version is newer than supported.
        """
        if not self._path.exists():
            return self._fresh_manifest()
        try:
            text = self._path.read_text(encoding="utf-8")
            data = json.loads(text)
        except (json.JSONDecodeError, OSError) as e:
            backup = self._path.with_suffix(".json.bak")
            logger.warning(
                "[cache] Manifest file is invalid (%s). "
                "Backing up to %s and starting fresh.",
                e,
                backup,
            )
            try:
                shutil.copy2(self._path, backup)
            except OSError:
                # Backup is best-effort; a fresh manifest still lets the run proceed.
                pass
            return self._fresh_manifest()
        # Version check
        version = data.get("version")
        if version is None:
            logger.warning(
                "[cache] Manifest has no 'version' field — treating as v1."
            )
            data["version"] = MANIFEST_VERSION
        elif version > MANIFEST_VERSION:
            # A newer tool wrote this manifest; refuse rather than corrupt it.
            logger.critical(
                "[cache] Manifest version %d is newer than supported version %d. "
                "Please update ai-chat-exporter, or delete %s to start fresh.",
                version,
                MANIFEST_VERSION,
                self._path,
            )
            raise CacheError(
                f"Unsupported manifest version {version}. "
                "Update the tool or delete the manifest to start fresh."
            )
        return data

    def _save(self) -> None:
        """Atomically write the manifest to disk with 600 permissions.

        Raises:
            CacheError: if the manifest cannot be written.
        """
        tmp = self._path.with_suffix(".json.tmp")
        try:
            tmp.write_text(
                # NOTE(review): default=str silently stringifies any
                # non-JSON value — presumably deliberate; confirm.
                json.dumps(self._data, indent=2, default=str),
                encoding="utf-8",
            )
            # chmod BEFORE os.replace() to prevent permission race window
            os.chmod(tmp, 0o600)
            os.replace(tmp, self._path)
        except OSError as e:
            # Don't leave a half-written .tmp file behind on failure.
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass
            logger.error("[cache] Failed to write manifest: %s", e)
            raise CacheError(f"Cannot write cache manifest to {self._path}: {e}") from e

    @staticmethod
    def _fresh_manifest() -> dict[str, Any]:
        """Return a brand-new, empty manifest structure."""
        return {
            "version": MANIFEST_VERSION,
            "tos_acknowledged_at": None,
            "last_run": None,
            "chatgpt": {},
            "claude": {},
        }