美文网首页
distcp服务

distcp服务

作者: john瀚 | 来源:发表于2024-01-16 17:00 被阅读0次
import argparse
import logging
import os
import re
import subprocess
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
import time
from functools import wraps

from flask import Flask, request, jsonify

# Format applied to every record emitted through the logging module.
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
# Values parse_command() returns when the distcp command line carries no
# explicit -m / -bandwidth option.
DEFAULT_MAP_NUMS = 10
DEFAULT_MAP_MAX_BANDWIDTH = 20
# return_code that /distcp/total_bandwidth filters on by default; run_distcp()
# stores 200 in return_code while a job is still producing output.
RUNING_RETURN_CODE = 200
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)
app = Flask("distcp")
# Worker pool on which the POST /distcp handler schedules run_distcp().
executor = ThreadPoolExecutor()
from mysql.connector import pooling

# Matches a job identifier (job_<alphanumerics>_<digits>) in distcp output.
pattern = r"job_[a-zA-Z0-9]+_[0-9]+"
# Captures the integer percentage from a "map NN%" progress line.
progress_rate_pattern = r"map (\d+)%"
# MySQL connection pool; stays None until the __main__ block assigns it.
pool = None
# How often the client that submits the distcp MapReduce job refreshes its
# log output; default 1000 ms.
# NOTE(review): no setting follows this comment. The POST /distcp handler reads
# monitor_poll_interval from the request with a default of 5000 - confirm which
# value is intended.
# Running sum of map_nums * map_max_bandwidth, maintained by run_distcp().
monitor_total_bandwidth = 0
# When True, /distcp/total_bandwidth sums database rows instead of returning
# the in-memory counter above.
monitor_bandwidth_by_database = False


def retry_on_exception(retries=10, wait_time=2):
    """Build a decorator that calls a function again when it raises.

    Args:
        retries: Maximum number of attempts. Values below 1 are treated as 1,
            so the wrapped function is always called at least once.
        wait_time: Seconds to sleep between two consecutive attempts.

    Returns:
        A decorator. The decorated function returns the result of the first
        successful attempt.

    Raises:
        Whatever exception the final attempt raised, once all attempts failed.
    """
    attempts = max(1, retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"Exception occurred: {e}")
                    if attempt == attempts:
                        # Re-raise from inside the handler: the name bound by
                        # "except ... as" no longer exists once the clause
                        # ends, so raising it after the loop cannot work.
                        raise
                    print(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)

        return wrapper

    return decorator


def create_pool(host, port, user, password, database):
    """Create the MySQL connection pool used by the service.

    The pool is named "mypool" and holds 20 connections; every argument is
    forwarded unchanged as a connection setting.

    Returns:
        The new pooling.MySQLConnectionPool instance.
    """
    return pooling.MySQLConnectionPool(
        pool_name="mypool",
        pool_size=20,
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
    )


@app.route('/verify', methods=['POST'])
def verify():
    """Compare the HDFS storage size of a source path and a target path.

    Expects a JSON body with 'source_path' and 'target_path'.

    Returns:
        JSON {'verify_result': 'true'} when both `hadoop fs -du -s` calls
        report the same size, {'verify_result': 'false'} when they differ,
        and an empty object when either path is missing.

    Raises:
        subprocess.CalledProcessError: if a hadoop command exits non-zero.
    """
    data = request.get_json() or {}
    source_path = data.get('source_path')
    target_path = data.get('target_path')
    result = {}
    if source_path and target_path:
        sizes = []
        for path in (source_path, target_path):
            # The path comes straight from the request body, so pass it as one
            # argv entry without a shell; shell metacharacters in it are then
            # never interpreted.
            output = subprocess.check_output(['hadoop', 'fs', '-du', '-s', str(path)])
            fields = output.decode("utf-8").split()
            # The first whitespace-separated field is the size.
            sizes.append(fields[0] if fields else '')

        # Equal storage sizes count as consistent data, different sizes as
        # inconsistent. distcp preserves the file attributes while copying, so
        # comparing the two sizes is normally sufficient.
        if sizes[0] == sizes[1]:
            result = {'verify_result': 'true'}
        else:
            result = {'verify_result': 'false'}
    return jsonify(result)


def perm_to_int(perm_str):
    """Convert one three-character permission triad to its octal digit.

    Args:
        perm_str: A triad such as "rwx", "r-x" or "---". The last character
            may also be a special-bit marker: "t"/"s" are read as execute
            set, "T"/"S" as execute not set (ls-style convention).

    Returns:
        An int from 0 to 7. Anything that is not a recognised triad yields 0.
    """
    if not isinstance(perm_str, str) or len(perm_str) != 3:
        return 0

    # Fold the special-bit markers back onto the plain execute flag; without
    # this a triad like "rwt" matches nothing and would be reported as 0.
    special_bit_aliases = {"t": "x", "s": "x", "T": "-", "S": "-"}
    last_char = perm_str[2]
    normalized = perm_str[:2] + special_bit_aliases.get(last_char, last_char)

    octal_by_triad = {
        "rwx": 7,
        "rw-": 6,
        "r-x": 5,
        "r--": 4,
        "-wx": 3,
        "-w-": 2,
        "--x": 1,
        "---": 0,
    }
    return octal_by_triad.get(normalized, 0)


def parse_permission(attribute):
    """Turn a nine-character permission string into three octal digits.

    Args:
        attribute: Permission characters such as "rwxr-xr-x" (user, group and
            other triads, without the leading file-type character).

    Returns:
        A three-character string, e.g. "755".

    Raises:
        ValueError: if `attribute` is not exactly nine characters long.
    """
    if len(attribute) != 9:
        raise ValueError("permission attribute length must be 9 character.")

    # Slice out the user / group / other triads and convert each in turn.
    digits = [perm_to_int(attribute[start:start + 3]) for start in (0, 3, 6)]
    return "".join(str(digit) for digit in digits)


def get_attribute_dict(table_path):
    """Read permission, owner and group of an HDFS path.

    Runs `hadoop fs -ls -d <table_path>` and parses the first four
    whitespace-separated fields of its output.

    Args:
        table_path: HDFS path to inspect. A falsy value skips the lookup.

    Returns:
        A dict with keys "permission" (three octal digits as a string),
        "user" and "group", or an empty dict when `table_path` is falsy.

    Raises:
        subprocess.CalledProcessError: if the hadoop command exits non-zero.
        ValueError: if the output has fewer fields than expected or the
            permission column cannot be parsed.
    """
    results = {}
    if not table_path:
        return results

    # Callers pass request data through, so hand the path over as a single
    # argv entry without a shell; metacharacters in it are never interpreted.
    listing = subprocess.check_output(['hadoop', 'fs', '-ls', '-d', str(table_path)])
    fields = listing.decode("utf-8").split()
    if len(fields) < 4:
        raise ValueError(f"unexpected output of hadoop fs -ls -d for {table_path}: {fields}")

    # fields[0] is the mode column; its first character is skipped and the
    # next nine are the permission triads. Fields 2 and 3 are owner and group.
    results["permission"] = parse_permission(fields[0][1:10])
    results["user"] = fields[2]
    results["group"] = fields[3]
    return results


def do_sync_attribute(source_attr_dict, target_attr_dict, target_table_path):
    """Make the target path's permission and ownership match the source.

    Args:
        source_attr_dict: Dict with "permission", "user" and "group" of the
            source path.
        target_attr_dict: Same keys, describing the target path.
        target_table_path: HDFS path to run chmod / chown on.

    Returns:
        True when nothing had to change or every command succeeded; False
        when any step raised (missing key, failed hadoop command, ...).
    """
    try:
        if target_attr_dict["permission"] != source_attr_dict["permission"]:
            # Use the octal digits exactly as parsed: converting through int()
            # drops leading zeros ("077" would become 77), which is a
            # different mode string.
            new_permission = str(source_attr_dict["permission"])
            # Argument list without a shell, so characters in the path are
            # never interpreted as shell syntax.
            perm_command = ['hadoop', 'fs', '-chmod', new_permission, str(target_table_path)]
            printable_perm_command = ' '.join(perm_command)
            print(f'sync permission for {target_table_path} command is {printable_perm_command}')
            subprocess.check_output(perm_command)

        if target_attr_dict["user"] != source_attr_dict["user"] \
                or target_attr_dict["group"] != source_attr_dict["group"]:
            new_owner = source_attr_dict["user"] + ":" + source_attr_dict["group"]
            owner_command = ['hadoop', 'fs', '-chown', new_owner, str(target_table_path)]
            printable_owner_command = ' '.join(owner_command)
            print(f'sync owner for {target_table_path} command is {printable_owner_command}')
            subprocess.check_output(owner_command)

        return True
    except Exception as e:
        # format_exc() returns the traceback text of the exception being
        # handled; print_stack() returns None and only writes to stderr.
        print(f'do sync table directory attribute failed, cause is {e.args}, stack info is {traceback.format_exc()}')
        return False


@app.route('/sync_dir_attribute', methods=['POST'])
def sync_dir_attribute():
    """Copy permission and ownership from a source table path to a target.

    Expects a JSON body with 'source_table_path' and 'target_table_path'.

    Returns:
        JSON {'sync_result': 'true'} when do_sync_attribute() reports
        success, otherwise {'sync_result': 'false'}.
    """
    payload = request.get_json()
    src_path = payload.get('source_table_path')
    dst_path = payload.get('target_table_path')

    # Look up the current attributes of both sides before changing anything.
    src_attrs = get_attribute_dict(src_path)
    dst_attrs = get_attribute_dict(dst_path)
    print(f'source table attribute dict is {src_attrs}, '
          f'target table attribute dict is {dst_attrs}')

    succeeded = do_sync_attribute(src_attrs, dst_attrs, dst_path)
    return jsonify({'sync_result': 'true' if succeeded else 'false'})


@app.route('/get_storage', methods=['POST'])
def get_storage():
    """Report the storage size of an HDFS path.

    Expects a JSON body with 'target_path'.

    Returns:
        JSON {'storage': <size>} where <size> is the first field printed by
        `hadoop fs -du -s`, or null when no path was supplied.

    Raises:
        subprocess.CalledProcessError: if the hadoop command exits non-zero.
    """
    data = request.get_json() or {}
    target_path = data.get('target_path')
    target_storage = None
    if target_path:
        # The path is request data: pass it as one argv entry without a shell
        # so shell metacharacters in it are never interpreted.
        output_target = subprocess.check_output(['hadoop', 'fs', '-du', '-s', str(target_path)])
        fields = output_target.decode("utf-8").split()
        target_storage = fields[0] if fields else ''
    return jsonify({'storage': target_storage})


@app.route('/distcp', methods=['POST'])
def distcp():
    """Submit a `hadoop distcp` run and return its record id.

    JSON body fields:
        source_path, target_path: required copy endpoints.
        run_params: optional string of extra distcp options, split on
            whitespace.
        job_name: MapReduce job name, default 'distcp-job'.
        hadoop_user_name: optional value for HADOOP_USER_NAME in the child
            process environment.
        monitor_poll_interval: value for
            mapreduce.client.progressmonitor.pollinterval, default 5000.

    Returns:
        JSON {'exec_id': <id>} identifying the distcp_logs row created for
        this run, or a 400 response when a path is missing.
    """
    data = request.get_json() or {}
    source_path = data.get('source_path')
    target_path = data.get('target_path')
    if not source_path or not target_path:
        # Without this check None would be appended to the command and the
        # request would fail later while joining it into a string.
        return jsonify({'error': 'source_path and target_path are required'}), 400

    run_params = data.get('run_params')
    job_name = data.get('job_name', 'distcp-job')
    hadoop_user_name = data.get('hadoop_user_name', None)
    monitor_poll_interval = data.get('monitor_poll_interval', 5000)

    command = ['hadoop', 'distcp', f'-Dmapreduce.job.name={job_name}',
               f'-Dmapreduce.client.progressmonitor.pollinterval={monitor_poll_interval}']
    if run_params:
        # split() without arguments collapses runs of whitespace and ignores
        # leading/trailing blanks, so no empty-string arguments are produced.
        command.extend(run_params.split())
    command.extend([source_path, target_path])
    logging.info(' '.join(command))

    # Record the submission first; 100 is the return_code stored before the
    # job starts producing output.
    exec_id = insert(command, '', '', 100)

    env_vars = os.environ.copy()
    if hadoop_user_name:
        env_vars.update({'HADOOP_USER_NAME': hadoop_user_name})

    # Run the job asynchronously on the thread pool.
    executor.submit(run_distcp, exec_id, command, env_vars)
    return jsonify({'exec_id': exec_id})


def run_distcp(exec_id, command, env_vars):
    """Run a distcp command, streaming its output into the distcp_logs row.

    While the process runs, every output line is logged and collected; a
    "map NN%" line updates the stored progress, and the first job id found
    is stored together with return_code 200. When the process ends, the full
    output and the real exit code are written to the row.

    Args:
        exec_id: id of the distcp_logs row created for this run.
        command: argv list of the distcp command.
        env_vars: environment for the child process.
    """
    global monitor_total_bandwidth
    logs = []
    job_id = None
    map_nums, map_max_bandwidth = parse_command(command)
    cmd = ' '.join(command)

    try:
        reserved_bandwidth = int(map_nums) * int(map_max_bandwidth)
    except (TypeError, ValueError):
        # A non-numeric -m / -bandwidth value must not kill the worker; the
        # run is simply not counted towards the total.
        logging.warning(f'Wrong map_nums with {map_nums} '
                        f'or map_max_bandwidth with {map_max_bandwidth}')
        reserved_bandwidth = 0
    # Tracks whether this run was added to the counter, so that it is only
    # ever subtracted when it was added before.
    bandwidth_counted = False

    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env_vars)
    try:
        for raw_line in iter(process.stdout.readline, b''):
            # Undecodable bytes are replaced instead of aborting the run.
            line = raw_line.decode('utf-8', errors='replace')
            logs.append(line)
            logging.info(line)
            # progress_rate
            matches = re.findall(progress_rate_pattern, line)
            if matches:
                progress_rate = matches[0]
                update_progress_rate(exec_id, cmd, '', progress_rate, 200)
            # job_id
            matches = re.findall(pattern, line)
            if not job_id and len(matches) > 0:
                job_id = matches[0]
                # NOTE(review): this read-modify-write on a module global runs
                # on several worker threads without a lock - confirm whether
                # exact totals matter.
                monitor_total_bandwidth += reserved_bandwidth
                bandwidth_counted = True
                update(exec_id, cmd, '', job_id, 200)
    except Exception:
        logging.exception(f'distcp run {exec_id} failed while processing output')
        raise
    finally:
        process.stdout.close()
        process.wait()
        if bandwidth_counted:
            monitor_total_bandwidth -= reserved_bandwidth

    return_code = process.returncode

    # Record the final result.
    logging.info(f'Return code: {return_code}\n')
    logging.info(f'Job id: {job_id}\n')
    logging.info('------------------------\n')
    update(exec_id, cmd, ''.join(logs), job_id, return_code)


@retry_on_exception()
def insert(command, output, job_id, return_code):
    """Insert a new distcp_logs row and return its generated id.

    Args:
        command: argv list of the distcp command; stored space-joined, and
            parsed for the map count and per-map bandwidth.
        output: Initial value of the output column.
        job_id: Initial value of the job_id column.
        return_code: Initial value of the return_code column.

    Returns:
        The row id reported by the cursor after the insert.
    """
    map_nums, map_max_bandwidth = parse_command(command)
    cmd = ' '.join(command)
    connection = None
    try:
        connection = pool.get_connection()
        cursor = connection.cursor()
        cursor.execute(
            "INSERT INTO distcp_logs (cmd, output, job_id, return_code, map_nums, map_max_bandwidth) "
            "VALUES (%s, %s, %s, %s, %s, %s)",
            (cmd, output, job_id, return_code, map_nums, map_max_bandwidth),
        )
        connection.commit()
        record_id = cursor.lastrowid
        cursor.close()
        logging.info(f"Distcp record insert successfully. ID: {record_id}")
    finally:
        # Always hand the connection back, whether or not the insert worked.
        if connection:
            connection.close()
    return record_id


@retry_on_exception()
def update_progress_rate(id, cmd, output, progress_rate, return_code):
    """Store cmd, output, progress_rate and return_code on one distcp_logs row.

    Args:
        id: id of the row to update.
        cmd: Command string to store.
        output: Output text to store.
        progress_rate: Map progress value to store.
        return_code: Status code to store.
    """
    connection = None
    try:
        connection = pool.get_connection()
        cursor = connection.cursor()
        cursor.execute(
            "UPDATE distcp_logs SET cmd = %s, output = %s, progress_rate = %s, return_code = %s WHERE id = %s",
            (cmd, output, progress_rate, return_code, id),
        )
        connection.commit()
        cursor.close()
        logging.info(f"Distcp record {id} with progress_rate {progress_rate} updated successfully.")
    finally:
        # Always hand the connection back, whether or not the update worked.
        if connection:
            connection.close()


@retry_on_exception()
def update(id, cmd, output, job_id, return_code):
    """Store cmd, output, job_id and return_code on one distcp_logs row.

    Args:
        id: id of the row to update.
        cmd: Command string to store.
        output: Output text to store.
        job_id: Job identifier to store.
        return_code: Status or exit code to store.
    """
    connection = None
    try:
        connection = pool.get_connection()
        cursor = connection.cursor()
        cursor.execute(
            "UPDATE distcp_logs SET cmd = %s, output = %s, job_id = %s, return_code = %s WHERE id = %s",
            (cmd, output, job_id, return_code, id),
        )
        connection.commit()
        cursor.close()
        logging.info(f"Distcp record with id {id} updated successfully.")
    finally:
        # Always hand the connection back, whether or not the update worked.
        if connection:
            connection.close()


@app.route('/distcp', methods=['GET'])
def distcp_logs():
    """Return stored distcp runs as JSON.

    Query parameters:
        exec_id: optional row id; when absent every row is returned.

    Returns:
        A JSON list of objects with the keys id, cmd, job_id, return_code,
        progress_rate (with a trailing '%') and logs (the output column).
    """
    exec_id = request.args.get('exec_id')

    # Columns are selected by name: positional indexes into "SELECT *" depend
    # on the table's column order, which this file does not define.
    connection = pool.get_connection()
    try:
        cursor = connection.cursor()
        try:
            if exec_id:
                sql = ("SELECT id, cmd, job_id, return_code, progress_rate, output "
                       "FROM distcp_logs WHERE id = %s")
                cursor.execute(sql, (exec_id,))
            else:
                sql = ("SELECT id, cmd, job_id, return_code, progress_rate, output "
                       "FROM distcp_logs")
                cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Hand the connection back even when the query raised.
        connection.close()

    # Convert the rows into plain Python objects for jsonify.
    data = [
        {
            "id": record_id,
            "cmd": cmd,
            "job_id": job_id,
            "return_code": return_code,
            "progress_rate": f"{progress_rate}%",
            "logs": output,
        }
        for record_id, cmd, job_id, return_code, progress_rate, output in rows
    ]
    return jsonify(data)


@app.route('/distcp/total_bandwidth', methods=['GET'])
def distcp_total_bandwidth():
    """Report the summed bandwidth (map count * per-map bandwidth) of runs.

    When monitor_bandwidth_by_database is true, the sum is computed from the
    distcp_logs rows whose return_code equals the 'return_code' query
    parameter (default RUNING_RETURN_CODE). Otherwise the in-memory counter
    monitor_total_bandwidth is returned.

    Returns:
        JSON {'total_bandwidth': <int>}.
    """
    total_bandwidth = 0
    if monitor_bandwidth_by_database:
        return_code = request.args.get("return_code")
        if return_code is None:
            return_code = RUNING_RETURN_CODE

        # Select the two needed columns by name: positional indexes into
        # "SELECT *" depend on the table's column order, which this file
        # does not define.
        connection = pool.get_connection()
        try:
            cursor = connection.cursor()
            try:
                sql = "SELECT map_nums, map_max_bandwidth FROM distcp_logs WHERE return_code = %s"
                cursor.execute(sql, (return_code,))
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Hand the connection back even when the query raised.
            connection.close()

        for map_nums, map_max_bandwidth in rows:
            if map_nums and map_max_bandwidth:
                total_bandwidth += int(map_nums) * int(map_max_bandwidth)
            else:
                logging.warning(f'Wrong map_nums with {map_nums} '
                                f'or map_max_bandwidth with {map_max_bandwidth}')
    else:
        total_bandwidth = monitor_total_bandwidth

    data = {'total_bandwidth': total_bandwidth}
    return jsonify(data)


@app.route('/distcp/kill', methods=['GET'])
def distcp_kill():
    """Kill a YARN application.

    Query parameters:
        app_id: id passed to `yarn application -kill`.

    Returns:
        JSON {'code': <exit status of the yarn command>}, or a 400 response
        when app_id is missing.
    """
    app_id = request.args.get('app_id')
    if not app_id:
        # A missing id would otherwise put None into the argv list.
        return jsonify({'error': 'app_id is required'}), 400

    command = ['yarn', 'application', '-kill', app_id]
    code = subprocess.call(command)
    # A dict with key 'code'; the comma form {'code', code} is a set literal,
    # which cannot be serialised to JSON.
    return jsonify({'code': code})


def parse_command(command):
    """Extract the map count and per-map bandwidth from a distcp argv list.

    Args:
        command: Sequence of command-line tokens.

    Returns:
        A (map_nums, map_max_bandwidth) tuple. Each element is the token that
        follows '-m' / '-bandwidth' (a string, last occurrence wins), or the
        DEFAULT_MAP_NUMS / DEFAULT_MAP_MAX_BANDWIDTH constant when the option
        is absent or is the final token with no value after it.
    """
    map_nums = DEFAULT_MAP_NUMS
    map_max_bandwidth = DEFAULT_MAP_MAX_BANDWIDTH

    last_index = len(command) - 1
    i = 0
    while i < len(command):
        token = command[i]
        # Only consume a value when one actually follows the option; a
        # trailing '-m' or '-bandwidth' is ignored instead of indexing past
        # the end of the list.
        if token == '-m' and i < last_index:
            map_nums = command[i + 1]
            i += 2
        elif token == '-bandwidth' and i < last_index:
            map_max_bandwidth = command[i + 1]
            i += 2
        else:
            i += 1

    return map_nums, map_max_bandwidth


def parse_args():
    """Parse the database connection options from the command line.

    Returns:
        An argparse.Namespace with host, port, user, password and database.
    """
    parser = argparse.ArgumentParser()
    # (flag, type, default, help) for every supported option, in the order
    # they are registered.
    option_specs = (
        ('--host', str, '10.52.46.123', 'Database host'),
        ('--port', int, 33066, 'Database port'),
        ('--user', str, 'diagnose', 'Database username'),
        ('--password', str, 'diagnose123456', 'Database password'),
        ('--database', str, 'spark_diagnose', 'Database name'),
    )
    for flag, value_type, default_value, help_text in option_specs:
        parser.add_argument(flag, type=value_type, default=default_value, help=help_text)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    # Log the effective settings, masking the database password so it never
    # reaches the log output.
    logging.info(' '.join(
        f'\n{k}={"******" if k == "password" else v}' for k, v in vars(args).items()))
    pool = create_pool(args.host, args.port, args.user, args.password, args.database)
    # The server listens on every interface, so Flask's debug mode (which
    # enables the interactive debugger) is off unless explicitly requested by
    # setting DISTCP_DEBUG to 1/true/yes.
    debug_enabled = os.environ.get('DISTCP_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host="0.0.0.0", debug=debug_enabled)

相关文章

  • distcp

    hadoop不同集群之间的拷贝 参考官网:http://hadoop.apache.org/docs/r1.0.4...

  • Hadoop深入研究一

    Distcp 用于在两个或多个集群之间进行数据的迁移,复制文件 hadoop distcp hdfs://namen...

  • hadoop集群 distcp 缓慢 两个hadoop集群之间使用distcp拷贝时,发现集群之间拷贝数据缓慢,最...

  • Hadoop命令之distcp参考

    distcp命令是用于集群内部或者集群之间拷贝数据的常用命令。 #顾名思义:dist即分布式, distcp即分布...

  • hadoop-distcp源码篇

    0.背景介绍 DistCp(Distributed Copy)是Apache Hadoop自带的工具,目前存在两个...

  • 【2019-07-17】distcp拷贝文件失败

    问题描述 distcp跨集群拷贝文件,失败异常如下。 分析过程 1.找到application_156283359...

  • 十一、备份与恢复

    1、HBase发展历程中的集中备份方式 1.1、使用distcp进行关机全备份。HBase所有文件都存在HDFS,...

  • 跨集群hbase表迁移

    HBase提供了copyTable,相当于distcp,但影响在线应用 使用创建快照,跨集群传送集群,然后再还原快...

  • HDFS--DistCP远程复制文件

    1、简介2、使用方式3、常见问题-3.1、需要远程复制的文件没有关闭,还处于写的状态-3.2、带宽问题 1、简介 ...

  • HDFS(6)- distcp并行复制

    我们可以用Java Api写代码进行复制文件或目录,也可以使用hadoop fs -cp进行复制,可这两种效率并不...

网友评论

      本文标题:distcp服务

      本文链接:https://www.haomeiwen.com/subject/cfikodtx.html