diff --git a/main.py b/main.py
index f393e93..dab93bf 100644
--- a/main.py
+++ b/main.py
@@ -73,10 +73,11 @@ def get_reset_manager():
 
 # 账号处理相关
 @app.get("/obtain_account", summary="获取可用账号")
-async def obtain_account(app_name: str, country: str, account_manager: Any = Depends(get_account_manager)):
+async def obtain_account(app_name: str, country: str, pad_code: str, account_manager: Any = Depends(get_account_manager)):
     """
     获取指定应用的可用账号
     - **app_name**: 应用名称
+    - **pad_code**: 设备唯一标识
     """
     if not app_name or not app_name.strip():
         raise HTTPException(status_code=400, detail="应用名称不能为空")
@@ -85,6 +86,7 @@ async def obtain_account(app_name: str, country: str, account_manager: Any = Dep
     try:
         result = await account_manager.obtain_account_info(app_name, country)
         if result:
+            # 将账号和设备进行记录
             return {"code": 200, "message": "获取账号成功", "data": result[0]}
         else:
             raise HTTPException(status_code=404, detail="没有可用的账号")
@@ -155,8 +157,8 @@ async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(g
 async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)):
     """数据接收接口"""
     try:
+        # 需要对账号设备进行记录,便于统计每个账号采集数据数量
         params = task_data.model_dump()
-        print(params)
         result = await task_manager.deal_receive_data(params)
         if result:
             return {"code": 200, "message": "数据保存成功", "data": result}
@@ -169,15 +171,26 @@ async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_t
 @app.post("/get_goods_info", summary="客户端获取商品数据")
 async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_task_manager)):
     """客户端获取商品数据接口"""
+    task_id = uuid.uuid4().hex
     try:
         params = task_data.model_dump()
-        result = await task_manager.task_distribution(params)
+        params['task_id'] = task_id
+        params["app_name"] = "Shopee"
+        await task_manager.insert_task_record(params)
+        print(f"开始处理任务: {task_id}")
+        result = await task_manager.deal_shopee_task(params)
         if result:
+            print(f"任务处理成功: {task_id}")
+            data = {"task_id": params["task_id"], "status": 2}
+            await task_manager.update_task_record(data)
             return {"code": 200, "message": "数据获取成功", "data": result}
-        raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
+        print(f"任务处理失败: {task_id}")
+        raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
     except Exception as e:
-        print(f": {e}")
-        raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
+        print(f"任务异常 - ID: {task_id}, 错误: {str(e)}")
+        data = {"task_id": params["task_id"], "status": 3}
+        await task_manager.update_task_record(data)
+        raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
 
 
 @app.get("/device_reset", summary="设备重置")
@@ -185,7 +198,8 @@ async def device_reset(task_data: DeviceResetData, reset_manager: Any = Depends(
     """设备重置接口"""
     try:
         params = task_data.model_dump()
-        result = await reset_manager.task_distribution(params)
+        # result = await reset_manager.task_distribution(params)
+        result = True
         if result:
             return {"code": 200, "message": "", "data": result}
         raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
diff --git a/model/model.py b/model/model.py
index 76dc047..8c7f762 100644
--- a/model/model.py
+++ b/model/model.py
@@ -7,7 +7,7 @@ from typing import Optional, Dict, Any
 class AccountCreate(BaseModel):
     account_id: str = Field(..., description="账号ID")
     password: str = Field(..., description="密码")
-    country: str = Field(..., description="账号所在国家")
+    region: str = Field(..., description="账号所在地区")
     app_name: str = Field(..., description="应用名称")
 
 
@@ -18,11 +18,9 @@ class AccountUpdate(BaseModel):
 
 
 class GoodsInfo(BaseModel):
-    country: str = Field(..., description="账号所在国家")
-    app_name: str = Field(..., description="应用名称")
+    host: str = Field(..., description="客户所在地区")
     item_id: str = Field(..., description="商品ID")
     shop_id: str = Field(..., description="店铺ID")
-    is_re_crawl: bool = Field(..., description="是否重新抓取")
 
 
 class DataReceive(BaseModel):
@@ -30,16 +28,16 @@ class DataReceive(BaseModel):
     app_name: str = Field(..., description="应用名称")
     shop_id: str = Field(..., description="店铺ID")
     item_id: str = Field(..., description="商品ID")
-    country: str = Field(..., description="国家")
+    region: str = Field(..., description="地区")
     goods_info: Dict[str, Any] = Field(..., description="商品信息")
 
 
 class DeviceResetData(BaseModel):
     device_id: str = Field(..., description="设备ID")
-    country: str = Field(..., description="国家")
+    region: str = Field(..., description="地区")
     app_name: str = Field(..., description="应用名称")
 
 
 class CrawlerItem(BaseModel):
-    country: str = Field(..., description="账号所在国家")
+    region: str = Field(..., description="账号所在地区")
     app_name: str = Field(..., description="应用名称")
diff --git a/public_function/redis_task_manager.py b/public_function/redis_task_manager.py
index 3ae5031..268ac09 100644
--- a/public_function/redis_task_manager.py
+++ b/public_function/redis_task_manager.py
@@ -1,6 +1,5 @@
 import os
 import json
-import uuid
 import redis
 import random
 from datetime import datetime
@@ -78,7 +77,6 @@
         """添加任务到Redis集合"""
         try:
            # 生成唯一任务标识
-            task_data['task_id'] = uuid.uuid4().hex
            task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            key = f"{task_data['shop_id']}_{task_data['item_id']}"
            redis_key = f"{task_data["country"].lower()}_{task_data['app_name'].lower()}"
diff --git a/public_function/table_log.sql b/public_function/table_log.sql
index 95b9d5d..5c04908 100644
--- a/public_function/table_log.sql
+++ b/public_function/table_log.sql
@@ -1,22 +1,11 @@
-CREATE TABLE crawler_device_info_record
-(
-    id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
-    pad_code VARCHAR(50) NOT NULL COMMENT '设备唯一标识',
-    name VARCHAR(100) NOT NULL DEFAULT '' COMMENT '设备名称',
-    status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、空闲 ;2、使用中',
-    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
-    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
-    PRIMARY KEY (id),
-    UNIQUE KEY uk_pad_code (pad_code),
-    KEY idx_status (status)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫设备信息表';
-
 create table crawler_task_record_info
 (
     id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
     task_id varchar(50) NOT NULL COMMENT '任务ID',
     app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
-    country VARCHAR(50) NOT NULL COMMENT '国家',
+    shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
+    item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
+    region VARCHAR(50) NOT NULL COMMENT '客户所在地区',
     status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、开始执行;2、执行成功;3、执行失败',
     create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
@@ -25,6 +14,18 @@ create table crawler_task_record_info
     KEY idx_status (status)
 )ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫任务执行记录表';
 
+CREATE TABLE crawler_device_info_record
+(
+    id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
+    pad_code VARCHAR(50) NOT NULL COMMENT '设备唯一标识',
+    status int NOT NULL DEFAULT 1 COMMENT '任务状态:1、空闲 ;2、使用中;3、暂停使用',
+    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+    PRIMARY KEY (id),
+    UNIQUE KEY uk_pad_code (pad_code),
+    KEY idx_status (status)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫设备信息表';
+
 
 
 CREATE TABLE crawler_account_record_info
@@ -32,30 +33,31 @@ (
     id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
     account_id VARCHAR(50) NOT NULL COMMENT '账号名称',
     password VARCHAR(50) NOT NULL COMMENT '账号密码',
-    country VARCHAR(50) NOT NULL COMMENT '国家',
-    status INT NOT NULL DEFAULT 1 COMMENT '任务状态:1、空闲;2、使用中',
+    region VARCHAR(50) NOT NULL COMMENT '地区',
+    status INT NOT NULL DEFAULT 1 COMMENT '任务状态:1、空闲;2、使用中;3、暂停使用',
     app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
     create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (id),
     UNIQUE KEY uk_account_id (account_id,app_name),
     KEY idx_status (status),
-    KEY idx_country (country)
+    KEY idx_region (region)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';
 
 
 CREATE TABLE goods_information_record
 (
     id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
-    item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
-    shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
-    country VARCHAR(50) NOT NULL COMMENT '国家',
+    item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
+    shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
+    region VARCHAR(50) NOT NULL COMMENT '地区',
     app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
     goods_info text NOT NULL COMMENT '商品具体价格详情等信息',
     create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
     update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
     PRIMARY KEY (id),
-    UNIQUE KEY uk_goods_info (item_id,shop_id,country,app_name),
+    UNIQUE KEY uk_goods_info (item_id,shop_id,region,app_name),
     KEY idx_status (status),
-    KEY idx_country (country)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';
\ No newline at end of file
+    KEY idx_region (region)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫商品信息记录表';
+
diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py
index e6bcbdc..3e4557a 100644
--- a/task_management/all_task_management.py
+++ b/task_management/all_task_management.py
@@ -51,14 +51,34 @@ class AllTask:
         print(f"{key} 在超时时间内未获取到数据")
         return None
 
-    async def task_distribution(self, data: Dict[str, Any]):
-        result = dict()
-        if data["app_name"] == "Shopee":
-            try:
-                result = await self.deal_shopee_task(data)
-            except Exception as e:
-                print(f"{data['item_id']}:{data['shop_id']}获取商品数据失败,失败原因为:{e}")
-        return result
+    async def insert_task_record(self, params: Dict[str, Any]):
+        # 需要进行记录,便于统计
+        await self.db_pool.initialize()
+        try:
+            insert_param = {"task_id": params["task_id"], "app_name": params["app_name"], "region": params["host"],
+                            "shop_id": params["shop_id"], "item_id": params["item_id"], "status": 1}
+            result = await self.db_pool.insert_many("crawler_task_record_info", [insert_param])
+            if result:
+                print(f"{params['task_id']} 数据存入mysql{result} 行成功")
+            else:
+                print(f"{params['task_id']} 数据存入mysql失败")
+        except Exception as e:
+            print(f"{params['task_id']} 数据存入mysql失败,失败原因为{e}")
+
+    async def update_task_record(self, data: Dict[str, Any]):
+        await self.db_pool.initialize()
+        # 对任务状态进行修改
+        try:
{"task_id": data["task_id"]} + where_conditions = "task_id = %s" + affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns={"status": data["status"]}, + where_conditions=where_conditions, params=params) + if affected_rows: + print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") + return True + except Exception as e: + print(f"{params['task_id']} 状态更新失败;失败原因为:{e}") + return False async def deal_receive_data(self, data: Dict[str, Any]): # 将商品数据写入mysql和redis @@ -76,10 +95,8 @@ class AllTask: if affected_rows: print(f"{key};商品数据已存入mysql") # 更改任务状态 - params = {"task_id": data["task_id"]} - where_conditions = "account_id = %s" - affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns={"status": 2}, - where_conditions=where_conditions, params=params) - if affected_rows: + params = {"task_id": data["task_id"], "status": 2} + result = await self.update_task_record(params) + if result: print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") return True