crawler_task_management/task_management/all_task_management.py

# -*- coding: utf-8 -*-
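"""
Task management for the crawler: AllTask looks up goods data in the Redis
cache, pushes missing tasks onto the Redis task set, polls for results, and
records task and goods information in MySQL.
"""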
import time
import json
import asyncio
from typing import Dict, Any, Optional
from public_function.asyn_mysql import AsyncMySQL
from public_function.public_func import create_logger
from public_function.redis_task_manager import RedisTaskManager

logger = create_logger(file_name="all_task_management")


class AllTask:
    def __init__(self, config_data: Dict[str, Any]):
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])

    def get_task_item(self, task_data: Dict[str, Any]):
        # Pop a random task for this region/app from Redis; return None if the set is empty.
        redis_key = f"{task_data['region'].lower()}_{task_data['app_name'].lower()}"
        result = self.redis_conn.get_random_task_and_delete(redis_key)
        if result:
            return result
        return None

    async def deal_shopee_task(self, param: Dict[str, Any]):
        """
        Async handler for a Shopee task.
        Fixes the issue where a NoneType object was awaited.
        """
        # Check Redis first; if the data is already cached, return it directly.
        # key = f"{param['shop_id']}:{param['item_id']}"
        key = f"{param['app_name'].lower()}:{param['region'].lower()}"
        result = self.redis_conn.read_data(key)
        # Key fix: make sure result is not None before processing it.
        if result is not None:
            print(f"{key} got data from the Redis cache")
            return result

        print(f"{key}:{param['shop_id']}:{param['item_id']} not found in Redis; submitting the task to the queue")
        # Make sure add_task_to_set is an async method or is otherwise handled correctly.
        self.redis_conn.add_task_to_set(task_data=param)
        update_params = {"task_id": param["task_id"], "status": 3}
        await self.update_task_record(update_params)

        # After submitting the task, poll Redis until the result arrives or the timeout expires.
        endtime = time.time() + 55
        counter = 0
        while time.time() < endtime:
            # Use asyncio.sleep instead of time.sleep so the event loop is not blocked.
            await asyncio.sleep(5)
            counter += 1
            result = self.redis_conn.read_data(key)
            # Key fix: check whether result is None.
            if result is not None:
                print(f"{key}: got data from Redis on attempt {counter}")
                return result
            print(f"{key}: attempt {counter} failed to get data from Redis")
        print(f"{key} no data retrieved within the timeout")
        return None

    async def insert_task_record(self, params: Dict[str, Any]):
        # Record the task so it can be counted later.
        await self.db_pool.initialize()
        try:
            insert_param = {"task_id": params["task_id"], "app_name": params["app_name"], "region": params["host"],
                            "shop_id": params["shop_id"], "item_id": params["item_id"], "status": 1, "token": params["token"]}
            result = await self.db_pool.insert_many("crawler_task_record_info", [insert_param])
            if result:
                print(f"{params['task_id']} stored {result} row(s) in MySQL successfully")
            else:
                print(f"{params['task_id']} failed to store data in MySQL")
        except Exception as e:
            print(f"{params['task_id']} failed to store data in MySQL; reason: {e}")

    async def update_task_record(self, data: Dict[str, Any]):
        print(f"Start updating the status of task: {data['task_id']}")
        await self.db_pool.initialize()
        # Update the task status.
        try:
            params = (data["task_id"],)
            where_conditions = "task_id = %s"
            affected_rows = await self.db_pool.update(table='crawler_task_record_info', set_columns={"status": data["status"]},
                                                      where_conditions=where_conditions, params=params)
            if affected_rows:
                print(f"task_id: {data['task_id']} task finished, status updated successfully")
                return True
        except Exception as e:
            print(f"{data['task_id']} status update failed; reason: {e}")
        return False

    async def deal_receive_data(self, data: Dict[str, Any]):
        # Write the goods data into MySQL and Redis.
        print("Start processing received data: {}".format(data))
        await self.db_pool.initialize()
        key = f"{data['shop_id']}:{data['item_id']}"
        params = data.get("goods_info")
        affected_rows = self.redis_conn.write_data(key, params)
        if affected_rows:
            print(f"{key} data stored in Redis")
        params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"],
                       "goods_info": json.dumps(params)}
        print(f"goods_info: {params}")
        affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])
        if affected_rows:
            print(f"{key} goods data stored in MySQL")
            # Update the task status.
            update_params = {"task_id": data["task_id"], "status": 2}
            result = await self.update_task_record(update_params)
            if result:
                print(f"task_id: {data['task_id']} task finished, status updated successfully")
                return True
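

# A minimal usage sketch, not part of the production flow: it assumes the config
# dict carries whatever connection settings RedisTaskManager and AsyncMySQL expect
# (including the "advert_policy" key used above), and that the task dict carries
# the fields referenced by deal_shopee_task. The placeholder values below will not
# connect to real services; they only illustrate how the class is driven.
if __name__ == "__main__":
    example_config = {"advert_policy": {}}  # placeholder; real settings come from the deployment config
    example_task = {
        "task_id": "demo-task-1",
        "app_name": "shopee",
        "region": "SG",
        "shop_id": "123",
        "item_id": "456",
    }

    async def _demo():
        manager = AllTask(example_config)
        # Returns cached goods data, or None if nothing arrives before the timeout.
        goods_info = await manager.deal_shopee_task(example_task)
        print(f"demo result: {goods_info}")

    asyncio.run(_demo())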