# -*- coding: utf-8 -*- import time from typing import Dict, Any from public_function.asyn_mysql import AsyncMySQL from public_function.redis_task_manager import RedisTaskManager class AllTask: def __init__(self, config_data: Dict[str, Any]): self.config_data = config_data self.redis_conn = RedisTaskManager(self.config_data["redis_config"]) self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) async def deal_shopee_task(self, param): # 查询redis数据库,redis 数据库存在该数据直接返回 key = f"{param['app_name']}:{param['store_id']}:{param['goods_id']}" result = self.redis_conn.read_data(key) if result: return result # 调用对应爬虫任务 # 任务结束后开始等待 endtime = time.time() + 55 while time.time() < endtime: await time.sleep(1) result = self.redis_conn.read_data(key) if result: return result return [] async def task_distribution(self, data: Dict[str, Any]): # 需要对任务进行记录 try: await self.db_pool.initialize() # 将任务记录到mysql param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]} await self.db_pool.insert_many(table="crawler_task_record_info", data=param) except Exception as e: print("将任务记录到数据库失败,失败原因为:{}".format(e)) if param["app_name"] == "Shopee": result = await self.deal_shopee_task(param) if result: return result return None