diff --git a/account_management/deal_account.py b/account_management/deal_account.py index a81bc3e..5951f7f 100644 --- a/account_management/deal_account.py +++ b/account_management/deal_account.py @@ -1,14 +1,15 @@ # -*- coding: utf-8 -*- import os from typing import Optional, Dict, Any, List - from public_function.asyn_mysql import AsyncMySQL +from device_management.deal_all_task import DealAllTask class DealAccount: def __init__(self, config_data: Dict[str, Any]): self.config_data = config_data self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"]) + self.task_all = DealAllTask() async def add_account(self, params: List[Dict[str, Any]]): """新增账户""" @@ -55,3 +56,61 @@ class DealAccount: if affected_rows: return True return False + + async def update_device_status(self, set_param: Dict[str, Any], where_conditions, params): + """更新账户信息""" + affected_rows = await self.db_pool.update(table='shoppe_device_record', set_columns=set_param, + where_conditions=where_conditions, params=params) + if affected_rows: + return True + return False + + async def deal_backup_task(self, data: Dict[str, Any]): + # 先查询该账户是否备份过,备份过就修改数据库状态, + await self.db_pool.initialize() + set_param = {"status": 1, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")} + params = (data["account_id"],) + where_conditions = "account_id = %s " + result = await self.update_device_status(set_param, where_conditions, params) + if result: + print(f"该账号:{params['account_id']} 已经备份过,不需要在备份") + else: + tasks = self.task_all.backup_system([pad_code], pad_code.lower()) + backup_name = tasks[0]["backupId"] + payload = {"account_id": params["account_id"], "backup_name": backup_name, "script_name": params["script_name"]} + # 将数据插入mysql 的表 shoppe_device_record 中 + await self.db_pool.insert_many(table="shoppe_device_record", data=[payload]) + result = self.task_all.check_phone_status(pad_code=params["pad_code"], file_name="test_abc.js") + if result: + print(f"云机:{params['pad_code']}备份完成") + else: + print(f"云机:{params['pad_code']} 300秒备份失败,需要认为干预") + + async def deal_restore_system(self, data: Dict[str, Any]): + await self.db_pool.initialize() + sql_str = f"""SELECT account_id,backup_name,script_name FROM shoppe_device_record + WHERE update_time < NOW() - INTERVAL 24 HOUR ORDER BY RAND() LIMIT 1""" + result = await self.db_pool.fetch_all(sql_str, ) + if result: + res_dict = result[0] + print(f"{params['account_id']} ") + set_param = {"status": 2, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")} + params = (res_dict["account_id"],) + where_conditions = "account_id = %s " + result_sql = await self.update_device_status(set_param, where_conditions, params) + if result_sql: + self.task_all.restore_system(pad_code=data["pad_code"], backup_name=res_dict["backup_name"]) + result_status = self.task_all.check_phone_status(pad_code=data["pad_code"], file_name="test_abc.js") + if result_status: + print(f"云机:{params['pad_code']} 环境还原成功") + # 下载执行脚本 + self.task_all.upload_file_to_phone([data["pad_code"]], file_name=res_dict["script_name"]) + # 启动对应脚本 + self.task_all.async_execute_adb_command(pad_code=data["pad_code"], file_name=res_dict["script_name"]) + # 需要新增函数 + else: + print(f"云机:{params['pad_code']} 300环境还原失败,需要认为干预") + + +if __name__ == '__main__': + pass diff --git a/main.py b/main.py index af1a418..31650a8 100644 --- a/main.py +++ b/main.py @@ -10,7 +10,8 @@ from public_function.auth import verify_tk_token from task_management.all_task_management import AllTask from account_management.deal_account import DealAccount from public_function.public_func import read_config, create_logger -from model.model import GoodsInfo, DataReceive, AccountStatus, AccountUpdate, TokenItem, CrawlerItem, ResetTask, AccountObtain +from model.model import GoodsInfo, DataReceive, AccountStatus, AccountUpdate +from model.model import TokenItem, CrawlerItem, ResetTask, AccountObtain, AlterStatus, BackupItem app = FastAPI() app.middleware("http")(verify_tk_token) @@ -210,5 +211,24 @@ async def reset_task(task_data: ResetTask, task_manager: Any = Depends(get_task_ raise HTTPException(status_code=404, detail=f"任务:{params.get("task_id")} 重启失败") +@app.post("/shop_backup", summary="云机备份与还原") +async def shop_backup(backup_data: AccountObtain, account_manager: Any = Depends(get_account_manager)): + """ + 获取指定应用的可用账号 + - **app_name**: 应用名称 + pad_code + """ + try: + param = backup_data.model_dump() + await account_manager.deal_backup_task(param) + if params.get("is_restore", False): + time.sleep(random.randint(1, 10)) + await account_manager.deal_restore_system(param) + return {"code": 200, "message": f"任务:{params.get("pad_code")} 备份还原成功" } + return {"code": 200, "message": f"任务:{params.get("pad_code")} 备份成功", } + except Exception as e: + raise HTTPException(status_code=404, detail=f"任务:{params.get("task_id")} 重启失败") + + if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/public_function/arm_cloud_signature_v2.py b/public_function/arm_cloud_signature_v2.py new file mode 100644 index 0000000..dd9248e --- /dev/null +++ b/public_function/arm_cloud_signature_v2.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +import hmac +import hashlib +import time +import json +import requests +from typing import Dict, Any, Optional + + +class ArmCloudSignatureV2: + def __init__(self, access_key_id: str, secret_key: str, base_url: str = "https://api.xiaosuanyun.com"): + self.access_key_id = access_key_id + self.secret_key = secret_key + self.base_url = base_url # 国内: https://api.xiaosuanyun.com 海外: https://openapi-hk.armcloud.net + + def _calculate_signature(self, timestamp: str, path: str, body: str = "") -> str: + """计算HMAC-SHA256签名""" + string_to_sign = timestamp + path + (body if body else "") + signature = hmac.new( + self.secret_key.encode('utf-8'), + string_to_sign.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + return signature + + def sign_get_request(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, str]: + """为GET请求生成签名头""" + timestamp = str(int(time.time() * 1000)) + # query_string = "" + if params: + query_string = "&".join([f"{k}={v}" for k, v in params.items()]) + body = query_string + else: + body = "" + + signature = self._calculate_signature(timestamp, path, body) + + return { + "authver": "2.0", + "x-ak": self.access_key_id, + "x-timestamp": timestamp, + "x-sign": signature + } + + def sign_post_request(self, path: str, data: Dict[str, Any]) -> Dict[str, str]: + """为POST请求生成签名头""" + timestamp = str(int(time.time() * 1000)) + request_body = json.dumps(data, ensure_ascii=False, separators=(',', ':')) + signature = self._calculate_signature(timestamp, path, request_body) + + return { + "authver": "2.0", + "x-ak": self.access_key_id, + "x-timestamp": timestamp, + "x-sign": signature, + "Content-Type": "application/json" + } + + def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> requests.Response: + """发起GET请求""" + headers = self.sign_get_request(path, params) + url = self.base_url + path + return requests.get(url, params=params, headers=headers) + + def post(self, path: str, data: Dict[str, Any]) -> requests.Response: + """发起POST请求""" + headers = self.sign_post_request(path, data) + url = self.base_url + path + request_body = json.dumps(data, ensure_ascii=False, separators=(',', ':')) + return requests.post(url, data=request_body, headers=headers) + + +# 使用示例 +if __name__ == "__main__": + # 初始化客户端 - 根据地区选择域名 + # 国内用户 + # client = ArmCloudSignatureV2("ACP250915XAGEH03", "your_secret_key", "https://api.xiaosuanyun.com") + # 海外用户 + client = ArmCloudSignatureV2("gz8f1u0t63byzdu6ozbx8r5qs3e5lipt", "3yc8c8bg1dym0zaiwjh867al", "https://openapi-hk.armcloud.net") + + # POST请求示例 - 板卡列表查询(接口路径请参考OpenAPI文档) + device_list_data = { + "page": 1, + "rows": 10, + "padCodes": ["AC21020010391"], + "vmStatus": "1", + "deviceStatus": "0" + } + + try: + # 注意:此处路径仅为示例,实际路径请查看OpenAPI文档 + response = client.post("/openapi/open/device/list", device_list_data) + print(f"板卡列表查询 Response: {response.status_code}") + print(f"Response Body: {response.text}") + except Exception as e: + print(f"板卡列表查询 Error: {e}") + + # POST请求示例 - 创建云实例(接口路径和参数请参考OpenAPI文档) + create_instance_data = { + "clusterCode": "001", + "specificationCode": "m2-3", + "imageId": "img-25080826717", + "screenLayoutCode": "realdevice_1440x3120x560", + "number": 2, + "dns": "8.8.8.8", + "storageSize": 16, + "realPhoneTemplateId": 36 + } + + try: + # 注意:此处路径仅为示例,实际路径请查看OpenAPI文档 + response = client.post("/openapi/open/pad/net/storage/res/create", create_instance_data) + print(f"创建云实例 Response: {response.status_code}") + print(f"Response Body: {response.text}") + except Exception as e: + print(f"创建云实例 Error: {e}") diff --git a/public_function/asyn_mysql.py b/public_function/asyn_mysql.py index 9ea9228..4555884 100644 --- a/public_function/asyn_mysql.py +++ b/public_function/asyn_mysql.py @@ -132,11 +132,24 @@ class AsyncMySQL: return total_affected -if __name__ == '__main__': +async def main(): from public_function.public_func import read_config - config = read_config(r"C:\workfile\crawler_task_management\public_function\config.yaml") - obj = AsyncMySQL(config['advert_policy']) - sql_str = f"""select account_id,password from crawler_account_record_info - where status=1 and app_name='xiapi' limit 1""" - obj.fetch_all(sql_str) + print(config) + db = AsyncMySQL(config["advert_policy"]) + await db.initialize() + sql_str = f"""SELECT account_id,backup_name,script_name FROM shoppe_device_record + WHERE update_time < NOW() - INTERVAL 0 HOUR ORDER BY RAND() LIMIT 1""" + result = await db.fetch_all(sql_str) + print(result) + + +if __name__ == '__main__': + # from public_function.public_func import read_config + # + # config = read_config(r"C:\workfile\crawler_task_management\public_function\config.yaml") + # obj = AsyncMySQL(config['advert_policy']) + # sql_str = f"""select account_id,password from crawler_account_record_info + # where status=1 and app_name='xiapi' limit 1""" + # obj.fetch_all(sql_str) + asyncio.run(main()) \ No newline at end of file diff --git a/public_function/deal_all_task.py b/public_function/deal_all_task.py new file mode 100644 index 0000000..aa757aa --- /dev/null +++ b/public_function/deal_all_task.py @@ -0,0 +1,214 @@ +# -*- coding: utf-8 -*- +import time +import json +import pytz +import uuid +import shlex +import random +import requests +from typing import Dict, Any +from datetime import datetime +from public_function.kookey import set_network_proxy +from public_function.arm_cloud_signature_v2 import ArmCloudSignatureV2 + + +class DealAllTask: + def __init__(self): + self.end_point = "https://openapi-hk.armcloud.net" + # 性能配置 + self.secret_key = '3yc8c8bg1dym0zaiwjh867al' + self.access_key_id = 'gz8f1u0t63byzdu6ozbx8r5qs3e5lipt' + + def send_post_get(self, api_endpoint, payload, is_check=False): + print(f"{api_endpoint}: 参数为{payload}") + try: + arm_obj = ArmCloudSignatureV2(self.access_key_id, self.secret_key, self.end_point) + response = arm_obj.post(api_endpoint, payload) + result = response.json() + print(f"接口:{api_endpoint}返回结果为{result} ") + if result.get("code") == 200: + if isinstance(result['data'], dict): + return [result['data']] + elif isinstance(result['data'], list): + return result['data'] + elif result.get("code") == 110031: + time.sleep(random.randint(1, 3)) + if is_check: + return result + elif result.get("code") == 110012: + time.sleep(random.randint(1, 3)) + return [] + except Exception as e: + print(f": {str(e)}") + return [] + + def one_click_new_device(self, pad_code: list[str], country: str): + api_endpoint = "/openapi/open/pad/replacePad" + if country: + payload = {"padCodes": pad_code, 'countryCode': country, "realIphoneTemplateId": 37} + else: + payload = {"padCodes": pad_code, "realIphoneTemplateId": 37} + return self.send_post_get(api_endpoint, payload) + + @staticmethod + def set_network_proxy(pad_code: list[str], country: str, proxy_platform="kkoip"): + if proxy_platform == "kkoip": + print(f"{pad_code}使用kkoip") + set_network_proxy(pad_codes=pad_code, account='6255452-2ff229fb', + password='ef9996e4-%s-%d-60m' % (country, int(time.time() * 1000) + random.randint(0, 10000)), + ip='gate-hk.kkoip.com', port='15636', proxy_name='socks5', proxy_type='proxy') + if proxy_platform == "uniproxy": + print(f"{pad_code}使用 uniproxy") + session = random.randint(1000000, 1000000000) + # curl -x socks5://uniproxy-zone-custom-region-US:uniproxy@adv3.uipoxy.com:3000 ipinfo.io + set_network_proxy(pad_codes=pad_code, + account=f'uniproxy-zone-custom-region-{country}-session-{session}-sessTime-60-rc-0', + password='uniproxy', + ip='adv3.uipoxy.com', port='3000', proxy_name='socks5', proxy_type='proxy') + + def install_app(self, pad_code: list[str]): + end_point = "/openapi/open/pad/installApp" + payload = {"apps": [{"appId": 691720698, "appName": "AutoJs6", "pkgName": "org.autojs.autojs6", "padCodes": pad_code, }]} + print(f"{self.get_local_time()};开始安装autojs") + self.send_post_get(end_point, payload) + + def get_app_list(self, pad_code): + end_point = "/openapi/open/pad/listInstalledApp" + payload = {"padCodes": pad_code} + return self.send_post_get(end_point, payload) + + def wait_install_app(self, pad_code, package_name, timeout=300): + start_time = time.time() + while time.time() - start_time < timeout: + apps = self.get_app_list(pad_code) + if len(apps[0]['apps']) > 0 and package_name in [app['packageName'] for app in apps[0]['apps']]: + return True + time.sleep(5) + return False + + def turn_on_root(self, pad_code, app_id="org.autojs.autojs6"): + end_point = "/openapi/open/pad/switchRoot" + payload = { + "padCodes": pad_code, + "packageName": app_id, + "globalRoot": False, + "rootStatus": 1 + } + return self.send_post_get(end_point, payload) + + def grant_autojs_permission(self, pad_code): + end_point = "/openapi/open/pad/asyncCmd" + # adb shell pm grant org.autojs.autojs6 android.permission.WRITE_SECURE_SETTINGS + if isinstance(pad_code, str): + pad_code = [pad_code] + for cmd in ["appops set org.autojs.autojs6 MANAGE_EXTERNAL_STORAGE allow"]: + payload = {"padCodes": pad_code, "scriptContent": cmd} + return self.send_post_get(end_point, payload) + + def upload_file_to_phone(self, pad_code: list[str], file_name="uninetall.js"): + end_point = "/openapi/open/pad/v3/uploadFile" + file_id = self.find_file_unique_id(file_name) + payload = {"padCodes": pad_code, + "isAuthorization": False, + "autoInstall": 0, + "fileUniqueId": file_id, + "customizeFilePath": "/Download" + } + return self.send_post_get(end_point, payload) + + def find_file_unique_id(self, file_name="uninetall.js"): + end_point = "/file-center/open/file/list" + payload = {"page": 1, "rows": 10, "fileName": file_name} + res = self.send_post_get(end_point, payload) + print(res) + if res: + if len(res[0]['pageData']) > 0: + return res[0]['pageData'][0]['fileUniqueId'] + + return None + + @staticmethod + def build_autojs_command(params: str, file_name: str) -> str: + """安全构建AutoJS启动命令""" + script_path = '/sdcard/Download/' + file_name + return ( + f'am start -n "org.autojs.autojs6/org.autojs.autojs.external.open.RunIntentActivity" ' + f'--es path "{script_path}" --es referrer {shlex.quote(params)}' + ) + + def check_phone_status(self, pad_code: list[str], file_name: str,times=10): + count = 0 + while True: + result = self.async_execute_adb_command(pad_code, file_name, is_check=True) + if result: + if isinstance(result, list): + return True + else: + print(f"{pad_code[0]}未就绪; 返回结果为: {result}") + count += 1 + time.sleep(30) + if count == times: + return False + + def async_execute_adb_command(self, pad_code: list[str], file_name, is_check=False): + end_point = "/openapi/open/pad/asyncCmd" + script_content = self.build_autojs_command(json.dumps({"instance_id": pad_code[0]}), file_name) + if isinstance(pad_code, str): + pad_code = [pad_code] + payload = {"padCodes": pad_code, "scriptContent": script_content} + return self.send_post_get(end_point, payload, is_check=is_check) + + def alter_timezones(self, pad_code: list[str], country_code): + if isinstance(pad_code, str): + pad_code = [pad_code] + end_point = "/openapi/open/pad/updateTimeZone" + # time_zone_list = ['America/Los_Angeles', 'America/New_York', 'America/Denver', 'America/Anchorage', 'America/Chicago'] + time_zone_list = pytz.country_timezones.get(country_code) + time_zone = random.choice(time_zone_list) + payload = {"padCodes": pad_code, "timeZone": time_zone} + print(f"云机:{pad_code};将时区修改为:{time_zone}") + return self.send_post_get(end_point, payload) + + def backup_system(self, pad_code: list[str], backup_name: str): + end_point = "/openapi/open/pad/local/pod/backup" + payload = { + "padCode": pad_code[0], + "backupName": backup_name, + "backupPath": "backup/iaa/", + "ossConfig": { + 'endpoint': "oss-cn-hongkong.aliyuncs.com", + 'bucket': "zj-hk", + 'accessKey': "LTAI5tBs84AciVsFfmgKL3VQ", + 'secretKey': "faXPkY5auvwzFdqFISIzPTFj11FEcg", + 'protocol': "https", + 'region': "cn-hongkong", + } + } + return self.send_post_get(end_point, payload) + + def restore_system(self, pad_code: list[str], backup_name: str): + end_point = "/openapi/open/pad/local/pod/restore" + payload = {"padCode": pad_code[0], "backupId": backup_name} + return self.send_post_get(end_point, payload) + + def restart_system(self, pad_code: list[str]): + end_point = "/openapi/open/pad/restart" + payload = {"padCode": pad_code} + return self.send_post_get(end_point, payload) + + def restore_script_start(self, task_info: Dict[str, Any]): + # 睡眠三分钟后开始执行 + time.sleep(180) + retry_count = 0 + while True: + self.upload_file_to_phone(pad_code=task_info["pad_code"], file_name=task_info["retentionScript"]) + time.sleep(6) + self.grant_autojs_permission(pad_code=task_info["pad_code"]) + time.sleep(5) + result = self.async_execute_adb_command(task_info["pad_code"], task_info["retentionScript"]) + if result: + break + time.sleep(60) + retry_count += 1 + if retry_count > 5: + break diff --git a/public_function/kookey.py b/public_function/kookey.py new file mode 100644 index 0000000..f1447c5 --- /dev/null +++ b/public_function/kookey.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +import json +import hmac +import time +import random +import hashlib +import requests +from urllib.parse import urlparse + +# 示例参数 +secret_key = "3yc8c8bg1dym0zaiwjh867al" +end_point = "https://openapi-hk.armcloud.net" +access_key_id = "gz8f1u0t63byzdu6ozbx8r5qs3e5lipt" +default_bypass_domain = ['rr1---sn-i3b7knsl.gvt1.com'] + + +def calculate_signature(timestamp, path, body): + string_to_sign = timestamp + path + (body if body else "") + signature = hmac.new( + secret_key.encode('utf-8'), + string_to_sign.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + return signature + + +def sig_request(req: requests.Request): + re_qu = urlparse(req.url) + query_string = "" + if req.method == "GET": + query_string = re_qu.query + elif req.method == "POST": + query_string = req.data + # 生成时间戳 + timestamp = str(int(time.time() * 1000)) + signature = calculate_signature(timestamp=timestamp, path=re_qu.path, body=query_string) + req.headers["authver"] = "2.0" + req.headers["x-ak"] = access_key_id + req.headers["x-timestamp"] = timestamp + req.headers["x-sign"] = signature + + +def api(api_endpoint, payload): + try: + headers = {"Content-Type": "application/json"} + payloads = json.dumps(payload) + + # 3. 发送请求 + r = requests.Request('POST', api_endpoint, + headers=headers, data=payloads) + sig_request(r) + session = requests.Session() + response = session.send(r.prepare()) + response.raise_for_status() + # 4. 解析响应 + result = response.json() + # print("API %s %s %s", api_endpoint, payloads, result) + if result.get("code") == 200: + if isinstance(result['data'], dict): + return [result['data']] + elif isinstance(result['data'], list): + return result['data'] + # 5. 业务状态码检查 + error_map = { + 110003: "ADB命令执行失败,请联系管理员", + 110012: "命令执行超时,请稍后重试" + } + error_code = result.get("code") + err_msg = " %s %s %s" % (error_map.get( + error_code, f"未知错误: {result.get('msg')}"), error_code, result.get("ts")) + except requests.exceptions.RequestException as e: + err_msg = f"网络请求异常: {str(e)} 500" + except json.JSONDecodeError: + err_msg = "响应解析失败 502" + except KeyError as e: + err_msg = f"响应字段缺失: {str(e)} 503" + print(err_msg) + return [] + + +def set_network_proxy(pad_codes, account, password, ip, port, proxy_type, proxy_name, + bypass_package_list=None, + bypass_ip_list=None, + bypass_domain_list=None, + enable=True, + s_uot=False): + """ + 设置网络代理配置 + + 参数说明: + pad_codes (list): 实例列表 (必填) + enable (bool): 启用代理 (必填) + account (str): 账号 + password (str): 密码 + ip (str): 代理IP + port (int): 代理端口 + proxy_type (str): 代理类型 (proxy/vpn) + proxy_name (str): 代理协议 (socks5/http-relay) + bypass_package_list (list): 不走代理的包名列表 + bypass_ip_list (list): 不走代理的IP列表 + bypass_domain_list (list): 不走代理的域名列表 + s_uot (bool): 是否开启UDP连接 + + 返回: API响应数据或错误信息 + """ + # 构造请求URL (替换为实际域名) + url = end_point + "/openapi/open/network/proxy/set" + # 构造请求体 + if isinstance(pad_codes, str): + pad_codes = [pad_codes] + payload = { + "padCodes": pad_codes, + "enable": enable, + "sUoT": s_uot + } + # 添加可选参数 + if account: + payload["account"] = account + if password: + payload["password"] = password + if ip: + payload["ip"] = ip + if port: + payload["port"] = port + if proxy_type: + payload["proxyType"] = proxy_type + if proxy_name: + payload["proxyName"] = proxy_name + if bypass_package_list: + payload["bypassPackageList"] = bypass_package_list + if bypass_ip_list: + payload["bypassIpList"] = bypass_ip_list + if bypass_domain_list: + bypass_domain_list.extend(default_bypass_domain) + payload["bypassDomainList"] = bypass_domain_list + return api(url, payload) + + +# tasks = set_network_proxy(pad_codes=["ACP250915XAGEH03"], +# account=f'uniproxy-zone-custom-region-US-session-{random.randint(1000000, 1000000000)}-sessTime-20', +# password='uniproxy', +# ip='adv1.uipoxy.com', port=3000, +# proxy_name='socks5', +# proxy_type='proxy') +# print(tasks) +# set_cmd = f"/data/wyproxy/run.sh -proxy socks5://6255452-2ff229fb:uniproxy@adv1.uipoxy.com:3000 -dns 8.8.8.8 -noudp" +# # curl -x socks5://uniproxy-zone-custom:uniproxy@adv1.ipmoyu.com:3000 ipinfo.io +# # curl -x socks5://uniproxy-zone-custom-region:uniproxy@adv1.ipmoyu.com:3000 ipinfo.io