2025-11-26 17:40:11 +08:00
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
import time
|
2025-11-26 18:56:26 +08:00
|
|
|
|
import json
|
2025-11-26 17:40:11 +08:00
|
|
|
|
from typing import Dict, Any
|
|
|
|
|
|
|
|
|
|
|
|
from public_function.asyn_mysql import AsyncMySQL
|
|
|
|
|
|
from public_function.redis_task_manager import RedisTaskManager
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AllTask:
    """Dispatch crawler tasks and persist their results.

    Results are cached in Redis under the key
    ``"<app_name>:<store_id>:<goods_id>"`` and recorded in MySQL through
    the ``AsyncMySQL`` pool built from ``config_data["advert_policy"]``.
    """

    def __init__(self, config_data: Dict[str, Any]):
        """Create the Redis task manager and the MySQL pool from *config_data*.

        Args:
            config_data: Project configuration. It is handed as-is to
                ``RedisTaskManager``; its ``"advert_policy"`` entry is
                handed to ``AsyncMySQL``.
        """
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        # The pool is always created here, so it is never None.
        self.db_pool: AsyncMySQL = AsyncMySQL(self.config_data["advert_policy"])

    async def deal_shopee_task(self, param, *, timeout: float = 55, poll_interval: float = 1):
        """Return the cached goods data for *param*, polling Redis until it appears.

        Args:
            param: Mapping that provides ``app_name``, ``store_id`` and
                ``goods_id``; they form the Redis key.
            timeout: Maximum number of seconds to keep polling after the
                first cache miss.
            poll_interval: Seconds to sleep between two Redis reads.

        Returns:
            Whatever ``redis_conn.read_data`` returns for the key as soon as
            it is truthy, or an empty list when the timeout elapses first.
        """
        # Function-scope import keeps this block self-contained.
        import asyncio

        # Look the key up in Redis first; a cached value is returned as-is.
        key = f"{param['app_name']}:{param['store_id']}:{param['goods_id']}"
        result = self.redis_conn.read_data(key)
        if result:
            return result

        # TODO: trigger the matching crawler task here (not implemented yet).
        # Afterwards wait for the crawler to publish its result into Redis.
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # asyncio.sleep yields to the event loop; time.sleep would block
            # it and is not awaitable (awaiting its None result raises).
            await asyncio.sleep(poll_interval)
            result = self.redis_conn.read_data(key)
            if result:
                return result
        return []

    async def deal_receive_data(self, data: Dict[str, Any]):
        """Store received goods data in Redis and MySQL, then update a status row.

        Args:
            data: Payload that provides ``app_name``, ``store_id``,
                ``goods_id``, ``country`` and ``task_id``; ``goods_info`` is
                optional and is the value cached in Redis.

        Returns:
            Always ``True``; database errors propagate to the caller.
        """
        # Write the goods data to both MySQL and Redis.
        print("开始处理接收到得数据:{}".format(data))
        await self.db_pool.initialize()

        key = f"{data['app_name']}:{data['store_id']}:{data['goods_id']}"
        goods_info = data.get("goods_info")
        if self.redis_conn.write_data(key, goods_info):
            print(f"{key};数据已存入redis中")

        # The MySQL row stores the whole payload serialized as JSON.
        params = {
            "goods_id": data["goods_id"],
            "store_id": data["store_id"],
            "country": data["country"],
            "app_name": data["app_name"],
            "goods_info": json.dumps(data),
        }
        print(f"{params}<")
        affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params])
        if affected_rows:
            print(f"{key};商品数据已存入mysql")

        # Mark the task as finished (status 2).
        # NOTE(review): the filter column is ``account_id`` on
        # ``crawler_account_record_info`` but the value supplied is a
        # task_id, while tasks are recorded in ``crawler_task_record_info``
        # -- confirm the intended table/column before changing it.
        params = {"task_id": data["task_id"]}
        where_conditions = "account_id = %s"
        affected_rows = await self.db_pool.update(
            table='crawler_account_record_info',
            set_columns={"status": 2},
            where_conditions=where_conditions,
            params=params,
        )
        if affected_rows:
            print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        return True

    async def task_distribution(self, data: Dict[str, Any]):
        """Record the incoming task in MySQL and route it to its handler.

        Recording is best-effort: a failure is printed and the task is still
        dispatched.

        Args:
            data: Task payload with ``app_name``, ``task_id`` and
                ``country``, plus whatever the selected handler reads.

        Returns:
            The handler's result when it is truthy, otherwise ``None``.
        """
        # Every task is recorded before it is dispatched.
        try:
            await self.db_pool.initialize()
            # Persist the task record to MySQL.
            param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]}
            # insert_many is given a list of row dicts, matching its other
            # call site in deal_receive_data.
            await self.db_pool.insert_many(table="crawler_task_record_info", data=[param])
        except Exception as e:
            print("将任务记录到数据库失败,失败原因为:{}".format(e))

        # Read from ``data`` rather than ``param``: ``param`` is unbound when
        # the recording step fails before it is assigned.
        if data.get("app_name") == "Shopee":
            result = await self.deal_shopee_task(data)
            if result:
                return result
        return None
|