Compare commits
No commits in common. "d08b4e203b68b3e6e83a38e511ae1eb1d13553f1" and "a751a9a26a1cbfa850556283de6f8c26d550db22" have entirely different histories.
d08b4e203b
...
a751a9a26a
|
|
@ -1,16 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
from typing import Dict, Any
|
|
||||||
from public_function.redis_task_manager import RedisTaskManager
|
|
||||||
|
|
||||||
|
|
||||||
class CrawlerManagement:
|
class CrawlerManagement:
|
||||||
def __init__(self, config_data: Dict[str, Any]):
|
def __init__(self, config_data: Dict[str, Any]):
|
||||||
self.config_data = config_data
|
self.config_data = config_data
|
||||||
self.redis_conn = RedisTaskManager(self.config_data)
|
self.redis_conn = RedisTaskManager(self.config_data)
|
||||||
|
|
||||||
def get_task_item(self, task_data: Dict[str, Any]):
|
|
||||||
redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}"
|
|
||||||
result = self.redis_conn.get_random_task_and_delete(redis_key)
|
|
||||||
if result:
|
|
||||||
return result
|
|
||||||
return None
|
|
||||||
|
|
|
||||||
17
main.py
17
main.py
|
|
@ -38,18 +38,18 @@ def get_account_manager():
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_crawler_manager():
|
def get_task_manager():
|
||||||
"""任务管理器实例"""
|
"""任务管理器实例"""
|
||||||
config = get_config()
|
config = get_config()
|
||||||
try:
|
try:
|
||||||
from crawler_management.crawler_management import CrawlerManagement
|
from crawler_management.crawler_management import CrawlerManagement
|
||||||
return CrawlerManagement(config)
|
return AllTask(config)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print(f"未找到AllTask类,返回模拟实例")
|
print(f"未找到AllTask类,返回模拟实例")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_task_manager():
|
def get_crawler_manager():
|
||||||
"""任务获取管理器实例"""
|
"""任务获取管理器实例"""
|
||||||
config = get_config()
|
config = get_config()
|
||||||
try:
|
try:
|
||||||
|
|
@ -133,22 +133,13 @@ async def update_account(account_data: AccountUpdate, account_manager: Any = Dep
|
||||||
raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e))
|
raise HTTPException(status_code=500, detail="账号失败,请重试,失败原因:{}".format(e))
|
||||||
|
|
||||||
|
|
||||||
@app.post("/get_crawler_task", summary="获取任务")
|
@app.get("/get_crawler_task", summary="获取任务")
|
||||||
async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)):
|
async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_crawler_manager)):
|
||||||
"""
|
"""
|
||||||
获取指定应用的可用账号
|
获取指定应用的可用账号
|
||||||
- **app_name**: 应用名称
|
- **app_name**: 应用名称
|
||||||
- **country**: 应用名称
|
- **country**: 应用名称
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
params = task_data.model_dump()
|
|
||||||
result = task_manager.get_task_item(params)
|
|
||||||
if result:
|
|
||||||
return {"code": 200, "message": "任务获取成功", "data": result}
|
|
||||||
raise HTTPException(status_code=404, detail="队列暂时没有任务,请等待一段时间后重新尝试")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"获取任务失败,失败原因: {e}")
|
|
||||||
raise HTTPException(status_code=500, detail="获取任务失败;失败原因{}".format(e))
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/receive_data", summary="接收抓取得数据")
|
@app.post("/receive_data", summary="接收抓取得数据")
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,6 @@
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import uuid
|
|
||||||
import redis
|
import redis
|
||||||
import random
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import List, Dict, Any, Optional
|
from typing import List, Dict, Any, Optional
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -19,8 +16,6 @@ class RedisTaskManager:
|
||||||
decode_responses=True
|
decode_responses=True
|
||||||
)
|
)
|
||||||
self.expire_hours = 24 # 过期时间24小时
|
self.expire_hours = 24 # 过期时间24小时
|
||||||
self.processing_timeout = 300
|
|
||||||
self.lock_timeout = 30 # 锁超时时间
|
|
||||||
|
|
||||||
def write_data(self, key: str, data: Any, expire_time: Optional[int] = None):
|
def write_data(self, key: str, data: Any, expire_time: Optional[int] = None):
|
||||||
"""
|
"""
|
||||||
|
|
@ -74,104 +69,16 @@ class RedisTaskManager:
|
||||||
print(f"读取Redis数据时发生错误: {e}")
|
print(f"读取Redis数据时发生错误: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def add_task_to_set(self, task_data: Dict[str, Any]) -> bool:
|
def add_task_to_set(self, task_data: Dict[str, Any], ):
|
||||||
"""添加任务到Redis集合"""
|
|
||||||
try:
|
try:
|
||||||
# 生成唯一任务标识
|
# 将任务数据序列化为JSON字符串
|
||||||
task_data['task_id'] = uuid.uuid4().hex
|
|
||||||
task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
||||||
key = f"{task_data['shop_id']}_{task_data['item_id']}"
|
key = f"{task_data['shop_id']}_{task_data['item_id']}"
|
||||||
redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}"
|
redis_key = f'{task_data["country"].lower()}_{task_data["app_name"].lower()}'
|
||||||
# 检查任务是否已存在
|
|
||||||
existing_task = self.redis_client.hget(redis_key, key)
|
|
||||||
if existing_task:
|
|
||||||
print(f"任务已存在: {key}")
|
|
||||||
return False
|
|
||||||
task_json = json.dumps(task_data)
|
task_json = json.dumps(task_data)
|
||||||
# 使用HSET添加任务,并设置过期时间
|
# 为每个pad_code在集合中添加相同的任务数据
|
||||||
result = self.redis_client.hset(redis_key, key, task_json)
|
# Redis集合会自动去重,相同的pad_code只会保存一次
|
||||||
self.redis_client.expire(redis_key, self.processing_timeout)
|
self.redis_client.hset(redis_key, key, task_json)
|
||||||
return result > 0
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"添加任务失败: {e}")
|
print(f"添加任务失败: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_random_task_and_delete(self, key: str) -> Optional[Dict[str, Any]]:
|
|
||||||
"""随机获取一个任务并从HSET中删除,确保一次性消费"""
|
|
||||||
lock_key = f"{key}:lock"
|
|
||||||
# 尝试获取分布式锁
|
|
||||||
if self._acquire_lock(lock_key):
|
|
||||||
try:
|
|
||||||
# 获取HSET中的所有任务
|
|
||||||
all_tasks = self.redis_client.hgetall(key)
|
|
||||||
if not all_tasks:
|
|
||||||
return None
|
|
||||||
# 随机选择一个任务
|
|
||||||
task_id = random.choice(list(all_tasks.keys()))
|
|
||||||
task_json = all_tasks[task_id]
|
|
||||||
task_data = json.loads(task_json)
|
|
||||||
# 删除该任务
|
|
||||||
self.redis_client.hdel(key, task_id)
|
|
||||||
return task_data
|
|
||||||
finally:
|
|
||||||
self._release_lock(lock_key)
|
|
||||||
else:
|
|
||||||
print("获取锁失败,可能其他进程正在处理")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def get_and_process_task(self, key: str, process_callback: callable) -> bool:
|
|
||||||
"""获取并处理任务"""
|
|
||||||
task_data = self.get_random_task_and_delete(key)
|
|
||||||
if task_data:
|
|
||||||
try:
|
|
||||||
result = process_callback(task_data)
|
|
||||||
return result
|
|
||||||
except Exception as e:
|
|
||||||
print(f"任务处理失败: {e}")
|
|
||||||
return False
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _acquire_lock(self, lock_key: str) -> bool:
|
|
||||||
"""获取分布式锁"""
|
|
||||||
try:
|
|
||||||
# 使用SET命令获取锁,NX表示不存在时才设置,EX设置过期时间
|
|
||||||
result = self.redis_client.set(lock_key, "locked", ex=self.lock_timeout, nx=True)
|
|
||||||
return result is not None
|
|
||||||
except Exception as e:
|
|
||||||
print(f"获取锁失败: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _release_lock(self, lock_key: str) -> bool:
|
|
||||||
"""释放分布式锁"""
|
|
||||||
try:
|
|
||||||
self.redis_client.delete(lock_key)
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
print(f"释放锁失败: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_task_count(self, key: str) -> int:
|
|
||||||
"""获取指定key中的任务数量"""
|
|
||||||
try:
|
|
||||||
return self.redis_client.hlen(key)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"获取任务数量失败: {e}")
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def get_all_tasks(self, key: str) -> List[Dict[str, Any]]:
|
|
||||||
"""获取所有任务(仅查看,不删除)"""
|
|
||||||
try:
|
|
||||||
all_tasks = self.redis_client.hgetall(key)
|
|
||||||
return [json.loads(task_json) for task_json in all_tasks.values()]
|
|
||||||
except Exception as e:
|
|
||||||
print(f"获取所有任务失败: {e}")
|
|
||||||
return []
|
|
||||||
|
|
||||||
def clear_all_tasks(self, key: str) -> bool:
|
|
||||||
"""清空所有任务"""
|
|
||||||
try:
|
|
||||||
self.redis_client.delete(key)
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
print(f"清空任务失败: {e}")
|
|
||||||
return False
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue