diff --git a/public_function/redis_task_manager.py b/public_function/redis_task_manager.py index 268ac09..742bffe 100644 --- a/public_function/redis_task_manager.py +++ b/public_function/redis_task_manager.py @@ -79,7 +79,7 @@ class RedisTaskManager: # 生成唯一任务标识 task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') key = f"{task_data['shop_id']}_{task_data['item_id']}" - redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}" + redis_key = f"{task_data["region"].lower()}_{task_data['app_name'].lower()}" # 检查任务是否已存在 existing_task = self.redis_client.hget(redis_key, key) if existing_task: diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py index 3e4557a..f560324 100644 --- a/task_management/all_task_management.py +++ b/task_management/all_task_management.py @@ -23,15 +23,12 @@ class AllTask: """ # 查询redis数据库,redis数据库存在该数据直接返回 # key = f"{param['shop_id']}:{param['item_id']}" - key = f"{param['app_name'].lower()}:{param['country'].lower()}" - # 如果不是重新抓取,先尝试从Redis获取数据 - if not param.get('is_re_crawl', False): - print(f"{key} 开始在redis中获取数据") - result = self.redis_conn.read_data(key) - # 关键修复:确保result不是None再进行处理 - if result is not None: - print(f"{key} 从Redis缓存中获取到数据") - return result + key = f"{param['app_name'].lower()}:{param['region'].lower()}" + result = self.redis_conn.read_data(key) + # 关键修复:确保result不是None再进行处理 + if result is not None: + print(f"{key} 从Redis缓存中获取到数据") + return result print(f"{key} 在redis中获取数据失败,将任务提交到队列") # 确保add_task_to_set是异步方法或正确处理 self.redis_conn.add_task_to_set(task_data=param) @@ -88,7 +85,7 @@ class AllTask: affected_rows = self.redis_conn.write_data(key, params) if affected_rows: print(f"{key};数据已存入redis中") - params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "country": data["country"], "app_name": data["app_name"], + params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"], "goods_info": json.dumps(params)} print(f"{params}<") affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])