UIpdate image script
This commit is contained in:
@@ -1,24 +1,43 @@
|
||||
"""
|
||||
Module for WooCommerce API interactions and image processing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import base64
|
||||
import tempfile
|
||||
import pprint
|
||||
from tkinter import messagebox
|
||||
from cryptography.fernet import Fernet
|
||||
import requests
|
||||
from tkinter import messagebox
|
||||
from woocommerce import API
|
||||
from cryptography.fernet import Fernet
|
||||
from utils.image_processing import ImageProcessor
|
||||
from config.encrypt_config import ConfigEncryptor
|
||||
from utils.file_operations import FileProcessor
|
||||
import hashlib
|
||||
import pprint
|
||||
CREDENTIALS_FILE = "credentials.json"
|
||||
|
||||
# Hardcoded key (replace with your generated key)
|
||||
KEY = b"u4xTBY5Ns4WYdLvqMjEr138mpMmDEhhqTszKCcDy2cI="
|
||||
|
||||
def save_active_credential_set(active_set_name):
|
||||
"""
|
||||
Update the active credential set in the saved credentials file.
|
||||
|
||||
Args:
|
||||
active_set_name (str): The name of the active credential set.
|
||||
"""
|
||||
if not os.path.exists(CREDENTIALS_FILE):
|
||||
return
|
||||
|
||||
with open(CREDENTIALS_FILE, 'r+') as file:
|
||||
data = json.load(file)
|
||||
|
||||
# Find the credential set and mark it as active
|
||||
for cred in data.get('credentials', []):
|
||||
cred['active'] = (cred['name'] == active_set_name)
|
||||
|
||||
# Rewrite the updated data back to the file
|
||||
file.seek(0)
|
||||
json.dump(data, file, indent=4)
|
||||
file.truncate()
|
||||
|
||||
|
||||
def save_credentials(url, consumer_key, consumer_secret, username, password):
|
||||
"""
|
||||
@@ -39,7 +58,7 @@ def save_credentials(url, consumer_key, consumer_secret, username, password):
|
||||
"password": password,
|
||||
}
|
||||
|
||||
ConfigEncryptor(KEY).save_credentials(credentials)
|
||||
ConfigEncryptor(KEY).save_credentials(consumer_key, consumer_secret, username, password)
|
||||
|
||||
|
||||
def load_credentials():
|
||||
@@ -49,37 +68,30 @@ def load_credentials():
|
||||
Returns:
|
||||
dict: The decrypted credentials, or None if the file does not exist.
|
||||
"""
|
||||
if not os.path.exists("config.enc"):
|
||||
return None
|
||||
fernet = Fernet(KEY)
|
||||
with open("config.enc", "rb") as file:
|
||||
encrypted = file.read()
|
||||
decrypted = fernet.decrypt(encrypted).decode()
|
||||
return json.loads(decrypted).get("credentials")
|
||||
creds = ConfigEncryptor(KEY).load_credentials()
|
||||
return creds
|
||||
|
||||
|
||||
def get_wcapi():
|
||||
"""
|
||||
Get a WooCommerce API client instance.
|
||||
Get a WooCommerce API client instance using the active credentials.
|
||||
|
||||
Returns:
|
||||
woocommerce.API: The WooCommerce API client instance, or None if credentials are missing.
|
||||
"""
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror(
|
||||
"Error",
|
||||
"No WooCommerce credentials found. Please set them in the settings.",
|
||||
)
|
||||
return None
|
||||
active_credentials = load_credentials()
|
||||
|
||||
pprint.pprint(active_credentials)
|
||||
|
||||
return API(
|
||||
url=credentials["url"],
|
||||
consumer_key=credentials["consumer_key"],
|
||||
consumer_secret=credentials["consumer_secret"],
|
||||
url=active_credentials["url"],
|
||||
consumer_key=active_credentials["consumer_key"],
|
||||
consumer_secret=active_credentials["consumer_secret"],
|
||||
version="wc/v3",
|
||||
)
|
||||
|
||||
|
||||
|
||||
def get_product(product_id):
|
||||
"""
|
||||
Get a WooCommerce product and download its images.
|
||||
@@ -95,8 +107,13 @@ def get_product(product_id):
|
||||
return None
|
||||
result = wcapi.get(f"products/{product_id}")
|
||||
|
||||
image_paths = {}
|
||||
|
||||
product = result.json()
|
||||
|
||||
return product
|
||||
|
||||
def get_images(product, limit = 0):
|
||||
image_paths = {}
|
||||
if product.get("images"):
|
||||
images = product.get("images")
|
||||
|
||||
@@ -116,14 +133,19 @@ def get_product(product_id):
|
||||
print(
|
||||
f"Image {index + 1}/{len(images)} downloaded and saved: {file_path}"
|
||||
)
|
||||
if limit and limit >= index +1:
|
||||
break
|
||||
else:
|
||||
print(f"Failed to download image {index + 1}/{len(images)}")
|
||||
|
||||
return image_paths
|
||||
|
||||
else:
|
||||
if product.get("name"):
|
||||
print(f"No images found for {product.get('name')}")
|
||||
else:
|
||||
print("No images found")
|
||||
return image_paths, product
|
||||
return []
|
||||
|
||||
|
||||
def upload_image(img_path):
|
||||
@@ -139,6 +161,8 @@ def upload_image(img_path):
|
||||
with open(img_path, "rb") as img_file:
|
||||
data = img_file.read()
|
||||
file_name = os.path.basename(img_path)
|
||||
file_name = file_name.replace("–", "-")
|
||||
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror(
|
||||
@@ -155,7 +179,7 @@ def upload_image(img_path):
|
||||
"Content-Disposition": f"attachment; filename={file_name}",
|
||||
"Authorization": f"basic {credentials_base64.decode()}",
|
||||
}
|
||||
|
||||
print(f"Uploading image {img_path}")
|
||||
try:
|
||||
res = requests.post(url=url, data=data, headers=headers, timeout=10)
|
||||
res.raise_for_status()
|
||||
@@ -180,6 +204,7 @@ def delete_img(image_id):
|
||||
Args:
|
||||
image_id (int): The ID of the image to delete.
|
||||
"""
|
||||
|
||||
credentials = load_credentials()
|
||||
if not credentials:
|
||||
messagebox.showerror(
|
||||
@@ -205,64 +230,114 @@ def delete_img(image_id):
|
||||
print(f"Failed to delete image with ID {image_id}. Error: {res.text}")
|
||||
|
||||
|
||||
def update_product(image_ids, product_id):
|
||||
|
||||
|
||||
|
||||
|
||||
def update_product(product_id, new_list, old_list, options):
|
||||
"""
|
||||
Update a WooCommerce product with new image IDs.
|
||||
Update the images and meta data of a WooCommerce product.
|
||||
|
||||
Args:
|
||||
image_ids (list): A list of new image IDs.
|
||||
product_id (int): The ID of the WooCommerce product.
|
||||
"""
|
||||
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
product = wcapi.get(f"products/{product_id}").json()
|
||||
product["images"] = [{"id": image_id} for image_id in image_ids]
|
||||
response = wcapi.put(f"products/{product_id}", data=product)
|
||||
# Prepare the data with images and meta data fields
|
||||
product_data = {
|
||||
"images": [{"id": image_id} for image_id in new_list],
|
||||
"meta_data": [
|
||||
{
|
||||
"key": "_image_processed",
|
||||
"value": options['hash_string']
|
||||
},
|
||||
{
|
||||
"key": "_old_image_ids",
|
||||
"value": [{"id": image_id} for image_id in old_list]
|
||||
}
|
||||
|
||||
]
|
||||
}
|
||||
|
||||
# Print product data for debugging
|
||||
print(f"Updating product {product_id} with the following data:")
|
||||
print(json.dumps(product_data, indent=2))
|
||||
|
||||
# Send the update request with images and meta data fields
|
||||
response = wcapi.put(f"products/{product_id}", data=product_data) # Using 'json' to pass data
|
||||
|
||||
if response.status_code == 200:
|
||||
print(
|
||||
f"Product with ID {product_id} updated successfully with new image IDs.")
|
||||
print(f"Product with ID {product_id} updated successfully with new image IDs and meta data.")
|
||||
else:
|
||||
print(
|
||||
f"Failed to update product with ID {product_id}. Error: {response.text}")
|
||||
print(f"Failed to update product with ID {product_id}. Error: {response.text}")
|
||||
|
||||
|
||||
def process_product_images( options):
|
||||
|
||||
def process_product_images(options):
|
||||
"""
|
||||
Process images for a WooCommerce product by resizing and uploading them.
|
||||
|
||||
Args:
|
||||
product_id (int): The ID of the WooCommerce product.
|
||||
name_template (str): The template for generating image filenames.
|
||||
canvas_width (int): The width of the canvas for resizing images.
|
||||
canvas_height (int): The height of the canvas for resizing images.
|
||||
options (dict): Contains options such as product_id, name_template, canvas_width, canvas_height.
|
||||
"""
|
||||
|
||||
# Concatenate the values into a string
|
||||
hash_input = f"{options['background_color']}_{options['canvas_height']}_{options['canvas_width']}_{options['image_format']}_{options['image_size']}"
|
||||
|
||||
# Create a SHA256 hash from the concatenated string
|
||||
hash_object = hashlib.sha256(hash_input.encode())
|
||||
hash_string = hash_object.hexdigest()
|
||||
options['hash_string'] = hash_string
|
||||
pprint.pprint(hash_string)
|
||||
product_id = options.get("product_id")
|
||||
if not product_id:
|
||||
print("No product ID")
|
||||
return
|
||||
image_paths, product = get_product(product_id)
|
||||
product = options.get("product")
|
||||
# Check if the product meta_data contains _image_processed with the current hash
|
||||
if product['meta_data']:
|
||||
for meta in product['meta_data']:
|
||||
if meta['key'] == '_image_processed' and meta['value'] == hash_string:
|
||||
print(f"Skipping product {product_id}, already processed with the current hash.")
|
||||
return
|
||||
image_paths = get_images(product)
|
||||
if not image_paths:
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_output_directory:
|
||||
print(f"Using temporary directory: {temp_output_directory}")
|
||||
|
||||
old_list = []
|
||||
new_list = []
|
||||
|
||||
pprint.pprint ( list(image_paths.values()))
|
||||
file = FileProcessor()
|
||||
log = options.get("log_message", None)
|
||||
|
||||
|
||||
for image_id, file_path in image_paths.items():
|
||||
file = FileProcessor()
|
||||
img = ImageProcessor()
|
||||
output_path = file.generate_output_path(temp_output_directory, file_path, options, product)
|
||||
|
||||
img.resize_image(file_path, output_path, options)
|
||||
new_id = upload_image(output_path)
|
||||
|
||||
processed = file.process_images([file_path], temp_output_directory, options, log, product)
|
||||
|
||||
|
||||
new_id = upload_image(processed[0])
|
||||
|
||||
if new_id:
|
||||
old_list.append(image_id)
|
||||
new_list.append(new_id)
|
||||
|
||||
update_product(new_list, product_id)
|
||||
if new_list:
|
||||
options["image_ids"] = new_list # Store new image IDs in options
|
||||
update_product(product_id, new_list, old_list, options) # Pass new image IDs here
|
||||
for old in old_list:
|
||||
delete_img(old)
|
||||
print("Temporary files processed and uploaded successfully.")
|
||||
|
||||
|
||||
@@ -294,35 +369,95 @@ def generate_output_path(
|
||||
)
|
||||
return os.path.join(temp_output_directory, new_filename + ext)
|
||||
|
||||
|
||||
def process_all_products(options):
|
||||
def get_first_image_path(product):
|
||||
images = get_images(product, 1)
|
||||
# Loop through the dictionary
|
||||
if images:
|
||||
for image_id, file_path in images.items():
|
||||
print(f"Processing Image ID: {image_id}")
|
||||
print(f"File Path: {file_path}")
|
||||
return file_path
|
||||
def get_first_image():
|
||||
"""
|
||||
Process images for all WooCommerce products by resizing and uploading them.
|
||||
|
||||
Args:
|
||||
name_template (str): The template for generating image filenames.
|
||||
canvas_width (int): The width of the canvas for resizing images.
|
||||
canvas_height (int): The height of the canvas for resizing images.
|
||||
options (dict): Contains options such as name_template, canvas_width, canvas_height.
|
||||
"""
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
page = 1
|
||||
total_products = 0 # Initialize the counter for total products
|
||||
|
||||
|
||||
products = wcapi.get("products", params={"per_page": 5, "page": page}).json()
|
||||
if not products:
|
||||
return
|
||||
for product in products:
|
||||
total_products += 1 # Update the total count
|
||||
return get_first_image_path(product)
|
||||
|
||||
def search_product(search):
|
||||
"""
|
||||
Process images for all WooCommerce products by resizing and uploading them.
|
||||
|
||||
Args:
|
||||
options (dict): Contains options such as name_template, canvas_width, canvas_height.
|
||||
"""
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
page = 1
|
||||
total_products = 0 # Initialize the counter for total products
|
||||
|
||||
while True:
|
||||
products = wcapi.get("products", params={
|
||||
"per_page": 100, "page": page}).json()
|
||||
products = wcapi.get("products", params={"per_page": 100, "page": page, "search": search}).json()
|
||||
if not products:
|
||||
break
|
||||
return products
|
||||
|
||||
def process_all_products(options):
|
||||
"""
|
||||
Process images for all WooCommerce products by resizing and uploading them.
|
||||
|
||||
Args:
|
||||
options (dict): Contains options such as name_template, canvas_width, canvas_height.
|
||||
"""
|
||||
wcapi = get_wcapi()
|
||||
if not wcapi:
|
||||
return
|
||||
|
||||
page = 1
|
||||
total_products = 0 # Initialize the counter for total products
|
||||
|
||||
while True:
|
||||
products = wcapi.get("products", params={"per_page": 100, "page": page}).json()
|
||||
if not products:
|
||||
break
|
||||
|
||||
product_count = len(products) # Get the count of products on the current page
|
||||
|
||||
|
||||
for product in products:
|
||||
total_products += 1 # Update the total count
|
||||
options["product_id"] = product["id"]
|
||||
process_product_images(
|
||||
options
|
||||
)
|
||||
options["product"] = product
|
||||
log = options.get("log_message", None)
|
||||
if log:
|
||||
if product:
|
||||
name = product.get("name", "")
|
||||
log.log_message(f"#{total_products} Processing {name} ") # Log the product name
|
||||
process_product_images(options)
|
||||
|
||||
page += 1
|
||||
|
||||
# Log the total number of products processed
|
||||
log(f"Total products processed: {total_products}")
|
||||
|
||||
# Show completion message
|
||||
messagebox.showinfo(
|
||||
"Process Complete", "All product images processing is complete."
|
||||
"Process Complete", f"All product images processing is complete. Total products processed: {total_products}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user