crawler_task_management/task_management/all_task_management.py

70 lines
3.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import asyncio
import json
import time
from typing import Any, Dict, Optional

from public_function.asyn_mysql import AsyncMySQL
from public_function.redis_task_manager import RedisTaskManager
class AllTask:
    """Dispatch crawler tasks and persist the goods data crawlers send back.

    Goods data is cached in Redis under ``"{app_name}:{store_id}:{goods_id}"``
    and recorded in MySQL through the ``AsyncMySQL`` pool built from
    ``config_data["advert_policy"]``.
    """

    # Table where task_distribution() records each task and whose status
    # deal_receive_data() updates once the crawler has delivered the data.
    TASK_RECORD_TABLE = "crawler_task_record_info"
    # Table receiving one row per received goods payload.
    GOODS_RECORD_TABLE = "goods_information_record"
    # Status written when a task's data has been received.
    # NOTE(review): "2 == finished" is inferred from the log message in
    # deal_receive_data — confirm against the schema's status values.
    TASK_STATUS_DONE = 2

    def __init__(self, config_data: Dict[str, Any]):
        """Create the Redis and MySQL helpers from ``config_data``.

        Args:
            config_data: Configuration mapping. It is passed whole to
                ``RedisTaskManager``; its ``"advert_policy"`` entry is passed
                to ``AsyncMySQL``.
        """
        self.config_data = config_data
        self.redis_conn = RedisTaskManager(self.config_data)
        # initialize() is awaited by the methods below before each use.
        self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])

    @staticmethod
    def _goods_key(data: Dict[str, Any]) -> str:
        """Return the Redis key ``"app_name:store_id:goods_id"`` for *data*."""
        return f"{data['app_name']}:{data['store_id']}:{data['goods_id']}"

    async def deal_shopee_task(self, param, timeout: float = 55, poll_interval: float = 1):
        """Return the goods data for a Shopee task, waiting for it if needed.

        Redis is checked first and a cached value is returned immediately.
        Otherwise Redis is polled every *poll_interval* seconds until data
        appears or *timeout* seconds have elapsed.

        Args:
            param: Mapping with ``app_name``, ``store_id`` and ``goods_id``.
            timeout: Maximum number of seconds to wait (default 55).
            poll_interval: Seconds to sleep between Redis reads (default 1).

        Returns:
            The value read from Redis, or ``[]`` if nothing arrived in time.
        """
        # If the data is already in Redis, return it directly.
        key = self._goods_key(param)
        result = self.redis_conn.read_data(key)
        if result:
            return result
        # NOTE(review): the original comments say the matching crawler task is
        # invoked here and the wait starts afterwards, but no call is made —
        # this method only waits for deal_receive_data() to fill the same key.
        # A monotonic clock keeps wall-clock adjustments from skewing the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # asyncio.sleep yields to the event loop; time.sleep would block
            # it and returns None, which cannot be awaited (TypeError).
            await asyncio.sleep(poll_interval)
            result = self.redis_conn.read_data(key)
            if result:
                return result
        return []

    async def deal_receive_data(self, data: Dict[str, Any]):
        """Persist goods data pushed by a crawler and mark its task finished.

        Writes ``data["goods_info"]`` to Redis, inserts a MySQL row holding the
        whole payload as JSON, and sets the task's status to
        ``TASK_STATUS_DONE``.

        Args:
            data: Payload with ``app_name``, ``store_id``, ``goods_id``,
                ``country``, ``task_id`` and ``goods_info`` keys.

        Returns:
            Always ``True``; the individual write results are only printed.
        """
        print("开始处理接收到得数据:{}".format(data))
        await self.db_pool.initialize()
        key = self._goods_key(data)
        goods_info = data.get("goods_info")
        if self.redis_conn.write_data(key, goods_info):
            print(f"{key}数据已存入redis中")
        # The MySQL column stores the full payload (not only goods_info).
        record = {"goods_id": data["goods_id"], "store_id": data["store_id"], "country": data["country"],
                  "app_name": data["app_name"], "goods_info": json.dumps(data)}
        print(f"{record}<")
        if await self.db_pool.insert_many(table=self.GOODS_RECORD_TABLE, data=[record]):
            print(f"{key}商品数据已存入mysql")
        # Mark the task finished: filter on the same column whose value is
        # bound (task_id), in the table where task_distribution records tasks.
        # NOTE(review): params is a dict used with a positional %s placeholder
        # (unchanged shape) — confirm AsyncMySQL.update binds it that way.
        task_id = data["task_id"]
        if await self.db_pool.update(table=self.TASK_RECORD_TABLE,
                                     set_columns={"status": self.TASK_STATUS_DONE},
                                     where_conditions="task_id = %s",
                                     params={"task_id": task_id}):
            print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
        return True

    async def task_distribution(self, data: Dict[str, Any]):
        """Record an incoming task in MySQL and dispatch it by ``app_name``.

        Recording is best-effort: a database failure is printed and the task
        is still dispatched.

        Args:
            data: Task payload with at least ``app_name``, ``task_id`` and
                ``country``; Shopee tasks also need ``store_id`` and
                ``goods_id``.

        Returns:
            The Shopee result when one is available, otherwise ``None``.
        """
        # Build the record before the try so the dispatch below can still read
        # it when the database step fails (otherwise the name is unbound).
        param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]}
        try:
            await self.db_pool.initialize()
            # Pass a list of rows, the same shape deal_receive_data uses.
            await self.db_pool.insert_many(table=self.TASK_RECORD_TABLE, data=[param])
        except Exception as e:
            print("将任务记录到数据库失败,失败原因为:{}".format(e))
        if param["app_name"] == "Shopee":
            result = await self.deal_shopee_task(data)
            if result:
                return result
        return None