diff --git a/main.py b/main.py
index 07dfacb..8ec7e82 100644
--- a/main.py
+++ b/main.py
@@ -8,7 +8,7 @@ from typing import Dict, Any, Optional, List
 from fastapi import FastAPI, HTTPException, Depends
 
 from public_function.public_func import read_config
-from model.model import AccountCreate, AccountUpdate, CrawlerTask, AccountDelete
+from model.model import AccountCreate, AccountUpdate, CrawlerTask, AccountDelete, DataReceive
 
 
 app = FastAPI()
@@ -106,9 +106,17 @@ async def delete_account(account_data: AccountDelete, account_manager: Any = Dep
 
 
 @app.post("/receive_data", summary="接收抓取得数据")
-async def receive_data(params: Dict[str, Any]):
+async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_task_manager)):
     """数据接收接口"""
-    # 接收数据表写入redis后,存入mysql
+    try:
+        result = await task_manager.deal_receive_data(task_data.dict())
+    except Exception as e:
+        print(f"接收数据处理失败: {e}")
+        raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) from e
+    if result:
+        return {"code": 200, "message": "数据保存成功", "data": result}
+    # Raised outside the try block so the 404 is not caught and re-wrapped as a 500.
+    raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
 
 
 @app.get("/crawler_task", summary="爬虫任务调用接口")
diff --git a/model/model.py b/model/model.py
index 02ad62e..5a0ec95 100644
--- a/model/model.py
+++ b/model/model.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 from pydantic import BaseModel, Field
+from typing import Optional, Dict, Any
 
 
 # 定义数据模型
@@ -23,5 +24,15 @@ class AccountDelete(BaseModel):
 class CrawlerTask(BaseModel):
     country: str = Field(..., min_length=1, max_length=128, description="账号所在国家")
     app_name: str = Field(..., min_length=1, max_length=128, description="应用名称")
-    goods_id: str = Field(..., min_length=1, max_length=128, description="账号所在国家")
-    store_id: str = Field(..., min_length=1, max_length=128, description="应用名称")
+    goods_id: str = Field(..., min_length=1, max_length=128, description="商品ID")
+    store_id: str = Field(..., min_length=1, max_length=128, description="店铺ID")
+
+
+class DataReceive(BaseModel):
+    """Request body of the /receive_data endpoint: one crawled goods record."""
+    task_id: str = Field(..., min_length=1, description="任务ID")
+    app_name: str = Field(..., min_length=1, max_length=128, description="应用名称")
+    store_id: str = Field(..., min_length=1, max_length=128, description="店铺ID")
+    goods_id: str = Field(..., min_length=1, max_length=128, description="商品ID")
+    country: str = Field(..., min_length=1, max_length=128, description="国家")
+    goods_info: Dict[str, Any] = Field(..., description="商品信息")
diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py
index 9e4d806..656001b 100644
--- a/task_management/all_task_management.py
+++ b/task_management/all_task_management.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import time
+import json
 from typing import Dict, Any
 
 from public_function.asyn_mysql import AsyncMySQL
@@ -9,7 +10,7 @@ from public_function.redis_task_manager import RedisTaskManager
 class AllTask:
     def __init__(self, config_data: Dict[str, Any]):
         self.config_data = config_data
-        self.redis_conn = RedisTaskManager(self.config_data["redis_config"])
+        self.redis_conn = RedisTaskManager(self.config_data)
         self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
 
     async def deal_shopee_task(self, param):
@@ -28,6 +29,46 @@ class AllTask:
             return result
         return []
 
+    async def deal_receive_data(self, data: Dict[str, Any]):
+        """Persist one crawled goods record and flag its task as finished.
+
+        The ``goods_info`` payload is passed to ``redis_conn.write_data`` under the
+        key ``app_name:store_id:goods_id``, one row is inserted into
+        ``goods_information_record`` and ``status = 2`` is written to
+        ``crawler_account_record_info``.
+
+        :param data: ``DataReceive`` payload as a plain dict.
+        :return: ``True`` when the MySQL insert returned a truthy result, ``False``
+            otherwise, so the endpoint does not answer "saved" for a lost record.
+        """
+        print("开始处理接收到得数据:{}".format(data))
+        await self.db_pool.initialize()
+        key = f"{data['app_name']}:{data['store_id']}:{data['goods_id']}"
+        if self.redis_conn.write_data(key, data.get("goods_info")):
+            print(f"{key};数据已存入redis中")
+        record = {
+            "goods_id": data["goods_id"],
+            "store_id": data["store_id"],
+            "country": data["country"],
+            "app_name": data["app_name"],
+            # NOTE(review): this serializes the whole payload, not only goods_info - confirm intended.
+            "goods_info": json.dumps(data),
+        }
+        inserted = await self.db_pool.insert_many(table='goods_information_record', data=[record])
+        if not inserted:
+            # Nothing was persisted: report failure and leave the task status untouched.
+            print(f"{key};商品数据存入mysql失败")
+            return False
+        print(f"{key};商品数据已存入mysql")
+        # NOTE(review): the filter is account_id on crawler_account_record_info while the
+        # bound value is a task_id; this looks copied from the account update - confirm.
+        affected_rows = await self.db_pool.update(
+            table='crawler_account_record_info', set_columns={"status": 2},
+            where_conditions="account_id = %s", params={"task_id": data["task_id"]},
+        )
+        if affected_rows:
+            print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
+        return True
+
     async def task_distribution(self, data: Dict[str, Any]):
         # 需要对任务进行记录
         try:
@@ -38,7 +79,7 @@ class AllTask:
         except Exception as e:
             print("将任务记录到数据库失败,失败原因为:{}".format(e))
         if param["app_name"] == "Shopee":
-            result = await self.deal_shopee_task(param)
+            result = await self.deal_shopee_task(data)
             if result:
                 return result
         return None