231 lines
9.9 KiB
Python
231 lines
9.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
import time
|
|
import json
|
|
import pytz
|
|
import uuid
|
|
import shlex
|
|
import random
|
|
import requests
|
|
from typing import Dict, Any
|
|
from datetime import datetime
|
|
from public_function.kookey import set_network_proxy
|
|
from public_function.arm_cloud_signature_v2 import ArmCloudSignatureV2
|
|
|
|
|
|
class DealAllTask:
    """Thin client around the ArmCloud open API for driving cloud phones ("pads").

    Every API call goes through :meth:`send_post_get`, which signs the request
    with ``ArmCloudSignatureV2`` and normalizes the response into a list.
    """

    def __init__(self, logger, end_point=None, access_key_id=None, secret_key=None):
        """Store the logger and the API endpoint / credentials.

        Args:
            logger: Object exposing ``info(msg)``; used for all diagnostics.
            end_point: Optional API base URL override.
            access_key_id: Optional access key override.
            secret_key: Optional secret key override.
        """
        self.end_point = end_point or "https://openapi-hk.armcloud.net"
        # NOTE(review): credentials are hard-coded as fallbacks to keep existing
        # callers working; they should be supplied by the caller (or a secret
        # store) and removed from source control.
        self.secret_key = secret_key or '3yc8c8bg1dym0zaiwjh867al'
        self.access_key_id = access_key_id or 'gz8f1u0t63byzdu6ozbx8r5qs3e5lipt'
        self.logger = logger

    @staticmethod
    def _normalize_pad_codes(pad_code):
        """Return ``pad_code`` as a list, wrapping a bare string in one."""
        if isinstance(pad_code, str):
            return [pad_code]
        return pad_code

    def send_post_get(self, api_endpoint, payload, is_check=False):
        """POST ``payload`` to ``api_endpoint`` and normalize the response.

        Args:
            api_endpoint: Path appended to the configured end point.
            payload: JSON-serializable request body.
            is_check: When true and the API answers with code 110031, the raw
                response dict is returned so the caller can tell that state
                apart from an empty result.

        Returns:
            * code 200: ``data`` wrapped in a list if it is a dict, or ``data``
              itself if it is already a list;
            * code 110031 with ``is_check``: the raw response dict;
            * anything else (other codes, unexpected ``data`` type, any
              exception): an empty list. This method never returns ``None``.
        """
        self.logger.info(f"{api_endpoint}: 参数为{payload}")
        try:
            arm_obj = ArmCloudSignatureV2(self.access_key_id, self.secret_key, self.end_point)
            response = arm_obj.post(api_endpoint, payload)
            result = response.json()
            self.logger.info(f"接口:{api_endpoint}返回结果为{result} ")
            code = result.get("code")
            if code == 200:
                data = result.get('data')
                if isinstance(data, dict):
                    return [data]
                if isinstance(data, list):
                    return data
            elif code == 110031:
                # Back off briefly before the caller retries.
                time.sleep(random.randint(1, 3))
                if is_check:
                    return result
            elif code == 110012:
                time.sleep(random.randint(1, 3))
        except Exception as e:
            # Best-effort API: failures are logged and reported as "no data".
            self.logger.info(f"<UNK>: {str(e)}")
        return []

    def one_click_new_device(self, pad_code: list[str], country: str):
        """Call ``replacePad`` for ``pad_code``, optionally pinning a country code."""
        api_endpoint = "/openapi/open/pad/replacePad"
        if country:
            payload = {"padCodes": pad_code, 'countryCode': country, "realIphoneTemplateId": 37}
        else:
            payload = {"padCodes": pad_code, "realIphoneTemplateId": 37}
        return self.send_post_get(api_endpoint, payload)

    def set_network_proxy(self, pad_code: list[str], country: str, proxy_platform="kkoip"):
        """Configure a SOCKS5 proxy on the pads via the chosen proxy platform.

        Args:
            pad_code: Pad codes to configure.
            country: Country/region token embedded in the proxy credentials.
            proxy_platform: ``"kkoip"`` (default) or ``"uniproxy"``; any other
                value is logged and ignored.
        """
        if proxy_platform == "kkoip":
            self.logger.info(f"{pad_code}使用kkoip")
            # The password embeds the country plus a millisecond timestamp with
            # random jitter, so each call produces a distinct credential string.
            set_network_proxy(pad_codes=pad_code, account='6255452-2ff229fb',
                              password='ef9996e4-%s-%d-60m' % (country, int(time.time() * 1000) + random.randint(0, 10000)),
                              ip='gate-hk.kkoip.com', port='15636', proxy_name='socks5', proxy_type='proxy')
        elif proxy_platform == "uniproxy":
            self.logger.info(f"{pad_code}使用 uniproxy")
            session = random.randint(1000000, 1000000000)
            # curl -x socks5://uniproxy-zone-custom-region-US:uniproxy@adv3.uipoxy.com:3000 ipinfo.io
            set_network_proxy(pad_codes=pad_code,
                              account=f'uniproxy-zone-custom-region-{country}-session-{session}-sessTime-60-rc-0',
                              password='uniproxy',
                              ip='adv3.uipoxy.com', port='3000', proxy_name='socks5', proxy_type='proxy')
        else:
            self.logger.info(f"{pad_code}: unsupported proxy_platform {proxy_platform!r}; proxy not configured")

    def install_app(self, pad_code: list[str]):
        """Request installation of AutoJs6 on ``pad_code`` and return the API result."""
        end_point = "/openapi/open/pad/installApp"
        payload = {"apps": [{"appId": 691720698, "appName": "AutoJs6", "pkgName": "org.autojs.autojs6", "padCodes": pad_code, }]}
        self.logger.info(f"{pad_code};开始安装autojs")
        return self.send_post_get(end_point, payload)

    def get_app_list(self, pad_code):
        """Return the installed-app listing for ``pad_code`` (see ``send_post_get``)."""
        end_point = "/openapi/open/pad/listInstalledApp"
        payload = {"padCodes": pad_code}
        return self.send_post_get(end_point, payload)

    def wait_install_app(self, pad_code, package_name, timeout=300):
        """Poll every 5 s until ``package_name`` is installed or ``timeout`` elapses.

        Only the first entry of the listing is inspected. Empty or malformed
        listings (e.g. after an API error) count as "not installed yet"
        instead of raising.

        Returns:
            ``True`` once the package is listed, ``False`` on timeout.
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            apps = self.get_app_list(pad_code)
            first = apps[0] if apps and isinstance(apps, list) else None
            installed = first.get('apps') if isinstance(first, dict) else None
            if installed and any(app.get('packageName') == package_name for app in installed):
                return True
            time.sleep(5)
        return False

    def turn_on_root(self, pad_code, app_id="org.autojs.autojs6"):
        """Call ``switchRoot`` with ``rootStatus`` 1 for ``app_id`` (not global root)."""
        end_point = "/openapi/open/pad/switchRoot"
        payload = {
            "padCodes": pad_code,
            "packageName": app_id,
            "globalRoot": False,
            "rootStatus": 1
        }
        return self.send_post_get(end_point, payload)

    def grant_autojs_permission(self, pad_code):
        """Run the appops commands AutoJs6 needs; return the last API result."""
        end_point = "/openapi/open/pad/asyncCmd"
        pad_code = self._normalize_pad_codes(pad_code)
        # adb shell pm grant org.autojs.autojs6 android.permission.WRITE_SECURE_SETTINGS
        commands = ["appops set org.autojs.autojs6 MANAGE_EXTERNAL_STORAGE allow"]
        result = []
        # Run every command rather than returning after the first one, so that
        # extending ``commands`` does not silently skip the later entries.
        for cmd in commands:
            payload = {"padCodes": pad_code, "scriptContent": cmd}
            result = self.send_post_get(end_point, payload)
        return result

    def upload_file_to_phone(self, pad_code: list[str], file_name="uninetall.js"):
        """Push an already-uploaded file-center file into ``/Download`` on the pads.

        Returns:
            The API result, or ``[]`` (without calling the API) when
            ``file_name`` cannot be resolved to a file id.
        """
        end_point = "/openapi/open/pad/v3/uploadFile"
        file_id = self.find_file_unique_id(file_name)
        if file_id is None:
            self.logger.info(f"{pad_code}: file {file_name!r} not found in file center; upload skipped")
            return []
        payload = {"padCodes": pad_code,
                   "isAuthorization": False,
                   "autoInstall": 0,
                   "fileUniqueId": file_id,
                   "customizeFilePath": "/Download"
                   }
        return self.send_post_get(end_point, payload)

    def find_file_unique_id(self, file_name="uninetall.js"):
        """Return the ``fileUniqueId`` of the first file-center match, else ``None``."""
        end_point = "/file-center/open/file/list"
        payload = {"page": 1, "rows": 10, "fileName": file_name}
        res = self.send_post_get(end_point, payload)
        self.logger.info(res)
        if res and isinstance(res, list) and isinstance(res[0], dict):
            page_data = res[0].get('pageData') or []
            if page_data and isinstance(page_data[0], dict):
                return page_data[0].get('fileUniqueId')
        return None

    @staticmethod
    def build_autojs_command(params: str, file_name: str) -> str:
        """Build the ``am start`` command that launches an AutoJs6 script.

        ``params`` is shell-quoted with ``shlex.quote``; ``file_name`` is
        embedded between double quotes as-is, so it must not contain shell
        metacharacters.
        """
        script_path = '/sdcard/Download/' + file_name
        return (
            f'am start -n "org.autojs.autojs6/org.autojs.autojs.external.open.RunIntentActivity" '
            f'--es path "{script_path}" --es referrer {shlex.quote(params)}'
        )

    def check_phone_status(self, pad_code: list[str], file_name: str, times=10):
        """Wait 180 s, then try up to ``times`` times to launch ``file_name``.

        An attempt succeeds when the API returns a non-empty list. Failed
        attempts, including ones that raise, are counted and followed by a
        30 s pause, so the loop always terminates.

        Returns:
            ``True`` on the first successful attempt, ``False`` once the
            attempt budget is used up.
        """
        pad_code = self._normalize_pad_codes(pad_code)
        label = pad_code[0] if pad_code else pad_code
        count = 0
        self.logger.info(f"{pad_code};{file_name},开始等待180秒")
        time.sleep(180)
        self.logger.info(f"{pad_code};{file_name},等待180秒结束")
        while True:
            try:
                result = self.async_execute_adb_command(pad_code, file_name, is_check=True)
                self.logger.info(f"{pad_code};<UNK>{file_name};<UNK>{result}")
                if result:
                    if isinstance(result, list):
                        return True
                    # A truthy non-list is the raw response returned for is_check.
                    self.logger.info(f"{label}未就绪; 返回结果为: {result}")
            except Exception as e:
                self.logger.info(f"{label}<UNK>; <UNK>: {e}")
            count += 1
            if count >= times:
                return False
            time.sleep(30)

    def async_execute_adb_command(self, pad_code: list[str], file_name, is_check=False):
        """Launch ``file_name`` in AutoJs6 on the pads via ``asyncCmd``.

        The first pad code is passed to the script as ``instance_id``.
        """
        end_point = "/openapi/open/pad/asyncCmd"
        # Normalize before indexing: indexing a bare string would yield only
        # its first character.
        pad_code = self._normalize_pad_codes(pad_code)
        script_content = self.build_autojs_command(json.dumps({"instance_id": pad_code[0]}), file_name)
        payload = {"padCodes": pad_code, "scriptContent": script_content}
        return self.send_post_get(end_point, payload, is_check=is_check)

    def alter_timezones(self, pad_code: list[str], country_code):
        """Set the pads to a randomly chosen time zone of ``country_code``.

        Returns:
            The API result, or ``[]`` when pytz has no zones for the code.
        """
        pad_code = self._normalize_pad_codes(pad_code)
        end_point = "/openapi/open/pad/updateTimeZone"
        # time_zone_list = ['America/Los_Angeles', 'America/New_York', 'America/Denver', 'America/Anchorage', 'America/Chicago']
        time_zone_list = pytz.country_timezones.get(country_code)
        if not time_zone_list:
            self.logger.info(f"{pad_code}: no time zones known for country code {country_code!r}; time zone unchanged")
            return []
        time_zone = random.choice(time_zone_list)
        payload = {"padCodes": pad_code, "timeZone": time_zone}
        self.logger.info(f"云机:{pad_code};将时区修改为:{time_zone}")
        return self.send_post_get(end_point, payload)

    def backup_system(self, pad_code: list[str], backup_name: str):
        """Back up the first pad in ``pad_code`` to OSS under ``backup/iaa/``."""
        pad_code = self._normalize_pad_codes(pad_code)
        end_point = "/openapi/open/pad/local/pod/backup"
        payload = {
            "padCode": pad_code[0],
            "backupName": backup_name,
            "backupPath": "backup/iaa/",
            # NOTE(review): OSS credentials are hard-coded; move to configuration.
            "ossConfig": {
                'endpoint': "oss-cn-hongkong.aliyuncs.com",
                'bucket': "zj-hk",
                'accessKey': "LTAI5tBs84AciVsFfmgKL3VQ",
                'secretKey': "faXPkY5auvwzFdqFISIzPTFj11FEcg",
                'protocol': "https",
                'region': "cn-hongkong",
            }
        }
        return self.send_post_get(end_point, payload)

    def restore_system(self, pad_code: list[str], backup_name: str):
        """Restore the first pad in ``pad_code``; ``backup_name`` is sent as ``backupId``."""
        pad_code = self._normalize_pad_codes(pad_code)
        end_point = "/openapi/open/pad/local/pod/restore"
        payload = {"padCode": pad_code[0], "backupId": backup_name}
        return self.send_post_get(end_point, payload)

    def restart_system(self, pad_code: list[str]):
        """Request a restart of the given pads."""
        end_point = "/openapi/open/pad/restart"
        # NOTE(review): the whole list is sent under "padCode" while sibling
        # endpoints use "padCodes" - confirm against the API reference.
        payload = {"padCode": pad_code}
        return self.send_post_get(end_point, payload)

    def restore_script_start(self, task_info: Dict[str, Any]):
        """Upload and launch the retention script after a restore, with retries.

        Reads ``task_info["pad_code"]`` and ``task_info["retentionScript"]``.
        Waits three minutes, then makes up to six attempts (60 s apart) of
        upload, permission grant and script launch, stopping at the first
        launch that returns a truthy result.
        """
        # Sleep three minutes before starting.
        time.sleep(180)
        pad_code = task_info["pad_code"]
        script_name = task_info["retentionScript"]
        retry_count = 0
        while True:
            self.upload_file_to_phone(pad_code=pad_code, file_name=script_name)
            time.sleep(6)
            self.grant_autojs_permission(pad_code=pad_code)
            time.sleep(5)
            result = self.async_execute_adb_command(pad_code, script_name)
            if result:
                break
            time.sleep(60)
            retry_count += 1
            if retry_count > 5:
                break
|
|
|
|
|
|
# if __name__ == '__main__':
|
|
# obj = DealAllTask()
|
|
# results = obj.async_execute_adb_command(["ACP251024MKBZTCO"], "test_abc.js", is_check=True)
|
|
# self.logger.info(type(results))
|
|
# if results:
|
|
# self.logger.info(123113)
|
|
# self.logger.info(f"1223{results}1223")
|