crawler_task_management/task_management/all_task_management.py

112 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import time
import json
import asyncio
from typing import Dict, Any
from public_function.asyn_mysql import AsyncMySQL
from public_function.public_func import create_logger
from public_function.redis_task_manager import RedisTaskManager
# Module-level logger built by the project's create_logger helper.
# NOTE(review): nothing in the visible part of this module uses it; the AllTask
# methods report progress with print() instead.
logger = create_logger(file_name="all_task_management")
class AllTask:
    """Coordinate crawler tasks between the Redis task store and MySQL.

    Redis is used as a task queue and as a result cache; MySQL keeps one
    record per task in ``crawler_task_record_info`` and the crawled goods in
    ``goods_information_record``.

    The ``status`` column is written as 1 when a record is inserted, 3 when a
    task has been pushed to the queue and 2 once its data has been received.
    NOTE(review): presumably 1 = created, 3 = queued, 2 = done -- confirm
    against the table definition.
    """

    def __init__(self, config_data: Dict[str, Any]):
        """Build the Redis manager and the MySQL pool from *config_data*.

        Args:
            config_data: Project configuration. It is handed as a whole to
                ``RedisTaskManager``; its ``"advert_policy"`` entry is handed
                to ``AsyncMySQL``.
        """
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        # The pool is always constructed here, so it is never None.
        self.db_pool: AsyncMySQL = AsyncMySQL(self.config_data["advert_policy"])

    def get_task_item(self, task_data: Dict[str, Any]):
        """Pop one random task for the given region/app pair.

        Args:
            task_data: Must contain ``"region"`` and ``"app_name"`` strings.

        Returns:
            Whatever ``get_random_task_and_delete`` yields for the key
            ``"<region>_<app_name>"`` (both lower-cased), or ``None`` when
            that value is falsy.
        """
        # Single quotes inside the f-string keep this valid before Python 3.12.
        redis_key = f"{task_data['region'].lower()}_{task_data['app_name'].lower()}"
        task = self.redis_conn.get_random_task_and_delete(redis_key)
        return task if task else None

    async def deal_shopee_task(self, param: Dict[str, Any], *,
                               timeout: float = 55, poll_interval: float = 5):
        """Return cached data for a Shopee task, queueing the task on a miss.

        On a cache miss the task is added to the Redis task set, its MySQL
        record is set to status 3, and Redis is then polled until data shows
        up or the timeout elapses.

        Args:
            param: Task description. Reads ``"app_name"``, ``"region"``,
                ``"shop_id"``, ``"item_id"`` and ``"task_id"``.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep before each poll.

        Returns:
            The value read from Redis, or ``None`` if nothing arrived in time.
        """
        # Serve straight from Redis when the data is already there.
        # NOTE(review): deal_receive_data stores results under
        # "<shop_id>:<item_id>", while this lookup uses "<app_name>:<region>"
        # -- confirm which key is intended.
        key = f"{param['app_name'].lower()}:{param['region'].lower()}"
        cached = self.redis_conn.read_data(key)
        if cached is not None:
            print(f"{key} 从Redis缓存中获取到数据")
            return cached

        print(f"{key}:{param['shop_id']}:{param['item_id']} 在redis中获取数据失败将任务提交到队列")
        self.redis_conn.add_task_to_set(task_data=param)
        await self.update_task_record({"task_id": param["task_id"], "status": 3})

        # Poll for the result. A monotonic clock keeps the deadline immune to
        # wall-clock adjustments, and asyncio.sleep keeps the event loop free.
        deadline = time.monotonic() + timeout
        attempt = 0
        while time.monotonic() < deadline:
            await asyncio.sleep(poll_interval)
            attempt += 1
            cached = self.redis_conn.read_data(key)
            if cached is not None:
                print(f"{key}{attempt} 次在redis获取到数据")
                return cached
            print(f"{key}:第 {attempt} 次从redis中未能获取数据")

        print(f"{key} 在超时时间内未获取到数据")
        return None

    async def insert_task_record(self, params: Dict[str, Any]):
        """Insert a status-1 row for a new task so tasks can be counted later.

        Args:
            params: Reads ``"task_id"``, ``"app_name"``, ``"host"``,
                ``"shop_id"``, ``"item_id"`` and ``"token"``.

        Failures of the insert are caught and printed, not raised.
        """
        await self.db_pool.initialize()
        try:
            # NOTE(review): the "region" column is filled from params["host"]
            # -- confirm this mapping is intentional.
            insert_param = {
                "task_id": params["task_id"],
                "app_name": params["app_name"],
                "region": params["host"],
                "shop_id": params["shop_id"],
                "item_id": params["item_id"],
                "status": 1,
                "token": params["token"],
            }
            result = await self.db_pool.insert_many("crawler_task_record_info", [insert_param])
            if result:
                print(f"{params['task_id']} 数据存入mysql{result} 行成功")
            else:
                print(f"{params['task_id']} 数据存入mysql失败")
        except Exception as e:
            print(f"{params['task_id']} 数据存入mysql失败失败原因为{e}")

    async def update_task_record(self, data: Dict[str, Any]) -> bool:
        """Set the status of one task row in ``crawler_task_record_info``.

        Args:
            data: Reads ``"task_id"`` (row selector) and ``"status"`` (new value).

        Returns:
            ``True`` when the update reports affected rows; ``False`` when it
            reports none or when the update raises.
        """
        print(f"开始修改任务:{data['task_id']} 状态")
        await self.db_pool.initialize()
        try:
            affected_rows = await self.db_pool.update(
                table='crawler_task_record_info',
                set_columns={"status": data["status"]},
                where_conditions="task_id = %s",
                params=(data["task_id"],),
            )
            if affected_rows:
                print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
                return True
        except Exception as e:
            # Read the id from `data`: the query parameters are a tuple, and
            # indexing them by name would raise inside this handler and hide
            # the real error.
            print(f"{data['task_id']} 状态更新失败;失败原因为:{e}")
        return False

    async def deal_receive_data(self, data: Dict[str, Any]) -> bool:
        """Store received goods data in Redis and MySQL, then mark the task done.

        Args:
            data: Reads ``"shop_id"``, ``"item_id"``, ``"region"``,
                ``"app_name"``, ``"task_id"`` and the optional ``"goods_info"``.

        Returns:
            Always ``True``; the individual steps only print their outcome.
        """
        print("开始处理接收到得数据:{}".format(data))
        await self.db_pool.initialize()

        key = f"{data['shop_id']}:{data['item_id']}"
        goods_info = data.get("goods_info")
        if self.redis_conn.write_data(key, goods_info):
            print(f"{key}数据已存入redis中")

        # MySQL keeps the goods payload as a JSON string.
        record = {
            "item_id": data["item_id"],
            "shop_id": data["shop_id"],
            "region": data["region"],
            "app_name": data["app_name"],
            "goods_info": json.dumps(goods_info),
        }
        print(f"{goods_info}<")
        if await self.db_pool.insert_many(table='goods_information_record', data=[record]):
            print(f"{key}商品数据已存入mysql")

        # Flip the task record to status 2 now that its data has arrived.
        if await self.update_task_record({"task_id": data["task_id"], "status": 2}):
            print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        return True