美文网首页
Locust批量执行

Locust批量执行

作者: Lutous | 来源:发表于2021-01-27 16:07 被阅读0次
背景

运维时,往往需要对多个接口做性能测试。Locust只能对单个py文件测试,若不优化需人工处理每个文件。

实现功能
  1. 执行指定目录下py文件中的指定标签
  2. log、csv文件保存到指定目录下
def get_file_tags(case_path):
    """
    获取指定py文件中的标签值,以列表的形式返回
    注:文件中不能存在已注释的tag标签
    :param case_path: py文件路径
    :return: 标签列表
    """
    version_list = set()
    with open(case_path, mode='r', encoding="utf-8") as project_file:
        file_str = project_file.readlines()
        for line_str in file_str:
            if re.search("@tag", line_str):
                version_list.update(re.findall(r"v\d+", line_str))
    return version_list
LOCUST_ALL_FILE_ALL_TAG, LOCUST_ALL_FILE_SELECTED_TAG, LOCUST_SELECTED_FILE_ALL_TAG, LOCUST_SELECTED_FILE_SELECTED_TAG\
    = ["all_file_all_tag", "all_file_selected_tag", "selected_file_all_tag", "selected_file_selected_tag"]


def make_out_put_dir():
    """
    创建输出文件文件夹
    :return: log路径,csv路径
    """
    now_time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_log_dir = os.path.join(base_dir, "Outputs", 'logs')
    base_csv_dir = os.path.join(base_dir, "Outputs", 'reports')

    # 创建输出文件夹
    os.mkdir(os.path.join(base_log_dir, now_time_str))
    os.mkdir(os.path.join(base_csv_dir, now_time_str))

    return os.path.join(base_log_dir, now_time_str), os.path.join(base_csv_dir, now_time_str)


def get_case_file_name(case_directory):
    """
    递归函数,获取指定目录下的所有的load_*.py文件目录列表
    """
    case_file_list = list()
    if not os.path.isdir(case_directory):
        return case_file_list
    for child_file in os.listdir(case_directory):
        if os.path.isdir(os.path.join(case_directory, child_file)):
            case_file_list.extend(get_case_file_name(os.path.join(case_directory, child_file)))
        else:
            ret = re.match(r"^load_.*\.py$", child_file)
            if ret:
                case_file_list.append(os.path.join(case_directory, ret.group()))
    return case_file_list


def write_case_to_file(case_directory):
    """
    将指定目录下要执行的load_*.py文件都记录在跟目录下的case_file.txt里面。
    """
    case_file_list = get_case_file_name(case_directory)
    with open(os.path.join(base_dir, "case_file.txt"), mode='w') as file:
        for case_name in case_file_list:
            file.write("{}{}".format(case_name.split(base_dir)[1], "\r"))


def get_case_data():
    """
    从根目录下的case_file.txt中读取要进行压测的load_*.py文件,存放到列表中。
    """
    case_file_name = list()
    with open(os.path.join(base_dir, "case_file.txt"), mode='r') as file:
        while True:
            case_name = file.readline()
            if not case_name:
                return case_file_name
            # linux 系统下文件路径格式变更
            if sys.platform.startswith("linux"):
                match_obj = re.search(r"TestCases\\(.*)\\(.*)\\(load_.*[.]py)", case_name)
                if match_obj:
                    case_name = os.path.join(base_dir, "TestCases", match_obj.group(1), match_obj.group(2),
                                             match_obj.group(3))
            case_file_name.append(case_name)


def start_locust(run_type, case_path=os.path.join(os.path.dirname(os.path.dirname((os.path.abspath(__file__)))), "TestCases")):
    """
    根据run_type类型确认执行脚本及tag
    :param run_type: LOCUST_ALL_FILE_ALL_TAG 所有文件所有标签
                     LOCUST_ALL_FILE_SELECTED_TAG 所有文件指定标签
                     LOCUST_SELECTED_FILE_ALL_TAG 指定文件所有标签
                     LOCUST_SELECTED_FILE_SELECTED_TAG 指定文件指定标签
    :param case_path: 运行所有标签时,读取该目录下的脚本
    :return:
    """
    # 执行所有文件时,读取case_path目录下所有脚本并保存到case_file.txt中
    if run_type in [LOCUST_ALL_FILE_ALL_TAG, LOCUST_ALL_FILE_SELECTED_TAG]:
        write_case_to_file(case_path)

    log_dir, csv_dir = make_out_put_dir()

    # 获取所有执行脚本
    case_files = get_case_data()
    for case_file in case_files:
        ret = re.search(r"(load_.*)\.py$", case_file)
        if ret:
            # 执行语句
            locust_parse = "locust -f {}{} --host={} --headless --users {} --spawn-rate {} --run-time {} "\
                .format(base_dir, case_file[:-1],
                        configr.cp.get('api', 'pre_url'),
                        configr.cp.get('locust_run', 'clients'),
                        configr.cp.get('locust_run', 'spawn_rate'),
                        configr.cp.get('locust_run', 'run_times'))

            # 运行类型为指定标签,则在locust_parse中增加 -T
            if run_type in [LOCUST_ALL_FILE_SELECTED_TAG, LOCUST_SELECTED_FILE_SELECTED_TAG]:
                if list(set(eval(configr.cp.get("run_tags", "tags"))) & get_file_tags(base_dir+case_file[:-1])):
                    # 当前文件包含配置相中的tag, 在locust_parse中增加 -T
                    search_res = re.findall(r"v\d+", configr.cp.get("run_tags", "tags"))
                    locust_parse += "--tags "
                    locust_parse += " ".join(search_res) + " "
                else:
                    # 结束当前循环,开始下一个循环
                    continue
            try:
                os.mkdir(os.path.join(csv_dir, ret.group(1)))
            except Exception as e:  # 避免出现名称相同的csv文件存在而导致运行失败
                logging.error(e.__repr__())
            else:
                log_path = os.path.join(log_dir, ret.group(1)) + '.log'
                cvs_path = os.path.join(csv_dir, ret.group(1), ret.group(1))
    
                locust_parse += "--loglevel=INFO --logfile={} --csv={}".format(log_path, cvs_path)
            # 执行
              os.system(locust_parse)

相关文章

网友评论

      本文标题:Locust批量执行

      本文链接:https://www.haomeiwen.com/subject/soedzktx.html