from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.chrome.service import Service from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import time,os,sys,oss2,json,shutil from util import utils import requests def main(pid,orderId,styleNumber): #进行一些初始化的处理 flag = init(pid,orderId,styleNumber) if flag == False: print(f"初始化步骤失败 pid:{pid} orderId:{orderId} styleNumber:{styleNumber}") utils.notify(f"初始化步骤失败 pid:{pid} orderId:{orderId} styleNumber:{styleNumber}") return # 初始化浏览器 chrome_driver_path = driver_path service = Service(executable_path=chrome_driver_path) driver = webdriver.Chrome(service=service) # 打开目标网页 driver.get('http://127.0.0.1:8188') # 替换为你的目标网页地址 # 等待页面加载完成 time.sleep(5) # 等待2秒,确保页面加载完成 # 定位按钮并点击 try: # 假设按钮的HTML元素可以通过id定位 button = driver.find_element(By.CSS_SELECTOR, '#comfy-load-button .pysssss-workflow-arrow') button.click() print("弹出版本选择按钮已成功点击") time.sleep(3) # 使用XPath定位包含特定文本的div元素 target_text = "3.小鱼AI(灰度图)图片浮雕流程-V3" button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.XPATH, f'//div[contains(text(), "{target_text}")]')) ) button.click() print("选择版本按钮已成功点击") time.sleep(3) # 使用XPath定位包含特定文本的div元素 target_text = "Extra options" button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.XPATH, f'//label[contains(text(), "{target_text}")]')) ) button.click() # time.sleep(5) button = driver.find_element(By.ID, 'queue-button') print(button) button.click() #获取当前时间戳 timestamp = int(time.time()) #检测是否完成数据,一个死循环 while True: boolV = checkIsFinish(driver, pid, orderId,styleNumber) if boolV: break else: #计算时间戳是否超过5分钟了 now = int(time.time()) if now - timestamp >= 300: print("超过5分钟,任务还没结束,强行停止执行下一个任务") utils.notify(f"pid:{pid} orderId:{orderId} 超过5分钟,任务还没结束,强行停止执行下一个任务") break time.sleep(5) continue except Exception as e: print(f"发生错误:{e}") utils.notify(f"发生错误:{e}") def down_obj_from_oss(fileName,pid,styleNumber): path = os.path.join(inputFolder, fileName) # 根据前缀获取文件列表 prefix = f'photos/{pid}/cartoon/cartoon_image' filelist = oss2.ObjectIteratorV2(utils.oss(), prefix=prefix) print(f"filelist:{filelist} prefix{prefix}") isDownload = False for file in filelist: if file and hasattr(file, 'key') and f"{styleNumber}.png" in file.key: utils.oss().get_object_to_file(file.key, path) isDownload = True print(f"pid:{pid} styleNumber:{styleNumber} 文件下载成功") break return isDownload def init(pid,orderId,styleNumber): #清除输入输出里的一些文件 clearFolder(outputFolder) clearFolder(inputFolder) #下载oss的文件到指定目录 修改指定json文件里的内容为指定内容 imageName = f"{pid}_{orderId}_{styleNumber}.png" downloadRes = down_obj_from_oss(imageName,pid,styleNumber) if downloadRes == False: return False #修改配置文件 3.小鱼AI(灰度图)图片浮雕流程-V3.json, 替换数据 id = 277 的 LoadImage 的 inputs[0] 的 value 为 imageName with open("3.小鱼AI(灰度图)图片浮雕流程-V3.json", 'r', encoding='utf-8') as f: data = json.load(f) for item in data['nodes']: if item['id'] == 277: item['widgets_values'][0] = imageName jsonFilePathNew = os.path.join(jsonFilePath, "3.小鱼AI(灰度图)图片浮雕流程-V3.json") with open(jsonFilePathNew, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4) print(f"pid:{pid} orderId:{orderId} styleNumber:{styleNumber} 配置文件修改成功") #复制到指定路劲下 jsonFilePath #shutil.copy(jsonFilePath, os.path.join(jsonFilePath, "3.小鱼AI(灰度图)图片浮雕流程-V3.json")) return True def checkIsFinish(driver, pid, orderId,styleNumber): # 检测指定目录下是否有文件产生 folder = outputFolder fileLength = os.listdir(folder) uploadCounts = 0 files = "" if len(fileLength) == 2: #上传文件到指定的目录 for file in os.listdir(folder): files += file+"," file_path = os.path.join(folder, file) if "tiff" in file: ossPath = f"photos/{pid}/cartoon_deep/{styleNumber}/{styleNumber}.tiff" time.sleep(5) utils.oss().put_object_from_file(ossPath, file_path) uploadCounts += 1 elif "png" in file: ossPath = f"photos/{pid}/cartoon_deep/{styleNumber}/{styleNumber}.png" time.sleep(5) utils.oss().put_object_from_file(ossPath, file_path) uploadCounts += 1 if uploadCounts == 2: print(f"pid={pid},orderid={orderId},style_number:{styleNumber} 任务处理完成,深度图已上传到OSS") #推入到队列中 r = utils.create_redis_connection() if pid > 1000: r.lpush('model:badge_cartoon_build', json.dumps({'pid': pid, 'order_id': orderId, 'style_number': styleNumber})) driver.quit() return True else: print(f"pid={pid},orderid={orderId},style_number={styleNumber} 任务处理失败,当前文件数据为{files}") utils.notify(f"pid={pid},orderid={orderId},style_number={styleNumber} 任务处理失败,当前文件数据为{files}") return False #清除指定文件夹下的所有文件 def clearFolder(folder): for file in os.listdir(folder): #判断如果是文件夹,则整个删除 if os.path.isdir(os.path.join(folder, file)): shutil.rmtree(os.path.join(folder, file)) else: os.remove(os.path.join(folder, file)) #检测输入目录里是否有输入的文件存在 def checkInputFolder(): folder = inputFolder fileLength = os.listdir(folder) if len(fileLength) == 1: return True return False if __name__ == '__main__': pid = 0 orderId = 0 styleNumber = 0 downloadPath = "D:\downloadPhotos" inputFolder = "C:\StableDiffusion_xiaoyu_V3\ComfyUI-aki-v1.6-XY\ComfyUI\input" #"c:\deep" #"C:\StableDiffusion_xiaoyu_V3\ComfyUI-aki-v1.6-XY\ComfyUI\input" outputFolder = "C:\StableDiffusion_xiaoyu_V3\ComfyUI-aki-v1.6-XY\ComfyUI\output" jsonFilePath = "C:\StableDiffusion_xiaoyu_V3\ComfyUI-aki-v1.6-XY\ComfyUI\pysssss-workflows" # 设置浏览器驱动路径(以Chrome为例) driver_path = 'D:\chromedriver-win64\chromedriver.exe' # 替换为你的chromedriver路径 #检查文件是否存在,不存在就创建, 测试使用 if not os.path.exists(inputFolder): os.makedirs(inputFolder) if not os.path.exists(outputFolder): os.makedirs(outputFolder) if not os.path.exists(jsonFilePath): os.makedirs(jsonFilePath) if not os.path.exists(downloadPath): os.makedirs(downloadPath) arrArgs = sys.argv if len(arrArgs) == 4: pid = arrArgs[1] orderId = arrArgs[2] styleNumber = arrArgs[3] #判断是否是数字 if pid.isdigit() and orderId.isdigit() and styleNumber.isdigit(): main(pid,orderId,styleNumber) else: print("输入的参数不是数字") else: r = utils.create_redis_connection() while True: try: # if r.llen('model:badge_cartoon') == 0: # print('队列为空,等待10秒') # time.sleep(10) # continue info = r.spop('model:badge_cartoon') if info is None: print('队列为空,等待10秒') time.sleep(10) continue # 确保info是字符串类型 if isinstance(info, bytes): info = info.decode('utf-8') info = json.loads(info) print(info) # 检查必要的字段是否存在 if not all(key in info for key in ['pid', 'order_id', 'style_number']): print(f"Redis数据格式错误,缺少必要字段: {info}") utils.notify(f"Redis数据格式错误,缺少必要字段: {info}") continue print(info) exit pid = int(info['pid']) orderId = int(info['order_id']) styleNumber = int(info['style_number']) print(f"pid:{pid} orderId:{orderId} style_number:{styleNumber}") if pid > 0 and orderId > 0 and styleNumber > 0: #创建临时文件 以pid 和 orderId 和 styleNumber 为文件名 文件内容为空 tempFile = os.path.join(downloadPath, f"{pid}_{orderId}_{styleNumber}.txt") with open(tempFile, 'w') as f: f.write('') main(pid,orderId,styleNumber) except json.JSONDecodeError as e: print(f'JSON解析错误:{e}') utils.notify(f'JSON解析错误:{e}') tempFile = os.path.join(downloadPath, f"{pid}_{orderId}_{styleNumber}_解析错误.txt") with open(tempFile, 'w') as f: f.write('') time.sleep(10) continue except Exception as e: print(f'错误:{e}') tempFile = os.path.join(downloadPath, f"{pid}_{orderId}_{styleNumber}_异常错误.txt") with open(tempFile, 'w') as f: f.write(e) utils.notify(f'错误:{e}') time.sleep(10) r = utils.create_redis_connection() continue # from selenium import webdriver # from selenium.webdriver.common.by import By # from selenium.webdriver.common.keys import Keys # from selenium.webdriver.chrome.service import Service # from selenium.webdriver.support.ui import WebDriverWait # from selenium.webdriver.support import expected_conditions as EC # import time # import os # import sys # import oss2 # import json # import shutil # import traceback # import datetime # from util import utils # import requests # # ===================== 核心修复:Chrome配置 ===================== # def init_chrome_driver(driver_path, chrome_binary_path=None): # """初始化Chrome驱动,解决找不到Chrome二进制文件问题""" # from selenium.webdriver.chrome.options import Options # chrome_options = Options() # # 1. 指定Chrome二进制文件路径(关键修复) # if chrome_binary_path: # chrome_options.binary_location = chrome_binary_path # # 2. 添加通用优化参数 # chrome_options.add_argument("--no-sandbox") # 解决沙箱问题 # chrome_options.add_argument("--disable-dev-shm-usage") # 解决内存不足 # chrome_options.add_argument("--disable-gpu") # 禁用GPU加速(兼容老系统) # chrome_options.add_argument("--window-size=1920,1080") # 设置窗口大小 # chrome_options.add_experimental_option("excludeSwitches", ["enable-logging"]) # 屏蔽无用日志 # # 3. 初始化驱动 # try: # service = Service(executable_path=driver_path) # driver = webdriver.Chrome(service=service, options=chrome_options) # driver.set_page_load_timeout(60) # 设置页面加载超时 # return driver # except Exception as e: # raise Exception(f"Chrome驱动初始化失败:{str(e)}\n堆栈信息:{traceback.format_exc()}") # # ===================== 原有业务函数修改 ===================== # def main(pid, orderId, styleNumber): # driver = None # try: # # 进行一些初始化的处理 # flag = init(pid, orderId, styleNumber) # if flag == False: # err_msg = f"初始化步骤失败 pid:{pid} orderId:{orderId} styleNumber:{styleNumber}" # print(err_msg) # utils.notify(err_msg) # return # # 初始化浏览器(修复核心:指定Chrome二进制路径) # chrome_binary_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe" # 根据实际路径修改 # driver = init_chrome_driver(driver_path, chrome_binary_path) # # 打开目标网页 # driver.get('http://127.0.0.1:8188') # 替换为你的目标网页地址 # # 等待页面加载完成 # time.sleep(5) # 等待2秒,确保页面加载完成 # # 定位按钮并点击 # # 1. 点击弹出版本选择按钮 # button = WebDriverWait(driver, 10).until( # EC.element_to_be_clickable((By.CSS_SELECTOR, '#comfy-load-button .pysssss-workflow-arrow')) # ) # button.click() # print("弹出版本选择按钮已成功点击") # time.sleep(3) # # 2. 点击"深度图批处理" # target_text = "深度图批处理" # button = WebDriverWait(driver, 10).until( # EC.element_to_be_clickable((By.XPATH, f'//div[contains(text(), "{target_text}")]')) # ) # button.click() # print("选择版本按钮已成功点击") # time.sleep(3) # # 3. 点击"Extra options" # target_text = "Extra options" # button = WebDriverWait(driver, 10).until( # EC.element_to_be_clickable((By.XPATH, f'//label[contains(text(), "{target_text}")]')) # ) # button.click() # time.sleep(5) # # 4. 点击队列按钮 # button = WebDriverWait(driver, 10).until( # EC.element_to_be_clickable((By.ID, 'queue-button')) # ) # button.click() # print("队列按钮点击成功") # # 获取当前时间戳 # timestamp = int(time.time()) # # 检测是否完成数据,一个死循环 # while True: # boolV = checkIsFinish(driver, pid, orderId, styleNumber) # if boolV: # break # else: # # 计算时间戳是否超过5分钟了 # now = int(time.time()) # if now - timestamp >= 300: # err_msg = f"pid:{pid} orderId:{orderId} 超过5分钟,任务还没结束,强行停止执行下一个任务" # print(err_msg) # utils.notify(err_msg) # break # time.sleep(5) # continue # except Exception as e: # err_msg = f"pid:{pid} orderId:{orderId} styleNumber:{styleNumber} 执行错误:{str(e)}\n堆栈信息:{traceback.format_exc()}" # print(err_msg) # utils.notify(err_msg) # finally: # # 确保浏览器关闭 # if driver: # try: # driver.quit() # except: # pass # def down_obj_from_oss(fileName, pid, styleNumber): # """从OSS下载文件""" # path = os.path.join(inputFolder, fileName) # # 根据前缀获取文件列表 # prefix = f'photos/{pid}/cartoon/cartoon_image' # filelist = oss2.ObjectIteratorV2(utils.oss(), prefix=prefix) # print(f"filelist:{filelist} prefix{prefix}") # isDownload = False # for file in filelist: # if file and hasattr(file, 'key') and f"{styleNumber}.png" in file.key: # utils.oss().get_object_to_file(file.key, path) # isDownload = True # print(f"pid:{pid} styleNumber:{styleNumber} 文件下载成功") # break # return isDownload # def init(pid, orderId, styleNumber): # """初始化:清空文件夹、下载OSS文件、修改配置文件""" # try: # # 清除输入输出里的一些文件 # clearFolder(outputFolder) # clearFolder(inputFolder) # # 下载oss的文件到指定目录 # imageName = f"{pid}_{orderId}_{styleNumber}.png" # downloadRes = down_obj_from_oss(imageName, pid, styleNumber) # if downloadRes == False: # print(f"pid:{pid} styleNumber:{styleNumber} OSS文件下载失败") # return False # # 修改配置文件:替换LoadImage的inputs[0]的value # json_file = "3.小鱼AI(灰度图)图片浮雕流程-V3.json" # json_origin_path = os.path.join(jsonFilePath, json_file) # 修正路径拼接 # with open(json_origin_path, 'r', encoding='utf-8') as f: # data = json.load(f) # # 遍历节点修改值 # for item in data.get('nodes', []): # if item.get('id') == 277: # if len(item.get('widgets_values', [])) > 0: # item['widgets_values'][0] = imageName # else: # print(f"pid:{pid} 节点277的widgets_values为空,无法修改") # return False # # 写入修改后的配置文件 # with open(json_origin_path, 'w', encoding='utf-8') as f: # json.dump(data, f, ensure_ascii=False, indent=4) # print(f"pid:{pid} orderId:{orderId} styleNumber:{styleNumber} 配置文件修改成功") # return True # except Exception as e: # print(f"初始化失败:{str(e)}") # utils.notify(f"pid:{pid} orderId:{orderId} 初始化失败:{str(e)}") # return False # def checkIsFinish(driver, pid, orderId, styleNumber): # """检测任务是否完成:检查输出文件夹、上传OSS""" # try: # folder = outputFolder # # 检查文件夹是否存在 # if not os.path.exists(folder): # print(f"输出文件夹不存在:{folder}") # return False # file_list = os.listdir(folder) # fileLength = len(file_list) # uploadCounts = 0 # files = "" # if fileLength == 2: # # 上传文件到指定的目录 # for file in file_list: # files += file + "," # file_path = os.path.join(folder, file) # if not os.path.exists(file_path): # continue # # 上传TIFF文件 # if "tiff" in file.lower(): # 忽略大小写 # ossPath = f"photos/{pid}/cartoon_deep/{styleNumber}/{styleNumber}.tiff" # time.sleep(5) # utils.oss().put_object_from_file(ossPath, file_path) # uploadCounts += 1 # # 上传PNG文件 # elif "png" in file.lower(): # ossPath = f"photos/{pid}/cartoon_deep/{styleNumber}/{styleNumber}.png" # time.sleep(5) # utils.oss().put_object_from_file(ossPath, file_path) # uploadCounts += 1 # if uploadCounts == 2: # success_msg = f"pid={pid},orderid={orderId},style_number:{styleNumber} 任务处理完成,深度图已上传到OSS" # print(success_msg) # # 推入到队列中 # r = utils.create_redis_connection() # if pid > 1000: # r.lpush('model:badge_cartoon_build', # json.dumps({'pid': pid, 'order_id': orderId, 'style_number': styleNumber})) # return True # else: # err_msg = f"pid={pid},orderid={orderId},style_number:{styleNumber} 任务处理失败,当前文件数据为{files},仅上传{uploadCounts}个文件" # print(err_msg) # utils.notify(err_msg) # return False # except Exception as e: # err_msg = f"检测任务完成状态失败:{str(e)}" # print(err_msg) # utils.notify(err_msg) # return False # def clearFolder(folder): # """清除指定文件夹下的所有文件(增强容错)""" # if not os.path.exists(folder): # os.makedirs(folder) # return # for file in os.listdir(folder): # file_path = os.path.join(folder, file) # try: # if os.path.isdir(file_path): # shutil.rmtree(file_path) # else: # os.remove(file_path) # except Exception as e: # print(f"删除文件/文件夹失败 {file_path}:{str(e)}") # def checkInputFolder(): # """检测输入目录里是否有输入的文件存在""" # folder = inputFolder # if not os.path.exists(folder): # return False # fileLength = os.listdir(folder) # return len(fileLength) == 1 # # ===================== 主程序入口修改 ===================== # if __name__ == '__main__': # # 全局变量定义(修正路径格式,使用原始字符串避免转义) # pid = 0 # orderId = 0 # styleNumber = 0 # downloadPath = r"D:\downloadPhotos" # inputFolder = r"C:\deep" # 实际路径请确认 # outputFolder = r"C:\StableDiffusion_xiaoyu_V3\ComfyUI-aki-v1.6-XY\ComfyUI\output" # jsonFilePath = r"C:\StableDiffusion_xiaoyu_V3\ComfyUI-aki-v1.6-XY\ComfyUI\pysssss-workflows" # driver_path = r"D:\chromedriver-win64\chromedriver.exe" # ChromeDriver路径 # # 检查并创建文件夹 # for folder in [inputFolder, outputFolder, jsonFilePath, downloadPath]: # if not os.path.exists(folder): # os.makedirs(folder) # print(f"创建文件夹:{folder}") # # 处理命令行参数 # arrArgs = sys.argv # if len(arrArgs) == 4: # pid = arrArgs[1] # orderId = arrArgs[2] # styleNumber = arrArgs[3] # # 判断是否是数字 # if pid.isdigit() and orderId.isdigit() and styleNumber.isdigit(): # main(int(pid), int(orderId), int(styleNumber)) # 转为int避免字符串比较问题 # else: # print("输入的参数不是数字") # else: # # 从Redis消费任务 # while True: # try: # r = utils.create_redis_connection() # info = r.spop('model:badge_cartoon') # if info is None: # print('队列为空,等待10秒') # time.sleep(10) # continue # # 确保info是字符串类型 # if isinstance(info, bytes): # info = info.decode('utf-8') # # 解析JSON # info_dict = json.loads(info) # print(f"从Redis获取任务:{info_dict}") # # 检查必要的字段是否存在 # if not all(key in info_dict for key in ['pid', 'order_id', 'style_number']): # err_msg = f"Redis数据格式错误,缺少必要字段: {info_dict}" # print(err_msg) # utils.notify(err_msg) # continue # # 转换为整数 # pid = int(info_dict['pid']) # orderId = int(info_dict['order_id']) # styleNumber = int(info_dict['style_number']) # print(f"开始处理任务 - pid:{pid} orderId:{orderId} style_number:{styleNumber}") # # 创建临时文件标记 # tempFile = os.path.join(downloadPath, f"{pid}_{orderId}_{styleNumber}.txt") # with open(tempFile, 'w', encoding='utf-8') as f: # f.write(f"任务开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # # 执行主任务 # main(pid, orderId, styleNumber) # except json.JSONDecodeError as e: # err_msg = f'JSON解析错误:{str(e)}\n原始数据:{info if "info" in locals() else "未知"}\n堆栈:{traceback.format_exc()}' # print(err_msg) # utils.notify(err_msg) # # 写入错误标记文件 # if 'pid' in locals() and 'orderId' in locals() and 'styleNumber' in locals(): # tempFile = os.path.join(downloadPath, f"{pid}_{orderId}_{styleNumber}_解析错误.txt") # with open(tempFile, 'w', encoding='utf-8') as f: # f.write(err_msg) # time.sleep(10) # continue # except Exception as e: # err_msg = f'执行任务异常:{str(e)}\n堆栈:{traceback.format_exc()}' # print(err_msg) # utils.notify(err_msg) # # 修复:异常对象转字符串后写入文件 # if 'pid' in locals() and 'orderId' in locals() and 'styleNumber' in locals(): # tempFile = os.path.join(downloadPath, f"{pid}_{orderId}_{styleNumber}_异常错误.txt") # with open(tempFile, 'w', encoding='utf-8') as f: # f.write(err_msg) # time.sleep(10) # continue