14 changed files with 462 additions and 53 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,370 @@
@@ -0,0 +1,370 @@
|
||||
# mysql数据库常用任务函数封装 |
||||
import pymysql, socket, time |
||||
import platform,sys |
||||
if platform.system() == 'Windows': |
||||
#sys.path.append('e:\\libs\\') |
||||
sys.path.append('libs') |
||||
else: |
||||
sys.path.append('/data/deploy/make3d/make2/libs/') |
||||
import config |
||||
# import multiprocessing |
||||
import logging |
||||
# 创建互斥锁 |
||||
# lock = multiprocessing.Lock() |
||||
hostname = socket.gethostname() |
||||
|
||||
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR) |
||||
#公共连接库 |
||||
def pymysqlAlias(): |
||||
return pymysql.connect( |
||||
host=config.mysql_local['host'], |
||||
port=config.mysql_local['port'], |
||||
user=config.mysql_local['user'], |
||||
password=config.mysql_local['password'], |
||||
db=config.mysql_local['db'], |
||||
charset=config.mysql_local['charset'],) |
||||
|
||||
|
||||
#获取任务 {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx"} |
||||
def get_task_distributed(): |
||||
|
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
sql = '' |
||||
# 获取数据前先获取锁 |
||||
# lock.acquire() |
||||
#非R11 R12的主机 没有办法处理step1,所以不能做初始化的任务,只能做后续的任务 |
||||
if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC": |
||||
sql = f'select * from task_distributed where status = 1 order by priority desc,created_at asc limit 1 for update' |
||||
cursor.execute(sql) |
||||
result = cursor.fetchone() |
||||
#print("查询主任务表",result) |
||||
if result: |
||||
#获取需要执行的步骤 |
||||
next_step = need_run_stepx(result["id"]) |
||||
if next_step == "no" or next_step == "error": |
||||
print("获取需要执行的步骤 next_step",next_step) |
||||
return next_step |
||||
taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]} |
||||
flagRes = update_main_and_add_detail(taskData) |
||||
if flagRes == "error": |
||||
print(f'出现错误,有可能是多个进程获取同一个任务了') |
||||
return "error" |
||||
print(f'任务ID-{taskData["task_key"]}- "执行{taskData["run_step"]}" ') |
||||
return taskData |
||||
else: |
||||
return 'no_data' |
||||
else: |
||||
|
||||
#R11 R12的主机 可以执行step1 2 3 的任务 |
||||
#如果R11 R12的主机目前没有正在执行step2,则优先处理step2, |
||||
# print("次数",is_run_stepx_nums("step2")) |
||||
if is_run_stepx_nums("step2") < 2: |
||||
resultData = need_run_step2() |
||||
if resultData != "no": |
||||
resultData["hostname"] = hostname |
||||
flagRes = update_main_and_add_detail(resultData) |
||||
if flagRes == "error": |
||||
print(f'出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了') |
||||
return "error" |
||||
print(f'任务ID-{resultData["task_key"]}- "执行step2" ') |
||||
return resultData |
||||
|
||||
#R11 R12的主机如果已经有在处理step2了,则不能再处理step2,只能处理step1 step3 |
||||
resultData = need_run_step_no_step2() |
||||
#print("resultData",resultData) |
||||
if resultData == "no": |
||||
return "no" |
||||
|
||||
resultData["hostname"] = hostname |
||||
flagRes = update_main_and_add_detail(resultData) |
||||
if flagRes == "error": |
||||
print(f'出现错误,有可能是多个进程获取同一个任务了') |
||||
return "error" |
||||
print(f'任务ID-{resultData["task_key"]}- "执行{resultData["run_step"]}" ') |
||||
return resultData |
||||
|
||||
#sql = f'select * from task_distributed where status != 2 order by priority desc,created_at asc limit 1' |
||||
# print(f'sql: {sql}') |
||||
|
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") |
||||
return 'error' |
||||
|
||||
# finally: |
||||
# # 释放锁 |
||||
# lock.release() |
||||
|
||||
#查询当前主机有没有正在执行某个任务 |
||||
def is_run_stepx(step): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
sql = f'select * from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
result = cursor.fetchone() |
||||
if result: |
||||
return "yes" |
||||
else: |
||||
return "no" |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") |
||||
return "error" |
||||
|
||||
|
||||
#查询当前主机某个步骤正在执行的数量 |
||||
def is_run_stepx_nums(step): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
sql = f'select count(*) as nums from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
result = cursor.fetchone() |
||||
return result["nums"] |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") |
||||
return "error" |
||||
|
||||
|
||||
#查询指定任务需要执行哪个步骤 "error" 表示异常 "no"表示没有任务需要执行 "step1"表示需要执行step1 "step2"表示需要执行step2 "step3"表示需要执行step3 |
||||
def need_run_stepx(task_distributed_id): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
#查询task_distributed_id 对应的子任务是否正在执行, |
||||
sql = f'select * from task_distributed_detail where hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
result = cursor.fetchone() |
||||
#如果一个子任务都没有,说明该任务还没有开始执行,需要执行step1 |
||||
if result is None: |
||||
return "step1" |
||||
if result and result["finished_at"] is None: |
||||
#该任务正在运行中不需要执行下一步 |
||||
return "no" |
||||
#查询改任务的最后一个状态 |
||||
if result and result["finished_at"]: |
||||
if result["step"] == "step1": |
||||
return "step2" |
||||
elif result["step"] == "step2": |
||||
return "step3" |
||||
elif result["step"] == "step3": |
||||
#这里要将 主任务表的状态改为2,finished_at改为当前时间 |
||||
update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),"step_last":"step3"}) |
||||
return "no" |
||||
return "no" |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") |
||||
return 'error' |
||||
|
||||
#查询出哪些任务需要执行step2 |
||||
def need_run_step2(): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
sql = f'select * from task_distributed where status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
result = cursor.fetchall() |
||||
|
||||
#判断result的长度 |
||||
#判断是否有值 |
||||
if len(result) == 0: |
||||
return "no" |
||||
#遍历循环哪笔需要执行step2 |
||||
|
||||
for row in result: |
||||
#判断是否有正在执行的step2 |
||||
if need_run_stepx(row["id"]) == "step2": |
||||
#没有正在执行的step2,则返回该任务 |
||||
return {"hostname":hostname,"run_step":"step2","task_distributed_id":row["id"],"task_key":row["task_key"]} |
||||
return "no" |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") |
||||
return "error" |
||||
|
||||
#查询出哪些任务需要执行指定的step |
||||
def need_run_appoint_step(step): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
result = cursor.fetchall() |
||||
#判断是否有值 |
||||
if len(result) == 0: |
||||
return "no" |
||||
#遍历循环哪笔需要执行step2 |
||||
for row in result: |
||||
#判断是否有正在执行的step2 |
||||
if need_run_stepx(row["id"]) == step: |
||||
#没有正在执行的step2,则返回该任务 |
||||
return {"hostname":hostname,"run_step":step,"task_distributed_id":row["id"],"task_key":row["task_key"]} |
||||
return "no" |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") |
||||
return 'error' |
||||
|
||||
#查询出哪些任务需要执行非step2 |
||||
def need_run_step_no_step2(): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor(pymysql.cursors.DictCursor) |
||||
sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
result = cursor.fetchall() |
||||
#判断是否有值 |
||||
if len(result) == 0: |
||||
return "no" |
||||
#遍历循环哪笔需要执行step2 |
||||
|
||||
for row in result: |
||||
#判断是否有正在执行的step2 |
||||
xstep = need_run_stepx(row["id"]) |
||||
# print("查询非step2的任务列表",xstep) |
||||
if xstep != "step2" and (xstep == "step1" or xstep == "step3"): |
||||
#没有正在执行的step2,则返回该任务 |
||||
return {"hostname":hostname,"run_step":xstep,"task_distributed_id":row["id"],"task_key":row["task_key"]} |
||||
return "no" |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") |
||||
return 'error' |
||||
|
||||
|
||||
# 在任务分布执行主表插入任务,所有的任务执行都从这里获取 |
||||
def add_task_distributed(data): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor() |
||||
|
||||
sql = f'insert into task_distributed (task_type, task_key,created_at) values ("{data["task_type"]}", "{data["task_key"]}","{now()}")' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
|
||||
# 获取插入数据的自增ID |
||||
last_insert_id = cursor.lastrowid |
||||
conn.commit() |
||||
return last_insert_id |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") |
||||
return "error" |
||||
|
||||
# 在task_distributed_detail插入明细步骤 |
||||
def add_task_distributed_detail(data): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor() |
||||
|
||||
sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{hostname}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
conn.commit() |
||||
return "ok" |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") |
||||
return "error" |
||||
|
||||
# 更新 task_distributed 主表 |
||||
def update_task_distributed(data): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor() |
||||
|
||||
sql = f'update task_distributed set ' |
||||
#判断要更新哪些字段 |
||||
if "status" in data: |
||||
sql += f'status = "{data["status"]}",' |
||||
|
||||
if "hostname" in data: |
||||
sql += f'hostname = "{data["hostname"]}",' |
||||
|
||||
if "step_last" in data: |
||||
sql += f'step_last = "{data["step_last"]}",' |
||||
|
||||
if "priority" in data: |
||||
sql += f'priority = "{data["priority"]}",' |
||||
|
||||
if "started_at" in data: |
||||
sql += f'started_at = "{data["started_at"]}",' |
||||
|
||||
if "finished_at" in data: |
||||
sql += f'finished_at = "{data["finished_at"]}",' |
||||
|
||||
|
||||
#去掉 sql 最右边的逗号 |
||||
sql = sql.rstrip(',') |
||||
|
||||
|
||||
sql += f' where 1=1 ' |
||||
#条件要放在最后面 |
||||
if "id" in data: |
||||
sql += f' and id = "{data["id"]}"' |
||||
|
||||
if "task_key" in data: |
||||
sql += f' and task_type = "{data["task_key"]}" and status != 2' |
||||
|
||||
#sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"' |
||||
# print(f'sql: {sql}') |
||||
cursor.execute(sql) |
||||
conn.commit() |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") |
||||
return "error" |
||||
|
||||
# 更新 task_distributed_detail 主表 |
||||
def update_task_distributed_detail(data): |
||||
try: |
||||
with pymysqlAlias() as conn: |
||||
cursor = conn.cursor() |
||||
|
||||
sql = f'update task_distributed_detail set ' |
||||
#判断要更新哪些字段 |
||||
if "finished_at" in data: |
||||
sql += f'finished_at = "{data["finished_at"]}"' |
||||
|
||||
if "step" in data: |
||||
sql += f',step = "{data["step"]}"' |
||||
|
||||
if "hostname" in data: |
||||
sql += f',hostname = "{data["hostname"]}"' |
||||
|
||||
#where 条件 |
||||
sql += f' where 1=1 ' |
||||
if "task_distributed_id" in data: |
||||
sql += f' and task_distributed_id = "{data["task_distributed_id"]}"' |
||||
|
||||
if "step" in data: |
||||
sql += f' and step = "{data["step"]}"' |
||||
|
||||
cursor.execute(sql) |
||||
conn.commit() |
||||
except Exception as e: |
||||
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") |
||||
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") |
||||
return "error" |
||||
|
||||
#更新主表和插入明细表的步骤 |
||||
def update_main_and_add_detail(data): |
||||
if data["run_step"] == "step1": |
||||
updateData = {"id":data['task_distributed_id'],"status":1,"step_last":data["run_step"],"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} |
||||
else: |
||||
updateData = {"id":data['task_distributed_id'],"step_last":data["run_step"]} |
||||
#更新主表 |
||||
update_task_distributed(updateData) |
||||
#插入明细表数据 |
||||
return add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) |
||||
@ -0,0 +1,46 @@
@@ -0,0 +1,46 @@
|
||||
import sys,socket,time |
||||
sys.path.append('logic') |
||||
import logic_main_service |
||||
import logging |
||||
import main_step1,main_step2,main_step3 |
||||
if __name__ == '__main__': |
||||
#循环值守 |
||||
while True: |
||||
data = logic_main_service.get_task_distributed() |
||||
#判断data数据类型 |
||||
if isinstance(data, str): |
||||
print("没有可执行的任务 sleep 3s") |
||||
time.sleep(3) |
||||
continue |
||||
else: |
||||
if data["run_step"] == "step1": |
||||
# 本地测试分布运行的用 |
||||
# time.sleep(5) |
||||
# print("更新step1的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) |
||||
# logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step1","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) |
||||
|
||||
#生产线上用 |
||||
main_step1.step1(data["task_key"], experience, makeloop=False,data['task_distributed_id']) |
||||
elif data["run_step"] == "step2": |
||||
# 本地测试分布运行的用 |
||||
# time.sleep(15) |
||||
# print("更新step2的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) |
||||
# logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step2","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) |
||||
|
||||
#生产线上用 |
||||
main_step2.step2(data["task_key"], data['task_distributed_id']) |
||||
elif data["run_step"] == "step3": |
||||
# 本地测试分布运行的用 |
||||
# time.sleep(8) |
||||
# #更新子表的finished_at |
||||
# print("更新step3的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) |
||||
# logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step3","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) |
||||
#更新主表的status 和 finished_at |
||||
#logic_main_service.update_task_distributed({"id":data["task_distributed_id"],"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) |
||||
|
||||
#生产线上用 |
||||
main_step3.step3(data["task_key"], data['task_distributed_id']) |
||||
|
||||
|
||||
|
||||
|
||||
@ -1,30 +0,0 @@
@@ -1,30 +0,0 @@
|
||||
import sys |
||||
from PyQt5.QtCore import QUrl |
||||
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget |
||||
from PyQt5.QtWebEngineWidgets import QWebEngineView |
||||
|
||||
class WebBrowserWindow(QMainWindow): |
||||
def __init__(self): |
||||
super().__init__() |
||||
|
||||
self.browser = QWebEngineView() |
||||
self.browser.setUrl(QUrl("https://www.qq.com")) # 设置要打开的网页 |
||||
|
||||
layout = QVBoxLayout() |
||||
layout.addWidget(self.browser) |
||||
|
||||
central_widget = QWidget() |
||||
central_widget.setLayout(layout) |
||||
|
||||
self.setCentralWidget(central_widget) |
||||
self.setWindowTitle("Web Browser") |
||||
self.setGeometry(100, 100, 800, 600) |
||||
|
||||
def main(): |
||||
app = QApplication(sys.argv) |
||||
window = WebBrowserWindow() |
||||
window.show() |
||||
sys.exit(app.exec_()) |
||||
|
||||
if __name__ == "__main__": |
||||
main() |
||||
Loading…
Reference in new issue