crawler_task_management/account_management/deal_account.py

117 lines
5.7 KiB
Python
Raw Normal View History

2025-11-26 17:40:11 +08:00
# -*- coding: utf-8 -*-
import os
from datetime import datetime
from typing import Optional, Dict, Any, List

from public_function.asyn_mysql import AsyncMySQL
from public_function.deal_all_task import DealAllTask
2025-11-26 17:40:11 +08:00
class DealAccount:
    """Manage crawler accounts and cloud-phone backup/restore bookkeeping.

    Account rows live in ``crawler_account_record_info`` and backup rows in
    ``shoppe_device_record``. All database access goes through ``self.db_pool``
    and all cloud-phone operations go through ``self.task_all``.
    """

    _ACCOUNT_TABLE = "crawler_account_record_info"
    _DEVICE_TABLE = "shoppe_device_record"
    # File name handed to ``check_phone_status`` after a backup/restore.
    _STATUS_CHECK_FILE = "test_abc.js"

    def __init__(self, config_data: Dict[str, Any]):
        """Create the DB pool from ``config_data["advert_policy"]`` and the task client."""
        self.config_data = config_data
        self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
        self.task_all = DealAllTask()

    @staticmethod
    def _now_str() -> str:
        """Return the current local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    async def add_account(self, params: List[Dict[str, Any]]) -> bool:
        """Insert one or more account rows.

        Args:
            params: Row dicts to insert into the account table.

        Returns:
            True when ``insert_many`` reports a truthy result, False otherwise
            (including when ``params`` is empty, in which case nothing is sent).
        """
        if not params:
            return False
        await self.db_pool.initialize()
        result = await self.db_pool.insert_many(table=self._ACCOUNT_TABLE, data=params)
        return bool(result)

    async def delete_account(self, params: Dict[str, Any]) -> bool:
        """Delete the account identified by ``account_id`` and ``app_name``.

        Args:
            params: Must contain the keys ``account_id`` and ``app_name``.

        Returns:
            True when ``delete`` reports a truthy result, False otherwise.

        Raises:
            KeyError: If either required key is missing.
        """
        # Values are bound, never interpolated, so caller input cannot alter the SQL.
        # NOTE(review): assumes AsyncMySQL.delete accepts ``params`` the same way
        # AsyncMySQL.update does -- confirm against public_function.asyn_mysql.
        where_conditions = "account_id = %s and app_name = %s"
        values = (params["account_id"], params["app_name"])
        await self.db_pool.initialize()
        result = await self.db_pool.delete(table=self._ACCOUNT_TABLE,
                                           where_conditions=where_conditions, params=values)
        return bool(result)

    async def query_account_info(self, params: Dict[str, Any]):
        """Fetch a single account by ``app_name`` and ``account_id``.

        Args:
            params: Must contain the keys ``app_name`` and ``account_id``.

        Returns:
            The first matching row, or an empty list when nothing matches
            (the empty list is kept for backward compatibility with callers).

        Raises:
            KeyError: If either required key is missing.
        """
        # NOTE(review): assumes AsyncMySQL.fetch_all takes bound parameters as its
        # second positional argument -- confirm against public_function.asyn_mysql.
        sql_str = ("select account_id,password,app_name,status from crawler_account_record_info "
                   "where app_name=%s and account_id=%s")
        values = (params["app_name"], params["account_id"])
        await self.db_pool.initialize()
        result = await self.db_pool.fetch_all(sql_str, values)
        return result[0] if result else []

    async def obtain_account_info(self, app_name, country, number=1):
        """Fetch up to ``number`` accounts with ``status=1`` for an app and region.

        Args:
            app_name: Value matched against the ``app_name`` column.
            country: Value matched against the ``region`` column.
            number: Maximum number of rows to return; coerced to ``int``.

        Returns:
            The rows returned by ``fetch_all``, or an empty list when there are
            none or when ``number`` is not positive.

        Raises:
            ValueError / TypeError: If ``number`` cannot be converted to ``int``.
        """
        limit = int(number)
        if limit <= 0:
            return []
        sql_str = ("select account_id,password,app_name from crawler_account_record_info "
                   "where status=1 and app_name=%s and region=%s limit %s")
        await self.db_pool.initialize()
        result = await self.db_pool.fetch_all(sql_str, (app_name, country, limit))
        return result if result else []

    async def update_account_info(self, set_param: Dict[str, Any], where_conditions, params) -> bool:
        """Update rows in the account table.

        Args:
            set_param: Column/value pairs passed as ``set_columns``.
            where_conditions: WHERE clause text with ``%s`` placeholders.
            params: Values bound to the placeholders in ``where_conditions``.

        Returns:
            True when ``update`` reports a truthy affected-row count.
        """
        await self.db_pool.initialize()
        affected_rows = await self.db_pool.update(table=self._ACCOUNT_TABLE, set_columns=set_param,
                                                  where_conditions=where_conditions, params=params)
        return bool(affected_rows)

    async def update_device_status(self, set_param: Dict[str, Any], where_conditions, params) -> bool:
        """Update rows in the device/backup record table.

        Args:
            set_param: Column/value pairs passed as ``set_columns``.
            where_conditions: WHERE clause text with ``%s`` placeholders.
            params: Values bound to the placeholders in ``where_conditions``.

        Returns:
            True when ``update`` reports a truthy affected-row count.
        """
        # Initialise here as well so the method is safe to call on its own,
        # like every other public DB method of this class.
        await self.db_pool.initialize()
        affected_rows = await self.db_pool.update(table=self._DEVICE_TABLE, set_columns=set_param,
                                                  where_conditions=where_conditions, params=params)
        return bool(affected_rows)

    async def deal_backup_task(self, data: Dict[str, Any]):
        """Back up the cloud phone for an account unless a backup row already exists.

        The method first tries to mark the account's device row with ``status=1``.
        If a row was updated the account is treated as already backed up.
        Otherwise a backup is started, recorded in the device table, and the
        phone status is checked.

        Args:
            data: Must contain ``account_id``; when a backup has to be made it
                must also contain ``pad_code`` and ``script_name``.

        Raises:
            KeyError: If a required key is missing from ``data``.
        """
        await self.db_pool.initialize()
        account_id = data["account_id"]
        set_param = {"status": 1, "update_time": self._now_str()}
        already_backed_up = await self.update_device_status(set_param, "account_id = %s ", (account_id,))
        if already_backed_up:
            print(f"该账号:{account_id} 已经备份过,不需要在备份")
            return

        pad_code = data["pad_code"]
        tasks = self.task_all.backup_system([pad_code], pad_code.lower())
        if not tasks:
            # No task information came back, so there is no backup id to record.
            print(f"云机:{pad_code} 备份任务创建失败")
            return
        payload = {"account_id": account_id, "backup_name": tasks[0]["backupId"],
                   "script_name": data["script_name"]}
        # Record the new backup in the device table.
        await self.db_pool.insert_many(table=self._DEVICE_TABLE, data=[payload])
        # NOTE(review): this call is not awaited; if it blocks while polling it
        # will stall the event loop -- confirm against DealAllTask.
        finished = self.task_all.check_phone_status(pad_code=pad_code, file_name=self._STATUS_CHECK_FILE)
        if finished:
            print(f"云机:{pad_code}备份完成")
        else:
            print(f"云机:{pad_code} 300秒备份失败需要认为干预")

    async def deal_restore_system(self, data: Dict[str, Any]):
        """Restore a randomly chosen backup older than 24 hours onto a cloud phone.

        Picks one device row whose ``update_time`` is more than 24 hours old,
        marks it ``status=2``, restores its backup onto ``data["pad_code"]``
        and, when the status check succeeds, uploads and starts the row's script.
        Does nothing when no row qualifies or the status update affects no row.

        Args:
            data: Must contain ``pad_code`` when a restore is performed.

        Raises:
            KeyError: If ``pad_code`` is needed but missing from ``data``.
        """
        await self.db_pool.initialize()
        sql_str = ("SELECT account_id,backup_name,script_name FROM shoppe_device_record "
                   "WHERE update_time < NOW() - INTERVAL 24 HOUR ORDER BY RAND() LIMIT 1")
        rows = await self.db_pool.fetch_all(sql_str)
        if not rows:
            return
        record = rows[0]
        account_id = record["account_id"]
        print(f"账号:{account_id} 开始还原环境")

        set_param = {"status": 2, "update_time": self._now_str()}
        updated = await self.update_device_status(set_param, "account_id = %s ", (account_id,))
        if not updated:
            return

        pad_code = data["pad_code"]
        script_name = record["script_name"]
        self.task_all.restore_system(pad_code=pad_code, backup_name=record["backup_name"])
        restored = self.task_all.check_phone_status(pad_code=pad_code, file_name=self._STATUS_CHECK_FILE)
        if restored:
            print(f"云机:{pad_code} 环境还原成功")
            # Upload the script to the phone, then start it.
            self.task_all.upload_file_to_phone([pad_code], file_name=script_name)
            self.task_all.async_execute_adb_command(pad_code=pad_code, file_name=script_name)
            # TODO: the original author noted that a further function is needed here.
        else:
            print(f"云机:{pad_code} 300环境还原失败需要认为干预")
if __name__ == "__main__":
    # No standalone entry point is defined; running the file directly does nothing.
    pass