# -*- coding: utf-8 -*- import json import hmac import time import random import hashlib import requests from urllib.parse import urlparse # 示例参数 secret_key = "3yc8c8bg1dym0zaiwjh867al" end_point = "https://openapi-hk.armcloud.net" access_key_id = "gz8f1u0t63byzdu6ozbx8r5qs3e5lipt" default_bypass_domain = ['rr1---sn-i3b7knsl.gvt1.com'] def calculate_signature(timestamp, path, body): string_to_sign = timestamp + path + (body if body else "") signature = hmac.new( secret_key.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256 ).hexdigest() return signature def sig_request(req: requests.Request): re_qu = urlparse(req.url) query_string = "" if req.method == "GET": query_string = re_qu.query elif req.method == "POST": query_string = req.data # 生成时间戳 timestamp = str(int(time.time() * 1000)) signature = calculate_signature(timestamp=timestamp, path=re_qu.path, body=query_string) req.headers["authver"] = "2.0" req.headers["x-ak"] = access_key_id req.headers["x-timestamp"] = timestamp req.headers["x-sign"] = signature def api(api_endpoint, payload): try: headers = {"Content-Type": "application/json"} payloads = json.dumps(payload) # 3. 发送请求 r = requests.Request('POST', api_endpoint, headers=headers, data=payloads) sig_request(r) session = requests.Session() response = session.send(r.prepare()) response.raise_for_status() # 4. 解析响应 result = response.json() # print("API %s %s %s", api_endpoint, payloads, result) if result.get("code") == 200: if isinstance(result['data'], dict): return [result['data']] elif isinstance(result['data'], list): return result['data'] # 5. 业务状态码检查 error_map = { 110003: "ADB命令执行失败,请联系管理员", 110012: "命令执行超时,请稍后重试" } error_code = result.get("code") err_msg = " %s %s %s" % (error_map.get( error_code, f"未知错误: {result.get('msg')}"), error_code, result.get("ts")) except requests.exceptions.RequestException as e: err_msg = f"网络请求异常: {str(e)} 500" except json.JSONDecodeError: err_msg = "响应解析失败 502" except KeyError as e: err_msg = f"响应字段缺失: {str(e)} 503" print(err_msg) return [] def set_network_proxy(pad_codes, account, password, ip, port, proxy_type, proxy_name, bypass_package_list=None, bypass_ip_list=None, bypass_domain_list=None, enable=True, s_uot=False): """ 设置网络代理配置 参数说明: pad_codes (list): 实例列表 (必填) enable (bool): 启用代理 (必填) account (str): 账号 password (str): 密码 ip (str): 代理IP port (int): 代理端口 proxy_type (str): 代理类型 (proxy/vpn) proxy_name (str): 代理协议 (socks5/http-relay) bypass_package_list (list): 不走代理的包名列表 bypass_ip_list (list): 不走代理的IP列表 bypass_domain_list (list): 不走代理的域名列表 s_uot (bool): 是否开启UDP连接 返回: API响应数据或错误信息 """ # 构造请求URL (替换为实际域名) url = end_point + "/openapi/open/network/proxy/set" # 构造请求体 if isinstance(pad_codes, str): pad_codes = [pad_codes] payload = { "padCodes": pad_codes, "enable": enable, "sUoT": s_uot } # 添加可选参数 if account: payload["account"] = account if password: payload["password"] = password if ip: payload["ip"] = ip if port: payload["port"] = port if proxy_type: payload["proxyType"] = proxy_type if proxy_name: payload["proxyName"] = proxy_name if bypass_package_list: payload["bypassPackageList"] = bypass_package_list if bypass_ip_list: payload["bypassIpList"] = bypass_ip_list if bypass_domain_list: bypass_domain_list.extend(default_bypass_domain) payload["bypassDomainList"] = bypass_domain_list return api(url, payload) # tasks = set_network_proxy(pad_codes=["ACP250915XAGEH03"], # account=f'uniproxy-zone-custom-region-US-session-{random.randint(1000000, 1000000000)}-sessTime-20', # password='uniproxy', # ip='adv1.uipoxy.com', port=3000, # proxy_name='socks5', # proxy_type='proxy') # print(tasks) # set_cmd = f"/data/wyproxy/run.sh -proxy socks5://6255452-2ff229fb:uniproxy@adv1.uipoxy.com:3000 -dns 8.8.8.8 -noudp" # # curl -x socks5://uniproxy-zone-custom:uniproxy@adv1.ipmoyu.com:3000 ipinfo.io # # curl -x socks5://uniproxy-zone-custom-region:uniproxy@adv1.ipmoyu.com:3000 ipinfo.io