From 93e016b8d6803c5fa86574616f437ee7622263c7 Mon Sep 17 00:00:00 2001
From: liujianjiang
Date: Thu, 27 Nov 2025 14:57:57 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 main.py                                | 15 +++++-----
 model/model.py                         |  3 +-
 task_management/all_task_management.py | 40 +++++++++++++-------------
 3 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/main.py b/main.py
index 7f8a0e9..8d859e5 100644
--- a/main.py
+++ b/main.py
@@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List
 from fastapi import FastAPI, HTTPException, Depends
 
 from public_function.public_func import read_config
-from model.model import CrawlerTask, DataReceive
+from model.model import GoodsInfo, DataReceive
 from model.model import AccountCreate, AccountUpdate, DeviceResetData
 
 app = FastAPI()
@@ -129,23 +129,22 @@ async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_t
         raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
 
 
-@app.get("/crawler_task", summary="爬虫任务调用接口")
-async def crawler_task(task_data: CrawlerTask, task_manager: Any = Depends(get_task_manager)):
-    """爬虫任务接口"""
+@app.post("/get_goods_info", summary="客户端获取商品数据")
+async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_task_manager)):
+    """客户端获取商品数据接口"""
     try:
         params = task_data.model_dump()
-        params['task_id'] = uuid.uuid4().hex
         result = await task_manager.task_distribution(params)
         if result:
-            return {"code": 200, "message": "", "data": result}
+            return {"code": 200, "message": "数据获取成功", "data": result}
         raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
     except Exception as e:
         print(f": {e}")
         raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
 
 
-# @app.get("/device_reset", summary="爬虫任务调用接口")
-# async def device_reset(task_data: CrawlerTask, reset_manager: Any = Depends(get_reset_manager)):
+# @app.get("/device_reset", summary="设备重置")
+# async def device_reset(task_data: GoodsInfo, reset_manager: Any = Depends(get_reset_manager)):
 #     """设备重置接口"""
 #     try:
 #         params = task_data.model_dump()
diff --git a/model/model.py b/model/model.py
index 4724962..965e744 100644
--- a/model/model.py
+++ b/model/model.py
@@ -17,11 +17,12 @@ class AccountUpdate(BaseModel):
     status: int = Field(..., ge=1, le=10, description="状态:1-空闲,2-使用中,3-暂停使用(后续还能使用),4-账号已无法使用")
 
 
-class CrawlerTask(BaseModel):
+class GoodsInfo(BaseModel):
     country: str = Field(..., min_length=1, max_length=128, description="账号所在国家")
     app_name: str = Field(..., min_length=1, max_length=128, description="应用名称")
     goods_id: str = Field(..., min_length=1, max_length=128, description="商品ID")
     store_id: str = Field(..., min_length=1, max_length=128, description="店铺ID")
+    is_re_crawl: str = Field(..., min_length=0, max_length=128, description="是否重新抓取")
 
 
 class DataReceive(BaseModel):
diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py
index 0929f7f..a10f628 100644
--- a/task_management/all_task_management.py
+++ b/task_management/all_task_management.py
@@ -15,25 +15,39 @@ class AllTask:
 
     async def deal_shopee_task(self, param):
         # 查询redis数据库,redis 数据库存在该数据直接返回
-        key = f"{param['app_name']}:{param['store_id']}:{param['goods_id']}"
-        result = self.redis_conn.read_data(key)
-        if result:
-            return result
+        key = f"{param['store_id']}:{param['goods_id']}"
+        if not param.get('is_re_crawl', False):
+            print(f"{key} 开始在redis中获取数据")
+            result = self.redis_conn.read_data(key)
+            if result:
+                return result
         # 调用对应爬虫任务
+
+        print(f"{key} 在redis中获取数据失败,开始调用爬虫抓取数据")
         # 任务结束后开始等待
+        result = dict()
         endtime = time.time() + 55
         while time.time() < endtime:
             await time.sleep(5)
             result = self.redis_conn.read_data(key)
             if result:
+                print(f"{key} 调用爬虫抓取数据后在redis获取到数据")
                 return result
-        return []
+        return result
+
+    async def task_distribution(self, data: Dict[str, Any]):
+        result = dict()
+        if data["app_name"] == "Shopee":
+            try:
+                result = await self.deal_shopee_task(data)
+            except Exception as e:
+                print(f"{data['task_id']} 获取商品数据失败,失败原因为:{e}")
+        return result
 
     async def deal_receive_data(self, data: Dict[str, Any]):
         # 将商品数据写入mysql和redis
         print("开始处理接收到得数据:{}".format(data))
         await self.db_pool.initialize()
-        key = f"{data['app_name']}:{data['store_id']}:{data['goods_id']}"
+        key = f"{data['store_id']}:{data['goods_id']}"
         params = data.get("goods_info")
         affected_rows = self.redis_conn.write_data(key, params)
         if affected_rows:
@@ -52,17 +66,3 @@ class AllTask:
         if affected_rows:
             print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
             return True
-
-    async def task_distribution(self, data: Dict[str, Any]):
-        # 需要对任务进行记录
-        try:
-            await self.db_pool.initialize()
-            # 将任务记录到mysql
-            param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]}
-            await self.db_pool.insert_many(table="crawler_task_record_info", data=param)
-        except Exception as e:
-            print("将任务记录到数据库失败,失败原因为:{}".format(e))
-        result = []
-        if param["app_name"] == "Shopee":
-            result = await self.deal_shopee_task(data)
-        return result
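
Note: the cache-then-poll flow that this diff gives deal_shopee_task can be exercised in isolation with the sketch below. It is a minimal standalone version, not the project's code: the store_id:goods_id key layout, the truthiness check on is_re_crawl, and the 55-second deadline with 5-second polling mirror the hunk above, while fetch_goods_info and the read_data callable are hypothetical stand-ins for the project's method and its redis_conn.read_data wrapper. The sketch awaits asyncio.sleep rather than time.sleep, which is an editorial assumption about the intended behavior: time.sleep() returns None and cannot be awaited inside a coroutine.

import asyncio
import time
from typing import Any, Callable, Dict, Optional

# Stand-in for the project's redis_conn wrapper: a callable that returns the
# cached goods payload for a key, or None when nothing has been written yet.
ReadData = Callable[[str], Optional[Dict[str, Any]]]


async def fetch_goods_info(param: Dict[str, Any], read_data: ReadData,
                           timeout: float = 55.0, interval: float = 5.0) -> Dict[str, Any]:
    """Cache-then-poll: serve from the cache unless a re-crawl was requested,
    otherwise wait for the crawler to publish the result under the same key."""
    key = f"{param['store_id']}:{param['goods_id']}"   # key layout used in this diff

    # No forced re-crawl: try the cache first and return immediately on a hit.
    if not param.get("is_re_crawl"):
        cached = read_data(key)
        if cached:
            return cached

    # Cache miss (or forced re-crawl): poll until the crawler writes the result
    # or the deadline passes.  asyncio.sleep yields control to the event loop.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        await asyncio.sleep(interval)
        result = read_data(key)
        if result:
            return result
    return {}


if __name__ == "__main__":
    # Dict-backed fake cache so the sketch runs without Redis or the crawler.
    fake_cache = {"S1:G1": {"goods_id": "G1", "price": 9.99}}
    params = {"store_id": "S1", "goods_id": "G1", "is_re_crawl": ""}
    print(asyncio.run(fetch_goods_info(params, fake_cache.get)))

When nothing is written under the key before the deadline, the sketch returns an empty dict, which is falsy; that matches how the new /get_goods_info endpoint treats an empty result from task_distribution and answers with a 404.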