From 3df782999b0be33dd8ced9f5552d2209de2d793e Mon Sep 17 00:00:00 2001 From: liujianjiang Date: Fri, 28 Nov 2025 15:38:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=8E=B7=E5=8F=96=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawler_management/crawler_management.py | 10 +++ public_function/redis_task_manager.py | 107 +++++++++++++++++++++-- 2 files changed, 110 insertions(+), 7 deletions(-) diff --git a/crawler_management/crawler_management.py b/crawler_management/crawler_management.py index 96a2336..a94a232 100644 --- a/crawler_management/crawler_management.py +++ b/crawler_management/crawler_management.py @@ -1,6 +1,16 @@ # -*- coding: utf-8 -*- +from typing import Dict, Any +from public_function.redis_task_manager import RedisTaskManager + class CrawlerManagement: def __init__(self, config_data: Dict[str, Any]): self.config_data = config_data self.redis_conn = RedisTaskManager(self.config_data) + + def get_task_item(self, task_data: Dict[str, Any]): + redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}" + result = self.redis_conn.get_random_task_and_delete(redis_key) + if result: + return result + return None diff --git a/public_function/redis_task_manager.py b/public_function/redis_task_manager.py index 558c7be..3ae5031 100644 --- a/public_function/redis_task_manager.py +++ b/public_function/redis_task_manager.py @@ -1,6 +1,9 @@ import os import json +import uuid import redis +import random +from datetime import datetime from typing import List, Dict, Any, Optional @@ -16,6 +19,8 @@ class RedisTaskManager: decode_responses=True ) self.expire_hours = 24 # 过期时间24小时 + self.processing_timeout = 300 + self.lock_timeout = 30 # 锁超时时间 def write_data(self, key: str, data: Any, expire_time: Optional[int] = None): """ @@ -69,16 +74,104 @@ class RedisTaskManager: print(f"读取Redis数据时发生错误: {e}") return None - def add_task_to_set(self, task_data: Dict[str, Any], ): + def add_task_to_set(self, task_data: Dict[str, Any]) -> bool: + """添加任务到Redis集合""" try: - # 将任务数据序列化为JSON字符串 + # 生成唯一任务标识 + task_data['task_id'] = uuid.uuid4().hex + task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') key = f"{task_data['shop_id']}_{task_data['item_id']}" - redis_key = f'{task_data["country"].lower()}_{task_data["app_name"].lower()}' + redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}" + # 检查任务是否已存在 + existing_task = self.redis_client.hget(redis_key, key) + if existing_task: + print(f"任务已存在: {key}") + return False task_json = json.dumps(task_data) - # 为每个pad_code在集合中添加相同的任务数据 - # Redis集合会自动去重,相同的pad_code只会保存一次 - self.redis_client.hset(redis_key, key, task_json) - return True + # 使用HSET添加任务,并设置过期时间 + result = self.redis_client.hset(redis_key, key, task_json) + self.redis_client.expire(redis_key, self.processing_timeout) + return result > 0 except Exception as e: print(f"添加任务失败: {e}") return False + + def get_random_task_and_delete(self, key: str) -> Optional[Dict[str, Any]]: + """随机获取一个任务并从HSET中删除,确保一次性消费""" + lock_key = f"{key}:lock" + # 尝试获取分布式锁 + if self._acquire_lock(lock_key): + try: + # 获取HSET中的所有任务 + all_tasks = self.redis_client.hgetall(key) + if not all_tasks: + return None + # 随机选择一个任务 + task_id = random.choice(list(all_tasks.keys())) + task_json = all_tasks[task_id] + task_data = json.loads(task_json) + # 删除该任务 + self.redis_client.hdel(key, task_id) + return task_data + finally: + self._release_lock(lock_key) + else: + print("获取锁失败,可能其他进程正在处理") + return None + + def get_and_process_task(self, key: str, process_callback: callable) -> bool: + """获取并处理任务""" + task_data = self.get_random_task_and_delete(key) + if task_data: + try: + result = process_callback(task_data) + return result + except Exception as e: + print(f"任务处理失败: {e}") + return False + return False + + def _acquire_lock(self, lock_key: str) -> bool: + """获取分布式锁""" + try: + # 使用SET命令获取锁,NX表示不存在时才设置,EX设置过期时间 + result = self.redis_client.set(lock_key, "locked", ex=self.lock_timeout, nx=True) + return result is not None + except Exception as e: + print(f"获取锁失败: {e}") + return False + + def _release_lock(self, lock_key: str) -> bool: + """释放分布式锁""" + try: + self.redis_client.delete(lock_key) + return True + except Exception as e: + print(f"释放锁失败: {e}") + return False + + def get_task_count(self, key: str) -> int: + """获取指定key中的任务数量""" + try: + return self.redis_client.hlen(key) + except Exception as e: + print(f"获取任务数量失败: {e}") + return 0 + + def get_all_tasks(self, key: str) -> List[Dict[str, Any]]: + """获取所有任务(仅查看,不删除)""" + try: + all_tasks = self.redis_client.hgetall(key) + return [json.loads(task_json) for task_json in all_tasks.values()] + except Exception as e: + print(f"获取所有任务失败: {e}") + return [] + + def clear_all_tasks(self, key: str) -> bool: + """清空所有任务""" + try: + self.redis_client.delete(key) + return True + except Exception as e: + print(f"清空任务失败: {e}") + return False