代码优化

This commit is contained in:
liujianjiang 2025-11-29 16:08:33 +08:00
parent a601f2b2a1
commit 656da15505
3 changed files with 23 additions and 16 deletions

View File

@ -15,7 +15,7 @@ access_key_id: LTAI5tA92Av7DQmSQY2MTJPe
access_key_secret: KI5s3C78HcPX9MDjUwPwVDytFzxEjY
redis_config:
host: 182.92.181.218
port: 6372
password: qaz@wsx
host: 47.238.96.231
port: 6379
password: QAhL5RXKh1o21RbEqRZb
db: 0

View File

@ -58,8 +58,9 @@ CREATE TABLE goods_information_record
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (id),
UNIQUE KEY uk_goods_info (item_id,shop_id,region,app_name),
KEY idx_status (status),
KEY idx_shop_id(shop_id),
KEY idx_item_id(item_id),
KEY idx_app_name(app_name),
KEY idx_region (region)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';

View File

@ -41,12 +41,12 @@ class AllTask:
"""
处理Shopee任务的异步方法
"""
key = f"{param['app_name'].lower()}:{param['region'].lower()}"
key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}"
result = self.redis_conn.read_data(key)
if result is not None:
print(f"{key} 从Redis缓存中获取到数据")
return result
print(f"{key}:{param['shop_id']}:{param['item_id']} 在redis中获取数据失败将任务提交到队列")
print(f"{key} 在redis中获取数据失败将任务提交到队列")
self.redis_conn.add_task_to_set(task_data=param)
update_parms = {"task_id": param["task_id"], "status": 3}
await self.update_task_record(update_parms)
@ -100,7 +100,7 @@ class AllTask:
# 将商品数据写入mysql和redis
print("开始处理接收到得数据:{}".format(data))
await self.db_pool.initialize()
key = f"{data['shop_id']}:{data['item_id']}"
key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}"
params = data.get("goods_info")
affected_rows = self.redis_conn.write_data(key, params)
if affected_rows:
@ -108,12 +108,18 @@ class AllTask:
params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"],
"goods_info": json.dumps(params)}
print(f"{params}<")
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])
if affected_rows:
print(f"{key}商品数据已存入mysql")
# 更改任务状态
params = {"task_id": data["task_id"], "status": 2}
result = await self.update_task_record(params)
if result:
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
try:
affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data])
if affected_rows:
print(f"{key}商品数据已存入mysql")
except Exception as e:
print(f"{key}数据存入mysql失败失败原因为{e}")
try:
# 更改任务状态
params = {"task_id": data["task_id"], "status": 2}
result = await self.update_task_record(params)
if result:
print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功")
except Exception as e:
print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}")
return True