Update files
This commit is contained in:
39
-.pre-commit-config.yaml
Normal file
39
-.pre-commit-config.yaml
Normal file
@@ -0,0 +1,39 @@
|
||||
# .pre-commit-config.yaml
|
||||
|
||||
repos:
|
||||
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||
rev: v3.4.0 # Use the latest version
|
||||
hooks:
|
||||
- id: trailing-whitespace
|
||||
- id: end-of-file-fixer
|
||||
- id: check-yaml
|
||||
- id: check-json
|
||||
- id: check-added-large-files
|
||||
|
||||
- repo: https://github.com/psf/black
|
||||
rev: 24.4.2 # Use the latest version
|
||||
hooks:
|
||||
- id: black
|
||||
- repo: https://github.com/PyCQA/autoflake
|
||||
rev: v2.3.1
|
||||
hooks:
|
||||
|
||||
- id: autoflake
|
||||
args: [--remove-all-unused-imports, --remove-unused-variables]
|
||||
- repo: https://github.com/hhatto/autopep8
|
||||
rev: v2.3.1 # select the tag or revision you want, or run `pre-commit autoupdate`
|
||||
hooks:
|
||||
- id: autopep8
|
||||
- repo: local
|
||||
hooks:
|
||||
- id: pylint
|
||||
name: pylint
|
||||
entry: pylint
|
||||
language: system
|
||||
types: [python]
|
||||
require_serial: true
|
||||
args:
|
||||
[
|
||||
"-rn", # Only display messages
|
||||
"-sn", # Don't display the score
|
||||
]
|
||||
@@ -1,202 +0,0 @@
|
||||
from cryptography.fernet import Fernet
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import base64
|
||||
from woocommerce import API
|
||||
from tkinter import messagebox
|
||||
import tempfile
|
||||
from utils.image_processing import resize_image
|
||||
import pprint
|
||||
credentials_file = 'credentials.json'
|
||||
|
||||
# Hardcoded key (replace with your generated key)
|
||||
key = b'u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI='
|
||||
|
||||
def save_credentials(url, consumer_key, consumer_secret, username, password):
|
||||
credentials = {
|
||||
'url': url,
|
||||
'consumer_key': consumer_key,
|
||||
'consumer_secret': consumer_secret,
|
||||
'username': username,
|
||||
'password': password
|
||||
}
|
||||
credentials_str = json.dumps(credentials)
|
||||
fernet = Fernet(key)
|
||||
encrypted = fernet.encrypt(credentials_str.encode())
|
||||
with open('config.enc', 'wb') as file:
|
||||
file.write(encrypted)
|
||||
|
||||
def load_credentials():
|
||||
if not os.path.exists('config.enc'):
|
||||
return None
|
||||
fernet = Fernet(key)
|
||||
with open('config.enc', 'rb') as file:
|
||||
encrypted = file.read()
|
||||
decrypted = fernet.decrypt(encrypted).decode()
|
||||
return json.loads(decrypted)
|
||||
|
||||
|
||||
def get_wcapi():
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror("Error", "No WooCommerce credentials found. Please set them in the settings.")
|
||||
return None
|
||||
return API(
|
||||
url=credentials['url'],
|
||||
consumer_key=credentials['consumer_key'],
|
||||
consumer_secret=credentials['consumer_secret'],
|
||||
version="wc/v3"
|
||||
)
|
||||
|
||||
def get_product(id):
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return None
|
||||
result = wcapi.get("products/"+str(id))
|
||||
|
||||
image_paths = {}
|
||||
product = result.json()
|
||||
if product.get('images'):
|
||||
images = product.get('images')
|
||||
|
||||
if not os.path.exists('temp'):
|
||||
os.makedirs('temp')
|
||||
|
||||
for index, image in enumerate(images):
|
||||
image_url = image.get('src')
|
||||
image_id = image.get('id')
|
||||
response = requests.get(image_url)
|
||||
if response.status_code == 200:
|
||||
file_name = image_url.split('/')[-1]
|
||||
file_path = os.path.join('temp', file_name)
|
||||
image_paths[image_id] = file_path
|
||||
with open(file_path, 'wb') as file:
|
||||
file.write(response.content)
|
||||
print(f"Image {index + 1}/{len(images)} downloaded and saved: {file_path}")
|
||||
else:
|
||||
print(f"Failed to download image {index + 1}/{len(images)}")
|
||||
else:
|
||||
if product.get('name'):
|
||||
print(f"No images found for {product.get('name')}")
|
||||
else:
|
||||
print("No images found")
|
||||
return image_paths, product
|
||||
|
||||
def upload_image(imgPath):
|
||||
data = open(imgPath, 'rb').read()
|
||||
fileName = os.path.basename(imgPath)
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror("Error", "No WordPress credentials found. Please set them in the settings.")
|
||||
return None
|
||||
|
||||
username = credentials['username']
|
||||
password = credentials['password']
|
||||
credentials_base64 = base64.b64encode(f"{username}:{password}".encode())
|
||||
credentials_base64 = base64.b64encode(f"{username}:{password}".encode())
|
||||
url = f"{credentials['url']}/wp-json/wp/v2/media"
|
||||
headers = {
|
||||
'Content-Type': 'image/jpg',
|
||||
'Content-Disposition': f'attachment; filename={fileName}',
|
||||
'Authorization': f'basic {credentials_base64.decode()}'
|
||||
}
|
||||
|
||||
try:
|
||||
res = requests.post(url=url, data=data, headers=headers)
|
||||
res.raise_for_status() # Raise an HTTPError if the HTTP request returned an unsuccessful status code
|
||||
newDict = res.json()
|
||||
newID = newDict.get('id')
|
||||
link = newDict.get('guid').get("rendered") if newDict.get('guid') else None
|
||||
print(newID, link)
|
||||
return newID if newID else False
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Error uploading image: {e}")
|
||||
return False
|
||||
|
||||
def delete_img(image_id):
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror("Error", "No WordPress credentials found. Please set them in the settings.")
|
||||
return None
|
||||
|
||||
url = f"{credentials['url']}/wp-json/wp/v2/media/{image_id}"
|
||||
username = credentials['username']
|
||||
password = credentials['password']
|
||||
credentials_base64 = base64.b64encode(f"{username}:{password}".encode())
|
||||
|
||||
res = requests.delete(url=url,
|
||||
headers={'Authorization': f'basic {credentials_base64.decode()}'},
|
||||
params={'force': 'true'})
|
||||
|
||||
if res.status_code == 200:
|
||||
print(f"Image with ID {image_id} deleted successfully.")
|
||||
else:
|
||||
print(f"Failed to delete image with ID {image_id}. Error: {res.text}")
|
||||
|
||||
def update_product(image_ids, old_image_ids, product_id):
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
product = wcapi.get(f"products/{product_id}").json()
|
||||
product['images'] = [{'id': image_id} for image_id in image_ids]
|
||||
response = wcapi.put(f"products/{product_id}", data=product)
|
||||
if response.status_code == 200:
|
||||
print(f"Product with ID {product_id} updated successfully with new image IDs.")
|
||||
else:
|
||||
print(f"Failed to update product with ID {product_id}. Error: {response.text}")
|
||||
|
||||
def process_product_images(id, name_template, canvas_width, canvas_height):
|
||||
print(name_template)
|
||||
image_paths, product = get_product(id)
|
||||
if not image_paths:
|
||||
return
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_output_directory:
|
||||
print(f"Using temporary directory: {temp_output_directory}")
|
||||
|
||||
old_list = []
|
||||
new_list = []
|
||||
|
||||
for image_id, file_path in image_paths.items():
|
||||
output_path = generate_output_path(temp_output_directory, file_path, name_template, product, canvas_width, canvas_height)
|
||||
resize_image(file_path, output_path, '')
|
||||
new_id = upload_image(output_path)
|
||||
if new_id:
|
||||
old_list.append(image_id)
|
||||
new_list.append(new_id)
|
||||
|
||||
update_product(new_list, old_list, id)
|
||||
print("Temporary files processed and uploaded successfully.")
|
||||
|
||||
def generate_output_path(temp_output_directory, file_path, template, product, canvas_width, canvas_height):
|
||||
# Generate the new filename based on the template
|
||||
name, ext = os.path.splitext(os.path.basename(file_path))
|
||||
width = canvas_width
|
||||
height = canvas_height
|
||||
sku = product.get('sku', '')
|
||||
slug = product.get('name', '')
|
||||
title = product.get('slug', '')
|
||||
pprint.pprint(product)
|
||||
# Here you can add more attributes to the template if needed
|
||||
new_filename = template.format(name=name, sku=sku, width=width, height=height, slug=slug, title=title)
|
||||
return os.path.join(temp_output_directory, new_filename + ext)
|
||||
|
||||
def process_all_products(name_template):
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
page = 1
|
||||
while True:
|
||||
products = wcapi.get("products", params={"per_page": 100, "page": page}).json()
|
||||
if not products:
|
||||
break
|
||||
|
||||
for product in products:
|
||||
process_product_images(product['id'], name_template)
|
||||
|
||||
page += 1
|
||||
|
||||
messagebox.showinfo("Process Complete", "All product images processing is complete.")
|
||||
332
api/woocommerce_api.py
Normal file
332
api/woocommerce_api.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""
|
||||
Module for WooCommerce API interactions and image processing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import base64
|
||||
import tempfile
|
||||
import pprint
|
||||
from tkinter import messagebox
|
||||
from cryptography.fernet import Fernet
|
||||
import requests
|
||||
from woocommerce import API
|
||||
from utils.image_processing import ImageProcessor
|
||||
|
||||
CREDENTIALS_FILE = "credentials.json"
|
||||
|
||||
# Hardcoded key (replace with your generated key)
|
||||
KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
|
||||
def save_credentials(url, consumer_key, consumer_secret, username, password):
|
||||
"""
|
||||
Save WooCommerce and WordPress credentials to an encrypted file.
|
||||
|
||||
Args:
|
||||
url (str): The base URL for the WooCommerce store.
|
||||
consumer_key (str): The consumer key for WooCommerce API.
|
||||
consumer_secret (str): The consumer secret for WooCommerce API.
|
||||
username (str): The username for WordPress.
|
||||
password (str): The password for WordPress.
|
||||
"""
|
||||
credentials = {
|
||||
"url": url,
|
||||
"consumer_key": consumer_key,
|
||||
"consumer_secret": consumer_secret,
|
||||
"username": username,
|
||||
"password": password,
|
||||
}
|
||||
credentials_str = json.dumps(credentials)
|
||||
fernet = Fernet(KEY)
|
||||
encrypted = fernet.encrypt(credentials_str.encode())
|
||||
with open("config.enc", "wb") as file:
|
||||
file.write(encrypted)
|
||||
|
||||
|
||||
def load_credentials():
|
||||
"""
|
||||
Load WooCommerce and WordPress credentials from an encrypted file.
|
||||
|
||||
Returns:
|
||||
dict: The decrypted credentials, or None if the file does not exist.
|
||||
"""
|
||||
if not os.path.exists("config.enc"):
|
||||
return None
|
||||
fernet = Fernet(KEY)
|
||||
with open("config.enc", "rb") as file:
|
||||
encrypted = file.read()
|
||||
decrypted = fernet.decrypt(encrypted).decode()
|
||||
return json.loads(decrypted)
|
||||
|
||||
|
||||
def get_wcapi():
|
||||
"""
|
||||
Get a WooCommerce API client instance.
|
||||
|
||||
Returns:
|
||||
woocommerce.API: The WooCommerce API client instance, or None if credentials are missing.
|
||||
"""
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror(
|
||||
"Error",
|
||||
"No WooCommerce credentials found. Please set them in the settings.",
|
||||
)
|
||||
return None
|
||||
return API(
|
||||
url=credentials["url"],
|
||||
consumer_key=credentials["consumer_key"],
|
||||
consumer_secret=credentials["consumer_secret"],
|
||||
version="wc/v3",
|
||||
)
|
||||
|
||||
|
||||
def get_product(product_id):
|
||||
"""
|
||||
Get a WooCommerce product and download its images.
|
||||
|
||||
Args:
|
||||
product_id (int): The ID of the WooCommerce product.
|
||||
|
||||
Returns:
|
||||
tuple: A dictionary of image paths and the product data.
|
||||
"""
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return None
|
||||
result = wcapi.get(f"products/{product_id}")
|
||||
|
||||
image_paths = {}
|
||||
product = result.json()
|
||||
if product.get("images"):
|
||||
images = product.get("images")
|
||||
|
||||
if not os.path.exists("temp"):
|
||||
os.makedirs("temp")
|
||||
|
||||
for index, image in enumerate(images):
|
||||
image_url = image.get("src")
|
||||
image_id = image.get("id")
|
||||
response = requests.get(image_url, timeout=10)
|
||||
if response.status_code == 200:
|
||||
file_name = image_url.split("/")[-1]
|
||||
file_path = os.path.join("temp", file_name)
|
||||
image_paths[image_id] = file_path
|
||||
with open(file_path, "wb") as file:
|
||||
file.write(response.content)
|
||||
print(
|
||||
f"Image {index + 1}/{len(images)} downloaded and saved: {file_path}"
|
||||
)
|
||||
else:
|
||||
print(f"Failed to download image {index + 1}/{len(images)}")
|
||||
else:
|
||||
if product.get("name"):
|
||||
print(f"No images found for {product.get('name')}")
|
||||
else:
|
||||
print("No images found")
|
||||
return image_paths, product
|
||||
|
||||
|
||||
def upload_image(img_path):
|
||||
"""
|
||||
Upload an image to WordPress.
|
||||
|
||||
Args:
|
||||
img_path (str): The path to the image file.
|
||||
|
||||
Returns:
|
||||
int: The ID of the uploaded image, or False if the upload failed.
|
||||
"""
|
||||
with open(img_path, "rb") as img_file:
|
||||
data = img_file.read()
|
||||
file_name = os.path.basename(img_path)
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror(
|
||||
"Error", "No WordPress credentials found. Please set them in the settings."
|
||||
)
|
||||
return None
|
||||
|
||||
username = credentials["username"]
|
||||
password = credentials["password"]
|
||||
credentials_base64 = base64.b64encode(f"{username}:{password}".encode())
|
||||
url = f"{credentials['url']}/wp-json/wp/v2/media"
|
||||
headers = {
|
||||
"Content-Type": "image/jpg",
|
||||
"Content-Disposition": f"attachment; filename={file_name}",
|
||||
"Authorization": f"basic {credentials_base64.decode()}",
|
||||
}
|
||||
|
||||
try:
|
||||
res = requests.post(url=url, data=data, headers=headers, timeout=10)
|
||||
res.raise_for_status()
|
||||
response_dict = res.json()
|
||||
new_id = response_dict.get("id")
|
||||
link = (
|
||||
response_dict.get("guid").get("rendered")
|
||||
if response_dict.get("guid")
|
||||
else None
|
||||
)
|
||||
print(new_id, link)
|
||||
return new_id if new_id else False
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Error uploading image: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def delete_img(image_id):
|
||||
"""
|
||||
Delete an image from WordPress.
|
||||
|
||||
Args:
|
||||
image_id (int): The ID of the image to delete.
|
||||
"""
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror(
|
||||
"Error", "No WordPress credentials found. Please set them in the settings."
|
||||
)
|
||||
return None
|
||||
|
||||
url = f"{credentials['url']}/wp-json/wp/v2/media/{image_id}"
|
||||
username = credentials["username"]
|
||||
password = credentials["password"]
|
||||
credentials_base64 = base64.b64encode(f"{username}:{password}".encode())
|
||||
|
||||
res = requests.delete(
|
||||
url=url,
|
||||
headers={"Authorization": f"basic {credentials_base64.decode()}"},
|
||||
params={"force": "true"},
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
if res.status_code == 200:
|
||||
print(f"Image with ID {image_id} deleted successfully.")
|
||||
else:
|
||||
print(f"Failed to delete image with ID {image_id}. Error: {res.text}")
|
||||
|
||||
|
||||
def update_product(image_ids, product_id):
|
||||
"""
|
||||
Update a WooCommerce product with new image IDs.
|
||||
|
||||
Args:
|
||||
image_ids (list): A list of new image IDs.
|
||||
product_id (int): The ID of the WooCommerce product.
|
||||
"""
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
product = wcapi.get(f"products/{product_id}").json()
|
||||
product["images"] = [{"id": image_id} for image_id in image_ids]
|
||||
response = wcapi.put(f"products/{product_id}", data=product)
|
||||
if response.status_code == 200:
|
||||
print(
|
||||
f"Product with ID {product_id} updated successfully with new image IDs.")
|
||||
else:
|
||||
print(
|
||||
f"Failed to update product with ID {product_id}. Error: {response.text}")
|
||||
|
||||
|
||||
def process_product_images(product_id, name_template, canvas_width, canvas_height):
|
||||
"""
|
||||
Process images for a WooCommerce product by resizing and uploading them.
|
||||
|
||||
Args:
|
||||
product_id (int): The ID of the WooCommerce product.
|
||||
name_template (str): The template for generating image filenames.
|
||||
canvas_width (int): The width of the canvas for resizing images.
|
||||
canvas_height (int): The height of the canvas for resizing images.
|
||||
"""
|
||||
print(name_template)
|
||||
image_paths, product = get_product(product_id)
|
||||
if not image_paths:
|
||||
return
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_output_directory:
|
||||
print(f"Using temporary directory: {temp_output_directory}")
|
||||
|
||||
old_list = []
|
||||
new_list = []
|
||||
|
||||
for image_id, file_path in image_paths.items():
|
||||
output_path = generate_output_path(
|
||||
temp_output_directory,
|
||||
file_path,
|
||||
name_template,
|
||||
product,
|
||||
canvas_width,
|
||||
canvas_height,
|
||||
)
|
||||
resize_image(file_path, output_path, "")
|
||||
new_id = upload_image(output_path)
|
||||
if new_id:
|
||||
old_list.append(image_id)
|
||||
new_list.append(new_id)
|
||||
|
||||
update_product(new_list, product_id)
|
||||
print("Temporary files processed and uploaded successfully.")
|
||||
|
||||
|
||||
def generate_output_path(
|
||||
temp_output_directory, file_path, template, product, canvas_width, canvas_height
|
||||
):
|
||||
"""
|
||||
Generate the output path for resized images based on a template.
|
||||
|
||||
Args:
|
||||
temp_output_directory (str): The path to the temporary output directory.
|
||||
file_path (str): The original file path.
|
||||
template (str): The template for generating the new filename.
|
||||
product (dict): The WooCommerce product data.
|
||||
canvas_width (int): The width of the canvas for resizing images.
|
||||
canvas_height (int): The height of the canvas for resizing images.
|
||||
|
||||
Returns:
|
||||
str: The generated output path.
|
||||
"""
|
||||
name, ext = os.path.splitext(os.path.basename(file_path))
|
||||
width = canvas_width
|
||||
height = canvas_height
|
||||
sku = product.get("sku", "")
|
||||
slug = product.get("name", "")
|
||||
title = product.get("slug", "")
|
||||
pprint.pprint(product)
|
||||
new_filename = template.format(
|
||||
name=name, sku=sku, width=width, height=height, slug=slug, title=title
|
||||
)
|
||||
return os.path.join(temp_output_directory, new_filename + ext)
|
||||
|
||||
|
||||
def process_all_products(name_template, canvas_width, canvas_height):
|
||||
"""
|
||||
Process images for all WooCommerce products by resizing and uploading them.
|
||||
|
||||
Args:
|
||||
name_template (str): The template for generating image filenames.
|
||||
canvas_width (int): The width of the canvas for resizing images.
|
||||
canvas_height (int): The height of the canvas for resizing images.
|
||||
"""
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
page = 1
|
||||
while True:
|
||||
products = wcapi.get("products", params={
|
||||
"per_page": 100, "page": page}).json()
|
||||
if not products:
|
||||
break
|
||||
|
||||
for product in products:
|
||||
process_product_images(
|
||||
product["id"], name_template, canvas_width, canvas_height
|
||||
)
|
||||
|
||||
page += 1
|
||||
|
||||
messagebox.showinfo(
|
||||
"Process Complete", "All product images processing is complete."
|
||||
)
|
||||
@@ -1,30 +1,63 @@
|
||||
from cryptography.fernet import Fernet
|
||||
"""
|
||||
Module for decrypting configuration files using Fernet symmetric encryption.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
|
||||
class ConfigDecryptor:
|
||||
def __init__(self, key):
|
||||
self.key = key
|
||||
"""
|
||||
Class to handle decryption of configuration files.
|
||||
"""
|
||||
|
||||
def __init__(self, decryption_key):
|
||||
"""
|
||||
Initialize the ConfigDecryptor with a given decryption key.
|
||||
|
||||
Args:
|
||||
decryption_key (bytes): The key to use for decryption.
|
||||
"""
|
||||
self.decryption_key = decryption_key
|
||||
|
||||
def decrypt(self):
|
||||
if not os.path.exists("config.enc"):
|
||||
raise FileNotFoundError("The encrypted configuration file 'config.enc' does not exist.")
|
||||
"""
|
||||
Decrypt the 'config.enc' file and return the configuration data.
|
||||
|
||||
fernet = Fernet(self.key)
|
||||
Returns:
|
||||
dict: The decrypted configuration data.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the 'config.enc' file does not exist.
|
||||
Exception: If any other error occurs during decryption.
|
||||
"""
|
||||
if not os.path.exists("config.enc"):
|
||||
raise FileNotFoundError(
|
||||
"The encrypted configuration file 'config.enc' does not exist."
|
||||
)
|
||||
|
||||
fernet = Fernet(self.decryption_key)
|
||||
with open("config.enc", "rb") as encrypted_file:
|
||||
encrypted = encrypted_file.read()
|
||||
decrypted = fernet.decrypt(encrypted).decode()
|
||||
return json.loads(decrypted)
|
||||
|
||||
def hello_world(self):
|
||||
"""
|
||||
Placeholder
|
||||
"""
|
||||
return "Hello world"
|
||||
|
||||
|
||||
# Define your key here
|
||||
key = b'u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI=' # Replace with your actual key
|
||||
# Replace with your actual key
|
||||
DECRYPTION_KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
if __name__ == "__main__":
|
||||
key = b'u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI=' # Replace with your actual key
|
||||
decryptor = ConfigDecryptor(key)
|
||||
decryptor = ConfigDecryptor(DECRYPTION_KEY)
|
||||
try:
|
||||
config = decryptor.decrypt()
|
||||
print(config)
|
||||
except FileNotFoundError as e:
|
||||
print(e)
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
@@ -1,20 +1,45 @@
|
||||
"""
|
||||
Module for encrypting configuration files using Fernet symmetric encryption.
|
||||
"""
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
|
||||
class ConfigEncryptor:
|
||||
"""
|
||||
Class to handle encryption of configuration data.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the ConfigEncryptor with a generated encryption key.
|
||||
"""
|
||||
self.key = Fernet.generate_key()
|
||||
|
||||
def encrypt_config(self, data):
|
||||
"""
|
||||
Encrypt the configuration data and save it to 'config.enc'.
|
||||
|
||||
Args:
|
||||
data (str): The configuration data to be encrypted.
|
||||
"""
|
||||
fernet = Fernet(self.key)
|
||||
encrypted = fernet.encrypt(data.encode())
|
||||
with open("config.enc", "wb") as encrypted_file:
|
||||
encrypted_file.write(encrypted)
|
||||
|
||||
def get_key(self):
|
||||
"""
|
||||
Get the generated encryption key.
|
||||
|
||||
Returns:
|
||||
str: The generated encryption key as a string.
|
||||
"""
|
||||
return self.key.decode()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
config_data = """
|
||||
CONFIG_DATA = """
|
||||
{
|
||||
"url": "https://yourstore.com",
|
||||
"consumer_key": "ck_yourconsumerkey",
|
||||
@@ -25,4 +50,4 @@ if __name__ == "__main__":
|
||||
"""
|
||||
encryptor = ConfigEncryptor()
|
||||
print(f"Encryption key: {encryptor.get_key()}")
|
||||
encryptor.encrypt_config(config_data)
|
||||
encryptor.encrypt_config(CONFIG_DATA)
|
||||
|
||||
44
main.py
44
main.py
@@ -1,12 +1,27 @@
|
||||
"""
|
||||
Main module for the Image Processor application.
|
||||
"""
|
||||
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
from ui.log_window import LogWindow
|
||||
from ui.local_processing_tab import LocalProcessingTab
|
||||
from ui.settings_tab import SettingsTab
|
||||
from config.decrypt_config import ConfigDecryptor, key
|
||||
from config.decrypt_config import ConfigDecryptor, DECRYPTION_KEY
|
||||
|
||||
|
||||
class ImageProcessorApp:
|
||||
"""
|
||||
Main application class for the Image Processor.
|
||||
"""
|
||||
|
||||
def __init__(self, root):
|
||||
"""
|
||||
Initialize the ImageProcessorApp.
|
||||
|
||||
Args:
|
||||
root (tk.Tk): The root Tkinter window.
|
||||
"""
|
||||
self.root = root
|
||||
self.root.title("Image Processor")
|
||||
self.root.geometry("700x400")
|
||||
@@ -14,33 +29,40 @@ class ImageProcessorApp:
|
||||
self.tab_parent = ttk.Notebook(self.root)
|
||||
self.log_window = None
|
||||
|
||||
self.local_processing_tab = LocalProcessingTab(self.tab_parent, "Local Processing", self.open_log_window)
|
||||
self.local_processing_tab = LocalProcessingTab(
|
||||
self.tab_parent, "Local Processing", self.open_log_window
|
||||
)
|
||||
self.settings_tab = SettingsTab(self.tab_parent, "Settings")
|
||||
|
||||
self.tab_parent.pack(expand=True, fill='both')
|
||||
self.tab_parent.pack(expand=True, fill="both")
|
||||
|
||||
def open_log_window(self):
|
||||
"""
|
||||
Open the log window. If it already exists, bring it to the front.
|
||||
"""
|
||||
if self.log_window is None or not self.log_window.winfo_exists():
|
||||
self.log_window = LogWindow(self.root)
|
||||
else:
|
||||
self.log_window.lift()
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Run the Tkinter main loop.
|
||||
"""
|
||||
self.root.mainloop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
decryptor = ConfigDecryptor(key)
|
||||
decryptor = ConfigDecryptor(DECRYPTION_KEY)
|
||||
config = decryptor.decrypt()
|
||||
wc_url = config['url']
|
||||
wc_consumer_key = config['consumer_key']
|
||||
wc_consumer_secret = config['consumer_secret']
|
||||
wp_username = config['username']
|
||||
wp_password = config['password']
|
||||
wc_url = config["url"]
|
||||
wc_consumer_key = config["consumer_key"]
|
||||
wc_consumer_secret = config["consumer_secret"]
|
||||
wp_username = config["username"]
|
||||
wp_password = config["password"]
|
||||
except FileNotFoundError as e:
|
||||
print(f"File not found: {e}")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
root = tk.Tk()
|
||||
app = ImageProcessorApp(root)
|
||||
|
||||
Binary file not shown.
@@ -1,17 +1,18 @@
|
||||
from tkinter import Toplevel, Text
|
||||
|
||||
|
||||
class LogWindow(Toplevel):
|
||||
def __init__(self, master=None, **kwargs):
|
||||
super().__init__(master, **kwargs)
|
||||
self.title("Log Window")
|
||||
self.geometry("500x300")
|
||||
self.text = Text(self)
|
||||
self.text.pack(expand=True, fill='both')
|
||||
self.text.pack(expand=True, fill="both")
|
||||
self.protocol("WM_DELETE_WINDOW", self.hide)
|
||||
|
||||
def log(self, message):
|
||||
self.text.insert('end', message + '\n')
|
||||
self.text.see('end')
|
||||
self.text.insert("end", message + "\n")
|
||||
self.text.see("end")
|
||||
|
||||
def hide(self):
|
||||
self.withdraw()
|
||||
|
||||
131
ui/options_window.py
Normal file
131
ui/options_window.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
|
||||
|
||||
class OptionsWindow(tk.Toplevel):
|
||||
def __init__(self, parent, apply_callback, current_options):
|
||||
super().__init__(parent)
|
||||
self.title("Options")
|
||||
self.geometry("400x400")
|
||||
|
||||
self.apply_callback = apply_callback
|
||||
self.options = current_options
|
||||
self.inputs = {}
|
||||
|
||||
self.setup_ui()
|
||||
|
||||
def setup_ui(self):
|
||||
"""
|
||||
Set up the UI components.
|
||||
"""
|
||||
self.row_index = 0
|
||||
for name, details in self.options.items():
|
||||
if details["type"] == "number":
|
||||
self.add_number_input(
|
||||
name,
|
||||
details["label"],
|
||||
details["default"],
|
||||
details["min"],
|
||||
details["max"],
|
||||
)
|
||||
elif details["type"] == "text":
|
||||
self.add_text_input(name, details["label"], details["default"])
|
||||
elif details["type"] == "checkbox":
|
||||
self.add_checkbox(name, details["label"], details["default"])
|
||||
|
||||
self.create_apply_button()
|
||||
|
||||
def add_number_input(self, name, label, default, min_val, max_val):
|
||||
"""
|
||||
Add a number input field.
|
||||
|
||||
Args:
|
||||
name (str): The name of the input field.
|
||||
label (str): The label for the input field.
|
||||
default (int): The default value.
|
||||
min_val (int): The minimum value.
|
||||
max_val (int): The maximum value.
|
||||
"""
|
||||
lbl = tk.Label(self, text=label)
|
||||
lbl.grid(row=self.row_index, column=0, padx=5, pady=5, sticky="w")
|
||||
|
||||
entry = tk.Entry(self)
|
||||
entry.insert(0, str(default))
|
||||
entry.grid(row=self.row_index, column=1, padx=5, pady=5, sticky="w")
|
||||
|
||||
self.inputs[name] = {
|
||||
"type": "number",
|
||||
"widget": entry,
|
||||
"min": min_val,
|
||||
"max": max_val,
|
||||
}
|
||||
self.row_index += 1
|
||||
|
||||
def add_text_input(self, name, label, default):
|
||||
"""
|
||||
Add a text input field.
|
||||
|
||||
Args:
|
||||
name (str): The name of the input field.
|
||||
label (str): The label for the input field.
|
||||
default (str): The default value.
|
||||
"""
|
||||
lbl = tk.Label(self, text=label)
|
||||
lbl.grid(row=self.row_index, column=0, padx=5, pady=5, sticky="w")
|
||||
|
||||
entry = tk.Entry(self)
|
||||
entry.insert(0, default)
|
||||
entry.grid(row=self.row_index, column=1, padx=5, pady=5, sticky="w")
|
||||
|
||||
self.inputs[name] = {"type": "text", "widget": entry}
|
||||
self.row_index += 1
|
||||
|
||||
def add_checkbox(self, name, label, default):
|
||||
"""
|
||||
Add a checkbox.
|
||||
|
||||
Args:
|
||||
name (str): The name of the input field.
|
||||
label (str): The label for the input field.
|
||||
default (bool): The default value.
|
||||
"""
|
||||
var = tk.BooleanVar(value=default)
|
||||
chk = tk.Checkbutton(self, text=label, variable=var)
|
||||
chk.grid(row=self.row_index, column=0,
|
||||
columnspan=2, padx=5, pady=5, sticky="w")
|
||||
|
||||
self.inputs[name] = {"type": "checkbox", "variable": var}
|
||||
self.row_index += 1
|
||||
|
||||
def create_apply_button(self):
|
||||
"""
|
||||
Create the apply button.
|
||||
"""
|
||||
apply_button = tk.Button(
|
||||
self, text="Apply", command=self.apply_options)
|
||||
apply_button.grid(row=self.row_index, column=0, columnspan=2, pady=10)
|
||||
|
||||
def apply_options(self):
|
||||
"""
|
||||
Apply the options and call the callback function.
|
||||
"""
|
||||
options = {}
|
||||
for name, details in self.inputs.items():
|
||||
if details["type"] == "number":
|
||||
value = int(details["widget"].get())
|
||||
min_val = details["min"]
|
||||
max_val = details["max"]
|
||||
if min_val <= value <= max_val:
|
||||
options[name] = value
|
||||
else:
|
||||
messagebox.showerror(
|
||||
"Error", f"{name} must be between {min_val} and {max_val}"
|
||||
)
|
||||
return
|
||||
elif details["type"] == "text":
|
||||
options[name] = details["widget"].get()
|
||||
elif details["type"] == "checkbox":
|
||||
options[name] = details["variable"].get()
|
||||
|
||||
self.apply_callback(options)
|
||||
self.destroy()
|
||||
Binary file not shown.
@@ -1,69 +1,150 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from tkinter import filedialog, messagebox
|
||||
from utils.image_processing import resize_image
|
||||
from utils.image_processing import ImageProcessor
|
||||
|
||||
selected_directory = ""
|
||||
|
||||
def browse_directory():
|
||||
global selected_directory
|
||||
selected_directory = filedialog.askdirectory()
|
||||
return selected_directory
|
||||
class FileProcessor:
|
||||
"""
|
||||
Class to handle file processing operations.
|
||||
"""
|
||||
|
||||
def get_first_image_path():
|
||||
if not selected_directory:
|
||||
def __init__(self):
|
||||
self.selected_directory = ""
|
||||
|
||||
def browse_directory(self):
|
||||
"""
|
||||
Open a dialog to select a directory.
|
||||
|
||||
Returns:
|
||||
str: The selected directory path.
|
||||
"""
|
||||
self.selected_directory = filedialog.askdirectory()
|
||||
return self.selected_directory
|
||||
|
||||
def get_first_image_path(self):
|
||||
"""
|
||||
Get the path of the first image in the selected directory.
|
||||
|
||||
Returns:
|
||||
str: The path to the first image, or None if no images found.
|
||||
"""
|
||||
if not self.selected_directory:
|
||||
return None
|
||||
|
||||
for root, dirs, files in os.walk(selected_directory):
|
||||
if 'ProcessedImages' in dirs:
|
||||
dirs.remove('ProcessedImages')
|
||||
for root, dirs, files in os.walk(self.selected_directory):
|
||||
if "ProcessedImages" in dirs:
|
||||
dirs.remove("ProcessedImages")
|
||||
for file in files:
|
||||
if file.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.avif')):
|
||||
if file.lower().endswith(
|
||||
(".png", ".jpg", ".jpeg", ".gif", ".webp", ".avif")
|
||||
):
|
||||
return os.path.join(root, file)
|
||||
return None
|
||||
|
||||
def process_directory_with_logging(selected_directory: str, additional_name: str = '', is_checked: bool = False, log = None, update_previews = None):
|
||||
print(f"is_checked: {is_checked}")
|
||||
if not selected_directory:
|
||||
messagebox.showwarning("No Directory", "Please select a directory.")
|
||||
return
|
||||
def log_message(self, message, log=None):
|
||||
"""
|
||||
Log a message or print it if no log function is provided.
|
||||
|
||||
Args:
|
||||
message (str): The message to log or print.
|
||||
log (function, optional): The log function to use. Defaults to None.
|
||||
"""
|
||||
if log:
|
||||
log(f"Processing started for directory: {selected_directory}")
|
||||
output_directory = os.path.join(selected_directory, 'ProcessedImages')
|
||||
log(message)
|
||||
else:
|
||||
print(message)
|
||||
|
||||
def process_directory_with_logging(self, options):
|
||||
"""
|
||||
Process images in the selected directory with logging.
|
||||
|
||||
Args:
|
||||
options (dict): Processing options.
|
||||
"""
|
||||
if not self.selected_directory:
|
||||
messagebox.showwarning(
|
||||
"No Directory", "Please select a directory.")
|
||||
return
|
||||
log = options.get("log_message", None)
|
||||
self.log_message(
|
||||
f"Processing started for directory: {self.selected_directory}", log
|
||||
)
|
||||
|
||||
output_directory = self.create_output_directory(log)
|
||||
image_paths = self.collect_image_paths(log)
|
||||
|
||||
self.process_images(image_paths, output_directory, options, log)
|
||||
|
||||
messagebox.showinfo("Process Complete",
|
||||
"Image processing is complete.")
|
||||
self.log_message("Processing complete.", log)
|
||||
|
||||
def create_output_directory(self, log):
|
||||
"""
|
||||
Create the output directory for processed images.
|
||||
|
||||
Args:
|
||||
log (function): The log function to use.
|
||||
|
||||
Returns:
|
||||
str: The path to the output directory.
|
||||
"""
|
||||
output_directory = os.path.join(
|
||||
self.selected_directory, "ProcessedImages")
|
||||
if os.path.exists(output_directory):
|
||||
shutil.rmtree(output_directory)
|
||||
if log:
|
||||
log("Existing directory removed.")
|
||||
self.log_message("Existing directory removed.", log)
|
||||
os.makedirs(output_directory, exist_ok=True)
|
||||
if log:
|
||||
log(f"Output directory created: {output_directory}")
|
||||
self.log_message(f"Output directory created: {output_directory}", log)
|
||||
return output_directory
|
||||
|
||||
def collect_image_paths(self, log):
|
||||
"""
|
||||
Collect all image paths in the selected directory.
|
||||
|
||||
Args:
|
||||
log (function): The log function to use.
|
||||
|
||||
Returns:
|
||||
list: A list of image paths.
|
||||
"""
|
||||
image_paths = []
|
||||
for root, dirs, files in os.walk(selected_directory):
|
||||
if 'ProcessedImages' in dirs:
|
||||
dirs.remove('ProcessedImages')
|
||||
for root, dirs, files in os.walk(self.selected_directory):
|
||||
if "ProcessedImages" in dirs:
|
||||
dirs.remove("ProcessedImages")
|
||||
for file in files:
|
||||
if file.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.avif')):
|
||||
if file.lower().endswith(
|
||||
(".png", ".jpg", ".jpeg", ".gif", ".webp", ".avif")
|
||||
):
|
||||
file_path = os.path.join(root, file)
|
||||
image_paths.append(file_path)
|
||||
if log:
|
||||
log(f"Found: {file_path}")
|
||||
if log:
|
||||
log(f"Total images found: {len(image_paths)}")
|
||||
self.log_message(f"Found: {file_path}", log)
|
||||
self.log_message(f"Total images found: {len(image_paths)}", log)
|
||||
return image_paths
|
||||
|
||||
def process_images(self, image_paths, output_directory, options, log):
|
||||
"""
|
||||
Process each image by resizing and saving it to the output directory.
|
||||
|
||||
Args:
|
||||
image_paths (list): A list of image paths.
|
||||
output_directory (str): The path to the output directory.
|
||||
options (dict): Processing options.
|
||||
log (function): The log function to use.
|
||||
"""
|
||||
image = ImageProcessor()
|
||||
for file_path in image_paths:
|
||||
output_path = os.path.join(output_directory, os.path.relpath(file_path, selected_directory))
|
||||
output_path = os.path.join(
|
||||
output_directory, os.path.relpath(
|
||||
file_path, self.selected_directory)
|
||||
)
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
resize_image(file_path, output_path, additional_name)
|
||||
image.resize_image(
|
||||
file_path, output_path, options.get("additional_name", "")
|
||||
)
|
||||
|
||||
if os.path.exists(file_path) and is_checked:
|
||||
if log:
|
||||
log(f"removing: {file_path}")
|
||||
if os.path.exists(file_path) and options.get("is_checked", False):
|
||||
self.log_message(f"Removing: {file_path}", log)
|
||||
os.remove(file_path)
|
||||
if log:
|
||||
log(f"Processed: {file_path}")
|
||||
|
||||
messagebox.showinfo("Process Complete", "Image processing is complete.")
|
||||
if log:
|
||||
log("Processing complete.")
|
||||
self.log_message(f"Processed: {file_path}", log)
|
||||
|
||||
@@ -2,33 +2,70 @@ import os
|
||||
from wand.image import Image
|
||||
from wand.color import Color
|
||||
|
||||
def set_canvas_size(width, height):
|
||||
global canvas_width, canvas_height
|
||||
canvas_width = int(width)
|
||||
canvas_height = int(height)
|
||||
|
||||
def resize_image(image_path, output_path, additional_name):
|
||||
class ImageProcessor:
|
||||
def __init__(
|
||||
self, canvas_width=900, canvas_height=900, background_color="transparent"
|
||||
):
|
||||
"""
|
||||
Initialize the ImageProcessor with default values.
|
||||
"""
|
||||
self.canvas_width = canvas_width
|
||||
self.canvas_height = canvas_height
|
||||
self.background_color = background_color
|
||||
|
||||
def set_canvas_size(self, width, height):
|
||||
"""
|
||||
Set the canvas size.
|
||||
"""
|
||||
self.canvas_width = int(width)
|
||||
self.canvas_height = int(height)
|
||||
|
||||
def set_background_color(self, color):
|
||||
"""
|
||||
Set the background color.
|
||||
"""
|
||||
self.background_color = Color(color)
|
||||
|
||||
def resize_image(self, image_path, output_path, additional_name=None):
|
||||
"""
|
||||
Resize and process the image.
|
||||
"""
|
||||
# Normalize the paths to ensure consistency
|
||||
image_path = os.path.normpath(image_path)
|
||||
output_path = os.path.normpath(output_path)
|
||||
|
||||
print(image_path)
|
||||
print(output_path)
|
||||
with Image(filename=image_path) as img:
|
||||
img.transform(resize=f'{canvas_width}x{canvas_height}>')
|
||||
img.transform(resize=f"{self.canvas_width}x{self.canvas_height}>")
|
||||
|
||||
x_offset = int((canvas_width - img.width) / 2)
|
||||
y_offset = int((canvas_height - img.height) / 2)
|
||||
x_offset = int((self.canvas_width - img.width) / 2)
|
||||
y_offset = int((self.canvas_height - img.height) / 2)
|
||||
|
||||
with Image(width=canvas_width, height=canvas_height, background=Color('transparent')) as canvas:
|
||||
with Image(
|
||||
width=self.canvas_width,
|
||||
height=self.canvas_height,
|
||||
background=self.background_color,
|
||||
) as canvas:
|
||||
canvas.composite(img, left=x_offset, top=y_offset)
|
||||
# Create a new filename
|
||||
new_filename = os.path.splitext(os.path.basename(output_path))[0]
|
||||
new_filename = os.path.splitext(
|
||||
os.path.basename(output_path))[0]
|
||||
if additional_name:
|
||||
new_filename += " - " + additional_name.strip()
|
||||
new_filename += os.path.splitext(output_path)[1]
|
||||
# Construct the final output path
|
||||
final_output_path = os.path.join(os.path.dirname(output_path), new_filename)
|
||||
final_output_path = os.path.join(
|
||||
os.path.dirname(output_path), new_filename
|
||||
)
|
||||
# Save the image to the final output path
|
||||
canvas.save(filename=final_output_path)
|
||||
print(f"Saved to: {final_output_path}")
|
||||
set_canvas_size(900, 900)
|
||||
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
processor = ImageProcessor()
|
||||
processor.set_canvas_size(900, 900)
|
||||
processor.set_background_color("white")
|
||||
processor.resize_image("input_image.jpg", "output_image.jpg", "example")
|
||||
|
||||
Reference in New Issue
Block a user