pytest+request+allure接口自动化框架搭建分享

介绍分享一个接口自动化框架搭建方法 (pytest+request+allure),这个方案是由 xpcs  同学在TesterHome社区网站的分享。

写在前面

去年11月被裁,到现在还没上岸,gap 半年了。上岸无望,专业技能不能落下,花了两三天时间,把之前工作中搭建使用的接口自动化框架,重写了一套。

楼主代码菜鸡,代码可能比较 low - -,希望通过本次分享,给社区里想写接口自动化的同学一些借鉴,也希望社区里的大神多给一些优化建议,大家互帮互助,共同进步~

  • 框架基于 python 语言,框架使用 pytest,报告使用 allure

  • 支持多环境运行,通过命令行传参区分

  • 支持多进程跑测,用例需独立无依赖,conftest.py 中包含多进程下只运行一次的 fileLock fixture

  • 支持数据库连接单例,一个库在一个进程下只会建立一次连接

  • 支持 mysql、redis 操作

  • 支持 get、post、put、delete 请求方法,请求是通过用例的请求头 Content-Type 来区分,是使用 params、data 还是 json 传参

  • 支持参数化数据驱动,用参数化参数字典,去更新通用参数字典,更新后发起请求

以下使用 windows 环境

conda 配置和新建工程:

安装conda
https://www.anaconda.com/download/success

新建工程,在 pycharm 中新建 conda 虚拟环境

图片

安装 allure 报告

https://repo.maven.apache.org/maven2/io/qameta/allure/allure-commandline/2.29.0/

图片

解压并配置环境变量
D:\allure\allure-2.29.0

图片

图片

cmd 命令行,验证 allure 安装

图片

设置 pycharm 文件编码 UTF-8

图片

依赖安装

pycharm 命令行运行
激活虚拟环境
conda activate pytest_api_auto
安装 python3.8
conda install python=3.8
安装依赖包
pip install requests
pip install jsonpath
pip install pytest
pip install allure-pytest
pip install pytest-sugar
pip install pytest-xdist
pip install pytest-assume
pip install pymysql
pip install redis
pip install faker
pip install filelock

目录划分

图片

以下是源码部分

封装log日志工具类

 
  1. # common/log_util.pyimport loggingimport os# create loggerlog = logging.getLogger("pytest_api_auto")log.setLevel(logging.INFO)# create file handler

  2. # mode 默认为a追加模式,如果修改为w为覆盖模式,多进程运行会出现日志缺失和错乱

  3. # 获取项目根目录拼接,日志会存在工程根目录pytest.log 每次运行追加写入fh = logging.FileHandler(os.path.join(os.path.dirname(os.path.dirname(__file__)), "pytest.log"),

  4. mode='a', encoding='UTF-8')fh.setLevel(logging.INFO)# create stream handlersh = logging.StreamHandler(stream=None)# create formatterfmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"formatter = logging.Formatter(fmt)# add handler and formatter to loggerfh.setFormatter(formatter)sh.setFormatter(formatter)log.addHandler(fh)log.addHandler(sh)

封装环境配置

 
  1. config/env.pyfrom common.log_util import logclass ENV:

  2. # 环境信息:test 测试 prod 准生产 # 从pytest命令行获取 info = None# 测试环境服务域名配置class UrlTestConfig:

  3. api_backend = "http://api_backend.cn:8899"# 准生产环境服务域名配置class UrlProdConfig:

  4. api_backend = "http://api_backend.cn:8899"def get_url(server_name):

  5. if ENV.info == "test":

  6. url = getattr(UrlTestConfig, server_name)

  7. log.info(f"测试环境获取服务域名 - {server_name} : {url}")

  8. return url

  9. elif ENV.info == "prod":

  10. url = getattr(UrlProdConfig, server_name)

  11. log.info(f"准生产环境获取服务域名 - {server_name} : {url}")

  12. return url

  13. else:

  14. raise Exception("--env 环境信息有误")

封装 mysql 操作工具类

 
  1. # common/mysql_util.py

  2. # 装饰器,同一个mysql数据库只建立一次连接import pymysqlfrom time import sleepfrom common.log_util import log# 装饰器,同一个mysql数据库只建立一次连接def decorate_single(cls):

  3. connect_list = {}

  4. def wrapper(*args, **kwargs):

  5. nonlocal connect_list

  6. db_name = args[0]["db"]

  7. if db_name not in connect_list:

  8. connect_list[db_name] = cls(*args, **kwargs)

  9. log.info(f"建立mysql连接并返回 - {db_name}")

  10. else:

  11. log.info(f"mysql连接已建立,直接返回 - {db_name}")

  12. return connect_list[db_name]

  13. return wrapper@decorate_singleclass MySql:

  14. def __init__(self, db_config: dict):

  15. """

  16. :params: db_config 数据库配置 类型为字典

  17. """

  18. # 数据库配置 # autocommit: True 选项很关键,如果不设置,新增数据无法查出 # mysql默认数据引擎是innodb 默认数据隔离级别重复读,如果事务不提交,那么每次查询,查询都是同一块数据快照 self.conn = None

  19. while True:

  20. try:

  21. self.conn = pymysql.connect(**db_config)

  22. break

  23. # 数据库连接,偶尔会连接不上 # 报错 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query') # 解决办法,就是重新连接 except pymysql.err.OperationalError:

  24. log.warning("连接失败,可能环境不稳定,重新连接!")

  25. sleep(1)

  26. except Exception as e:

  27. log.warning("获取mysql连接失败!请检查数据库配置或网络连接")

  28. raise e

  29. def fetchone(self, sql_str: str):

  30. """

  31. :params: sql_str 数据库sql

  32. :return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None

  33. """

  34. try:

  35. with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:

  36. log.info(f"执行sql: {sql_str}")

  37. cursor.execute(sql_str)

  38. data = cursor.fetchone()

  39. log.info(f"sql执行结果: {data}")

  40. return data

  41. except Exception as e:

  42. log.warning("执行sql失败!")

  43. raise e

  44. def fetchall(self, sql_str: str):

  45. """

  46. :params: sql_str 数据库sql

  47. :return: 返回查询结果的全部记录,类型是列表,列表元素为字典

  48. """

  49. try:

  50. with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:

  51. log.info(f"执行sql: {sql_str}")

  52. cursor.execute(sql_str)

  53. data = cursor.fetchall()

  54. log.info(f"sql执行结果: {data}")

  55. return data

  56. except Exception as e:

  57. log.warning("执行sql失败!")

  58. raise e

  59. def execute_dml(self, sql_str):

  60. """

  61. function: 执行insert、update、delete

  62. :param sql_str 数据库sql

  63. :return: 无返回

  64. """

  65. try:

  66. with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:

  67. log.info(f"执行sql: {sql_str}")

  68. data = cursor.execute(sql_str)

  69. # 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略 self.conn.commit()

  70. log.info(f"sql执行结果: {data}")

  71. except Exception as e:

  72. log.warning("执行sql失败!")

  73. raise e

  74. def close(self):

  75. """

  76. function:关闭数据库连接

  77. params: conn 数据库连接

  78. """

  79. self.conn.close()

封装 mysql 配置和连接获取

 
  1. # config/mysql.pyfrom common.mysql_util import MySqlfrom config.env import ENVfrom common.log_util import log# 数据库连接配置class MysqlTestConfig:

  2. """Mysql测试环境配置"""

  3. api_auto = {'host': 'localhost', 'port': 3306,

  4. 'db': 'api_auto', 'user': 'root',

  5. 'password': 'root', 'autocommit': True

  6. }class MysqlProdConfig:

  7. """Mysql准生产环境配置"""

  8. api_auto = {'host': 'localhost', 'port': 3306,

  9. 'db': 'api_auto', 'user': 'root',

  10. 'password': 'root', 'autocommit': True

  11. }def get_mysql_conn(db_name):

  12. if ENV.info == "test":

  13. log.info("测试环境建立mysql连接 - " + db_name)

  14. return MySql(getattr(MysqlTestConfig, db_name))

  15. elif ENV.info == "prod":

  16. log.info("准生产环境建立mysql连接 - " + db_name)

  17. return MySql(getattr(MysqlProdConfig, db_name))

  18. else:

  19. raise Exception("--env 环境信息有误")

封装 redis 工具类

 
  1. # common/redis_util.pyimport redisfrom common.log_util import log# 装饰器,同一个redis只建立一次连接def decorate_single(cls):

  2. connect_list = {}

  3. def wrapper(*args, **kwargs):

  4. nonlocal connect_list

  5. host = args[0]["host"]

  6. if host not in connect_list:

  7. connect_list[host] = cls(*args, **kwargs)

  8. log.info(f"建立redis连接并返回 - {host}")

  9. else:

  10. log.info(f"redis连接已建立,直接返回 - {host}")

  11. return connect_list[host]

  12. return wrapper@decorate_singleclass Redis:

  13. def __init__(self, db_config):

  14. """

  15. :params: db_config 数据库配置 类型为字典

  16. """

  17. self.pool = redis.ConnectionPool(**db_config)

  18. self.rs = redis.Redis(connection_pool=self.pool)

  19. def del_key(self, key):

  20. """

  21. :param key: redis key str字符类型

  22. :return: 删除成功返回 True 否则 False

  23. """

  24. log.info(f"redis 删除key {key}")

  25. if self.rs.delete(key) == 1:

  26. log.info(f"key {key} 删除成功")

  27. return True

  28. else:

  29. log.warning(f"key: {key} 不存在!")

  30. return False

  31. def del_keys(self, keys_pattern):

  32. """

  33. :param keys_pattern: key通配符 str字符类型 ex: *name*

  34. :return:删除成功返回 True 否则 False

  35. """

  36. log.info(f"redis 删除keys 通配符 {keys_pattern}")

  37. keys = self.rs.keys(keys_pattern)

  38. if keys:

  39. log.info(f"redis 删除keys {keys}")

  40. for k in keys:

  41. self.rs.delete(k)

  42. log.info(f"keys {keys} 删除成功")

  43. return True

  44. else:

  45. log.warning("通配符未匹配到key!")

  46. return False

  47. def set(self, key, value, ex=8 * 60 * 60):

  48. """

  49. 操作str类型

  50. :param key: redis key str字符类型

  51. :param value: str字符类型

  52. :param ex: 数据超时时间,默认8小时

  53. return: 写入成功返回 True

  54. """

  55. log.info(f"redis str类型 数据写入 key: {key} value: {value}")

  56. return self.rs.set(key, value, ex=ex)

  57. def get(self, key):

  58. """

  59. 操作str类型

  60. :param key: redis key str字符类型

  61. :return: 获取到返回str字符类型 # 未获取到返回 None

  62. """

  63. data = self.rs.get(key)

  64. log.info(f"redis str类型 数据获取 key: {key} value: {data}")

  65. return data

  66. def lrange(self, key):

  67. """

  68. 操作list类型

  69. :param key: redis key str字符类型

  70. return: 获取到返回list列表类型 # 未获取到返回空列表 []

  71. """

  72. data = self.rs.lrange(key, 0, -1)

  73. log.info(f"redis list类型 数据获取 key: {key} values: {data}")

  74. return data

  75. def smembers(self, key):

  76. """

  77. 操作 set 集合

  78. :param key: redis key str字符类型

  79. return: 获取到返回set集合类型 # 未获取到返回空集合 set()

  80. """

  81. data = self.rs.smembers(key)

  82. log.info(f"redis set类型 数据获取 key: {key} values: {data}")

  83. return data

  84. def zrange(self, key):

  85. """

  86. 操作 zset 有序集合

  87. :param key: redis key str字符类型

  88. return: 获取到返回list列表类型 # 未获取到返回空列表 []

  89. """

  90. data = self.rs.zrange(key, 0, -1)

  91. log.info(f"redis zset类型 数据获取 key: {key} values: {data}")

  92. return data

  93. # hash 操作 hset hget 后续可扩展

  94. def close(self):

  95. """

  96. function:关闭数据库连接

  97. params: rs Redis对象

  98. """

  99. self.rs.close()

封装 redis 配置和连接获取

 
  1. # config/redis.pyfrom common.redis_util import Redisfrom config.env import ENVfrom common.log_util import logclass RedisTestConfig:

  2. api_backend = {'host': 'api_backend.cn', 'password': 'redis123',

  3. 'port': 6379, 'db': 0, 'decode_responses': True}class RedisProdConfig:

  4. api_backend = {'host': 'api_backend.cn', 'password': 'redis123',

  5. 'port': 6379, 'db': 0, 'decode_responses': True}def get_redis_conn(name):

  6. if ENV.info == "test":

  7. log.info("测试环境建立redis连接 - " + name)

  8. return Redis(getattr(RedisTestConfig, name))

  9. elif ENV.info == "prod":

  10. log.info("准生产环境建立redis连接 - " + name)

  11. return Redis(getattr(RedisProdConfig, name))

  12. else:

  13. raise Exception("--env 环境信息有误")

封装 requests 工具类

 
  1. # common/requests_util.pyimport requestsfrom common.log_util import logdef send_request(url, method, data, headers, **kwargs):

  2. """

  3. :param url: 请求域名 类型 str ex: http://xxx.com/path

  4. :param method: 请求方法 类型 str 暂时支持 get、post、put、delete

  5. :param data: 请求数据,类型 dict、list、str

  6. :param headers: 请求头,类型 dict

  7. :param kwargs: 扩展支持 files 上传文件、proxy 代理等

  8. :return:

  9. """

  10. if not url.startswith("http://") and not url.startswith("https://"):

  11. raise Exception("请求url缺少协议名")

  12. if method.lower() not in ("get", "post", "put", "delete"):

  13. raise Exception(f"暂不支持请求方法 - {method} - 可后续扩展")

  14. log.info("请求参数:")

  15. log.info(f"url: {url}")

  16. log.info(f"method: {method}")

  17. log.info(f"data: {data}")

  18. log.info(f"headers: {headers}")

  19. log.info(f"kwargs: {kwargs}")

  20. try:

  21. if "Content-Type" in headers.keys():

  22. # headers 包含传参类型 if headers["Content-Type"] in ("application/x-www-form-urlencoded", "multipart/form-data"):

  23. res = requests.request(url=url, method=method, data=data, headers=headers, timeout=30, **kwargs)

  24. elif headers["Content-Type"] == "application/json":

  25. res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)

  26. else: # 若非上面三种类型,默认使用json传参 text/html, text/plain等,可后续扩展验证 res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)

  27. else:

  28. # 请求头没指定传参类型Content-Type,则使用params传参,即在url中传参,如get请求 res = requests.request(url=url, method=method, params=data, headers=headers, timeout=30, **kwargs)

  29. except Exception as e:

  30. log.warning("请求发生异常!!!")

  31. raise e

  32. if res.status_code == 200:

  33. log.info("请求成功")

  34. log.info("响应参数:")

  35. log.info(f"{res.text}")

  36. else:

  37. log.warning(f"请求失败!!! 返回码不为200, 状态码为: {res.status_code}")

  38. log.warning(f"响应参数:")

  39. log.warning(f"text: {res.text}")

  40. log.warning(f"raw: {res.raw}")

  41. raise Exception("返回码不为200")

  42. try:

  43. # 返回为字典类型 return res.json()

  44. except requests.exceptions.JSONDecodeError:

  45. log.warning("响应参数不为json,返回响应 response对象")

  46. return res

封装用例数据解析

 
  1. # common/data_parser.pyfrom config.env import get_urlfrom common.request_util import send_requestimport jsonpathfrom common.log_util import logdef parser(server_name, data_dict, param=None, **kwargs):

  2. """

  3. :param server_name: env.py 中的服务域名 类型str ex: api_backend

  4. :param data_dict: test_xxx.py测试用例对应data.py中的接口请求数据字典 ex: api_backend["get_student"]

  5. :param param: data.py 参数化列表中一项中的data ex: api_backend["get_student"]["param_list"][0]["data"]

  6. :**kwargs: 扩展参数

  7. :return: 请求结果,如果响应是json类型返回dict,否则返回response对象

  8. """

  9. # 获取配置中的服务器域名,拼接path url = get_url(server_name) + data_dict["path"]

  10. method = data_dict["method"]

  11. headers = data_dict["headers"]

  12. data = data_dict["data"]

  13. # 参数化后发起请求,用参数化参数更新或替代通用参数 if param:

  14. if isinstance(data, dict) and isinstance(param, dict):

  15. # 如果通用参数为字典,参数化参数也为字典,使用参数化参数更新通用参数 ex: {"xx": "xx"} data.update(param)

  16. else:

  17. # 如果通用参数是字符串、列表(元素为字符、数字、字典),直接使用参数化参数据替换通用参数 ex: ["xx", "xx"] data = param

  18. res = send_request(url, method, data, headers, **kwargs)

  19. return resdef assert_res(res_dict, expect_dict):

  20. """

  21. :param res_dict: request请求返回的结果字典,类型 dict

  22. :param expect_dict: 预期结果字典, 类型 dict

  23. """

  24. if isinstance(res_dict, dict):

  25. log.info("开始断言")

  26. log.info(f"预期结果: {expect_dict}")

  27. # 遍历预期结果的key,使用jsonpath获取请求结果的value,与预期结果value比对 for k in expect_dict.keys():

  28. res_list = jsonpath.jsonpath(res_dict, '$..' + str(k)) # 返回列表 assert expect_dict[k] in res_list

  29. log.info("断言通过")

  30. else:

  31. log.warning("请求结果不为dict字典类型,跳过断言!")

封装 faker 模拟数据

 
  1. # common/faker.pyfrom faker import Fakerfrom common.log_util import logfake = Faker("zh_CN")def get_name():

  2. name = fake.name()

  3. log.info(f"faker 生成姓名: {name}")

  4. return namedef get_phone_number():

  5. phone_number = fake.phone_number()

  6. log.info(f"faker 生成手机号: {phone_number}")

  7. return phone_numberdef get_id_card():

  8. id_card = fake.ssn()

  9. log.info(f"faker 生成身份证号: {id_card}")

  10. return id_card

pytest.ini

 
  1. [pytest]addopts = -p no:warnings -vsmarkers =

  2. multiprocess: suppurt mutl-process execute cases

全局 conftest.py

 
  1. # conftest.pyimport pytestfrom common.log_util import logfrom filelock import FileLockimport jsonfrom config.env import ENVimport osimport allure# 自定义环境信息pytest命令行def pytest_addoption(parser):

  2. parser.addoption(

  3. "--env",

  4. action="store",

  5. default="test",

  6. help="set pytest running environment ex: --env=test --env=prod"

  7. )# 从pytest命令行获取环境信息@pytest.fixture(scope="session")def get_env(request):

  8. ENV.info = request.config.getoption("--env")

  9. log.info("运行环境: " + ENV.info)

  10. return ENV.info# 终结函数,最后执行@pytest.fixture(scope="session", autouse=True)def fixture_case_end(request):

  11. def case_end():

  12. log.info("测试结束")

  13. request.addfinalizer(case_end)@pytest.fixture(scope="session", autouse=True)# fixture 嵌套先执行获取环境信息get_env

  14. # 加入 tmp_path_factory worker_id 用于多进程执行 # 多进程运行,token只获取一次def fixture_get_token(get_env, tmp_path_factory, worker_id):

  15. # 单进程执行 if worker_id == "master":

  16. # 获取token token = {"token": "xpcs"}

  17. log.info("fixture_get_token master获取token %s" % token['token'])

  18. else:

  19. # 多进程执行 root_tmp_dir = tmp_path_factory.getbasetemp().parent

  20. fn = root_tmp_dir / "data.json"

  21. # 这里with里面的语句,理解为是被加锁的,同一时间只能有一个进程访问 with FileLock(str(fn) + ".lock"):

  22. if fn.is_file():

  23. # session_fixture 获取token已执行,直接从文件中读取token token = json.loads(fn.read_text())

  24. log.info("fixture_get_token slave使用token %s" % token['token'])

  25. else:

  26. token = {"token": "xpcs"}

  27. fn.write_text(json.dumps(token))

  28. log.info("fixture_get_token slave获取token %s" % token['token'])

  29. yield token['token']

  30. # session 结束后自动执行如下 log.info("session结束")# 用例失败自动执行钩子函数@pytest.hookimpl(tryfirst=True, hookwrapper=True)def pytest_runtest_makereport(item):

  31. # 获取钩子方法的调用结果 outcome = yield

  32. rep = outcome.get_result()

  33. # 仅仅获取用例call 执行结果是失败的情况, 不包含 setup/teardown if rep.when == "call" and rep.failed:

  34. mode = "a" if os.path.exists("failures") else "w"

  35. with open("failures", mode) as f:

  36. # let's also access a fixture for the fun of it if "tmpdir" in item.fixturenames:

  37. extra = " (%s)" % item.funcargs["tmpdir"]

  38. else:

  39. extra = ""

  40. f.write(rep.nodeid + extra + "\n")

  41. with allure.step("用例运行失败,可加入信息"):

  42. allure.attach("失败内容: ----xpcs----", "失败标题", allure.attachment_type.TEXT)

测试用例

 
  1. # line_of_business/service_name_api_backend/test_api_backend.pyimport pytestfrom time import sleepimport allurefrom common.data_parser import parser, assert_resfrom config.mysql import get_mysql_connfrom common.log_util import logfrom line_of_business_name.service_name_api_backend.data import api_backendfrom common.faker_util import get_name, get_id_card, get_phone_numberfrom config.redis import get_redis_conn@allure.feature("flask后端接口测试")class TestApiBackend:

  2. @classmethod

  3. def setup_class(cls):

  4. # 获取数据库连接,执行sql测试 log.info("setup_class")

  5. # 数据库连接根据db库名单例,相同库返回同一个连接 conn = get_mysql_conn("api_auto")

  6. conn1 = get_mysql_conn("api_auto")

  7. conn.execute_dml("insert into test_xdist(msg) values ('%s')" % "class_setup-数据库写入测试")

  8. conn1.fetchone("select * from test_xdist limit 1")

  9. @classmethod

  10. def teardown_class(cls):

  11. log.info("steup_teardowm")

  12. # 获取redis连接,执行命令测试 # redis连接根据host单例,相同host返回同一个连接 rs = get_redis_conn("api_backend")

  13. rs1 = get_redis_conn("api_backend")

  14. rs.set("name", "xp")

  15. rs1.get("name")

  16. @allure.story("测试故事1")

  17. @pytest.mark.xfail(reason='预期失败用例')

  18. @user12ize("param", [{"title": "标题1", "param": 2, "assert": 3}])

  19. def test_case_one(self, param):

  20. sleep(1)

  21. allure.dynamic.description("测试故事1-描述信息")

  22. allure.dynamic.severity(allure.severity_level.CRITICAL) # 用例级别严重 # allure动态标题 allure.dynamic.title(param["title"])

  23. log.info("测试faker数据")

  24. log.info(f"{get_name()} {get_phone_number()} {get_id_card()}")

  25. # pytest.assume(False) # 多重断言插件,断言失败继续执行下面 assert param["param"] + 2 == param["assert"]

  26. @allure.story("查询学生接口")

  27. @user14cess # 此用例分组到可多进程跑测 @user15ize("param", api_backend["get_student"]["param_list"])

  28. def test_get_student(self, param, fixture_get_token):

  29. sleep(1)

  30. allure.dynamic.title(param["title"])

  31. data_dict = api_backend["get_student"]

  32. data_dict["headers"]["Cookie"] = fixture_get_token

  33. res = parser("api_backend", data_dict, param["data"])

  34. assert_res(res, param["assert"])

  35. @allure.story("新增学生接口")

  36. @user17cess # 此用例分组到可多进程跑测 @user18ize("param", api_backend["post_student"]["param_list"])

  37. def test_post_student(self, param):

  38. sleep(1)

  39. allure.dynamic.title(param["title"])

  40. data_dict = api_backend["post_student"]

  41. res = parser("api_backend", data_dict, param["data"])

  42. assert_res(res, param["assert"])

  43. @allure.story("更新学生接口")

  44. @user20cess # 此用例分组到可多进程跑测 @user21ize("param", api_backend["put_student"]["param_list"])

  45. def test_put_student(self, param):

  46. sleep(1)

  47. allure.dynamic.title(param["title"])

  48. data_dict = api_backend["put_student"]

  49. res = parser("api_backend", data_dict, param["data"])

  50. assert_res(res, param["assert"])

用例数据驱动

 
  1. # line_of_business/service_name_api_backend/data.py

  2. # 服务名外层大字典,参数key是接口名,value是接口的请求信息字典,用例模块可通过接口名引用接口信息字典

  3. # param_list 参数化列表,用于pytest参数化,每次选取其中一项的data,去更新外部data通用参数,发起请求api_backend = {

  4. "get_student": dict(path="/student",

  5. method="get",

  6. # headers 不包含Content-Type 则request使用params传参 headers={},

  7. # 通用参数,每次请求使用 data={"test": "test"},

  8. # 参数化参数,每次使用其中一项,更新通用参数 param_list=[

  9. {"title": "获取学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},

  10. {"title": "获取学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},

  11. {"title": "获取学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}

  12. ]),

  13. 'post_student': dict(path="/student",

  14. method="post",

  15. # headers Content-Type = application/x-www-form-urlencoded 则使用 request使用data传参 headers={"Cookie": "", "Content-Type": "application/x-www-form-urlencoded"},

  16. data={"test": "test"},

  17. param_list=[

  18. {"title": "新增学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 1, "msg": "ok"}},

  19. {"title": "新增学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},

  20. {"title": "新增学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}

  21. ]),

  22. 'put_student': dict(path="/student",

  23. method="put",

  24. # headers Content-Type = application/json 则使用 request使用json传参 headers={"Cookie": "", "Content-Type": "application/json"},

  25. data={"test": "test"},

  26. param_list=[

  27. {"title": "更新学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},

  28. {"title": "更新学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "okk"}},

  29. {"title": "更新学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}

  30. ])}

调试运行入口

 
  1. # run.pyimport pytestimport os# 用例调试入口if __name__ == '__main__':

  2. pytest.main([r"line_of_business_name", "--clean-alluredir", "--alluredir=allure_result", "--cache-clear", "--env=prod"])

  3. # pytest.main([r"-m multiprocess", "--clean-alluredir", "--alluredir=allure_result", "-n 3", "--cache-clear", "--env=prod"]) os.system(r"allure generate allure_result -c -o allure_report")

  4. os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")

失败用例重跑

 
  1. # failed_run.pyimport pytestimport os# 失败用例重跑if __name__ == '__main__':

  2. pytest.main([r"line_of_business_name", "--lf", "--clean-alluredir", "--alluredir=allure_result", "--env=prod"])

  3. os.system(r"allure generate allure_result -c -o allure_report")

  4. os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")

报告展示

 

感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/9779.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Linux之gdb的收尾部分

Linux之gdb的收尾部分 gbc常见指令的使用 gdb的调试

数据冒险-add x1, x1, x2 add x1, x1, x3 add x1, x1, x4

第一张图没有传递机制 竞争情况分析 读后写(RAW)竞争:当某条指令需要读取一个寄存器的值,而该寄存器的值尚未被前面的指令写入时,就会发生这种竞争。 指令2(dadd r1, r1, r3)依赖于指令1&#…

[产品管理-61]:马斯洛需求层次与产品的情感化设计

目录 一、概述 1、马斯洛需求层次理论概述 2、产品情感化设计与马斯洛需求层次的关系 3、产品情感化设计的实践案例 二、马斯洛需求层次与用户情感程度(本能、行为、反思)的关系 1、马斯洛需求层次与用户情感程度概述 2、马斯洛需求层次与用户情感…

浮动路由:实现出口线路的负载均衡冗余备份。

浮动路由 Tip:浮动路由指在多条默认路由基础上加入优先级参数,实现出口线路冗余备份。 ip routing-table //查看路由表命令 路由优先级参数:越小越优 本次实验测试两条默认路由,其中一条默认路由添加优先级参数,设置…

一阶 RC 低通滤波器实验方案

一阶 RC 低通滤波电路采用 RC 串联电路,把 R 或 C 做为负载端,对负载端与输入端的信 号做比较得到电路的特性曲线。图 1 所示 RC 串联电路构成一个双口网络, 根据图 1,其负载端开路时电容电压对输入电压的转移电压比为 这是一个…

华为私有接口类型hybrid

华为私有接口类型hybrid Tip&#xff1a;hybrid类型&#xff0c;简称混合型接口。 本次实验模拟2层网络下 vlan10 vlan20 不能互访&#xff0c;vlan10 vlan20 同时可以访问vlan100 sw1配置如下&#xff1a; <Huawei>sy [Huawei]sys sw1 [sw1]vl ba 10 20 100 [sw1]int…

006— 爬取第一考试网试题

import requests import logging import parsel import re import os#京东异步加载的反爬要求提供origin的信息 headers {user-agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 SE 2.X MetaSr 1.0}lo…

【分布式】分布式锁设计与Redisson源码解析

分布式锁 分布式锁是一种在分布式计算环境中用于控制多个节点&#xff08;或多个进程&#xff09;对共享资源的访问的机制。在分布式系统中&#xff0c;多个节点可能需要协调对共享资源的访问&#xff0c;以防止数据的不一致性或冲突。分布式锁允许多个节点在竞争访问共享资源…

【架构设计常见技术】

EJB EJB是服务器端的组件模型&#xff0c;使开发者能够构建可扩展、分布式的业务逻辑组件。这些组件运行在EJB容器中&#xff0c;EJB将各功能模块封装成独立的组件&#xff0c;能够被不同的客户端应用程序调用&#xff0c;简化开发过程&#xff0c;支持分布式应用开发。 IOC …

万字长文深度解读Movie Gen技术原理(5部曲):图像视频联合生成模型 (2)

​引言 简介 图像和视频基础模型 时间自编码器(TAE) 训练目标 骨干架构 文本嵌入和视觉-文本生成 空间上采样 模型扩展和训练效率 预训练 预训练数据 训练 微调STF 微调数据集创建 监督微调&模型平均 推理 推理提示重写 提高推理效率 评估 评估维度 评估基准…

基于MATLAB的农业病虫害识别研究

matlab有处理语音信号的函数wavread&#xff0c;不过已经过时了&#xff0c;现在处理语音信号的函数名称是audioread选取4.wav进行处理&#xff08;只有4的通道数为1&#xff09; 利用hamming窗设计滤波器 Ham.m function [N,h,H,w] Ham(fp,fs,fc)wp 2*pi*fp/fc;ws 2*pi*…

KEIL编译后直接生成bin文件

KEIL编译后直接生成bin文件 fromelf --bin -o "$LL.bin" "$LL.axf"表示在“与axf相同的文件夹”下生成bin文件。

解析广告联盟的玩法、功能及注意事项

广告联盟是一种商业模式&#xff0c;通过联合多个站点或平台&#xff0c;共同向广告商提供广告展示和推广服务。在这篇文章中&#xff0c;我将重点介绍什么是广告联盟&#xff0c;广告联盟的玩法、功能及注意事项&#xff0c;帮助商业模式策划师更好地了解和应用该模式。 一、…

GitHub中搜索项目方法

0 Preface/Foreword 1 搜索方法 1.1 项目介绍 如上截图&#xff0c;一个项目包含的基本信息&#xff1a; 项目名项目简介项目介绍Watch数量&#xff0c;接收邮件提醒Star数量&#xff0c;关注&#xff0c;subscribeFork数量&#xff0c;在repo中创建分支 1.2 限定项目名查找…

基于java+SpringBoot+Vue的洗衣店订单管理系统设计与实现

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven mysql5.7或8.0等等组成&#x…

简述kafka集群中的Leader选举机制

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader&#xff0c;负责管理集群broker 的上下线&#xff0c;所有 topic 的分区副本分配和 Leader 选举等工作。 Controller 的信息同步工作是依赖于 Zookeeper 的。 &#xff08;1&#xff09;创建一个新的 t…

OpenGl绘制了一个雪人

#include <GL/glut.h> #include <math.h>const int n 1000; int q; //圆的半径 int m, p;//圆心 const GLfloat R 0.5f; const GLfloat Pi 3.1415926536f;//初始化OpenGL void init(void) {glClearColor(0.0f, 0.0f, 0.0f, 0.0f);//设置背景颜色glShadeModel(G…

Golang进阶

1.面向对象 1.1.golang语言面向对象编程说明 Golang 也支持面向对象编程(OOP)&#xff0c;但是和传统的面向对象编程有区别&#xff0c;并不是纯粹的面向对象语言。所以我们说 Golang 支持面向对象编程特性是比较准确的。Golang 没有类(class)&#xff0c;Go 语言的结构体(st…

kafka面试夺命连环三十问(上篇)

1、kafka消息发送的流程&#xff1f; 在消息发送的过程中&#xff0c;涉及到两个线程--main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator&#xff0c;然后sender线程不断从双端队列RecordAccumulator 拉取消息发…

【linux】再谈网络基础(二)

8. 再谈端口号 &#xff08;一&#xff09;与协议之间的关系 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/IP协议中, 用 "源IP", "源端口号", "目的IP", "目的端口号", "协议号" 这样一个五元组来标识…