crawler_task_management/public_function/arm_cloud_signature_v2.py

117 lines
4.3 KiB
Python
Raw Normal View History

2026-01-08 14:10:54 +08:00
# -*- coding: utf-8 -*-
import hmac
import hashlib
import time
import json
import requests
from typing import Dict, Any, Optional
class ArmCloudSignatureV2:
    """HTTP client that signs ArmCloud OpenAPI requests with "authver 2.0" headers.

    Every request carries the headers ``authver``, ``x-ak`` (access key id),
    ``x-timestamp`` (milliseconds since the Unix epoch) and ``x-sign``. The
    signature is the hex HMAC-SHA256 of ``timestamp + path + body`` keyed with
    the secret key, where ``body`` is the compact JSON text for POST requests
    and the ``k=v&k=v`` query string for GET requests.

    Args:
        access_key_id: Access key id sent in the ``x-ak`` header.
        secret_key: Secret used as the HMAC key; it is never sent.
        base_url: Scheme and host prepended to every request path.
            Mainland China: ``https://api.xiaosuanyun.com``;
            overseas: ``https://openapi-hk.armcloud.net``.
        timeout: Timeout in seconds handed to ``requests`` for every call.
            ``None`` waits indefinitely.
    """

    # requests applies no timeout on its own, so an unresponsive server would
    # block the caller forever; this bounds the wait by default.
    DEFAULT_TIMEOUT = 30.0

    def __init__(
        self,
        access_key_id: str,
        secret_key: str,
        base_url: str = "https://api.xiaosuanyun.com",
        timeout: Optional[float] = DEFAULT_TIMEOUT,
    ):
        self.access_key_id = access_key_id
        self.secret_key = secret_key
        self.base_url = base_url
        self.timeout = timeout

    def _calculate_signature(self, timestamp: str, path: str, body: str = "") -> str:
        """Return the hex HMAC-SHA256 of ``timestamp + path + body``.

        A falsy ``body`` (empty string or ``None``) contributes nothing to the
        signed string. Key and message are both encoded as UTF-8.
        """
        string_to_sign = timestamp + path + (body or "")
        return hmac.new(
            self.secret_key.encode("utf-8"),
            string_to_sign.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()

    @staticmethod
    def _serialize_body(data: Dict[str, Any]) -> str:
        """Serialize ``data`` to the compact JSON text that is signed and sent."""
        # ensure_ascii=False keeps non-ASCII characters literal; the compact
        # separators remove all optional whitespace.
        return json.dumps(data, ensure_ascii=False, separators=(",", ":"))

    def _signed_headers(self, path: str, payload: str) -> Dict[str, str]:
        """Build the four authentication headers for ``path`` and ``payload``."""
        timestamp = str(int(time.time() * 1000))
        return {
            "authver": "2.0",
            "x-ak": self.access_key_id,
            "x-timestamp": timestamp,
            "x-sign": self._calculate_signature(timestamp, path, payload),
        }

    def _json_headers(self, path: str, request_body: str) -> Dict[str, str]:
        """Build the signed headers for an already-serialized JSON body."""
        headers = self._signed_headers(path, request_body)
        headers["Content-Type"] = "application/json"
        return headers

    def sign_get_request(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
        """Return the signed headers for a GET request.

        The query string ``k=v&k=v`` (in ``params`` iteration order) takes the
        place of the body in the signed string; with no params it is empty.

        NOTE(review): the signed query string is neither URL-encoded nor
        sorted, while ``requests`` URL-encodes ``params`` when sending.
        Confirm against the OpenAPI signing rules that values requiring
        escaping still verify.
        """
        query_string = "&".join(f"{k}={v}" for k, v in params.items()) if params else ""
        return self._signed_headers(path, query_string)

    def sign_post_request(self, path: str, data: Dict[str, Any]) -> Dict[str, str]:
        """Return the signed headers (including ``Content-Type``) for a JSON POST."""
        return self._json_headers(path, self._serialize_body(data))

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> "requests.Response":
        """Send a signed GET request to ``base_url + path`` and return the response.

        Raises:
            requests.RequestException: On connection errors or timeouts.
        """
        headers = self.sign_get_request(path, params)
        url = self.base_url + path
        return requests.get(url, params=params, headers=headers, timeout=self.timeout)

    def post(self, path: str, data: Dict[str, Any]) -> "requests.Response":
        """Send a signed JSON POST request to ``base_url + path`` and return the response.

        Raises:
            requests.RequestException: On connection errors or timeouts.
        """
        # Serialize exactly once so the text that is signed is the text that
        # is sent.
        request_body = self._serialize_body(data)
        headers = self._json_headers(path, request_body)
        url = self.base_url + path
        # Send explicit UTF-8 bytes: a ``str`` body is encoded by the HTTP
        # stack (ISO-8859-1 under http.client), which fails or diverges from
        # the UTF-8 bytes covered by the signature for non-ASCII payloads.
        return requests.post(
            url,
            data=request_body.encode("utf-8"),
            headers=headers,
            timeout=self.timeout,
        )
# Usage example
if __name__ == "__main__":
    import os  # only this demo needs it, so it is imported locally

    # Credentials come from the environment so that real keys never live in
    # the source tree. Any key pair that was ever committed here should be
    # treated as leaked and rotated.
    access_key_id = os.environ.get("ARMCLOUD_ACCESS_KEY_ID")
    secret_key = os.environ.get("ARMCLOUD_SECRET_KEY")
    if not access_key_id or not secret_key:
        raise SystemExit(
            "Set ARMCLOUD_ACCESS_KEY_ID and ARMCLOUD_SECRET_KEY before running this example."
        )

    # Choose the domain for your region:
    #   mainland China: https://api.xiaosuanyun.com
    #   overseas:       https://openapi-hk.armcloud.net
    base_url = os.environ.get("ARMCLOUD_BASE_URL", "https://openapi-hk.armcloud.net")
    client = ArmCloudSignatureV2(access_key_id, secret_key, base_url)

    # POST example: device list query. See the OpenAPI documentation for the
    # actual path and parameters.
    device_list_data = {
        "page": 1,
        "rows": 10,
        "padCodes": ["AC21020010391"],
        "vmStatus": "1",
        "deviceStatus": "0",
    }

    # POST example: create cloud instances. See the OpenAPI documentation for
    # the actual path and parameters.
    create_instance_data = {
        "clusterCode": "001",
        "specificationCode": "m2-3",
        "imageId": "img-25080826717",
        "screenLayoutCode": "realdevice_1440x3120x560",
        "number": 2,
        "dns": "8.8.8.8",
        "storageSize": 16,
        "realPhoneTemplateId": 36,
    }

    # The paths below are examples only; check the OpenAPI documentation for
    # the real ones.
    examples = (
        ("板卡列表查询", "/openapi/open/device/list", device_list_data),
        ("创建云实例", "/openapi/open/pad/net/storage/res/create", create_instance_data),
    )
    for label, path, payload in examples:
        try:
            response = client.post(path, payload)
        except requests.RequestException as e:
            # Network-level failure (connection error, timeout, ...): report
            # it and continue with the next example.
            print(f"{label} Error: {e}")
        else:
            print(f"{label} Response: {response.status_code}")
            print(f"Response Body: {response.text}")