import os import json import redis import random from datetime import datetime from typing import List, Dict, Any, Optional class RedisTaskManager: def __init__(self, config_data: Dict[str, Any]): self.config_data = config_data """初始化Redis连接""" self.redis_client = redis.Redis( host=self.config_data['redis_config']['host'], port=self.config_data['redis_config']['port'], password=self.config_data['redis_config']['password'], db=self.config_data['redis_config']['db'], decode_responses=True ) self.expire_hours = 24 # 过期时间24小时 self.processing_timeout = 300 self.lock_timeout = 30 # 锁超时时间 def write_data(self, key: str, data: Any, expire_time: Optional[int] = None): """ 写入数据到Redis,设置过期时间 Args: key: Redis键名 data: 要存储的数据 expire_time: 自定义过期时间(秒),默认24小时 Returns: bool: 写入是否成功 """ try: # 如果数据是字典或列表,先序列化为JSON if isinstance(data, (dict, list)): data_str = json.dumps(data, ensure_ascii=False) else: data_str = str(data) # 设置过期时间,优先使用自定义时间 expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600 result = self.redis_client.setex(key, expire_seconds, data_str) if result: print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}秒") return True else: print(f"数据写入失败 - 键: {key}") return False except Exception as e: print(f"写入Redis数据时发生错误: {e}") return False def read_data(self, key: str): """ 从Redis读取数据 Args: key: Redis键名 Returns: Optional[Any]: 读取到的数据,如果键不存在返回None """ try: data_str = self.redis_client.get(key) if data_str is None: return None # 尝试解析JSON数据 try: return json.loads(data_str) except json.JSONDecodeError: # 如果不是JSON格式,返回原始字符串 return data_str except Exception as e: print(f"读取Redis数据时发生错误: {e}") return None def add_task_to_set(self, task_data: Dict[str, Any]) -> bool: """添加任务到Redis集合""" try: # 生成唯一任务标识 task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') key = f"{task_data['shop_id']}_{task_data['item_id']}" redis_key = f"{task_data["region"].lower()}_{task_data['app_name'].lower()}" # 检查任务是否已存在 existing_task = self.redis_client.hget(redis_key, key) if existing_task: print(f"任务已存在: {key}") return False task_json = json.dumps(task_data) # 使用HSET添加任务,并设置过期时间 result = self.redis_client.hset(redis_key, key, task_json) self.redis_client.expire(redis_key, self.processing_timeout) return result > 0 except Exception as e: print(f"添加任务失败: {e}") return False def get_random_task_and_delete(self, key: str) -> Optional[Dict[str, Any]]: """随机获取一个任务并从HSET中删除,确保一次性消费""" lock_key = f"{key}:lock" # 尝试获取分布式锁 if self._acquire_lock(lock_key): try: # 获取HSET中的所有任务 all_tasks = self.redis_client.hgetall(key) if not all_tasks: return None # 随机选择一个任务 task_id = random.choice(list(all_tasks.keys())) task_json = all_tasks[task_id] task_data = json.loads(task_json) # 删除该任务 self.redis_client.hdel(key, task_id) return task_data finally: self._release_lock(lock_key) else: print("获取锁失败,可能其他进程正在处理") return None def get_and_process_task(self, key: str, process_callback: callable) -> bool: """获取并处理任务""" task_data = self.get_random_task_and_delete(key) if task_data: try: result = process_callback(task_data) return result except Exception as e: print(f"任务处理失败: {e}") return False return False def _acquire_lock(self, lock_key: str) -> bool: """获取分布式锁""" try: # 使用SET命令获取锁,NX表示不存在时才设置,EX设置过期时间 result = self.redis_client.set(lock_key, "locked", ex=self.lock_timeout, nx=True) return result is not None except Exception as e: print(f"获取锁失败: {e}") return False def _release_lock(self, lock_key: str) -> bool: """释放分布式锁""" try: self.redis_client.delete(lock_key) return True except Exception as e: print(f"释放锁失败: {e}") return False def get_task_count(self, key: str) -> int: """获取指定key中的任务数量""" try: return self.redis_client.hlen(key) except Exception as e: print(f"获取任务数量失败: {e}") return 0 def get_all_tasks(self, key: str) -> List[Dict[str, Any]]: """获取所有任务(仅查看,不删除)""" try: all_tasks = self.redis_client.hgetall(key) return [json.loads(task_json) for task_json in all_tasks.values()] except Exception as e: print(f"获取所有任务失败: {e}") return [] def clear_all_tasks(self, key: str) -> bool: """清空所有任务""" try: self.redis_client.delete(key) return True except Exception as e: print(f"清空任务失败: {e}") return False