crawler_task_management/public_function/deal_all_task.py

231 lines
9.9 KiB
Python

# -*- coding: utf-8 -*-
import time
import json
import pytz
import uuid
import shlex
import random
import requests
from typing import Dict, Any
from datetime import datetime
from public_function.kookey import set_network_proxy
from public_function.arm_cloud_signature_v2 import ArmCloudSignatureV2
class DealAllTask:
def __init__(self, logger):
self.end_point = "https://openapi-hk.armcloud.net"
# 性能配置
self.secret_key = '3yc8c8bg1dym0zaiwjh867al'
self.access_key_id = 'gz8f1u0t63byzdu6ozbx8r5qs3e5lipt'
self.logger = logger
def send_post_get(self, api_endpoint, payload, is_check=False):
self.logger.info(f"{api_endpoint}: 参数为{payload}")
try:
arm_obj = ArmCloudSignatureV2(self.access_key_id, self.secret_key, self.end_point)
response = arm_obj.post(api_endpoint, payload)
result = response.json()
self.logger.info(f"接口:{api_endpoint}返回结果为{result} ")
if result.get("code") == 200:
if isinstance(result['data'], dict):
return [result['data']]
elif isinstance(result['data'], list):
return result['data']
elif result.get("code") == 110031:
time.sleep(random.randint(1, 3))
if is_check:
return result
elif result.get("code") == 110012:
time.sleep(random.randint(1, 3))
return []
except Exception as e:
self.logger.info(f"<UNK>: {str(e)}")
return []
def one_click_new_device(self, pad_code: list[str], country: str):
api_endpoint = "/openapi/open/pad/replacePad"
if country:
payload = {"padCodes": pad_code, 'countryCode': country, "realIphoneTemplateId": 37}
else:
payload = {"padCodes": pad_code, "realIphoneTemplateId": 37}
return self.send_post_get(api_endpoint, payload)
def set_network_proxy(self, pad_code: list[str], country: str, proxy_platform="kkoip"):
if proxy_platform == "kkoip":
self.logger.info(f"{pad_code}使用kkoip")
set_network_proxy(pad_codes=pad_code, account='6255452-2ff229fb',
password='ef9996e4-%s-%d-60m' % (country, int(time.time() * 1000) + random.randint(0, 10000)),
ip='gate-hk.kkoip.com', port='15636', proxy_name='socks5', proxy_type='proxy')
if proxy_platform == "uniproxy":
self.logger.info(f"{pad_code}使用 uniproxy")
session = random.randint(1000000, 1000000000)
# curl -x socks5://uniproxy-zone-custom-region-US:uniproxy@adv3.uipoxy.com:3000 ipinfo.io
set_network_proxy(pad_codes=pad_code,
account=f'uniproxy-zone-custom-region-{country}-session-{session}-sessTime-60-rc-0',
password='uniproxy',
ip='adv3.uipoxy.com', port='3000', proxy_name='socks5', proxy_type='proxy')
def install_app(self, pad_code: list[str]):
end_point = "/openapi/open/pad/installApp"
payload = {"apps": [{"appId": 691720698, "appName": "AutoJs6", "pkgName": "org.autojs.autojs6", "padCodes": pad_code, }]}
self.logger.info(f"{pad_code};开始安装autojs")
self.send_post_get(end_point, payload)
def get_app_list(self, pad_code):
end_point = "/openapi/open/pad/listInstalledApp"
payload = {"padCodes": pad_code}
return self.send_post_get(end_point, payload)
def wait_install_app(self, pad_code, package_name, timeout=300):
start_time = time.time()
while time.time() - start_time < timeout:
apps = self.get_app_list(pad_code)
if len(apps[0]['apps']) > 0 and package_name in [app['packageName'] for app in apps[0]['apps']]:
return True
time.sleep(5)
return False
def turn_on_root(self, pad_code, app_id="org.autojs.autojs6"):
end_point = "/openapi/open/pad/switchRoot"
payload = {
"padCodes": pad_code,
"packageName": app_id,
"globalRoot": False,
"rootStatus": 1
}
return self.send_post_get(end_point, payload)
def grant_autojs_permission(self, pad_code):
end_point = "/openapi/open/pad/asyncCmd"
# adb shell pm grant org.autojs.autojs6 android.permission.WRITE_SECURE_SETTINGS
if isinstance(pad_code, str):
pad_code = [pad_code]
for cmd in ["appops set org.autojs.autojs6 MANAGE_EXTERNAL_STORAGE allow"]:
payload = {"padCodes": pad_code, "scriptContent": cmd}
return self.send_post_get(end_point, payload)
def upload_file_to_phone(self, pad_code: list[str], file_name="uninetall.js"):
end_point = "/openapi/open/pad/v3/uploadFile"
file_id = self.find_file_unique_id(file_name)
payload = {"padCodes": pad_code,
"isAuthorization": False,
"autoInstall": 0,
"fileUniqueId": file_id,
"customizeFilePath": "/Download"
}
return self.send_post_get(end_point, payload)
def find_file_unique_id(self, file_name="uninetall.js"):
end_point = "/file-center/open/file/list"
payload = {"page": 1, "rows": 10, "fileName": file_name}
res = self.send_post_get(end_point, payload)
self.logger.info(res)
if res:
if len(res[0]['pageData']) > 0:
return res[0]['pageData'][0]['fileUniqueId']
return None
@staticmethod
def build_autojs_command(params: str, file_name: str) -> str:
"""安全构建AutoJS启动命令"""
script_path = '/sdcard/Download/' + file_name
return (
f'am start -n "org.autojs.autojs6/org.autojs.autojs.external.open.RunIntentActivity" '
f'--es path "{script_path}" --es referrer {shlex.quote(params)}'
)
def check_phone_status(self, pad_code: list[str], file_name: str, times=10):
count = 0
self.logger.info(f"{pad_code};{file_name},开始等待180秒")
time.sleep(180)
self.logger.info(f"{pad_code};{file_name},等待180秒结束")
while True:
try:
result = self.async_execute_adb_command(pad_code, file_name, is_check=True)
self.logger.info(f"{pad_code};<UNK>{file_name};<UNK>{result}")
if result:
if isinstance(result, list):
return True
else:
self.logger.info(f"{pad_code[0]}未就绪; 返回结果为: {result}")
count += 1
time.sleep(30)
except Exception as e:
self.logger.info(f"{pad_code[0]}<UNK>; <UNK>: {e}")
if count == times:
return False
def async_execute_adb_command(self, pad_code: list[str], file_name, is_check=False):
end_point = "/openapi/open/pad/asyncCmd"
script_content = self.build_autojs_command(json.dumps({"instance_id": pad_code[0]}), file_name)
if isinstance(pad_code, str):
pad_code = [pad_code]
payload = {"padCodes": pad_code, "scriptContent": script_content}
return self.send_post_get(end_point, payload, is_check=is_check)
def alter_timezones(self, pad_code: list[str], country_code):
if isinstance(pad_code, str):
pad_code = [pad_code]
end_point = "/openapi/open/pad/updateTimeZone"
# time_zone_list = ['America/Los_Angeles', 'America/New_York', 'America/Denver', 'America/Anchorage', 'America/Chicago']
time_zone_list = pytz.country_timezones.get(country_code)
time_zone = random.choice(time_zone_list)
payload = {"padCodes": pad_code, "timeZone": time_zone}
self.logger.info(f"云机:{pad_code};将时区修改为:{time_zone}")
return self.send_post_get(end_point, payload)
def backup_system(self, pad_code: list[str], backup_name: str):
end_point = "/openapi/open/pad/local/pod/backup"
payload = {
"padCode": pad_code[0],
"backupName": backup_name,
"backupPath": "backup/iaa/",
"ossConfig": {
'endpoint': "oss-cn-hongkong.aliyuncs.com",
'bucket': "zj-hk",
'accessKey': "LTAI5tBs84AciVsFfmgKL3VQ",
'secretKey': "faXPkY5auvwzFdqFISIzPTFj11FEcg",
'protocol': "https",
'region': "cn-hongkong",
}
}
return self.send_post_get(end_point, payload)
def restore_system(self, pad_code: list[str], backup_name: str):
end_point = "/openapi/open/pad/local/pod/restore"
payload = {"padCode": pad_code[0], "backupId": backup_name}
return self.send_post_get(end_point, payload)
def restart_system(self, pad_code: list[str]):
end_point = "/openapi/open/pad/restart"
payload = {"padCode": pad_code}
return self.send_post_get(end_point, payload)
def restore_script_start(self, task_info: Dict[str, Any]):
# 睡眠三分钟后开始执行
time.sleep(180)
retry_count = 0
while True:
self.upload_file_to_phone(pad_code=task_info["pad_code"], file_name=task_info["retentionScript"])
time.sleep(6)
self.grant_autojs_permission(pad_code=task_info["pad_code"])
time.sleep(5)
result = self.async_execute_adb_command(task_info["pad_code"], task_info["retentionScript"])
if result:
break
time.sleep(60)
retry_count += 1
if retry_count > 5:
break
# if __name__ == '__main__':
# obj = DealAllTask()
# results = obj.async_execute_adb_command(["ACP251024MKBZTCO"], "test_abc.js", is_check=True)
# self.logger.info(type(results))
# if results:
# self.logger.info(123113)
# self.logger.info(f"1223{results}1223")