Browse Source

多线程终止处理

master
dongchangxi 3 weeks ago
parent
commit
6578f4732f
  1. 89
      script/factory_sliceing_v2/main.py
  2. 90
      script/factory_sliceing_v2/utils/funcs.py

89
script/factory_sliceing_v2/main.py

@ -72,46 +72,55 @@ def main(work_dir=None): @@ -72,46 +72,55 @@ def main(work_dir=None):
exit(0)
# 循环处理,直到队列为空
while True:
r = redisClient()
#检测队列是否有值
if r.scard('pb:sliceing') == 0:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒')
time.sleep(10)
continue
#获取队列中的值
data = r.spop('pb:sliceing')
if data is None:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒')
time.sleep(10)
continue
data = data.decode('utf-8')
#判断是否是数字
if not data.isdigit():
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒')
time.sleep(10)
continue
versionId = str(data)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
if res == False:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
time.sleep(10)
continue
# 在长时间操作后,确保 Redis 连接仍然有效
# 通过重新获取客户端来触发连接检查
try:
r = redisClient()
r.ping() # 测试连接
except Exception as e:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连')
#time.sleep(10)
try:
while True:
try:
r = redisClient()
#检测队列是否有值
if r.scard('pb:sliceing') == 0:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒')
time.sleep(10)
continue
#获取队列中的值
data = r.spop('pb:sliceing')
if data is None:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒')
time.sleep(10)
continue
data = data.decode('utf-8')
#判断是否是数字
if not data.isdigit():
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒')
time.sleep(10)
continue
versionId = str(data)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
if res == False:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
time.sleep(10)
continue
# 在长时间操作后,确保 Redis 连接仍然有效
# 通过重新获取客户端来触发连接检查
try:
r = redisClient()
r.ping() # 测试连接
except Exception as e:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连')
#time.sleep(10)
except KeyboardInterrupt:
# 在循环内部捕获 KeyboardInterrupt,允许在 sleep 或操作中被中断
raise # 重新抛出,让外层捕获
except KeyboardInterrupt:
print(f'\n{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 收到中断信号,正在优雅退出...')
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 程序已停止')
sys.exit(0)
def testMain():

90
script/factory_sliceing_v2/utils/funcs.py

@ -438,41 +438,69 @@ def downloadDataByOssAndTransformSave(dirNewName, isSmallMachine=False, max_work @@ -438,41 +438,69 @@ def downloadDataByOssAndTransformSave(dirNewName, isSmallMachine=False, max_work
beginTime = time.time()
# 使用线程池并发处理
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_item = {
executor.submit(_process_single_item, v, arrDownloadPath, dirNewName, dirPath, isSmallMachine): v
for v in listData
}
# 收集结果
success_count = 0
fail_count = 0
for future in as_completed(future_to_item):
v = future_to_item[future]
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_item = {
executor.submit(_process_single_item, v, arrDownloadPath, dirNewName, dirPath, isSmallMachine): v
for v in listData
}
# 收集结果
success_count = 0
fail_count = 0
try:
success, error_msg = future.result()
if success:
success_count += 1
log(f"处理成功: {v.get('file_name', 'unknown')} ({success_count}/{len(listData)})")
else:
fail_count += 1
log(f"处理失败: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})")
# 如果任何一个任务失败,记录错误但继续处理其他任务
except Exception as e:
fail_count += 1
error_msg = f"处理数据项时发生异常: {str(e)}"
log(f"处理异常: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})")
for future in as_completed(future_to_item):
v = future_to_item[future]
try:
success, error_msg = future.result()
if success:
success_count += 1
log(f"处理成功: {v.get('file_name', 'unknown')} ({success_count}/{len(listData)})")
else:
fail_count += 1
log(f"处理失败: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})")
# 如果任何一个任务失败,记录错误但继续处理其他任务
except KeyboardInterrupt:
# 收到中断信号,取消所有未完成的任务
log(f"收到中断信号,正在取消未完成的任务...")
for f in future_to_item:
f.cancel()
raise # 重新抛出,让外层捕获
except Exception as e:
fail_count += 1
error_msg = f"处理数据项时发生异常: {str(e)}"
log(f"处理异常: {v.get('file_name', 'unknown')}, 错误: {error_msg} ({fail_count}/{len(listData)})")
except KeyboardInterrupt:
# 收到中断信号,取消所有未完成的任务
log(f"收到中断信号,正在取消未完成的任务...")
for f in future_to_item:
f.cancel()
# 尝试等待正在执行的任务完成(非阻塞方式)
import concurrent.futures
for f in list(future_to_item.keys()):
if not f.done():
try:
# 尝试获取结果,如果任务还在执行则立即返回
f.result(timeout=0.1)
except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError):
pass
except:
pass
raise # 重新抛出,让外层捕获
endTime = time.time()
log(f"多线程处理完成, 总耗时{endTime - beginTime}秒, 成功:{success_count}, 失败:{fail_count}, 总计:{len(listData)}")
endTime = time.time()
log(f"多线程处理完成, 总耗时{endTime - beginTime}秒, 成功:{success_count}, 失败:{fail_count}, 总计:{len(listData)}")
# 如果有任何失败,返回 False
if fail_count > 0:
log(f"部分任务处理失败,共{fail_count}个失败")
return False
# 如果有任何失败,返回 False
if fail_count > 0:
log(f"部分任务处理失败,共{fail_count}个失败")
return False
return True
return True
except KeyboardInterrupt:
log(f"多线程处理被中断")
raise # 重新抛出,让调用者处理
def findBpyModule():

Loading…
Cancel
Save