代码优化

This commit is contained in:
liujianjiang 2025-11-27 14:57:57 +08:00
parent cde87999f7
commit 93e016b8d6
3 changed files with 29 additions and 29 deletions

15
main.py
View File

@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List
from fastapi import FastAPI, HTTPException, Depends from fastapi import FastAPI, HTTPException, Depends
from public_function.public_func import read_config from public_function.public_func import read_config
from model.model import CrawlerTask, DataReceive from model.model import GoodsInfo, DataReceive
from model.model import AccountCreate, AccountUpdate, DeviceResetData from model.model import AccountCreate, AccountUpdate, DeviceResetData
app = FastAPI() app = FastAPI()
@ -129,23 +129,22 @@ async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_t
raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
@app.get("/crawler_task", summary="爬虫任务调用接口") @app.post("/get_goods_info", summary="客户端获取商品数据")
async def crawler_task(task_data: CrawlerTask, task_manager: Any = Depends(get_task_manager)): async def get_goods_info(task_data: GoodsInfo, task_manager: Any = Depends(get_task_manager)):
"""爬虫任务接口""" """客户端获取商品数据接口"""
try: try:
params = task_data.model_dump() params = task_data.model_dump()
params['task_id'] = uuid.uuid4().hex
result = await task_manager.task_distribution(params) result = await task_manager.task_distribution(params)
if result: if result:
return {"code": 200, "message": "<UNK>", "data": result} return {"code": 200, "message": "数据获取成功", "data": result}
raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试") raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
except Exception as e: except Exception as e:
print(f"<UNK>: {e}") print(f"<UNK>: {e}")
raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
# @app.get("/device_reset", summary="爬虫任务调用接口") # @app.get("/device_reset", summary="设备重置")
# async def device_reset(task_data: CrawlerTask, reset_manager: Any = Depends(get_reset_manager)): # async def device_reset(task_data: GoodsInfo, reset_manager: Any = Depends(get_reset_manager)):
# """设备重置接口""" # """设备重置接口"""
# try: # try:
# params = task_data.model_dump() # params = task_data.model_dump()

View File

@ -17,11 +17,12 @@ class AccountUpdate(BaseModel):
status: int = Field(..., ge=1, le=10, description="状态1-空闲2-使用中3-暂停使用(后续还能使用)4-账号已无法使用") status: int = Field(..., ge=1, le=10, description="状态1-空闲2-使用中3-暂停使用(后续还能使用)4-账号已无法使用")
class CrawlerTask(BaseModel): class GoodsInfo(BaseModel):
country: str = Field(..., min_length=1, max_length=128, description="账号所在国家") country: str = Field(..., min_length=1, max_length=128, description="账号所在国家")
app_name: str = Field(..., min_length=1, max_length=128, description="应用名称") app_name: str = Field(..., min_length=1, max_length=128, description="应用名称")
goods_id: str = Field(..., min_length=1, max_length=128, description="商品ID") goods_id: str = Field(..., min_length=1, max_length=128, description="商品ID")
store_id: str = Field(..., min_length=1, max_length=128, description="店铺ID") store_id: str = Field(..., min_length=1, max_length=128, description="店铺ID")
is_re_crawl: str = Field(..., min_length=0, max_length=128, description="是否重新抓取")
class DataReceive(BaseModel): class DataReceive(BaseModel):

View File

@ -15,25 +15,39 @@ class AllTask:
async def deal_shopee_task(self, param): async def deal_shopee_task(self, param):
# 查询redis数据库redis 数据库存在该数据直接返回 # 查询redis数据库redis 数据库存在该数据直接返回
key = f"{param['app_name']}:{param['store_id']}:{param['goods_id']}" key = f"{param['store_id']}:{param['goods_id']}"
if not param.get('is_re_crawl', False):
print(f"{key} 开始在redis中获取数据")
result = self.redis_conn.read_data(key) result = self.redis_conn.read_data(key)
if result: if result:
return result return result
# 调用对应爬虫任务 # 调用对应爬虫任务
print(f"{key} 在redis中获取数据失败开始调用爬虫抓取数据")
# 任务结束后开始等待 # 任务结束后开始等待
result = dict()
endtime = time.time() + 55 endtime = time.time() + 55
while time.time() < endtime: while time.time() < endtime:
await time.sleep(5) await time.sleep(5)
result = self.redis_conn.read_data(key) result = self.redis_conn.read_data(key)
if result: if result:
print(f"{key} 调用爬虫抓取数据后在redis获取到数据")
return result
return result
async def task_distribution(self, data: Dict[str, Any]):
result = dict()
if data["app_name"] == "Shopee":
try:
result = await self.deal_shopee_task(data)
except Exception as e:
print(f"{data['task_id']} 获取商品数据失败,失败原因为:{e}")
return result return result
return []
async def deal_receive_data(self, data: Dict[str, Any]): async def deal_receive_data(self, data: Dict[str, Any]):
# 将商品数据写入mysql和redis # 将商品数据写入mysql和redis
print("开始处理接收到得数据:{}".format(data)) print("开始处理接收到得数据:{}".format(data))
await self.db_pool.initialize() await self.db_pool.initialize()
key = f"{data['app_name']}:{data['store_id']}:{data['goods_id']}" key = f"{data['store_id']}:{data['goods_id']}"
params = data.get("goods_info") params = data.get("goods_info")
affected_rows = self.redis_conn.write_data(key, params) affected_rows = self.redis_conn.write_data(key, params)
if affected_rows: if affected_rows:
@ -52,17 +66,3 @@ class AllTask:
if affected_rows: if affected_rows:
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
return True return True
async def task_distribution(self, data: Dict[str, Any]):
# 需要对任务进行记录
try:
await self.db_pool.initialize()
# 将任务记录到mysql
param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]}
await self.db_pool.insert_many(table="crawler_task_record_info", data=param)
except Exception as e:
print("将任务记录到数据库失败,失败原因为:{}".format(e))
result = []
if param["app_name"] == "Shopee":
result = await self.deal_shopee_task(data)
return result