crawler_task_management/public_function/redis_task_manager.py

217 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import random
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import redis
class RedisTaskManager:
    """Task-queue helper built on Redis strings and hashes.

    Tasks are stored as JSON strings inside a Redis hash named
    ``<region>_<app_name>`` (lower-cased), one field per
    ``<shop_id>_<item_id>``. Consumers pop a random task while holding a
    short-lived Redis lock, so each task is handed out at most once.
    """

    # Lua script for a safe lock release: delete the lock only if it still
    # holds our token. Without this, a consumer whose lock already expired
    # could delete a lock that another consumer has since acquired.
    _RELEASE_LOCK_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "end "
        "return 0"
    )

    def __init__(self, config_data: Dict[str, Any]):
        """Create the Redis connection described by ``config_data['redis_config']``.

        Args:
            config_data: Configuration mapping. ``redis_config`` must provide
                ``host`` and ``port``. ``password`` (default ``None``) and
                ``db`` (default ``0``) are optional.
        """
        self.config_data = config_data
        redis_config = self.config_data['redis_config']
        self.redis_client = redis.Redis(
            host=redis_config['host'],
            port=redis_config['port'],
            password=redis_config.get('password'),
            db=redis_config.get('db', 0),
            decode_responses=True,  # return str instead of bytes
        )
        self.expire_hours = 24  # default TTL (hours) used by write_data
        self.processing_timeout = 300  # TTL (seconds) applied to a task hash on each add
        self.lock_timeout = 30  # TTL (seconds) of the consumer lock

    def write_data(self, key: str, data: Any, expire_time: Optional[int] = None) -> bool:
        """Store ``data`` under ``key`` with an expiry.

        Args:
            key: Redis key name.
            data: Value to store. Dicts and lists are JSON-encoded; anything
                else is stored as ``str(data)``.
            expire_time: TTL in seconds. Defaults to ``expire_hours`` hours.

        Returns:
            bool: True if the write succeeded, False otherwise (errors are
            printed, not raised).
        """
        try:
            if isinstance(data, (dict, list)):
                data_str = json.dumps(data, ensure_ascii=False)
            else:
                data_str = str(data)
            # A caller-supplied TTL wins over the default.
            expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600
            result = self.redis_client.setex(key, expire_seconds, data_str)
            if result:
                print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}")
                return True
            print(f"数据写入失败 - 键: {key}")
            return False
        except Exception as e:
            print(f"写入Redis数据时发生错误: {e}")
            return False

    def read_data(self, key: str) -> Optional[Any]:
        """Read the value stored under ``key``.

        Values that parse as JSON are returned decoded. Note that this also
        decodes plain strings that happen to be valid JSON (``"123"`` comes
        back as ``123``). Anything else is returned as the raw string.

        Returns:
            The stored value, or None if the key is missing or an error occurred.
        """
        try:
            data_str = self.redis_client.get(key)
            if data_str is None:
                return None
            try:
                return json.loads(data_str)
            except json.JSONDecodeError:
                return data_str
        except Exception as e:
            print(f"读取Redis数据时发生错误: {e}")
            return None

    def check_field_exists(self, key: str, field: str) -> bool:
        """Return True if ``field`` exists in the hash ``key`` (False on error)."""
        try:
            exists = self.redis_client.hexists(key, field)
            return exists
        except Exception as e:
            print(f"检查字段存在失败: {e}")
            return False

    def delete_field(self, key: str, field: str) -> bool:
        """Delete ``field`` from the hash ``key``.

        Returns True only if a field was actually removed.
        """
        try:
            result = self.redis_client.hdel(key, field)
            return result > 0
        except Exception as e:
            print(f"删除字段失败: {e}")
            return False

    def write_string_to_h_set(self, key: str, field: str, value: str) -> bool:
        """Set ``field`` to ``value`` in the hash ``key``.

        Example usage: key ``user_token``, field = the token, value = a marker.

        Returns:
            bool: True if the write succeeded, including when an existing field
            was overwritten. HSET returns 0 in that case, so its count is not a
            success indicator. False on error.
        """
        try:
            self.redis_client.hset(key, field, value)
            return True
        except Exception as e:
            print(f"写入HSET失败: {e}")
            return False

    def add_task_to_set(self, task_data: Dict[str, Any]) -> bool:
        """Add a task to its ``<region>_<app_name>`` hash unless it already exists.

        ``task_data`` must contain ``shop_id``, ``item_id``, ``region`` and
        ``app_name``. A ``created_time`` field is added to the caller's dict
        in place.

        Returns:
            bool: True if the task was newly added. False if it already
            existed or an error occurred.
        """
        try:
            task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            key = f"{task_data['shop_id']}_{task_data['item_id']}"
            # Single quotes inside the f-string keep this valid before Python 3.12.
            redis_key = f"{task_data['region'].lower()}_{task_data['app_name'].lower()}"
            task_json = json.dumps(task_data, ensure_ascii=False)
            # HSETNX is atomic: it replaces the racy "HGET then HSET" sequence,
            # so two producers can never both add the same task.
            added = self.redis_client.hsetnx(redis_key, key, task_json)
            if not added:
                print(f"任务已存在: {key}")
                return False
            # NOTE(review): this resets the TTL of the whole hash, so every
            # pending task expires processing_timeout seconds after the most
            # recent add. Confirm this is intended.
            self.redis_client.expire(redis_key, self.processing_timeout)
            return True
        except Exception as e:
            print(f"添加任务失败: {e}")
            return False

    def get_random_task_and_delete(self, key: str) -> Optional[Dict[str, Any]]:
        """Pop one random task from the hash ``key`` (at-most-once delivery).

        Returns:
            The decoded task dict, or None if the hash is empty, the lock is
            held by someone else, or another consumer removed the chosen task
            first. Redis and JSON errors propagate to the caller.
        """
        lock_key = f"{key}:lock"
        # A per-call token lets us release only the lock we acquired.
        token = uuid.uuid4().hex
        if not self._acquire_lock(lock_key, token=token):
            print("获取锁失败,可能其他进程正在处理")
            return None
        try:
            all_tasks = self.redis_client.hgetall(key)
            if not all_tasks:
                return None
            task_id = random.choice(list(all_tasks))
            task_data = json.loads(all_tasks[task_id])
            # HDEL reports how many fields were removed. Zero means another
            # consumer already took this task (e.g. after our lock expired),
            # so it must not be handed out a second time.
            if not self.redis_client.hdel(key, task_id):
                return None
            return task_data
        finally:
            self._release_lock(lock_key, token=token)

    def get_and_process_task(self, key: str, process_callback: Callable[[Dict[str, Any]], bool]) -> bool:
        """Pop a task from ``key`` and pass it to ``process_callback``.

        The task is deleted before the callback runs. If the callback raises,
        the task is not re-queued.

        Returns:
            The callback's result. False if no task was available or the
            callback raised.
        """
        task_data = self.get_random_task_and_delete(key)
        if task_data:
            try:
                return process_callback(task_data)
            except Exception as e:
                print(f"任务处理失败: {e}")
                return False
        return False

    def _acquire_lock(self, lock_key: str, token: Optional[str] = None) -> bool:
        """Try once (no waiting) to take the lock ``lock_key``.

        Args:
            lock_key: Redis key used as the lock.
            token: Value stored in the lock so only this holder can release it
                via ``_release_lock(lock_key, token)``. Defaults to ``"locked"``.

        Returns:
            bool: True if the lock was acquired.
        """
        try:
            # NX: set only if absent. EX: auto-expire so a crashed holder
            # cannot block others forever.
            value = token if token is not None else "locked"
            result = self.redis_client.set(lock_key, value, ex=self.lock_timeout, nx=True)
            return bool(result)
        except Exception as e:
            print(f"获取锁失败: {e}")
            return False

    def _release_lock(self, lock_key: str, token: Optional[str] = None) -> bool:
        """Release the lock ``lock_key``.

        With a ``token``, the lock is deleted only if it still holds that
        token, so another holder's lock is never removed. Without a token,
        the lock is deleted unconditionally (legacy behavior).

        Returns:
            bool: True if released. With a token, False if the lock no longer
            belonged to us. False on error.
        """
        try:
            if token is None:
                self.redis_client.delete(lock_key)
                return True
            released = self.redis_client.eval(self._RELEASE_LOCK_SCRIPT, 1, lock_key, token)
            return bool(released)
        except Exception as e:
            print(f"释放锁失败: {e}")
            return False

    def get_task_count(self, key: str) -> int:
        """Return the number of tasks in the hash ``key`` (0 on error)."""
        try:
            return self.redis_client.hlen(key)
        except Exception as e:
            print(f"获取任务数量失败: {e}")
            return 0

    def get_all_tasks(self, key: str) -> List[Dict[str, Any]]:
        """Return every task in ``key`` without removing any.

        Returns an empty list on any error, including a single undecodable task.
        """
        try:
            all_tasks = self.redis_client.hgetall(key)
            return [json.loads(task_json) for task_json in all_tasks.values()]
        except Exception as e:
            print(f"获取所有任务失败: {e}")
            return []

    def clear_all_tasks(self, key: str) -> bool:
        """Delete the entire task hash ``key``. Returns False on error."""
        try:
            self.redis_client.delete(key)
            return True
        except Exception as e:
            print(f"清空任务失败: {e}")
            return False
if __name__ == '__main__':
    from pathlib import Path

    from public_function.public_func import read_config

    # Manual smoke test: load config.yaml from this file's directory and
    # register a token in the "user_token" hash.
    config_path = str(Path(__file__).resolve().parent / 'config.yaml')
    config = read_config(config_path)
    redis_conn = RedisTaskManager(config)
    # NOTE(review): a real-looking token is hard-coded and committed here;
    # consider reading it from an environment variable instead.
    token = "opB4ztbdw45xFoJbXti20520bsEq3UDKKAtiDWHnGjjhP6v0KNFjqBM7bfzto6GLdUPviYnVdCgdCJYqe42nPoy6mvW59F3TPQZu"
    # Pass the value as a string to match write_string_to_h_set's `value: str`
    # annotation. Redis stores the same "1" either way.
    redis_conn.write_string_to_h_set("user_token", token, "1")
    # print(redis_conn.check_field_exists("user_token", token))