From 1243c6ec803a5ee217fd7c6f63a13468364eda3f Mon Sep 17 00:00:00 2001 From: luhaitao Date: Fri, 31 Oct 2025 07:58:07 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- API_Keys.py | 1706 ++++++++++++++++++++++++++++++++++++++++++++ Test_Api_Runner.py | 317 ++++++++ 2 files changed, 2023 insertions(+) create mode 100644 API_Keys.py create mode 100644 Test_Api_Runner.py diff --git a/API_Keys.py b/API_Keys.py new file mode 100644 index 0000000..e43cc1e --- /dev/null +++ b/API_Keys.py @@ -0,0 +1,1706 @@ + +# -*- coding: utf-8 -*- +from email.header import Header +import unittest +import requests +import websockets +import asyncio +import datetime +import json +from datetime import datetime +import serial +from hashlib import sha256 +import json +from fengzhuang import cks +import random +from email.header import Header +import unittest +import requests +import hmac +import hashlib +import hashlib +import base64 +import urllib.parse +import time +import hmac +import base64 +import hashlib +import hmac +import hashlib +import hashlib +import base64 +import urllib.parse +import time +import hmac +import base64 +import serial +import hashlib + +from serial.win32 import GetCommTimeouts + +timess = GetCommTimeouts +from hashlib import sha256 + +# from jiekou.接口1.login import response, str_sign + +timestamp = str(int(time.time())) +uuid = "44c12d6471794f6fb712b7ceee128e83" +url = "http://api.hassecurity.cn" +class apis: + def __init__(self): + timestamp = str(int(time.time())) + self.login_url = url+"/v1/user/login" + self.getusers_url = url+"/v1/user/info" + self.resetSend_url = url+"/v1/user/resetSend" + self.resetpwd_url = url+"/v1/user/reset" + self.putAvatar_url = url+"/v1/user/putAvatar" + self.updatePwd_url = url+"/v1/user/updatePwd" + self.updateInfo_url= url+"/v1/user/updateInfo" + self.deletet_url = url+"/v1/device/remove" + self.putClient_url = url+"/v1/user/putClient" + self.bind_url = url+"/v1/device/bind" + self.upgradedVersion_url = url+"/v1/device/upgradedVersion" + self.models_url = url+"/v1/device/models" + self.newList_url = url+"/v1/device/newList" + self.UpdateName_url = url+"/v1/device/upName" + self.evenlist_url = url+"/v1/event/list" + self.eventread_url = url+"/v1/event/read" + self.evendelete_url = url+"/v1/event/delete" + self.eventunreadNum_url = url+"/v1/event/unreadNum" + self.time_url = url+"/time" + self.getUrl_url = url+"/getUrl" + self.homeCreate_url = url+"/v1/device/homeCreate" + self.homeslist_url = url+"/v1/device/homes" + self.homeUpdate_url = url+"/v1/device/homeUpdate" + self.homeChange_url = url+"/v1/device/homeChange" + self.homeDevices_url = url+"/v1/device/homeDevices" + self.homeUsers_url = url+"/v1/device/homeUsers" + self.homeShare_url = url+"/v1/device/homeShare" + #分享家庭后需要去调用获取消息列表拿msg_id传入到用户反馈分享结果 + self.messagelist_url = url+"/v1/message/list" + self.homeShareFeedback_url = url+"/v1/device/homeShareFeedback" + self.homeShareRemove_url = url+"/v1/device/homeShareRemove" + self.homeAddDevice_url = url+"/v1/device/homeAddDevice" + self.homeDelete_url = "http://api.hassecurity.cn/v1/device/homeDelete" + self.cloudToken_url = url+"/v1/cloud/getToken" + self.cloudFiles_url = url+"/v1/cloud/files" + self.cloudputToken_url = url+"/v1/cloud/putToken"#设备获取文件上报token + self.cloudputFile_url = url+"/v1/cloud/putFile" + # self.cloudputToken_url = "http://api.hassecurity.cn/v1/cloud/putFile" + self.messageread_url = url+"/v1/message/read" + self.messageunreaNum_url = url+"/v1/message/unreadNum" + self.messagedelete_url = url+"/v1/message/delete" + self.deviceshare_url = url+"/v1/device/share" + self.deviceshareRecords_url = url+"/v1/device/shareRecords" + self.deviceshareFeedback_url = url+"/v1/device/shareFeedback" + self.deviceshareDelete_url = url+"/v1/device/shareDelete" + self.feedbackcreate_url = url+"/v1/feedback/create" + self.feedbacklist_url = url+"/v1/feedback/list" + self.feedbackinfo_url = url+"/v1/feedback/info" + self.feedbacksend_url = url+"/v1/feedback/send" + self.leavewordlist_url = url+"/v1/leaveword/list" + self.leavewordunreadNumber_url = url+"/v1/leaveword/unreadNumber" + self.leavewordread_url = url+"/v1/leaveword/read" + self.leaveworddeviceList_url = url+"/v1/leaveword/deviceList"#设备获取留言列表 + self.leaveworddeviceUnreadNumber_url = url+"/v1/leaveword/deviceUnreadNumber" + self.leaveworddeviceRead_url = url+"/v1/leaveword/deviceRead" + self.devicereset_url = url+"/v1/device/reset" + self.bindmac_url = "http://admin.hassecurity.cn/v1/getUuid"#传mac + self.activate_url = "http://admin.hassecurity.cn/v1/confirm"#传mac uuid + self.devicegetAllUser_url = url+"/v1/device/getAllUser" + self.deviceputAllUser_url = url+"/v1/device/putAllUser" + self.devicelogin_url = url+"/v1/device/login" + + + # 关联的字典 + self.relations = {} + #sign加密 + def __get__relations(self, params=''): + """ + 获取关联后的字符串 + 约定,如果要使用关联的变量,形式为{paramname} + :param s: 需要关联的字符串 + :return: 返回关联后的字符串 + """ + if params is None or params == '': + return None + for key_value in self.relations.keys(): + params = params.replace('{' + key_value + '}', str(self.relations[key_value])) + return params + def randoms(self, value): + str = "ABCDEFGHIJKLMNOPQRSTabcdefglijklmnopqr1234567890" + params = ''.join(__import__('random').choice(str) for i in range(value)) + print(params) + # self.relations['randoms'] = params + # print(self.relations['randoms']) + return params + def randomChinese(self, value): + str = "中文测试四五六七八九十" + params = ''.join(__import__('random').choice(str) for i in range(value)) + print(params) + # self.relations['randomChinese'] = params + # print(self.relations['randomChinese']) + return params + def randomspecial(self, value): + str = "!@#$&*带中文字符" + params = ''.join(__import__('random').choice(str) for i in range(value)) + print(params) + # self.relations['randomspecial'] = params + # print(self.relations['randomspecial']) + return params + + def randomnumber(self): + return random.choice([1, 2,3,4,5,6,7,8,9,10,11,12]) + # def assertss(self,code,masg,printok): + # try: + # assert response["code"] == code + # assert response["msg"] == masg + # print(printok,"接口正常",response) + # except: + # print(printok,"接口异常",response) + def replace_plus_in_query(self,query_string): + # 使用urllib.parse.quote_plus编码查询字符串 + encoded_query = urllib.parse.quote_plus(query_string) + # 替换所有的'+'为其他字符,例如空格' '或者'-'等 + replaced_query = encoded_query.replace('+', ' ') # 例如,将'+'替换为空格 + return replaced_query + + def get_hmac_sha256(self,message, secret): + """ + 生成HMAC-SHA256签名 + :param message: 待加密的源字符串 + :param secret: 密钥 + :return: Base64编码后的签名 + """ + message = message.encode('utf-8') # 字符串转字节 + secret = secret.encode('utf-8') # 密钥转字节 + # HMAC-SHA256加密 + signature = hmac.new(secret, message, digestmod=hashlib.sha256).hexdigest() + # Base64编码并返回字符串 + return base64.b64encode(signature.encode()).decode() + + #get请求拼接sign,带token + def get_sign(self): + str_sign = ("GET" + "&" + self.relations['access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&") + # print(str_sign) + return str_sign + def delete_sign(self,data): + + str_sign = "DELETE" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted((data).items())).replace("+", " ") + return str_sign + + + def get_signs(self, data): + str_sign = "GET" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted((data).items())).replace("+", " ") + + return str_sign + # def post_sign(self,data): + # str_sign = "POST" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + # sorted((data).items())).replace("+", " ") + # return str_sign + import urllib.parse + + def post_sign(self, data): + # 1. 构造基础签名部分(保持原样) + base_str = ( + "POST" + "&" + self.relations['access_token'] + + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + + "b6e91dfb8defa381cb1c3eda4e5235f0" + + "bbf91e0d-d397-4775-8296-25c99e5f7403" + + str(timestamp) + "&" + ) + # 2. 对参数差异化编码 + encoded_params = [] + for key, value in sorted(data.items()): + # 关键点:对值进行定制化编码 + encoded_value = self._encode_special_chars(str(value)) + encoded_pair = f"{key}={encoded_value}" + encoded_params.append(encoded_pair) + + # 3. 拼接参数(不强制替换 +,除非服务端要求) + param_str = "&".join(encoded_params) + return base_str + param_str + + def _encode_special_chars(self, s): + """差异化编码特殊字符""" + # 定义不需要编码的字符(根据服务端要求调整) + SAFE_CHARS = "-_.~#@!$&*()" # 例如允许 #、!、$、& 不编码 + encoded = [] + for char in s: + if char.isalnum() or char in SAFE_CHARS: + encoded.append(char) + else: + encoded.append(urllib.parse.quote(char)) + return "".join(encoded) + + + def bind_mac_sign(self,data,value): + str_sign = value + "&" + "1" + "TEST_SL100_20240801" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "TEST" + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ").replace("%20", ":") + return str_sign + + + + + def get_params(self, data): + """对应Postman的GetParams()函数,处理GET参数或POST Body""" + if not data: + return "" + + # 对参数排序(对应sortedKeys) + sorted_items = sorted(data.items()) + + # 拼接key=value&格式(对应循环处理逻辑) + param_str = "" + for key, value in sorted_items: + # 对应escape()编码,Python用urllib.parse.quote + encoded_value = urllib.parse.quote(str(value), safe='').replace("+", "%20") + param_str += f"{key}={encoded_value}&" + + # 去除最后一个&(对应slice(0, -1)) + return param_str.rstrip('&') + + + def post_signsss(self,data):#data body + str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ") + return str_sign + #设备 + def postdevice_sion(self,data): + str_sign = "POST" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ").replace("%3A", ":") + return str_sign + + return str_sign + #带参数 + def getdevice_sion(self,data): + str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ").replace("%3A", ":") + return str_sign + #不带参数 + def getdevice_sionNOdata(self): + str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" + return str_sign + #post不带参数 + def postdevice_sionNOdata(self): + str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + uuid + "&" + return str_sign + '带uid uid放的顺序也很重要' '设备云存储' + def getdevice_sionuuid(self): + str_sign = "GET" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "7e1558dff183cd08" + uuid +"&" + return str_sign + def postdevice_sionsuuid(self,data): + str_sign = "POST" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "7e1558dff183cd08" + uuid + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ").replace("%3A", ":") + return str_sign + + + + 'post带deta' '设备云存储' + def postdevice_sionuuid(self, data): + """优化签名生成,严格遵循编码规范""" + # 构建基础前缀(与GET完全一致) + "uuid替换了" + # prefix = "POST&SL100bbf91e0d-d397-4775-8296-25c99e5f7403{}{}576ad09e31e148f68180c985f5eea989&".format( + # timestamp, "7e1558dff183cd08" + # ) + prefix = "POST&SL100bbf91e0d-d397-4775-8296-25c99e5f7403{}{}{}&".format( + timestamp, "7e1558dff183cd08",uuid + ) + + # 处理参数:排序+编码(移除不必要的替换) + sorted_params = sorted(data.items()) + encoded_params = urllib.parse.urlencode(sorted_params) + + # 拼接完整签名 + str_sign = prefix + encoded_params + return str_sign + + def postdevice_sionuuids(self, data): + # 1. 将嵌套data转为JSON字符串(注意:服务端需支持此格式) + data_str = json.dumps(data, separators=(',', ':')) # 去除空格,压缩格式 + # 2. 对JSON字符串进行urlencode编码(而非直接对字典编码) + encoded_data = urllib.parse.quote(data_str, safe='') # safe='' 表示编码所有特殊字符 + # 3. 构造正确前缀(参考步骤1修复后的顺序) + prefix = "POST&SL100&bbf91e0d-d397-4775-8296-25c99e5f7403&{}&{}&{}&".format( + timestamp, "7e1558dff183cd08", uuid + ) + # 4. 拼接签名(前缀 + 编码后的参数) + str_sign = prefix + encoded_data + return str_sign + #云存获取设备 带了uid + def device_sions(self,data): + base_sign = ( + "POST" + "&" + + "SL100" + "&" + + "bbf91e0d-d397-4775-8296-25c99e5f7403" + "&" + + str(timestamp) + "&" + + uuid+ + # "576ad09e31e148f68180c985f5eea989" + "7e1558dff183cd08" + ) + + # 如果有参数则添加参数部分 + if data: + param_str = "&" + urllib.parse.urlencode(sorted(data.items())).replace("+", " ").replace("%3A", ":") + return base_sign + param_str + return base_sign + + + def webtoken(self): + login_Headers = { + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + "access_token": self.relations['access_token'] + } + return login_Headers + def asserts(self,assertss,code,value,value1): + + + if assertss['code'] == code or assertss['msg'] == 'ok': + print(value,assertss) + + else: + print(value1,assertss) + + def assertsmsg(self,assertss,code,value,value1,msg): + + if assertss['code'] == code and assertss['msg'] == msg: + print(value,assertss) + + else: + print(value1,assertss) + + def gethttpheaders(self, value): + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(value, "c5ca86260e08e561996e5960dc93d2f0"), + 'Content-Type': 'application/json', + "access_token": self.relations['access_token'] + } + return login_Headers + #设备 + def deviceheaders(self, value): + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'timestamp': timestamp, + 'sign': self.get_hmac_sha256(value, "a94f6fe623ce98720825198e9b57c4d7"), + 'Content-Type': 'application/json', + 'uuid': uuid, + 'model': 'SL100', + 'uid' : '7e1558dff183cd08' + } + return login_Headers + def getdeviceheaders(self, value): + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'timestamp': timestamp, + 'sign': self.get_hmac_sha256(value, "a94f6fe623ce98720825198e9b57c4d7"), + 'Content-Type': 'application/json', + 'uuid': uuid, + 'model': 'SL100' + } + return login_Headers + def bandmacheaders(self,value): + timestamp = str(int(time.time())) + login_Headers = { + 'factory_id':'TEST', + 'label':'TEST_SL100_20240801', + 'batch':"1", + 'model':'SL100', + 'timestamp': timestamp, + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'sign': self.get_hmac_sha256(value,"rCeOzwisLFLasvlt"), + 'Content-Type' : 'application/json' + } + return login_Headers + +#delete + def DELTEThttpheaders(self, value): + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(value, "c5ca86260e08e561996e5960dc93d2f0"), + "access_token": self.relations['access_token'] + } + return login_Headers + # http请求头 + def httpheaders(self,value): + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(value, "c5ca86260e08e561996e5960dc93d2f0"), + 'Content-Type': 'application/json' + } + return login_Headers + #登录子账号(分享账号) + def uesrlog_share(self): + data = { + "username": "15814087116", + "type": "password", + "password": "10d50a1e43a718a9c31bc450a0985131", + "phone_brand": "XiaoMi 14", + "code": "767092" + } + # + str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ") + # self.post_sign(data) + # print(str_sign) + # 带密钥KEY 通过sha256加密之后base64 + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), + 'Content-Type': 'application/json' + } + # print(login_Headers) + response = requests.post(url=self.login_url, headers=login_Headers, data=json.dumps(data)).json() + # print("用户登录",response) + access_token = response["data"]["access_token"] + self.relations['access_token'] = access_token + # print('------------------------------',self.relations) + try: + assert response["code"] == 1000 + assert response["msg"] == "ok" + # print("用户登陆成功",response) + except Exception as e: + pass + # print("用户登录失败",response) + return access_token + #登录获取token + def uesrlog(self): + data = { + "username": "18875374973", + "type": "password", + "password": "10d50a1e43a718a9c31bc450a0985131", + "phone_brand": "XiaoMi 14", + "code": "767092" + } + # + str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ") + # self.post_sign(data) + # print(str_sign) + # 带密钥KEY 通过sha256加密之后base64 + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), + 'Content-Type': 'application/json' + } + # print(login_Headers) + response = requests.post(url=self.login_url, headers=login_Headers, data=json.dumps(data)).json() + # print("用户登录",response) + access_token = response["data"]["access_token"] + self.relations['access_token'] = access_token + # print('------------------------------',self.relations) + try: + assert response["code"] == 1000 + assert response["msg"] == "ok" + print("用户登陆成功",response) + except Exception as e: + print("用户登录失败",response) + return access_token + def uesrlogerror(self): + '未注册账号登录' + data = { + "username": "18875374974", + "type": "password", + "password": "10d50a1e43a718a9c31bc450a0985131", + "phone_brand": "XiaoMi 14", + "code": "767092" + } + response = requests.post(url=self.login_url,headers=self.httpheaders(self.post_signsss(data)),data=json.dumps(data)).json() + asserts = response + # if asserts["code"] == 1100 and asserts["msg"] == "invalid username or password": + # print("未注册账号登录失败,接口正常",response) + # else: + # print("未注册账号登录失败,接口正常",response) + self.asserts(asserts,1000,"未注册账号登录成功,接口异常","未注册账号登录失败,接口正常") +# try: +# assert response["code"] == 1100 +# assert response["msg"] == "ok" +# print("未注册账号登录成功",response) +# except Exception as e: +# print("未注册账号登录失败",response) +# # + def getlog(self): + '获取用户信息' + str_sign = ("GET" + "&" + self.relations['access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&") + # print("第二个",str_sign) + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), + 'Content-Type': 'application/json', + "access_token": self.relations['access_token'] + } + response = requests.get(url=self.getusers_url, headers=login_Headers) + print("获取用户信息",response.text) + + + + #错误密码 + def pwdsuser(self,value): + '错误密码' + data = { + "username": "18875374973", + "type": "password", + "password": "10d50a1e43a718a9c31bc450a0985132", + "phone_brand": "XiaoMi 14", + "code": "767092" + } + + str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ") + # print(str_sign) + # 带密钥KEY 通过sha256加密之后base64 + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'phone_code': 'b6e91dfb8defa381cb1c3eda4e5235f0', + 'timestamp': timestamp, + 'app_version': 'IOS_1.0.30', + 'sign': self.get_hmac_sha256(str_sign, "c5ca86260e08e561996e5960dc93d2f0"), + 'Content-Type': 'application/json' + } + # print(login_Headers) + response = requests.post(self.login_url, headers=login_Headers, data=json.dumps(data)).json() + # print("错误密码",response) + try: + assert response["code"] == 1100 + # assert response["msg"] == "ok" + print("错误密码登录失败接口返回正常",response) + except: + print("错误密码登录成功返回异常",response) + # + def resetSend(self): + '发送忘记密码验证码' + data = { + "username": "18875374973", + "type": "password", + "password": "10d50a1e43a718a9c31bc450a0985131", + } + str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ") + response = requests.post(self.resetSend_url, headers=self.httpheaders(str_sign), data=json.dumps(data)).json() + print("发送忘记密码验证码",response) + + + + # #忘记密码 + # def resetpwd(self): + # data = { + # "username": "18875374973", + # "code":"", + # "password": "10d50a1e43a718a9c31bc450a0985131", + # } + # + # str_sign = "POST" + "&" + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + # sorted(data.items())).replace("+", " ") + # str_sign = "GET" + "&" + self.relations[ + # 'access_token'] + "IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + # response = requests.post() + #获取用户头像上传地址 + def putAvatar(self): + #调用get拼接sign 然后headers传入get_sign + # self.get_sign() + response = requests.get(url=self.putAvatar_url,headers=self.gethttpheaders(self.get_sign())) + print('获取用户头像地址',response.json()) + + + def UpdatePwd(self): + '修改密码' + data={ + "new_password":"10d50a1e43a718a9c31bc450a0985131" + } + # str_sign = "POST" + "&" + self.relations['access_token'] +"IOS_1.0.30" + "994f9d2e0c5ad0f3" + "b6e91dfb8defa381cb1c3eda4e5235f0" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "&" + urllib.parse.urlencode( + # sorted(data.items())).replace("+", " ") + response = requests.post(url=self.updatePwd_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() + # print("修改密码",response) + try: + assert response["code"] == 1000 + assert response["msg"] == "ok" + print("修改密码接口正常",response) + except: + print("修改密码接口异常",response) + + # print("头文件",self.gethttpheaders(self.get_sign())) + #修改昵称-------------- + def updateInfo(self): + data={ + "nickname":"123" + } + response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() + # print("修改昵称",response) + # self.assertss(1000,"ok","修改昵称") + try: + assert response["code"] == 1000 + assert response["msg"] == "ok" + print("修改昵称接口正常",response) + except: + print("修改昵称接口异常",response) + # assert response.json.post('code')==1000 + def updateInfomax(self): + '''修改昵称--30字符''' + data={ + "nickname":self.randoms(30) + } + response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,5000,"修改昵称--30字符接口正常","修改昵称--30字符接口异常") + def updateInfomin(self): + '''修改昵称--1字符''' + data={ + "nickname":self.randoms(1) + } + response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"修改昵称--1字符接口正常","修改昵称--1字符接口异常") + def updateInfoSpecial_symbols(self): + '''修改昵称--特殊字符''' + data={ + "nickname":self.randomspecial(5) + } + response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"修改昵称--特殊符号接口正常","修改昵称--特殊符号接口异常") + def updateInfoSpecial_Chinese(self): + '''修改昵称--特殊字符''' + data={ + "nickname":self.randomChinese(5) + } + response = requests.post(url=self.updateInfo_url, headers=self.gethttpheaders(self.post_sign(data)), data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"修改昵称--中文接口正常","修改昵称--中文接口异常") + + + def putClient(self): + '上报登录设备信息' + data = { + "push_type":2, + "push_token":"cbc16bcc7a47203727415cb2ca339147", + "zone":"-8.00" + } + response = requests.post(url=self.putClient_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + + # print('',response) + try: + assert response["code"] == 1000 + assert response["msg"] == "ok" + print("上报设备登录信息接口返回正常",response) + except Exception as e: + print("上报设备登录信息接口返回异常",response) +#-------------------------------------------------------------------设备绑定------------------------------------------------------------------- + def bind(self,code,value,value1,data,msg): + '设备绑定' + data = { + "uid": "7e1558dff183cd08",#主账号 + # "uid": "d500a9738331eeb9", #295账号 + # "mac": "b8:a9:af:01:74:27", + "mac": data, + "zone": "8.00", + "version": "1.00.43" + } + str_sign = "POST" + "&" + "SL100" + "bbf91e0d-d397-4775-8296-25c99e5f7403" + timestamp + "44c12d6471794f6fb712b7ceee128e83" + "&" + urllib.parse.urlencode( + sorted(data.items())).replace("+", " ").replace("%3A", ":") + login_Headers = { + 'appid': '994f9d2e0c5ad0f3', + 'request_id': 'bbf91e0d-d397-4775-8296-25c99e5f7403', + 'timestamp': timestamp, + 'sign': self.get_hmac_sha256(str_sign, "a94f6fe623ce98720825198e9b57c4d7"), + 'Content-Type': 'application/json', + # 'uuid': 'f3d7ad96ed424b15a5c27e7cdb1015fd', + 'uuid':'44c12d6471794f6fb712b7ceee128e83', + 'model': 'SL100' + } + response = requests.post(url=self.bind_url,headers=login_Headers,data=json.dumps(data)).json() + # print('绑定',response) + asserts = response + self.assertsmsg(asserts,code,value,value1,msg) + return asserts + + def deletedevices(self): + '删除设备' + params = { + "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd" + # "uuid": "576ad09e31e148f68180c985f5eea989" + } + response = requests.delete(url=self.deletet_url, headers=self.DELTEThttpheaders(self.delete_sign(params)), + params=params) + asserts = response.json() + self.asserts(asserts,1000,"设备删除成功,删除接口正常","设备删除失败,删除接口异常") + def upgradedVersion_Front(self): + '''获取前板设备可升级版本''' + + params = { + "uuid":"f3d7ad96ed424b15a5c27e7cdb1015fd", + + "flag": "Front" + } + response =requests.get(url=self.upgradedVersion_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) + # print(response.json()) + asserts = response.json() + self.asserts(asserts,1000,"获取前板设备可升级版本接口正常","获取前板设备可升级版本接口异常") + + def upgradedVersion_Rear(self): + '''获取后板设备可升级版本''' + + params = { + "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", + "flag": "Rear" + } + response = requests.get(url=self.upgradedVersion_url, headers=self.gethttpheaders(self.get_signs(params)), + params=params) + # print(response.json()) + asserts = response.json() + self.asserts(asserts, 1000, "获取后板设备可升级版本接口正常", "获取后板设备可升级版本接口异常") + + def models(self): + '''获取所有设备型号列表''' + response = requests.get(url=self.models_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts,1000,"<获取所有设备型号列表接口正常>","获取所有设备型号列表接口异常") + + def newList(self): + '''获取五分钟内绑定的设备''' + response = requests.get(url=self.newList_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts,1000,"<<获取五分钟内绑定的设备接口正常>>","<获取五分钟内绑定的设备接口异常>") + def UpdateName(self,randoms): + '''修改设备名称----20字符''' + + data = { + "uuid":"f3d7ad96ed424b15a5c27e7cdb1015fd", + 'name':randoms + } + response = requests.post(url=self.UpdateName_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"<修改设备名称-输入个字符接口正常>","<修改设备名称-输入20个字符接口异常>") + + def UpdateName_max(self,randoms): + '''修改设备名称--21字符''' + + data = { + "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", + 'name': randoms + } + response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + + try : + assert response["code"] == 5000 + assert response["msg"] == "system error, please contact the administrator" + print("<修改设备名称-输入21个字符接口正常>",response) + except Exception as e: + print("<修改设备名称-输入21个字符接口异常>",response) + + def UpdateName_min(self,randoms): + '''修改设备名称--1字符''' + + data = { + "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", + 'name': randoms + } + response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<修改设备名称-输入1个字符接口正常>", "<修改设备名称-输入1个字符接口异常>") + def UpdateName_special(self,randoms): + '''修改设备名称--特殊字符''' + + data = { + "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", + 'name': randoms + } + response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<修改设备名称-输入特殊字符接口正常>", "<修改设备名称-输入特殊字符接口异常>") + def UpdateName_Chinese(self,randoms): + '''修改设备名称--特殊字符''' + + data = { + "uuid": "f3d7ad96ed424b15a5c27e7cdb1015fd", + 'name': randoms + } + response = requests.post(url=self.UpdateName_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<修改设备名称-输入中文接口正常>", "<修改设备名称-输入中文接口异常>") +#-------------------------------------------------------------获取事件部分------------------------------------------------------------- + def evenlist(self): + params = { + "uuid": uuid, + "start_time": int(time.time()), + "date": datetime.now().strftime("%Y-%m-%d"), # 已实现实时日期 + "type": self.randomnumber(),#随机获取1~12种tpye类型的事件消息列表 + "rows" : 30,#最大条数 + } + print(f"打印type事件类型{params}") + # 发起请求并获取响应 + response = requests.get( + url=self.evenlist_url, + headers=self.gethttpheaders(self.get_signs(params)), + params=params + ) + asserts = response.json() + self.asserts(asserts, 1000, "<<获取事件列表接口正常>>", "<获取事件列表接口异常>") + + # ---------------- 新增:提取第一个事件的id并返回 ---------------- + first_event_id = None # 初始化默认值,避免无数据时报错 + # 检查响应数据结构是否正确(防止接口返回空列表或格式异常) + if asserts.get("code") == 1000 and asserts.get("data") and asserts["data"].get("list"): + event_list = asserts["data"]["list"] + if len(event_list) > 0: # 确保列表非空 + first_event_id = event_list[0]["id"] # 取第一个事件的id + print(f"从evenlist提取到第一个事件id:{first_event_id}") + else: + print("evenlist接口返回无事件数据,无法提取msg_id") + + return first_event_id # 返回提取到的id(无数据时返回None) + + def eventread(self): + data = { + "msg_id" : "685bdb011c9c06e0e8a43066", + "uuid" : uuid + } + response = requests.post(url=self.eventread_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"<<<修改事件消息的读取状态接口正常>>>","<<修改事件消息的读取状态接口异常>>") + + def evendel(self): + # ---------------- 关键:调用evenlist获取第一个事件的id ---------------- + msg_id = self.evenlist() # 接收evenlist返回的第一个事件id + if not msg_id: # 若未提取到id(如无事件数据),直接提示并退出,避免无效请求 + print("未获取到有效事件id,无法执行删除操作") + return + # 使用提取到的msg_id构造参数 + params = { + "msg_id": msg_id, # 替换固定值,使用实时提取的id + "uuid": uuid + } + # 发起删除请求 + response = requests.delete(url=self.evendelete_url,headers=self.DELTEThttpheaders(self.delete_sign(params)),params=params + ) + asserts = response.json() + self.asserts(asserts, 1000, "<<<删除事件消息接口正常>>>", "<<删除事件消息接口异常>>") + def eventunreadNum(self): + params = { + "uuid" : uuid + } + response = requests.get(url=self.eventunreadNum_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts,1000,"<<<获取事件未读数量接口正常>>>","<<获取事件未读数量接口异常>>") +#------------------------------------------------公共接口--------------------------------------- + def times(self): + response = requests.get(url=self.time_url) + asserts = response.json() + self.asserts(asserts,1000,"<<<获取服务器当前时间戳接口正常>>>","<<获取服务器当前时间戳接口异常>>") + def getUrl(self): + headers = { + 'Content-Type': 'application/json', + "appid" : "994f9d2e0c5ad0f3" + } + params = { + "uid" : "7e1558dff183cd08" + } + response = requests.get(url=self.getUrl_url,headers=headers,params=params) + asserts = response.json() + self.asserts(asserts,1000,"<<<<获取服务器地址接口正常>>>>","<<<获取服务器地址接口异常>>>") + + # -------------------------------------------------------------------------------------设备家庭------------------------------------------------------------------------ + def homeCreate(self,randoms): + + data={ + "name" :randoms + } + response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<创建家庭接口正常>>>>","<<<创建家庭接口异常>>>") + def homeCreatepecial(self,randoms): + + data={ + "name" :randoms + } + response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<创建家庭-特殊字符接口正常>>>>","<<<创建家庭-特殊字符接口异常>>>") + def homeCreateChinese(self,randoms): + + data={ + "name" :randoms + } + response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<创建家庭-中文字符接口正常>>>>","<<<创建家庭-中文字符接口异常>>>") + def homeCreateMax(self,randoms): + + data={ + "name" :randoms + } + response = requests.post(url=self.homeCreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<创建家庭-21字符接口正常>>>>","<<<创建家庭-21字符接口异常>>>") + + def homeCreateMin(self, randoms): + + data = { + "name": randoms + } + response = requests.post(url=self.homeCreate_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<创建家庭-1字符接口正常>>>>", "<<<创建家庭-1字符接口异常>>>") + + + def homeslist(self): + response = requests.get(url=self.homeslist_url, headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + # 校验接口返回状态 + self.asserts(asserts, 1000, "<<<获取家庭列表接口正常>>>", "<<<获取家庭列表接口异常>>>") + # 提取最后一个id + if "data" in asserts and len(asserts["data"]) > 0: + last_id = asserts["data"][-1]["id"] + print("最后一个家庭的id:", last_id) + + self.relations['home_id'] = last_id + return last_id # 返回id供其他方法使用 + else: + print("家庭列表为空或数据格式错误") + return None + + def homeDelete(self): + time.sleep(1) + self.homeslist() + # home_id = self.homeslist() + params = { + "home_id":self.relations["home_id"] + } + # print("这是家庭id"+self.relations["home_id"],params) + response = requests.delete(url=self.homeDelete_url,headers=self.gethttpheaders(self.delete_sign(params)),params=params) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<<删除家庭接口正常>>>>>>>","<<<<<删除家庭接口异常>>>>>") + + def homeUpdate(self,randoms): + data = { + "home_id" : "685d3d5c8e2acc38e53eb38a", + "name" : randoms, + "location": "深圳" + + } + response = requests.post(url=self.homeUpdate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<<修改家庭名称或地址接口正常>>>>>","<<<<修改家庭名称或地址接口异常>>>>") + def homeUpdatepecial(self,randoms): + data = { + "home_id" : "685d3d5c8e2acc38e53eb38a", + "name" : randoms, + "location": "深圳" + + } + response = requests.post(url=self.homeUpdate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<<修改家庭名称-特殊字符接口正常>>>>>","<<<<修改家庭名称-特殊字符接口异常>>>>") + def homeUpdateMax(self,randoms): + data = { + "home_id" : "685d3d5c8e2acc38e53eb38a", + "name" : randoms, + "location": "深圳" + + } + response = requests.post(url=self.homeUpdate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<<修改家庭名称-30字符接口正常>>>>>","<<<<修改家庭名称-30字符接口异常>>>>") + + def homeChange(self): + data = { + "home_id" : "685d3d5c8e2acc38e53eb38a", + "uuid" : uuid + } + response = requests.post(url=self.homeChange_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts =response + self.asserts(asserts,1000,"<<<<<更换设备所属家庭(最后一个家庭)接口正常>>>>>","<<<<更换设备所属家庭(最后一个家庭)接口异常>>>>") + + def homeChangenoe(self): + data = { + "home_id": "6865f9dd8e2acc38e53eb69f", + "uuid": uuid + } + response = requests.post(url=self.homeChange_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<更换设备所属家庭(第一个家庭)接口正常>>>>>", "<<<<更换设备所属家庭(第一个家庭)接口异常>>>>") + def homeDevices(self): + params = { + "home_id": "685d3d5c8e2acc38e53eb38a" + } + response = requests.get(url=self.homeDevices_url, headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<获取家庭下的设备列表接口正常>>>>>","<<<<获取家庭下的设备列表接口异常>>>>>") + def homeUsers(self): + params = { + "home_id" : "685d3d5c8e2acc38e53eb38a" + } + response = requests.get(url=self.homeUsers_url, headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<<获取家庭下的用户接口正常>>>>>>","<<<<<获取家庭下的用户接口异常>>>>>") + def homeShare(self): + data = { + "home_id" :"685d3d5c8e2acc38e53eb38a", + "username" : "15814087116" + } + response = requests.post(url=self.homeShare_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"<<<<<<分享家庭给用户接口正常>>>>>>>>","<<<<<分享家庭给用户接口异常>>>>>") + # 登录被分享账号去获取消息ID传入用户反馈结果 + + #分享后需要去获取消息列表 + def messagelist(self): + #登录被分享账号去获取消息列表 + time.sleep(1) + self.uesrlog_share() + response = requests.get(url=self.messagelist_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<<<获取消息列表正常>>>>>>>","<<<<<获取消息列表异常>>>>>") + # 提取消息列表中的第一个id + if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): + message_list = asserts['data']['list'] + if message_list: + return message_list[0].get('id') # 返回第一个消息的id + return None # 没有找到消息时返回None + def messagelisttype(self): + #登录被分享账号去获取消息列表 + time.sleep(1) + self.uesrlog_share() + response = requests.get(url=self.messagelist_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<<<获取消息列表正常>>>>>>>","<<<<<获取消息列表异常>>>>>") + # 提取type等于4的消息的id + if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): + message_list = asserts['data']['list'] + # 筛选出type等于4的消息 + type_4_messages = [msg for msg in message_list if msg.get('type') == 4] + if type_4_messages: + return type_4_messages[0].get('id') # 返回第一个type等于4的消息的id + else: + print("没有找到type等于4的消息") + return None # 没有找到消息或type等于4的消息时返回None + + + def messagelists(self):#用来调用下面删除消息的 + response = requests.get(url=self.messagelist_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + # self.asserts(asserts,1000,"<<<<<<<获取消息列表正常>>>>>>>","<<<<<获取消息列表异常>>>>>") + # 提取消息列表中的第一个id + if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): + message_list = asserts['data']['list'] + if message_list: + return message_list[0].get('id') # 返回第一个消息的id + return None # 没有找到消息时返回None + def homeShareFeedback(self,number): + #获取msg_id + msg_id = self.messagelist() + if not msg_id: + print("获取消息id失败,无法执行反馈操作") + return + data = { + "msg_id" : msg_id, + "accept" : number#同意 1同意 2拒绝 + } + # print(msg_id,"查看") + response = requests.post(url=self.homeShareFeedback_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"<<<<<<<用户反馈分享结果接口(同意加入家庭)正常>>>>>>>","<<<<<用户反馈分享结果接口(同意加入家庭)异常>>>>>") + def homeShareRemove(self): + time.sleep(1) + self.uesrlog()#登录主账号去删除分享 + data = { + "home_id" : "685d3d5c8e2acc38e53eb38a", + "uid" : "f032bda4b2fbcced" + } + response = requests.post(url=self.homeShareRemove_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"<<<<<<<删除分享者接口正常>>>>>>>","<<<<<删除分享者接口异常>>>>>") +#------------------------------------------------------------- + def homeShareFeedbackNo(self,value): + #分享家庭给用户 + self.homeShare() + #获取msg_id + msg_id = self.messagelist() + if not msg_id: + print("获取消息id失败,无法执行反馈操作") + return + data = { + "msg_id": msg_id, + "accept": value # 拒绝 + } + # print(msg_id,"查看") + response = requests.post(url=self.homeShareFeedback_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<<<用户反馈分享结果接口(拒绝加入家庭)正常>>>>>>>","<<<<<用户反馈分享结果接口(拒绝加入家庭)异常>>>>>") + + def homeAddDevice(self,value): + time.sleep(1) + self.uesrlog()#再登录原账户进行后续的接口调用 + data = { + "home_id" : value, + "uuid" : uuid + } + # 683ffa2f77375f5bb1a44521 + response = requests.post(url=self.homeAddDevice_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<<<家庭内添加设备接口正常>>>>>>>","<<<<<家庭内添加设备接口异常>>>>>") + + #''''''''''''''''''''''''''''''云存储''''''''''''''''''''''''''''''''''''''''''''''''''''' + def cloudToken(self): + params = { + "uuid" : uuid + } + response = requests.get(url=self.cloudToken_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<<>>>>>>>","<<<<<>>>>>") + def cloudFiles(self): + params = { + "date" : 20250625, + "uuid": uuid + } + response = requests.get(url=self.cloudFiles_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<<>>>>>>>","<<<<<>>>>>") + + def cloudputToken(self): + response = requests.get(url=self.cloudputToken_url,headers=self.deviceheaders(self.getdevice_sionuuid())) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<<<设备获取文件上报token接口正常>>>>>>>>","<<<<<<设备获取文件上报token接口异常>>>>>>") + + def cloudputFiles(self): + current_time = datetime.now() + formatted_time = current_time.strftime("%Y%m%d") + data = { + "date" : formatted_time, + "suffix" : "h264", + "zone" : "8.00", + "store" : 7, + "event_tag":"p_s", + "files": "0_7:1_-1" + } + response = requests.post(url=self.cloudputFile_url,headers=self.deviceheaders(self.postdevice_sionuuid(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<<<<获取设备上云存信息接口正常>>>>>>>>","<<<<<<获取设备上云存信息接口异常>>>>>>") + + +#-----------------------------------------------------------------消息部分------------------------------------------- + def messageread(self): + data ={ + "message_id" : "" + } + response =requests.post(url=self.messageread_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<<<<修改消息读取状态接口正常>>>>>>>>","<<<<<<修改消息读取状态接口异常>>>>>>") + def messageunreaNum(self): + response = requests.get(url=self.messageunreaNum_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<<<<获取消息未读数量接口正常>>>>>>>>","<<<<<<修改消息读取状态接口异常>>>>>>") + def messagedelete(self): + msg_id = self.messagelists() + print(msg_id) + params ={ + "message_id" : msg_id, + } + response = requests.delete(url=self.messagedelete_url, headers=self.DELTEThttpheaders(self.delete_sign(params)),params=params) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<删除消息接口正常>>>>>","<<<<<删除消息接口异常>>>>>") + + + # -----------------------------------------------------------------设备分享------------------------------------------- + def deviceshare(self,value): + time.sleep(1) + self.uesrlog() + data = { + "Username" : value, + "uuid" :uuid + } + response =requests.post(url=self.deviceshare_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<设备分享接口正常>>>>>", "<<<<<设备分享接口异常>>>>>") + def deviceshareUser(self,value): + # self.uesrlog() + data = { + "Username" : value, + "uuid" :'f3d7ad96ed424b15a5c27e7cdb1015fd' + } + response =requests.post(url=self.deviceshare_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + try: + assert response["code"] == 1103 + assert response["msg"] == 'username does not exist' + print("设备分享(不存在的用户)分享失败-接口正常",response) + except: + print("设备分享(不存在的用户)分享成功-接口异常",response) + + def deviceshareDevice(self, value): + # self.uesrlog() + data = { + "Username": '15814087116', + "uuid": value + } + response = requests.post(url=self.deviceshare_url, headers=self.gethttpheaders(self.post_sign(data)), + data=json.dumps(data)).json() + asserts = response + try: + assert response["code"] == 1302 + assert response["msg"] == 'invalid uuid' + print("设备分享(不存在的设备)分享失败-接口正常", response) + except: + print("设备分享(不存在的设备)分享成功-接口异常", response) + # asserts = response + # self.asserts(asserts, 1000, "<<<<<设备分享(不存在的用户)接口正常>>>>>", "<<<<<设备分享(不存在的用户)接口异常>>>>>") + def deviceshareRecords(self): + + params = { + "uuid" : uuid + } + response =requests.get(url=self.deviceshareRecords_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<获取分享列表正常>>>>>", "<<<<<获取分享列表异常>>>>>") + def devicesshareFeedback(self): + #登录被分享账号获取消息列表 + self.messagelisttype() + msg_id = self.messagelisttype() + print(msg_id,"测试返回id") + data = { + "msg_id" : msg_id, + "status" : 2#拒绝 + } + response = requests.post(url=self.deviceshareFeedback_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "用户反馈(拒绝)分享接口正常", "<<<<<用户反馈(拒绝)分享接口异常>>>>>") + print("子账号列表") + def devicesshareFeedbackOK(self): + time.sleep(1) + self.uesrlog() + self.deviceshare("15814087116")#调用设备分享接口 + self.messagelisttype()#再次调用获取消息列表 + msg_id = self.messagelisttype() + data = { + "msg_id" : msg_id, + "status" : 1#同意 + } + response = requests.post(url=self.deviceshareFeedback_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "用户反馈(同意)接受分享接口正常", "<<<<<用户反馈(同意)接受分享接口异常>>>>>") + def deviceshareDelete(self): + time.sleep(1) + self.uesrlog() + params = { + "uuid" : uuid, + "uid" : "f032bda4b2fbcced" + } + response = requests.delete(url=self.deviceshareDelete_url, headers=self.DELTEThttpheaders(self.delete_sign(params)),params=params) + asserts = response.json() + self.asserts(asserts, 1000, "设备主人移除分享者接口正常", "<<<<<设备主人移除分享者接口异常>>>>>") + # -----------------------------------------------------------------反馈------------------------------------------- + + def create(self): + data = { + "reason" : 'app', + "explain" :'创建用户反馈接口测试' + } + response = requests.post(url=self.feedbackcreate_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<<创建用户反馈接口正常>>>>>>", "<<<<<<创建用户反馈接口异常>>>>>>") + + def feedbacklist(self): + response = requests.get(url=self.feedbacklist_url, headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<获取反馈列表接口正常>>>>>>", "<<<<<<获取反馈列表接口异常>>>>>>") + # 提取第一个反馈ID + if asserts.get('code') == 1000 and asserts.get('data') and asserts['data'].get('list'): + return asserts['data']['list'][0].get('id') + return None + + def feedbackinfo(self): + # 先调用列表接口获取ID + feedback_id = self.feedbacklist() + parmas = { + "feedback_id": feedback_id + } + response = requests.get(url=self.feedbackinfo_url, headers=self.gethttpheaders(self.get_signs(parmas)), + params=parmas) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<获取反馈详情接口正常>>>>>>", "<<<<<<获取反馈详情接口异常>>>>>>") + def feedbacksend(self): + feedback_id = self.feedbacklist() + data = { + "feedback_id": feedback_id, + "message" : "发送反馈消息测试", + + } + response = requests.post(url=self.feedbacksend_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts, 1000, "<<<<<<发送反馈消息接口正常>>>>>>", "<<<<<<发送反馈消息接口异常>>>>>>") + #------------------------------------------------------------------留言--------------------------------------------- + def leavewordlist(self): + params = { + "uuid":uuid, + "rows" : "30" #获取最大条数 默认10 + } + response = requests.get(url=self.leavewordlist_url,headers=self.gethttpheaders(self.get_signs(params)),params=params) + asserts = response.json() + self.asserts(asserts, 1000, "<<<<<<>>>>>>", "<<<<<<>>>>>>") + def leavewordunreadNumber(self): + response = requests.get(url=self.leavewordunreadNumber_url,headers=self.gethttpheaders(self.get_sign())) + asserts = response.json() + self.asserts(asserts,1000,"<<<<<<>>>>>>", "<<<<<<>>>>>>") + def leavewordread(self): + data = { + "id" :"" + } + response = requests.post(url=self.leavewordread_url,headers=self.gethttpheaders(self.post_sign(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"APP设置全部留言已读接口正常","APP设置全部留言已读接口异常") + def leaveworddeviceList(self): + "设备获取留言列表" + params = { + "rows" : "20" + } + response = requests.get(url=self.leaveworddeviceList_url,headers=self.getdeviceheaders(self.getdevice_sion(params)),params=params) + asserts = response.json() + self.asserts(asserts,1000,"设备获取留言列表接口正常","设备获取留言列表接口异常") + def leaveworddeviceUnreadNumber(self): + + response =requests.get(url=self.leaveworddeviceUnreadNumber_url,headers=self.getdeviceheaders(self.getdevice_sionNOdata())) + asserts = response.json() + self.asserts(asserts,1000,"设备获取留言未读数量接口正常","设备获取留言未读数量接口正常异常") + + def leaveworddeviceRead(self): + data = { + "id" : "" + } + response =requests.post(url=self.leaveworddeviceRead_url,headers=self.getdeviceheaders(self.postdevice_sion(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"设备设置留言已读接口正常","设备设置留言已读接口异常") + def devicegetAllUser(self): + response = requests.get(url=self.devicegetAllUser_url,headers=self.deviceheaders(self.getdevice_sionuuid())) + asserts = response.json() + self.asserts(asserts,1000,"设备获取所有门锁用户正常","设备获取所有门锁用户异常") + # print("测试",response.json()) + def deviceputAllUser(self): + data = { + "users": [] + } + + response = requests.post(url=self.deviceputAllUser_url,headers=self.deviceheaders(self.postdevice_sionsuuid(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"设备上报所有门锁用户正常","设备上报所有门锁用户异常") + + def devicelogin(self): + data = { + "zone" : "8.00" + } + response = requests.post(url=self.devicelogin_url,headers=self.deviceheaders(self.postdevice_sionsuuid(data)),data=json.dumps(data)).json() + asserts = response + self.asserts(asserts,1000,"设备登录成功","设备登录失败") + return response + + # 连接串口 发送指令 + # def CDM(self): + # time.sleep(2) + # value = "has devband HiChS-Guest *12345678# 7e1558dff183cd08\n" + # # value = "reboot\n" + # app = cks('COM10', 115200, 0, value) + # print(time.strftime('%Y.%m.%d %H:%M:%S ', time.localtime(time.time())), "下发配网指令...") + + + + def mac(self): + # 调试:确认正则表达式模块是否可以导入 + try: + import re + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 成功导入正则表达式模块") + except ImportError as e: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 导入正则表达式模块失败: {e}") + # 如果无法导入正则模块,提供替代方案 + re = None + + time.sleep(2) + com_port = 'COM10' + baud_rate = 115200 + timeout = 10 + command = "has print env\n" + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 下发配网指令: {command.strip()}") + + # 只关注这三个参数 + target_info = {'mac': None, 'sn': None, 'uuid': None} + max_retries = 3 + + try: + with serial.Serial(com_port, baud_rate, timeout=timeout) as ser: + for attempt in range(max_retries): + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 尝试 {attempt + 1}/{max_retries}") + + ser.reset_input_buffer() + ser.reset_output_buffer() + + bytes_sent = ser.write(command.encode('gbk')) + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 已发送 {bytes_sent} 字节数据") + time.sleep(1) + + # 接收数据 + response = b'' + start_time = time.time() + while time.time() - start_time < timeout: + if ser.in_waiting > 0: + chunk = ser.read(ser.in_waiting) + response += chunk + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 读取到数据片段,长度: {len(chunk)}") + if b'===========' in response and len(response) > 500: + break + else: + time.sleep(0.1) + + print( + f"{time.strftime('%Y.%m.%d %H:%M:%S')} 尝试 {attempt + 1} 接收结束,总数据长度: {len(response)}") + + if len(response) > 0: + # 解码响应 + try: + response_text = response.decode('gbk', errors='replace') + except UnicodeDecodeError: + response_text = response.decode('utf-8', errors='replace') + + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 收到设备回复,准备提取信息") + + # 调试:确认正则模块是否可用 + if re: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 正则表达式模块状态: 可用") + else: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 正则表达式模块状态: 不可用,将使用替代方法") + + # 只提取需要的三个参数 + patterns = { + 'sn': r'sn\s*:\s*(\S+)', + 'mac': r'mac\s*:\s*(\S+)', + 'uuid': r'uuid\s*:\s*(\S+)' + } + + for key, pattern in patterns.items(): + if re: + match = re.search(pattern, response_text) + if match: + target_info[key] = match.group(1).strip() + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 提取{key.upper()}: {target_info[key]}") + else: + # 替代方法:使用简单字符串查找 + for line in response_text.split('\n'): + if key + ' :' in line.lower(): + value = line.split(':', 1)[1].strip() + target_info[key] = value + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 提取{key.upper()}: {value}") + break + + # 三个参数都获取到则跳出循环 + if all(target_info.values()): + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 成功获取所有目标信息") + break + else: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 未收到数据,将重试") + + if attempt < max_retries - 1: + time.sleep(1) + + # 最终输出结果(只显示目标参数) + print("\n===== 提取结果 =====") + print(f"MAC: {target_info['mac'] or '未找到'}") + + print(f"SN: {target_info['sn'] or '未找到'}") + print(f"UUID: {target_info['uuid'] or '未找到'}") + target_info_MAC = target_info['mac'] + target_info_SN = target_info['sn'] + target_info_UUID = target_info['uuid'] + self.relations['MAC'] = target_info_MAC + # self.relations['SN'] = target_info_SN + # self.relations['UUID'] = target_info_UUID + # print(self.relations['MAC'], self.relations['SN'],self.relations['UUID']) + print("====================") + + except serial.SerialException as e: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 串口错误: {e}") + except Exception as e: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 错误: {e}") + # 调试:打印完整的错误堆栈 + import traceback + print(traceback.format_exc()) + + return target_info + + + + + def bindmac(self): + # self.mac() + params = { + "label":"TEST_SL100_20240801", + "model":"sl100", + "batch":"1", + # "mac": self.relations['MAC'] + "mac":"6c:20:1e:1d:93:7c" + } + # print("串口传回来的mac",self.relations['MAC']) + response = requests.get(url=self.bindmac_url,headers=self.bandmacheaders(self.bind_mac_sign(params,"GET")),params=params) + print(f"接口返回结果{response.json()}") + # asserts = response + #get请求需要将返回数据转回json再提取 + response_data = response.json()#转字典 + + if response_data['code'] == 1000 and response_data['msg'] == 'ok': + access_sn = response_data["data"]["sn"]#转换完成后提取 + access_uuid = response_data["data"]["uuid"] + self.relations['SN'] = access_sn + self.relations['UUID'] = access_uuid + print(self.relations['SN'],self.relations['UUID']) + else: + self.relations['msg'] = response_data["msg"] + print(self.relations['msg']) + + + + "传入mac地址和uuid" + + def activate(self): + data = { + "mac": self.relations['MAC'], + "uuid": self.relations['UUID'], + } + print(f'传入mac和uuid:{data}') + response =requests.post(url=self.activate_url,headers=self.bandmacheaders(self.bind_mac_sign(data,"POST")),data=json.dumps(data)).json() + print(f"接口返回结果:{response}") + + + # def releas(self): + # # self.relations['UUID'] = self.relations['UUID'] + # print("",self.relations['UUID']) + # return self.relations['UUID'] + + + + + def CMD(self): + # 等待2秒 + time.sleep(2) + # 配置串口参数[ + com_port = 'COM10' + baud_rate = 115200 + timeout = 5 # 设置超时时间为5秒 + # 下发配网指令 + command = "has devband HiChS-Guest *12345678# 7e1558dff183cd08\n" + # command = "reboot\n" + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 下发配网指令: {command.strip()}") + + try: + # 初始化串口并发送指令 + with serial.Serial(com_port, baud_rate, timeout=timeout) as ser: + # 发送指令 + bytes_sent = ser.write(command.encode('gbk')) + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 已发送 {bytes_sent} 字节数据") + + # 清空输入缓冲区(可选) + # ser.reset_input_buffer() + # 接收设备回复 + response = ser.read_until(b'\n') # 读取直到遇到换行符 + response = response.decode('gbk', errors='replace').strip() + + if response: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 收到设备回复: {response}") + else: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 在超时时间内未收到回复") + + except serial.SerialException as e: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 串口通信错误: {e}") + except UnicodeDecodeError as e: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 解码错误: {e}") + except Exception as e: + print(f"{time.strftime('%Y.%m.%d %H:%M:%S')} 发生未知错误: {e}") + + +#保活设备 + def testmodeon(self): + + # for i in range(1): + time.sleep(2) + value = "has testmode on\n" + # value = "reboot\n" + app = cks('COM10', 115200, 0, value) + print(time.strftime('%Y.%m.%d %H:%M:%S ', time.localtime(time.time())), "保活...") + # app.close() # 关闭串口 + +#关于设备的sing get 带参数和不带参数的 +# def getdevice_sion(self, data=None): +# """生成设备接口签名,data参数可选""" +# # 基础签名部分(固定字段) +# base_sign = ( +# "GET" + "&" + +# "SL100" + "&" + +# "bbf91e0d-d397-4775-8296-25c99e5f7403" + "&" + +# str(timestamp) + "&" + +# "576ad09e31e148f68180c985f5eea989" +# ) +# +# # 如果有参数则添加参数部分 +# if data: +# param_str = "&" + urllib.parse.urlencode(sorted(data.items())).replace("+", " ").replace("%3A", ":") +# return base_sign + param_str +# return base_sign +# +# +# def leaveworddeviceUnreadNumber(self): +# """获取留言未读数量(无参数接口)""" +# # 生成签名(不传参数) +# sign_str = self.getdevice_sion() # 注意这里不传data +# headers = self.getdeviceheaders(sign_str) +# response = requests.get( +# url=self.leaveworddeviceUnreadNumber_url, +# headers=headers +# ) +# return response.json() +# +# +# def leaveworddeviceList(self): +# """获取留言列表(带参数接口)""" +# params = {"rows": "20"} +# # 生成签名(传入params) +# sign_str = self.getdevice_sion(params) +# headers = self.getdeviceheaders(sign_str) +# response = requests.get( +# url=self.leaveworddeviceList_url, +# headers=headers, +# params=params +# ) +# return response.json() \ No newline at end of file diff --git a/Test_Api_Runner.py b/Test_Api_Runner.py new file mode 100644 index 0000000..3cf54cf --- /dev/null +++ b/Test_Api_Runner.py @@ -0,0 +1,317 @@ +# -*- coding: utf-8 -*- +import json +import time +import unittest +import requests +import os +import HtmlTestRunner +from API_Keys import apis +# 用例路径 +case_path = os.path.join(os.getcwd(), "case") +# 报告存放路径 +report_path = os.path.join(os.getcwd(), "report") +'''API接口测试''' +class TestAPIs(unittest.TestCase): + # 类变量:跟踪登录状态和API实例 + _logged_in = False + devapi = None + + @classmethod + def setUpClass(cls): + """初始化API实例,首次调用时执行登录""" + cls.devapi = apis() + cls.login() # 初始登录 + + @classmethod + def login(cls): + """执行登录并更新登录状态""" + response = cls.devapi.uesrlog() + cls._logged_in = True + return response + + @classmethod + def logout(cls): + """清除登录状态""" + cls._logged_in = False + + # def test_01_uesrlogerror(self): + # '未注册账号登录' + # # 如果需要在此测试中重新登录,可以调用: + # # TestAPIs.login() + # self.devapi.uesrlogerror() + # '''用户''' + # def test_02_pwdsuser(self): + # '<错误密码>' + # self.devapi.pwdsuser("dwqdwqwq3") + # def test_03_getlog(self): + # '<获取用户信息>' + # '获取用户头像上传地址' + # self.devapi.putAvatar() + # def test_05_UpdatePwd(self): + # '修改密码' + # self.devapi.UpdatePwd() + # def test_06_updateInfo(self): + # '修改昵称' + # self.devapi.getlog() + # def test_04_putAvatar(self): + # self.devapi.updateInfo() + # def test_07_updateInfomax(self): + # '修改昵称---30字符' + # self.devapi.updateInfomax() + # def test_08_updateInfomin(self): + # '修改昵称---1字符' + # self.devapi.updateInfomin() + # def test_09_updateInfoSpecial_symbols(self): + # '修改昵称---特殊符号' + # self.devapi.updateInfoSpecial_symbols() + # def test_10_updateInfoSpecial_Chinese(self): + # '''中文字符''' + # self.devapi.updateInfoSpecial_Chinese() + # "测试连" + # def test_11_putClient(self): + # '上报登录设备信息' + # self.devapi.putClient() + # '''设备绑定''' + # def test_12_bind(self): + # '设备绑定' + # self.devapi.bind( + # 1000, + # "设备绑定成功接口正常", + # "设备绑定成功接口异常", + # "d8:1b:99:ee:c2:62", + # "ok" + # ) + # def test_13_bind(self): + # '设备绑定-不存在的mac' + # self.devapi.bind( + # 1003, + # "不存在的MAC绑定失败,接口正常", + # "不存在的MAC绑定成功,接口异常", + # "11:a9:af:01:74:27", + # "invalid mac" + # ) + # def test_14_upgradedVersion_Front(self): + # '获取设备前板可升级版本' + # self.devapi.upgradedVersion_Front() + # def test_15_login(self): + # '获取设备后板可升级版本' + # self.devapi.upgradedVersion_Rear() + # def test_16_models(self): + # '获取所有设备型号列表' + # self.devapi.models() + # def test_17_newList(self): + # '获取五分钟内绑定的设备' + # self.devapi.newList() + # def test_18_UpdateName(self): + # '修改设备名称----20字符' + # self.devapi.UpdateName(self.devapi.randoms(20)) + # def test_19_UpdateName_max(self): + # '修改设备名称--21字符' + # self.devapi.UpdateName_max(self.devapi.randoms(21)) + # def test_20_UpdateName_min(self): + # '修改设备名称--1字符' + # self.devapi.UpdateName_min(self.devapi.randoms(1)) + # def test_21_UpdateName_special(self): + # '修改名称-特殊字符' + # self.devapi.UpdateName_special(self.devapi.randomspecial(5)) + # def test_22_UpdateName_Chinese(self): + # '修改名称-输入中文字符' + # self.devapi.UpdateName_Chinese(self.devapi.randomChinese(5)) + # # def test_23_deletedevices(self): + # # '解绑设备' + # # self.devapi.deletedevices() + # """设备事件部分""" + # def test_24_evenlist(self): + # '获取事件列表' + # self.devapi.evenlist() + # def test_25_eventread(self): + # '修改事件消息的读取状态' + # self.devapi.eventread() + # def test_26_eventunreadNum(self): + # '获取事件未读数量' + # self.devapi.eventunreadNum() + # def test_27_evendel(self): + # '删除事件消息' + # self.devapi.evendel() + # "公共接口" + # def test_28_times(self): + # '获取服务器当前时间戳' + # self.devapi.times() + # def test_29_getUrl(self): + # '获取服务器地址' + # self.devapi.getUrl() + # "设备家庭" + # def test_30_homeCreate(self): + # '创建家庭' + # self.devapi.homeCreate(self.devapi.randoms(20)) + # def test_31_homeCreatepecial(self): + # '创建家庭-特殊字符' + # self.devapi.homeCreatepecial(self.devapi.randomspecial(6)) + # def test_32_homeCreateChinese(self): + # '创建家庭-中文字符' + # self.devapi.homeCreateChinese(self.devapi.randomChinese(6)) + # def test_33_homeCreateMax(self): + # '创建家庭-21字符' + # self.devapi.homeCreateMax(self.devapi.randoms(21)) + # def test_34_homeCreateMin(self): + # '创建家庭-1字符' + # self.devapi.homeCreateMin(self.devapi.randomChinese(1)) + # def test_35_homesList(self): + # '获取家庭列表' + # self.devapi.homeslist() + # def test_36_homeDelete(self): + # '删除家庭' + # for i in range(4): + # + # self.devapi.homeDelete() + # def test_37_homeUpdate(self): + # '修改家庭名称或者地址' + # self.devapi.homeUpdate(self.devapi.randomChinese(5)) + # def test_38_homeUpdatepecial(self): + # '修改家庭名称-特殊字符' + # self.devapi.homeUpdatepecial(self.devapi.randoms(10)) + # def test_39_homeUpdateMax(self): + # '修改家庭名称-30字符' + # self.devapi.homeUpdateMax(self.devapi.randoms(20)) + # def test_40_homeChange(self): + # '更换设备所属家庭-最后一个' + # self.devapi.homeChange() + # def test_41_homeChangenoe(self): + # '更换设备所属家庭-第一个' + # self.devapi.homeChangenoe() + # def test_42_homeDevices(self): + # '获取家庭下的设备列表' + # self.devapi.homeDevices() + # def test_43_homeUsers(self): + # '获取家庭下的用户' + # self.devapi.homeUsers() + # def test_44_homeShare(self): + # '分享家庭给用户' + # self.devapi.homeShare() + # def test_45_messageList(self): + # '登录被分享账号去获取消息列表' + # self.devapi.messagelist() + # # time.sleep(3) + # def test_46_homeShareFeedback(self): + # '用户反馈分享结果接口(同意加入家庭)' + # self.devapi.homeShareFeedback(1) + # def test_47_homeShareRemove(self): + # '删除分享者(主账号登录去删除)' + # self.devapi.homeShareRemove() + # def test_48_homeShareFeedbackNo(self): + # '用户反馈分享结果接口(拒绝加入家庭)' + # self.devapi.homeShareFeedbackNo(2) + # ''' + # 家庭内添加设备接口功能(已废弃) + # def test_48_1_homeAddDevice(self): + # '家庭内添加设备' + # self.devapi.homeAddDevice("685d3d5c8e2acc38e53eb38a") + # def test_48_2_homeAddDevice(self): + # time.sleep(1) + # self.devapi.homeAddDevice("6865f9dd8e2acc38e53eb69f") + # def test_48_3_homeAddDevice(self): + # time.sleep(1) + # self.devapi.homeAddDevice("687460cb3ca717516d7124a2") + # ''' + # # def test_49_homeDelete(self): #前面有了删除家庭 + # # '删除家庭' + # # # for i in range(4): + # # # time.sleep(1) + # # self.devapi.homeDelete() + # # "云存储" + # def test_50_cloudToken(self): + # 'APP获取文件下载token' + # self.devapi.cloudToken() + # def test_51_cloudFiles(self): + # 'APP获取云存储文件信息' + # self.devapi.cloudFiles() + # def test_52_cloudputToken(self): + # '设备获取文件上报token' + # self.devapi.cloudputToken() + # def test_53_cloudputFiles(self): + # '获取设备上云存信息' + # self.devapi.cloudputFiles() + # "消息部分" + # def test_54_messagelist(self): + # '获取消息列表' + # self.devapi.messagelist() + # # time.sleep(3) + # def test_55_messageread(self): + # '修改消息读取状态' + # self.devapi.messageread() + # def test_56_messageunreaNum(self): + # '获取消息未读数量' + # self.devapi.messageunreaNum() + # def test_57_mmessagedelete(self): + # '删除消息' + # self.devapi.messagedelete() + # "设备分享" + # def test_58_deviceshare(self): + # '设备分享' + # self.devapi.deviceshare("15814087116") + # def test_59_deviceshareUser(self): + # '设备分享-不存在的用户' + # self.devapi.deviceshareUser("18814087116") + # def test_60_deviceshareUser(self): + # '分享不存在的设备' + # self.devapi.deviceshareDevice("c7e8e360d5c4e8aa9624e1367d2896b") + # def test_61_deviceshareRecords(self): + # '获取分享列表' + # self.devapi.deviceshareRecords() + # def test_62_devicesshareFeedback(self): + # '用户反馈(拒绝)接受分享' + # self.devapi.devicesshareFeedback() + # def test_63_devicesshareFeedbackOK(self): + # '用户反馈(同意)接受分享' + # self.devapi.devicesshareFeedbackOK() + # def test_64_deviceshareDelete(self): + # '设备主人移除分享' + # self.devapi.deviceshareDelete() + # def test_65_create(self): + # '创建反馈' + # self.devapi.create() + # def test_66_feedbacklist(self): + # '获取创建反馈列表' + # self.devapi.feedbacklist() + # def test_67_feedbackinfo(self): + # '获取反馈详情' + # self.devapi.feedbackinfo() + # def test_68_feedbacksend(self): + # '发送反馈消息' + # self.devapi.feedbacksend() + # def test_69_leavewordlist(self): + # 'APP获取留言列表' + # self.devapi.leavewordlist() + # def test_70_leavewordunreadNumber(self): + # 'APP获取留言未读数量' + # self.devapi.leavewordunreadNumber() + # def test_71_leavewordread(self): + # 'APP设置全部留已读' + # self.devapi.leavewordread() + # def test_72_leaveworddeviceList(self): + # '设备获取留言列表' + # self.devapi.leaveworddeviceList() + # def test_73_leaveworddeviceUnreadNumber(self): + # '设备获取留言未读数量' + # self.devapi.leaveworddeviceUnreadNumber() + # def test_74_leaveworddeviceRead(self): + # # '设备设置留言已读' + # self.devapi.leaveworddeviceRead() + def test_75_devicegetAllUser(self): + self.devapi.devicegetAllUser() + def test_76_deviceputAllUser(self): + self.devapi.deviceputAllUser() + def test_77_devicelogin(self): + self.devapi.devicelogin() + # + # def test_02_binmac(self): + # self.devapi.bindmac() + # def test_03_binmacs(self): + # self.devapi.activate() + # + # + + +if __name__ == '__main__': + + unittest.main()