# -*- coding: utf-8 -*- import os import time from datetime import datetime from typing import Optional, Dict, Any, List from public_function.asyn_mysql import AsyncMySQL from public_function.deal_all_task import DealAllTask class DealAccount: def __init__(self, config_data: Dict[str, Any]): self.config_data = config_data self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) self.task_all = DealAllTask() async def add_account(self, params: List[Dict[str, Any]]): """新增账户""" await self.db_pool.initialize() result = await self.db_pool.insert_many(table="crawler_account_record_info", data=params) if result: return True return False async def delete_account(self, params: Dict[str, Any]): """删除账户""" condition = "account_id='{}' and app_name='{}'".format(params["account_id"], params["app_name"]) await self.db_pool.initialize() result = await self.db_pool.delete(table="crawler_account_record_info", where_conditions=condition) if result: return True return False async def query_account_info(self, params: Dict[str, Any]): """查询具体账户信息""" sql_str = f"""select account_id,password,app_name,status from crawler_account_record_info where app_name='{params['app_name']}' and account_id='{params['account_id']}'""" await self.db_pool.initialize() result = await self.db_pool.fetch_all(sql_str, ) if result: return result[0] return [] async def obtain_account_info(self, app_name, country, number=1): """获取指定个数账户信息""" sql_str = f"""select account_id,password,app_name from crawler_account_record_info where status=1 and app_name='{app_name}' and region='{country}' limit {number}""" await self.db_pool.initialize() result = await self.db_pool.fetch_all(sql_str, ) if result: return result return [] async def update_account_info(self, set_param: Dict[str, Any], where_conditions, params): """更新账户信息""" await self.db_pool.initialize() affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns=set_param, where_conditions=where_conditions, params=params) if affected_rows: return True return False async def update_device_status(self, set_param: Dict[str, Any], where_conditions, params): """更新账户信息""" affected_rows = await self.db_pool.update(table='shoppe_device_record', set_columns=set_param, where_conditions=where_conditions, params=params) if affected_rows: return True return False async def deal_backup_task(self, data: Dict[str, Any]): # 先查询该账户是否备份过,备份过就修改数据库状态, await self.db_pool.initialize() set_param = {"status": 1, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")} params = (data["account_id"],) where_conditions = "account_id = %s " result = await self.update_device_status(set_param, where_conditions, params) if result: print(f"该账号:{data['account_id']} 已经备份过,不需要在备份") return True else: tasks = self.task_all.backup_system([data["pad_code"]], data["pad_code"].lower()) if tasks: backup_name = tasks[0]["backupId"] payload = {"account_id": data["account_id"], "backup_name": backup_name, "script_name": data["script_name"]} # 将数据插入mysql 的表 shoppe_device_record 中 await self.db_pool.insert_many(table="shoppe_device_record", data=[payload]) result = self.task_all.check_phone_status(pad_code=data["pad_code"], file_name="test_abc.js") if result: print(f"云机:{data['pad_code']}备份完成") return True else: print(f"云机:{data['pad_code']} 300秒备份失败,需要认为干预") else: print(f"实例:{data['pad_code']} 不存在") return False async def deal_restore_system(self, data: Dict[str, Any]): await self.db_pool.initialize() sql_str = f"""SELECT account_id,backup_name,script_name FROM shoppe_device_record WHERE update_time < NOW() - INTERVAL 24 HOUR ORDER BY RAND() LIMIT 1""" result = await self.db_pool.fetch_all(sql_str, ) if result: res_dict = result[0] print(f"{data['account_id']} ") set_param = {"status": 2, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")} params = (res_dict["account_id"],) where_conditions = "account_id = %s " result_sql = await self.update_device_status(set_param, where_conditions, params) if result_sql: restore = self.task_all.restore_system(pad_code=[data["pad_code"]], backup_name=res_dict["backup_name"]) if restore: result_status = self.task_all.check_phone_status(pad_code=[data["pad_code"]], file_name="test_abc.js") if result_status: print(f"云机:{data['pad_code']} 环境还原成功") # 下载执行脚本 self.task_all.upload_file_to_phone([data["pad_code"]], file_name=res_dict["script_name"]) # 设置代理 print(data) self.task_all.set_network_proxy(pad_code=[data["pad_code"]],country=data["country"]) # 启动对应脚本 self.task_all.async_execute_adb_command(pad_code=data["pad_code"], file_name=res_dict["script_name"]) # 需要新增函数 return True else: print(f"云机:{data['pad_code']} 300环境还原失败,需要认为干预") return False if __name__ == '__main__': pass