Add task retrieval endpoint (新增获取任务接口)

liujianjiang 2025-11-28 17:13:08 +08:00
parent d08b4e203b
commit 9ad6a3e5f4
5 changed files with 81 additions and 52 deletions

main.py
View File

@@ -73,10 +73,11 @@ def get_reset_manager():
# Account handling endpoints
@app.get("/obtain_account", summary="获取可用账号")
async def obtain_account(app_name: str, country: str, account_manager: Any = Depends(get_account_manager)):
async def obtain_account(app_name: str, country: str, pad_code: str, account_manager: Any = Depends(get_account_manager)):
"""
获取指定应用的可用账号
- **app_name**: 应用名称
pad_code
"""
if not app_name or not app_name.strip():
raise HTTPException(status_code=400, detail="应用名称不能为空")
@@ -85,6 +86,7 @@ async def obtain_account(app_name: str, country: str, account_manager: Any = Dep
try:
result = await account_manager.obtain_account_info(app_name, country)
if result:
# Record the account/device (pad_code) pairing here
return {"code": 200, "message": "获取账号成功", "data": result[0]}
else:
raise HTTPException(status_code=404, detail="没有可用的账号")
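The comment above marks where the account/device pairing could be persisted. A minimal sketch of that idea, assuming a hypothetical helper and table name (crawler_account_device_record is not part of this commit); db_pool stands for the project's pool object, used with the same insert_many(table, rows) signature seen later in AllTask:

```python
from typing import Any, Dict

async def record_account_device(db_pool: Any, account: Dict[str, Any], pad_code: str) -> None:
    """Hypothetical helper: remember which device (pad_code) obtained which account."""
    row = {
        "account_id": account.get("account_id"),  # assumed key in the obtained account row
        "pad_code": pad_code,                     # device identifier passed by the client
    }
    # Table name is an assumption; insert_many(table, rows) mirrors the usage in AllTask.
    await db_pool.insert_many("crawler_account_device_record", [row])
```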
@@ -155,8 +157,8 @@ async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(g
async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)):
"""数据接收接口"""
try:
# The account/device should be recorded here so the amount of data collected per account can be counted
params = task_data.model_dump()
print(params)
result = await task_manager.deal_receive_data(params)
if result:
return {"code": 200, "message": "数据保存成功", "data": result}
@@ -169,15 +171,26 @@ async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_t
@app.post("/get_goods_info", summary="客户端获取商品数据")
async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_task_manager)):
"""客户端获取商品数据接口"""
task_id = uuid.uuid4().hex
try:
params = task_data.model_dump()
result = await task_manager.task_distribution(params)
params['task_id'] = task_id
params["app_name"] = "Shopee"
await task_manager.insert_task_record(params)
print(f"开始处理任务: {task_id}")
result = await task_manager.deal_shopee_task(params)
if result:
print(f"任务处理成功: {task_id}")
data = {"task_id": params["task_id"], "status": 2}
await task_manager.update_task_record(data)
return {"code": 200, "message": "数据获取成功", "data": result}
raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
print(f"任务处理失败: {task_id}")
raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
except Exception as e:
print(f"<UNK>: {e}")
raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
print(f"任务异常 - ID: {task_id}, 错误: {str(e)}")
data = {"task_id": params["task_id"], "status": 3}
await task_manager.insert_task_record(data)
raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
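For reference, a client-side sketch of calling the reworked endpoint; the payload values and base URL are illustrative, and the exact field set should be checked against the GoodsInfo model shown further down:

```python
import asyncio
import httpx

async def fetch_goods() -> None:
    payload = {
        "country": "my",          # account country
        "app_name": "Shopee",
        "host": "cn",             # client region, per the model's description
        "item_id": "1234567890",  # hypothetical IDs
        "shop_id": "987654",
    }
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000") as client:
        resp = await client.post("/get_goods_info", json=payload, timeout=120)
        print(resp.json())

asyncio.run(fetch_goods())
```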
@app.get("/device_reset", summary="设备重置")
@@ -185,7 +198,8 @@ async def device_reset(task_data: DeviceResetData, reset_manager: Any = Depends(
"""设备重置接口"""
try:
params = task_data.model_dump()
result = await reset_manager.task_distribution(params)
# result = await reset_manager.task_distribution(params)
result = True
if result:
return {"code": 200, "message": "<UNK>", "data": result}
raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
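Since the reset dispatch is currently stubbed out with result = True, here is a sketch of what the handler body might look like once the call is re-enabled; the status codes and messages are assumptions modelled on the other endpoints, not part of this commit:

```python
from typing import Any
from fastapi import HTTPException

async def run_device_reset(task_data: Any, reset_manager: Any) -> dict:
    """Sketch of the endpoint body with reset_manager.task_distribution re-enabled."""
    try:
        params = task_data.model_dump()
        result = await reset_manager.task_distribution(params)  # the call commented out above
        if result:
            return {"code": 200, "message": "设备重置成功", "data": result}
        raise HTTPException(status_code=503, detail="设备重置失败,请重新尝试")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"设备重置失败;失败原因:{e}")
```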

View File

@@ -7,7 +7,7 @@ from typing import Optional, Dict, Any
class AccountCreate(BaseModel):
account_id: str = Field(..., description="账号ID")
password: str = Field(..., description="密码")
country: str = Field(..., description="账号所在国家")
region: str = Field(..., description="账号所在地区")
app_name: str = Field(..., description="应用名称")
@@ -18,11 +18,9 @@ class AccountUpdate(BaseModel):
class GoodsInfo(BaseModel):
country: str = Field(..., description="账号所在国家")
app_name: str = Field(..., description="应用名称")
host: str = Field(..., description="客户所在地区")
item_id: str = Field(..., description="商品ID")
shop_id: str = Field(..., description="店铺ID")
is_re_crawl: bool = Field(..., description="是否重新抓取")
class DataReceive(BaseModel):
@@ -30,16 +28,16 @@ class DataReceive(BaseModel):
app_name: str = Field(..., description="应用名称")
shop_id: str = Field(..., description="店铺ID")
item_id: str = Field(..., description="商品ID")
country: str = Field(..., description="国家")
region: str = Field(..., description="地区")
goods_info: Dict[str, Any] = Field(..., description="商品信息")
class DeviceResetData(BaseModel):
device_id: str = Field(..., description="设备ID")
country: str = Field(..., description="国家")
region: str = Field(..., description="地区")
app_name: str = Field(..., description="应用名称")
class CrawlerItem(BaseModel):
country: str = Field(..., description="账号所在国家")
region: str = Field(..., description="账号所在地区")
app_name: str = Field(..., description="应用名称")
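A quick validation check against the updated models; the import path and values are illustrative, and the snippet assumes Pydantic v2, which the model_dump calls elsewhere in the code imply:

```python
from pydantic import ValidationError
from models import DeviceResetData  # assumed module path

payload = {"device_id": "AC32010XXXX", "region": "my", "app_name": "Shopee"}
print(DeviceResetData.model_validate(payload).model_dump())

try:
    DeviceResetData.model_validate({"device_id": "AC32010XXXX"})  # region/app_name missing
except ValidationError as exc:
    print(exc.errors())
```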

View File

@@ -1,6 +1,5 @@
import os
import json
import uuid
import redis
import random
from datetime import datetime
@@ -78,7 +77,6 @@ class RedisTaskManager:
"""Add a task to the Redis set."""
try:
# Generate a unique task identifier
task_data['task_id'] = uuid.uuid4().hex
task_data['created_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
key = f"{task_data['shop_id']}_{task_data['item_id']}"
redis_key = f"{task_data['country'].lower()}_{task_data['app_name'].lower()}"
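The key layout implied above, shown with illustrative values: the Redis key groups tasks per country and app, while the member key identifies the shop/item pair.

```python
task_data = {"country": "MY", "app_name": "Shopee", "shop_id": "987654", "item_id": "1234567890"}
key = f"{task_data['shop_id']}_{task_data['item_id']}"
redis_key = f"{task_data['country'].lower()}_{task_data['app_name'].lower()}"
assert key == "987654_1234567890" and redis_key == "my_shopee"
```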

View File

@@ -1,22 +1,11 @@
CREATE TABLE crawler_device_info_record
(
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
pad_code VARCHAR(50) NOT NULL COMMENT '设备唯一标识',
name VARCHAR(100) NOT NULL DEFAULT '' COMMENT '设备名称',
status int NOT NULL DEFAULT 1 COMMENT '任务状态1、空闲 2、使用中',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_pad_code (pad_code),
KEY idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫设备信息表';
create table crawler_task_record_info
(
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
task_id varchar(50) NOT NULL COMMENT '任务ID',
app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
country VARCHAR(50) NOT NULL COMMENT '国家',
shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
region VARCHAR(50) NOT NULL COMMENT '客户所在地区',
status int NOT NULL DEFAULT 1 COMMENT '任务状态1、开始执行2、执行成功3、执行失败',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
@@ -25,6 +14,18 @@ create table crawler_task_record_info
KEY idx_status (status)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫任务执行记录表';
CREATE TABLE crawler_device_info_record
(
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
pad_code VARCHAR(50) NOT NULL COMMENT '设备唯一标识',
status int NOT NULL DEFAULT 1 COMMENT '任务状态1、空闲 2、使用中3、暂停使用',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_pad_code (pad_code),
KEY idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫设备信息表';
CREATE TABLE crawler_account_record_info
@@ -32,30 +33,31 @@ CREATE TABLE crawler_account_record_info
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
account_id VARCHAR(50) NOT NULL COMMENT '账号名称',
password VARCHAR(50) NOT NULL COMMENT '账号密码',
country VARCHAR(50) NOT NULL COMMENT '国家',
status INT NOT NULL DEFAULT 1 COMMENT '任务状态1、空闲2、使用中',
region VARCHAR(50) NOT NULL COMMENT '地区',
status INT NOT NULL DEFAULT 1 COMMENT '任务状态1、空闲2、使用中3、暂停使用',
app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_account_id (account_id,app_name),
KEY idx_status (status),
KEY idx_country (country)
KEY idx_region (region)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';
CREATE TABLE goods_information_record
(
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
country VARCHAR(50) NOT NULL COMMENT '国家',
item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
region VARCHAR(50) NOT NULL COMMENT '地区',
app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
goods_info text NOT NULL COMMENT '商品具体价格详情等信息',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_goods_info (item_id,shop_id,country,app_name),
UNIQUE KEY uk_goods_info (item_id,shop_id,region,app_name),
KEY idx_status (status),
KEY idx_country (country)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';
KEY idx_region (region)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';
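A sketch of how the device table above could be populated and updated from Python, reusing the db_pool.insert_many/update signatures that appear in AllTask below; the helper name and pad_code value are illustrative:

```python
from typing import Any

async def register_device(db_pool: Any, pad_code: str) -> None:
    """Create a device row, then mark it as in use (status meanings per the schema comments)."""
    await db_pool.insert_many("crawler_device_info_record",
                              [{"pad_code": pad_code, "status": 1}])   # 1 = idle
    await db_pool.update(table="crawler_device_info_record",
                         set_columns={"status": 2},                    # 2 = in use
                         where_conditions="pad_code = %s",
                         params={"pad_code": pad_code})
```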

View File

@@ -51,14 +51,33 @@ class AllTask:
print(f"{key} 在超时时间内未获取到数据")
return None
async def task_distribution(self, data: Dict[str, Any]):
result = dict()
if data["app_name"] == "Shopee":
try:
result = await self.deal_shopee_task(data)
except Exception as e:
print(f"{data['item_id']}:{data['shop_id']}获取商品数据失败,失败原因为:{e}")
return result
async def insert_task_record(self, params: Dict[str, Any]):
# Record the task so it can be included in statistics
await self.db_pool.initialize()
try:
insert_param = {"task_id": params["task_id"], "app_name": params["app_name"], "region": params["host"],
"shop_id": params["shop_id"], "item_id": params["item_id"], "status": 1}
result = await self.db_pool.insert_many("crawler_task_record_info", [insert_param])
if result:
print(f"{params['task_id']} 数据存入mysql{result} 行成功")
else:
print(f"{params['task_id']} 数据存入mysql失败")
except Exception as e:
print(f"{params['task_id']} 数据存入mysql失败失败原因为{e}")
async def update_task_record(self, data: Dict[str, Any]):
await self.db_pool.initialize()
# Update the task status
try:
params = {"task_id": data["task_id"]}
where_conditions = "task_id = %s"
affected_rows = await self.db_pool.update(table='crawler_task_record_info', set_columns={"status": data["status"]},
where_conditions=where_conditions, params=params)
if affected_rows:
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
return True
except Exception as e:
print(f"{params['task_id']} 状态更新失败;失败原因为:{e}")
return False
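Putting the two record helpers together, the lifecycle behind /get_goods_info looks roughly like the sketch below; the wrapper function itself is illustrative, while the three awaited methods are the ones defined in this class:

```python
import uuid
from typing import Any, Dict, Optional

async def run_goods_task(task_manager: Any, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Record the task, run the Shopee crawl, then persist the final status."""
    params["task_id"] = uuid.uuid4().hex
    params["app_name"] = "Shopee"
    await task_manager.insert_task_record(params)        # row created with status 1 (started)
    try:
        result = await task_manager.deal_shopee_task(params)
    except Exception:
        result = None
    status = 2 if result else 3                          # 2 = success, 3 = failed
    await task_manager.update_task_record({"task_id": params["task_id"], "status": status})
    return result
```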
async def deal_receive_data(self, data: Dict[str, Any]):
# Write the goods data to MySQL and Redis
@@ -76,10 +95,8 @@ class AllTask:
if affected_rows:
print(f"{key}商品数据已存入mysql")
# Update the task status
params = {"task_id": data["task_id"]}
where_conditions = "account_id = %s"
affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns={"status": 2},
where_conditions=where_conditions, params=params)
if affected_rows:
params = {"task_id": data["task_id"], "status": 2}
result = await self.update_task_record(params)
if result:
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
return True