crawler_task_management/public_function/redis_task_manager.py

73 lines
2.5 KiB
Python
Raw Normal View History

2025-11-26 17:40:11 +08:00
import os
import json
import redis
from typing import List, Dict, Any, Optional
class RedisTaskManager:
def __init__(self, config_data: Dict[str, Any]):
self.config_data = config_data
"""初始化Redis连接"""
self.redis_client = redis.Redis(
host=self.config_data['redis_config']['host'],
port=self.config_data['redis_config']['port'],
password=self.config_data['redis_config']['password'],
db=self.config_data['redis_config']['db'],
decode_responses=True
)
self.expire_hours = 24 # 过期时间24小时
def write_data(self, key: str, data: Any, expire_time: Optional[int] = None):
"""
写入数据到Redis设置过期时间
Args:
key: Redis键名
data: 要存储的数据
expire_time: 自定义过期时间默认24小时
Returns:
bool: 写入是否成功
"""
try:
# 如果数据是字典或列表先序列化为JSON
if isinstance(data, (dict, list)):
data_str = json.dumps(data, ensure_ascii=False)
else:
data_str = str(data)
# 设置过期时间,优先使用自定义时间
expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600
result = self.redis_client.setex(key, expire_seconds, data_str)
if result:
print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}")
return True
else:
print(f"数据写入失败 - 键: {key}")
return False
except Exception as e:
print(f"写入Redis数据时发生错误: {e}")
return False
def read_data(self, key: str):
"""
从Redis读取数据
Args:
key: Redis键名
Returns:
Optional[Any]: 读取到的数据如果键不存在返回None
"""
try:
data_str = self.redis_client.get(key)
if data_str is None:
print(f"键不存在或已过期 - 键: {key}")
return None
# 尝试解析JSON数据
try:
return json.loads(data_str)
except json.JSONDecodeError:
# 如果不是JSON格式返回原始字符串
return data_str
except Exception as e:
print(f"读取Redis数据时发生错误: {e}")
return None