代码优化
This commit is contained in:
parent
328311d723
commit
862e1f56dd
20
main.py
20
main.py
|
|
@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List
|
||||||
from fastapi import FastAPI, HTTPException, Depends, Header
|
from fastapi import FastAPI, HTTPException, Depends, Header
|
||||||
from public_function.auth import verify_tk_token
|
from public_function.auth import verify_tk_token
|
||||||
from public_function.public_func import read_config, create_logger
|
from public_function.public_func import read_config, create_logger
|
||||||
from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, TokenItem, CrawlerItem
|
from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, TokenItem, CrawlerItem, ResetTask
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
app.middleware("http")(verify_tk_token)
|
app.middleware("http")(verify_tk_token)
|
||||||
|
|
@ -185,7 +185,7 @@ async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_t
|
||||||
raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
|
raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
|
||||||
|
|
||||||
|
|
||||||
@app.post("/add_token", summary="设备重置")
|
@app.post("/add_token", summary="添加token")
|
||||||
async def add_token(task_data: TokenItem, task_manager: Any = Depends(get_task_manager)):
|
async def add_token(task_data: TokenItem, task_manager: Any = Depends(get_task_manager)):
|
||||||
"""设备重置接口"""
|
"""设备重置接口"""
|
||||||
try:
|
try:
|
||||||
|
|
@ -202,5 +202,21 @@ async def add_token(task_data: TokenItem, task_manager: Any = Depends(get_task_m
|
||||||
raise HTTPException(status_code=500, detail=f"新增token失败;失败原因{e}")
|
raise HTTPException(status_code=500, detail=f"新增token失败;失败原因{e}")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/reset_task", summary="重置任务")
|
||||||
|
async def reset_task(task_data: ResetTask, task_manager: Any = Depends(get_task_manager)):
|
||||||
|
# 任务上报失败,需要对任务进行重置
|
||||||
|
try:
|
||||||
|
params = task_data.model_dump()
|
||||||
|
result = await task_manager.deal_reset_task(params)
|
||||||
|
if result:
|
||||||
|
print(f"任务:{params.get("task_id")} 重启成功")
|
||||||
|
return {"code": 200, "message": f"任务:{params.get("task_id")} 重启成功", }
|
||||||
|
else:
|
||||||
|
return {"code": 200, "message": f"没有查询到相关任务:{params.get("task_id")}"}, 404
|
||||||
|
except Exception as e:
|
||||||
|
print(f"新增token:{params.get("token")} 失败,失败原因: {e}")
|
||||||
|
raise HTTPException(status_code=404, detail=f"任务:{params.get("task_id")} 重启失败")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
|
|
|
||||||
|
|
@ -39,3 +39,7 @@ class TokenItem(BaseModel):
|
||||||
class CrawlerItem(BaseModel):
|
class CrawlerItem(BaseModel):
|
||||||
region: str = Field(..., description="账号所在地区")
|
region: str = Field(..., description="账号所在地区")
|
||||||
app_name: str = Field(..., description="应用名称")
|
app_name: str = Field(..., description="应用名称")
|
||||||
|
|
||||||
|
|
||||||
|
class ResetTask(BaseModel):
|
||||||
|
task_id: str = Field(..., description="任务ID")
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,7 @@ class AllTask:
|
||||||
|
|
||||||
async def deal_receive_data(self, data: Dict[str, Any]):
|
async def deal_receive_data(self, data: Dict[str, Any]):
|
||||||
# 将商品数据写入mysql和redis
|
# 将商品数据写入mysql和redis
|
||||||
print("开始处理接收到得数据:{}".format(data))
|
print("开始处理接收到得数据:{}".format(data.get("task_id")))
|
||||||
await self.db_pool.initialize()
|
await self.db_pool.initialize()
|
||||||
key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}"
|
key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}"
|
||||||
params = data.get("goods_info")
|
params = data.get("goods_info")
|
||||||
|
|
@ -107,7 +107,6 @@ class AllTask:
|
||||||
print(f"{key};数据已存入redis中")
|
print(f"{key};数据已存入redis中")
|
||||||
params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"],
|
params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"],
|
||||||
"goods_info": json.dumps(params)}
|
"goods_info": json.dumps(params)}
|
||||||
print(f"{params}<")
|
|
||||||
try:
|
try:
|
||||||
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])
|
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])
|
||||||
if affected_rows:
|
if affected_rows:
|
||||||
|
|
@ -123,3 +122,27 @@ class AllTask:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}")
|
print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
async def deal_reset_task(self, data: Dict[str, Any]):
|
||||||
|
print("<UNK>{}".format(data))
|
||||||
|
await self.db_pool.initialize()
|
||||||
|
sql_str = f"""select task_id,app_name,shop_id,item_id,region,token from crawler_task_record_info
|
||||||
|
where task_id='{data['task_id']}' and status=4"""
|
||||||
|
param = await self.db_pool.fetch_all(sql_str, )
|
||||||
|
if param:
|
||||||
|
if isinstance(param, list):
|
||||||
|
param = param[0]
|
||||||
|
print(f"获取到对应任:{data['task_id']}务详情")
|
||||||
|
# # 将任务提交到redis队列
|
||||||
|
self.redis_conn.add_task_to_set(task_data=param)
|
||||||
|
# 修改状态为3
|
||||||
|
try:
|
||||||
|
# 更改任务状态
|
||||||
|
params = {"task_id": data["task_id"], "status": 3}
|
||||||
|
result = await self.update_task_record(params)
|
||||||
|
if result:
|
||||||
|
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue