#Author: Liwenjun
#Desc: 海康安全协议OPENAPI接口python3文档
#Date: 2020/09/05
#EMAIL: 121959858@qq.com
import requests
#import requests_async as requests
import json
import ssl
import hmac
import base64,sys
import hashlib
import uuid
import time
from urllib.parse import urlencode
from urllib3.exceptions import InsecureRequestWarning
import urllib3
urllib3.disable_warnings(InsecureRequestWarning)
import ast
class HttpSignature:
    """Client for the Hikvision (Artemis) OpenAPI that signs each request.

    Every request carries ``x-ca-key``, ``x-ca-nonce``, ``x-ca-timestamp`` and
    an ``x-ca-signature`` header; the signature is a base64 HMAC-SHA256 of a
    canonical string, keyed with the application secret.
    """

    token = ''
    server = ''
    # Path of the endpoint that exchanges a signed request for an access_token.
    url = 'artemis/api/v1/oauth/token'
    verify = False
    method = 'POST'
    hk_app_secret = ''
    hk_app_key = ''
    # Default headers; __init__ copies them so instances never share state.
    base_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json'}
    # Example of the headers produced for a signed POST with an empty body:
    #   Accept: application/json
    #   Content-Type: application/json
    #   content-md5: 1B2M2Y8AsgTpgAmY7PhCfg==
    #   x-ca-timestamp: 1599125066804
    #   x-ca-nonce: 2d77131b-429e-4363-958f-b772af5f70e6
    #   x-ca-key: 21524525
    #   x-ca-signature-headers: x-ca-key,x-ca-nonce,x-ca-timestamp

    def __init__(self, hk_app_secret=None, hk_app_key=None, server=None, headers=None, verify=False):
        """Store credentials and per-instance default headers.

        :param hk_app_secret: application secret used as the HMAC key
        :param hk_app_key: application key sent as ``x-ca-key``
        :param server: base URL that request paths are appended to
        :param headers: extra default headers merged over ``base_headers``
        :param verify: forwarded to ``requests`` as its ``verify`` argument
        """
        super().__init__()
        if verify == False:  # noqa: E712 - keeps the original match on False/0
            # Disables certificate checks for the process-wide default HTTPS
            # context (affects every user of that context, not only this client).
            ssl._create_default_https_context = ssl._create_unverified_context
        # Build a private copy: writing into the class-level dict would leak
        # one instance's custom headers into every other instance.
        self.base_headers = {**self.base_headers, **(headers or {})}
        self.verify = verify
        self.hk_app_secret = hk_app_secret if hk_app_secret else self.hk_app_secret
        self.hk_app_key = hk_app_key if hk_app_key else self.hk_app_key
        self.server = server if server else self.server

    @staticmethod
    def _serialize_body(body):
        """Return the request body as the exact text that is hashed and sent."""
        if not body:
            return ''
        if isinstance(body, bytes):
            return body.decode()
        if isinstance(body, str):
            return body
        return json.dumps(body)

    @staticmethod
    def _build_resource(url, params):
        """Return ``(signed_resource, request_path)`` for *url* plus *params*.

        The signed form lists the parameters sorted by key with raw values
        (a key with an empty value is written without ``=``); the request form
        is the same ordering, URL-encoded.
        NOTE(review): the signed form follows the vendor signing rules as
        recalled; confirm against the platform documentation.
        """
        if not isinstance(params, dict) or not params:
            return url, url
        items = sorted((str(k), '' if v is None else str(v)) for k, v in params.items())
        signed_query = '&'.join(
            k if v == '' else '{}={}'.format(k, v) for k, v in items)
        separator = '&' if '?' in url else '?'
        return url + separator + signed_query, url + separator + urlencode(items)

    def create_sign_headers(self, method="POST", url="", body="", date=""):
        """Build the complete header set, including the signature.

        :param method: HTTP method; upper-cased before signing
        :param url: resource path (with query string, if any) to sign
        :param body: request body (str, bytes, or JSON-serialisable object);
            only hashed for POST
        :param date: accepted for backward compatibility; not used in the
            signature or the headers
        :return: dict of headers ready to pass to ``requests``
        """
        method = method.upper()
        nonce = str(uuid.uuid4())
        # Millisecond timestamp; multiply before truncating so the
        # sub-second part is kept.
        timestamp = str(int(time.time() * 1000))
        headers = dict(self.base_headers)

        # Canonical string: method, Accept, Content-MD5, Content-Type, the
        # signed x-ca-* headers, then the resource. Empty fields are skipped.
        sign_lines = [method]
        if headers.get('Accept'):
            sign_lines.append(headers['Accept'])
        if method == 'POST':
            # The value sent in the header must be the value that is signed.
            content_md5 = self.md5_convert(self._serialize_body(body))
            headers['content-md5'] = content_md5
            sign_lines.append(content_md5)
        if headers.get('Content-Type'):
            sign_lines.append(headers['Content-Type'])
        sign_lines.append('x-ca-key:{}'.format(self.hk_app_key))
        sign_lines.append('x-ca-nonce:{}'.format(nonce))
        sign_lines.append('x-ca-timestamp:{}'.format(timestamp))
        sign_lines.append(url)
        sign_string = '\n'.join(sign_lines)

        headers['x-ca-key'] = self.hk_app_key
        headers['x-ca-nonce'] = nonce
        headers['x-ca-timestamp'] = timestamp
        headers['x-ca-signature-headers'] = 'x-ca-key,x-ca-nonce,x-ca-timestamp'
        headers['x-ca-signature'] = self.signature(self.hk_app_secret, sign_string)
        return headers

    def signature(self, key, value):
        """Return the base64-encoded HMAC-SHA256 of *value* keyed with *key*."""
        digest = hmac.new(key.encode(), value.encode(), digestmod=hashlib.sha256).digest()
        return base64.b64encode(digest).decode()

    def md5_convert(self, body):
        """Return the Content-MD5 value of *body*.

        The value is the base64 encoding of the raw 16-byte MD5 digest
        (24 characters), not of its hexadecimal text.

        :param body: str or bytes to hash
        :return: base64 text, e.g. ``1B2M2Y8AsgTpgAmY7PhCfg==`` for ``''``
        """
        raw = body if isinstance(body, bytes) else body.encode()
        return base64.b64encode(hashlib.md5(raw).digest()).decode()

    def post(self, url, params=None, body=None, header=None):
        """Send a signed POST and return the decoded JSON response.

        :param url: path appended to ``self.server``
        :param params: optional dict of query parameters
        :param body: optional request body (str, bytes, or JSON-serialisable)
        :param header: optional dict; only its ``Date`` entry is read
        :return: the response dict, or an error dict with ``code``/``msg``
        """
        try:
            resource, request_path = self._build_resource(url, params)
            date = (header or {}).get('Date', '')
            # Serialise once so the hashed text and the sent text are identical.
            payload = self._serialize_body(body)
            sign_headers = self.create_sign_headers(method='POST', url=resource, body=payload, date=date)
            req = requests.post(
                self.server + request_path,
                data=payload.encode() if payload else None,
                headers=sign_headers,
                verify=self.verify)
            result = req.json()
            return result if isinstance(result, dict) else {'code':'0x00000000','msg':'request is error'}
        except Exception as e:
            return {'code':'0x00000001','msg':e}

    def get(self, url, params=None, body=None, header=None):
        """Send a signed GET and return the decoded JSON response.

        :param url: path appended to ``self.server``
        :param params: optional dict of query parameters
        :param body: accepted for signature compatibility; not sent
        :param header: optional dict; only its ``Date`` entry is read
        :return: the response data, or an error dict with ``code``/``msg``
        """
        try:
            resource, request_path = self._build_resource(url, params)
            date = (header or {}).get('Date', '')
            sign_headers = self.create_sign_headers(method='GET', url=resource, body=body, date=date)
            req = requests.get(self.server + request_path, headers=sign_headers, verify=self.verify)
            result = req.json()
            return result if result else {'code':'0x00000002','msg':'request is error'}
        except Exception as e:
            return {'code':'0x00000003','msg':e}
class HttpToken:
    """Client that authenticates requests with an ``access_token`` header."""

    token = ''
    server = ''
    url = ''
    verify = False
    method = 'POST'

    def __init__(self, server=None, url=None, token=None):
        """Store the server, default URL and token, and build default headers.

        :param server: base URL that request paths are appended to
        :param url: stored on the instance; the request methods take their
            own ``url`` argument and do not read this value
        :param token: access token sent in the ``access_token`` header
        """
        super().__init__()
        # Disables certificate checks for the process-wide default HTTPS
        # context (affects every user of that context, not only this client).
        ssl._create_default_https_context = ssl._create_unverified_context
        self.token = token if token else self.token
        self.server = server if server else self.server
        self.url = url if url else self.url
        self.headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'access_token': self.token}

    @staticmethod
    def _with_query(url, params):
        """Return *url* with *params* appended as a URL-encoded query string."""
        if not isinstance(params, dict) or not params:
            return url
        separator = '&' if '?' in url else '?'
        return url + separator + urlencode(params)

    def post(self, url="", params=None, body=None, access_token=''):
        """POST *body* as JSON and return the decoded JSON response.

        :param url: path appended to ``self.server``
        :param params: optional dict of query parameters
        :param body: JSON-serialisable payload; ``None`` is sent as ``{}``
        :param access_token: when non-empty, replaces the stored header token
            for this and later requests
        :return: the response data, or an error dict with ``code``/``msg``
        """
        try:
            # Only override when a token is supplied; otherwise the token
            # given to the constructor would be wiped by the '' default.
            if access_token:
                self.headers['access_token'] = access_token
            payload = json.dumps({} if body is None else body)
            req = requests.post(
                self.server + self._with_query(url, params),
                data=payload,
                headers=self.headers,
                verify=self.verify)
            result = req.json()
            return result if result else {'code':'0x00000004','msg':'request is error'}
        except Exception as e:
            return {'code':'0x00000005','msg':e}

    def get(self, url="", params=None, body=None):
        """GET the resource and return the decoded JSON response.

        :param url: path appended to ``self.server``
        :param params: optional dict of query parameters
        :param body: JSON-serialisable payload sent with the request;
            ``None`` is sent as ``{}``
        :return: the response data, or an error dict with ``code``/``msg``
        """
        try:
            payload = json.dumps({} if body is None else body)
            req = requests.get(
                self.server + self._with_query(url, params),
                data=payload,
                headers=self.headers,
                verify=self.verify)
            result = req.json()
            return result if result else {'code':'0x00000006','msg':'request is error'}
        except Exception as e:
            return {'code':'0x00000007','msg':e}