Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2409dde5c6 | ||
|
|
78382680a4 | ||
|
|
8d77c9c90b | ||
|
|
428e3306a0 | ||
|
|
43c5bdac8c | ||
|
|
551948d828 |
70
.github/workflows/build-release.yml
vendored
Normal file
70
.github/workflows/build-release.yml
vendored
Normal file
@@ -0,0 +1,70 @@
|
||||
name: Build & Release (Windows)
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
tags:
|
||||
- 'v*'
|
||||
workflow_dispatch: {}
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
build-windows:
|
||||
runs-on: windows-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
cache: 'pip'
|
||||
|
||||
- name: Install dependencies
|
||||
shell: pwsh
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install -r requirements.txt
|
||||
python -m pip install pyinstaller
|
||||
|
||||
- name: Stamp version (dev builds)
|
||||
if: startsWith(github.ref, 'refs/heads/')
|
||||
shell: pwsh
|
||||
run: |
|
||||
$sha = "${{ github.sha }}".Substring(0,7)
|
||||
Set-Content -Path version.py -Value "__version__ = `"0.0.0.dev0+$sha`"`n" -Encoding utf8
|
||||
|
||||
- name: Stamp version (tag builds)
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
shell: pwsh
|
||||
run: |
|
||||
$tag = "${{ github.ref_name }}"
|
||||
$ver = ($tag -replace '^v','')
|
||||
Set-Content -Path version.py -Value "__version__ = `"$ver`"`n" -Encoding utf8
|
||||
|
||||
- name: Build (PyInstaller spec)
|
||||
shell: pwsh
|
||||
run: |
|
||||
pyinstaller --noconfirm --clean main.spec
|
||||
|
||||
- name: Upload build artifact (push/dispatch)
|
||||
if: startsWith(github.ref, 'refs/heads/') || github.event_name == 'workflow_dispatch'
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: image_processor-windows
|
||||
path: |
|
||||
dist/**
|
||||
|
||||
- name: Create GitHub Release (tags)
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
uses: softprops/action-gh-release@v2
|
||||
with:
|
||||
name: ${{ github.ref_name }}
|
||||
tag_name: ${{ github.ref_name }}
|
||||
generate_release_notes: true
|
||||
files: |
|
||||
dist/**
|
||||
@@ -5,38 +5,12 @@ import tempfile
|
||||
import requests
|
||||
from tkinter import messagebox
|
||||
from woocommerce import API
|
||||
from cryptography.fernet import Fernet
|
||||
from utils.image_processing import ImageProcessor
|
||||
from config.encrypt_config import ConfigEncryptor
|
||||
from utils.file_operations import FileProcessor
|
||||
import hashlib
|
||||
import pprint
|
||||
CREDENTIALS_FILE = "credentials.json"
|
||||
|
||||
# Hardcoded key (replace with your generated key)
|
||||
KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
def save_active_credential_set(active_set_name):
|
||||
"""
|
||||
Update the active credential set in the saved credentials file.
|
||||
|
||||
Args:
|
||||
active_set_name (str): The name of the active credential set.
|
||||
"""
|
||||
if not os.path.exists(CREDENTIALS_FILE):
|
||||
return
|
||||
|
||||
with open(CREDENTIALS_FILE, 'r+') as file:
|
||||
data = json.load(file)
|
||||
|
||||
# Find the credential set and mark it as active
|
||||
for cred in data.get('credentials', []):
|
||||
cred['active'] = (cred['name'] == active_set_name)
|
||||
|
||||
# Rewrite the updated data back to the file
|
||||
file.seek(0)
|
||||
json.dump(data, file, indent=4)
|
||||
file.truncate()
|
||||
|
||||
|
||||
def save_credentials(url, consumer_key, consumer_secret, username, password):
|
||||
@@ -58,7 +32,7 @@ def save_credentials(url, consumer_key, consumer_secret, username, password):
|
||||
"password": password,
|
||||
}
|
||||
|
||||
ConfigEncryptor(KEY).save_credentials(consumer_key, consumer_secret, username, password)
|
||||
ConfigEncryptor().save_credentials(credentials)
|
||||
|
||||
|
||||
def load_credentials():
|
||||
@@ -68,7 +42,7 @@ def load_credentials():
|
||||
Returns:
|
||||
dict: The decrypted credentials, or None if the file does not exist.
|
||||
"""
|
||||
creds = ConfigEncryptor(KEY).load_credentials()
|
||||
creds = ConfigEncryptor().load_credentials()
|
||||
return creds
|
||||
|
||||
|
||||
@@ -81,6 +55,13 @@ def get_wcapi():
|
||||
"""
|
||||
active_credentials = load_credentials()
|
||||
|
||||
if not active_credentials:
|
||||
messagebox.showerror(
|
||||
"Missing credentials",
|
||||
"No active credentials found. Please configure them in Settings first.",
|
||||
)
|
||||
return None
|
||||
|
||||
pprint.pprint(active_credentials)
|
||||
|
||||
return API(
|
||||
@@ -433,6 +414,7 @@ def process_all_products(options):
|
||||
|
||||
page = 1
|
||||
total_products = 0 # Initialize the counter for total products
|
||||
log = options.get("log_message", None)
|
||||
|
||||
while True:
|
||||
products = wcapi.get("products", params={"per_page": 100, "page": page}).json()
|
||||
@@ -446,7 +428,6 @@ def process_all_products(options):
|
||||
total_products += 1 # Update the total count
|
||||
options["product_id"] = product["id"]
|
||||
options["product"] = product
|
||||
log = options.get("log_message", None)
|
||||
if log:
|
||||
if product:
|
||||
name = product.get("name", "")
|
||||
@@ -456,7 +437,8 @@ def process_all_products(options):
|
||||
page += 1
|
||||
|
||||
# Log the total number of products processed
|
||||
log(f"Total products processed: {total_products}")
|
||||
if log:
|
||||
log.log_message(f"Total products processed: {total_products}")
|
||||
|
||||
# Show completion message
|
||||
messagebox.showinfo(
|
||||
|
||||
@@ -1,62 +1,28 @@
|
||||
"""
|
||||
Module for decrypting configuration files using Fernet symmetric encryption.
|
||||
"""Deprecated legacy module.
|
||||
|
||||
Historically this project stored settings and credentials in an encrypted file in the working directory.
|
||||
The application now uses per-user storage (options.json under the user config directory) and stores
|
||||
credentials via OS keyring with an encrypted-file fallback.
|
||||
|
||||
This module is kept only to avoid breaking old imports.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from cryptography.fernet import Fernet
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from config.encrypt_config import ConfigEncryptor
|
||||
|
||||
|
||||
DECRYPTION_KEY = None
|
||||
|
||||
|
||||
class ConfigDecryptor:
|
||||
"""
|
||||
Class to handle decryption of configuration files.
|
||||
"""
|
||||
|
||||
def __init__(self, decryption_key):
|
||||
"""
|
||||
Initialize the ConfigDecryptor with a given decryption key.
|
||||
|
||||
Args:
|
||||
decryption_key (bytes): The key to use for decryption.
|
||||
"""
|
||||
def __init__(self, decryption_key=None):
|
||||
self.decryption_key = decryption_key
|
||||
|
||||
def decrypt(self):
|
||||
"""
|
||||
Decrypt the 'config.enc' file and return the configuration data.
|
||||
def decrypt(self) -> Optional[Dict[str, Any]]:
|
||||
return ConfigEncryptor().load_config()
|
||||
|
||||
Returns:
|
||||
dict: The decrypted configuration data.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the 'config.enc' file does not exist.
|
||||
Exception: If any other error occurs during decryption.
|
||||
"""
|
||||
if not os.path.exists("config.enc"):
|
||||
raise FileNotFoundError(
|
||||
"The encrypted configuration file 'config.enc' does not exist."
|
||||
)
|
||||
|
||||
fernet = Fernet(self.decryption_key)
|
||||
with open("config.enc", "rb") as encrypted_file:
|
||||
encrypted = encrypted_file.read()
|
||||
decrypted = fernet.decrypt(encrypted).decode()
|
||||
return json.loads(decrypted)
|
||||
|
||||
def hello_world(self):
|
||||
"""
|
||||
Placeholder
|
||||
"""
|
||||
def hello_world(self) -> str:
|
||||
return "Hello world"
|
||||
|
||||
|
||||
# Define your key here
|
||||
# Replace with your actual key
|
||||
DECRYPTION_KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
if __name__ == "__main__":
|
||||
decryptor = ConfigDecryptor(DECRYPTION_KEY)
|
||||
try:
|
||||
config = decryptor.decrypt()
|
||||
except FileNotFoundError as e:
|
||||
print(e)
|
||||
|
||||
@@ -1,188 +1,325 @@
|
||||
from cryptography.fernet import Fernet
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import platformdirs
|
||||
import keyring
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
|
||||
APP_NAME = "Image Processor"
|
||||
APP_AUTHOR = "images_py"
|
||||
KEYRING_SERVICE = "images_py.image_processor"
|
||||
|
||||
PERSISTED_OPTION_KEYS = {
|
||||
"canvas_width",
|
||||
"canvas_height",
|
||||
"template",
|
||||
"delete_images",
|
||||
"transparent",
|
||||
"background_color",
|
||||
"image_format",
|
||||
"image_size",
|
||||
"destination_path",
|
||||
"selected_directory",
|
||||
}
|
||||
|
||||
# Legacy key used ONLY to decrypt an existing legacy ./config.enc and migrate it.
|
||||
LEGACY_FERNET_KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
|
||||
def _config_dir() -> Path:
|
||||
path = Path(platformdirs.user_config_dir(APP_NAME, APP_AUTHOR))
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
return path
|
||||
|
||||
|
||||
def _options_path() -> Path:
|
||||
return _config_dir() / "options.json"
|
||||
|
||||
|
||||
def _secrets_enc_path() -> Path:
|
||||
# Only used if keyring is unavailable.
|
||||
return _config_dir() / "credentials.enc"
|
||||
|
||||
|
||||
def _master_key_path() -> Path:
|
||||
# Only used if keyring is unavailable.
|
||||
return _config_dir() / "master.key"
|
||||
|
||||
|
||||
def _legacy_candidates() -> List[Path]:
|
||||
candidates: List[Path] = []
|
||||
try:
|
||||
candidates.append(Path.cwd() / "config.enc")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
candidates.append(Path(sys.argv[0]).resolve().parent / "config.enc")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Deduplicate while keeping order
|
||||
seen = set()
|
||||
unique: List[Path] = []
|
||||
for c in candidates:
|
||||
if str(c) not in seen:
|
||||
seen.add(str(c))
|
||||
unique.append(c)
|
||||
return unique
|
||||
|
||||
|
||||
def _try_keyring_get(username: str) -> Optional[str]:
|
||||
try:
|
||||
return keyring.get_password(KEYRING_SERVICE, username)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _try_keyring_set(username: str, value: str) -> bool:
|
||||
try:
|
||||
keyring.set_password(KEYRING_SERVICE, username, value)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _get_or_create_master_key() -> bytes:
|
||||
# 1) Allow overriding in dev/CI
|
||||
env_key = os.environ.get("IMAGE_PROCESSOR_MASTER_KEY")
|
||||
if env_key:
|
||||
try:
|
||||
return env_key.encode() if isinstance(env_key, str) else env_key
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2) Prefer OS keyring
|
||||
stored = _try_keyring_get("master_key")
|
||||
if stored:
|
||||
return stored.encode()
|
||||
|
||||
# 3) Fallback to per-user file
|
||||
key_path = _master_key_path()
|
||||
if key_path.exists():
|
||||
try:
|
||||
return key_path.read_bytes().strip()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
new_key = Fernet.generate_key()
|
||||
if not _try_keyring_set("master_key", new_key.decode()):
|
||||
# best-effort file persistence
|
||||
try:
|
||||
key_path.write_bytes(new_key)
|
||||
except Exception:
|
||||
pass
|
||||
return new_key
|
||||
|
||||
|
||||
def _load_options() -> Dict[str, Any]:
|
||||
path = _options_path()
|
||||
if not path.exists():
|
||||
return {}
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
if not isinstance(data, dict):
|
||||
return {}
|
||||
cleaned = {k: v for k, v in data.items() if k in PERSISTED_OPTION_KEYS}
|
||||
# If we had to drop keys, persist the cleaned version.
|
||||
if cleaned != data:
|
||||
try:
|
||||
_save_options(cleaned)
|
||||
except Exception:
|
||||
pass
|
||||
return cleaned
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _save_options(options: Dict[str, Any]) -> None:
|
||||
path = _options_path()
|
||||
path.write_text(json.dumps(options, indent=2), encoding="utf-8")
|
||||
|
||||
|
||||
def _load_credentials_list() -> List[Dict[str, Any]]:
|
||||
# Prefer keyring storage
|
||||
raw = _try_keyring_get("credentials_json")
|
||||
if raw:
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
return data if isinstance(data, list) else []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
# Fallback: encrypted file in per-user config dir
|
||||
enc_path = _secrets_enc_path()
|
||||
if enc_path.exists():
|
||||
try:
|
||||
f = Fernet(_get_or_create_master_key())
|
||||
decrypted = f.decrypt(enc_path.read_bytes()).decode("utf-8")
|
||||
data = json.loads(decrypted)
|
||||
return data if isinstance(data, list) else []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def _save_credentials_list(credentials_list: List[Dict[str, Any]]) -> None:
|
||||
payload = json.dumps(credentials_list)
|
||||
|
||||
# Prefer keyring
|
||||
if _try_keyring_set("credentials_json", payload):
|
||||
return
|
||||
|
||||
# Fallback: encrypted file
|
||||
f = Fernet(_get_or_create_master_key())
|
||||
enc = f.encrypt(payload.encode("utf-8"))
|
||||
_secrets_enc_path().write_bytes(enc)
|
||||
|
||||
|
||||
def _migrate_legacy_config_if_present() -> None:
|
||||
# One-time migration from legacy ./config.enc using LEGACY_FERNET_KEY.
|
||||
for legacy_path in _legacy_candidates():
|
||||
if not legacy_path.exists():
|
||||
continue
|
||||
try:
|
||||
encrypted_data = legacy_path.read_bytes()
|
||||
decrypted_data = Fernet(LEGACY_FERNET_KEY).decrypt(encrypted_data).decode("utf-8")
|
||||
legacy_config = json.loads(decrypted_data)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
legacy_options = legacy_config.get("options") if isinstance(legacy_config, dict) else None
|
||||
if isinstance(legacy_options, dict):
|
||||
_save_options(legacy_options)
|
||||
|
||||
legacy_credentials = legacy_config.get("credentials") if isinstance(legacy_config, dict) else None
|
||||
credentials_list: List[Dict[str, Any]] = []
|
||||
if isinstance(legacy_credentials, list):
|
||||
credentials_list = [c for c in legacy_credentials if isinstance(c, dict)]
|
||||
elif isinstance(legacy_credentials, dict):
|
||||
legacy_credentials["active"] = True
|
||||
credentials_list = [legacy_credentials]
|
||||
|
||||
if credentials_list:
|
||||
# Ensure only one is active
|
||||
active_found = False
|
||||
for c in credentials_list:
|
||||
if c.get("active") and not active_found:
|
||||
active_found = True
|
||||
elif c.get("active") and active_found:
|
||||
c["active"] = False
|
||||
if not active_found:
|
||||
credentials_list[0]["active"] = True
|
||||
|
||||
_get_or_create_master_key() # generate new key (v2) for per-user storage
|
||||
_save_credentials_list(credentials_list)
|
||||
|
||||
# Remove legacy file after successful migration
|
||||
try:
|
||||
legacy_path.unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class ConfigEncryptor:
|
||||
def __init__(self, key, filename="config.enc"):
|
||||
self.key = key
|
||||
self.filename = filename
|
||||
self.fernet = Fernet(self.key)
|
||||
"""Compatibility wrapper.
|
||||
|
||||
def encrypt_config(self, data):
|
||||
Historically this class wrote `config.enc` in the working directory with a hardcoded key.
|
||||
It now stores per-user config (options.json) and secrets (keyring or encrypted fallback).
|
||||
"""
|
||||
Encrypt the given data and save it to a file.
|
||||
|
||||
Args:
|
||||
data (dict): The dictionary containing credentials and options to encrypt and save.
|
||||
"""
|
||||
try:
|
||||
json_data = json.dumps(data)
|
||||
encrypted_data = self.fernet.encrypt(json_data.encode())
|
||||
with open(self.filename, "wb") as encrypted_file:
|
||||
encrypted_file.write(encrypted_data)
|
||||
print(f"Encrypted configuration saved to {self.filename}")
|
||||
except Exception as e:
|
||||
print(f"Error encrypting config: {e}")
|
||||
def __init__(self, key: Optional[bytes] = None, filename: str = "config.enc"):
|
||||
self.key = key # kept for backward compatibility; no longer required
|
||||
self.filename = filename # kept for backward compatibility
|
||||
|
||||
def get_key(self):
|
||||
"""
|
||||
Return the encryption key.
|
||||
# One-time legacy migration
|
||||
_migrate_legacy_config_if_present()
|
||||
|
||||
Returns:
|
||||
str: The encryption key as a string.
|
||||
"""
|
||||
return self.key.decode()
|
||||
# Ensure a v2 key exists (for encrypted-file fallback)
|
||||
_get_or_create_master_key()
|
||||
|
||||
def save_credentials(self, credentials):
|
||||
"""
|
||||
Save WooCommerce credentials to the config file, handling multiple credential sets.
|
||||
def get_key(self) -> str:
|
||||
# Return the v2 master key (mainly useful for debugging).
|
||||
return _get_or_create_master_key().decode("utf-8")
|
||||
|
||||
Args:
|
||||
credentials (dict): Dictionary containing WooCommerce credentials.
|
||||
"""
|
||||
# Load the existing configuration
|
||||
def save_credentials(self, credentials: Dict[str, Any]) -> None:
|
||||
config = self.load_config() or {"credentials": [], "options": {}}
|
||||
credentials_list = config.get("credentials", [])
|
||||
if not isinstance(credentials_list, list):
|
||||
credentials_list = []
|
||||
|
||||
# Ensure credentials is a list of dictionaries (if this is the first time saving, initialize it)
|
||||
if not isinstance(config.get("credentials"), list):
|
||||
config["credentials"] = []
|
||||
|
||||
# Check if the credential with the same 'name' or 'nice_name' already exists and update it
|
||||
existing_credential = None
|
||||
for cred in config["credentials"]:
|
||||
print(credentials)
|
||||
if cred.get("nice_name") == credentials.get("nice_name"):
|
||||
existing_credential = cred
|
||||
# Update existing or append
|
||||
existing = None
|
||||
for cred in credentials_list:
|
||||
if isinstance(cred, dict) and cred.get("nice_name") == credentials.get("nice_name"):
|
||||
existing = cred
|
||||
break
|
||||
|
||||
if existing_credential:
|
||||
# Update the existing credential set
|
||||
existing_credential.update(credentials)
|
||||
if existing is not None:
|
||||
existing.update(credentials)
|
||||
else:
|
||||
# Add new credentials if they don't exist
|
||||
config["credentials"].append(credentials)
|
||||
credentials_list.append(credentials)
|
||||
|
||||
# Set 'active' flag to True for this credential and False for others
|
||||
for cred in config["credentials"]:
|
||||
cred['active'] = cred.get("nice_name") == credentials.get("nice_name")
|
||||
# Mark active
|
||||
target = credentials.get("nice_name")
|
||||
for cred in credentials_list:
|
||||
if isinstance(cred, dict):
|
||||
cred["active"] = cred.get("nice_name") == target
|
||||
|
||||
# Encrypt and save the updated config
|
||||
self.encrypt_config(config)
|
||||
print(f"Credentials for {credentials.get('nice_name', 'Unnamed')} saved successfully.")
|
||||
_save_credentials_list([c for c in credentials_list if isinstance(c, dict)])
|
||||
|
||||
def delete_credentials(self, credentials):
|
||||
"""
|
||||
Save WooCommerce credentials to the config file, handling multiple credential sets.
|
||||
|
||||
Args:
|
||||
credentials (dict): Dictionary containing WooCommerce credentials.
|
||||
"""
|
||||
# Load the existing configuration
|
||||
def delete_credentials(self, credentials: str) -> None:
|
||||
config = self.load_config() or {"credentials": [], "options": {}}
|
||||
credentials_list = config.get("credentials", [])
|
||||
if not isinstance(credentials_list, list):
|
||||
credentials_list = []
|
||||
|
||||
new_config = []
|
||||
for credi in config["credentials"]:
|
||||
remaining = [c for c in credentials_list if isinstance(c, dict) and c.get("nice_name") != credentials]
|
||||
if remaining:
|
||||
# Ensure one active remains
|
||||
if not any(c.get("active") for c in remaining):
|
||||
remaining[0]["active"] = True
|
||||
_save_credentials_list(remaining)
|
||||
|
||||
if credi.get("nice_name") != credentials:
|
||||
new_config.append(credi)
|
||||
config["credentials"] = new_config
|
||||
print(config)
|
||||
# Encrypt and save the updated config
|
||||
self.encrypt_config(config)
|
||||
def save_options(self, options: Dict[str, Any]) -> None:
|
||||
serializable_options = {
|
||||
k: v
|
||||
for k, v in options.items()
|
||||
if k in PERSISTED_OPTION_KEYS and self.is_json_serializable(v)
|
||||
}
|
||||
_save_options(serializable_options)
|
||||
|
||||
def load_config(self) -> Optional[Dict[str, Any]]:
|
||||
return {
|
||||
"credentials": _load_credentials_list(),
|
||||
"options": _load_options(),
|
||||
}
|
||||
|
||||
|
||||
def save_options(self, options):
|
||||
"""
|
||||
Save options to the config file. Filters out non-serializable data.
|
||||
|
||||
Args:
|
||||
options (dict): Dictionary containing options such as canvas width, height, etc.
|
||||
"""
|
||||
config = self.load_config() or {"credentials": {}, "options": {}}
|
||||
serializable_options = {k: v for k, v in options.items() if self.is_json_serializable(v)}
|
||||
config["options"] = serializable_options
|
||||
self.encrypt_config(config)
|
||||
|
||||
def load_config(self):
|
||||
"""
|
||||
Load and decrypt the config file.
|
||||
|
||||
Returns:
|
||||
dict: Decrypted configuration data containing credentials and options, or None if file not found.
|
||||
"""
|
||||
if not os.path.exists(self.filename):
|
||||
print(f"Config file {self.filename} not found.")
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(self.filename, "rb") as encrypted_file:
|
||||
encrypted_data = encrypted_file.read()
|
||||
decrypted_data = self.fernet.decrypt(encrypted_data).decode()
|
||||
config = json.loads(decrypted_data)
|
||||
return config
|
||||
except Exception as e:
|
||||
print(f"Error loading or decrypting config: {e}")
|
||||
return None
|
||||
|
||||
def load_credentials(self):
|
||||
"""
|
||||
Load the active WooCommerce credentials from the config file.
|
||||
|
||||
Returns:
|
||||
dict: The active WooCommerce credentials if found, otherwise None.
|
||||
"""
|
||||
def load_credentials(self) -> Optional[Dict[str, Any]]:
|
||||
config = self.load_config()
|
||||
if config:
|
||||
# Check if credentials exist and search for the one marked as 'active'
|
||||
if not config:
|
||||
return None
|
||||
credentials_list = config.get("credentials", [])
|
||||
if isinstance(credentials_list, list):
|
||||
for credentials in credentials_list:
|
||||
if credentials.get("active"):
|
||||
if isinstance(credentials, dict) and credentials.get("active"):
|
||||
return credentials
|
||||
elif isinstance(credentials_list, dict):
|
||||
return credentials_list[0] if credentials_list else None
|
||||
if isinstance(credentials_list, dict):
|
||||
return credentials_list
|
||||
return None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def is_json_serializable(value):
|
||||
"""
|
||||
Check if a value is JSON serializable.
|
||||
|
||||
Args:
|
||||
value: The value to check.
|
||||
|
||||
Returns:
|
||||
bool: True if value is serializable, False otherwise.
|
||||
"""
|
||||
def is_json_serializable(value: Any) -> bool:
|
||||
try:
|
||||
json.dumps(value)
|
||||
return True
|
||||
except (TypeError, OverflowError):
|
||||
return False
|
||||
|
||||
|
||||
# Define your key here
|
||||
key = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
if __name__ == "__main__":
|
||||
config_data = {
|
||||
"credentials": {
|
||||
"url": "https://yourstore.com",
|
||||
"consumer_key": "ck_yourconsumerkey",
|
||||
"consumer_secret": "cs_yoursecret",
|
||||
"username": "yourusername",
|
||||
"password": "yourpassword"
|
||||
},
|
||||
"options": {
|
||||
"canvas_width": 900,
|
||||
"canvas_height": 900,
|
||||
"template": "{slug}_{sku}_{width}x{height}",
|
||||
"delete_images": False,
|
||||
"background_color": "#FFFFFF"
|
||||
}
|
||||
}
|
||||
encryptor = ConfigEncryptor(key)
|
||||
encryptor.encrypt_config(config_data)
|
||||
|
||||
@@ -6,6 +6,12 @@ from ui.options_window import OptionsWindow
|
||||
from config.encrypt_config import ConfigEncryptor
|
||||
from api.woocommerce_api import get_first_image
|
||||
from PIL import Image, ImageTk
|
||||
|
||||
# Enable AVIF support for Pillow previews when the optional plugin is installed.
|
||||
try:
|
||||
import pillow_avif # type: ignore
|
||||
except Exception:
|
||||
pillow_avif = None
|
||||
from pprint import pformat
|
||||
from api.woocommerce_api import process_product_images, process_all_products, search_product, get_first_image_path, get_product
|
||||
import customtkinter as ctk
|
||||
@@ -23,7 +29,6 @@ class AppController:
|
||||
Args:
|
||||
root (ctk.CTk): The root CustomTkinter window.
|
||||
"""
|
||||
key = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
self.root = root
|
||||
self.file = FileProcessor()
|
||||
self.image = ImageProcessor()
|
||||
@@ -40,7 +45,7 @@ class AppController:
|
||||
self.background_color = "#000000"
|
||||
self.image_format = "AUTO"
|
||||
self.image_size = "contain"
|
||||
self.config = ConfigEncryptor(key)
|
||||
self.config = ConfigEncryptor()
|
||||
self.type = None
|
||||
self.destination_path = None
|
||||
self.found_products = None
|
||||
@@ -468,7 +473,6 @@ class AppController:
|
||||
self.apply_canvas_size()
|
||||
self.apply_background_color()
|
||||
self.apply_image_size()
|
||||
key = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
self.config.save_options(self.get_options())
|
||||
self.update_previews()
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ a = Analysis(
|
||||
pathex=[],
|
||||
binaries=[],
|
||||
datas=[],
|
||||
hiddenimports=[],
|
||||
hiddenimports=['pillow_avif'],
|
||||
hookspath=[],
|
||||
hooksconfig={},
|
||||
runtime_hooks=[],
|
||||
@@ -30,6 +30,7 @@ exe = EXE(
|
||||
upx_exclude=[],
|
||||
runtime_tmpdir=None,
|
||||
console=False,
|
||||
icon='ui/images/image_processor.ico',
|
||||
disable_windowed_traceback=False,
|
||||
argv_emulation=False,
|
||||
target_arch=None,
|
||||
|
||||
50
main.py
50
main.py
@@ -1,20 +1,29 @@
|
||||
"""
|
||||
Main module for the Image Processor application.
|
||||
"""
|
||||
from PIL import Image
|
||||
from PIL import Image, ImageTk
|
||||
import customtkinter as ctk
|
||||
import os
|
||||
import sys
|
||||
from ui.menu import MenuBar # Import the new MenuBar class
|
||||
from ui.log_frame import LogWindow
|
||||
from ui.button_frame import ButtonFrame
|
||||
from ui.frame_info import InfoFrame
|
||||
from ui.settings_tab import SettingsTab
|
||||
from config.decrypt_config import ConfigDecryptor, DECRYPTION_KEY
|
||||
from config.encrypt_config import ConfigEncryptor
|
||||
from controller import AppController
|
||||
|
||||
from ui.preview_frame import PreviewFrame # Import the new PreviewFrame class
|
||||
|
||||
|
||||
def resource_path(relative_path: str) -> str:
|
||||
"""Get absolute path to a resource (dev or PyInstaller)."""
|
||||
try:
|
||||
base_path = sys._MEIPASS # type: ignore[attr-defined]
|
||||
except Exception:
|
||||
base_path = os.path.abspath(".")
|
||||
return os.path.join(base_path, relative_path)
|
||||
|
||||
|
||||
|
||||
class ImageProcessorApp:
|
||||
"""
|
||||
@@ -34,6 +43,23 @@ class ImageProcessorApp:
|
||||
self.root.title("Image Processor")
|
||||
self.root.geometry("553x800")
|
||||
|
||||
# Window/taskbar icon (Windows prefers .ico)
|
||||
try:
|
||||
ico_path = resource_path("ui/images/image_processor.ico")
|
||||
if os.path.exists(ico_path):
|
||||
self.root.iconbitmap(ico_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Cross-platform icon (uses PNG)
|
||||
try:
|
||||
png_path = resource_path("ui/images/image_processor.png")
|
||||
if os.path.exists(png_path):
|
||||
self._icon_photo = ImageTk.PhotoImage(Image.open(png_path))
|
||||
self.root.iconphoto(True, self._icon_photo)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Initialize the controller
|
||||
self.controller = AppController(self.root)
|
||||
|
||||
@@ -125,24 +151,6 @@ class ImageProcessorApp:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
decryptor = ConfigEncryptor(DECRYPTION_KEY)
|
||||
# Load the active credentials
|
||||
config = decryptor.load_credentials()
|
||||
print(config)
|
||||
if config:
|
||||
wc_url = config.get("url", "")
|
||||
wc_consumer_key = config.get("consumer_key", "")
|
||||
wc_consumer_secret = config.get("consumer_secret", "")
|
||||
wp_username = config.get("username", "")
|
||||
wp_password = config.get("password", "")
|
||||
else:
|
||||
print("No active credentials found.")
|
||||
except FileNotFoundError as e:
|
||||
print(f"File not found: {e}")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
root = ctk.CTk()
|
||||
ctk.set_appearance_mode("dark")
|
||||
ctk.set_default_color_theme("blue")
|
||||
|
||||
14
main.spec
14
main.spec
@@ -3,8 +3,12 @@ import glob
|
||||
import os
|
||||
# -*- mode: python ; coding: utf-8 -*-
|
||||
|
||||
# Collect all PNG and JPG images in the ui/images directory
|
||||
image_files = [(file, "ui/images") for file in glob.glob("ui/images/*.*") if file.endswith(('.png', '.jpg', '.jpeg'))]
|
||||
# Collect UI images/icons in the ui/images directory
|
||||
image_files = [
|
||||
(file, "ui/images")
|
||||
for file in glob.glob("ui/images/*.*")
|
||||
if file.lower().endswith((".png", ".jpg", ".jpeg", ".ico"))
|
||||
]
|
||||
|
||||
block_cipher = None
|
||||
|
||||
@@ -14,7 +18,7 @@ a = Analysis(
|
||||
pathex=[],
|
||||
binaries=[],
|
||||
datas=image_files,
|
||||
hiddenimports=[],
|
||||
hiddenimports=['pillow_avif'],
|
||||
hookspath=[],
|
||||
hooksconfig={},
|
||||
runtime_hooks=[],
|
||||
@@ -33,7 +37,7 @@ exe = EXE(
|
||||
a.zipfiles,
|
||||
a.datas,
|
||||
[],
|
||||
name='main',
|
||||
name='image_processor',
|
||||
debug=False,
|
||||
bootloader_ignore_signals=False,
|
||||
strip=False,
|
||||
@@ -41,6 +45,8 @@ exe = EXE(
|
||||
upx_exclude=[],
|
||||
runtime_tmpdir=None,
|
||||
|
||||
icon='ui/images/image_processor.ico',
|
||||
|
||||
disable_windowed_traceback=False,
|
||||
argv_emulation=False,
|
||||
target_arch=None,
|
||||
|
||||
20
readme.md
20
readme.md
@@ -12,6 +12,12 @@ Prerequisites
|
||||
|
||||
pip install pillow
|
||||
|
||||
Optional (AVIF input): Install the Pillow AVIF plugin so previews and conversions can open `.avif` files:
|
||||
|
||||
sh
|
||||
|
||||
pip install pillow-avif-plugin
|
||||
|
||||
Additional Libraries: Ensure you have any additional libraries your utility functions (file_operations, image_processing) depend on.
|
||||
|
||||
Application Setup
|
||||
@@ -48,6 +54,20 @@ sh
|
||||
|
||||
python main.py
|
||||
|
||||
Configuration & Credentials Storage
|
||||
|
||||
This app now stores data per-user (recommended for desktop apps):
|
||||
|
||||
- Options are stored as JSON under your user config directory (Windows example):
|
||||
- `%LOCALAPPDATA%\images_py\Image Processor\options.json`
|
||||
- Credentials are stored in the OS keychain (Windows Credential Manager) under the service name:
|
||||
- `images_py.image_processor`
|
||||
|
||||
Legacy migration:
|
||||
|
||||
- If a legacy `config.enc` is present in the project folder (or next to the exe), it is decrypted using the old key,
|
||||
migrated into the per-user storage, and the legacy `config.enc` is removed.
|
||||
|
||||
Creating an Executable
|
||||
|
||||
To create an executable for this application, you can use pyinstaller. Follow the steps below:
|
||||
|
||||
22
requirements.txt
Normal file
22
requirements.txt
Normal file
@@ -0,0 +1,22 @@
|
||||
certifi==2026.1.4
|
||||
cffi==2.0.0
|
||||
charset-normalizer==3.4.4
|
||||
cryptography==46.0.3
|
||||
customtkinter==5.2.2
|
||||
darkdetect==0.8.0
|
||||
idna==3.11
|
||||
jaraco.classes==3.4.0
|
||||
jaraco.context==6.1.0
|
||||
jaraco.functools==4.4.0
|
||||
keyring==25.7.0
|
||||
more-itertools==10.8.0
|
||||
packaging==26.0
|
||||
pillow==12.1.0
|
||||
pillow-avif-plugin==1.5.5
|
||||
platformdirs==4.5.1
|
||||
pycparser==3.0
|
||||
pywin32-ctypes==0.2.3
|
||||
requests==2.32.5
|
||||
urllib3==2.6.3
|
||||
Wand==0.6.13
|
||||
WooCommerce==3.0.0
|
||||
BIN
ui/images/image_processor.ico
Normal file
BIN
ui/images/image_processor.ico
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 18 KiB |
BIN
ui/images/image_processor.png
Normal file
BIN
ui/images/image_processor.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 2.7 KiB |
@@ -1,14 +1,18 @@
|
||||
import customtkinter as ctk
|
||||
from api.woocommerce_api import (
|
||||
load_credentials,
|
||||
save_active_credential_set,
|
||||
)
|
||||
from config.encrypt_config import ConfigEncryptor
|
||||
from tkinter import messagebox
|
||||
from PIL import Image, ImageTk
|
||||
|
||||
import threading
|
||||
import webbrowser
|
||||
|
||||
from utils.update_checker import UpdateCheckError, check_for_update, get_current_version
|
||||
|
||||
import os
|
||||
import sys
|
||||
KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
def resource_path(relative_path):
|
||||
""" Get the absolute path to a resource, whether we're running in development or a PyInstaller package. """
|
||||
try:
|
||||
@@ -24,7 +28,7 @@ class SettingsTab:
|
||||
self.tab = ctk.CTkFrame(tab_parent)
|
||||
self.tab.grid(row=0, column=0, sticky="nsew")
|
||||
# Initialize an instance of ConfigEncryptor
|
||||
self.config_encryptor = ConfigEncryptor(KEY) # Ensure you pass any required arguments in the constructor if necessary
|
||||
self.config_encryptor = ConfigEncryptor() # per-user storage + legacy migration
|
||||
config = self.config_encryptor.load_config()
|
||||
self.credentials_list = []
|
||||
if config:
|
||||
@@ -119,6 +123,59 @@ class SettingsTab:
|
||||
)
|
||||
delete_button.grid(row=7, column=2, columnspan=1, pady=10)
|
||||
|
||||
# --- App updates ---
|
||||
self._current_version = get_current_version()
|
||||
self._version_var = ctk.StringVar(value=f"Version: {self._current_version}")
|
||||
self._update_status_var = ctk.StringVar(value="")
|
||||
|
||||
version_label = ctk.CTkLabel(self.tab, textvariable=self._version_var)
|
||||
version_label.grid(row=8, column=0, columnspan=2, padx=5, pady=(15, 5), sticky="w")
|
||||
|
||||
check_update_button = ctk.CTkButton(
|
||||
self.tab,
|
||||
width=140,
|
||||
text="Check updates",
|
||||
command=self.check_updates,
|
||||
)
|
||||
check_update_button.grid(row=8, column=2, columnspan=2, padx=5, pady=(15, 5), sticky="w")
|
||||
|
||||
update_status_label = ctk.CTkLabel(self.tab, textvariable=self._update_status_var)
|
||||
update_status_label.grid(row=9, column=0, columnspan=4, padx=5, pady=(0, 10), sticky="w")
|
||||
|
||||
def check_updates(self):
|
||||
self._update_status_var.set("Checking GitHub for updates...")
|
||||
threading.Thread(target=self._check_updates_worker, daemon=True).start()
|
||||
|
||||
def _check_updates_worker(self):
|
||||
try:
|
||||
info = check_for_update("SitiWeb", "images_py", current_version=self._current_version)
|
||||
except (UpdateCheckError, Exception) as exc:
|
||||
self.tab.after(0, lambda: self._on_update_check_failed(str(exc)))
|
||||
return
|
||||
|
||||
self.tab.after(0, lambda: self._on_update_check_complete(info))
|
||||
|
||||
def _on_update_check_failed(self, error_message: str):
|
||||
self._update_status_var.set("Update check failed.")
|
||||
messagebox.showerror("Update check failed", error_message)
|
||||
|
||||
def _on_update_check_complete(self, info):
|
||||
self._update_status_var.set(f"Latest: {info.latest_tag} (current: {info.current_version})")
|
||||
|
||||
if not info.update_available:
|
||||
messagebox.showinfo(
|
||||
"No updates",
|
||||
f"You're up to date.\n\nCurrent: {info.current_version}\nLatest: {info.latest_tag}",
|
||||
)
|
||||
return
|
||||
|
||||
open_release = messagebox.askyesno(
|
||||
"Update available",
|
||||
f"A newer version is available.\n\nCurrent: {info.current_version}\nLatest: {info.latest_tag}\n\nOpen the release page?",
|
||||
)
|
||||
if open_release and info.html_url:
|
||||
webbrowser.open(info.html_url)
|
||||
|
||||
def create_credentials_form(self, credentials, row_index):
|
||||
settings_options = {
|
||||
"url": {
|
||||
@@ -187,13 +244,15 @@ class SettingsTab:
|
||||
"active": True,
|
||||
}
|
||||
|
||||
ConfigEncryptor(KEY).save_credentials(credentials)
|
||||
save_active_credential_set(credentials["name"])
|
||||
ConfigEncryptor().save_credentials(credentials)
|
||||
|
||||
self.credentials_list.append(credentials)
|
||||
# Reload from storage to avoid duplicates and to reflect the active flag updates.
|
||||
config = ConfigEncryptor().load_config() or {}
|
||||
self.credentials_list = config.get("credentials", []) or []
|
||||
self.credential_dropdown.configure(
|
||||
values=[cred.get("nice_name", "Unnamed Credential") for cred in self.credentials_list]
|
||||
)
|
||||
self.credential_var.set(credentials.get("nice_name", "Default"))
|
||||
|
||||
def add_new_credential_set(self):
|
||||
self.active_credential_set = {
|
||||
@@ -218,7 +277,7 @@ class SettingsTab:
|
||||
]
|
||||
|
||||
# Save updated credentials list to storage
|
||||
ConfigEncryptor(KEY).delete_credentials(selected_name)
|
||||
ConfigEncryptor().delete_credentials(selected_name)
|
||||
|
||||
# Update the dropdown and form after deletion
|
||||
if self.credentials_list:
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import os
|
||||
import tempfile
|
||||
from wand.image import Image
|
||||
from wand.color import Color
|
||||
|
||||
try:
|
||||
from PIL import Image as PILImage
|
||||
except Exception: # Pillow is also used elsewhere; keep this optional here.
|
||||
PILImage = None
|
||||
|
||||
class ImageProcessor:
|
||||
def __init__(self, canvas_width=900, canvas_height=900, background_color="transparent", image_size="fit"):
|
||||
"""
|
||||
@@ -48,7 +54,20 @@ class ImageProcessor:
|
||||
image_path = os.path.normpath(image_path)
|
||||
output_path = os.path.normpath(output_path)
|
||||
|
||||
with Image(filename=image_path) as img:
|
||||
converted_tmp_path = None
|
||||
img = None
|
||||
try:
|
||||
try:
|
||||
img = Image(filename=image_path)
|
||||
except Exception as e:
|
||||
# Wand/ImageMagick AVIF support depends on the installed ImageMagick build.
|
||||
# If it can't read AVIF, fall back to Pillow (+ pillow-avif-plugin) and convert to PNG.
|
||||
if os.path.splitext(image_path)[1].lower() != ".avif":
|
||||
raise
|
||||
converted_tmp_path = self._convert_avif_to_temp_png(image_path, log)
|
||||
img = Image(filename=converted_tmp_path)
|
||||
self.log_message(f"Opened AVIF via Pillow fallback: {image_path}", log)
|
||||
|
||||
self.log_message(f"Original image size: {img.width}x{img.height}", log)
|
||||
if self.image_size == "contain":
|
||||
self._contain(img)
|
||||
@@ -71,6 +90,41 @@ class ImageProcessor:
|
||||
# Save the image to the final output path
|
||||
canvas.save(filename=final_output_path)
|
||||
self.log_message(f"Saved to: {final_output_path}", log)
|
||||
finally:
|
||||
try:
|
||||
if img is not None:
|
||||
img.close()
|
||||
finally:
|
||||
if converted_tmp_path and os.path.exists(converted_tmp_path):
|
||||
try:
|
||||
os.remove(converted_tmp_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _convert_avif_to_temp_png(self, image_path, log=None):
|
||||
if PILImage is None:
|
||||
raise RuntimeError(
|
||||
"AVIF input requires Pillow. Install Pillow + pillow-avif-plugin to enable AVIF decoding."
|
||||
)
|
||||
|
||||
try:
|
||||
import pillow_avif # type: ignore
|
||||
except Exception:
|
||||
raise RuntimeError(
|
||||
"AVIF input requires the optional dependency 'pillow-avif-plugin'. "
|
||||
"Install it with: pip install pillow-avif-plugin"
|
||||
)
|
||||
|
||||
with PILImage.open(image_path) as im:
|
||||
# Preserve alpha if present; Wand will composite onto the selected background.
|
||||
if im.mode not in ("RGB", "RGBA"):
|
||||
im = im.convert("RGBA")
|
||||
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
|
||||
tmp.close()
|
||||
im.save(tmp.name, format="PNG")
|
||||
self.log_message(f"Converted AVIF to temporary PNG: {tmp.name}", log)
|
||||
return tmp.name
|
||||
|
||||
|
||||
def _cover(self, img:Image):
|
||||
|
||||
127
utils/update_checker.py
Normal file
127
utils/update_checker.py
Normal file
@@ -0,0 +1,127 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from packaging.version import InvalidVersion, Version
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class UpdateInfo:
|
||||
current_version: str
|
||||
latest_version: str
|
||||
latest_tag: str
|
||||
html_url: Optional[str]
|
||||
|
||||
@property
|
||||
def update_available(self) -> bool:
|
||||
try:
|
||||
return Version(self.latest_version) > Version(self.current_version)
|
||||
except InvalidVersion:
|
||||
return False
|
||||
|
||||
|
||||
class UpdateCheckError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def _normalize_tag_to_version(tag: str) -> str:
|
||||
tag = (tag or "").strip()
|
||||
if tag.lower().startswith("v"):
|
||||
tag = tag[1:]
|
||||
return tag
|
||||
|
||||
|
||||
def get_current_version() -> str:
|
||||
# Prefer an explicit env override (useful for ad-hoc builds)
|
||||
import os
|
||||
|
||||
env_ver = os.getenv("IMAGE_PROCESSOR_VERSION")
|
||||
if env_ver:
|
||||
return env_ver.strip()
|
||||
|
||||
# Prefer version.py in repo / bundled app
|
||||
try:
|
||||
from version import __version__ # type: ignore
|
||||
|
||||
return str(__version__).strip()
|
||||
except Exception:
|
||||
return "0.0.0.dev0"
|
||||
|
||||
|
||||
def _github_get_json(url: str, timeout_seconds: float = 10.0) -> dict:
|
||||
headers = {
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": "images_py-update-checker",
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.get(url, headers=headers, timeout=timeout_seconds)
|
||||
except requests.RequestException as exc:
|
||||
raise UpdateCheckError(f"GitHub request failed: {exc}") from exc
|
||||
|
||||
# Rate-limit or forbidden
|
||||
if response.status_code == 403:
|
||||
remaining = response.headers.get("X-RateLimit-Remaining")
|
||||
if remaining == "0":
|
||||
raise UpdateCheckError("GitHub API rate limit reached. Try again later.")
|
||||
|
||||
if response.status_code >= 400:
|
||||
raise UpdateCheckError(f"GitHub API error {response.status_code}: {response.text[:200]}")
|
||||
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError as exc:
|
||||
raise UpdateCheckError("GitHub API returned invalid JSON") from exc
|
||||
|
||||
|
||||
def get_latest_github_release(owner: str, repo: str) -> tuple[str, str, Optional[str]]:
|
||||
"""Returns (latest_version, latest_tag, html_url).
|
||||
|
||||
Uses releases/latest first; falls back to tags if no releases exist.
|
||||
"""
|
||||
|
||||
releases_url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest"
|
||||
try:
|
||||
payload = _github_get_json(releases_url)
|
||||
tag = str(payload.get("tag_name") or "").strip()
|
||||
html_url = payload.get("html_url")
|
||||
version = _normalize_tag_to_version(tag)
|
||||
if version:
|
||||
return version, tag, html_url
|
||||
except UpdateCheckError:
|
||||
# fall back to tags
|
||||
pass
|
||||
|
||||
tags_url = f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1"
|
||||
payload = _github_get_json(tags_url)
|
||||
if not isinstance(payload, list) or not payload:
|
||||
raise UpdateCheckError("No releases or tags found on GitHub")
|
||||
|
||||
tag = str(payload[0].get("name") or "").strip()
|
||||
version = _normalize_tag_to_version(tag)
|
||||
if not version:
|
||||
raise UpdateCheckError("Latest GitHub tag is missing a version")
|
||||
|
||||
# Tags endpoint does not include a nice html_url for a release.
|
||||
return version, tag, f"https://github.com/{owner}/{repo}/releases/tag/{tag}"
|
||||
|
||||
|
||||
def check_for_update(owner: str, repo: str, current_version: Optional[str] = None) -> UpdateInfo:
|
||||
current = (current_version or get_current_version()).strip()
|
||||
latest_version, latest_tag, html_url = get_latest_github_release(owner, repo)
|
||||
|
||||
# Validate versions for comparison; if invalid, still return info.
|
||||
try:
|
||||
Version(current)
|
||||
Version(latest_version)
|
||||
except InvalidVersion:
|
||||
pass
|
||||
|
||||
return UpdateInfo(
|
||||
current_version=current,
|
||||
latest_version=latest_version,
|
||||
latest_tag=latest_tag,
|
||||
html_url=html_url,
|
||||
)
|
||||
7
version.py
Normal file
7
version.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Application version.
|
||||
|
||||
This value is used by the Settings update checker.
|
||||
GitHub Actions overwrites this file for tagged builds.
|
||||
"""
|
||||
|
||||
__version__ = "v1.3.5"
|
||||
Reference in New Issue
Block a user