diff --git a/public_function/config.yaml b/public_function/config.yaml index ff374f8..3298e82 100644 --- a/public_function/config.yaml +++ b/public_function/config.yaml @@ -15,7 +15,7 @@ access_key_id: LTAI5tA92Av7DQmSQY2MTJPe access_key_secret: KI5s3C78HcPX9MDjUwPwVDytFzxEjY redis_config: - host: 182.92.181.218 - port: 6372 - password: qaz@wsx + host: 47.238.96.231 + port: 6379 + password: QAhL5RXKh1o21RbEqRZb db: 0 \ No newline at end of file diff --git a/public_function/table_log.sql b/public_function/table_log.sql index b920150..96f671a 100644 --- a/public_function/table_log.sql +++ b/public_function/table_log.sql @@ -58,8 +58,9 @@ CREATE TABLE goods_information_record create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (id), - UNIQUE KEY uk_goods_info (item_id,shop_id,region,app_name), - KEY idx_status (status), + KEY idx_shop_id(shop_id), + KEY idx_item_id(item_id), + KEY idx_app_name(app_name), KEY idx_region (region) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表'; diff --git a/task_management/all_task_management.py b/task_management/all_task_management.py index 6f2ce04..170c6a8 100644 --- a/task_management/all_task_management.py +++ b/task_management/all_task_management.py @@ -41,12 +41,12 @@ class AllTask: """ 处理Shopee任务的异步方法 """ - key = f"{param['app_name'].lower()}:{param['region'].lower()}" + key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}" result = self.redis_conn.read_data(key) if result is not None: print(f"{key} 从Redis缓存中获取到数据") return result - print(f"{key}:{param['shop_id']}:{param['item_id']} 在redis中获取数据失败,将任务提交到队列") + print(f"{key} 在redis中获取数据失败,将任务提交到队列") self.redis_conn.add_task_to_set(task_data=param) update_parms = {"task_id": param["task_id"], "status": 3} await self.update_task_record(update_parms) @@ -100,7 +100,7 @@ class AllTask: # 将商品数据写入mysql和redis print("开始处理接收到得数据:{}".format(data)) await self.db_pool.initialize() - key = f"{data['shop_id']}:{data['item_id']}" + key = f"{data['app_name'].lower()}:{data['region'].lower()}:{data['shop_id']}:{data['item_id']}" params = data.get("goods_info") affected_rows = self.redis_conn.write_data(key, params) if affected_rows: @@ -108,12 +108,18 @@ class AllTask: params_data = {"item_id": data["item_id"], "shop_id": data["shop_id"], "region": data["region"], "app_name": data["app_name"], "goods_info": json.dumps(params)} print(f"{params}<") - affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data]) - if affected_rows: - print(f"{key};商品数据已存入mysql") - # 更改任务状态 - params = {"task_id": data["task_id"], "status": 2} - result = await self.update_task_record(params) - if result: - print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") + try: + affected_rows = await self.db_pool.insert_many(table='goods_information_record', data=[params_data]) + if affected_rows: + print(f"{key};商品数据已存入mysql") + except Exception as e: + print(f"{key}数据存入mysql失败,失败原因为:{e}") + try: + # 更改任务状态 + params = {"task_id": data["task_id"], "status": 2} + result = await self.update_task_record(params) + if result: + print(f"task_id{data['task_id']} 任务完成,任务状态已修改成功") + except Exception as e: + print(f"{data['task_id']} 任务状态修改失败,失败原因为:{e}") return True