feat: add CLI
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
721
src/main.py
Normal file
721
src/main.py
Normal file
@@ -0,0 +1,721 @@
|
||||
"""CLI entry point for ai-chat-exporter."""
|
||||
|
||||
import logging
|
||||
import platform
|
||||
import shutil
|
||||
import sys
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from src.cache import Cache, CacheError
|
||||
from src.config import ConfigError
|
||||
from src.logging_config import setup_logging
|
||||
from src.providers.base import ProviderError
|
||||
|
||||
console = Console()
|
||||
err_console = Console(stderr=True)
|
||||
|
||||
TOS_NOTICE = """\
|
||||
⚠️ IMPORTANT — TERMS OF SERVICE NOTICE
|
||||
|
||||
This tool accesses ChatGPT and Claude using internal web API endpoints
|
||||
that are not officially supported or documented by OpenAI or Anthropic.
|
||||
|
||||
This approach may conflict with their Terms of Service:
|
||||
• OpenAI ToS: https://openai.com/policies/terms-of-use
|
||||
• Anthropic ToS: https://www.anthropic.com/legal/consumer-terms
|
||||
|
||||
By using this tool you accept that:
|
||||
- You are using it at your own risk
|
||||
- Your account could potentially be suspended for automated access
|
||||
- These APIs may break at any time without notice
|
||||
- This tool is for personal archival use only, not commercial use
|
||||
|
||||
Type 'yes' to acknowledge and continue, or Ctrl+C to exit: \
|
||||
"""
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# CLI group
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.version_option(version="0.1.0", prog_name="ai-chat-exporter")
|
||||
@click.option("--verbose", "-v", is_flag=True, help="Enable DEBUG output to console.")
|
||||
@click.option("--quiet", "-q", is_flag=True, help="Show WARNING and above only.")
|
||||
@click.option("--debug", is_flag=True, help="DEBUG + full tracebacks + redacted API bodies.")
|
||||
@click.option("--no-log-file", is_flag=True, help="Disable file logging.")
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context, verbose: bool, quiet: bool, debug: bool, no_log_file: bool) -> None:
|
||||
"""Export ChatGPT and Claude conversations to Markdown for personal archival."""
|
||||
ctx.ensure_object(dict)
|
||||
|
||||
# Determine console log level
|
||||
if debug or verbose:
|
||||
level = logging.DEBUG
|
||||
elif quiet:
|
||||
level = logging.WARNING
|
||||
else:
|
||||
level = logging.INFO
|
||||
|
||||
# Determine log file path from env (setup_logging handles "none")
|
||||
import os
|
||||
log_file = os.getenv("LOG_FILE", "~/.ai-chat-exporter/logs/exporter.log")
|
||||
|
||||
setup_logging(level=level, log_file=log_file, no_log_file=no_log_file)
|
||||
|
||||
ctx.obj["debug"] = debug
|
||||
ctx.obj["verbose"] = verbose
|
||||
|
||||
# Initialise cache (needed for ToS gate on every command)
|
||||
import os
|
||||
cache_dir = Path(os.getenv("CACHE_DIR", "~/.ai-chat-exporter")).expanduser()
|
||||
try:
|
||||
cache = Cache(cache_dir)
|
||||
except CacheError as e:
|
||||
err_console.print(f"[red]Cache error: {e}[/red]")
|
||||
sys.exit(1)
|
||||
|
||||
ctx.obj["cache"] = cache
|
||||
|
||||
# ToS gate: must happen before any command executes
|
||||
if not cache.is_tos_acknowledged():
|
||||
try:
|
||||
answer = click.prompt(TOS_NOTICE, default="", show_default=False).strip().lower()
|
||||
except (click.Abort, KeyboardInterrupt):
|
||||
console.print("\n[yellow]Exiting.[/yellow]")
|
||||
sys.exit(0)
|
||||
if answer != "yes":
|
||||
console.print("[yellow]You must type 'yes' to proceed. Exiting.[/yellow]")
|
||||
sys.exit(0)
|
||||
cache.acknowledge_tos()
|
||||
console.print("[green]Acknowledged. Proceeding.[/green]\n")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# auth command
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def auth(ctx: click.Context) -> None:
|
||||
"""Interactive setup wizard for session tokens.
|
||||
|
||||
Guides you through finding and saving your ChatGPT and Claude session
|
||||
tokens. Tokens are never echoed to the terminal.
|
||||
|
||||
Token lifetimes:
|
||||
ChatGPT (__Secure-next-auth.session-token): ~7 days (JWT)
|
||||
Claude (sessionKey): ~30 days (opaque string)
|
||||
"""
|
||||
os_name = platform.system()
|
||||
|
||||
console.print("\n[bold cyan]AI Chat Exporter — Token Setup Wizard[/bold cyan]\n")
|
||||
console.print("Session tokens let this tool access your chat history without your password.")
|
||||
console.print("They are stored in browser cookies and have limited lifetimes.\n")
|
||||
|
||||
# Offer choice of provider to configure
|
||||
providers = click.prompt(
|
||||
"Which provider(s) do you want to configure? [chatgpt/claude/both]",
|
||||
default="both",
|
||||
).strip().lower()
|
||||
|
||||
configure_chatgpt = providers in ("chatgpt", "both")
|
||||
configure_claude = providers in ("claude", "both")
|
||||
|
||||
if configure_chatgpt:
|
||||
_auth_chatgpt(os_name)
|
||||
|
||||
if configure_claude:
|
||||
_auth_claude(os_name)
|
||||
|
||||
console.print("\n[green]Done! Run 'python -m src.main doctor' to verify your setup.[/green]")
|
||||
|
||||
|
||||
def _auth_chatgpt(os_name: str) -> None:
|
||||
import jwt as pyjwt
|
||||
|
||||
console.print("\n[bold]─── ChatGPT ───[/bold]")
|
||||
console.print("1. Open [link=https://chatgpt.com]https://chatgpt.com[/link] and log in.")
|
||||
if os_name == "Darwin":
|
||||
console.print("2. Press [bold]Cmd+Option+I[/bold] to open DevTools → Application tab.")
|
||||
else:
|
||||
console.print("2. Press [bold]F12[/bold] to open DevTools → Application tab.")
|
||||
console.print("3. Expand [bold]Cookies[/bold] → [bold]https://chatgpt.com[/bold]")
|
||||
console.print("4. Find [bold]__Secure-next-auth.session-token[/bold] → copy the Value.")
|
||||
console.print(" (Token starts with 'eyJ...' — it is a long JWT string)")
|
||||
console.print("5. Paste it below (input is hidden).\n")
|
||||
|
||||
token = click.prompt("ChatGPT session token", hide_input=True, default="", show_default=False).strip()
|
||||
if not token:
|
||||
console.print("[yellow]Skipped ChatGPT token.[/yellow]")
|
||||
return
|
||||
|
||||
# Validate
|
||||
if not token.startswith("eyJ"):
|
||||
console.print("[yellow]Warning: token doesn't look like a JWT (expected 'eyJ...').[/yellow]")
|
||||
|
||||
expiry_str = ""
|
||||
try:
|
||||
payload = pyjwt.decode(token, options={"verify_signature": False})
|
||||
exp = payload.get("exp")
|
||||
if exp:
|
||||
from datetime import timezone
|
||||
expiry = datetime.fromtimestamp(exp, tz=timezone.utc)
|
||||
expiry_str = expiry.strftime("%Y-%m-%d %H:%M UTC")
|
||||
console.print(f"[green]Token decoded — expires: {expiry_str}[/green]")
|
||||
except Exception:
|
||||
console.print("[yellow]Could not decode token expiry.[/yellow]")
|
||||
|
||||
_write_token_to_env("CHATGPT_SESSION_TOKEN", token)
|
||||
|
||||
|
||||
def _auth_claude(os_name: str) -> None:
|
||||
console.print("\n[bold]─── Claude ───[/bold]")
|
||||
console.print("1. Open [link=https://claude.ai]https://claude.ai[/link] and log in.")
|
||||
if os_name == "Darwin":
|
||||
console.print("2. Press [bold]Cmd+Option+I[/bold] to open DevTools → Application tab.")
|
||||
else:
|
||||
console.print("2. Press [bold]F12[/bold] to open DevTools → Application tab.")
|
||||
console.print("3. Expand [bold]Cookies[/bold] → [bold]https://claude.ai[/bold]")
|
||||
console.print("4. Find [bold]sessionKey[/bold] → copy the Value.")
|
||||
console.print(" (Note: Claude tokens expire after ~30 days; a 401 error is the only signal.)")
|
||||
console.print("5. Paste it below (input is hidden).\n")
|
||||
|
||||
key = click.prompt("Claude session key", hide_input=True, default="", show_default=False).strip()
|
||||
if not key:
|
||||
console.print("[yellow]Skipped Claude token.[/yellow]")
|
||||
return
|
||||
|
||||
console.print("[green]Claude session key saved.[/green]")
|
||||
_write_token_to_env("CLAUDE_SESSION_KEY", key)
|
||||
|
||||
|
||||
def _write_token_to_env(key: str, value: str) -> None:
|
||||
"""Write or update a key in .env, offering to create the file if it doesn't exist."""
|
||||
env_path = Path(".env")
|
||||
|
||||
if click.confirm(f"Write {key} to .env?", default=True):
|
||||
if not env_path.exists():
|
||||
# Create from example if available
|
||||
example = Path(".env.example")
|
||||
if example.exists():
|
||||
import shutil as _shutil
|
||||
_shutil.copy2(example, env_path)
|
||||
console.print("[dim]Created .env from .env.example[/dim]")
|
||||
else:
|
||||
env_path.touch()
|
||||
|
||||
lines = env_path.read_text(encoding="utf-8").splitlines(keepends=True)
|
||||
updated = False
|
||||
new_lines = []
|
||||
for line in lines:
|
||||
if line.startswith(f"{key}=") or line.startswith(f"{key} ="):
|
||||
new_lines.append(f"{key}={value}\n")
|
||||
updated = True
|
||||
else:
|
||||
new_lines.append(line)
|
||||
|
||||
if not updated:
|
||||
new_lines.append(f"\n{key}={value}\n")
|
||||
|
||||
env_path.write_text("".join(new_lines), encoding="utf-8")
|
||||
import os
|
||||
os.chmod(env_path, 0o600)
|
||||
console.print(f"[green]{key} written to .env (permissions: 600)[/green]")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# doctor command
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def doctor(ctx: click.Context) -> None:
|
||||
"""Validate configuration and API connectivity.
|
||||
|
||||
Checks token presence, format, expiry, directory permissions, disk space,
|
||||
and live API reachability. Exits with code 1 if any checks fail.
|
||||
"""
|
||||
checks = _run_doctor_checks()
|
||||
_print_doctor_table(checks)
|
||||
|
||||
if any(not c["pass"] for c in checks):
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _run_doctor_checks() -> list[dict]:
|
||||
"""Run all doctor checks and return results."""
|
||||
import os
|
||||
import jwt as pyjwt
|
||||
from datetime import timezone
|
||||
|
||||
checks = []
|
||||
|
||||
def add(name: str, passed: bool, detail: str = "") -> None:
|
||||
checks.append({"name": name, "pass": passed, "detail": detail})
|
||||
|
||||
# Token presence
|
||||
chatgpt_token = os.getenv("CHATGPT_SESSION_TOKEN", "").strip() or None
|
||||
claude_key = os.getenv("CLAUDE_SESSION_KEY", "").strip() or None
|
||||
|
||||
add("CHATGPT_SESSION_TOKEN set", bool(chatgpt_token), "" if chatgpt_token else "Set in .env or run 'auth'")
|
||||
add("CLAUDE_SESSION_KEY set", bool(claude_key), "" if claude_key else "Set in .env or run 'auth'")
|
||||
|
||||
# ChatGPT token format + expiry
|
||||
if chatgpt_token:
|
||||
is_jwt = chatgpt_token.startswith("eyJ")
|
||||
add("ChatGPT token is valid JWT", is_jwt, "" if is_jwt else "Expected token starting with 'eyJ'")
|
||||
if is_jwt:
|
||||
try:
|
||||
payload = pyjwt.decode(chatgpt_token, options={"verify_signature": False})
|
||||
exp = payload.get("exp")
|
||||
if exp:
|
||||
expiry = datetime.fromtimestamp(exp, tz=timezone.utc)
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
delta = expiry - now
|
||||
detail = f"Expires {expiry.strftime('%Y-%m-%d %H:%M UTC')} ({delta.days}d)"
|
||||
ok = delta.total_seconds() > 0
|
||||
add("ChatGPT token not expired", ok, detail)
|
||||
if ok and delta.total_seconds() < 86400:
|
||||
add("ChatGPT token expiry warning", False, "Expires in < 24h — refresh soon")
|
||||
else:
|
||||
add("ChatGPT token expiry", False, "JWT has no 'exp' claim")
|
||||
except Exception as e:
|
||||
add("ChatGPT token decode", False, str(e))
|
||||
|
||||
# Claude key
|
||||
if claude_key:
|
||||
add("Claude session key non-empty", True, "Expiry cannot be decoded (opaque token)")
|
||||
|
||||
# Directories
|
||||
export_dir = Path(os.getenv("EXPORT_DIR", "./exports")).expanduser()
|
||||
cache_dir = Path(os.getenv("CACHE_DIR", "~/.ai-chat-exporter")).expanduser()
|
||||
|
||||
for label, dirpath in [("Export dir writable", export_dir), ("Cache dir writable", cache_dir)]:
|
||||
try:
|
||||
dirpath.mkdir(parents=True, exist_ok=True)
|
||||
test = dirpath / ".doctor_write_test"
|
||||
test.touch()
|
||||
test.unlink()
|
||||
add(label, True, str(dirpath))
|
||||
except OSError as e:
|
||||
add(label, False, str(e))
|
||||
|
||||
# Disk space
|
||||
try:
|
||||
usage = shutil.disk_usage(export_dir if export_dir.exists() else Path("."))
|
||||
free_mb = usage.free // (1024 * 1024)
|
||||
add("Disk space (≥100MB free)", free_mb >= 100, f"{free_mb}MB free")
|
||||
except OSError as e:
|
||||
add("Disk space check", False, str(e))
|
||||
|
||||
# API reachability
|
||||
if chatgpt_token:
|
||||
try:
|
||||
from src.providers.chatgpt import ChatGPTProvider
|
||||
p = ChatGPTProvider(chatgpt_token)
|
||||
results = p.list_conversations(offset=0, limit=1)
|
||||
add("ChatGPT API reachable", True, f"Got {len(results)} result(s)")
|
||||
except ProviderError as e:
|
||||
add("ChatGPT API reachable", False, str(e.original)[:80])
|
||||
except Exception as e:
|
||||
add("ChatGPT API reachable", False, str(e)[:80])
|
||||
|
||||
if claude_key:
|
||||
try:
|
||||
from src.providers.claude import ClaudeProvider
|
||||
p = ClaudeProvider(claude_key)
|
||||
results = p.list_conversations(offset=0, limit=1)
|
||||
add("Claude API reachable", True, f"Got {len(results)} result(s)")
|
||||
except ProviderError as e:
|
||||
add("Claude API reachable", False, str(e.original)[:80])
|
||||
except Exception as e:
|
||||
add("Claude API reachable", False, str(e)[:80])
|
||||
|
||||
return checks
|
||||
|
||||
|
||||
def _print_doctor_table(checks: list[dict]) -> None:
|
||||
table = Table(title="Doctor Check Results", show_header=True)
|
||||
table.add_column("Check", style="bold")
|
||||
table.add_column("Status", justify="center")
|
||||
table.add_column("Detail")
|
||||
|
||||
for c in checks:
|
||||
status = "[green]✓ PASS[/green]" if c["pass"] else "[red]✗ FAIL[/red]"
|
||||
table.add_row(c["name"], status, c.get("detail", ""))
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# export command
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
"--provider",
|
||||
type=click.Choice(["chatgpt", "claude", "all"], case_sensitive=False),
|
||||
default="all",
|
||||
show_default=True,
|
||||
help="Which provider to export.",
|
||||
)
|
||||
@click.option(
|
||||
"--format",
|
||||
"fmt",
|
||||
type=click.Choice(["markdown", "json", "both"], case_sensitive=False),
|
||||
default="markdown",
|
||||
show_default=True,
|
||||
help="Output format.",
|
||||
)
|
||||
@click.option(
|
||||
"--output",
|
||||
"output_dir",
|
||||
default=None,
|
||||
type=click.Path(),
|
||||
help="Override EXPORT_DIR.",
|
||||
)
|
||||
@click.option(
|
||||
"--since",
|
||||
default=None,
|
||||
help="Only export conversations updated after this date (YYYY-MM-DD).",
|
||||
)
|
||||
@click.option("--dry-run", is_flag=True, help="Show what would be exported without writing anything.")
|
||||
@click.pass_context
|
||||
def export(
|
||||
ctx: click.Context,
|
||||
provider: str,
|
||||
fmt: str,
|
||||
output_dir: str | None,
|
||||
since: str | None,
|
||||
dry_run: bool,
|
||||
) -> None:
|
||||
"""Export new and updated conversations to Markdown or JSON.
|
||||
|
||||
Every run is automatically resumable — only new or updated conversations
|
||||
are exported. Re-running the same command after an interruption will pick
|
||||
up exactly where it left off.
|
||||
"""
|
||||
debug = ctx.obj.get("debug", False)
|
||||
cache: Cache = ctx.obj["cache"]
|
||||
|
||||
# Load config (may raise ConfigError)
|
||||
try:
|
||||
from src.config import load_config
|
||||
cfg = _load_config_or_exit(debug)
|
||||
except SystemExit:
|
||||
return
|
||||
|
||||
# First-run: auto-doctor
|
||||
if not cache.last_run():
|
||||
console.print("[dim]First run — checking configuration…[/dim]")
|
||||
checks = _run_doctor_checks()
|
||||
_print_doctor_table(checks)
|
||||
if any(not c["pass"] for c in checks):
|
||||
err_console.print(
|
||||
"[red]Doctor checks failed. Fix the issues above before exporting.[/red]"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
export_base = Path(output_dir).expanduser() if output_dir else cfg.export_dir
|
||||
structure = cfg.output_structure
|
||||
|
||||
# Resolve since datetime
|
||||
since_dt: datetime | None = None
|
||||
if since:
|
||||
try:
|
||||
since_dt = datetime.fromisoformat(since)
|
||||
except ValueError:
|
||||
err_console.print(f"[red]Invalid --since date: '{since}'. Use YYYY-MM-DD.[/red]")
|
||||
sys.exit(1)
|
||||
|
||||
# Determine which providers to run
|
||||
providers_to_run = _resolve_providers(provider, cfg)
|
||||
if not providers_to_run:
|
||||
err_console.print(
|
||||
"[red]No providers configured. Run 'python -m src.main auth' to set up tokens.[/red]"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Build exporters
|
||||
from src.exporters.markdown import MarkdownExporter
|
||||
from src.exporters.json_export import JSONExporter
|
||||
|
||||
md_exporter = MarkdownExporter(export_base, structure) if fmt in ("markdown", "both") else None
|
||||
json_exporter = JSONExporter(export_base, structure) if fmt in ("json", "both") else None
|
||||
|
||||
# Summary counters
|
||||
summary: dict[str, dict[str, int]] = {}
|
||||
|
||||
for prov_name, prov_instance in providers_to_run:
|
||||
summary[prov_name] = {"exported": 0, "skipped": 0, "failed": 0}
|
||||
|
||||
console.print(f"\n[bold cyan][{prov_name.upper()}][/bold cyan] Fetching conversation list…")
|
||||
|
||||
try:
|
||||
all_convs = prov_instance.fetch_all_conversations(since=since_dt)
|
||||
except ProviderError as e:
|
||||
_handle_provider_error(e, debug)
|
||||
summary[prov_name]["failed"] += len(all_convs) if "all_convs" in dir() else 0
|
||||
continue
|
||||
|
||||
to_export = cache.get_new_or_updated(prov_name, all_convs)
|
||||
skipped = len(all_convs) - len(to_export)
|
||||
summary[prov_name]["skipped"] = skipped
|
||||
|
||||
if dry_run:
|
||||
_print_dry_run_table(prov_name, to_export, prov_instance, export_base, structure, skipped)
|
||||
continue
|
||||
|
||||
if not to_export:
|
||||
console.print(f" [dim]{skipped} conversations already up to date.[/dim]")
|
||||
continue
|
||||
|
||||
console.print(f" [dim]{len(to_export)} to export, {skipped} already up to date.[/dim]")
|
||||
|
||||
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
|
||||
|
||||
with Progress(
|
||||
SpinnerColumn(),
|
||||
TextColumn("[progress.description]{task.description}"),
|
||||
BarColumn(),
|
||||
TaskProgressColumn(),
|
||||
console=console,
|
||||
) as progress:
|
||||
task = progress.add_task(f"Exporting {prov_name}…", total=len(to_export))
|
||||
|
||||
for raw_conv in to_export:
|
||||
conv_id = raw_conv.get("id") or raw_conv.get("uuid", "unknown")
|
||||
try:
|
||||
full_raw = prov_instance.get_conversation(conv_id)
|
||||
normalized = prov_instance.normalize_conversation(full_raw)
|
||||
|
||||
exported_path: Path | None = None
|
||||
if md_exporter:
|
||||
exported_path = md_exporter.export(normalized)
|
||||
if json_exporter:
|
||||
exported_path = json_exporter.export(normalized)
|
||||
|
||||
# Write to cache immediately after successful export
|
||||
cache.mark_exported(prov_name, conv_id, {
|
||||
"title": normalized.get("title", ""),
|
||||
"project": normalized.get("project"),
|
||||
"updated_at": normalized.get("updated_at", ""),
|
||||
"file_path": str(exported_path) if exported_path else "",
|
||||
})
|
||||
summary[prov_name]["exported"] += 1
|
||||
progress.advance(task)
|
||||
|
||||
except ProviderError as e:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error("Failed to export conversation %s: %s", conv_id[:8], e)
|
||||
summary[prov_name]["failed"] += 1
|
||||
progress.advance(task)
|
||||
continue
|
||||
except OSError as e:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error("File write failed for conversation %s: %s", conv_id[:8], e)
|
||||
summary[prov_name]["failed"] += 1
|
||||
progress.advance(task)
|
||||
continue
|
||||
|
||||
if not dry_run:
|
||||
_print_export_summary(summary)
|
||||
|
||||
|
||||
def _resolve_providers(provider: str, cfg) -> list[tuple[str, object]]:
|
||||
"""Return (name, instance) pairs for configured providers."""
|
||||
result = []
|
||||
|
||||
def try_add(prov_name: str, token: str | None, provider_cls):
|
||||
if not token:
|
||||
if provider == prov_name or provider == "all":
|
||||
logging.getLogger(__name__).warning(
|
||||
"[%s] Skipping — token not configured.", prov_name
|
||||
)
|
||||
return
|
||||
try:
|
||||
result.append((prov_name, provider_cls(token)))
|
||||
except ProviderError as e:
|
||||
logging.getLogger(__name__).warning(
|
||||
"[%s] Could not initialise provider: %s", prov_name, e
|
||||
)
|
||||
|
||||
from src.providers.chatgpt import ChatGPTProvider
|
||||
from src.providers.claude import ClaudeProvider
|
||||
|
||||
if provider in ("chatgpt", "all"):
|
||||
try_add("chatgpt", cfg.chatgpt_session_token, ChatGPTProvider)
|
||||
if provider in ("claude", "all"):
|
||||
try_add("claude", cfg.claude_session_key, ClaudeProvider)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _print_dry_run_table(prov_name, to_export, prov_instance, export_base, structure, skipped):
|
||||
from src.utils import generate_filename, build_export_path
|
||||
|
||||
table = Table(title=f"[DRY RUN] {prov_name.upper()} — Would export {len(to_export)} conversations")
|
||||
table.add_column("Title")
|
||||
table.add_column("Project")
|
||||
table.add_column("Destination")
|
||||
table.add_column("Updated")
|
||||
|
||||
for conv in to_export[:50]: # cap display at 50
|
||||
title = conv.get("title") or "Untitled"
|
||||
project = conv.get("project_title") or conv.get("project", {})
|
||||
if isinstance(project, dict):
|
||||
project = project.get("name", "")
|
||||
project = project or None
|
||||
updated = (conv.get("updated_at") or conv.get("update_time") or "")[:10]
|
||||
created = conv.get("created_at") or conv.get("create_time") or ""
|
||||
conv_id = conv.get("id") or conv.get("uuid", "")
|
||||
filename = generate_filename(title, conv_id, created or "2000-01-01")
|
||||
dest = build_export_path(export_base, prov_name, project, created or "2000-01-01", filename, structure)
|
||||
table.add_row(title[:50], str(project or "no-project"), str(dest), updated)
|
||||
|
||||
if len(to_export) > 50:
|
||||
table.add_row(f"… and {len(to_export) - 50} more", "", "", "")
|
||||
|
||||
console.print(table)
|
||||
console.print(f" [dim]{skipped} conversations already cached (would be skipped).[/dim]")
|
||||
|
||||
|
||||
def _print_export_summary(summary: dict[str, dict[str, int]]) -> None:
|
||||
table = Table(title="Export Summary")
|
||||
table.add_column("Provider", style="bold")
|
||||
table.add_column("Exported", justify="right")
|
||||
table.add_column("Skipped", justify="right")
|
||||
table.add_column("Failed", justify="right")
|
||||
|
||||
for prov, counts in summary.items():
|
||||
table.add_row(
|
||||
prov.capitalize(),
|
||||
str(counts["exported"]),
|
||||
str(counts["skipped"]),
|
||||
f"[red]{counts['failed']}[/red]" if counts["failed"] else "0",
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# list command
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@cli.command(name="list")
|
||||
@click.option(
|
||||
"--provider",
|
||||
type=click.Choice(["chatgpt", "claude", "all"], case_sensitive=False),
|
||||
default="all",
|
||||
show_default=True,
|
||||
)
|
||||
@click.pass_context
|
||||
def list_conversations(ctx: click.Context, provider: str) -> None:
|
||||
"""List conversations without exporting them."""
|
||||
debug = ctx.obj.get("debug", False)
|
||||
cfg = _load_config_or_exit(debug)
|
||||
providers_to_run = _resolve_providers(provider, cfg)
|
||||
|
||||
for prov_name, prov_instance in providers_to_run:
|
||||
console.print(f"\n[bold cyan][{prov_name.upper()}][/bold cyan]")
|
||||
try:
|
||||
all_convs = prov_instance.fetch_all_conversations()
|
||||
except ProviderError as e:
|
||||
_handle_provider_error(e, debug)
|
||||
continue
|
||||
|
||||
table = Table()
|
||||
table.add_column("Title")
|
||||
table.add_column("Project")
|
||||
table.add_column("Updated")
|
||||
table.add_column("ID")
|
||||
|
||||
for conv in all_convs:
|
||||
title = conv.get("title") or "Untitled"
|
||||
project = conv.get("project_title") or ""
|
||||
if isinstance(conv.get("project"), dict):
|
||||
project = conv["project"].get("name", "")
|
||||
updated = (conv.get("updated_at") or conv.get("update_time") or "")[:10]
|
||||
conv_id = (conv.get("id") or conv.get("uuid") or "")[:8]
|
||||
table.add_row(title[:60], project[:30], updated, conv_id)
|
||||
|
||||
console.print(table)
|
||||
console.print(f"Total: {len(all_convs)} conversations")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# cache command
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--show", is_flag=True, help="Show cache statistics.")
|
||||
@click.option("--clear", is_flag=True, help="Clear cached entries.")
|
||||
@click.option(
|
||||
"--provider",
|
||||
type=click.Choice(["chatgpt", "claude", "all"], case_sensitive=False),
|
||||
default="all",
|
||||
help="Provider to target (used with --clear).",
|
||||
)
|
||||
@click.pass_context
|
||||
def cache(ctx: click.Context, show: bool, clear: bool, provider: str) -> None:
|
||||
"""Manage the local export cache (sync manifest)."""
|
||||
cache_obj: Cache = ctx.obj["cache"]
|
||||
|
||||
if show:
|
||||
stats = cache_obj.stats()
|
||||
table = Table(title="Cache Statistics")
|
||||
table.add_column("Provider", style="bold")
|
||||
table.add_column("Cached Conversations", justify="right")
|
||||
for prov, count in stats.items():
|
||||
table.add_row(prov.capitalize(), str(count))
|
||||
last = cache_obj.last_run()
|
||||
console.print(table)
|
||||
console.print(f"Last run: {last or 'never'}")
|
||||
|
||||
if clear:
|
||||
prov_arg = None if provider == "all" else provider
|
||||
if click.confirm(f"Clear cache for {'all providers' if prov_arg is None else prov_arg}?"):
|
||||
cache_obj.clear(prov_arg)
|
||||
console.print("[green]Cache cleared.[/green]")
|
||||
|
||||
if not show and not clear:
|
||||
console.print("Specify --show or --clear. Use --help for options.")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
# Helpers
|
||||
# ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _load_config_or_exit(debug: bool):
|
||||
"""Load config, printing a clean error on failure."""
|
||||
from src.config import load_config
|
||||
try:
|
||||
return load_config()
|
||||
except ConfigError as e:
|
||||
err_console.print(f"[red]Configuration error:[/red] {e}")
|
||||
if debug:
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _handle_provider_error(e: ProviderError, debug: bool) -> None:
|
||||
err_console.print(f"[red]Provider error ({e.provider_name}/{e.operation}):[/red] {e.original}")
|
||||
if debug:
|
||||
traceback.print_exc()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
Reference in New Issue
Block a user