crawler_task_management/task_management/all_task_management.py

152 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import asyncio
import json
import time
from typing import Any, Dict, Optional

from public_function.asyn_mysql import AsyncMySQL
from public_function.public_func import create_logger
from public_function.redis_task_manager import RedisTaskManager
# Module-level logger created by the project helper create_logger
# (presumably writing to an "all_task_management" log file — confirm in public_func).
# NOTE(review): AllTask in this file reports progress with print() and never uses this logger.
logger = create_logger(file_name="all_task_management")
class AllTask:
    """Coordinate crawler tasks between Redis and MySQL.

    Redis access goes through the project helper ``RedisTaskManager`` (task pool,
    goods-data cache, user tokens) and MySQL access through ``AsyncMySQL``
    (``crawler_task_record_info`` and ``goods_information_record`` tables).
    Task status values written by this class: 1 on insert, 3 when a task is
    (re)submitted to Redis, 2 when its goods data has been received; a record
    in status 4 can be re-queued by ``deal_reset_task``.
    """

    def __init__(self, config_data: Dict[str, Any]):
        """Create the Redis and MySQL helpers.

        :param config_data: project configuration; passed whole to
            ``RedisTaskManager``, and its ``"advert_policy"`` entry is passed to
            ``AsyncMySQL``.
        """
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])

    @staticmethod
    def _goods_key(data: Dict[str, Any]) -> str:
        """Return the Redis key of a goods record: ``app:region:shop_id:item_id``.

        ``app_name`` and ``region`` are lower-cased; ids are used as given.
        """
        return f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}"

    @staticmethod
    def _task_queue_key(task_data: Dict[str, Any]) -> str:
        """Return the Redis key ``region_app`` (both lower-cased) that get_task_item pops from."""
        # Single quotes inside the f-string: nesting double quotes in a
        # double-quoted f-string is a SyntaxError before Python 3.12.
        return f"{task_data['region'].lower()}_{task_data['app_name'].lower()}"

    def get_task_item(self, task_data: Dict[str, Any]):
        """Pop one random task for ``task_data``'s region/app from Redis.

        :param task_data: must contain ``region`` and ``app_name``.
        :return: whatever ``get_random_task_and_delete`` returned, or ``None``
            when that value is falsy.
        """
        result = self.redis_conn.get_random_task_and_delete(self._task_queue_key(task_data))
        return result if result else None

    def add_token(self, token) -> bool:
        """Write *token* into the ``user_token`` Redis hash with value ``1``.

        :return: the helper's result, or ``False`` if the write raised
            (the error is printed, not propagated).
        """
        try:
            return self.redis_conn.write_string_to_h_set("user_token", token, 1)
        except Exception as e:
            print(f"新增token{token}失败,失败原因为{e}")
            return False

    def delete_token(self, token) -> bool:
        """Intended to remove *token* from the ``user_token`` Redis hash.

        NOTE(review): this still performs the same write as ``add_token``
        (``write_string_to_h_set("user_token", token, 1)``), so it does not
        delete anything. TODO: switch to RedisTaskManager's hash-delete method
        once its name is confirmed.

        :return: the helper's result, or ``False`` if the call raised
            (the error is printed, not propagated).
        """
        try:
            return self.redis_conn.write_string_to_h_set("user_token", token, 1)
        except Exception as e:
            print(f"删除token{token}失败,失败原因为{e}")
            return False

    async def deal_shopee_task(self, param: Dict[str, Any], timeout: float = 55, poll_interval: float = 5):
        """Return goods data for a Shopee task, submitting the task and polling if needed.

        If Redis already holds data under the goods key it is returned at once.
        Otherwise the task is added via ``add_task_to_set``, its MySQL record is
        set to status 3, and Redis is re-read every *poll_interval* seconds until
        data appears or *timeout* seconds have passed.

        :param param: task dict; reads ``app_name``, ``region``, ``shop_id``,
            ``item_id`` and ``task_id``.
        :param timeout: maximum seconds to wait after submitting (default 55).
        :param poll_interval: seconds slept before each re-read (default 5).
        :return: the data read from Redis, or ``None`` if none appeared in time.
        """
        key = self._goods_key(param)
        print(key)
        result = self.redis_conn.read_data(key)
        if result is not None:
            print(f"{key} 从Redis缓存中获取到数据")
            return result
        print(f"{key} 在redis中获取数据失败将任务提交到队列")
        self.redis_conn.add_task_to_set(task_data=param)
        await self.update_task_record({"task_id": param["task_id"], "status": 3})
        # Poll until the submitted task's data shows up in Redis. monotonic()
        # is unaffected by wall-clock adjustments, unlike time.time().
        deadline = time.monotonic() + timeout
        counter = 0
        while time.monotonic() < deadline:
            await asyncio.sleep(poll_interval)
            counter += 1
            result = self.redis_conn.read_data(key)
            if result is not None:
                print(f"{key}{counter} 次在redis获取到数据")
                return result
            print(f"{key}:第 {counter} 次从redis中未能获取数据")
        print(f"{key} 在超时时间内未获取到数据")
        return None

    async def insert_task_record(self, params: Dict[str, Any]):
        """Insert a status-1 row into ``crawler_task_record_info`` for statistics.

        Failures are printed, not raised.

        :param params: reads ``task_id``, ``app_name``, ``host``, ``shop_id``,
            ``item_id`` and ``token``.
        """
        await self.db_pool.initialize()
        # .get() so the error path below cannot itself raise KeyError.
        task_id = params.get("task_id")
        try:
            insert_param = {
                "task_id": params["task_id"],
                "app_name": params["app_name"],
                # NOTE(review): the region column is filled from params["host"],
                # while the other methods read "region" — confirm with callers.
                "region": params["host"],
                "shop_id": params["shop_id"],
                "item_id": params["item_id"],
                "status": 1,
                "token": params["token"],
            }
            result = await self.db_pool.insert_many("crawler_task_record_info", [insert_param])
            if result:
                print(f"{task_id} 数据存入mysql{result} 行成功")
            else:
                print(f"{task_id} 数据存入mysql失败")
        except Exception as e:
            print(f"{task_id} 数据存入mysql失败失败原因为{e}")

    async def update_task_record(self, data: Dict[str, Any]) -> bool:
        """Set ``status`` on the ``crawler_task_record_info`` row(s) with ``data["task_id"]``.

        :param data: must contain ``task_id``; ``status`` is read inside the
            guarded update.
        :return: ``True`` if the update reported affected rows; ``False`` if it
            affected none or raised (the error is printed).
        """
        task_id = data["task_id"]
        print(f"开始修改任务:{task_id} 状态")
        await self.db_pool.initialize()
        try:
            affected_rows = await self.db_pool.update(
                table="crawler_task_record_info",
                set_columns={"status": data["status"]},
                where_conditions="task_id = %s",
                params=(task_id,),
            )
        except Exception as e:
            # Uses the plain task_id: indexing the params tuple with a string
            # raised TypeError here and masked the original error.
            print(f"{task_id} 状态更新失败;失败原因为:{e}")
            return False
        if affected_rows:
            print(f"task_id{task_id} 任务完成,任务状态已修改成功")
            return True
        return False

    async def deal_receive_data(self, data: Dict[str, Any]) -> bool:
        """Store received goods data in Redis and MySQL, then set the task to status 2.

        Each step is best-effort: MySQL and status-update errors are printed and
        the method still returns ``True``.

        :param data: reads ``task_id``, ``app_name``, ``region``, ``shop_id``,
            ``item_id``, ``pad_code``, ``account_id`` and ``goods_info``.
        """
        print("开始处理接收到得数据:{}".format(data.get("task_id")))
        await self.db_pool.initialize()
        key = self._goods_key(data)
        goods_info = data.get("goods_info")
        if self.redis_conn.write_data(key, goods_info):
            print(f"{key}数据已存入redis中")
        record = {
            "item_id": data["item_id"],
            "shop_id": data["shop_id"],
            "region": data["region"],
            "app_name": data["app_name"],
            "pad_code": data["pad_code"],
            "account_id": data["account_id"],
            "goods_info": json.dumps(goods_info, ensure_ascii=False),
        }
        try:
            affected_rows = await self.db_pool.insert_many(table="goods_information_record", data=[record])
            if affected_rows:
                print(f"{key}商品数据已存入mysql")
        except Exception as e:
            print(f"{key}数据存入mysql失败失败原因为{e}")
        try:
            # Mark the task as finished (status 2).
            result = await self.update_task_record({"task_id": data["task_id"], "status": 2})
            if result:
                print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        except Exception as e:
            print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}")
        return True

    async def deal_reset_task(self, data: Dict[str, Any]) -> bool:
        """Re-queue a task whose record has status 4 and set it back to status 3.

        :param data: must contain ``task_id``.
        :return: ``True`` if a matching status-4 record was found and re-queued,
            ``False`` otherwise.
        """
        print("开始重置任务:{}".format(data))
        await self.db_pool.initialize()
        # task_id comes from the caller's payload, so it is bound as a query
        # parameter instead of being formatted into the SQL text (SQL injection).
        # NOTE(review): assumes AsyncMySQL.fetch_all(sql, params) forwards params
        # to cursor.execute with %s placeholders, as update() does — confirm.
        sql_str = ("select task_id,app_name,shop_id,item_id,region,token "
                   "from crawler_task_record_info where task_id=%s and status=4")
        param = await self.db_pool.fetch_all(sql_str, (data["task_id"],))
        if not param:
            return False
        if isinstance(param, list):
            param = param[0]
        print(f"获取到对应任务:{data['task_id']} 详情")
        # Submit the task to the Redis queue again.
        self.redis_conn.add_task_to_set(task_data=param)
        try:
            # Set the status back to 3 (submitted).
            result = await self.update_task_record({"task_id": data["task_id"], "status": 3})
            if result:
                print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        except Exception as e:
            print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}")
        return True