# -*- coding: utf-8 -*- import time import json import asyncio from typing import Dict, Any from public_function.asyn_mysql import AsyncMySQL from public_function.public_func import create_logger from public_function.redis_task_manager import RedisTaskManager logger = create_logger(file_name="all_task_management") class AllTask: def __init__(self, config_data: Dict[str, Any]): self.config_data = config_data self.redis_conn = RedisTaskManager(self.config_data) self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) async def deal_shopee_task(self, param: Dict[str, Any]): """ 处理Shopee任务的异步方法 修复了NoneType对象无法await的问题 """ # 查询redis数据库,redis数据库存在该数据直接返回 # key = f"{param['shop_id']}:{param['item_id']}" key = f"{param['app_name'].lower()}:{param['region'].lower()}" result = self.redis_conn.read_data(key) # 关键修复:确保result不是None再进行处理 if result is not None: print(f"{key} 从Redis缓存中获取到数据") return result print(f"{key} 在redis中获取数据失败,将任务提交到队列") # 确保add_task_to_set是异步方法或正确处理 self.redis_conn.add_task_to_set(task_data=param) # 任务结束后开始等待 endtime = time.time() + 55 counter = 0 while time.time() < endtime: # 使用asyncio.sleep而不是time.sleep await asyncio.sleep(5) counter += 1 result = self.redis_conn.read_data(key) # 关键修复:检查result是否为None if result is not None: print(f"{key} 第 {counter} 次在redis获取到数据") return result print(f"{key}:第 {counter} 次从redis中未能获取数据") print(f"{key} 在超时时间内未获取到数据") return None async def insert_task_record(self, params: Dict[str, Any]): # 需要进行记录,便于统计 await self.db_pool.initialize() try: insert_param = {"task_id": params["task_id"], "app_name": params["app_name"], "region": params["host"], "shop_id": params["shop_id"], "item_id": params["item_id"], "status": 1} result = await self.db_pool.insert_many("crawler_task_record_info", [insert_param]) if result: print(f"{params['task_id']} 数据存入mysql{result} 行成功") print(f"{params['task_id']} 数据存入mysql失败") except Exception as e: print(f"{params['task_id']} 数据存入mysql失败,失败原因为{e}") async def update_task_record(self, data: Dict[str, Any]): await self.db_pool.initialize() # 对任务状态进行修改 try: params = {"task_id": data["task_id"]} where_conditions = "task_id = %s" affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns={"status": data["status"]}, where_conditions=where_conditions, params=params) if affected_rows: print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") return True except Exception as e: print(f"{params['task_id']} 状态更新失败;失败原因为:{e}") return False async def deal_receive_data(self, data: Dict[str, Any]): # 将商品数据写入mysql和redis print("开始处理接收到得数据:{}".format(data)) await self.db_pool.initialize() key = f"{data['shop_id']}:{data['item_id']}" params = data.get("goods_info") affected_rows = self.redis_conn.write_data(key, params) if affected_rows: print(f"{key};数据已存入redis中") params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"], "goods_info": json.dumps(params)} print(f"{params}<") affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data]) if affected_rows: print(f"{key};商品数据已存入mysql") # 更改任务状态 params = {"task_id": data["task_id"], "status": 2} result = await self.update_task_record(params) if result: print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") return True