代码优化

This commit is contained in:
liujianjiang 2025-11-28 11:40:58 +08:00
parent fbf7ef1301
commit ae59838e11
4 changed files with 12 additions and 12 deletions

View File

@ -20,16 +20,16 @@ class AccountUpdate(BaseModel):
class GoodsInfo(BaseModel): class GoodsInfo(BaseModel):
country: str = Field(..., description="账号所在国家") country: str = Field(..., description="账号所在国家")
app_name: str = Field(..., description="应用名称") app_name: str = Field(..., description="应用名称")
goods_id: str = Field(..., description="商品ID") item_id: str = Field(..., description="商品ID")
store_id: str = Field(..., description="店铺ID") shop_id: str = Field(..., description="店铺ID")
is_re_crawl: bool = Field(..., description="是否重新抓取") is_re_crawl: bool = Field(..., description="是否重新抓取")
class DataReceive(BaseModel): class DataReceive(BaseModel):
task_id: str = Field(..., description="任务ID") task_id: str = Field(..., description="任务ID")
app_name: str = Field(..., description="应用名称") app_name: str = Field(..., description="应用名称")
store_id: str = Field(..., description="店铺ID") shop_id: str = Field(..., description="店铺ID")
goods_id: str = Field(..., description="商品ID") item_id: str = Field(..., description="商品ID")
country: str = Field(..., description="国家") country: str = Field(..., description="国家")
goods_info: Dict[str, Any] = Field(..., description="商品信息") goods_info: Dict[str, Any] = Field(..., description="商品信息")

View File

@ -9,6 +9,6 @@ async def verify_tk_token(request: Request, call_next):
token = request.headers.get("token") token = request.headers.get("token")
if token != TOKEN: if token != TOKEN:
return JSONResponse(status_code=401, content={"detail": "Unauthorized"}) return JSONResponse(status_code=401, content={"detail": "token 验证失败"})
return await call_next(request) return await call_next(request)

View File

@ -47,15 +47,15 @@ CREATE TABLE crawler_account_record_info
CREATE TABLE goods_information_record CREATE TABLE goods_information_record
( (
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID', id INT(11) NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
goods_id VARCHAR(50) NOT NULL COMMENT '商品ID', item_id VARCHAR(50) NOT NULL COMMENT '商品ID',
store_id VARCHAR(50) NOT NULL COMMENT '店铺ID', shop_id VARCHAR(50) NOT NULL COMMENT '店铺ID',
country VARCHAR(50) NOT NULL COMMENT '国家', country VARCHAR(50) NOT NULL COMMENT '国家',
app_name VARCHAR(50) NOT NULL COMMENT 'app名称', app_name VARCHAR(50) NOT NULL COMMENT 'app名称',
goods_info text NOT NULL COMMENT '商品具体价格详情等信息', goods_info text NOT NULL COMMENT '商品具体价格详情等信息',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY uk_goods_info (goods_id,store_id,country,app_name), UNIQUE KEY uk_goods_info (item_id,shop_id,country,app_name),
KEY idx_status (status), KEY idx_status (status),
KEY idx_country (country) KEY idx_country (country)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表'; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';

View File

@ -17,7 +17,7 @@ class AllTask:
async def deal_shopee_task(self, param): async def deal_shopee_task(self, param):
# 查询redis数据库redis 数据库存在该数据直接返回 # 查询redis数据库redis 数据库存在该数据直接返回
key = f"{param['store_id']}:{param['goods_id']}" key = f"{param['shop_id']}:{param['item_id']}"
if not param.get('is_re_crawl', False): if not param.get('is_re_crawl', False):
print(f"{key} 开始在redis中获取数据") print(f"{key} 开始在redis中获取数据")
result = self.redis_conn.read_data(key) result = self.redis_conn.read_data(key)
@ -42,19 +42,19 @@ class AllTask:
try: try:
result = await self.deal_shopee_task(data) result = await self.deal_shopee_task(data)
except Exception as e: except Exception as e:
print(f"{data['goods_id']}:{data['store_id']}获取商品数据失败,失败原因为:{e}") print(f"{data['item_id']}:{data['shop_id']}获取商品数据失败,失败原因为:{e}")
return result return result
async def deal_receive_data(self, data: Dict[str, Any]): async def deal_receive_data(self, data: Dict[str, Any]):
# 将商品数据写入mysql和redis # 将商品数据写入mysql和redis
print("开始处理接收到得数据:{}".format(data)) print("开始处理接收到得数据:{}".format(data))
await self.db_pool.initialize() await self.db_pool.initialize()
key = f"{data['store_id']}:{data['goods_id']}" key = f"{data['shop_id']}:{data['item_id']}"
params = data.get("goods_info") params = data.get("goods_info")
affected_rows = self.redis_conn.write_data(key, params) affected_rows = self.redis_conn.write_data(key, params)
if affected_rows: if affected_rows:
print(f"{key}数据已存入redis中") print(f"{key}数据已存入redis中")
params = {"goods_id": data["goods_id"], "store_id": data["store_id"], "country": data["country"], "app_name": data["app_name"], params = {"item_id": data["item_id"], "shop_id": data["shop_id"], "country": data["country"], "app_name": data["app_name"],
"goods_info": json.dumps(data)} "goods_info": json.dumps(data)}
print(f"{params}<") print(f"{params}<")
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params]) affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params])