326 lines
10 KiB
Python
326 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import platformdirs
|
|
import keyring
|
|
from cryptography.fernet import Fernet
|
|
|
|
|
|
APP_NAME = "Image Processor"
|
|
APP_AUTHOR = "images_py"
|
|
KEYRING_SERVICE = "images_py.image_processor"
|
|
|
|
PERSISTED_OPTION_KEYS = {
|
|
"canvas_width",
|
|
"canvas_height",
|
|
"template",
|
|
"delete_images",
|
|
"transparent",
|
|
"background_color",
|
|
"image_format",
|
|
"image_size",
|
|
"destination_path",
|
|
"selected_directory",
|
|
}
|
|
|
|
# Legacy key used ONLY to decrypt an existing legacy ./config.enc and migrate it.
|
|
LEGACY_FERNET_KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
|
|
|
|
|
def _config_dir() -> Path:
|
|
path = Path(platformdirs.user_config_dir(APP_NAME, APP_AUTHOR))
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def _options_path() -> Path:
|
|
return _config_dir() / "options.json"
|
|
|
|
|
|
def _secrets_enc_path() -> Path:
|
|
# Only used if keyring is unavailable.
|
|
return _config_dir() / "credentials.enc"
|
|
|
|
|
|
def _master_key_path() -> Path:
|
|
# Only used if keyring is unavailable.
|
|
return _config_dir() / "master.key"
|
|
|
|
|
|
def _legacy_candidates() -> List[Path]:
|
|
candidates: List[Path] = []
|
|
try:
|
|
candidates.append(Path.cwd() / "config.enc")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
candidates.append(Path(sys.argv[0]).resolve().parent / "config.enc")
|
|
except Exception:
|
|
pass
|
|
|
|
# Deduplicate while keeping order
|
|
seen = set()
|
|
unique: List[Path] = []
|
|
for c in candidates:
|
|
if str(c) not in seen:
|
|
seen.add(str(c))
|
|
unique.append(c)
|
|
return unique
|
|
|
|
|
|
def _try_keyring_get(username: str) -> Optional[str]:
|
|
try:
|
|
return keyring.get_password(KEYRING_SERVICE, username)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _try_keyring_set(username: str, value: str) -> bool:
|
|
try:
|
|
keyring.set_password(KEYRING_SERVICE, username, value)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _get_or_create_master_key() -> bytes:
|
|
# 1) Allow overriding in dev/CI
|
|
env_key = os.environ.get("IMAGE_PROCESSOR_MASTER_KEY")
|
|
if env_key:
|
|
try:
|
|
return env_key.encode() if isinstance(env_key, str) else env_key
|
|
except Exception:
|
|
pass
|
|
|
|
# 2) Prefer OS keyring
|
|
stored = _try_keyring_get("master_key")
|
|
if stored:
|
|
return stored.encode()
|
|
|
|
# 3) Fallback to per-user file
|
|
key_path = _master_key_path()
|
|
if key_path.exists():
|
|
try:
|
|
return key_path.read_bytes().strip()
|
|
except Exception:
|
|
pass
|
|
|
|
new_key = Fernet.generate_key()
|
|
if not _try_keyring_set("master_key", new_key.decode()):
|
|
# best-effort file persistence
|
|
try:
|
|
key_path.write_bytes(new_key)
|
|
except Exception:
|
|
pass
|
|
return new_key
|
|
|
|
|
|
def _load_options() -> Dict[str, Any]:
|
|
path = _options_path()
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(path.read_text(encoding="utf-8"))
|
|
if not isinstance(data, dict):
|
|
return {}
|
|
cleaned = {k: v for k, v in data.items() if k in PERSISTED_OPTION_KEYS}
|
|
# If we had to drop keys, persist the cleaned version.
|
|
if cleaned != data:
|
|
try:
|
|
_save_options(cleaned)
|
|
except Exception:
|
|
pass
|
|
return cleaned
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _save_options(options: Dict[str, Any]) -> None:
|
|
path = _options_path()
|
|
path.write_text(json.dumps(options, indent=2), encoding="utf-8")
|
|
|
|
|
|
def _load_credentials_list() -> List[Dict[str, Any]]:
|
|
# Prefer keyring storage
|
|
raw = _try_keyring_get("credentials_json")
|
|
if raw:
|
|
try:
|
|
data = json.loads(raw)
|
|
return data if isinstance(data, list) else []
|
|
except Exception:
|
|
return []
|
|
|
|
# Fallback: encrypted file in per-user config dir
|
|
enc_path = _secrets_enc_path()
|
|
if enc_path.exists():
|
|
try:
|
|
f = Fernet(_get_or_create_master_key())
|
|
decrypted = f.decrypt(enc_path.read_bytes()).decode("utf-8")
|
|
data = json.loads(decrypted)
|
|
return data if isinstance(data, list) else []
|
|
except Exception:
|
|
return []
|
|
|
|
return []
|
|
|
|
|
|
def _save_credentials_list(credentials_list: List[Dict[str, Any]]) -> None:
|
|
payload = json.dumps(credentials_list)
|
|
|
|
# Prefer keyring
|
|
if _try_keyring_set("credentials_json", payload):
|
|
return
|
|
|
|
# Fallback: encrypted file
|
|
f = Fernet(_get_or_create_master_key())
|
|
enc = f.encrypt(payload.encode("utf-8"))
|
|
_secrets_enc_path().write_bytes(enc)
|
|
|
|
|
|
def _migrate_legacy_config_if_present() -> None:
|
|
# One-time migration from legacy ./config.enc using LEGACY_FERNET_KEY.
|
|
for legacy_path in _legacy_candidates():
|
|
if not legacy_path.exists():
|
|
continue
|
|
try:
|
|
encrypted_data = legacy_path.read_bytes()
|
|
decrypted_data = Fernet(LEGACY_FERNET_KEY).decrypt(encrypted_data).decode("utf-8")
|
|
legacy_config = json.loads(decrypted_data)
|
|
except Exception:
|
|
continue
|
|
|
|
legacy_options = legacy_config.get("options") if isinstance(legacy_config, dict) else None
|
|
if isinstance(legacy_options, dict):
|
|
_save_options(legacy_options)
|
|
|
|
legacy_credentials = legacy_config.get("credentials") if isinstance(legacy_config, dict) else None
|
|
credentials_list: List[Dict[str, Any]] = []
|
|
if isinstance(legacy_credentials, list):
|
|
credentials_list = [c for c in legacy_credentials if isinstance(c, dict)]
|
|
elif isinstance(legacy_credentials, dict):
|
|
legacy_credentials["active"] = True
|
|
credentials_list = [legacy_credentials]
|
|
|
|
if credentials_list:
|
|
# Ensure only one is active
|
|
active_found = False
|
|
for c in credentials_list:
|
|
if c.get("active") and not active_found:
|
|
active_found = True
|
|
elif c.get("active") and active_found:
|
|
c["active"] = False
|
|
if not active_found:
|
|
credentials_list[0]["active"] = True
|
|
|
|
_get_or_create_master_key() # generate new key (v2) for per-user storage
|
|
_save_credentials_list(credentials_list)
|
|
|
|
# Remove legacy file after successful migration
|
|
try:
|
|
legacy_path.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
class ConfigEncryptor:
|
|
"""Compatibility wrapper.
|
|
|
|
Historically this class wrote `config.enc` in the working directory with a hardcoded key.
|
|
It now stores per-user config (options.json) and secrets (keyring or encrypted fallback).
|
|
"""
|
|
|
|
def __init__(self, key: Optional[bytes] = None, filename: str = "config.enc"):
|
|
self.key = key # kept for backward compatibility; no longer required
|
|
self.filename = filename # kept for backward compatibility
|
|
|
|
# One-time legacy migration
|
|
_migrate_legacy_config_if_present()
|
|
|
|
# Ensure a v2 key exists (for encrypted-file fallback)
|
|
_get_or_create_master_key()
|
|
|
|
def get_key(self) -> str:
|
|
# Return the v2 master key (mainly useful for debugging).
|
|
return _get_or_create_master_key().decode("utf-8")
|
|
|
|
def save_credentials(self, credentials: Dict[str, Any]) -> None:
|
|
config = self.load_config() or {"credentials": [], "options": {}}
|
|
credentials_list = config.get("credentials", [])
|
|
if not isinstance(credentials_list, list):
|
|
credentials_list = []
|
|
|
|
# Update existing or append
|
|
existing = None
|
|
for cred in credentials_list:
|
|
if isinstance(cred, dict) and cred.get("nice_name") == credentials.get("nice_name"):
|
|
existing = cred
|
|
break
|
|
|
|
if existing is not None:
|
|
existing.update(credentials)
|
|
else:
|
|
credentials_list.append(credentials)
|
|
|
|
# Mark active
|
|
target = credentials.get("nice_name")
|
|
for cred in credentials_list:
|
|
if isinstance(cred, dict):
|
|
cred["active"] = cred.get("nice_name") == target
|
|
|
|
_save_credentials_list([c for c in credentials_list if isinstance(c, dict)])
|
|
|
|
def delete_credentials(self, credentials: str) -> None:
|
|
config = self.load_config() or {"credentials": [], "options": {}}
|
|
credentials_list = config.get("credentials", [])
|
|
if not isinstance(credentials_list, list):
|
|
credentials_list = []
|
|
|
|
remaining = [c for c in credentials_list if isinstance(c, dict) and c.get("nice_name") != credentials]
|
|
if remaining:
|
|
# Ensure one active remains
|
|
if not any(c.get("active") for c in remaining):
|
|
remaining[0]["active"] = True
|
|
_save_credentials_list(remaining)
|
|
|
|
def save_options(self, options: Dict[str, Any]) -> None:
|
|
serializable_options = {
|
|
k: v
|
|
for k, v in options.items()
|
|
if k in PERSISTED_OPTION_KEYS and self.is_json_serializable(v)
|
|
}
|
|
_save_options(serializable_options)
|
|
|
|
def load_config(self) -> Optional[Dict[str, Any]]:
|
|
return {
|
|
"credentials": _load_credentials_list(),
|
|
"options": _load_options(),
|
|
}
|
|
|
|
def load_credentials(self) -> Optional[Dict[str, Any]]:
|
|
config = self.load_config()
|
|
if not config:
|
|
return None
|
|
credentials_list = config.get("credentials", [])
|
|
if isinstance(credentials_list, list):
|
|
for credentials in credentials_list:
|
|
if isinstance(credentials, dict) and credentials.get("active"):
|
|
return credentials
|
|
return credentials_list[0] if credentials_list else None
|
|
if isinstance(credentials_list, dict):
|
|
return credentials_list
|
|
return None
|
|
|
|
@staticmethod
|
|
def is_json_serializable(value: Any) -> bool:
|
|
try:
|
|
json.dumps(value)
|
|
return True
|
|
except (TypeError, OverflowError):
|
|
return False
|