crawler_task_management/main.py

259 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import yaml
import uuid
import random
import asyncio
import uvicorn
from pathlib import Path
from typing import Dict, Any, Optional, List
from fastapi import FastAPI, HTTPException, Depends, Header, BackgroundTasks
from public_function.auth import verify_tk_token
from task_management.all_task_management import AllTask
from account_management.deal_account import DealAccount
from public_function.public_func import read_config, create_logger
from model.model import GoodsInfo, DataReceive, AccountStatus, AccountUpdate
from model.model import TokenItem, CrawlerItem, ResetTask, AccountObtain, AlterStatus, BackupItem
# Module logger created through the project helper (file_name="crawler_main").
# NOTE(review): not referenced by the endpoints in this file, which print().
logger = create_logger(file_name="crawler_main")

# ASGI application; verify_tk_token is registered as an "http" middleware,
# so it wraps every HTTP request handled by the routes below.
app = FastAPI()
app.middleware("http")(verify_tk_token)
def get_config():
    """Load the service configuration.

    Reads ``public_function/config.yaml`` (relative to this module's directory)
    through ``read_config`` and returns its result unchanged. The file is read
    again on every call; nothing is cached.

    If the ``read_config`` call raises ``ImportError``, a hard-coded local
    database configuration is returned instead. Any other exception propagates.
    """
    module_dir = Path(__file__).resolve().parent
    yaml_path = os.path.join(module_dir, 'public_function/config.yaml')
    try:
        config = read_config(yaml_path)
    except ImportError:
        # NOTE(review): read_config is imported at module load time, so calling
        # it is unlikely to raise ImportError -- this fallback is probably
        # unreachable. It also embeds placeholder credentials; confirm they are
        # never meant to be used outside local development.
        print("未找到read_config函数使用默认配置")
        config = {
            'host': 'localhost',
            'port': 3306,
            'user': 'root',
            'password': 'password',
            'db': 'test_db',
            'max_overflow': 10,
        }
    return config
def get_account_manager():
    """Build a ``DealAccount`` from the current configuration.

    Used as a FastAPI dependency (``Depends(get_account_manager)``), so a new
    instance is constructed for every request that depends on it.

    Returns ``None`` -- despite the log text mentioning a mock instance -- when
    the ``DealAccount`` constructor raises ``ImportError``.
    """
    settings = get_config()
    try:
        manager = DealAccount(settings)
    except ImportError:
        print("未找到DealAccount类返回模拟实例")
        manager = None
    # NOTE(review): if DealAccount opens its own DB pool in __init__, building
    # one per request may be costly; consider caching once that is confirmed safe.
    return manager
def get_task_manager():
    """Build an ``AllTask`` task manager from the current configuration.

    Used as a FastAPI dependency (``Depends(get_task_manager)``), so a new
    instance is constructed for every request that depends on it.

    Returns ``None`` -- despite the log text mentioning a mock instance -- when
    the ``AllTask`` constructor raises ``ImportError``.
    """
    settings = get_config()
    try:
        manager = AllTask(settings)
    except ImportError:
        print("未找到AllTask类返回模拟实例")
        manager = None
    # NOTE(review): one AllTask per request; if it holds connections, consider
    # caching a shared instance once that is confirmed safe.
    return manager
# Account handling endpoints
@app.post("/obtain_account", summary="获取可用账号")
async def obtain_account(account_data: AccountObtain, account_manager: Any = Depends(get_account_manager)):
    """Hand out one available account for an app and region.

    Request body fields read here: ``app_name`` and ``region``. The first row
    returned by ``obtain_account_info`` is given to the caller after its
    ``status`` is set to 2 (matched on ``account_id`` and ``app_name``).

    Returns:
        ``{"code": 200, "message": "获取账号成功", "data": <account row>}``.

    Raises:
        HTTPException(404): no account is available, or any error occurred
            while querying/updating (the error text is used as ``detail``).
    """
    try:
        param = account_data.model_dump()
        result = await account_manager.obtain_account_info(param["app_name"], param["region"])
        if not result:
            raise HTTPException(status_code=404, detail="没有可用的账号")
        account = result[0]
        # Record the hand-out by setting status=2 on the chosen account.
        # NOTE(review): the lookup and this update are separate awaits, so two
        # concurrent requests may receive the same account unless
        # obtain_account_info locks the row -- confirm.
        await account_manager.update_account_info(
            set_param={"status": 2},
            where_conditions="account_id = %s and app_name = %s ",
            params=(account["account_id"], account["app_name"]),
        )
        # The account row is intentionally not printed: it may hold credentials.
        return {"code": 200, "message": "获取账号成功", "data": account}
    except HTTPException:
        # Already carries the intended status/detail; re-wrapping it would
        # produce a detail like "404: 没有可用的账号".
        raise
    except Exception as e:
        print(f"获取账号失败: {e}")
        raise HTTPException(status_code=404, detail="{}".format(e)) from e
@app.post("/account_status", summary="获取账号状态")
async def account_status(account_data: AccountStatus, account_manager: Any = Depends(get_account_manager)):
    """Query the status of crawler account(s).

    The request body (``AccountStatus``) is forwarded as a dict to
    ``account_manager.query_account_info``.

    Returns:
        ``{"code": 200, "message": "获取账号状态", "data": <query result>}``.

    Raises:
        HTTPException(404): the query returned an empty/falsy result.
        HTTPException(500): the query raised an error.
    """
    try:
        result = await account_manager.query_account_info(account_data.model_dump())
        if result:
            return {"code": 200, "message": "获取账号状态", "data": result}
        raise HTTPException(status_code=404, detail="获取账号状态")
    except HTTPException:
        # Let the 404 through unchanged instead of turning it into a 500.
        raise
    except Exception as e:
        print(f"获取账号状态: {e}")
        raise HTTPException(status_code=500, detail="获取账号状态,失败原因:{}".format(e)) from e
@app.post("/update_account", summary="账号状态修改")
async def update_account(account_data: AccountUpdate, account_manager: Any = Depends(get_account_manager)):
    """Change the ``status`` of one account.

    Request body fields read here: ``account_id``, ``app_name`` (used to match
    the row) and ``status`` (the new value).

    Returns:
        ``{"code": 200, ..., "data": result}`` when ``update_account_info``
        returns a truthy value; otherwise ``{"code": 401, "message":
        "数据库中状态与传入状态一样"}`` (status unchanged). Both are HTTP 200.

    Raises:
        HTTPException(500): building or running the update raised an error.
    """
    try:
        data = account_data.model_dump()
        result = await account_manager.update_account_info(
            set_param={"status": data["status"]},
            where_conditions="account_id = %s and app_name = %s ",
            params=(data["account_id"], data["app_name"]),
        )
    except Exception as e:
        print(f"账号状态修改失败: {e}")
        raise HTTPException(status_code=500, detail="账号状态修改失败,请重试,失败原因:{}".format(e)) from e
    if result:
        return {"code": 200, "message": "账号状态修改成功", "data": result}
    return {"code": 401, "message": "数据库中状态与传入状态一样"}
@app.post("/get_crawler_task", summary="手机获取任务")
async def get_crawler_task(task_data: CrawlerItem, task_manager: Any = Depends(get_task_manager)):
    """Hand the next crawler task to a phone.

    The request body (``CrawlerItem``) is passed as a dict to
    ``task_manager.get_task_item``. When a task comes back, its record is
    updated with ``status`` 4 before the task is returned.

    Returns:
        ``{"code": 200, "message": ..., "data": task}``, or ``data`` set to
        ``None`` (still HTTP 200) when there is no task.

    Raises:
        HTTPException(500): any error while fetching or updating the task.
    """
    try:
        query = task_data.model_dump()
        # NOTE(review): get_task_item is called without await while the other
        # task_manager calls are awaited -- confirm it is a plain function.
        task = task_manager.get_task_item(query)
        if not task:
            return {"code": 200, "message": "暂时没有任务", "data": None}
        await task_manager.update_task_record({"task_id": task["task_id"], "status": 4})
        return {"code": 200, "message": "任务获取成功", "data": task}
    except Exception as e:
        print(f"获取任务失败,失败原因: {e}")
        raise HTTPException(status_code=500, detail=f"获取任务失败;失败原因{e}")
@app.post("/receive_data", summary="接收抓取数据")
async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)):
    """Accept crawled data and pass it to ``task_manager.deal_receive_data``.

    Returns:
        ``{"code": 200, "message": "数据保存成功", "data": result}``.

    Raises:
        HTTPException(404): ``deal_receive_data`` returned a falsy result.
        HTTPException(500): processing raised an error.
    """
    try:
        # Original note: the account/device should be recorded so the number
        # of items collected per account can be counted.
        params = task_data.model_dump()
        result = await task_manager.deal_receive_data(params)
        if result:
            return {"code": 200, "message": "数据保存成功", "data": result}
        raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
    except HTTPException:
        # Keep the 404 as-is instead of converting it into a 500.
        raise
    except Exception as e:
        print(f"商品数据处理失败,失败原因: {e}")
        raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) from e
@app.post("/get_goods_info", summary="客户端获取商品数据")
async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_task_manager), token: Optional[str] = Header(None)):
    """Fetch Shopee goods data for a client request.

    A task record is inserted under a fresh hex ``task_id`` (with
    ``app_name="Shopee"``, ``region`` copied from ``host`` and the ``token``
    request header), processed by ``task_manager.deal_shopee_task`` and, on
    success, updated to ``status`` 2.

    Returns:
        ``{"code": 200, "message": "数据获取成功", "data": result}``.

    Raises:
        HTTPException(503): processing returned nothing or raised an error.
    """
    task_id = uuid.uuid4().hex
    try:
        params = task_data.model_dump()
        params['task_id'] = task_id
        params["app_name"] = "Shopee"
        params['region'] = params["host"]
        params["token"] = token
        # params is not printed: it contains the caller's token.
        await task_manager.insert_task_record(params)
        print(f"开始处理任务: {task_id}")
        result = await task_manager.deal_shopee_task(params)
        if not result:
            print(f"任务处理失败: {task_id}")
            raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试")
        print(f"任务处理成功: {task_id}")
        await task_manager.update_task_record({"task_id": task_id, "status": 2})
        return {"code": 200, "message": "数据获取成功", "data": result}
    except HTTPException:
        # Already the intended 503; avoid logging the failure twice.
        raise
    except Exception as e:
        print(f"任务异常 - ID: {task_id}, 错误: {e}")
        raise HTTPException(status_code=503, detail="抓取商品数据失败,请重新尝试") from e
@app.post("/add_token", summary="添加token")
async def add_token(task_data: TokenItem, task_manager: Any = Depends(get_task_manager)):
    """Register a token via ``task_manager.add_token``.

    Returns:
        ``{"code": 200, "message": "新增token成功", "data": result}`` when
        ``add_token`` returns a truthy value.

    Raises:
        HTTPException(404): ``add_token`` returned a falsy value.
        HTTPException(500): registering the token raised an error.
    """
    # Bound before the try so the error path can always reference it.
    # Single quotes inside the f-strings keep this parseable before Python 3.12.
    token = None
    try:
        token = task_data.model_dump().get('token')
        result = task_manager.add_token(token)
        if result:
            print(f"新增token:{token} 成功")
            return {"code": 200, "message": "新增token成功", "data": result}
        raise HTTPException(status_code=404, detail=f"新增token:{token} 失败")
    except HTTPException:
        # Keep the 404 as-is instead of converting it into a 500.
        raise
    except Exception as e:
        print(f"新增token:{token} 失败,失败原因: {e}")
        raise HTTPException(status_code=500, detail=f"新增token失败,失败原因{e}") from e
@app.post("/reset_task", summary="重置任务")
async def reset_task(task_data: ResetTask, task_manager: Any = Depends(get_task_manager)):
    """Reset a task after its result upload failed.

    The request body (``ResetTask``) is passed as a dict to
    ``task_manager.deal_reset_task``; its ``task_id`` is used in messages.

    Returns:
        ``{"code": 200, "message": ...}`` when the reset succeeded; otherwise a
        JSON response with HTTP status 404 and body ``{"code": 404, "message": ...}``.

    Raises:
        HTTPException(404): the reset raised an error.
    """
    # Local import: only this endpoint needs a response with a custom status.
    from fastapi.responses import JSONResponse

    task_id = None  # bound up front so the error path can always report it
    try:
        params = task_data.model_dump()
        task_id = params.get("task_id")
        result = await task_manager.deal_reset_task(params)
        if result:
            print(f"任务:{task_id} 重启成功")
            return {"code": 200, "message": f"任务:{task_id} 重启成功"}
        # FastAPI serialises a returned ``(body, 404)`` tuple as a JSON array
        # with HTTP 200, so the status must be set on an explicit response.
        return JSONResponse(
            status_code=404,
            content={"code": 404, "message": f"没有查询到相关任务:{task_id}"},
        )
    except Exception as e:
        print(f"任务:{task_id} 重启失败,失败原因: {e}")
        raise HTTPException(status_code=404, detail=f"任务:{task_id} 重启失败") from e
async def deal_shopee_task(param: Dict[str, Any], account_manager, *, min_delay: int = 1, max_delay: int = 10):
    """Back up a cloud phone and, if requested, restore it afterwards.

    Args:
        param: Task parameters. ``pad_code`` is used in log lines and a truthy
            ``is_restore`` enables the restore step; the whole dict is passed
            to both manager calls.
        account_manager: Object exposing awaitable ``deal_backup_task`` and
            ``deal_restore_system`` methods.
        min_delay: Lower bound (seconds, inclusive) of the random pause before
            the restore step.
        max_delay: Upper bound (seconds, inclusive) of that pause.

    Returns:
        None; the backup/restore results are only printed.
    """
    print(param)
    result = await account_manager.deal_backup_task(param)
    print(f"pad_code{param.get('pad_code')} 备份结果为:{result}")
    if param.get("is_restore", False):
        # Must be asyncio.sleep: this coroutine runs on the event loop (e.g. as
        # a FastAPI background task) and time.sleep would block every request.
        await asyncio.sleep(random.randint(min_delay, max_delay))
        restore_result = await account_manager.deal_restore_system(param)
        print(f"pad_code{param.get('pad_code')} 还原结果为:{restore_result}")
@app.post("/shop_backup", summary="云机备份与还原")
async def shop_backup(background_tasks: BackgroundTasks, backup_data: BackupItem, account_manager: Any = Depends(get_account_manager)):
    """Schedule a cloud-phone backup (and optional restore) as a background task.

    The request body (``BackupItem``) is converted to a dict, tagged with a
    generated ``task_id`` and handed to the module-level ``deal_shopee_task``
    through FastAPI ``BackgroundTasks``, which runs it after the response.

    Returns:
        ``{"message": ..., "task_id": <generated id>}`` immediately.
    """
    param = backup_data.model_dump()
    # A bare second-resolution timestamp collides for requests arriving in the
    # same second, so a random suffix is appended; the "task_<unix seconds>"
    # prefix is kept for readability.
    task_id = f"task_{int(time.time())}_{uuid.uuid4().hex[:8]}"
    param["task_id"] = task_id  # used later to record the task status
    background_tasks.add_task(deal_shopee_task, param, account_manager)
    return {
        "message": "请求已接收,任务在后台执行",
        "task_id": task_id,
    }
if __name__ == '__main__':
    # Direct-run entry point: serve the app with uvicorn on all interfaces
    # (0.0.0.0), port 8000.
    uvicorn.run(app=app, host="0.0.0.0", port=8000)