代码优化

This commit is contained in:
liujianjiang 2026-01-08 14:10:54 +08:00
parent 62c86e04e7
commit ef654f44fe
6 changed files with 578 additions and 8 deletions

View File

@ -1,14 +1,15 @@
# -*- coding: utf-8 -*-
import os
from typing import Optional, Dict, Any, List
from public_function.asyn_mysql import AsyncMySQL
from device_management.deal_all_task import DealAllTask
class DealAccount:
def __init__(self, config_data: Dict[str, Any]):
self.config_data = config_data
self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
self.task_all = DealAllTask()
async def add_account(self, params: List[Dict[str, Any]]):
"""新增账户"""
@ -55,3 +56,61 @@ class DealAccount:
if affected_rows:
return True
return False
async def update_device_status(self, set_param: Dict[str, Any], where_conditions, params):
"""更新账户信息"""
affected_rows = await self.db_pool.update(table='shoppe_device_record', set_columns=set_param,
where_conditions=where_conditions, params=params)
if affected_rows:
return True
return False
async def deal_backup_task(self, data: Dict[str, Any]):
# 先查询该账户是否备份过,备份过就修改数据库状态,
await self.db_pool.initialize()
set_param = {"status": 1, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
params = (data["account_id"],)
where_conditions = "account_id = %s "
result = await self.update_device_status(set_param, where_conditions, params)
if result:
print(f"该账号:{params['account_id']} 已经备份过,不需要在备份")
else:
tasks = self.task_all.backup_system([pad_code], pad_code.lower())
backup_name = tasks[0]["backupId"]
payload = {"account_id": params["account_id"], "backup_name": backup_name, "script_name": params["script_name"]}
# 将数据插入mysql 的表 shoppe_device_record 中
await self.db_pool.insert_many(table="shoppe_device_record", data=[payload])
result = self.task_all.check_phone_status(pad_code=params["pad_code"], file_name="test_abc.js")
if result:
print(f"云机:{params['pad_code']}备份完成")
else:
print(f"云机:{params['pad_code']} 300秒备份失败需要认为干预")
async def deal_restore_system(self, data: Dict[str, Any]):
await self.db_pool.initialize()
sql_str = f"""SELECT account_id,backup_name,script_name FROM shoppe_device_record
WHERE update_time < NOW() - INTERVAL 24 HOUR ORDER BY RAND() LIMIT 1"""
result = await self.db_pool.fetch_all(sql_str, )
if result:
res_dict = result[0]
print(f"<UNK>{params['account_id']} <UNK>")
set_param = {"status": 2, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
params = (res_dict["account_id"],)
where_conditions = "account_id = %s "
result_sql = await self.update_device_status(set_param, where_conditions, params)
if result_sql:
self.task_all.restore_system(pad_code=data["pad_code"], backup_name=res_dict["backup_name"])
result_status = self.task_all.check_phone_status(pad_code=data["pad_code"], file_name="test_abc.js")
if result_status:
print(f"云机:{params['pad_code']} 环境还原成功")
# 下载执行脚本
self.task_all.upload_file_to_phone([data["pad_code"]], file_name=res_dict["script_name"])
# 启动对应脚本
self.task_all.async_execute_adb_command(pad_code=data["pad_code"], file_name=res_dict["script_name"])
# 需要新增函数
else:
print(f"云机:{params['pad_code']} 300环境还原失败需要认为干预")
if __name__ == '__main__':
pass

22
main.py
View File

@ -10,7 +10,8 @@ from public_function.auth import verify_tk_token
from task_management.all_task_management import AllTask
from account_management.deal_account import DealAccount
from public_function.public_func import read_config, create_logger
from model.model import GoodsInfo, DataReceive, AccountStatus, AccountUpdate, TokenItem, CrawlerItem, ResetTask, AccountObtain
from model.model import GoodsInfo, DataReceive, AccountStatus, AccountUpdate
from model.model import TokenItem, CrawlerItem, ResetTask, AccountObtain, AlterStatus, BackupItem
app = FastAPI()
app.middleware("http")(verify_tk_token)
@ -210,5 +211,24 @@ async def reset_task(task_data: ResetTask, task_manager: Any = Depends(get_task_
raise HTTPException(status_code=404, detail=f"任务:{params.get("task_id")} 重启失败")
@app.post("/shop_backup", summary="云机备份与还原")
async def shop_backup(backup_data: AccountObtain, account_manager: Any = Depends(get_account_manager)):
"""
获取指定应用的可用账号
- **app_name**: 应用名称
pad_code
"""
try:
param = backup_data.model_dump()
await account_manager.deal_backup_task(param)
if params.get("is_restore", False):
time.sleep(random.randint(1, 10))
await account_manager.deal_restore_system(param)
return {"code": 200, "message": f"任务:{params.get("pad_code")} 备份还原成功" }
return {"code": 200, "message": f"任务:{params.get("pad_code")} 备份成功", }
except Exception as e:
raise HTTPException(status_code=404, detail=f"任务:{params.get("task_id")} 重启失败")
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
import hmac
import hashlib
import time
import json
import requests
from typing import Dict, Any, Optional
class ArmCloudSignatureV2:
def __init__(self, access_key_id: str, secret_key: str, base_url: str = "https://api.xiaosuanyun.com"):
self.access_key_id = access_key_id
self.secret_key = secret_key
self.base_url = base_url # 国内: https://api.xiaosuanyun.com 海外: https://openapi-hk.armcloud.net
def _calculate_signature(self, timestamp: str, path: str, body: str = "") -> str:
"""计算HMAC-SHA256签名"""
string_to_sign = timestamp + path + (body if body else "")
signature = hmac.new(
self.secret_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def sign_get_request(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
"""为GET请求生成签名头"""
timestamp = str(int(time.time() * 1000))
# query_string = ""
if params:
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
body = query_string
else:
body = ""
signature = self._calculate_signature(timestamp, path, body)
return {
"authver": "2.0",
"x-ak": self.access_key_id,
"x-timestamp": timestamp,
"x-sign": signature
}
def sign_post_request(self, path: str, data: Dict[str, Any]) -> Dict[str, str]:
"""为POST请求生成签名头"""
timestamp = str(int(time.time() * 1000))
request_body = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
signature = self._calculate_signature(timestamp, path, request_body)
return {
"authver": "2.0",
"x-ak": self.access_key_id,
"x-timestamp": timestamp,
"x-sign": signature,
"Content-Type": "application/json"
}
def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> requests.Response:
"""发起GET请求"""
headers = self.sign_get_request(path, params)
url = self.base_url + path
return requests.get(url, params=params, headers=headers)
def post(self, path: str, data: Dict[str, Any]) -> requests.Response:
"""发起POST请求"""
headers = self.sign_post_request(path, data)
url = self.base_url + path
request_body = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
return requests.post(url, data=request_body, headers=headers)
# 使用示例
if __name__ == "__main__":
# 初始化客户端 - 根据地区选择域名
# 国内用户
# client = ArmCloudSignatureV2("ACP250915XAGEH03", "your_secret_key", "https://api.xiaosuanyun.com")
# 海外用户
client = ArmCloudSignatureV2("gz8f1u0t63byzdu6ozbx8r5qs3e5lipt", "3yc8c8bg1dym0zaiwjh867al", "https://openapi-hk.armcloud.net")
# POST请求示例 - 板卡列表查询接口路径请参考OpenAPI文档
device_list_data = {
"page": 1,
"rows": 10,
"padCodes": ["AC21020010391"],
"vmStatus": "1",
"deviceStatus": "0"
}
try:
# 注意此处路径仅为示例实际路径请查看OpenAPI文档
response = client.post("/openapi/open/device/list", device_list_data)
print(f"板卡列表查询 Response: {response.status_code}")
print(f"Response Body: {response.text}")
except Exception as e:
print(f"板卡列表查询 Error: {e}")
# POST请求示例 - 创建云实例接口路径和参数请参考OpenAPI文档
create_instance_data = {
"clusterCode": "001",
"specificationCode": "m2-3",
"imageId": "img-25080826717",
"screenLayoutCode": "realdevice_1440x3120x560",
"number": 2,
"dns": "8.8.8.8",
"storageSize": 16,
"realPhoneTemplateId": 36
}
try:
# 注意此处路径仅为示例实际路径请查看OpenAPI文档
response = client.post("/openapi/open/pad/net/storage/res/create", create_instance_data)
print(f"创建云实例 Response: {response.status_code}")
print(f"Response Body: {response.text}")
except Exception as e:
print(f"创建云实例 Error: {e}")

View File

@ -132,11 +132,24 @@ class AsyncMySQL:
return total_affected
if __name__ == '__main__':
async def main():
from public_function.public_func import read_config
config = read_config(r"C:\workfile\crawler_task_management\public_function\config.yaml")
obj = AsyncMySQL(config['advert_policy'])
sql_str = f"""select account_id,password from crawler_account_record_info
where status=1 and app_name='xiapi' limit 1"""
obj.fetch_all(sql_str)
print(config)
db = AsyncMySQL(config["advert_policy"])
await db.initialize()
sql_str = f"""SELECT account_id,backup_name,script_name FROM shoppe_device_record
WHERE update_time < NOW() - INTERVAL 0 HOUR ORDER BY RAND() LIMIT 1"""
result = await db.fetch_all(sql_str)
print(result)
if __name__ == '__main__':
# from public_function.public_func import read_config
#
# config = read_config(r"C:\workfile\crawler_task_management\public_function\config.yaml")
# obj = AsyncMySQL(config['advert_policy'])
# sql_str = f"""select account_id,password from crawler_account_record_info
# where status=1 and app_name='xiapi' limit 1"""
# obj.fetch_all(sql_str)
asyncio.run(main())

View File

@ -0,0 +1,214 @@
# -*- coding: utf-8 -*-
import time
import json
import pytz
import uuid
import shlex
import random
import requests
from typing import Dict, Any
from datetime import datetime
from public_function.kookey import set_network_proxy
from public_function.arm_cloud_signature_v2 import ArmCloudSignatureV2
class DealAllTask:
def __init__(self):
self.end_point = "https://openapi-hk.armcloud.net"
# 性能配置
self.secret_key = '3yc8c8bg1dym0zaiwjh867al'
self.access_key_id = 'gz8f1u0t63byzdu6ozbx8r5qs3e5lipt'
def send_post_get(self, api_endpoint, payload, is_check=False):
print(f"{api_endpoint}: 参数为{payload}")
try:
arm_obj = ArmCloudSignatureV2(self.access_key_id, self.secret_key, self.end_point)
response = arm_obj.post(api_endpoint, payload)
result = response.json()
print(f"接口:{api_endpoint}返回结果为{result} ")
if result.get("code") == 200:
if isinstance(result['data'], dict):
return [result['data']]
elif isinstance(result['data'], list):
return result['data']
elif result.get("code") == 110031:
time.sleep(random.randint(1, 3))
if is_check:
return result
elif result.get("code") == 110012:
time.sleep(random.randint(1, 3))
return []
except Exception as e:
print(f"<UNK>: {str(e)}")
return []
def one_click_new_device(self, pad_code: list[str], country: str):
api_endpoint = "/openapi/open/pad/replacePad"
if country:
payload = {"padCodes": pad_code, 'countryCode': country, "realIphoneTemplateId": 37}
else:
payload = {"padCodes": pad_code, "realIphoneTemplateId": 37}
return self.send_post_get(api_endpoint, payload)
@staticmethod
def set_network_proxy(pad_code: list[str], country: str, proxy_platform="kkoip"):
if proxy_platform == "kkoip":
print(f"{pad_code}使用kkoip")
set_network_proxy(pad_codes=pad_code, account='6255452-2ff229fb',
password='ef9996e4-%s-%d-60m' % (country, int(time.time() * 1000) + random.randint(0, 10000)),
ip='gate-hk.kkoip.com', port='15636', proxy_name='socks5', proxy_type='proxy')
if proxy_platform == "uniproxy":
print(f"{pad_code}使用 uniproxy")
session = random.randint(1000000, 1000000000)
# curl -x socks5://uniproxy-zone-custom-region-US:uniproxy@adv3.uipoxy.com:3000 ipinfo.io
set_network_proxy(pad_codes=pad_code,
account=f'uniproxy-zone-custom-region-{country}-session-{session}-sessTime-60-rc-0',
password='uniproxy',
ip='adv3.uipoxy.com', port='3000', proxy_name='socks5', proxy_type='proxy')
def install_app(self, pad_code: list[str]):
end_point = "/openapi/open/pad/installApp"
payload = {"apps": [{"appId": 691720698, "appName": "AutoJs6", "pkgName": "org.autojs.autojs6", "padCodes": pad_code, }]}
print(f"{self.get_local_time()};开始安装autojs")
self.send_post_get(end_point, payload)
def get_app_list(self, pad_code):
end_point = "/openapi/open/pad/listInstalledApp"
payload = {"padCodes": pad_code}
return self.send_post_get(end_point, payload)
def wait_install_app(self, pad_code, package_name, timeout=300):
start_time = time.time()
while time.time() - start_time < timeout:
apps = self.get_app_list(pad_code)
if len(apps[0]['apps']) > 0 and package_name in [app['packageName'] for app in apps[0]['apps']]:
return True
time.sleep(5)
return False
def turn_on_root(self, pad_code, app_id="org.autojs.autojs6"):
end_point = "/openapi/open/pad/switchRoot"
payload = {
"padCodes": pad_code,
"packageName": app_id,
"globalRoot": False,
"rootStatus": 1
}
return self.send_post_get(end_point, payload)
def grant_autojs_permission(self, pad_code):
end_point = "/openapi/open/pad/asyncCmd"
# adb shell pm grant org.autojs.autojs6 android.permission.WRITE_SECURE_SETTINGS
if isinstance(pad_code, str):
pad_code = [pad_code]
for cmd in ["appops set org.autojs.autojs6 MANAGE_EXTERNAL_STORAGE allow"]:
payload = {"padCodes": pad_code, "scriptContent": cmd}
return self.send_post_get(end_point, payload)
def upload_file_to_phone(self, pad_code: list[str], file_name="uninetall.js"):
end_point = "/openapi/open/pad/v3/uploadFile"
file_id = self.find_file_unique_id(file_name)
payload = {"padCodes": pad_code,
"isAuthorization": False,
"autoInstall": 0,
"fileUniqueId": file_id,
"customizeFilePath": "/Download"
}
return self.send_post_get(end_point, payload)
def find_file_unique_id(self, file_name="uninetall.js"):
end_point = "/file-center/open/file/list"
payload = {"page": 1, "rows": 10, "fileName": file_name}
res = self.send_post_get(end_point, payload)
print(res)
if res:
if len(res[0]['pageData']) > 0:
return res[0]['pageData'][0]['fileUniqueId']
return None
@staticmethod
def build_autojs_command(params: str, file_name: str) -> str:
"""安全构建AutoJS启动命令"""
script_path = '/sdcard/Download/' + file_name
return (
f'am start -n "org.autojs.autojs6/org.autojs.autojs.external.open.RunIntentActivity" '
f'--es path "{script_path}" --es referrer {shlex.quote(params)}'
)
def check_phone_status(self, pad_code: list[str], file_name: str,times=10):
count = 0
while True:
result = self.async_execute_adb_command(pad_code, file_name, is_check=True)
if result:
if isinstance(result, list):
return True
else:
print(f"{pad_code[0]}未就绪; 返回结果为: {result}")
count += 1
time.sleep(30)
if count == times:
return False
def async_execute_adb_command(self, pad_code: list[str], file_name, is_check=False):
end_point = "/openapi/open/pad/asyncCmd"
script_content = self.build_autojs_command(json.dumps({"instance_id": pad_code[0]}), file_name)
if isinstance(pad_code, str):
pad_code = [pad_code]
payload = {"padCodes": pad_code, "scriptContent": script_content}
return self.send_post_get(end_point, payload, is_check=is_check)
def alter_timezones(self, pad_code: list[str], country_code):
if isinstance(pad_code, str):
pad_code = [pad_code]
end_point = "/openapi/open/pad/updateTimeZone"
# time_zone_list = ['America/Los_Angeles', 'America/New_York', 'America/Denver', 'America/Anchorage', 'America/Chicago']
time_zone_list = pytz.country_timezones.get(country_code)
time_zone = random.choice(time_zone_list)
payload = {"padCodes": pad_code, "timeZone": time_zone}
print(f"云机:{pad_code};将时区修改为:{time_zone}")
return self.send_post_get(end_point, payload)
def backup_system(self, pad_code: list[str], backup_name: str):
end_point = "/openapi/open/pad/local/pod/backup"
payload = {
"padCode": pad_code[0],
"backupName": backup_name,
"backupPath": "backup/iaa/",
"ossConfig": {
'endpoint': "oss-cn-hongkong.aliyuncs.com",
'bucket': "zj-hk",
'accessKey': "LTAI5tBs84AciVsFfmgKL3VQ",
'secretKey': "faXPkY5auvwzFdqFISIzPTFj11FEcg",
'protocol': "https",
'region': "cn-hongkong",
}
}
return self.send_post_get(end_point, payload)
def restore_system(self, pad_code: list[str], backup_name: str):
end_point = "/openapi/open/pad/local/pod/restore"
payload = {"padCode": pad_code[0], "backupId": backup_name}
return self.send_post_get(end_point, payload)
def restart_system(self, pad_code: list[str]):
end_point = "/openapi/open/pad/restart"
payload = {"padCode": pad_code}
return self.send_post_get(end_point, payload)
def restore_script_start(self, task_info: Dict[str, Any]):
# 睡眠三分钟后开始执行
time.sleep(180)
retry_count = 0
while True:
self.upload_file_to_phone(pad_code=task_info["pad_code"], file_name=task_info["retentionScript"])
time.sleep(6)
self.grant_autojs_permission(pad_code=task_info["pad_code"])
time.sleep(5)
result = self.async_execute_adb_command(task_info["pad_code"], task_info["retentionScript"])
if result:
break
time.sleep(60)
retry_count += 1
if retry_count > 5:
break

148
public_function/kookey.py Normal file
View File

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
import json
import hmac
import time
import random
import hashlib
import requests
from urllib.parse import urlparse
# 示例参数
secret_key = "3yc8c8bg1dym0zaiwjh867al"
end_point = "https://openapi-hk.armcloud.net"
access_key_id = "gz8f1u0t63byzdu6ozbx8r5qs3e5lipt"
default_bypass_domain = ['rr1---sn-i3b7knsl.gvt1.com']
def calculate_signature(timestamp, path, body):
string_to_sign = timestamp + path + (body if body else "")
signature = hmac.new(
secret_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def sig_request(req: requests.Request):
re_qu = urlparse(req.url)
query_string = ""
if req.method == "GET":
query_string = re_qu.query
elif req.method == "POST":
query_string = req.data
# 生成时间戳
timestamp = str(int(time.time() * 1000))
signature = calculate_signature(timestamp=timestamp, path=re_qu.path, body=query_string)
req.headers["authver"] = "2.0"
req.headers["x-ak"] = access_key_id
req.headers["x-timestamp"] = timestamp
req.headers["x-sign"] = signature
def api(api_endpoint, payload):
try:
headers = {"Content-Type": "application/json"}
payloads = json.dumps(payload)
# 3. 发送请求
r = requests.Request('POST', api_endpoint,
headers=headers, data=payloads)
sig_request(r)
session = requests.Session()
response = session.send(r.prepare())
response.raise_for_status()
# 4. 解析响应
result = response.json()
# print("API %s %s %s", api_endpoint, payloads, result)
if result.get("code") == 200:
if isinstance(result['data'], dict):
return [result['data']]
elif isinstance(result['data'], list):
return result['data']
# 5. 业务状态码检查
error_map = {
110003: "ADB命令执行失败请联系管理员",
110012: "命令执行超时,请稍后重试"
}
error_code = result.get("code")
err_msg = " %s %s %s" % (error_map.get(
error_code, f"未知错误: {result.get('msg')}"), error_code, result.get("ts"))
except requests.exceptions.RequestException as e:
err_msg = f"网络请求异常: {str(e)} 500"
except json.JSONDecodeError:
err_msg = "响应解析失败 502"
except KeyError as e:
err_msg = f"响应字段缺失: {str(e)} 503"
print(err_msg)
return []
def set_network_proxy(pad_codes, account, password, ip, port, proxy_type, proxy_name,
bypass_package_list=None,
bypass_ip_list=None,
bypass_domain_list=None,
enable=True,
s_uot=False):
"""
设置网络代理配置
参数说明
pad_codes (list): 实例列表 (必填)
enable (bool): 启用代理 (必填)
account (str): 账号
password (str): 密码
ip (str): 代理IP
port (int): 代理端口
proxy_type (str): 代理类型 (proxy/vpn)
proxy_name (str): 代理协议 (socks5/http-relay)
bypass_package_list (list): 不走代理的包名列表
bypass_ip_list (list): 不走代理的IP列表
bypass_domain_list (list): 不走代理的域名列表
s_uot (bool): 是否开启UDP连接
返回: API响应数据或错误信息
"""
# 构造请求URL (替换为实际域名)
url = end_point + "/openapi/open/network/proxy/set"
# 构造请求体
if isinstance(pad_codes, str):
pad_codes = [pad_codes]
payload = {
"padCodes": pad_codes,
"enable": enable,
"sUoT": s_uot
}
# 添加可选参数
if account:
payload["account"] = account
if password:
payload["password"] = password
if ip:
payload["ip"] = ip
if port:
payload["port"] = port
if proxy_type:
payload["proxyType"] = proxy_type
if proxy_name:
payload["proxyName"] = proxy_name
if bypass_package_list:
payload["bypassPackageList"] = bypass_package_list
if bypass_ip_list:
payload["bypassIpList"] = bypass_ip_list
if bypass_domain_list:
bypass_domain_list.extend(default_bypass_domain)
payload["bypassDomainList"] = bypass_domain_list
return api(url, payload)
# tasks = set_network_proxy(pad_codes=["ACP250915XAGEH03"],
# account=f'uniproxy-zone-custom-region-US-session-{random.randint(1000000, 1000000000)}-sessTime-20',
# password='uniproxy',
# ip='adv1.uipoxy.com', port=3000,
# proxy_name='socks5',
# proxy_type='proxy')
# print(tasks)
# set_cmd = f"/data/wyproxy/run.sh -proxy socks5://6255452-2ff229fb:uniproxy@adv1.uipoxy.com:3000 -dns 8.8.8.8 -noudp"
# # curl -x socks5://uniproxy-zone-custom:uniproxy@adv1.ipmoyu.com:3000 ipinfo.io
# # curl -x socks5://uniproxy-zone-custom-region:uniproxy@adv1.ipmoyu.com:3000 ipinfo.io