# -*- coding: utf-8 -*- from email.header import Header import unittest import requests import websockets import asyncio import datetime import json from datetime import datetime import serial from hashlib import sha256 import json from fengzhuang import cks import random from email.header import Header import unittest import requests import hmac import hashlib import hashlib import base64 import urllib.parse import time import hmac import base64 import hashlib import hmac import hashlib import hashlib import base64 import urllib.parse import time import hmac import base64 import serial import hashlib from serial.win32 import GetCommTimeouts timess = GetCommTimeouts from hashlib import sha256 # from jiekou.接口1.login import response, str_sign timestamp = str(int(time.time())) uuid = "44c12d6471794f6fb712b7ceee128e83" url = "http://api.hassecurity.cn" class apis: def __init__(self): timestamp = str(int(time.time())) self.login_url = url+"/v1/user/login" self.getusers_url = url+"/v1/user/info" self.resetSend_url = url+"/v1/user/resetSend" self.resetpwd_url = url+"/v1/user/reset" self.putAvatar_url = url+"/v1/user/putAvatar" self.updatePwd_url = url+"/v1/user/updatePwd" self.updateInfo_url= url+"/v1/user/updateInfo" self.deletet_url = url+"/v1/device/remove" self.putClient_url = url+"/v1/user/putClient" self.bind_url = url+"/v1/device/bind" self.upgradedVersion_url = url+"/v1/device/upgradedVersion" self.models_url = url+"/v1/device/models" self.newList_url = url+"/v1/device/newList" self.UpdateName_url = url+"/v1/device/upName" self.evenlist_url = url+"/v1/event/list" self.eventread_url = url+"/v1/event/read" self.evendelete_url = url+"/v1/event/delete" self.eventunreadNum_url = url+"/v1/event/unreadNum" self.time_url = url+"/time" self.getUrl_url = url+"/getUrl" self.homeCreate_url = url+"/v1/device/homeCreate" self.homeslist_url = url+"/v1/device/homes" self.homeUpdate_url = url+"/v1/device/homeUpdate" self.homeChange_url = url+"/v1/device/homeChange" self.homeDevices_url = url+"/v1/device/homeDevices" self.homeUsers_url = url+"/v1/device/homeUsers" self.homeShare_url = url+"/v1/device/homeShare" #分享家庭后需要去调用获取消息列表拿msg_id传入到用户反馈分享结果 self.messagelist_url = url+"/v1/message/list" self.homeShareFeedback_url = url+"/v1/device/homeShareFeedback" self.homeShareRemove_url = url+"/v1/device/homeShareRemove" self.homeAddDevice_url = url+"/v1/device/homeAddDevice" self.homeDelete_url = "http://api.hassecurity.cn/v1/device/homeDelete" self.cloudToken_url = url+"/v1/cloud/getToken" self.cloudFiles_url = url+"/v1/cloud/files" self.cloudputToken_url = url+"/v1/cloud/putToken"#设备获取文件上报token self.cloudputFile_url = url+"/v1/cloud/putFile" # self.cloudputToken_url = "http://api.hassecurity.cn/v1/cloud/putFile" self.messageread_url = url+"/v1/message/read" self.messageunreaNum_url = url+"/v1/message/unreadNum" self.messagedelete_url = url+"/v1/message/delete" self.deviceshare_url = url+"/v1/device/share" self.deviceshareRecords_url = url+"/v1/device/shareRecords" self.deviceshareFeedback_url = url+"/v1/device/shareFeedback" self.deviceshareDelete_url = url+"/v1/device/shareDelete" self.feedbackcreate_url = url+"/v1/feedback/create" self.feedbacklist_url = url+"/v1/feedback/list" self.feedbackinfo_url = url+"/v1/feedback/info" self.feedbacksend_url = url+"/v1/feedback/send" self.leavewordlist_url = url+"/v1/leaveword/list" self.leavewordunreadNumber_url = url+"/v1/leaveword/unreadNumber" self.leavewordread_url = url+"/v1/leaveword/read" self.leaveworddeviceList_url = url+"/v1/leaveword/deviceList"#设备获取留言列表 self.leaveworddeviceUnreadNumber_url = url+"/v1/leaveword/deviceUnreadNumber" self.leaveworddeviceRead_url = url+"/v1/leaveword/deviceRead" self.devicereset_url = url+"/v1/device/reset" self.bindmac_url = "http://admin.hassecurity.cn/v1/getUuid"#传mac self.activate_url = "http://admin.hassecurity.cn/v1/confirm"#传mac uuid self.devicegetAllUser_url = url+"/v1/device/getAllUser" self.deviceputAllUser_url = url+"/v1/device/putAllUser" self.devicelogin_url = url+"/v1/device/login" # 关联的字典 self.relations = {} #sign加密 def __get__relations(self, params=''): """ 获取关联后的字符串 约定,如果要使用关联的变量,形式为{paramname} :param s: 需要关联的字符串 :return: 返回关联后的字符串 """ if params is None or params == '': return None for key_value in self.relations.keys(): params = params.replace('{' + key_value + '}', str(self.relations[key_value])) return params def randoms(self, value): str = "ABCDEFGHIJKLMNOPQRSTabcdefglijklmnopqr1234567890" params = ''.join(__import__('random').choice(str) for i in range(value)) print(params) # self.relations['randoms'] = params # print(self.relations['randoms']) return params def randomChinese(self, value): str = "中文测试四五六七八九十" params = ''.join(__import__('random').choice(str) for i in range(value)) print(params) # self.relations['randomChinese'] = params # print(self.relations['randomChinese']) return params def randomspecial(self, value): str = "!@#$&*带中文字符" params = ''.join(__import__('random').choice(str) for i in range(value)) print(params) # self.relations['randomspecial'] = params # print(self.relations['randomspecial']) return params def randomnumber(self): return random.choice([1, 2,3,4,5,6,7,8,9,10,11,12]) # def assertss(self,code,masg,printok): # try: # assert response["code"] == code # assert response["msg"] == masg # print(printok,"接口正常",response) # except: # print(printok,"接口异常",response) def replace_plus_in_query(self,query_string): # 使用urllib.parse.quote_plus编码查询字符串 encoded_query = urllib.parse.quote_plus(query_string) # 替换所有的'+'为其他字符,例如空格' '或者'-'等 replaced_query = encoded_query.replace('+', ' ') # 例如,将'+'替换为空格 return replaced_query def get_hmac_sha256(self,message, secret): """ 生成HMAC-SHA256签名 :param message: 待加密的源字符串 :param secret: 密钥 :return: Base64编码后的签名 """ message = message.encode('utf-8') # 字符串转字节 secret = secret.encode('utf-8') # 密钥转字节 # HMAC-SHA256加密 signature = hmac.new(secret, message, digestmod=hashlib.sha256).hexdigest() # Base64编码并返回字符串 return base64.b64encode(signature.encode()).decode() #get请求拼接sign,带token def get_sign(self): str_sign = ("GET" + "&" + self.relations['access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&") # print(str_sign) return str_sign def delete_sign(self,data): str_sign = "DELETE" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted((data).items())).replace("+", " ") return str_sign def get_signs(self, data): str_sign = "GET" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted((data).items())).replace("+", " ") return str_sign # def post_sign(self,data): # str_sign = "POST" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( # sorted((data).items())).replace("+", " ") # return str_sign import urllib.parse def post_sign(self, data): # 1. 构造基础签名部分(保持原样) base_str = ( "POST" + "&" + self.relations['access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + str(timestamp) + "&" ) # 2. 对参数差异化编码 encoded_params = [] for key, value in sorted(data.items()): # 关键点:对值进行定制化编码 encoded_value = self._encode_special_chars(str(value)) encoded_pair = f"{key}={encoded_value}" encoded_params.append(encoded_pair) # 3. 拼接参数(不强制替换 +,除非服务端要求) param_str = "&".join(encoded_params) return base_str + param_str def _encode_special_chars(self, s): """差异化编码特殊字符""" # 定义不需要编码的字符(根据服务端要求调整) SAFE_CHARS = "-_.~#@!$&*()" # 例如允许 #、!、$、& 不编码 encoded = [] for char in s: if char.isalnum() or char in SAFE_CHARS: encoded.append(char) else: encoded.append(urllib.parse.quote(char)) return "".join(encoded) def bind_mac_sign(self,data,value): str_sign = value + "&" + "1" + "TEST_SL100_20240801" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "TEST" + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ").replace("%20", ":") return str_sign def get_params(self, data): """对应Postman的GetParams()函数,处理GET参数或POST Body""" if not data: return "" # 对参数排序(对应sortedKeys) sorted_items = sorted(data.items()) # 拼接key=value&格式(对应循环处理逻辑) param_str = "" for key, value in sorted_items: # 对应escape()编码,Python用urllib.parse.quote encoded_value = urllib.parse.quote(str(value), safe='').replace("+", "%20") param_str += f"{key}={encoded_value}&" # 去除最后一个&(对应slice(0, -1)) return param_str.rstrip('&') def post_signsss(self,data):#data body str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ") return str_sign #设备 def postdevice_sion(self,data): str_sign = "POST" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ").replace("%3A", ":") return str_sign return str_sign #带参数 def getdevice_sion(self,data): str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ").replace("%3A", ":") return str_sign #不带参数 def getdevice_sionNOdata(self): str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" return str_sign #post不带参数 def postdevice_sionNOdata(self): str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" return str_sign '带uid uid放的顺序也很重要' '设备云存储' def getdevice_sionuuid(self): str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "7e1558dff183cd08" + uuid +"&" return str_sign def postdevice_sionsuuid(self,data): str_sign = "POST" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "7e1558dff183cd08" + uuid + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ").replace("%3A", ":") return str_sign 'post带deta' '设备云存储' def postdevice_sionuuid(self, data): """优化签名生成,严格遵循编码规范""" # 构建基础前缀(与GET完全一致) "uuid替换了" # prefix = "POST&SL100bbf91e0d-d397-4775-8296-25c99e5f7403{}{}576ad09e31e148f68180c985f5eea989&".format( # timestamp, "7e1558dff183cd08" # ) prefix = "POST&SL100bbf91e0d-d397-4775-8296-25c99e5f7403{}{}{}&".format( timestamp, "7e1558dff183cd08",uuid ) # 处理参数:排序+编码(移除不必要的替换) sorted_params = sorted(data.items()) encoded_params = urllib.parse.urlencode(sorted_params) # 拼接完整签名 str_sign = prefix + encoded_params return str_sign def postdevice_sionuuids(self, data): # 1. 将嵌套data转为JSON字符串(注意:服务端需支持此格式) data_str = json.dumps(data, separators=(',', ':')) # 去除空格,压缩格式 # 2. 对JSON字符串进行urlencode编码(而非直接对字典编码) encoded_data = urllib.parse.quote(data_str, safe='') # safe='' 表示编码所有特殊字符 # 3. 构造正确前缀(参考步骤1修复后的顺序) prefix = "POST&SL100&bbf91e0d-d397-4775-8296-25c99e5f7403&{}&{}&{}&".format( timestamp, "7e1558dff183cd08", uuid ) # 4. 拼接签名(前缀 + 编码后的参数) str_sign = prefix + encoded_data return str_sign #云存获取设备 带了uid def device_sions(self,data): base_sign = ( "POST" + "&" + "SL100" + "&" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + "&" + str(timestamp) + "&" + uuid+ # "576ad09e31e148f68180c985f5eea989" "7e1558dff183cd08" ) # 如果有参数则添加参数部分 if data: param_str = "&" + urllib.parse.urlencode(sorted(data.items())).replace("+", " ").replace("%3A", ":") return base_sign + param_str return base_sign def webtoken(self): login_Headers = { 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', "access_token": self.relations['access_token'] } return login_Headers def asserts(self,assertss,code,value,value1): if assertss['code'] == code or assertss['msg'] == 'ok': print(value,assertss) else: print(value1,assertss) def assertsmsg(self,assertss,code,value,value1,msg): if assertss['code'] == code and assertss['msg'] == msg: print(value,assertss) else: print(value1,assertss) def gethttpheaders(self, value): login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(value, "c5ca86260e08e561996e5960dc93d2f0"), 'Content-Type': 'application/json', "access_token": self.relations['access_token'] } return login_Headers #设备 def deviceheaders(self, value): login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'timestamp': timestamp, 'sign': self.get_hmac_sha256(value, "a94f6fe623ce98720825198e9b57c4d7"), 'Content-Type': 'application/json', 'uuid': uuid, 'model': 'SL100', 'uid' : '7e1558dff183cd08' } return login_Headers def getdeviceheaders(self, value): login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'timestamp': timestamp, 'sign': self.get_hmac_sha256(value, "a94f6fe623ce98720825198e9b57c4d7"), 'Content-Type': 'application/json', 'uuid': uuid, 'model': 'SL100' } return login_Headers def bandmacheaders(self,value): timestamp = str(int(time.time())) login_Headers = { 'factory_id':'TEST', 'label':'TEST_SL100_20240801', 'batch':"1", 'model':'SL100', 'timestamp': timestamp, 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'sign': self.get_hmac_sha256(value,"rCeOzwisLFLasvlt"), 'Content-Type' : 'application/json' } return login_Headers #delete def DELTEThttpheaders(self, value): login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(value, "c5ca86260e08e561996e5960dc93d2f0"), "access_token": self.relations['access_token'] } return login_Headers # http请求头 def httpheaders(self,value): login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(value, "c5ca86260e08e561996e5960dc93d2f0"), 'Content-Type': 'application/json' } return login_Headers #登录子账号(分享账号) def uesrlog_share(self): data = { "username": "15814087116", "type": "password", "password": "10d50a1e43a718a9c31bc450a0985131", "phone_brand": "XiaoMi 14", "code": "767092" } # str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ") # self.post_sign(data) # print(str_sign) # 带密钥KEY 通过sha256加密之后base64 login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), 'Content-Type': 'application/json' } # print(login_Headers) response = requests.post(url=self.login_url, headers=login_Headers, data=json.dumps(data)).json() # print("用户登录",response) access_token = response["data"]["access_token"] self.relations['access_token'] = access_token # print('------------------------------',self.relations) try: assert response["code"] == 1000 assert response["msg"] == "ok" # print("用户登陆成功",response) except Exception as e: pass # print("用户登录失败",response) return access_token #登录获取token def uesrlog(self): data = { "username": "18875374973", "type": "password", "password": "10d50a1e43a718a9c31bc450a0985131", "phone_brand": "XiaoMi 14", "code": "767092" } # str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ") # self.post_sign(data) # print(str_sign) # 带密钥KEY 通过sha256加密之后base64 login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), 'Content-Type': 'application/json' } # print(login_Headers) response = requests.post(url=self.login_url, headers=login_Headers, data=json.dumps(data)).json() # print("用户登录",response) access_token = response["data"]["access_token"] self.relations['access_token'] = access_token # print('------------------------------',self.relations) try: assert response["code"] == 1000 assert response["msg"] == "ok" print("用户登陆成功",response) except Exception as e: print("用户登录失败",response) return access_token def uesrlogerror(self): '未注册账号登录' data = { "username": "18875374974", "type": "password", "password": "10d50a1e43a718a9c31bc450a0985131", "phone_brand": "XiaoMi 14", "code": "767092" } response = requests.post(url=self.login_url,headers=self.httpheaders(self.post_signsss(data)),data=json.dumps(data)).json() asserts = response # if asserts["code"] == 1100 and asserts["msg"] == "invalid username or password": # print("未注册账号登录失败,接口正常",response) # else: # print("未注册账号登录失败,接口正常",response) self.asserts(asserts,1000,"未注册账号登录成功,接口异常","未注册账号登录失败,接口正常") # try: # assert response["code"] == 1100 # assert response["msg"] == "ok" # print("未注册账号登录成功",response) # except Exception as e: # print("未注册账号登录失败",response) # # def getlog(self): '获取用户信息' str_sign = ("GET" + "&" + self.relations['access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&") # print("第二个",str_sign) login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), 'Content-Type': 'application/json', "access_token": self.relations['access_token'] } response = requests.get(url=self.getusers_url, headers=login_Headers) print("获取用户信息",response.text) #错误密码 def pwdsuser(self,value): '错误密码' data = { "username": "18875374973", "type": "password", "password": "10d50a1e43a718a9c31bc450a0985132", "phone_brand": "XiaoMi 14", "code": "767092" } str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ") # print(str_sign) # 带密钥KEY 通过sha256加密之后base64 login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', 'timestamp': timestamp, 'app_version': 'IOS_1.0.30', 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), 'Content-Type': 'application/json' } # print(login_Headers) response = requests.post(self.login_url, headers=login_Headers, data=json.dumps(data)).json() # print("错误密码",response) try: assert response["code"] == 1100 # assert response["msg"] == "ok" print("错误密码登录失败接口返回正常",response) except: print("错误密码登录成功返回异常",response) # def resetSend(self): '发送忘记密码验证码' data = { "username": "18875374973", "type": "password", "password": "10d50a1e43a718a9c31bc450a0985131", } str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ") response = requests.post(self.resetSend_url, headers=self.httpheaders(str_sign), data=json.dumps(data)).json() print("发送忘记密码验证码",response) # #忘记密码 # def resetpwd(self): # data = { # "username": "18875374973", # "code":"", # "password": "10d50a1e43a718a9c31bc450a0985131", # } # # str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( # sorted(data.items())).replace("+", " ") # str_sign = "GET" + "&" + self.relations[ # 'access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" # response = requests.post() #获取用户头像上传地址 def putAvatar(self): #调用get拼接sign 然后headers传入get_sign # self.get_sign() response = requests.get(url=self.putAvatar_url,headers=self.gethttpheaders(self.get_sign())) print('获取用户头像地址',response.json()) def UpdatePwd(self): '修改密码' data={ "new_password":"10d50a1e43a718a9c31bc450a0985131" } # str_sign = "POST" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( # sorted(data.items())).replace("+", " ") response = requests.post(url=self.updatePwd_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() # print("修改密码",response) try: assert response["code"] == 1000 assert response["msg"] == "ok" print("修改密码接口正常",response) except: print("修改密码接口异常",response) # print("头文件",self.gethttpheaders(self.get_sign())) #修改昵称-------------- def updateInfo(self): data={ "nickname":"123" } response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() # print("修改昵称",response) # self.assertss(1000,"ok","修改昵称") try: assert response["code"] == 1000 assert response["msg"] == "ok" print("修改昵称接口正常",response) except: print("修改昵称接口异常",response) # assert response.json.post('code')==1000 def updateInfomax(self): '''修改昵称--30字符''' data={ "nickname":self.randoms(30) } response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts,5000,"修改昵称--30字符接口正常","修改昵称--30字符接口异常") def updateInfomin(self): '''修改昵称--1字符''' data={ "nickname":self.randoms(1) } response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"修改昵称--1字符接口正常","修改昵称--1字符接口异常") def updateInfoSpecial_symbols(self): '''修改昵称--特殊字符''' data={ "nickname":self.randomspecial(5) } response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"修改昵称--特殊符号接口正常","修改昵称--特殊符号接口异常") def updateInfoSpecial_Chinese(self): '''修改昵称--特殊字符''' data={ "nickname":self.randomChinese(5) } response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"修改昵称--中文接口正常","修改昵称--中文接口异常") def putClient(self): '上报登录设备信息' data = { "push_type":2, "push_token":"cbc16bcc7a47203727415cb2ca339147", "zone":"-8.00" } response = requests.post(url=self.putClient_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() # print('',response) try: assert response["code"] == 1000 assert response["msg"] == "ok" print("上报设备登录信息接口返回正常",response) except Exception as e: print("上报设备登录信息接口返回异常",response) #-------------------------------------------------------------------设备绑定------------------------------------------------------------------- def bind(self,code,value,value1,data,msg): '设备绑定' data = { "uid": "7e1558dff183cd08",#主账号 # "uid": "d500a9738331eeb9", #295账号 # "mac": "b8:a9:af:01:74:27", "mac": data, "zone": "8.00", "version": "1.00.43" } str_sign = "POST" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "44c12d6471794f6fb712b7ceee128e83" + "&" + urllib.parse.urlencode( sorted(data.items())).replace("+", " ").replace("%3A", ":") login_Headers = { 'appid': '994f9d2e0c5ad0f3', 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', 'timestamp': timestamp, 'sign': self.get_hmac_sha256(str_sign, "a94f6fe623ce98720825198e9b57c4d7"), 'Content-Type': 'application/json', # 'uuid': 'f3d7ad96ed424b15a5c27e7cdb1015fd', 'uuid':'44c12d6471794f6fb712b7ceee128e83', 'model': 'SL100' } response = requests.post(url=self.bind_url,headers=login_Headers,data=json.dumps(data)).json() # print('绑定',response) asserts = response self.assertsmsg(asserts,code,value,value1,msg) return asserts def deletedevices(self): '删除设备' params = { "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd" # "uuid": "576ad09e31e148f68180c985f5eea989" } response = requests.delete(url=self.deletet_url, headers=self.DELTEThttpheaders(self.delete_sign(params)), params=params) asserts = response.json() self.asserts(asserts,1000,"设备删除成功,删除接口正常","设备删除失败,删除接口异常") def upgradedVersion_Front(self): '''获取前板设备可升级版本''' params = { "uuid":"f3d7ad96ed424b15a5c27e7cdb1015fd", "flag": "Front" } response =requests.get(url=self.upgradedVersion_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) # print(response.json()) asserts = response.json() self.asserts(asserts,1000,"获取前板设备可升级版本接口正常","获取前板设备可升级版本接口异常") def upgradedVersion_Rear(self): '''获取后板设备可升级版本''' params = { "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", "flag": "Rear" } response = requests.get(url=self.upgradedVersion_url, headers=self.gethttpheaders(self.get_signs(params)), params=params) # print(response.json()) asserts = response.json() self.asserts(asserts, 1000, "获取后板设备可升级版本接口正常", "获取后板设备可升级版本接口异常") def models(self): '''获取所有设备型号列表''' response = requests.get(url=self.models_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts,1000,"<获取所有设备型号列表接口正常>","获取所有设备型号列表接口异常") def newList(self): '''获取五分钟内绑定的设备''' response = requests.get(url=self.newList_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts,1000,"<<获取五分钟内绑定的设备接口正常>>","<获取五分钟内绑定的设备接口异常>") def UpdateName(self,randoms): '''修改设备名称----20字符''' data = { "uuid":"f3d7ad96ed424b15a5c27e7cdb1015fd", 'name':randoms } response = requests.post(url=self.UpdateName_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"<修改设备名称-输入个字符接口正常>","<修改设备名称-输入20个字符接口异常>") def UpdateName_max(self,randoms): '''修改设备名称--21字符''' data = { "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", 'name': randoms } response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() try : assert response["code"] == 5000 assert response["msg"] == "system error, please contact the administrator" print("<修改设备名称-输入21个字符接口正常>",response) except Exception as e: print("<修改设备名称-输入21个字符接口异常>",response) def UpdateName_min(self,randoms): '''修改设备名称--1字符''' data = { "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", 'name': randoms } response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<修改设备名称-输入1个字符接口正常>", "<修改设备名称-输入1个字符接口异常>") def UpdateName_special(self,randoms): '''修改设备名称--特殊字符''' data = { "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", 'name': randoms } response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<修改设备名称-输入特殊字符接口正常>", "<修改设备名称-输入特殊字符接口异常>") def UpdateName_Chinese(self,randoms): '''修改设备名称--特殊字符''' data = { "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", 'name': randoms } response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<修改设备名称-输入中文接口正常>", "<修改设备名称-输入中文接口异常>") #-------------------------------------------------------------获取事件部分------------------------------------------------------------- def evenlist(self): params = { "uuid": uuid, "start_time": int(time.time()), "date": datetime.now().strftime("%Y-%m-%d"), # 已实现实时日期 "type": self.randomnumber(),#随机获取1~12种tpye类型的事件消息列表 "rows" : 30,#最大条数 } print(f"打印type事件类型{params}") # 发起请求并获取响应 response = requests.get( url=self.evenlist_url, headers=self.gethttpheaders(self.get_signs(params)), params=params ) asserts = response.json() self.asserts(asserts, 1000, "<<获取事件列表接口正常>>", "<获取事件列表接口异常>") # ---------------- 新增:提取第一个事件的id并返回 ---------------- first_event_id = None # 初始化默认值,避免无数据时报错 # 检查响应数据结构是否正确(防止接口返回空列表或格式异常) if asserts.get("code") == 1000 and asserts.get("data") and asserts["data"].get("list"): event_list = asserts["data"]["list"] if len(event_list) > 0: # 确保列表非空 first_event_id = event_list[0]["id"] # 取第一个事件的id print(f"从evenlist提取到第一个事件id:{first_event_id}") else: print("evenlist接口返回无事件数据,无法提取msg_id") return first_event_id # 返回提取到的id(无数据时返回None) def eventread(self): data = { "msg_id" : "685bdb011c9c06e0e8a43066", "uuid" : uuid } response = requests.post(url=self.eventread_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"<<<修改事件消息的读取状态接口正常>>>","<<修改事件消息的读取状态接口异常>>") def evendel(self): # ---------------- 关键:调用evenlist获取第一个事件的id ---------------- msg_id = self.evenlist() # 接收evenlist返回的第一个事件id if not msg_id: # 若未提取到id(如无事件数据),直接提示并退出,避免无效请求 print("未获取到有效事件id,无法执行删除操作") return # 使用提取到的msg_id构造参数 params = { "msg_id": msg_id, # 替换固定值,使用实时提取的id "uuid": uuid } # 发起删除请求 response = requests.delete(url=self.evendelete_url,headers=self.DELTEThttpheaders(self.delete_sign(params)),params=params ) asserts = response.json() self.asserts(asserts, 1000, "<<<删除事件消息接口正常>>>", "<<删除事件消息接口异常>>") def eventunreadNum(self): params = { "uuid" : uuid } response = requests.get(url=self.eventunreadNum_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts,1000,"<<<获取事件未读数量接口正常>>>","<<获取事件未读数量接口异常>>") #------------------------------------------------公共接口--------------------------------------- def times(self): response = requests.get(url=self.time_url) asserts = response.json() self.asserts(asserts,1000,"<<<获取服务器当前时间戳接口正常>>>","<<获取服务器当前时间戳接口异常>>") def getUrl(self): headers = { 'Content-Type': 'application/json', "appid" : "994f9d2e0c5ad0f3" } params = { "uid" : "7e1558dff183cd08" } response = requests.get(url=self.getUrl_url,headers=headers,params=params) asserts = response.json() self.asserts(asserts,1000,"<<<<获取服务器地址接口正常>>>>","<<<获取服务器地址接口异常>>>") # -------------------------------------------------------------------------------------设备家庭------------------------------------------------------------------------ def homeCreate(self,randoms): data={ "name" :randoms } response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<创建家庭接口正常>>>>","<<<创建家庭接口异常>>>") def homeCreatepecial(self,randoms): data={ "name" :randoms } response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<创建家庭-特殊字符接口正常>>>>","<<<创建家庭-特殊字符接口异常>>>") def homeCreateChinese(self,randoms): data={ "name" :randoms } response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<创建家庭-中文字符接口正常>>>>","<<<创建家庭-中文字符接口异常>>>") def homeCreateMax(self,randoms): data={ "name" :randoms } response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<创建家庭-21字符接口正常>>>>","<<<创建家庭-21字符接口异常>>>") def homeCreateMin(self, randoms): data = { "name": randoms } response = requests.post(url=self.homeCreate_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<创建家庭-1字符接口正常>>>>", "<<<创建家庭-1字符接口异常>>>") def homeslist(self): response = requests.get(url=self.homeslist_url, headers=self.gethttpheaders(self.get_sign())) asserts = response.json() # 校验接口返回状态 self.asserts(asserts, 1000, "<<<获取家庭列表接口正常>>>", "<<<获取家庭列表接口异常>>>") # 提取最后一个id if "data" in asserts and len(asserts["data"]) > 0: last_id = asserts["data"][-1]["id"] print("最后一个家庭的id:", last_id) self.relations['home_id'] = last_id return last_id # 返回id供其他方法使用 else: print("家庭列表为空或数据格式错误") return None def homeDelete(self): time.sleep(1) self.homeslist() # home_id = self.homeslist() params = { "home_id":self.relations["home_id"] } # print("这是家庭id"+self.relations["home_id"],params) response = requests.delete(url=self.homeDelete_url,headers=self.gethttpheaders(self.delete_sign(params)),params=params) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<<删除家庭接口正常>>>>>>>","<<<<<删除家庭接口异常>>>>>") def homeUpdate(self,randoms): data = { "home_id" : "685d3d5c8e2acc38e53eb38a", "name" : randoms, "location": "深圳" } response = requests.post(url=self.homeUpdate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<<修改家庭名称或地址接口正常>>>>>","<<<<修改家庭名称或地址接口异常>>>>") def homeUpdatepecial(self,randoms): data = { "home_id" : "685d3d5c8e2acc38e53eb38a", "name" : randoms, "location": "深圳" } response = requests.post(url=self.homeUpdate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<<修改家庭名称-特殊字符接口正常>>>>>","<<<<修改家庭名称-特殊字符接口异常>>>>") def homeUpdateMax(self,randoms): data = { "home_id" : "685d3d5c8e2acc38e53eb38a", "name" : randoms, "location": "深圳" } response = requests.post(url=self.homeUpdate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<<修改家庭名称-30字符接口正常>>>>>","<<<<修改家庭名称-30字符接口异常>>>>") def homeChange(self): data = { "home_id" : "685d3d5c8e2acc38e53eb38a", "uuid" : uuid } response = requests.post(url=self.homeChange_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts =response self.asserts(asserts,1000,"<<<<<更换设备所属家庭(最后一个家庭)接口正常>>>>>","<<<<更换设备所属家庭(最后一个家庭)接口异常>>>>") def homeChangenoe(self): data = { "home_id": "6865f9dd8e2acc38e53eb69f", "uuid": uuid } response = requests.post(url=self.homeChange_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<更换设备所属家庭(第一个家庭)接口正常>>>>>", "<<<<更换设备所属家庭(第一个家庭)接口异常>>>>") def homeDevices(self): params = { "home_id": "685d3d5c8e2acc38e53eb38a" } response = requests.get(url=self.homeDevices_url, headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts,1000,"<<<<<获取家庭下的设备列表接口正常>>>>>","<<<<获取家庭下的设备列表接口异常>>>>>") def homeUsers(self): params = { "home_id" : "685d3d5c8e2acc38e53eb38a" } response = requests.get(url=self.homeUsers_url, headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts,1000,"<<<<<<获取家庭下的用户接口正常>>>>>>","<<<<<获取家庭下的用户接口异常>>>>>") def homeShare(self): data = { "home_id" :"685d3d5c8e2acc38e53eb38a", "username" : "15814087116" } response = requests.post(url=self.homeShare_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"<<<<<<分享家庭给用户接口正常>>>>>>>>","<<<<<分享家庭给用户接口异常>>>>>") # 登录被分享账号去获取消息ID传入用户反馈结果 #分享后需要去获取消息列表 def messagelist(self): #登录被分享账号去获取消息列表 time.sleep(1) self.uesrlog_share() response = requests.get(url=self.messagelist_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts,1000,"<<<<<<<获取消息列表正常>>>>>>>","<<<<<获取消息列表异常>>>>>") # 提取消息列表中的第一个id if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): message_list = asserts['data']['list'] if message_list: return message_list[0].get('id') # 返回第一个消息的id return None # 没有找到消息时返回None def messagelisttype(self): #登录被分享账号去获取消息列表 time.sleep(1) self.uesrlog_share() response = requests.get(url=self.messagelist_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts,1000,"<<<<<<<获取消息列表正常>>>>>>>","<<<<<获取消息列表异常>>>>>") # 提取type等于4的消息的id if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): message_list = asserts['data']['list'] # 筛选出type等于4的消息 type_4_messages = [msg for msg in message_list if msg.get('type') == 4] if type_4_messages: return type_4_messages[0].get('id') # 返回第一个type等于4的消息的id else: print("没有找到type等于4的消息") return None # 没有找到消息或type等于4的消息时返回None def messagelists(self):#用来调用下面删除消息的 response = requests.get(url=self.messagelist_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() # self.asserts(asserts,1000,"<<<<<<<获取消息列表正常>>>>>>>","<<<<<获取消息列表异常>>>>>") # 提取消息列表中的第一个id if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): message_list = asserts['data']['list'] if message_list: return message_list[0].get('id') # 返回第一个消息的id return None # 没有找到消息时返回None def homeShareFeedback(self,number): #获取msg_id msg_id = self.messagelist() if not msg_id: print("获取消息id失败,无法执行反馈操作") return data = { "msg_id" : msg_id, "accept" : number#同意 1同意 2拒绝 } # print(msg_id,"查看") response = requests.post(url=self.homeShareFeedback_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"<<<<<<<用户反馈分享结果接口(同意加入家庭)正常>>>>>>>","<<<<<用户反馈分享结果接口(同意加入家庭)异常>>>>>") def homeShareRemove(self): time.sleep(1) self.uesrlog()#登录主账号去删除分享 data = { "home_id" : "685d3d5c8e2acc38e53eb38a", "uid" : "f032bda4b2fbcced" } response = requests.post(url=self.homeShareRemove_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"<<<<<<<删除分享者接口正常>>>>>>>","<<<<<删除分享者接口异常>>>>>") #------------------------------------------------------------- def homeShareFeedbackNo(self,value): #分享家庭给用户 self.homeShare() #获取msg_id msg_id = self.messagelist() if not msg_id: print("获取消息id失败,无法执行反馈操作") return data = { "msg_id": msg_id, "accept": value # 拒绝 } # print(msg_id,"查看") response = requests.post(url=self.homeShareFeedback_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<<<用户反馈分享结果接口(拒绝加入家庭)正常>>>>>>>","<<<<<用户反馈分享结果接口(拒绝加入家庭)异常>>>>>") def homeAddDevice(self,value): time.sleep(1) self.uesrlog()#再登录原账户进行后续的接口调用 data = { "home_id" : value, "uuid" : uuid } # 683ffa2f77375f5bb1a44521 response = requests.post(url=self.homeAddDevice_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<<<家庭内添加设备接口正常>>>>>>>","<<<<<家庭内添加设备接口异常>>>>>") #''''''''''''''''''''''''''''''云存储''''''''''''''''''''''''''''''''''''''''''''''''''''' def cloudToken(self): params = { "uuid" : uuid } response = requests.get(url=self.cloudToken_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<<>>>>>>>","<<<<<>>>>>") def cloudFiles(self): params = { "date" : 20250625, "uuid": uuid } response = requests.get(url=self.cloudFiles_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<<>>>>>>>","<<<<<>>>>>") def cloudputToken(self): response = requests.get(url=self.cloudputToken_url,headers=self.deviceheaders(self.getdevice_sionuuid())) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<<<设备获取文件上报token接口正常>>>>>>>>","<<<<<<设备获取文件上报token接口异常>>>>>>") def cloudputFiles(self): current_time = datetime.now() formatted_time = current_time.strftime("%Y%m%d") data = { "date" : formatted_time, "suffix" : "h264", "zone" : "8.00", "store" : 7, "event_tag":"p_s", "files": "0_7:1_-1" } response = requests.post(url=self.cloudputFile_url,headers=self.deviceheaders(self.postdevice_sionuuid(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<<<<获取设备上云存信息接口正常>>>>>>>>","<<<<<<获取设备上云存信息接口异常>>>>>>") #-----------------------------------------------------------------消息部分------------------------------------------- def messageread(self): data ={ "message_id" : "" } response =requests.post(url=self.messageread_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<<<<修改消息读取状态接口正常>>>>>>>>","<<<<<<修改消息读取状态接口异常>>>>>>") def messageunreaNum(self): response = requests.get(url=self.messageunreaNum_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts,1000,"<<<<<<<<获取消息未读数量接口正常>>>>>>>>","<<<<<<修改消息读取状态接口异常>>>>>>") def messagedelete(self): msg_id = self.messagelists() print(msg_id) params ={ "message_id" : msg_id, } response = requests.delete(url=self.messagedelete_url, headers=self.DELTEThttpheaders(self.delete_sign(params)),params=params) asserts = response.json() self.asserts(asserts,1000,"<<<<<删除消息接口正常>>>>>","<<<<<删除消息接口异常>>>>>") # -----------------------------------------------------------------设备分享------------------------------------------- def deviceshare(self,value): time.sleep(1) self.uesrlog() data = { "Username" : value, "uuid" :uuid } response =requests.post(url=self.deviceshare_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<设备分享接口正常>>>>>", "<<<<<设备分享接口异常>>>>>") def deviceshareUser(self,value): # self.uesrlog() data = { "Username" : value, "uuid" :'f3d7ad96ed424b15a5c27e7cdb1015fd' } response =requests.post(url=self.deviceshare_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response try: assert response["code"] == 1103 assert response["msg"] == 'username does not exist' print("设备分享(不存在的用户)分享失败-接口正常",response) except: print("设备分享(不存在的用户)分享成功-接口异常",response) def deviceshareDevice(self, value): # self.uesrlog() data = { "Username": '15814087116', "uuid": value } response = requests.post(url=self.deviceshare_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() asserts = response try: assert response["code"] == 1302 assert response["msg"] == 'invalid uuid' print("设备分享(不存在的设备)分享失败-接口正常", response) except: print("设备分享(不存在的设备)分享成功-接口异常", response) # asserts = response # self.asserts(asserts, 1000, "<<<<<设备分享(不存在的用户)接口正常>>>>>", "<<<<<设备分享(不存在的用户)接口异常>>>>>") def deviceshareRecords(self): params = { "uuid" : uuid } response =requests.get(url=self.deviceshareRecords_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts, 1000, "<<<<<获取分享列表正常>>>>>", "<<<<<获取分享列表异常>>>>>") def devicesshareFeedback(self): #登录被分享账号获取消息列表 self.messagelisttype() msg_id = self.messagelisttype() print(msg_id,"测试返回id") data = { "msg_id" : msg_id, "status" : 2#拒绝 } response = requests.post(url=self.deviceshareFeedback_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "用户反馈(拒绝)分享接口正常", "<<<<<用户反馈(拒绝)分享接口异常>>>>>") print("子账号列表") def devicesshareFeedbackOK(self): time.sleep(1) self.uesrlog() self.deviceshare("15814087116")#调用设备分享接口 self.messagelisttype()#再次调用获取消息列表 msg_id = self.messagelisttype() data = { "msg_id" : msg_id, "status" : 1#同意 } response = requests.post(url=self.deviceshareFeedback_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "用户反馈(同意)接受分享接口正常", "<<<<<用户反馈(同意)接受分享接口异常>>>>>") def deviceshareDelete(self): time.sleep(1) self.uesrlog() params = { "uuid" : uuid, "uid" : "f032bda4b2fbcced" } response = requests.delete(url=self.deviceshareDelete_url, headers=self.DELTEThttpheaders(self.delete_sign(params)),params=params) asserts = response.json() self.asserts(asserts, 1000, "设备主人移除分享者接口正常", "<<<<<设备主人移除分享者接口异常>>>>>") # -----------------------------------------------------------------反馈------------------------------------------- def create(self): data = { "reason" : 'app', "explain" :'创建用户反馈接口测试' } response = requests.post(url=self.feedbackcreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<<创建用户反馈接口正常>>>>>>", "<<<<<<创建用户反馈接口异常>>>>>>") def feedbacklist(self): response = requests.get(url=self.feedbacklist_url, headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<获取反馈列表接口正常>>>>>>", "<<<<<<获取反馈列表接口异常>>>>>>") # 提取第一个反馈ID if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): return asserts['data']['list'][0].get('id') return None def feedbackinfo(self): # 先调用列表接口获取ID feedback_id = self.feedbacklist() parmas = { "feedback_id": feedback_id } response = requests.get(url=self.feedbackinfo_url, headers=self.gethttpheaders(self.get_signs(parmas)), params=parmas) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<获取反馈详情接口正常>>>>>>", "<<<<<<获取反馈详情接口异常>>>>>>") def feedbacksend(self): feedback_id = self.feedbacklist() data = { "feedback_id": feedback_id, "message" : "发送反馈消息测试", } response = requests.post(url=self.feedbacksend_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts, 1000, "<<<<<<发送反馈消息接口正常>>>>>>", "<<<<<<发送反馈消息接口异常>>>>>>") #------------------------------------------------------------------留言--------------------------------------------- def leavewordlist(self): params = { "uuid":uuid, "rows" : "30" #获取最大条数 默认10 } response = requests.get(url=self.leavewordlist_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) asserts = response.json() self.asserts(asserts, 1000, "<<<<<<>>>>>>", "<<<<<<>>>>>>") def leavewordunreadNumber(self): response = requests.get(url=self.leavewordunreadNumber_url,headers=self.gethttpheaders(self.get_sign())) asserts = response.json() self.asserts(asserts,1000,"<<<<<<>>>>>>", "<<<<<<>>>>>>") def leavewordread(self): data = { "id" :"" } response = requests.post(url=self.leavewordread_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"APP设置全部留言已读接口正常","APP设置全部留言已读接口异常") def leaveworddeviceList(self): "设备获取留言列表" params = { "rows" : "20" } response = requests.get(url=self.leaveworddeviceList_url,headers=self.getdeviceheaders(self.getdevice_sion(params)),params=params) asserts = response.json() self.asserts(asserts,1000,"设备获取留言列表接口正常","设备获取留言列表接口异常") def leaveworddeviceUnreadNumber(self): response =requests.get(url=self.leaveworddeviceUnreadNumber_url,headers=self.getdeviceheaders(self.getdevice_sionNOdata())) asserts = response.json() self.asserts(asserts,1000,"设备获取留言未读数量接口正常","设备获取留言未读数量接口正常异常") def leaveworddeviceRead(self): data = { "id" : "" } response =requests.post(url=self.leaveworddeviceRead_url,headers=self.getdeviceheaders(self.postdevice_sion(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"设备设置留言已读接口正常","设备设置留言已读接口异常") def devicegetAllUser(self): response = requests.get(url=self.devicegetAllUser_url,headers=self.deviceheaders(self.getdevice_sionuuid())) asserts = response.json() self.asserts(asserts,1000,"设备获取所有门锁用户正常","设备获取所有门锁用户异常") # print("测试",response.json()) def deviceputAllUser(self): data = { "users": [] } response = requests.post(url=self.deviceputAllUser_url,headers=self.deviceheaders(self.postdevice_sionsuuid(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"设备上报所有门锁用户正常","设备上报所有门锁用户异常") def devicelogin(self): data = { "zone" : "8.00" } response = requests.post(url=self.devicelogin_url,headers=self.deviceheaders(self.postdevice_sionsuuid(data)),data=json.dumps(data)).json() asserts = response self.asserts(asserts,1000,"设备登录成功","设备登录失败") return response # 连接串口 发送指令 # def CDM(self): # time.sleep(2) # value = "has devband HiChS-Guest *12345678# 7e1558dff183cd08\n" # # value = "reboot\n" # app = cks('COM10', 115200, 0, value) # print(time.strftime('%Y.%m.%d %H:%M:%S ', time.localtime(time.time())), "下发配网指令...") def mac(self): # 调试:确认正则表达式模块是否可以导入 try: import re print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 成功导入正则表达式模块") except ImportError as e: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 导入正则表达式模块失败: {e}") # 如果无法导入正则模块,提供替代方案 re = None time.sleep(2) com_port = 'COM10' baud_rate = 115200 timeout = 10 command = "has print env\n" print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 下发配网指令: {command.strip()}") # 只关注这三个参数 target_info = {'mac': None, 'sn': None, 'uuid': None} max_retries = 3 try: with serial.Serial(com_port, baud_rate, timeout=timeout) as ser: for attempt in range(max_retries): print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 尝试 {attempt + 1}/{max_retries}") ser.reset_input_buffer() ser.reset_output_buffer() bytes_sent = ser.write(command.encode('gbk')) print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 已发送 {bytes_sent} 字节数据") time.sleep(1) # 接收数据 response = b'' start_time = time.time() while time.time() - start_time < timeout: if ser.in_waiting > 0: chunk = ser.read(ser.in_waiting) response += chunk print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 读取到数据片段,长度: {len(chunk)}") if b'===========' in response and len(response) > 500: break else: time.sleep(0.1) print( f"{time.strftime('%Y.%m.%d %H:%M:%S')} 尝试 {attempt + 1} 接收结束,总数据长度: {len(response)}") if len(response) > 0: # 解码响应 try: response_text = response.decode('gbk', errors='replace') except UnicodeDecodeError: response_text = response.decode('utf-8', errors='replace') print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 收到设备回复,准备提取信息") # 调试:确认正则模块是否可用 if re: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 正则表达式模块状态: 可用") else: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 正则表达式模块状态: 不可用,将使用替代方法") # 只提取需要的三个参数 patterns = { 'sn': r'sn\s*:\s*(\S+)', 'mac': r'mac\s*:\s*(\S+)', 'uuid': r'uuid\s*:\s*(\S+)' } for key, pattern in patterns.items(): if re: match = re.search(pattern, response_text) if match: target_info[key] = match.group(1).strip() print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 提取{key.upper()}: {target_info[key]}") else: # 替代方法:使用简单字符串查找 for line in response_text.split('\n'): if key + ' :' in line.lower(): value = line.split(':', 1)[1].strip() target_info[key] = value print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 提取{key.upper()}: {value}") break # 三个参数都获取到则跳出循环 if all(target_info.values()): print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 成功获取所有目标信息") break else: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 未收到数据,将重试") if attempt < max_retries - 1: time.sleep(1) # 最终输出结果(只显示目标参数) print("\n===== 提取结果 =====") print(f"MAC: {target_info['mac'] or '未找到'}") print(f"SN: {target_info['sn'] or '未找到'}") print(f"UUID: {target_info['uuid'] or '未找到'}") target_info_MAC = target_info['mac'] target_info_SN = target_info['sn'] target_info_UUID = target_info['uuid'] self.relations['MAC'] = target_info_MAC # self.relations['SN'] = target_info_SN # self.relations['UUID'] = target_info_UUID # print(self.relations['MAC'], self.relations['SN'],self.relations['UUID']) print("====================") except serial.SerialException as e: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 串口错误: {e}") except Exception as e: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 错误: {e}") # 调试:打印完整的错误堆栈 import traceback print(traceback.format_exc()) return target_info def bindmac(self): # self.mac() params = { "label":"TEST_SL100_20240801", "model":"sl100", "batch":"1", # "mac": self.relations['MAC'] "mac":"6c:20:1e:1d:93:7c" } # print("串口传回来的mac",self.relations['MAC']) response = requests.get(url=self.bindmac_url,headers=self.bandmacheaders(self.bind_mac_sign(params,"GET")),params=params) print(f"接口返回结果{response.json()}") # asserts = response #get请求需要将返回数据转回json再提取 response_data = response.json()#转字典 if response_data['code'] == 1000 and response_data['msg'] == 'ok': access_sn = response_data["data"]["sn"]#转换完成后提取 access_uuid = response_data["data"]["uuid"] self.relations['SN'] = access_sn self.relations['UUID'] = access_uuid print(self.relations['SN'],self.relations['UUID']) else: self.relations['msg'] = response_data["msg"] print(self.relations['msg']) "传入mac地址和uuid" def activate(self): data = { "mac": self.relations['MAC'], "uuid": self.relations['UUID'], } print(f'传入mac和uuid:{data}') response =requests.post(url=self.activate_url,headers=self.bandmacheaders(self.bind_mac_sign(data,"POST")),data=json.dumps(data)).json() print(f"接口返回结果:{response}") # def releas(self): # # self.relations['UUID'] = self.relations['UUID'] # print("",self.relations['UUID']) # return self.relations['UUID'] def CMD(self): # 等待2秒 time.sleep(2) # 配置串口参数[ com_port = 'COM10' baud_rate = 115200 timeout = 5 # 设置超时时间为5秒 # 下发配网指令 command = "has devband HiChS-Guest *12345678# 7e1558dff183cd08\n" # command = "reboot\n" print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 下发配网指令: {command.strip()}") try: # 初始化串口并发送指令 with serial.Serial(com_port, baud_rate, timeout=timeout) as ser: # 发送指令 bytes_sent = ser.write(command.encode('gbk')) print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 已发送 {bytes_sent} 字节数据") # 清空输入缓冲区(可选) # ser.reset_input_buffer() # 接收设备回复 response = ser.read_until(b'\n') # 读取直到遇到换行符 response = response.decode('gbk', errors='replace').strip() if response: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 收到设备回复: {response}") else: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 在超时时间内未收到回复") except serial.SerialException as e: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 串口通信错误: {e}") except UnicodeDecodeError as e: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 解码错误: {e}") except Exception as e: print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 发生未知错误: {e}") #保活设备 def testmodeon(self): # for i in range(1): time.sleep(2) value = "has testmode on\n" # value = "reboot\n" app = cks('COM10', 115200, 0, value) print(time.strftime('%Y.%m.%d %H:%M:%S ', time.localtime(time.time())), "保活...") # app.close() # 关闭串口 #关于设备的sing get 带参数和不带参数的 # def getdevice_sion(self, data=None): # """生成设备接口签名,data参数可选""" # # 基础签名部分(固定字段) # base_sign = ( # "GET" + "&" + # "SL100" + "&" + # "bbf91e0d-d397-4775-8296-25c99e5f7403" + "&" + # str(timestamp) + "&" + # "576ad09e31e148f68180c985f5eea989" # ) # # # 如果有参数则添加参数部分 # if data: # param_str = "&" + urllib.parse.urlencode(sorted(data.items())).replace("+", " ").replace("%3A", ":") # return base_sign + param_str # return base_sign # # # def leaveworddeviceUnreadNumber(self): # """获取留言未读数量(无参数接口)""" # # 生成签名(不传参数) # sign_str = self.getdevice_sion() # 注意这里不传data # headers = self.getdeviceheaders(sign_str) # response = requests.get( # url=self.leaveworddeviceUnreadNumber_url, # headers=headers # ) # return response.json() # # # def leaveworddeviceList(self): # """获取留言列表(带参数接口)""" # params = {"rows": "20"} # # 生成签名(传入params) # sign_str = self.getdevice_sion(params) # headers = self.getdeviceheaders(sign_str) # response = requests.get( # url=self.leaveworddeviceList_url, # headers=headers, # params=params # ) # return response.json()