Refactor configuration management: migrate to per-user storage, update encryption handling, and enhance versioning in CI

This commit is contained in:
SitiWeb
2026-01-23 17:17:25 +01:00
parent 43c5bdac8c
commit 428e3306a0
11 changed files with 570 additions and 279 deletions

View File

@@ -1,188 +1,325 @@
from cryptography.fernet import Fernet
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
import platformdirs
import keyring
from cryptography.fernet import Fernet
class ConfigEncryptor:
def __init__(self, key, filename="config.enc"):
self.key = key
self.filename = filename
self.fernet = Fernet(self.key)
APP_NAME = "Image Processor"
APP_AUTHOR = "images_py"
KEYRING_SERVICE = "images_py.image_processor"
def encrypt_config(self, data):
"""
Encrypt the given data and save it to a file.
Args:
data (dict): The dictionary containing credentials and options to encrypt and save.
"""
try:
json_data = json.dumps(data)
encrypted_data = self.fernet.encrypt(json_data.encode())
with open(self.filename, "wb") as encrypted_file:
encrypted_file.write(encrypted_data)
print(f"Encrypted configuration saved to {self.filename}")
except Exception as e:
print(f"Error encrypting config: {e}")
PERSISTED_OPTION_KEYS = {
"canvas_width",
"canvas_height",
"template",
"delete_images",
"transparent",
"background_color",
"image_format",
"image_size",
"destination_path",
"selected_directory",
}
def get_key(self):
"""
Return the encryption key.
Returns:
str: The encryption key as a string.
"""
return self.key.decode()
def save_credentials(self, credentials):
"""
Save WooCommerce credentials to the config file, handling multiple credential sets.
Args:
credentials (dict): Dictionary containing WooCommerce credentials.
"""
# Load the existing configuration
config = self.load_config() or {"credentials": [], "options": {}}
# Ensure credentials is a list of dictionaries (if this is the first time saving, initialize it)
if not isinstance(config.get("credentials"), list):
config["credentials"] = []
# Check if the credential with the same 'name' or 'nice_name' already exists and update it
existing_credential = None
for cred in config["credentials"]:
print(credentials)
if cred.get("nice_name") == credentials.get("nice_name"):
existing_credential = cred
break
if existing_credential:
# Update the existing credential set
existing_credential.update(credentials)
else:
# Add new credentials if they don't exist
config["credentials"].append(credentials)
# Set 'active' flag to True for this credential and False for others
for cred in config["credentials"]:
cred['active'] = cred.get("nice_name") == credentials.get("nice_name")
# Encrypt and save the updated config
self.encrypt_config(config)
print(f"Credentials for {credentials.get('nice_name', 'Unnamed')} saved successfully.")
def delete_credentials(self, credentials):
"""
Save WooCommerce credentials to the config file, handling multiple credential sets.
Args:
credentials (dict): Dictionary containing WooCommerce credentials.
"""
# Load the existing configuration
config = self.load_config() or {"credentials": [], "options": {}}
new_config = []
for credi in config["credentials"]:
if credi.get("nice_name") != credentials:
new_config.append(credi)
config["credentials"] = new_config
print(config)
# Encrypt and save the updated config
self.encrypt_config(config)
# Legacy key used ONLY to decrypt an existing legacy ./config.enc and migrate it.
LEGACY_FERNET_KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
def _config_dir() -> Path:
path = Path(platformdirs.user_config_dir(APP_NAME, APP_AUTHOR))
path.mkdir(parents=True, exist_ok=True)
return path
def save_options(self, options):
"""
Save options to the config file. Filters out non-serializable data.
Args:
options (dict): Dictionary containing options such as canvas width, height, etc.
"""
config = self.load_config() or {"credentials": {}, "options": {}}
serializable_options = {k: v for k, v in options.items() if self.is_json_serializable(v)}
config["options"] = serializable_options
self.encrypt_config(config)
def load_config(self):
"""
Load and decrypt the config file.
Returns:
dict: Decrypted configuration data containing credentials and options, or None if file not found.
"""
if not os.path.exists(self.filename):
print(f"Config file {self.filename} not found.")
return None
def _options_path() -> Path:
return _config_dir() / "options.json"
try:
with open(self.filename, "rb") as encrypted_file:
encrypted_data = encrypted_file.read()
decrypted_data = self.fernet.decrypt(encrypted_data).decode()
config = json.loads(decrypted_data)
return config
except Exception as e:
print(f"Error loading or decrypting config: {e}")
return None
def load_credentials(self):
"""
Load the active WooCommerce credentials from the config file.
Returns:
dict: The active WooCommerce credentials if found, otherwise None.
"""
config = self.load_config()
if config:
# Check if credentials exist and search for the one marked as 'active'
credentials_list = config.get("credentials", [])
if isinstance(credentials_list, list):
for credentials in credentials_list:
if credentials.get("active"):
return credentials
elif isinstance(credentials_list, dict):
return credentials_list
def _secrets_enc_path() -> Path:
# Only used if keyring is unavailable.
return _config_dir() / "credentials.enc"
def _master_key_path() -> Path:
# Only used if keyring is unavailable.
return _config_dir() / "master.key"
def _legacy_candidates() -> List[Path]:
candidates: List[Path] = []
try:
candidates.append(Path.cwd() / "config.enc")
except Exception:
pass
try:
candidates.append(Path(sys.argv[0]).resolve().parent / "config.enc")
except Exception:
pass
# Deduplicate while keeping order
seen = set()
unique: List[Path] = []
for c in candidates:
if str(c) not in seen:
seen.add(str(c))
unique.append(c)
return unique
def _try_keyring_get(username: str) -> Optional[str]:
try:
return keyring.get_password(KEYRING_SERVICE, username)
except Exception:
return None
def _try_keyring_set(username: str, value: str) -> bool:
try:
keyring.set_password(KEYRING_SERVICE, username, value)
return True
except Exception:
return False
def _get_or_create_master_key() -> bytes:
# 1) Allow overriding in dev/CI
env_key = os.environ.get("IMAGE_PROCESSOR_MASTER_KEY")
if env_key:
try:
return env_key.encode() if isinstance(env_key, str) else env_key
except Exception:
pass
# 2) Prefer OS keyring
stored = _try_keyring_get("master_key")
if stored:
return stored.encode()
# 3) Fallback to per-user file
key_path = _master_key_path()
if key_path.exists():
try:
return key_path.read_bytes().strip()
except Exception:
pass
new_key = Fernet.generate_key()
if not _try_keyring_set("master_key", new_key.decode()):
# best-effort file persistence
try:
key_path.write_bytes(new_key)
except Exception:
pass
return new_key
def _load_options() -> Dict[str, Any]:
path = _options_path()
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {}
cleaned = {k: v for k, v in data.items() if k in PERSISTED_OPTION_KEYS}
# If we had to drop keys, persist the cleaned version.
if cleaned != data:
try:
_save_options(cleaned)
except Exception:
pass
return cleaned
except Exception:
return {}
def _save_options(options: Dict[str, Any]) -> None:
path = _options_path()
path.write_text(json.dumps(options, indent=2), encoding="utf-8")
def _load_credentials_list() -> List[Dict[str, Any]]:
# Prefer keyring storage
raw = _try_keyring_get("credentials_json")
if raw:
try:
data = json.loads(raw)
return data if isinstance(data, list) else []
except Exception:
return []
# Fallback: encrypted file in per-user config dir
enc_path = _secrets_enc_path()
if enc_path.exists():
try:
f = Fernet(_get_or_create_master_key())
decrypted = f.decrypt(enc_path.read_bytes()).decode("utf-8")
data = json.loads(decrypted)
return data if isinstance(data, list) else []
except Exception:
return []
return []
def _save_credentials_list(credentials_list: List[Dict[str, Any]]) -> None:
payload = json.dumps(credentials_list)
# Prefer keyring
if _try_keyring_set("credentials_json", payload):
return
# Fallback: encrypted file
f = Fernet(_get_or_create_master_key())
enc = f.encrypt(payload.encode("utf-8"))
_secrets_enc_path().write_bytes(enc)
def _migrate_legacy_config_if_present() -> None:
# One-time migration from legacy ./config.enc using LEGACY_FERNET_KEY.
for legacy_path in _legacy_candidates():
if not legacy_path.exists():
continue
try:
encrypted_data = legacy_path.read_bytes()
decrypted_data = Fernet(LEGACY_FERNET_KEY).decrypt(encrypted_data).decode("utf-8")
legacy_config = json.loads(decrypted_data)
except Exception:
continue
legacy_options = legacy_config.get("options") if isinstance(legacy_config, dict) else None
if isinstance(legacy_options, dict):
_save_options(legacy_options)
legacy_credentials = legacy_config.get("credentials") if isinstance(legacy_config, dict) else None
credentials_list: List[Dict[str, Any]] = []
if isinstance(legacy_credentials, list):
credentials_list = [c for c in legacy_credentials if isinstance(c, dict)]
elif isinstance(legacy_credentials, dict):
legacy_credentials["active"] = True
credentials_list = [legacy_credentials]
if credentials_list:
# Ensure only one is active
active_found = False
for c in credentials_list:
if c.get("active") and not active_found:
active_found = True
elif c.get("active") and active_found:
c["active"] = False
if not active_found:
credentials_list[0]["active"] = True
_get_or_create_master_key() # generate new key (v2) for per-user storage
_save_credentials_list(credentials_list)
# Remove legacy file after successful migration
try:
legacy_path.unlink(missing_ok=True)
except Exception:
pass
class ConfigEncryptor:
"""Compatibility wrapper.
Historically this class wrote `config.enc` in the working directory with a hardcoded key.
It now stores per-user config (options.json) and secrets (keyring or encrypted fallback).
"""
def __init__(self, key: Optional[bytes] = None, filename: str = "config.enc"):
self.key = key # kept for backward compatibility; no longer required
self.filename = filename # kept for backward compatibility
# One-time legacy migration
_migrate_legacy_config_if_present()
# Ensure a v2 key exists (for encrypted-file fallback)
_get_or_create_master_key()
def get_key(self) -> str:
# Return the v2 master key (mainly useful for debugging).
return _get_or_create_master_key().decode("utf-8")
def save_credentials(self, credentials: Dict[str, Any]) -> None:
config = self.load_config() or {"credentials": [], "options": {}}
credentials_list = config.get("credentials", [])
if not isinstance(credentials_list, list):
credentials_list = []
# Update existing or append
existing = None
for cred in credentials_list:
if isinstance(cred, dict) and cred.get("nice_name") == credentials.get("nice_name"):
existing = cred
break
if existing is not None:
existing.update(credentials)
else:
credentials_list.append(credentials)
# Mark active
target = credentials.get("nice_name")
for cred in credentials_list:
if isinstance(cred, dict):
cred["active"] = cred.get("nice_name") == target
_save_credentials_list([c for c in credentials_list if isinstance(c, dict)])
def delete_credentials(self, credentials: str) -> None:
config = self.load_config() or {"credentials": [], "options": {}}
credentials_list = config.get("credentials", [])
if not isinstance(credentials_list, list):
credentials_list = []
remaining = [c for c in credentials_list if isinstance(c, dict) and c.get("nice_name") != credentials]
if remaining:
# Ensure one active remains
if not any(c.get("active") for c in remaining):
remaining[0]["active"] = True
_save_credentials_list(remaining)
def save_options(self, options: Dict[str, Any]) -> None:
serializable_options = {
k: v
for k, v in options.items()
if k in PERSISTED_OPTION_KEYS and self.is_json_serializable(v)
}
_save_options(serializable_options)
def load_config(self) -> Optional[Dict[str, Any]]:
return {
"credentials": _load_credentials_list(),
"options": _load_options(),
}
def load_credentials(self) -> Optional[Dict[str, Any]]:
config = self.load_config()
if not config:
return None
credentials_list = config.get("credentials", [])
if isinstance(credentials_list, list):
for credentials in credentials_list:
if isinstance(credentials, dict) and credentials.get("active"):
return credentials
return credentials_list[0] if credentials_list else None
if isinstance(credentials_list, dict):
return credentials_list
return None
@staticmethod
def is_json_serializable(value):
"""
Check if a value is JSON serializable.
Args:
value: The value to check.
Returns:
bool: True if value is serializable, False otherwise.
"""
def is_json_serializable(value: Any) -> bool:
try:
json.dumps(value)
return True
except (TypeError, OverflowError):
return False
# Define your key here
key = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
if __name__ == "__main__":
config_data = {
"credentials": {
"url": "https://yourstore.com",
"consumer_key": "ck_yourconsumerkey",
"consumer_secret": "cs_yoursecret",
"username": "yourusername",
"password": "yourpassword"
},
"options": {
"canvas_width": 900,
"canvas_height": 900,
"template": "{slug}_{sku}_{width}x{height}",
"delete_images": False,
"background_color": "#FFFFFF"
}
}
encryptor = ConfigEncryptor(key)
encryptor.encrypt_config(config_data)