From 6578f4732fcbfca5f21fada8856b3b8dc9ad7b06 Mon Sep 17 00:00:00 2001 From: dongchangxi <458593490@qq.com> Date: Fri, 5 Dec 2025 10:37:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=9A=E7=BA=BF=E7=A8=8B=E7=BB=88=E6=AD=A2?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/factory_sliceing_v2/main.py | 89 ++++++++++++---------- script/factory_sliceing_v2/utils/funcs.py | 90 +++++++++++++++-------- 2 files changed, 108 insertions(+), 71 deletions(-) diff --git a/script/factory_sliceing_v2/main.py b/script/factory_sliceing_v2/main.py index f7006de..6dd02f3 100644 --- a/script/factory_sliceing_v2/main.py +++ b/script/factory_sliceing_v2/main.py @@ -72,46 +72,55 @@ def main(work_dir=None): exit(0) # 循环处理,直到队列为空 - while True: - r = redisClient() - #检测队列是否有值 - if r.scard('pb:sliceing') == 0: - print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒') - time.sleep(10) - continue - #获取队列中的值 - data = r.spop('pb:sliceing') - if data is None: - print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒') - time.sleep(10) - continue - data = data.decode('utf-8') - #判断是否是数字 - if not data.isdigit(): - print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒') - time.sleep(10) - continue - - versionId = str(data) - print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') - res = step1(versionId) - if res == False: - print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒') - time.sleep(10) - continue - - - - - # 在长时间操作后,确保 Redis 连接仍然有效 - # 通过重新获取客户端来触发连接检查 - try: - r = redisClient() - r.ping() # 测试连接 - except Exception as e: - print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连') - - #time.sleep(10) + try: + while True: + try: + r = redisClient() + #检测队列是否有值 + if r.scard('pb:sliceing') == 0: + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒') + time.sleep(10) + continue + #获取队列中的值 + data = r.spop('pb:sliceing') + if data is None: + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒') + time.sleep(10) + continue + data = data.decode('utf-8') + #判断是否是数字 + if not data.isdigit(): + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒') + time.sleep(10) + continue + + versionId = str(data) + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') + res = step1(versionId) + if res == False: + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒') + time.sleep(10) + continue + + + + + # 在长时间操作后,确保 Redis 连接仍然有效 + # 通过重新获取客户端来触发连接检查 + try: + r = redisClient() + r.ping() # 测试连接 + except Exception as e: + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连') + + #time.sleep(10) + except KeyboardInterrupt: + # 在循环内部捕获 KeyboardInterrupt,允许在 sleep 或操作中被中断 + raise # 重新抛出,让外层捕获 + except KeyboardInterrupt: + print(f'\n{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 收到中断信号,正在优雅退出...') + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 程序已停止') + sys.exit(0) def testMain(): diff --git a/script/factory_sliceing_v2/utils/funcs.py b/script/factory_sliceing_v2/utils/funcs.py index a5df218..e276db6 100644 --- a/script/factory_sliceing_v2/utils/funcs.py +++ b/script/factory_sliceing_v2/utils/funcs.py @@ -438,41 +438,69 @@ def downloadDataByOssAndTransformSave(dirNewName, isSmallMachine=False, max_work beginTime = time.time() # 使用线程池并发处理 - with ThreadPoolExecutor(max_workers=max_workers) as executor: - # 提交所有任务 - future_to_item = { - executor.submit(_process_single_item, v, arrDownloadPath, dirNewName, dirPath, isSmallMachine): v - for v in listData - } - - # 收集结果 - success_count = 0 - fail_count = 0 - for future in as_completed(future_to_item): - v = future_to_item[future] + try: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # 提交所有任务 + future_to_item = { + executor.submit(_process_single_item, v, arrDownloadPath, dirNewName, dirPath, isSmallMachine): v + for v in listData + } + + # 收集结果 + success_count = 0 + fail_count = 0 try: - success, error_msg = future.result() - if success: - success_count += 1 - log(f"处理成功: {v.get('file_name', 'unknown')} ({success_count}/{len(listData)})") - else: - fail_count += 1 - log(f"处理失败: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})") - # 如果任何一个任务失败,记录错误但继续处理其他任务 - except Exception as e: - fail_count += 1 - error_msg = f"处理数据项时发生异常: {str(e)}" - log(f"处理异常: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})") + for future in as_completed(future_to_item): + v = future_to_item[future] + try: + success, error_msg = future.result() + if success: + success_count += 1 + log(f"处理成功: {v.get('file_name', 'unknown')} ({success_count}/{len(listData)})") + else: + fail_count += 1 + log(f"处理失败: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})") + # 如果任何一个任务失败,记录错误但继续处理其他任务 + except KeyboardInterrupt: + # 收到中断信号,取消所有未完成的任务 + log(f"收到中断信号,正在取消未完成的任务...") + for f in future_to_item: + f.cancel() + raise # 重新抛出,让外层捕获 + except Exception as e: + fail_count += 1 + error_msg = f"处理数据项时发生异常: {str(e)}" + log(f"处理异常: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})") + except KeyboardInterrupt: + # 收到中断信号,取消所有未完成的任务 + log(f"收到中断信号,正在取消未完成的任务...") + for f in future_to_item: + f.cancel() + # 尝试等待正在执行的任务完成(非阻塞方式) + import concurrent.futures + for f in list(future_to_item.keys()): + if not f.done(): + try: + # 尝试获取结果,如果任务还在执行则立即返回 + f.result(timeout=0.1) + except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError): + pass + except: + pass + raise # 重新抛出,让外层捕获 - endTime = time.time() - log(f"多线程处理完成, 总耗时{endTime - beginTime}秒, 成功:{success_count}, 失败:{fail_count}, 总计:{len(listData)}") + endTime = time.time() + log(f"多线程处理完成, 总耗时{endTime - beginTime}秒, 成功:{success_count}, 失败:{fail_count}, 总计:{len(listData)}") - # 如果有任何失败,返回 False - if fail_count > 0: - log(f"部分任务处理失败,共{fail_count}个失败") - return False + # 如果有任何失败,返回 False + if fail_count > 0: + log(f"部分任务处理失败,共{fail_count}个失败") + return False - return True + return True + except KeyboardInterrupt: + log(f"多线程处理被中断") + raise # 重新抛出,让调用者处理 def findBpyModule():