crawler_task_management/public_function/redis_task_manager.py

217 lines
8.0 KiB
Python
Raw Normal View History

2025-11-26 17:40:11 +08:00
import os
import json
import redis
2025-11-28 15:38:52 +08:00
import random
from datetime import datetime
2025-11-26 17:40:11 +08:00
from typing import List, Dict, Any, Optional
class RedisTaskManager:
    """Wrapper around a Redis connection for expiring key/value data and hash-based task queues.

    Task queues are Redis hashes whose key is "<region>_<app_name>" (both lower-cased); each
    field is "<shop_id>_<item_id>" and each value is the task dict serialized as JSON.
    """

    # Atomically delete KEYS[1] only if its value still equals ARGV[1] (the owner's token).
    _RELEASE_LOCK_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    def __init__(self, config_data: Dict[str, Any]):
        """Initialize the Redis connection.

        Args:
            config_data: Configuration mapping; must contain a ``redis_config`` section with
                ``host``, ``port``, ``password`` and ``db`` entries.
        """
        self.config_data = config_data
        redis_config = self.config_data['redis_config']
        self.redis_client = redis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            password=redis_config['password'],
            db=redis_config['db'],
            decode_responses=True,  # return str instead of bytes
        )
        self.expire_hours = 24  # default TTL used by write_data: 24 hours
        self.processing_timeout = 300  # TTL (seconds) re-applied to a task hash on every add
        self.lock_timeout = 30  # lock auto-expiry in seconds

    @staticmethod
    def _serialize(data: Any) -> str:
        """Return *data* as the string stored in Redis: JSON for dicts/lists, ``str()`` otherwise."""
        if isinstance(data, (dict, list)):
            return json.dumps(data, ensure_ascii=False)
        return str(data)

    @staticmethod
    def _task_field(task_data: Dict[str, Any]) -> str:
        """Return the hash field identifying a task: "<shop_id>_<item_id>"."""
        return f"{task_data['shop_id']}_{task_data['item_id']}"

    @staticmethod
    def _task_queue_key(task_data: Dict[str, Any]) -> str:
        """Return the hash key of a task's queue: "<region>_<app_name>", both lower-cased."""
        # Single quotes inside the f-string keep this valid on Python < 3.12.
        return f"{task_data['region'].lower()}_{task_data['app_name'].lower()}"

    def write_data(self, key: str, data: Any, expire_time: Optional[int] = None) -> bool:
        """Write *data* to Redis under *key* with an expiry.

        Args:
            key: Redis key name.
            data: Value to store; dicts and lists are JSON-encoded, anything else goes through str().
            expire_time: Custom TTL in seconds; defaults to ``expire_hours`` (24 hours).

        Returns:
            bool: True if the write succeeded, False otherwise (errors are printed, not raised).
        """
        try:
            data_str = self._serialize(data)
            # A custom TTL takes precedence over the default.
            expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600
            result = self.redis_client.setex(key, expire_seconds, data_str)
            if result:
                print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}")
                return True
            print(f"数据写入失败 - 键: {key}")
            return False
        except Exception as e:
            print(f"写入Redis数据时发生错误: {e}")
            return False

    def read_data(self, key: str) -> Optional[Any]:
        """Read the value stored under *key*.

        Args:
            key: Redis key name.

        Returns:
            The JSON-decoded value if the stored string is valid JSON, otherwise the raw string;
            None if the key does not exist or an error occurred. Any JSON-looking string is
            decoded, so a value written as the string "123" comes back as the int 123.
        """
        try:
            data_str = self.redis_client.get(key)
            if data_str is None:
                return None
            try:
                return json.loads(data_str)
            except json.JSONDecodeError:
                # Not JSON: hand back the raw string.
                return data_str
        except Exception as e:
            print(f"读取Redis数据时发生错误: {e}")
            return None

    def check_field_exists(self, key: str, field: str) -> bool:
        """Return True if *field* exists in the hash *key*; False if it does not or on error."""
        try:
            return self.redis_client.hexists(key, field)
        except Exception as e:
            print(f"检查字段存在失败: {e}")
            return False

    def delete_field(self, key: str, field: str) -> bool:
        """Delete *field* from the hash *key*; True only if a field was actually removed."""
        try:
            result = self.redis_client.hdel(key, field)
            return result > 0
        except Exception as e:
            print(f"删除字段失败: {e}")
            return False

    def write_string_to_h_set(self, key: str, field: str, value: str) -> bool:
        """Set *field* to *value* in the hash *key*.

        Returns True only when a new field was created: HSET reports the number of newly
        added fields, so overwriting an existing field returns False. Errors are printed
        and return False.
        """
        # Typical usage: key "user_token", field = the token string, value 123.
        try:
            result = self.redis_client.hset(key, field, value)
            return result > 0
        except Exception as e:
            print(f"写入HSET失败: {e}")
            return False

    def add_task_to_set(self, task_data: Dict[str, Any]) -> bool:
        """Add a task to its Redis task hash unless the same task is already queued.

        ``task_data`` must contain ``shop_id``, ``item_id``, ``region`` and ``app_name``.
        Note: ``task_data`` is modified in place; a ``created_time`` entry is added.

        Returns:
            bool: True if the task was added; False if it already existed or an error occurred.
        """
        try:
            task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            key = self._task_field(task_data)
            redis_key = self._task_queue_key(task_data)
            task_json = json.dumps(task_data)
            # HSETNX checks and inserts atomically, so two producers adding the same task
            # cannot both succeed (a separate HGET + HSET would race).
            if not self.redis_client.hsetnx(redis_key, key, task_json):
                print(f"任务已存在: {key}")
                return False
            # NOTE(review): this re-applies the TTL to the whole hash, so every queued task
            # expires processing_timeout seconds after the most recent add. Confirm intended.
            self.redis_client.expire(redis_key, self.processing_timeout)
            return True
        except Exception as e:
            print(f"添加任务失败: {e}")
            return False

    def get_random_task_and_delete(self, key: str) -> Optional[Dict[str, Any]]:
        """Pop a random task from the hash *key* so that each task is consumed only once.

        Returns:
            The decoded task dict, or None if the lock is held elsewhere, the hash is empty,
            the chosen task was already claimed by another consumer, or its payload is not
            valid JSON (such an entry is removed so it cannot block the queue).
        """
        lock_key = f"{key}:lock"
        # A unique token lets the release delete only a lock we still own. If our lock expired
        # and another worker acquired it, an unconditional DEL would release theirs.
        token = os.urandom(16).hex()
        if not self._acquire_lock(lock_key, token):
            print("获取锁失败,可能其他进程正在处理")
            return None
        try:
            all_tasks = self.redis_client.hgetall(key)
            if not all_tasks:
                return None
            task_id = random.choice(list(all_tasks))
            # Claim the task before decoding it. HDEL returns 0 if another consumer already
            # removed it, and a corrupt payload is dropped instead of being re-picked forever.
            if not self.redis_client.hdel(key, task_id):
                return None
            try:
                return json.loads(all_tasks[task_id])
            except json.JSONDecodeError as e:
                print(f"任务数据解析失败 - 字段: {task_id}, 错误: {e}")
                return None
        finally:
            self._release_lock(lock_key, token)

    def get_and_process_task(self, key: str, process_callback: callable) -> bool:
        """Pop one random task from *key* and pass it to *process_callback*.

        The task is removed from Redis before the callback runs, so it is not re-queued
        if the callback fails.

        Returns:
            The callback's result, or False if no task was obtained or the callback raised.
        """
        task_data = self.get_random_task_and_delete(key)
        if not task_data:
            return False
        try:
            return process_callback(task_data)
        except Exception as e:
            print(f"任务处理失败: {e}")
            return False

    def _acquire_lock(self, lock_key: str, token: str = "locked") -> bool:
        """Try once (non-blocking) to acquire *lock_key*, storing *token* as the lock value.

        Returns:
            bool: True if the lock was acquired; False if it is already held or on error.
        """
        try:
            # SET NX: only set if absent; EX: auto-expire after lock_timeout seconds.
            result = self.redis_client.set(lock_key, token, ex=self.lock_timeout, nx=True)
            return result is not None
        except Exception as e:
            print(f"获取锁失败: {e}")
            return False

    def _release_lock(self, lock_key: str, token: Optional[str] = None) -> bool:
        """Release *lock_key*.

        With *token*, the key is deleted only if it still holds that token (atomic Lua
        compare-and-delete). Without it, the key is deleted unconditionally.

        Returns:
            bool: True if the release command ran, False on error.
        """
        try:
            if token is None:
                self.redis_client.delete(lock_key)
            else:
                self.redis_client.eval(self._RELEASE_LOCK_SCRIPT, 1, lock_key, token)
            return True
        except Exception as e:
            print(f"释放锁失败: {e}")
            return False

    def get_task_count(self, key: str) -> int:
        """Return the number of tasks in the hash *key* (0 on error)."""
        try:
            return self.redis_client.hlen(key)
        except Exception as e:
            print(f"获取任务数量失败: {e}")
            return 0

    def get_all_tasks(self, key: str) -> List[Dict[str, Any]]:
        """Return all tasks in *key* without removing them.

        A single malformed entry makes the whole call return [].
        """
        try:
            all_tasks = self.redis_client.hgetall(key)
            return [json.loads(task_json) for task_json in all_tasks.values()]
        except Exception as e:
            print(f"获取所有任务失败: {e}")
            return []

    def clear_all_tasks(self, key: str) -> bool:
        """Delete the whole task hash *key*; True if the command ran, False on error."""
        try:
            self.redis_client.delete(key)
            return True
        except Exception as e:
            print(f"清空任务失败: {e}")
            return False
2025-11-28 18:17:54 +08:00
if __name__ == '__main__':
    # Manual smoke test: check whether a token is registered in the "user_token" hash.
    # Usage: python redis_task_manager.py <token>   (or set the USER_TOKEN environment variable)
    import os
    import sys
    from pathlib import Path
    from public_function.public_func import read_config

    # The token comes from the command line / environment instead of being hard-coded,
    # so no credential has to be committed to source control.
    token = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("USER_TOKEN")
    if not token:
        sys.exit("usage: python redis_task_manager.py <token>  (or set USER_TOKEN)")
    config_path = os.path.join(Path(__file__).resolve().parent, 'config.yaml')
    config = read_config(config_path)
    redis_conn = RedisTaskManager(config)
    print(redis_conn.check_field_exists("user_token", token))