介绍分享一个接口自动化框架搭建方法 (pytest+request+allure),这个方案是由 xpcs 同学在TesterHome社区网站的分享。
写在前面
去年11月被裁,到现在还没上岸,gap 半年了。上岸无望,专业技能不能落下,花了两三天时间,把之前工作中搭建使用的接口自动化框架,重写了一套。
楼主代码菜鸡,代码可能比较 low - -,希望通过本次分享,给社区里想写接口自动化的同学一些借鉴,也希望社区里的大神多给一些优化建议,大家互帮互助,共同进步~
-
框架基于 python 语言,框架使用 pytest,报告使用 allure
-
支持多环境运行,通过命令行传参区分
-
支持多进程跑测,用例需独立无依赖,conftest.py 中包含多进程下只运行一次的 fileLock fixture
-
支持数据库连接单例,一个库在一个进程下只会建立一次连接
-
支持 mysql、redis 操作
-
支持 get、post、put、delete 请求方法,请求是通过用例的请求头 Content-Type 来区分,是使用 params、data 还是 json 传参
-
支持参数化数据驱动,用参数化参数字典,去更新通用参数字典,更新后发起请求
以下使用 windows 环境
conda 配置和新建工程:
安装conda
https://www.anaconda.com/download/success
新建工程,在 pycharm 中新建 conda 虚拟环境
安装 allure 报告
https://repo.maven.apache.org/maven2/io/qameta/allure/allure-commandline/2.29.0/
解压并配置环境变量
D:\allure\allure-2.29.0
cmd 命令行,验证 allure 安装
设置 pycharm 文件编码 UTF-8
依赖安装
pycharm 命令行运行
激活虚拟环境
conda activate pytest_api_auto
安装 python3.8
conda install python=3.8
安装依赖包
pip install requests
pip install jsonpath
pip install pytest
pip install allure-pytest
pip install pytest-sugar
pip install pytest-xdist
pip install pytest-assume
pip install pymysql
pip install redis
pip install faker
pip install filelock
目录划分
以下是源码部分
封装log日志工具类
-
# common/log_util.pyimport loggingimport os# create loggerlog = logging.getLogger("pytest_api_auto")log.setLevel(logging.INFO)# create file handler
-
# mode 默认为a追加模式,如果修改为w为覆盖模式,多进程运行会出现日志缺失和错乱
-
# 获取项目根目录拼接,日志会存在工程根目录pytest.log 每次运行追加写入fh = logging.FileHandler(os.path.join(os.path.dirname(os.path.dirname(__file__)), "pytest.log"),
-
mode='a', encoding='UTF-8')fh.setLevel(logging.INFO)# create stream handlersh = logging.StreamHandler(stream=None)# create formatterfmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"formatter = logging.Formatter(fmt)# add handler and formatter to loggerfh.setFormatter(formatter)sh.setFormatter(formatter)log.addHandler(fh)log.addHandler(sh)
封装环境配置
-
config/env.pyfrom common.log_util import logclass ENV:
-
# 环境信息:test 测试 prod 准生产 # 从pytest命令行获取 info = None# 测试环境服务域名配置class UrlTestConfig:
-
api_backend = "http://api_backend.cn:8899"# 准生产环境服务域名配置class UrlProdConfig:
-
api_backend = "http://api_backend.cn:8899"def get_url(server_name):
-
if ENV.info == "test":
-
url = getattr(UrlTestConfig, server_name)
-
log.info(f"测试环境获取服务域名 - {server_name} : {url}")
-
return url
-
elif ENV.info == "prod":
-
url = getattr(UrlProdConfig, server_name)
-
log.info(f"准生产环境获取服务域名 - {server_name} : {url}")
-
return url
-
else:
-
raise Exception("--env 环境信息有误")
封装 mysql 操作工具类
-
# common/mysql_util.py
-
# 装饰器,同一个mysql数据库只建立一次连接import pymysqlfrom time import sleepfrom common.log_util import log# 装饰器,同一个mysql数据库只建立一次连接def decorate_single(cls):
-
connect_list = {}
-
def wrapper(*args, **kwargs):
-
nonlocal connect_list
-
db_name = args[0]["db"]
-
if db_name not in connect_list:
-
connect_list[db_name] = cls(*args, **kwargs)
-
log.info(f"建立mysql连接并返回 - {db_name}")
-
else:
-
log.info(f"mysql连接已建立,直接返回 - {db_name}")
-
return connect_list[db_name]
-
return wrapper@decorate_singleclass MySql:
-
def __init__(self, db_config: dict):
-
"""
-
:params: db_config 数据库配置 类型为字典
-
"""
-
# 数据库配置 # autocommit: True 选项很关键,如果不设置,新增数据无法查出 # mysql默认数据引擎是innodb 默认数据隔离级别重复读,如果事务不提交,那么每次查询,查询都是同一块数据快照 self.conn = None
-
while True:
-
try:
-
self.conn = pymysql.connect(**db_config)
-
break
-
# 数据库连接,偶尔会连接不上 # 报错 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query') # 解决办法,就是重新连接 except pymysql.err.OperationalError:
-
log.warning("连接失败,可能环境不稳定,重新连接!")
-
sleep(1)
-
except Exception as e:
-
log.warning("获取mysql连接失败!请检查数据库配置或网络连接")
-
raise e
-
def fetchone(self, sql_str: str):
-
"""
-
:params: sql_str 数据库sql
-
:return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None
-
"""
-
try:
-
with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
-
log.info(f"执行sql: {sql_str}")
-
cursor.execute(sql_str)
-
data = cursor.fetchone()
-
log.info(f"sql执行结果: {data}")
-
return data
-
except Exception as e:
-
log.warning("执行sql失败!")
-
raise e
-
def fetchall(self, sql_str: str):
-
"""
-
:params: sql_str 数据库sql
-
:return: 返回查询结果的全部记录,类型是列表,列表元素为字典
-
"""
-
try:
-
with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
-
log.info(f"执行sql: {sql_str}")
-
cursor.execute(sql_str)
-
data = cursor.fetchall()
-
log.info(f"sql执行结果: {data}")
-
return data
-
except Exception as e:
-
log.warning("执行sql失败!")
-
raise e
-
def execute_dml(self, sql_str):
-
"""
-
function: 执行insert、update、delete
-
:param sql_str 数据库sql
-
:return: 无返回
-
"""
-
try:
-
with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
-
log.info(f"执行sql: {sql_str}")
-
data = cursor.execute(sql_str)
-
# 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略 self.conn.commit()
-
log.info(f"sql执行结果: {data}")
-
except Exception as e:
-
log.warning("执行sql失败!")
-
raise e
-
def close(self):
-
"""
-
function:关闭数据库连接
-
params: conn 数据库连接
-
"""
-
self.conn.close()
封装 mysql 配置和连接获取
-
# config/mysql.pyfrom common.mysql_util import MySqlfrom config.env import ENVfrom common.log_util import log# 数据库连接配置class MysqlTestConfig:
-
"""Mysql测试环境配置"""
-
api_auto = {'host': 'localhost', 'port': 3306,
-
'db': 'api_auto', 'user': 'root',
-
'password': 'root', 'autocommit': True
-
}class MysqlProdConfig:
-
"""Mysql准生产环境配置"""
-
api_auto = {'host': 'localhost', 'port': 3306,
-
'db': 'api_auto', 'user': 'root',
-
'password': 'root', 'autocommit': True
-
}def get_mysql_conn(db_name):
-
if ENV.info == "test":
-
log.info("测试环境建立mysql连接 - " + db_name)
-
return MySql(getattr(MysqlTestConfig, db_name))
-
elif ENV.info == "prod":
-
log.info("准生产环境建立mysql连接 - " + db_name)
-
return MySql(getattr(MysqlProdConfig, db_name))
-
else:
-
raise Exception("--env 环境信息有误")
封装 redis 工具类
-
# common/redis_util.pyimport redisfrom common.log_util import log# 装饰器,同一个redis只建立一次连接def decorate_single(cls):
-
connect_list = {}
-
def wrapper(*args, **kwargs):
-
nonlocal connect_list
-
host = args[0]["host"]
-
if host not in connect_list:
-
connect_list[host] = cls(*args, **kwargs)
-
log.info(f"建立redis连接并返回 - {host}")
-
else:
-
log.info(f"redis连接已建立,直接返回 - {host}")
-
return connect_list[host]
-
return wrapper@decorate_singleclass Redis:
-
def __init__(self, db_config):
-
"""
-
:params: db_config 数据库配置 类型为字典
-
"""
-
self.pool = redis.ConnectionPool(**db_config)
-
self.rs = redis.Redis(connection_pool=self.pool)
-
def del_key(self, key):
-
"""
-
:param key: redis key str字符类型
-
:return: 删除成功返回 True 否则 False
-
"""
-
log.info(f"redis 删除key {key}")
-
if self.rs.delete(key) == 1:
-
log.info(f"key {key} 删除成功")
-
return True
-
else:
-
log.warning(f"key: {key} 不存在!")
-
return False
-
def del_keys(self, keys_pattern):
-
"""
-
:param keys_pattern: key通配符 str字符类型 ex: *name*
-
:return:删除成功返回 True 否则 False
-
"""
-
log.info(f"redis 删除keys 通配符 {keys_pattern}")
-
keys = self.rs.keys(keys_pattern)
-
if keys:
-
log.info(f"redis 删除keys {keys}")
-
for k in keys:
-
self.rs.delete(k)
-
log.info(f"keys {keys} 删除成功")
-
return True
-
else:
-
log.warning("通配符未匹配到key!")
-
return False
-
def set(self, key, value, ex=8 * 60 * 60):
-
"""
-
操作str类型
-
:param key: redis key str字符类型
-
:param value: str字符类型
-
:param ex: 数据超时时间,默认8小时
-
return: 写入成功返回 True
-
"""
-
log.info(f"redis str类型 数据写入 key: {key} value: {value}")
-
return self.rs.set(key, value, ex=ex)
-
def get(self, key):
-
"""
-
操作str类型
-
:param key: redis key str字符类型
-
:return: 获取到返回str字符类型 # 未获取到返回 None
-
"""
-
data = self.rs.get(key)
-
log.info(f"redis str类型 数据获取 key: {key} value: {data}")
-
return data
-
def lrange(self, key):
-
"""
-
操作list类型
-
:param key: redis key str字符类型
-
return: 获取到返回list列表类型 # 未获取到返回空列表 []
-
"""
-
data = self.rs.lrange(key, 0, -1)
-
log.info(f"redis list类型 数据获取 key: {key} values: {data}")
-
return data
-
def smembers(self, key):
-
"""
-
操作 set 集合
-
:param key: redis key str字符类型
-
return: 获取到返回set集合类型 # 未获取到返回空集合 set()
-
"""
-
data = self.rs.smembers(key)
-
log.info(f"redis set类型 数据获取 key: {key} values: {data}")
-
return data
-
def zrange(self, key):
-
"""
-
操作 zset 有序集合
-
:param key: redis key str字符类型
-
return: 获取到返回list列表类型 # 未获取到返回空列表 []
-
"""
-
data = self.rs.zrange(key, 0, -1)
-
log.info(f"redis zset类型 数据获取 key: {key} values: {data}")
-
return data
-
# hash 操作 hset hget 后续可扩展
-
def close(self):
-
"""
-
function:关闭数据库连接
-
params: rs Redis对象
-
"""
-
self.rs.close()
封装 redis 配置和连接获取
-
# config/redis.pyfrom common.redis_util import Redisfrom config.env import ENVfrom common.log_util import logclass RedisTestConfig:
-
api_backend = {'host': 'api_backend.cn', 'password': 'redis123',
-
'port': 6379, 'db': 0, 'decode_responses': True}class RedisProdConfig:
-
api_backend = {'host': 'api_backend.cn', 'password': 'redis123',
-
'port': 6379, 'db': 0, 'decode_responses': True}def get_redis_conn(name):
-
if ENV.info == "test":
-
log.info("测试环境建立redis连接 - " + name)
-
return Redis(getattr(RedisTestConfig, name))
-
elif ENV.info == "prod":
-
log.info("准生产环境建立redis连接 - " + name)
-
return Redis(getattr(RedisProdConfig, name))
-
else:
-
raise Exception("--env 环境信息有误")
封装 requests 工具类
-
# common/requests_util.pyimport requestsfrom common.log_util import logdef send_request(url, method, data, headers, **kwargs):
-
"""
-
:param url: 请求域名 类型 str ex: http://xxx.com/path
-
:param method: 请求方法 类型 str 暂时支持 get、post、put、delete
-
:param data: 请求数据,类型 dict、list、str
-
:param headers: 请求头,类型 dict
-
:param kwargs: 扩展支持 files 上传文件、proxy 代理等
-
:return:
-
"""
-
if not url.startswith("http://") and not url.startswith("https://"):
-
raise Exception("请求url缺少协议名")
-
if method.lower() not in ("get", "post", "put", "delete"):
-
raise Exception(f"暂不支持请求方法 - {method} - 可后续扩展")
-
log.info("请求参数:")
-
log.info(f"url: {url}")
-
log.info(f"method: {method}")
-
log.info(f"data: {data}")
-
log.info(f"headers: {headers}")
-
log.info(f"kwargs: {kwargs}")
-
try:
-
if "Content-Type" in headers.keys():
-
# headers 包含传参类型 if headers["Content-Type"] in ("application/x-www-form-urlencoded", "multipart/form-data"):
-
res = requests.request(url=url, method=method, data=data, headers=headers, timeout=30, **kwargs)
-
elif headers["Content-Type"] == "application/json":
-
res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
-
else: # 若非上面三种类型,默认使用json传参 text/html, text/plain等,可后续扩展验证 res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)
-
else:
-
# 请求头没指定传参类型Content-Type,则使用params传参,即在url中传参,如get请求 res = requests.request(url=url, method=method, params=data, headers=headers, timeout=30, **kwargs)
-
except Exception as e:
-
log.warning("请求发生异常!!!")
-
raise e
-
if res.status_code == 200:
-
log.info("请求成功")
-
log.info("响应参数:")
-
log.info(f"{res.text}")
-
else:
-
log.warning(f"请求失败!!! 返回码不为200, 状态码为: {res.status_code}")
-
log.warning(f"响应参数:")
-
log.warning(f"text: {res.text}")
-
log.warning(f"raw: {res.raw}")
-
raise Exception("返回码不为200")
-
try:
-
# 返回为字典类型 return res.json()
-
except requests.exceptions.JSONDecodeError:
-
log.warning("响应参数不为json,返回响应 response对象")
-
return res
封装用例数据解析
-
# common/data_parser.pyfrom config.env import get_urlfrom common.request_util import send_requestimport jsonpathfrom common.log_util import logdef parser(server_name, data_dict, param=None, **kwargs):
-
"""
-
:param server_name: env.py 中的服务域名 类型str ex: api_backend
-
:param data_dict: test_xxx.py测试用例对应data.py中的接口请求数据字典 ex: api_backend["get_student"]
-
:param param: data.py 参数化列表中一项中的data ex: api_backend["get_student"]["param_list"][0]["data"]
-
:**kwargs: 扩展参数
-
:return: 请求结果,如果响应是json类型返回dict,否则返回response对象
-
"""
-
# 获取配置中的服务器域名,拼接path url = get_url(server_name) + data_dict["path"]
-
method = data_dict["method"]
-
headers = data_dict["headers"]
-
data = data_dict["data"]
-
# 参数化后发起请求,用参数化参数更新或替代通用参数 if param:
-
if isinstance(data, dict) and isinstance(param, dict):
-
# 如果通用参数为字典,参数化参数也为字典,使用参数化参数更新通用参数 ex: {"xx": "xx"} data.update(param)
-
else:
-
# 如果通用参数是字符串、列表(元素为字符、数字、字典),直接使用参数化参数据替换通用参数 ex: ["xx", "xx"] data = param
-
res = send_request(url, method, data, headers, **kwargs)
-
return resdef assert_res(res_dict, expect_dict):
-
"""
-
:param res_dict: request请求返回的结果字典,类型 dict
-
:param expect_dict: 预期结果字典, 类型 dict
-
"""
-
if isinstance(res_dict, dict):
-
log.info("开始断言")
-
log.info(f"预期结果: {expect_dict}")
-
# 遍历预期结果的key,使用jsonpath获取请求结果的value,与预期结果value比对 for k in expect_dict.keys():
-
res_list = jsonpath.jsonpath(res_dict, '$..' + str(k)) # 返回列表 assert expect_dict[k] in res_list
-
log.info("断言通过")
-
else:
-
log.warning("请求结果不为dict字典类型,跳过断言!")
封装 faker 模拟数据
-
# common/faker.pyfrom faker import Fakerfrom common.log_util import logfake = Faker("zh_CN")def get_name():
-
name = fake.name()
-
log.info(f"faker 生成姓名: {name}")
-
return namedef get_phone_number():
-
phone_number = fake.phone_number()
-
log.info(f"faker 生成手机号: {phone_number}")
-
return phone_numberdef get_id_card():
-
id_card = fake.ssn()
-
log.info(f"faker 生成身份证号: {id_card}")
-
return id_card
pytest.ini
-
[pytest]addopts = -p no:warnings -vsmarkers =
-
multiprocess: suppurt mutl-process execute cases
全局 conftest.py
-
# conftest.pyimport pytestfrom common.log_util import logfrom filelock import FileLockimport jsonfrom config.env import ENVimport osimport allure# 自定义环境信息pytest命令行def pytest_addoption(parser):
-
parser.addoption(
-
"--env",
-
action="store",
-
default="test",
-
help="set pytest running environment ex: --env=test --env=prod"
-
)# 从pytest命令行获取环境信息@pytest.fixture(scope="session")def get_env(request):
-
ENV.info = request.config.getoption("--env")
-
log.info("运行环境: " + ENV.info)
-
return ENV.info# 终结函数,最后执行@pytest.fixture(scope="session", autouse=True)def fixture_case_end(request):
-
def case_end():
-
log.info("测试结束")
-
request.addfinalizer(case_end)@pytest.fixture(scope="session", autouse=True)# fixture 嵌套先执行获取环境信息get_env
-
# 加入 tmp_path_factory worker_id 用于多进程执行 # 多进程运行,token只获取一次def fixture_get_token(get_env, tmp_path_factory, worker_id):
-
# 单进程执行 if worker_id == "master":
-
# 获取token token = {"token": "xpcs"}
-
log.info("fixture_get_token master获取token %s" % token['token'])
-
else:
-
# 多进程执行 root_tmp_dir = tmp_path_factory.getbasetemp().parent
-
fn = root_tmp_dir / "data.json"
-
# 这里with里面的语句,理解为是被加锁的,同一时间只能有一个进程访问 with FileLock(str(fn) + ".lock"):
-
if fn.is_file():
-
# session_fixture 获取token已执行,直接从文件中读取token token = json.loads(fn.read_text())
-
log.info("fixture_get_token slave使用token %s" % token['token'])
-
else:
-
token = {"token": "xpcs"}
-
fn.write_text(json.dumps(token))
-
log.info("fixture_get_token slave获取token %s" % token['token'])
-
yield token['token']
-
# session 结束后自动执行如下 log.info("session结束")# 用例失败自动执行钩子函数@pytest.hookimpl(tryfirst=True, hookwrapper=True)def pytest_runtest_makereport(item):
-
# 获取钩子方法的调用结果 outcome = yield
-
rep = outcome.get_result()
-
# 仅仅获取用例call 执行结果是失败的情况, 不包含 setup/teardown if rep.when == "call" and rep.failed:
-
mode = "a" if os.path.exists("failures") else "w"
-
with open("failures", mode) as f:
-
# let's also access a fixture for the fun of it if "tmpdir" in item.fixturenames:
-
extra = " (%s)" % item.funcargs["tmpdir"]
-
else:
-
extra = ""
-
f.write(rep.nodeid + extra + "\n")
-
with allure.step("用例运行失败,可加入信息"):
-
allure.attach("失败内容: ----xpcs----", "失败标题", allure.attachment_type.TEXT)
测试用例
-
# line_of_business/service_name_api_backend/test_api_backend.pyimport pytestfrom time import sleepimport allurefrom common.data_parser import parser, assert_resfrom config.mysql import get_mysql_connfrom common.log_util import logfrom line_of_business_name.service_name_api_backend.data import api_backendfrom common.faker_util import get_name, get_id_card, get_phone_numberfrom config.redis import get_redis_conn@allure.feature("flask后端接口测试")class TestApiBackend:
-
@classmethod
-
def setup_class(cls):
-
# 获取数据库连接,执行sql测试 log.info("setup_class")
-
# 数据库连接根据db库名单例,相同库返回同一个连接 conn = get_mysql_conn("api_auto")
-
conn1 = get_mysql_conn("api_auto")
-
conn.execute_dml("insert into test_xdist(msg) values ('%s')" % "class_setup-数据库写入测试")
-
conn1.fetchone("select * from test_xdist limit 1")
-
@classmethod
-
def teardown_class(cls):
-
log.info("steup_teardowm")
-
# 获取redis连接,执行命令测试 # redis连接根据host单例,相同host返回同一个连接 rs = get_redis_conn("api_backend")
-
rs1 = get_redis_conn("api_backend")
-
rs.set("name", "xp")
-
rs1.get("name")
-
@allure.story("测试故事1")
-
@pytest.mark.xfail(reason='预期失败用例')
-
@user12ize("param", [{"title": "标题1", "param": 2, "assert": 3}])
-
def test_case_one(self, param):
-
sleep(1)
-
allure.dynamic.description("测试故事1-描述信息")
-
allure.dynamic.severity(allure.severity_level.CRITICAL) # 用例级别严重 # allure动态标题 allure.dynamic.title(param["title"])
-
log.info("测试faker数据")
-
log.info(f"{get_name()} {get_phone_number()} {get_id_card()}")
-
# pytest.assume(False) # 多重断言插件,断言失败继续执行下面 assert param["param"] + 2 == param["assert"]
-
@allure.story("查询学生接口")
-
@user14cess # 此用例分组到可多进程跑测 @user15ize("param", api_backend["get_student"]["param_list"])
-
def test_get_student(self, param, fixture_get_token):
-
sleep(1)
-
allure.dynamic.title(param["title"])
-
data_dict = api_backend["get_student"]
-
data_dict["headers"]["Cookie"] = fixture_get_token
-
res = parser("api_backend", data_dict, param["data"])
-
assert_res(res, param["assert"])
-
@allure.story("新增学生接口")
-
@user17cess # 此用例分组到可多进程跑测 @user18ize("param", api_backend["post_student"]["param_list"])
-
def test_post_student(self, param):
-
sleep(1)
-
allure.dynamic.title(param["title"])
-
data_dict = api_backend["post_student"]
-
res = parser("api_backend", data_dict, param["data"])
-
assert_res(res, param["assert"])
-
@allure.story("更新学生接口")
-
@user20cess # 此用例分组到可多进程跑测 @user21ize("param", api_backend["put_student"]["param_list"])
-
def test_put_student(self, param):
-
sleep(1)
-
allure.dynamic.title(param["title"])
-
data_dict = api_backend["put_student"]
-
res = parser("api_backend", data_dict, param["data"])
-
assert_res(res, param["assert"])
用例数据驱动
-
# line_of_business/service_name_api_backend/data.py
-
# 服务名外层大字典,参数key是接口名,value是接口的请求信息字典,用例模块可通过接口名引用接口信息字典
-
# param_list 参数化列表,用于pytest参数化,每次选取其中一项的data,去更新外部data通用参数,发起请求api_backend = {
-
"get_student": dict(path="/student",
-
method="get",
-
# headers 不包含Content-Type 则request使用params传参 headers={},
-
# 通用参数,每次请求使用 data={"test": "test"},
-
# 参数化参数,每次使用其中一项,更新通用参数 param_list=[
-
{"title": "获取学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},
-
{"title": "获取学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},
-
{"title": "获取学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
-
]),
-
'post_student': dict(path="/student",
-
method="post",
-
# headers Content-Type = application/x-www-form-urlencoded 则使用 request使用data传参 headers={"Cookie": "", "Content-Type": "application/x-www-form-urlencoded"},
-
data={"test": "test"},
-
param_list=[
-
{"title": "新增学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 1, "msg": "ok"}},
-
{"title": "新增学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},
-
{"title": "新增学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
-
]),
-
'put_student': dict(path="/student",
-
method="put",
-
# headers Content-Type = application/json 则使用 request使用json传参 headers={"Cookie": "", "Content-Type": "application/json"},
-
data={"test": "test"},
-
param_list=[
-
{"title": "更新学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},
-
{"title": "更新学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "okk"}},
-
{"title": "更新学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}
-
])}
调试运行入口
-
# run.pyimport pytestimport os# 用例调试入口if __name__ == '__main__':
-
pytest.main([r"line_of_business_name", "--clean-alluredir", "--alluredir=allure_result", "--cache-clear", "--env=prod"])
-
# pytest.main([r"-m multiprocess", "--clean-alluredir", "--alluredir=allure_result", "-n 3", "--cache-clear", "--env=prod"]) os.system(r"allure generate allure_result -c -o allure_report")
-
os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")
失败用例重跑
-
# failed_run.pyimport pytestimport os# 失败用例重跑if __name__ == '__main__':
-
pytest.main([r"line_of_business_name", "--lf", "--clean-alluredir", "--alluredir=allure_result", "--env=prod"])
-
os.system(r"allure generate allure_result -c -o allure_report")
-
os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")
报告展示
感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:
这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取