From 3a1e13dcabbf5d55753554f198bbcf09164775e2 Mon Sep 17 00:00:00 2001 From: liujianjiang Date: Fri, 28 Nov 2025 14:50:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 4 +-- public_function/redis_task_manager.py | 15 ++++++++-- task_management/all_task_management.py | 40 +++++++++++++++++--------- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/main.py b/main.py index 19688c8..a1d197b 100644 --- a/main.py +++ b/main.py @@ -133,7 +133,7 @@ async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_t return {"code": 200, "message": "数据保存成功", "data": result} raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试") except Exception as e: - print(f"{get_local_time()},商品数据处理失败,失败原因: {e}") + print(f"商品数据处理失败,失败原因: {e}") raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) @@ -166,4 +166,4 @@ async def device_reset(task_data: DeviceResetData, reset_manager: Any = Depends( if __name__ == '__main__': - uvicorn.run(app) + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/public_function/redis_task_manager.py b/public_function/redis_task_manager.py index c08756e..a637c77 100644 --- a/public_function/redis_task_manager.py +++ b/public_function/redis_task_manager.py @@ -36,7 +36,6 @@ class RedisTaskManager: # 设置过期时间,优先使用自定义时间 expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600 result = self.redis_client.setex(key, expire_seconds, data_str) - if result: print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}秒") return True @@ -59,7 +58,6 @@ class RedisTaskManager: try: data_str = self.redis_client.get(key) if data_str is None: - print(f"键不存在或已过期 - 键: {key}") return None # 尝试解析JSON数据 try: @@ -70,3 +68,16 @@ class RedisTaskManager: except Exception as e: print(f"读取Redis数据时发生错误: {e}") return None + + def add_task_to_set(self, task_data: Dict[str, Any], redis_key='crawler_task'): + try: + # 将任务数据序列化为JSON字符串 + key = f"{task_data['shop_id']}_{task_data['item_id']}" + task_json = json.dumps(task_data) + # 为每个pad_code在集合中添加相同的任务数据 + # Redis集合会自动去重,相同的pad_code只会保存一次 + self.redis_client.hset(redis_key, key, task_json) + return True + except Exception as e: + print(f"添加任务失败: {e}") + return False diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py index 801a7fd..89960ea 100644 --- a/task_management/all_task_management.py +++ b/task_management/all_task_management.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import time import json +import asyncio from typing import Dict, Any from public_function.asyn_mysql import AsyncMySQL from public_function.public_func import create_logger @@ -15,26 +16,39 @@ class AllTask: self.redis_conn = RedisTaskManager(self.config_data) self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) - async def deal_shopee_task(self, param): - # 查询redis数据库,redis 数据库存在该数据直接返回 + async def deal_shopee_task(self, param: Dict[str, Any]): + """ + 处理Shopee任务的异步方法 + 修复了NoneType对象无法await的问题 + """ + # 查询redis数据库,redis数据库存在该数据直接返回 key = f"{param['shop_id']}:{param['item_id']}" + # 如果不是重新抓取,先尝试从Redis获取数据 if not param.get('is_re_crawl', False): print(f"{key} 开始在redis中获取数据") result = self.redis_conn.read_data(key) - if result: + # 关键修复:确保result不是None再进行处理 + if result is not None: + print(f"{key} 从Redis缓存中获取到数据") return result - # 调用对应爬虫任务 - print(f"{key} 在redis中获取数据失败,开始调用爬虫抓取数据") + print(f"{key} 在redis中获取数据失败,将任务提交到队列") + # 确保add_task_to_set是异步方法或正确处理 + self.redis_conn.add_task_to_set(task_data=param) # 任务结束后开始等待 - result = dict() endtime = time.time() + 55 + counter = 0 while time.time() < endtime: - await time.sleep(5) + # 使用asyncio.sleep而不是time.sleep + await asyncio.sleep(5) + counter += 1 result = self.redis_conn.read_data(key) - if result: - print(f"{key} 调用爬虫抓取数据后在redis获取到数据") + # 关键修复:检查result是否为None + if result is not None: + print(f"{key} 第 {counter} 次在redis获取到数据") return result - return result + print(f"{key}:第 {counter} 次从redis中未能获取数据") + print(f"{key} 在超时时间内未获取到数据") + return None async def task_distribution(self, data: Dict[str, Any]): result = dict() @@ -54,10 +68,10 @@ class AllTask: affected_rows = self.redis_conn.write_data(key, params) if affected_rows: print(f"{key};数据已存入redis中") - params = {"item_id": data["item_id"], "shop_id": data["shop_id"], "country": data["country"], "app_name": data["app_name"], - "goods_info": json.dumps(data)} + params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "country": data["country"], "app_name": data["app_name"], + "goods_info": json.dumps(params)} print(f"{params}<") - affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params]) + affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data]) if affected_rows: print(f"{key};商品数据已存入mysql") # 更改任务状态