代码优化
This commit is contained in:
parent
ae59838e11
commit
3a1e13dcab
4
main.py
4
main.py
|
|
@ -133,7 +133,7 @@ async def receive_data(task_data: DataReceive, task_manager: Any = Depends(get_t
|
||||||
return {"code": 200, "message": "数据保存成功", "data": result}
|
return {"code": 200, "message": "数据保存成功", "data": result}
|
||||||
raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
|
raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"{get_local_time()},商品数据处理失败,失败原因: {e}")
|
print(f"商品数据处理失败,失败原因: {e}")
|
||||||
raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
|
raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e))
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -166,4 +166,4 @@ async def device_reset(task_data: DeviceResetData, reset_manager: Any = Depends(
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
    # Start the ASGI server only when this module is run as a script.
    # host="0.0.0.0" listens on every IPv4 interface so the API is reachable
    # from other machines; port 8000 is given explicitly rather than relying
    # on the server default.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,6 @@ class RedisTaskManager:
|
||||||
# 设置过期时间,优先使用自定义时间
|
# 设置过期时间,优先使用自定义时间
|
||||||
expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600
|
expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600
|
||||||
result = self.redis_client.setex(key, expire_seconds, data_str)
|
result = self.redis_client.setex(key, expire_seconds, data_str)
|
||||||
|
|
||||||
if result:
|
if result:
|
||||||
print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}秒")
|
print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}秒")
|
||||||
return True
|
return True
|
||||||
|
|
@ -59,7 +58,6 @@ class RedisTaskManager:
|
||||||
try:
|
try:
|
||||||
data_str = self.redis_client.get(key)
|
data_str = self.redis_client.get(key)
|
||||||
if data_str is None:
|
if data_str is None:
|
||||||
print(f"键不存在或已过期 - 键: {key}")
|
|
||||||
return None
|
return None
|
||||||
# 尝试解析JSON数据
|
# 尝试解析JSON数据
|
||||||
try:
|
try:
|
||||||
|
|
@ -70,3 +68,16 @@ class RedisTaskManager:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"读取Redis数据时发生错误: {e}")
|
print(f"读取Redis数据时发生错误: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def add_task_to_set(self, task_data: Dict[str, Any], redis_key='crawler_task') -> bool:
    """Queue one crawl task in a Redis hash.

    The task is JSON-encoded and written with ``HSET`` into the hash named
    ``redis_key`` under the field ``"<shop_id>_<item_id>"``. Submitting the
    same shop/item pair again therefore overwrites the earlier entry instead
    of adding a duplicate.

    Args:
        task_data: Task payload. Must contain ``shop_id`` and ``item_id``
            and be JSON-serializable.
        redis_key: Name of the Redis hash that holds the queued tasks.

    Returns:
        True when the write was issued without raising, False when any
        error occurred (the error is printed, not propagated).
    """
    try:
        # Hash field that identifies this shop/item pair.
        field = f"{task_data['shop_id']}_{task_data['item_id']}"
        payload = json.dumps(task_data)
        # NOTE: despite the method name, the storage is a hash (HSET),
        # not a Redis set; de-duplication comes from the field name.
        self.redis_client.hset(redis_key, field, payload)
        return True
    except Exception as e:
        # Best-effort: report the failure to the caller via the return value.
        print(f"添加任务失败: {e}")
        return False
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
import asyncio
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from public_function.asyn_mysql import AsyncMySQL
|
from public_function.asyn_mysql import AsyncMySQL
|
||||||
from public_function.public_func import create_logger
|
from public_function.public_func import create_logger
|
||||||
|
|
@ -15,26 +16,39 @@ class AllTask:
|
||||||
self.redis_conn = RedisTaskManager(self.config_data)
|
self.redis_conn = RedisTaskManager(self.config_data)
|
||||||
self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
|
self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
|
||||||
|
|
||||||
async def deal_shopee_task(self, param: Dict[str, Any]):
    """Return product data for a Shopee item, submitting a crawl task if needed.

    Unless ``param['is_re_crawl']`` is truthy, the Redis cache is checked
    first under the key ``"<shop_id>:<item_id>"`` and a hit is returned
    immediately. Otherwise the task is handed to
    ``self.redis_conn.add_task_to_set`` and the same key is polled every
    5 seconds for up to 55 seconds.

    Args:
        param: Task description. Must contain ``shop_id`` and ``item_id``;
            may contain ``is_re_crawl`` to bypass the initial cache lookup.

    Returns:
        Whatever ``self.redis_conn.read_data`` returned for the key, or
        None if nothing was available before the timeout.
    """
    key = f"{param['shop_id']}:{param['item_id']}"

    # Serve from the cache unless the caller explicitly asked for a re-crawl.
    if not param.get('is_re_crawl', False):
        print(f"{key} 开始在redis中获取数据")
        result = self.redis_conn.read_data(key)
        # Compare against None so an empty-but-present value still counts as a hit.
        if result is not None:
            print(f"{key} 从Redis缓存中获取到数据")
            return result

    print(f"{key} 在redis中获取数据失败,将任务提交到队列")
    # NOTE(review): the boolean result of add_task_to_set is ignored, so a
    # failed submit still waits out the full timeout — confirm this is intended.
    self.redis_conn.add_task_to_set(task_data=param)

    # NOTE(review): on a re-crawl the key may still hold the previous value,
    # in which case the first poll returns stale data — verify against the
    # crawler's write path.
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + 55
    counter = 0
    while time.monotonic() < deadline:
        # asyncio.sleep yields to the event loop; time.sleep would block it
        # and returns None, which cannot be awaited.
        await asyncio.sleep(5)
        counter += 1
        result = self.redis_conn.read_data(key)
        if result is not None:
            print(f"{key} 第 {counter} 次在redis获取到数据")
            return result
        print(f"{key}:第 {counter} 次从redis中未能获取数据")

    print(f"{key} 在超时时间内未获取到数据")
    return None
|
||||||
|
|
||||||
async def task_distribution(self, data: Dict[str, Any]):
|
async def task_distribution(self, data: Dict[str, Any]):
|
||||||
result = dict()
|
result = dict()
|
||||||
|
|
@ -54,10 +68,10 @@ class AllTask:
|
||||||
affected_rows = self.redis_conn.write_data(key, params)
|
affected_rows = self.redis_conn.write_data(key, params)
|
||||||
if affected_rows:
|
if affected_rows:
|
||||||
print(f"{key};数据已存入redis中")
|
print(f"{key};数据已存入redis中")
|
||||||
params = {"item_id": data["item_id"], "shop_id": data["shop_id"], "country": data["country"], "app_name": data["app_name"],
|
params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "country": data["country"], "app_name": data["app_name"],
|
||||||
"goods_info": json.dumps(data)}
|
"goods_info": json.dumps(params)}
|
||||||
print(f"{params}<")
|
print(f"{params}<")
|
||||||
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params])
|
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])
|
||||||
if affected_rows:
|
if affected_rows:
|
||||||
print(f"{key};商品数据已存入mysql")
|
print(f"{key};商品数据已存入mysql")
|
||||||
# 更改任务状态
|
# 更改任务状态
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue