86 lines
3.8 KiB
Python
86 lines
3.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
import time
|
||
import json
|
||
import asyncio
|
||
from typing import Dict, Any
|
||
from public_function.asyn_mysql import AsyncMySQL
|
||
from public_function.public_func import create_logger
|
||
from public_function.redis_task_manager import RedisTaskManager
|
||
|
||
# Module-level logger created via the project's create_logger helper.
# NOTE(review): not referenced by AllTask, which reports progress with print().
logger = create_logger(file_name="all_task_management")
|
||
|
||
|
||
class AllTask:
    """Dispatch crawl tasks and persist crawl results.

    Goods data for one item is cached in Redis under ``"<shop_id>:<item_id>"``
    (built by ``_goods_key``) and recorded in the MySQL table
    ``goods_information_record``.
    """

    def __init__(self, config_data: Dict[str, Any]):
        """Create the Redis task manager and the MySQL helper.

        Args:
            config_data: application configuration. The whole mapping is passed
                to ``RedisTaskManager``; ``config_data["advert_policy"]`` is
                passed to ``AsyncMySQL``.
        """
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        # Always constructed here, so it is never None.
        self.db_pool: AsyncMySQL = AsyncMySQL(self.config_data["advert_policy"])

    @staticmethod
    def _goods_key(shop_id: Any, item_id: Any) -> str:
        """Return the Redis key under which one item's goods data is stored."""
        return f"{shop_id}:{item_id}"

    async def deal_shopee_task(
        self,
        param: Dict[str, Any],
        timeout: float = 55,
        poll_interval: float = 5,
    ) -> Any:
        """Return goods data for one Shopee item, submitting a crawl task if needed.

        Unless ``param["is_re_crawl"]`` is truthy, the Redis cache is checked
        first and a hit is returned immediately. Otherwise the task is handed
        to ``redis_conn.add_task_to_set`` and the cache is polled until data
        appears or ``timeout`` seconds have elapsed.

        Args:
            param: task dict; must contain ``shop_id`` and ``item_id`` and may
                contain ``is_re_crawl``. It is submitted unchanged as task data.
            timeout: maximum number of seconds to wait for the crawl result.
            poll_interval: seconds to wait between Redis polls.

        Returns:
            The value ``read_data`` returned for the item key, or ``None`` if
            nothing appeared before the timeout.

        Raises:
            KeyError: if ``shop_id`` or ``item_id`` is missing from ``param``.
        """
        # Must match the key written by ``deal_receive_data``. The former
        # "<app_name>:<country>" key is never written there, so polling could
        # not observe the crawl result and all items of a country collided.
        key = self._goods_key(param["shop_id"], param["item_id"])

        if not param.get("is_re_crawl", False):
            print(f"{key} 开始在redis中获取数据")
            result = self.redis_conn.read_data(key)
            if result is not None:
                print(f"{key} 从Redis缓存中获取到数据")
                return result
            print(f"{key} 在redis中获取数据失败,将任务提交到队列")
        # NOTE(review): for a re-crawl any previously cached value is still in
        # Redis, so the first poll below may return the stale data. Consider
        # deleting the key before submitting if RedisTaskManager supports it.

        self.redis_conn.add_task_to_set(task_data=param)

        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        counter = 0
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Non-blocking wait so other coroutines keep running; never sleep
            # past the deadline.
            await asyncio.sleep(min(poll_interval, remaining))
            counter += 1
            result = self.redis_conn.read_data(key)
            if result is not None:
                print(f"{key} 第 {counter} 次在redis获取到数据")
                return result
            print(f"{key}:第 {counter} 次从redis中未能获取数据")

        print(f"{key} 在超时时间内未获取到数据")
        return None

    async def task_distribution(self, data: Dict[str, Any]):
        """Route a task to its platform handler and return the handler's result.

        Only ``data["app_name"] == "Shopee"`` is handled; any other app yields
        ``{}``. Exceptions from the handler are printed and swallowed, in which
        case ``{}`` is returned. A Shopee task that times out yields ``None``.
        """
        result: Any = {}
        if data["app_name"] == "Shopee":
            try:
                result = await self.deal_shopee_task(data)
            except Exception as e:
                # .get(): the handler may have failed precisely because an id
                # is missing; reporting it must not raise a second KeyError.
                print(f"{data.get('item_id')}:{data.get('shop_id')}获取商品数据失败,失败原因为:{e}")
        return result

    async def deal_receive_data(self, data: Dict[str, Any]) -> bool:
        """Persist crawled goods data to Redis and MySQL and mark the task finished.

        Args:
            data: crawl result. Reads ``shop_id``, ``item_id``, ``country``,
                ``app_name`` and ``task_id``, and optionally ``goods_info``.

        Returns:
            ``True`` whenever no exception is raised. A falsy affected-row
            count from any step only suppresses its success message.
        """
        print("开始处理接收到得数据:{}".format(data))
        # NOTE(review): called for every message; confirm that
        # AsyncMySQL.initialize is idempotent and does not open a new pool each time.
        await self.db_pool.initialize()

        key = self._goods_key(data["shop_id"], data["item_id"])
        goods_info = data.get("goods_info")

        if self.redis_conn.write_data(key, goods_info):
            print(f"{key};数据已存入redis中")

        record = {
            "item_id": data["item_id"],
            "shop_id": data["shop_id"],
            "country": data["country"],
            "app_name": data["app_name"],
            "goods_info": json.dumps(goods_info),
        }
        if await self.db_pool.insert_many(table="goods_information_record", data=[record]):
            print(f"{key};商品数据已存入mysql")

        # Mark the task as finished (status 2).
        # NOTE(review): the WHERE clause filters on account_id, but the value
        # supplied is the task_id, passed as a dict for a positional "%s"
        # placeholder. Verify against AsyncMySQL.update and the
        # crawler_account_record_info schema; kept unchanged here.
        where_params = {"task_id": data["task_id"]}
        updated = await self.db_pool.update(
            table="crawler_account_record_info",
            set_columns={"status": 2},
            where_conditions="account_id = %s",
            params=where_params,
        )
        if updated:
            print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        return True
|