代码优化

This commit is contained in:
liujianjiang 2025-11-28 15:08:58 +08:00
parent 3a1e13dcab
commit a751a9a26a
5 changed files with 37 additions and 4 deletions

View File

@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
class CrawlerManagement:
def __init__(self, config_data: Dict[str, Any]):
self.config_data = config_data
self.redis_conn = RedisTaskManager(self.config_data)

24
main.py
View File

@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List
from fastapi import FastAPI, HTTPException, Depends from fastapi import FastAPI, HTTPException, Depends
from public_function.auth import verify_tk_token from public_function.auth import verify_tk_token
from public_function.public_func import read_config, create_logger from public_function.public_func import read_config, create_logger
from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, DeviceResetData from model.model import GoodsInfo, DataReceive, AccountCreate, AccountUpdate, DeviceResetData, CrawlerItem
app = FastAPI() app = FastAPI()
app.middleware("http")(verify_tk_token) app.middleware("http")(verify_tk_token)
@ -39,7 +39,18 @@ def get_account_manager():
def get_task_manager(): def get_task_manager():
"""获任务管理器实例""" """任务管理器实例"""
config = get_config()
try:
from crawler_management.crawler_management import CrawlerManagement
return AllTask(config)
except ImportError:
print(f"未找到AllTask类返回模拟实例")
return None
def get_crawler_manager():
"""任务获取管理器实例"""
config = get_config() config = get_config()
try: try:
from task_management.all_task_management import AllTask from task_management.all_task_management import AllTask
@ -122,6 +133,15 @@ async def update_account(account_data: AccountUpdate, account_manager: Any = Dep
raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e)) raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e))
@app.get("/get_crawler_task", summary="获取任务")
async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)):
"""
获取指定应用的可用账号
- **app_name**: 应用名称
- **country**: 应用名称
"""
@app.post("/receive_data", summary="接收抓取得数据") @app.post("/receive_data", summary="接收抓取得数据")
async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)): async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)):
"""数据接收接口""" """数据接收接口"""

View File

@ -38,3 +38,8 @@ class DeviceResetData(BaseModel):
device_id: str = Field(..., description="设备ID") device_id: str = Field(..., description="设备ID")
country: str = Field(..., description="国家") country: str = Field(..., description="国家")
app_name: str = Field(..., description="应用名称") app_name: str = Field(..., description="应用名称")
class CrawlerItem(BaseModel):
country: str = Field(..., description="账号所在国家")
app_name: str = Field(..., description="应用名称")

View File

@ -69,10 +69,11 @@ class RedisTaskManager:
print(f"读取Redis数据时发生错误: {e}") print(f"读取Redis数据时发生错误: {e}")
return None return None
def add_task_to_set(self, task_data: Dict[str, Any], redis_key='crawler_task'): def add_task_to_set(self, task_data: Dict[str, Any], ):
try: try:
# 将任务数据序列化为JSON字符串 # 将任务数据序列化为JSON字符串
key = f"{task_data['shop_id']}_{task_data['item_id']}" key = f"{task_data['shop_id']}_{task_data['item_id']}"
redis_key = f'{task_data["country"].lower()}_{task_data["app_name"].lower()}'
task_json = json.dumps(task_data) task_json = json.dumps(task_data)
# 为每个pad_code在集合中添加相同的任务数据 # 为每个pad_code在集合中添加相同的任务数据
# Redis集合会自动去重相同的pad_code只会保存一次 # Redis集合会自动去重相同的pad_code只会保存一次

View File

@ -22,7 +22,8 @@ class AllTask:
修复了NoneType对象无法await的问题 修复了NoneType对象无法await的问题
""" """
# 查询redis数据库redis数据库存在该数据直接返回 # 查询redis数据库redis数据库存在该数据直接返回
key = f"{param['shop_id']}:{param['item_id']}" # key = f"{param['shop_id']}:{param['item_id']}"
key = f"{param['app_name'].lower()}:{param['country'].lower()}"
# 如果不是重新抓取先尝试从Redis获取数据 # 如果不是重新抓取先尝试从Redis获取数据
if not param.get('is_re_crawl', False): if not param.get('is_re_crawl', False):
print(f"{key} 开始在redis中获取数据") print(f"{key} 开始在redis中获取数据")