diff --git a/main.py b/main.py index e92a551..c6c00ee 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List from fastapi import FastAPI, HTTPException, Depends, Header from public_function.auth import verify_tk_token from public_function.public_func import read_config, create_logger -from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, TokenItem, CrawlerItem +from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, TokenItem, CrawlerItem, ResetTask app = FastAPI() app.middleware("http")(verify_tk_token) @@ -185,7 +185,7 @@ async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_t raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试") -@app.post("/add_token", summary="设备重置") +@app.post("/add_token", summary="添加token") async def add_token(task_data: TokenItem, task_manager: Any = Depends(get_task_manager)): """设备重置接口""" try: @@ -202,5 +202,21 @@ async def add_token(task_data: TokenItem, task_manager: Any = Depends(get_task_m raise HTTPException(status_code=500, detail=f"新增token失败;失败原因{e}") +@app.post("/reset_task", summary="重置任务") +async def reset_task(task_data: ResetTask, task_manager: Any = Depends(get_task_manager)): + # 任务上报失败,需要对任务进行重置 + try: + params = task_data.model_dump() + result = await task_manager.deal_reset_task(params) + if result: + print(f"任务:{params.get("task_id")} 重启成功") + return {"code": 200, "message": f"任务:{params.get("task_id")} 重启成功", } + else: + return {"code": 200, "message": f"没有查询到相关任务:{params.get("task_id")}"}, 404 + except Exception as e: + print(f"新增token:{params.get("token")} 失败,失败原因: {e}") + raise HTTPException(status_code=404, detail=f"任务:{params.get("task_id")} 重启失败") + + if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/model/model.py b/model/model.py index 777c2ba..d6d75d5 100644 --- a/model/model.py +++ b/model/model.py @@ -39,3 +39,7 @@ class TokenItem(BaseModel): class CrawlerItem(BaseModel): region: str = Field(..., description="账号所在地区") app_name: str = Field(..., description="应用名称") + + +class ResetTask(BaseModel): + task_id: str = Field(..., description="任务ID") diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py index c0a2c37..051e259 100644 --- a/task_management/all_task_management.py +++ b/task_management/all_task_management.py @@ -98,7 +98,7 @@ class AllTask: async def deal_receive_data(self, data: Dict[str, Any]): # 将商品数据写入mysql和redis - print("开始处理接收到得数据:{}".format(data)) + print("开始处理接收到得数据:{}".format(data.get("task_id"))) await self.db_pool.initialize() key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}" params = data.get("goods_info") @@ -107,7 +107,6 @@ class AllTask: print(f"{key};数据已存入redis中") params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"], "goods_info": json.dumps(params)} - print(f"{params}<") try: affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data]) if affected_rows: @@ -123,3 +122,27 @@ class AllTask: except Exception as e: print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}") return True + + async def deal_reset_task(self, data: Dict[str, Any]): + print("{}".format(data)) + await self.db_pool.initialize() + sql_str = f"""select task_id,app_name,shop_id,item_id,region,token from crawler_task_record_info + where task_id='{data['task_id']}' and status=4""" + param = await self.db_pool.fetch_all(sql_str, ) + if param: + if isinstance(param, list): + param = param[0] + print(f"获取到对应任:{data['task_id']}务详情") + # # 将任务提交到redis队列 + self.redis_conn.add_task_to_set(task_data=param) + # 修改状态为3 + try: + # 更改任务状态 + params = {"task_id": data["task_id"], "status": 3} + result = await self.update_task_record(params) + if result: + print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") + except Exception as e: + print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}") + return True + return False