crawler_task_management/public_function/arm_cloud_signature_v2.py

117 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import hmac
import hashlib
import time
import json
import requests
from typing import Dict, Any, Optional
class ArmCloudSignatureV2:
    """Minimal HTTP client that signs requests with HMAC-SHA256 ("authver 2.0").

    Each signed request carries the headers ``authver``, ``x-ak``,
    ``x-timestamp`` (milliseconds since the epoch) and ``x-sign``.  The
    signature is the hex HMAC-SHA256, keyed with the secret key, of
    ``timestamp + path + body`` where ``body`` is the compact JSON payload
    for POST requests and the ``k=v&k=v`` query string for GET requests.
    """

    # Default number of seconds to wait for the server before giving up.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, access_key_id: str, secret_key: str,
                 base_url: str = "https://api.xiaosuanyun.com",
                 timeout: Optional[float] = DEFAULT_TIMEOUT):
        """Store the credentials and endpoint used for every request.

        Args:
            access_key_id: Value sent in the ``x-ak`` header.
            secret_key: HMAC key used to compute ``x-sign``.
            base_url: API root.  Mainland China: https://api.xiaosuanyun.com,
                overseas: https://openapi-hk.armcloud.net
            timeout: Default request timeout in seconds used by ``get`` and
                ``post``; ``None`` disables the timeout.
        """
        self.access_key_id = access_key_id
        self.secret_key = secret_key
        self.base_url = base_url
        self.timeout = timeout

    @staticmethod
    def _serialize_body(data: Dict[str, Any]) -> str:
        """Return the compact JSON text that is both signed and sent."""
        return json.dumps(data, ensure_ascii=False, separators=(',', ':'))

    def _build_url(self, path: str) -> str:
        """Join ``base_url`` and ``path`` without producing a double slash."""
        return self.base_url.rstrip("/") + path

    def _resolve_timeout(self, timeout: Optional[float]) -> Optional[float]:
        """Return the per-call timeout, falling back to the instance default."""
        return self.timeout if timeout is None else timeout

    def _signed_headers(self, path: str, payload: str) -> Dict[str, str]:
        """Build the four authentication headers for ``path`` and ``payload``."""
        timestamp = str(int(time.time() * 1000))
        return {
            "authver": "2.0",
            "x-ak": self.access_key_id,
            "x-timestamp": timestamp,
            "x-sign": self._calculate_signature(timestamp, path, payload),
        }

    def _calculate_signature(self, timestamp: str, path: str, body: str = "") -> str:
        """Compute the HMAC-SHA256 signature.

        Args:
            timestamp: Millisecond timestamp string, as sent in ``x-timestamp``.
            path: Request path that is being signed.
            body: Serialized payload; an empty or ``None`` value signs nothing
                beyond ``timestamp + path``.

        Returns:
            Lower-case hexadecimal digest.
        """
        string_to_sign = timestamp + path + (body or "")
        return hmac.new(
            self.secret_key.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    def sign_get_request(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
        """Generate the signature headers for a GET request.

        The signed payload is the ``k=v`` pairs of ``params`` joined with
        ``&`` in dict order, or the empty string when ``params`` is empty.

        NOTE(review): values are signed as raw ``str(v)`` text, whereas
        ``requests`` URL-encodes values and omits ``None`` entries when it
        builds the real query string; confirm against the API documentation
        which form the server verifies.
        """
        query_string = "&".join(f"{k}={v}" for k, v in params.items()) if params else ""
        return self._signed_headers(path, query_string)

    def sign_post_request(self, path: str, data: Dict[str, Any]) -> Dict[str, str]:
        """Generate the signature headers for a POST request.

        The signed payload is the compact JSON encoding of ``data``; the
        returned headers additionally declare a JSON content type.
        """
        headers = self._signed_headers(path, self._serialize_body(data))
        headers["Content-Type"] = "application/json"
        return headers

    def get(self, path: str, params: Optional[Dict[str, Any]] = None,
            timeout: Optional[float] = None) -> "requests.Response":
        """Send a signed GET request.

        Args:
            path: Request path appended to ``base_url``.
            params: Optional query parameters.
            timeout: Seconds to wait; ``None`` uses the instance default.

        Returns:
            The ``requests.Response`` object, unmodified.
        """
        headers = self.sign_get_request(path, params)
        return requests.get(
            self._build_url(path),
            params=params,
            headers=headers,
            timeout=self._resolve_timeout(timeout),
        )

    def post(self, path: str, data: Dict[str, Any],
             timeout: Optional[float] = None) -> "requests.Response":
        """Send a signed POST request with a JSON body.

        Args:
            path: Request path appended to ``base_url``.
            data: Payload serialized to compact JSON.
            timeout: Seconds to wait; ``None`` uses the instance default.

        Returns:
            The ``requests.Response`` object, unmodified.
        """
        headers = self.sign_post_request(path, data)
        # Send explicit UTF-8 bytes: the signature is computed over the UTF-8
        # encoding, and a ``str`` body may be encoded differently by the HTTP
        # stack (or rejected) when the payload contains non-ASCII characters.
        request_body = self._serialize_body(data).encode('utf-8')
        return requests.post(
            self._build_url(path),
            data=request_body,
            headers=headers,
            timeout=self._resolve_timeout(timeout),
        )
# Usage example
if __name__ == "__main__":
    # NOTE(review): the access key and secret below are hard-coded in source;
    # if they are real credentials they should be rotated and loaded from the
    # environment or a secrets store instead.
    #
    # Create the client, choosing the domain that matches the account region.
    # Mainland China users:
    # client = ArmCloudSignatureV2("ACP250915XAGEH03", "your_secret_key", "https://api.xiaosuanyun.com")
    # Overseas users:
    api = ArmCloudSignatureV2(
        "gz8f1u0t63byzdu6ozbx8r5qs3e5lipt",
        "3yc8c8bg1dym0zaiwjh867al",
        "https://openapi-hk.armcloud.net",
    )

    # Payload for the device (board) list query example.
    device_query = {
        "page": 1,
        "rows": 10,
        "padCodes": ["AC21020010391"],
        "vmStatus": "1",
        "deviceStatus": "0",
    }

    # Payload for the create-cloud-instance example.
    instance_spec = {
        "clusterCode": "001",
        "specificationCode": "m2-3",
        "imageId": "img-25080826717",
        "screenLayoutCode": "realdevice_1440x3120x560",
        "number": 2,
        "dns": "8.8.8.8",
        "storageSize": 16,
        "realPhoneTemplateId": 36,
    }

    # POST example: device list query.
    # The path is illustrative only; check the OpenAPI documentation for the real one.
    try:
        response = api.post("/openapi/open/device/list", device_query)
        print(f"板卡列表查询 Response: {response.status_code}")
        print(f"Response Body: {response.text}")
    except Exception as e:
        print(f"板卡列表查询 Error: {e}")

    # POST example: create cloud instances.
    # Path and parameters are illustrative only; check the OpenAPI documentation.
    try:
        response = api.post("/openapi/open/pad/net/storage/res/create", instance_spec)
        print(f"创建云实例 Response: {response.status_code}")
        print(f"Response Body: {response.text}")
    except Exception as e:
        print(f"创建云实例 Error: {e}")