crawler_task_management/task_management/all_task_management.py

71 lines
3.1 KiB
Python
Raw Normal View History

2025-11-26 17:40:11 +08:00
# -*- coding: utf-8 -*-
import time
2025-11-26 18:56:26 +08:00
import json
2025-11-26 17:40:11 +08:00
from typing import Dict, Any
from public_function.asyn_mysql import AsyncMySQL
2025-11-27 16:18:12 +08:00
from public_function.public_func import create_logger
2025-11-26 17:40:11 +08:00
from public_function.redis_task_manager import RedisTaskManager
2025-11-27 16:18:12 +08:00
logger = create_logger(file_name="all_task_management")
2025-11-26 17:40:11 +08:00
class AllTask:
def __init__(self, config_data: Dict[str, Any]):
self.config_data = config_data
2025-11-26 18:56:26 +08:00
self.redis_conn = RedisTaskManager(self.config_data)
2025-11-26 17:40:11 +08:00
self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
async def deal_shopee_task(self, param):
# 查询redis数据库redis 数据库存在该数据直接返回
2025-11-27 14:57:57 +08:00
key = f"{param['store_id']}:{param['goods_id']}"
if not param.get('is_re_crawl', False):
print(f"{key} 开始在redis中获取数据")
result = self.redis_conn.read_data(key)
if result:
return result
2025-11-26 17:40:11 +08:00
# 调用对应爬虫任务
2025-11-27 14:57:57 +08:00
print(f"{key} 在redis中获取数据失败开始调用爬虫抓取数据")
2025-11-26 17:40:11 +08:00
# 任务结束后开始等待
2025-11-27 14:57:57 +08:00
result = dict()
2025-11-26 17:40:11 +08:00
endtime = time.time() + 55
while time.time() < endtime:
2025-11-27 14:08:48 +08:00
await time.sleep(5)
2025-11-26 17:40:11 +08:00
result = self.redis_conn.read_data(key)
if result:
2025-11-27 14:57:57 +08:00
print(f"{key} 调用爬虫抓取数据后在redis获取到数据")
2025-11-26 17:40:11 +08:00
return result
2025-11-27 14:57:57 +08:00
return result
async def task_distribution(self, data: Dict[str, Any]):
result = dict()
if data["app_name"] == "Shopee":
try:
result = await self.deal_shopee_task(data)
except Exception as e:
print(f"{data['task_id']} 获取商品数据失败,失败原因为:{e}")
return result
2025-11-26 17:40:11 +08:00
2025-11-26 18:56:26 +08:00
async def deal_receive_data(self, data: Dict[str, Any]):
# 将商品数据写入mysql和redis
print("开始处理接收到得数据:{}".format(data))
await self.db_pool.initialize()
2025-11-27 14:57:57 +08:00
key = f"{data['store_id']}:{data['goods_id']}"
2025-11-26 18:56:26 +08:00
params = data.get("goods_info")
affected_rows = self.redis_conn.write_data(key, params)
if affected_rows:
print(f"{key}数据已存入redis中")
params = {"goods_id": data["goods_id"], "store_id": data["store_id"], "country": data["country"], "app_name": data["app_name"],
"goods_info": json.dumps(data)}
print(f"{params}<")
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params])
if affected_rows:
print(f"{key}商品数据已存入mysql")
# 更改任务状态
params = {"task_id": data["task_id"]}
where_conditions = "account_id = %s"
affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns={"status": 2},
where_conditions=where_conditions, params=params)
if affected_rows:
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
return True