From a751a9a26a1cbfa850556283de6f8c26d550db22 Mon Sep 17 00:00:00 2001 From: liujianjiang Date: Fri, 28 Nov 2025 15:08:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawler_management/crawler_management.py | 6 ++++++ main.py | 24 ++++++++++++++++++++++-- model/model.py | 5 +++++ public_function/redis_task_manager.py | 3 ++- task_management/all_task_management.py | 3 ++- 5 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 crawler_management/crawler_management.py diff --git a/crawler_management/crawler_management.py b/crawler_management/crawler_management.py new file mode 100644 index 0000000..96a2336 --- /dev/null +++ b/crawler_management/crawler_management.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- + +class CrawlerManagement: + def __init__(self, config_data: Dict[str, Any]): + self.config_data = config_data + self.redis_conn = RedisTaskManager(self.config_data) diff --git a/main.py b/main.py index a1d197b..ac22180 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List from fastapi import FastAPI, HTTPException, Depends from public_function.auth import verify_tk_token from public_function.public_func import read_config, create_logger -from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, DeviceResetData +from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, DeviceResetData, CrawlerItem app = FastAPI() app.middleware("http")(verify_tk_token) @@ -39,7 +39,18 @@ def get_account_manager(): def get_task_manager(): - """获任务管理器实例""" + """任务管理器实例""" + config = get_config() + try: + from crawler_management.crawler_management import CrawlerManagement + return AllTask(config) + except ImportError: + print(f"未找到AllTask类,返回模拟实例") + return None + + +def get_crawler_manager(): + """任务获取管理器实例""" config = get_config() try: from 
task_management.all_task_management import AllTask @@ -122,6 +133,15 @@ async def update_account(account_data: AccountUpdate, account_manager: Any = Dep raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e)) +@app.get("/get_crawler_task", summary="获取任务") +async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)): + """ + 获取指定应用的爬虫任务 + - **app_name**: 应用名称 + - **country**: 国家 + """ + + @app.post("/receive_data", summary="接收抓取得数据") async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)): """数据接收接口""" diff --git a/model/model.py b/model/model.py index a690e7c..76dc047 100644 --- a/model/model.py +++ b/model/model.py @@ -38,3 +38,8 @@ class DeviceResetData(BaseModel): device_id: str = Field(..., description="设备ID") country: str = Field(..., description="国家") app_name: str = Field(..., description="应用名称") + + +class CrawlerItem(BaseModel): + country: str = Field(..., description="账号所在国家") + app_name: str = Field(..., description="应用名称") diff --git a/public_function/redis_task_manager.py b/public_function/redis_task_manager.py index a637c77..558c7be 100644 --- a/public_function/redis_task_manager.py +++ b/public_function/redis_task_manager.py @@ -69,10 +69,11 @@ class RedisTaskManager: print(f"读取Redis数据时发生错误: {e}") return None - def add_task_to_set(self, task_data: Dict[str, Any], redis_key='crawler_task'): + def add_task_to_set(self, task_data: Dict[str, Any], ): try: # 将任务数据序列化为JSON字符串 key = f"{task_data['shop_id']}_{task_data['item_id']}" + redis_key = f'{task_data["country"].lower()}_{task_data["app_name"].lower()}' task_json = json.dumps(task_data) # 为每个pad_code在集合中添加相同的任务数据 # Redis集合会自动去重,相同的pad_code只会保存一次 diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py index 89960ea..e6bcbdc 100644 --- a/task_management/all_task_management.py +++ b/task_management/all_task_management.py @@ -22,7 +22,8 @@ class AllTask: 
修复了NoneType对象无法await的问题 """ # 查询redis数据库,redis数据库存在该数据直接返回 - key = f"{param['shop_id']}:{param['item_id']}" + # key = f"{param['shop_id']}:{param['item_id']}" + key = f"{param['app_name'].lower()}:{param['country'].lower()}" # 如果不是重新抓取,先尝试从Redis获取数据 if not param.get('is_re_crawl', False): print(f"{key} 开始在redis中获取数据")