Compare commits

..

2 Commits

Author SHA1 Message Date
liujianjiang d08b4e203b 新增获取任务接口 2025-11-28 15:38:58 +08:00
liujianjiang 3df782999b 新增获取任务接口 2025-11-28 15:38:52 +08:00
3 changed files with 123 additions and 11 deletions

View File

@ -1,6 +1,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from typing import Dict, Any
from public_function.redis_task_manager import RedisTaskManager
class CrawlerManagement: class CrawlerManagement:
def __init__(self, config_data: Dict[str, Any]): def __init__(self, config_data: Dict[str, Any]):
self.config_data = config_data self.config_data = config_data
self.redis_conn = RedisTaskManager(self.config_data) self.redis_conn = RedisTaskManager(self.config_data)
def get_task_item(self, task_data: Dict[str, Any]):
redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}"
result = self.redis_conn.get_random_task_and_delete(redis_key)
if result:
return result
return None

17
main.py
View File

@ -38,18 +38,18 @@ def get_account_manager():
return None return None
def get_task_manager(): def get_crawler_manager():
"""任务管理器实例""" """任务管理器实例"""
config = get_config() config = get_config()
try: try:
from crawler_management.crawler_management import CrawlerManagement from crawler_management.crawler_management import CrawlerManagement
return AllTask(config) return CrawlerManagement(config)
except ImportError: except ImportError:
print(f"未找到AllTask类返回模拟实例") print(f"未找到AllTask类返回模拟实例")
return None return None
def get_crawler_manager(): def get_task_manager():
"""任务获取管理器实例""" """任务获取管理器实例"""
config = get_config() config = get_config()
try: try:
@ -133,13 +133,22 @@ async def update_account(account_data: AccountUpdate, account_manager: Any = Dep
raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e)) raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e))
@app.get("/get_crawler_task", summary="获取任务") @app.post("/get_crawler_task", summary="获取任务")
async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)): async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)):
""" """
获取指定应用的可用账号 获取指定应用的可用账号
- **app_name**: 应用名称 - **app_name**: 应用名称
- **country**: 应用名称 - **country**: 应用名称
""" """
try:
params = task_data.model_dump()
result = task_manager.get_task_item(params)
if result:
return {"code": 200, "message": "任务获取成功", "data": result}
raise HTTPException(status_code=404, detail="队列暂时没有任务,请等待一段时间后重新尝试")
except Exception as e:
print(f"获取任务失败,失败原因: {e}")
raise HTTPException(status_code=500, detail="获取任务失败;失败原因{}".format(e))
@app.post("/receive_data", summary="接收抓取得数据") @app.post("/receive_data", summary="接收抓取得数据")

View File

@ -1,6 +1,9 @@
import os import os
import json import json
import uuid
import redis import redis
import random
from datetime import datetime
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
@ -16,6 +19,8 @@ class RedisTaskManager:
decode_responses=True decode_responses=True
) )
self.expire_hours = 24 # 过期时间24小时 self.expire_hours = 24 # 过期时间24小时
self.processing_timeout = 300
self.lock_timeout = 30 # 锁超时时间
def write_data(self, key: str, data: Any, expire_time: Optional[int] = None): def write_data(self, key: str, data: Any, expire_time: Optional[int] = None):
""" """
@ -69,16 +74,104 @@ class RedisTaskManager:
print(f"读取Redis数据时发生错误: {e}") print(f"读取Redis数据时发生错误: {e}")
return None return None
def add_task_to_set(self, task_data: Dict[str, Any], ): def add_task_to_set(self, task_data: Dict[str, Any]) -> bool:
"""添加任务到Redis集合"""
try: try:
# 将任务数据序列化为JSON字符串 # 生成唯一任务标识
task_data['task_id'] = uuid.uuid4().hex
task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
key = f"{task_data['shop_id']}_{task_data['item_id']}" key = f"{task_data['shop_id']}_{task_data['item_id']}"
redis_key = f'{task_data["country"].lower()}_{task_data["app_name"].lower()}' redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}"
# 检查任务是否已存在
existing_task = self.redis_client.hget(redis_key, key)
if existing_task:
print(f"任务已存在: {key}")
return False
task_json = json.dumps(task_data) task_json = json.dumps(task_data)
# 为每个pad_code在集合中添加相同的任务数据 # 使用HSET添加任务并设置过期时间
# Redis集合会自动去重相同的pad_code只会保存一次 result = self.redis_client.hset(redis_key, key, task_json)
self.redis_client.hset(redis_key, key, task_json) self.redis_client.expire(redis_key, self.processing_timeout)
return True return result > 0
except Exception as e: except Exception as e:
print(f"添加任务失败: {e}") print(f"添加任务失败: {e}")
return False return False
def get_random_task_and_delete(self, key: str) -> Optional[Dict[str, Any]]:
"""随机获取一个任务并从HSET中删除确保一次性消费"""
lock_key = f"{key}:lock"
# 尝试获取分布式锁
if self._acquire_lock(lock_key):
try:
# 获取HSET中的所有任务
all_tasks = self.redis_client.hgetall(key)
if not all_tasks:
return None
# 随机选择一个任务
task_id = random.choice(list(all_tasks.keys()))
task_json = all_tasks[task_id]
task_data = json.loads(task_json)
# 删除该任务
self.redis_client.hdel(key, task_id)
return task_data
finally:
self._release_lock(lock_key)
else:
print("获取锁失败,可能其他进程正在处理")
return None
def get_and_process_task(self, key: str, process_callback: callable) -> bool:
"""获取并处理任务"""
task_data = self.get_random_task_and_delete(key)
if task_data:
try:
result = process_callback(task_data)
return result
except Exception as e:
print(f"任务处理失败: {e}")
return False
return False
def _acquire_lock(self, lock_key: str) -> bool:
"""获取分布式锁"""
try:
# 使用SET命令获取锁NX表示不存在时才设置EX设置过期时间
result = self.redis_client.set(lock_key, "locked", ex=self.lock_timeout, nx=True)
return result is not None
except Exception as e:
print(f"获取锁失败: {e}")
return False
def _release_lock(self, lock_key: str) -> bool:
"""释放分布式锁"""
try:
self.redis_client.delete(lock_key)
return True
except Exception as e:
print(f"释放锁失败: {e}")
return False
def get_task_count(self, key: str) -> int:
"""获取指定key中的任务数量"""
try:
return self.redis_client.hlen(key)
except Exception as e:
print(f"获取任务数量失败: {e}")
return 0
def get_all_tasks(self, key: str) -> List[Dict[str, Any]]:
"""获取所有任务(仅查看,不删除)"""
try:
all_tasks = self.redis_client.hgetall(key)
return [json.loads(task_json) for task_json in all_tasks.values()]
except Exception as e:
print(f"获取所有任务失败: {e}")
return []
def clear_all_tasks(self, key: str) -> bool:
"""清空所有任务"""
try:
self.redis_client.delete(key)
return True
except Exception as e:
print(f"清空任务失败: {e}")
return False