feat: add utils module
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
157
src/utils.py
Normal file
157
src/utils.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""Shared utility functions for ai-chat-exporter."""
|
||||
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from slugify import slugify
|
||||
|
||||
|
||||
# Keys whose values should be redacted in log output
|
||||
_SENSITIVE_KEYS = frozenset(
|
||||
{"token", "key", "secret", "password", "authorization", "session", "sessionkey"}
|
||||
)
|
||||
|
||||
|
||||
def generate_filename(title: str, conv_id: str, created_at: str) -> str:
|
||||
"""Build the export filename for a conversation.
|
||||
|
||||
Format: YYYY-MM-DD_{slug}_{id[:8]}.md
|
||||
|
||||
Args:
|
||||
title: Conversation title.
|
||||
conv_id: Conversation ID (at least 8 chars recommended).
|
||||
created_at: ISO8601 creation timestamp.
|
||||
|
||||
Returns:
|
||||
Filename string, e.g. "2024-06-10_my-conversation_abc12345.md"
|
||||
"""
|
||||
dt = _parse_dt(created_at)
|
||||
date_str = dt.strftime("%Y-%m-%d")
|
||||
slug = slugify(title, max_length=60, word_boundary=True) or "untitled"
|
||||
short_id = conv_id[:8]
|
||||
return f"{date_str}_{slug}_{short_id}.md"
|
||||
|
||||
|
||||
def build_export_path(
|
||||
base_dir: Path,
|
||||
provider: str,
|
||||
project: str | None,
|
||||
created_at: str,
|
||||
filename: str,
|
||||
structure: str = "provider/project/year",
|
||||
) -> Path:
|
||||
"""Build the full output path for an exported file.
|
||||
|
||||
Args:
|
||||
base_dir: Root export directory (e.g. Path("./exports")).
|
||||
provider: "chatgpt" or "claude".
|
||||
project: Project name (will be slugified), or None/empty for no-project.
|
||||
created_at: ISO8601 creation timestamp (used for year folder).
|
||||
filename: Already-generated filename from generate_filename().
|
||||
structure: OUTPUT_STRUCTURE value. One of:
|
||||
"provider/project/year" (default)
|
||||
"provider/project"
|
||||
"provider/year"
|
||||
|
||||
Returns:
|
||||
Full Path to the output file.
|
||||
"""
|
||||
dt = _parse_dt(created_at)
|
||||
year = str(dt.year)
|
||||
project_slug = slugify(project, max_length=60, word_boundary=True) if project else "no-project"
|
||||
|
||||
parts: list[str] = [provider]
|
||||
|
||||
if structure == "provider/project/year":
|
||||
parts += [project_slug, year]
|
||||
elif structure == "provider/project":
|
||||
parts += [project_slug]
|
||||
elif structure == "provider/year":
|
||||
parts += [year]
|
||||
else:
|
||||
# Unknown structure — fall back to default
|
||||
parts += [project_slug, year]
|
||||
|
||||
return base_dir.joinpath(*parts) / filename
|
||||
|
||||
|
||||
def redact_secrets(data: object) -> object:
|
||||
"""Recursively redact sensitive values from a dict/list for safe logging.
|
||||
|
||||
Keys matching _SENSITIVE_KEYS (case-insensitive) have their values
|
||||
replaced with "[REDACTED]".
|
||||
|
||||
Args:
|
||||
data: Any JSON-serializable object.
|
||||
|
||||
Returns:
|
||||
A new object with sensitive values replaced.
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
return {
|
||||
k: "[REDACTED]" if k.lower() in _SENSITIVE_KEYS else redact_secrets(v)
|
||||
for k, v in data.items()
|
||||
}
|
||||
if isinstance(data, list):
|
||||
return [redact_secrets(item) for item in data]
|
||||
return data
|
||||
|
||||
|
||||
def format_token_status(token: str | None, expiry_dt: datetime | None = None) -> str:
|
||||
"""Format a token for the startup config summary log line.
|
||||
|
||||
Never includes the actual token value.
|
||||
|
||||
Args:
|
||||
token: The token string, or None/empty if not set.
|
||||
expiry_dt: Decoded expiry datetime (for JWTs). None if unknown.
|
||||
|
||||
Returns:
|
||||
Human-readable status, e.g. "[SET - expires in 3 days]" or "[NOT SET]"
|
||||
"""
|
||||
if not token:
|
||||
return "[NOT SET]"
|
||||
|
||||
if expiry_dt is None:
|
||||
return "[SET]"
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
if expiry_dt.tzinfo is None:
|
||||
expiry_dt = expiry_dt.replace(tzinfo=timezone.utc)
|
||||
|
||||
delta = expiry_dt - now
|
||||
days = delta.days
|
||||
|
||||
if days < 0:
|
||||
return "[SET - EXPIRED]"
|
||||
if days == 0:
|
||||
hours = int(delta.seconds / 3600)
|
||||
return f"[SET - expires in {hours}h]"
|
||||
return f"[SET - expires in {days} day{'s' if days != 1 else ''}]"
|
||||
|
||||
|
||||
def _parse_dt(ts: str) -> datetime:
|
||||
"""Parse an ISO8601 timestamp to a datetime object.
|
||||
|
||||
Handles both timezone-aware and naive strings.
|
||||
"""
|
||||
ts = ts.rstrip("Z")
|
||||
# Remove sub-second precision beyond microseconds
|
||||
ts = re.sub(r"(\.\d{6})\d+", r"\1", ts)
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts)
|
||||
except ValueError:
|
||||
# Fallback: try common formats
|
||||
for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
|
||||
try:
|
||||
dt = datetime.strptime(ts, fmt)
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
dt = datetime.now()
|
||||
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
Reference in New Issue
Block a user