# -*- coding: utf-8 -*-
"""Task management: dispatch crawl tasks and persist crawled goods data.

Reads/writes a Redis cache keyed by ``store_id:goods_id`` and records
goods data plus task status in MySQL via an async pool.
"""
import asyncio
import json
import time
from typing import Any, Dict, Optional

from public_function.asyn_mysql import AsyncMySQL
from public_function.public_func import create_logger
from public_function.redis_task_manager import RedisTaskManager

logger = create_logger(file_name="all_task_management")


class AllTask:
    """Coordinates crawl-task dispatch and result persistence."""

    def __init__(self, config_data: Dict[str, Any]):
        # Raw service configuration; "advert_policy" holds the MySQL settings.
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])

    async def deal_shopee_task(self, param):
        """Return cached goods data for a Shopee task, polling Redis for a result.

        :param param: dict with at least ``store_id`` and ``goods_id``;
            optional ``is_re_crawl`` forces a fresh crawl (skips the cache read).
        :return: the cached data dict, or an empty dict if nothing arrives
            within the ~55s polling window.
        """
        # Check Redis first; if the data is already cached, return it directly
        # (unless the caller explicitly asked for a re-crawl).
        key = f"{param['store_id']}:{param['goods_id']}"
        if not param.get('is_re_crawl', False):
            print(f"{key} 开始在redis中获取数据")
            result = self.redis_conn.read_data(key)
            if result:
                return result
        # The crawler is expected to be triggered elsewhere; from here we only
        # wait for its result to appear in Redis.
        print(f"{key} 在redis中获取数据失败,开始调用爬虫抓取数据")
        result = dict()
        endtime = time.time() + 55
        while time.time() < endtime:
            # BUG FIX: the original used ``await time.sleep(5)`` — time.sleep
            # is not awaitable (TypeError) and would block the event loop.
            # asyncio.sleep yields control to other tasks while waiting.
            await asyncio.sleep(5)
            result = self.redis_conn.read_data(key)
            if result:
                print(f"{key} 调用爬虫抓取数据后在redis获取到数据")
                return result
        return result

    async def task_distribution(self, data: Dict[str, Any]):
        """Route a task to its app-specific handler (currently only Shopee).

        :param data: task payload; ``app_name`` selects the handler.
        :return: the handler's result dict, or an empty dict on failure
            or for unsupported apps.
        """
        result = dict()
        if data["app_name"] == "Shopee":
            try:
                result = await self.deal_shopee_task(data)
            except Exception as e:
                # Best-effort: swallow handler errors so one failed task does
                # not take down the dispatcher; an empty dict signals failure.
                print(f"{data['goods_id']}:{data['store_id']}获取商品数据失败,失败原因为:{e}")
        return result

    async def deal_receive_data(self, data: Dict[str, Any]):
        """Persist received goods data to Redis and MySQL, then mark the task done.

        :param data: dict with ``store_id``, ``goods_id``, ``country``,
            ``app_name``, ``task_id`` and the crawled ``goods_info``.
        :return: True (unconditionally, even if intermediate writes fail).
        """
        print("开始处理接收到得数据:{}".format(data))
        await self.db_pool.initialize()
        key = f"{data['store_id']}:{data['goods_id']}"
        params = data.get("goods_info")
        affected_rows = self.redis_conn.write_data(key, params)
        if affected_rows:
            print(f"{key};数据已存入redis中")
        # Record the full payload in MySQL for durable history.
        params = {"goods_id": data["goods_id"], "store_id": data["store_id"],
                  "country": data["country"], "app_name": data["app_name"],
                  "goods_info": json.dumps(data)}
        print(f"{params}<")
        affected_rows = await self.db_pool.insert_many(table='goods_information_record',
                                                       data=[params])
        if affected_rows:
            print(f"{key};商品数据已存入mysql")
        # Mark the crawl task as completed (status 2).
        # NOTE(review): the WHERE clause matches ``account_id`` but the bound
        # value is ``task_id`` — looks like a column/key mismatch; confirm the
        # schema of crawler_account_record_info before changing.
        params = {"task_id": data["task_id"]}
        where_conditions = "account_id = %s"
        affected_rows = await self.db_pool.update(table='crawler_account_record_info',
                                                  set_columns={"status": 2},
                                                  where_conditions=where_conditions,
                                                  params=params)
        if affected_rows:
            print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        return True