From 7de0c8c265ecb41c9f11dbfe11e1f53d746255b9 Mon Sep 17 00:00:00 2001 From: liujianjiang Date: Fri, 28 Nov 2025 18:32:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawler_management/crawler_management.py | 16 ---------------- main.py | 17 +++-------------- public_function/table_log.sql | 2 +- task_management/all_task_management.py | 9 +++++++++ 4 files changed, 13 insertions(+), 31 deletions(-) delete mode 100644 crawler_management/crawler_management.py diff --git a/crawler_management/crawler_management.py b/crawler_management/crawler_management.py deleted file mode 100644 index 72b6e72..0000000 --- a/crawler_management/crawler_management.py +++ /dev/null @@ -1,16 +0,0 @@ -# -*- coding: utf-8 -*- -from typing import Dict, Any -from public_function.redis_task_manager import RedisTaskManager - - -class CrawlerManagement: - def __init__(self, config_data: Dict[str, Any]): - self.config_data = config_data - self.redis_conn = RedisTaskManager(self.config_data) - - def get_task_item(self, task_data: Dict[str, Any]): - redis_key = f"{task_data["region"].lower()}_{task_data['app_name'].lower()}" - result = self.redis_conn.get_random_task_and_delete(redis_key) - if result: - return result - return None diff --git a/main.py b/main.py index b271c52..87256a6 100644 --- a/main.py +++ b/main.py @@ -38,17 +38,6 @@ def get_account_manager(): return None -def get_crawler_manager(): - """任务管理器实例""" - config = get_config() - try: - from crawler_management.crawler_management import CrawlerManagement - return CrawlerManagement(config) - except ImportError: - print(f"未找到AllTask类,返回模拟实例") - return None - - def get_task_manager(): """任务获取管理器实例""" config = get_config() @@ -136,7 +125,7 @@ async def update_account(account_data: AccountUpdate, account_manager: Any = Dep @app.post("/get_crawler_task", summary="获取任务") -async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)): +async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_task_manager)): """ 获取指定应用的可用账号 - **app_name**: 应用名称 @@ -146,6 +135,8 @@ async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(g params = task_data.model_dump() result = task_manager.get_task_item(params) if result: + data = {"task_id": result["task_id"], "status": 4} + await task_manager.update_task_record(data) return {"code": 200, "message": "任务获取成功", "data": result} raise HTTPException(status_code=404, detail="队列暂时没有任务,请等待一段时间后重新尝试") except Exception as e: @@ -190,8 +181,6 @@ async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_t raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试") except Exception as e: print(f"任务异常 - ID: {task_id}, 错误: {str(e)}") - data = {"task_id": params["task_id"], "status": 3} - await task_manager.update_task_record(data) raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试") diff --git a/public_function/table_log.sql b/public_function/table_log.sql index 2f898d9..b920150 100644 --- a/public_function/table_log.sql +++ b/public_function/table_log.sql @@ -7,7 +7,7 @@ create table crawler_task_record_info item_id VARCHAR(50) NOT NULL COMMENT '商品ID', region VARCHAR(50) NOT NULL COMMENT '客户所在地区', token VARCHAR(128) NOT NULL COMMENT '用户标识', - status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、开始执行;2、执行成功;3、执行失败', + status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、开始执行;2、执行成功;3、提交到redis队列;4、从redis消耗', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (id), diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py index de86dee..5311151 100644 --- a/task_management/all_task_management.py +++ b/task_management/all_task_management.py @@ -16,6 +16,13 @@ class AllTask: self.redis_conn = RedisTaskManager(self.config_data) self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) + def get_task_item(self, task_data: Dict[str, Any]): + redis_key = f"{task_data["region"].lower()}_{task_data['app_name'].lower()}" + result = self.redis_conn.get_random_task_and_delete(redis_key) + if result: + return result + return None + async def deal_shopee_task(self, param: Dict[str, Any]): """ 处理Shopee任务的异步方法 @@ -32,6 +39,8 @@ class AllTask: print(f"{key}:{param['shop_id']}:{param['item_id']} 在redis中获取数据失败,将任务提交到队列") # 确保add_task_to_set是异步方法或正确处理 self.redis_conn.add_task_to_set(task_data=param) + update_parms = {"task_id": param["task_id"], "status": 3} + await self.update_task_record(update_parms) # 任务结束后开始等待 endtime = time.time() + 55 counter = 0