From ebaabae1884bcc4e97790a8d67806e0ab4e137bc Mon Sep 17 00:00:00 2001 From: liujianjiang Date: Wed, 26 Nov 2025 17:40:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- account_management/__init__.py | 1 + account_management/deal_account.py | 61 +++++++++++ crawler_management/__init__.py | 1 + device_management/__init__.py | 1 + main.py | 112 +++++++++++++++++++ model/__init__.py | 1 + model/model.py | 22 ++++ public_function/__init__.py | 16 +++ public_function/asyn_mysql.py | 142 +++++++++++++++++++++++++ public_function/config.yaml | 21 ++++ public_function/public_func.py | 12 +++ public_function/redis_task_manager.py | 72 +++++++++++++ public_function/table_log.sql | 61 +++++++++++ task_management/__init__.py | 1 + task_management/all_task_management.py | 44 ++++++++ 15 files changed, 568 insertions(+) create mode 100644 account_management/__init__.py create mode 100644 account_management/deal_account.py create mode 100644 crawler_management/__init__.py create mode 100644 device_management/__init__.py create mode 100644 main.py create mode 100644 model/__init__.py create mode 100644 model/model.py create mode 100644 public_function/__init__.py create mode 100644 public_function/asyn_mysql.py create mode 100644 public_function/config.yaml create mode 100644 public_function/public_func.py create mode 100644 public_function/redis_task_manager.py create mode 100644 public_function/table_log.sql create mode 100644 task_management/__init__.py create mode 100644 task_management/all_task_management.py diff --git a/account_management/__init__.py b/account_management/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/account_management/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/account_management/deal_account.py b/account_management/deal_account.py new file mode 100644 index 0000000..2576da1 --- /dev/null +++ b/account_management/deal_account.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +import os +from typing import Optional, Dict, Any, List + +from public_function.asyn_mysql import AsyncMySQL + + +class DealAccount: + def __init__(self, config_data: Dict[str, Any]): + self.config_data = config_data + self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) + + async def add_account(self, params: List[Dict[str, Any]]): + """新增账户""" + await self.db_pool.initialize() + result = await self.db_pool.insert_many(table="crawler_account_record_info", data=params) + if result: + return True + return False + + async def delete_account(self, params: List[str]): + """删除账户""" + if len(params) == 1: + condition = "account_id={}".format(params[0]) + else: + condition = f"account_id in ({','.join(params)})" + await self.db_pool.initialize() + result = await self.db_pool.delete(table="crawler_account_record_info", where_conditions=condition) + if result: + return True + return False + + async def query_account_info(self, params: Dict[str, Any]): + """查询具体账户信息""" + sql_str = f"""select * from crawler_account_record_info where account_id='{params['account_id']}'""" + await self.db_pool.initialize() + result = await self.db_pool.fetch_all(sql_str, ) + if result: + return result + return [] + + async def obtain_account_info(self, app_name, country, number=1): + """获取指定个数账户信息""" + sql_str = f"""select account_id,password from crawler_account_record_info + where status=1 and app_name='{app_name}' and country='{country}' limit {number}""" + await self.db_pool.initialize() + result = await self.db_pool.fetch_all(sql_str, ) + if result: + return result + return [] + + async def update_account_info(self, set_param: Dict[str, Any], params): + """更新账户信息""" + # params = {'name': '张三', 'age': 25} + where_conditions = "account_id = %s" + await self.db_pool.initialize() + affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns=set_param, + where_conditions=where_conditions, params=params) + if affected_rows: + return True + return False diff --git a/crawler_management/__init__.py b/crawler_management/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/crawler_management/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/device_management/__init__.py b/device_management/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/device_management/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/main.py b/main.py new file mode 100644 index 0000000..8c0d364 --- /dev/null +++ b/main.py @@ -0,0 +1,112 @@ +import os +import yaml +import uuid +import asyncio +import uvicorn +from pathlib import Path +from typing import Dict, Any, Optional, List +from fastapi import FastAPI, HTTPException, Depends + +from public_function.public_func import read_config +from model.model import AccountCreate, AccountUpdate, CrawlerTask + +app = FastAPI() + + +def get_config(): + """获取配置文件""" + config_path = os.path.join(Path(__file__).resolve().parent, 'public_function/config.yaml') + try: + # 这里假设read_config函数存在 + from public_function.public_func import read_config + return read_config(config_path) + except ImportError: + logger.warning("未找到read_config函数,使用默认配置") + return {'host': 'localhost', 'port': 3306, 'user': 'root', 'password': 'password', 'db': 'test_db', 'max_overflow': 10} + + +def get_account_manager(): + """获取账号管理器实例""" + config = get_config() + try: + from account_management.deal_account import DealAccount + return DealAccount(config) + except ImportError: + logger.warning("未找到DealAccount类,返回模拟实例") + return None + + +def get_task_manager(): + """获任务管理器实例""" + config = get_config() + try: + from task_management.all_task_management import AllTask + return AllTask(config) + except ImportError: + logger.warning("未找到AllTask类,返回模拟实例") + return None + + +# 账号处理相关 +@app.get("/obtain_account", summary="获取可用账号") +async def obtain_account(app_name: str, country: str, account_manager: Any = Depends(get_account_manager)): + """ + 获取指定应用的可用账号 + - **app_name**: 应用名称 + """ + if not app_name or not app_name.strip(): + raise HTTPException(status_code=400, detail="应用名称不能为空") + if not country or not country.strip(): + raise HTTPException(status_code=400, detail="国家不能为空会") + try: + result = await account_manager.obtain_account_info(app_name, country) + if result: + return {"code": 200, "message": "获取账号成功", "data": result[0]} + else: + raise HTTPException(status_code=404, detail="没有可用的账号") + except Exception as e: + print(f"获取账号失败: {e}") + raise HTTPException(status_code=404, detail="{}".format(e)) + + +@app.post("/add_account", summary="新增账号") +async def add_account(account_data: AccountCreate, account_manager: Any = Depends(get_account_manager)): + """ + 新增爬虫账号 + - **account_id**: 账号ID + - **password**: 密码 + - **app_name**: 应用名称 + """ + try: + print(account_data.dict()) + # 这里应该调用实际的添加账号方法 + result = await account_manager.add_account([account_data.dict()]) + return {"code": 200, "message": "新增账号成功", "data": result} + except Exception as e: + print(f"新增账号失败: {e}") + raise HTTPException(status_code=500, detail="新增账号失败,失败原因:{}".format(e)) + + +@app.post("/receive_data") +async def receive_data(params: Dict[str, Any]): + """数据接收接口""" + # 接收道德数据表写入redis后,存入mysql(可以根据业务需求确认是否需要永久保存) + + +@app.get("/crawler_task") +async def crawler_task(task_data: CrawlerTask, task_manager: Any = Depends(get_task_manager)): + """爬虫任务接口""" + try: + params = task_data.dict() + params['task_id'] = uuid.uuid4().hex + result = await task_manager.task_distribution(params) + if result: + return {"code": 200, "message": "", "data": result} + raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试") + except Exception as e: + print(f": {e}") + raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) + + +if __name__ == '__main__': + uvicorn.run(app) diff --git a/model/__init__.py b/model/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/model/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/model/model.py b/model/model.py new file mode 100644 index 0000000..6576479 --- /dev/null +++ b/model/model.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +from pydantic import BaseModel, Field + + +# 定义数据模型 +class AccountCreate(BaseModel): + account_id: str = Field(..., min_length=1, max_length=128, description="账号ID") + password: str = Field(..., min_length=1, max_length=128, description="密码") + country: str = Field(..., min_length=1, max_length=128, description="账号所在国家") + app_name: str = Field(..., min_length=1, max_length=128, description="应用名称") + + +class AccountUpdate(BaseModel): + account_id: str = Field(..., description="账号ID") + status: int = Field(..., ge=1, le=2, description="状态:1-空闲,2-使用中") + + +class CrawlerTask(BaseModel): + country: str = Field(..., min_length=1, max_length=128, description="账号所在国家") + app_name: str = Field(..., min_length=1, max_length=128, description="应用名称") + goods_id: str = Field(..., min_length=1, max_length=128, description="账号所在国家") + store_id: str = Field(..., min_length=1, max_length=128, description="应用名称") diff --git a/public_function/__init__.py b/public_function/__init__.py new file mode 100644 index 0000000..8df4cb9 --- /dev/null +++ b/public_function/__init__.py @@ -0,0 +1,16 @@ +# param = {'endTime': 1762591513782, 'padCode': 'ACP5B44DWZTR7DP6', 'padStatus': None, 'taskContent': '', 'taskId': 3422505, +# 'taskResult': 'Success', 'taskStatus': 3} +# task_info = {'mirrorName': 'bkp-ACP251016US1U4WP-1762437337310', 'countryCode': 'US', +# 'deviceId': 'ACP251016US1U4WP-2025-11-06 07:48:47-0gsctk80', 'retentionScript': 'reback.js', 'retentionFuture': True, +# 'pad_code': ['ACP5B44DWZTR7DP6'], 'task_id': 'acp5b44dwztr7dp6_d773b1a222d84a089ec38d8dacbd37f2', 'taskId': 3422505, +# 'task_type': 'restore'} +# print(param.get('taskId', 0)) +# print(task_info.get('taskId', 0)) +# print(param.get("taskResult", "")) +# if param.get('taskId', 0) == task_info.get("taskId", 0) and param.get("taskResult", "") == "Success": +# print(task_info) +# # 需要判断该任务是备份韩式还原 +# task_type = task_info.get("task_type", "") +# if task_type == "restore": +# print(f"{get_local_time()};{param['padCode']};实例还原成功") + diff --git a/public_function/asyn_mysql.py b/public_function/asyn_mysql.py new file mode 100644 index 0000000..9ea9228 --- /dev/null +++ b/public_function/asyn_mysql.py @@ -0,0 +1,142 @@ +import asyncio +import aiomysql +from typing import List, Tuple, Dict, Any + + +class AsyncMySQL: + def __init__(self, config_data: Dict): + + self.config = { + 'host': config_data['host'], + 'port': config_data['port'], + 'user': config_data['user'], + 'password': config_data['password'], + 'db': config_data['db'], + 'autocommit': True, + 'minsize': 1, + 'maxsize': config_data['max_overflow'], + } + self.pool = None + + async def initialize(self): + """初始化连接池""" + self.pool = await aiomysql.create_pool(**self.config) + return self + + async def close(self): + """关闭连接池""" + if self.pool: + self.pool.close() + await self.pool.wait_closed() + + async def execute(self, query: str, params=None): + """执行单条SQL语句""" + async with self.pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute(query, params) + return cursor.rowcount + + async def executemany(self, query: str, params_list: List[Tuple]): + """批量执行SQL语句""" + async with self.pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.executemany(query, params_list) + return cursor.rowcount + + async def insert_many_tuple(self, table: str, columns: List[str], data: List[Tuple]): + """批量插入数据到指定表""" + placeholders = ', '.join(['%s'] * len(columns)) + columns_str = ', '.join(columns) + query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})" + return await self.executemany(query, data) + + async def insert_many(self, table: str, data: List[Dict[str, Any]]): + columns = list(data[0].keys()) + # 从字典数据中提取值,转换为元组列表 + params_list = [tuple(record.get(col) for col in columns) for record in data] + placeholders = ', '.join(['%s'] * len(columns)) + columns_str = ', '.join(columns) + query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})" + return await self.executemany(query, params_list) + + async def fetch_all(self, query: str, params=None) -> List[Dict[str, Any]]: + """查询多条记录""" + async with self.pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cursor: + await cursor.execute(query, params) + return await cursor.fetchall() + + async def delete(self, table: str, where_conditions: str = None, params: Tuple = None) -> int: + + if where_conditions: + query = f"DELETE FROM {table} WHERE {where_conditions}" + else: + query = f"DELETE FROM {table}" + + return await self.execute(query, params) + + async def delete_many(self, table: str, conditions_list: List[Tuple[str, Tuple]]) -> int: + + total_affected = 0 + for where_conditions, params in conditions_list: + affected_rows = await self.delete(table, where_conditions, params) + total_affected += affected_rows + return total_affected + + async def update(self, table: str, set_columns: Dict[str, Any], where_conditions: str = None, + params: Tuple = None) -> int: + """ + 更新单条记录 + :param table: 表名 + :param set_columns: 要更新的字段和值字典 + :param where_conditions: WHERE条件 + :param params: 参数列表 + :return: 受影响的行数 + """ + set_clause = ', '.join([f"{k} = %s" for k in set_columns]) + query = f"UPDATE {table} SET {set_clause}" + + if where_conditions: + query += f" WHERE {where_conditions}" + + # 构造参数列表 + update_params = list(set_columns.values()) + if where_conditions: + if params: + update_params.extend(params) + else: + raise ValueError("当使用WHERE条件时,必须提供参数") + + return await self.execute(query, update_params) + + async def update_many(self, table: str, set_columns: Dict[str, Any], conditions_list: List[Tuple[str, Tuple]]) -> int: + """ + 批量更新多条记录 + :param table: 表名 + :param set_columns: 要更新的字段和值字典 + :param conditions_list: 条件列表,每个元素是(where_conditions, params) + :return: 受影响的总行数 + """ + set_clause = ', '.join([f"{k} = %s" for k in set_columns]) + total_affected = 0 + + for where_conditions, params in conditions_list: + query = f"UPDATE {table} SET {set_clause} WHERE {where_conditions}" + update_params = list(set_columns.values()) + if params: + update_params.extend(params) + + affected_rows = await self.execute(query, update_params) + total_affected += affected_rows + + return total_affected + + +if __name__ == '__main__': + from public_function.public_func import read_config + + config = read_config(r"C:\workfile\crawler_task_management\public_function\config.yaml") + obj = AsyncMySQL(config['advert_policy']) + sql_str = f"""select account_id,password from crawler_account_record_info + where status=1 and app_name='xiapi' limit 1""" + obj.fetch_all(sql_str) diff --git a/public_function/config.yaml b/public_function/config.yaml new file mode 100644 index 0000000..ff374f8 --- /dev/null +++ b/public_function/config.yaml @@ -0,0 +1,21 @@ +advert_policy: + host: pc-2ze85s1hw783u87wlo.mysql.polardb.rds.aliyuncs.com + port: 3306 + user: policyuser + password: IvM@ck#z9$Eqy3KGBK74hk + db: advert_policy + table: save_create_device_info + # 连接池配置 + pool_size: 10 + max_overflow: 20 + pool_recycle: 3600 + pool_timeout: 30 + +access_key_id: LTAI5tA92Av7DQmSQY2MTJPe +access_key_secret: KI5s3C78HcPX9MDjUwPwVDytFzxEjY + +redis_config: + host: 182.92.181.218 + port: 6372 + password: qaz@wsx + db: 0 \ No newline at end of file diff --git a/public_function/public_func.py b/public_function/public_func.py new file mode 100644 index 0000000..b31f3cf --- /dev/null +++ b/public_function/public_func.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +import yaml +from pathlib import Path + + +def read_config(path): + if Path(path).exists(): + with open(path, encoding="utf-8") as f: + config = yaml.safe_load(f) + return config + else: + raise FileNotFoundError diff --git a/public_function/redis_task_manager.py b/public_function/redis_task_manager.py new file mode 100644 index 0000000..c08756e --- /dev/null +++ b/public_function/redis_task_manager.py @@ -0,0 +1,72 @@ +import os +import json +import redis +from typing import List, Dict, Any, Optional + + +class RedisTaskManager: + def __init__(self, config_data: Dict[str, Any]): + self.config_data = config_data + """初始化Redis连接""" + self.redis_client = redis.Redis( + host=self.config_data['redis_config']['host'], + port=self.config_data['redis_config']['port'], + password=self.config_data['redis_config']['password'], + db=self.config_data['redis_config']['db'], + decode_responses=True + ) + self.expire_hours = 24 # 过期时间24小时 + + def write_data(self, key: str, data: Any, expire_time: Optional[int] = None): + """ + 写入数据到Redis,设置过期时间 + Args: + key: Redis键名 + data: 要存储的数据 + expire_time: 自定义过期时间(秒),默认24小时 + Returns: + bool: 写入是否成功 + """ + try: + # 如果数据是字典或列表,先序列化为JSON + if isinstance(data, (dict, list)): + data_str = json.dumps(data, ensure_ascii=False) + else: + data_str = str(data) + # 设置过期时间,优先使用自定义时间 + expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600 + result = self.redis_client.setex(key, expire_seconds, data_str) + + if result: + print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}秒") + return True + else: + print(f"数据写入失败 - 键: {key}") + return False + + except Exception as e: + print(f"写入Redis数据时发生错误: {e}") + return False + + def read_data(self, key: str): + """ + 从Redis读取数据 + Args: + key: Redis键名 + Returns: + Optional[Any]: 读取到的数据,如果键不存在返回None + """ + try: + data_str = self.redis_client.get(key) + if data_str is None: + print(f"键不存在或已过期 - 键: {key}") + return None + # 尝试解析JSON数据 + try: + return json.loads(data_str) + except json.JSONDecodeError: + # 如果不是JSON格式,返回原始字符串 + return data_str + except Exception as e: + print(f"读取Redis数据时发生错误: {e}") + return None diff --git a/public_function/table_log.sql b/public_function/table_log.sql new file mode 100644 index 0000000..9547b0a --- /dev/null +++ b/public_function/table_log.sql @@ -0,0 +1,61 @@ +CREATE TABLE crawler_device_info_record +( + id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID', + pad_code VARCHAR(50) NOT NULL COMMENT '设备唯一标识', + name VARCHAR(100) NOT NULL DEFAULT '' COMMENT '设备名称', + status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、空闲 ;2、使用中', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + PRIMARY KEY (id), + UNIQUE KEY uk_pad_code (pad_code), + KEY idx_status (status) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫设备信息表'; + +create table crawler_task_record_info +( + id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID', + task_id varchar(50) NOT NULL COMMENT '任务ID', + app_name VARCHAR(50) NOT NULL COMMENT 'app名称', + country VARCHAR(50) NOT NULL COMMENT '国家', + status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、开始执行;2、执行成功;3、执行失败', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + PRIMARY KEY (id), + UNIQUE KEY uk_task_id (task_id), + KEY idx_status (status) +)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫任务执行记录表'; + + + +CREATE TABLE crawler_account_record_info +( + id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID', + account_id VARCHAR(50) NOT NULL COMMENT '账号名称', + password VARCHAR(50) NOT NULL COMMENT '账号密码', + country VARCHAR(50) NOT NULL COMMENT '国家', + status INT NOT NULL DEFAULT 1 COMMENT '任务状态:1、空闲;2、使用中', + app_name VARCHAR(50) NOT NULL COMMENT 'app名称', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + PRIMARY KEY (id), + UNIQUE KEY uk_account_id (account_id,app_name), + KEY idx_status (status), + KEY idx_country (country) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表'; + + +CREATE TABLE goods_information_record +( + id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID', + goods_id VARCHAR(50) NOT NULL COMMENT '商品ID', + store_id VARCHAR(50) NOT NULL COMMENT '店铺ID', + country VARCHAR(50) NOT NULL COMMENT '国家', + app_name VARCHAR(50) NOT NULL COMMENT 'app名称', + goods_info text NOT NULL COMMENT '商品具体价格详情等信息', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + PRIMARY KEY (id), + UNIQUE KEY uk_goods_info (goods_id,store_id,country,app_name), + KEY idx_status (status), + KEY idx_country (country) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表'; \ No newline at end of file diff --git a/task_management/__init__.py b/task_management/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/task_management/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py new file mode 100644 index 0000000..9e4d806 --- /dev/null +++ b/task_management/all_task_management.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +import time +from typing import Dict, Any + +from public_function.asyn_mysql import AsyncMySQL +from public_function.redis_task_manager import RedisTaskManager + + +class AllTask: + def __init__(self, config_data: Dict[str, Any]): + self.config_data = config_data + self.redis_conn = RedisTaskManager(self.config_data["redis_config"]) + self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) + + async def deal_shopee_task(self, param): + # 查询redis数据库,redis 数据库存在该数据直接返回 + key = f"{param['app_name']}:{param['store_id']}:{param['goods_id']}" + result = self.redis_conn.read_data(key) + if result: + return result + # 调用对应爬虫任务 + # 任务结束后开始等待 + endtime = time.time() + 55 + while time.time() < endtime: + await time.sleep(1) + result = self.redis_conn.read_data(key) + if result: + return result + return [] + + async def task_distribution(self, data: Dict[str, Any]): + # 需要对任务进行记录 + try: + await self.db_pool.initialize() + # 将任务记录到mysql + param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]} + await self.db_pool.insert_many(table="crawler_task_record_info", data=param) + except Exception as e: + print("将任务记录到数据库失败,失败原因为:{}".format(e)) + if param["app_name"] == "Shopee": + result = await self.deal_shopee_task(param) + if result: + return result + return None