Flink Batch: Reading Multiple HDFS Files

Author: 清蒸三文鱼_ | Published 2025-07-20 14:42

Code

If the HDFS configuration needs to be loaded, the contents of core-site.xml and hdfs-site.xml must be put into Flink's Configuration, and every Hadoop-related key must carry the prefix flink.hadoop.
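The prefixing convention can be sketched in isolation. This is a minimal illustration of the key-renaming step only, not the Flink API itself; the helper name `withFlinkPrefix` and the sample keys are made up for the example:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HadoopPrefix {
    static final String PREFIX = "flink.hadoop.";

    /** Prefix every Hadoop key so Flink forwards it to the Hadoop Configuration. */
    static Map<String, String> withFlinkPrefix(Map<String, String> hadoopProps) {
        Map<String, String> out = new LinkedHashMap<>();
        hadoopProps.forEach((k, v) -> out.put(PREFIX + k, v));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> hadoop = new LinkedHashMap<>();
        hadoop.put("dfs.nameservices", "mycluster");              // would come from hdfs-site.xml
        hadoop.put("hadoop.security.authentication", "kerberos"); // would come from core-site.xml
        // Flink only sees the keys once they carry the flink.hadoop. prefix
        System.out.println(withFlinkPrefix(hadoop).get("flink.hadoop.dfs.nameservices"));
    }
}
```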

public static void main(String[] args) throws Exception {
    // Assumed layout: hdfs://192.168.44.71:8020/metric_data/yyyy-MM-dd/xx.json
    ExecutionEnvironment exeEnv = ExecutionEnvironment.getExecutionEnvironment();
    // The constructor path is only a placeholder; setFilePaths() below overrides it
    TextInputFormat format = new TextInputFormat(new Path("hdfs://192.168.44.71:8020/metric_data"));
    Configuration conf = new Configuration();
    // Descend into the per-day subdirectories instead of reading only the top level
    conf.setBoolean("recursive.file.enumeration", true);
    format.configure(conf);
    // Several paths can be passed at once (example dates following the layout above)
    format.setFilePaths(
        new Path("hdfs://192.168.44.71:8020/metric_data/2025-07-19"),
        new Path("hdfs://192.168.44.71:8020/metric_data/2025-07-20"));
    exeEnv.createInput(format)
        .output(new TextOutputFormat<>(new Path("result.txt")));
    exeEnv.execute();
}
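The effect of `recursive.file.enumeration` can be illustrated with plain Java NIO. This is only a sketch of the behaviour, not Flink's actual enumeration code: with the flag off, only files directly under the given directory are seen; with it on, the per-day subdirectories are descended into.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Enumerate {
    /** Depth 1 mimics the default (non-recursive) enumeration;
     *  unlimited depth mimics recursive.file.enumeration=true. */
    static List<Path> listFiles(Path dir, boolean recursive) throws IOException {
        try (Stream<Path> s = Files.walk(dir, recursive ? Integer.MAX_VALUE : 1)) {
            return s.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("metric_data");
        Path day = Files.createDirectory(root.resolve("2025-07-20")); // example date dir
        Files.createFile(day.resolve("xx.json"));
        System.out.println(listFiles(root, false).size()); // 0: the file is one level down
        System.out.println(listFiles(root, true).size());  // 1: found recursively
    }
}
```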

A custom FileSystem for Kerberos authentication

Extend org.apache.flink.core.fs.Path and override its getFileSystem() method; when constructing the TextInputFormat, use the ExtendPath below instead of the plain Path.

The code below uses reflection to replace the wrapped FileSystem; alternatively, it can be replaced with new HadoopFileSystem(org.apache.hadoop.fs.FileSystem.get(new Configuration())).
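The reflection trick can be shown in isolation. This is a generic sketch with made-up `Wrapper`/field names standing in for Flink's HadoopFileSystem and its private "fs" field; it only demonstrates looking up a private field by name and swapping its value:

```java
import java.lang.reflect.Field;

public class SwapField {
    /** Stand-in for a wrapper holding its delegate in a private field (names are hypothetical). */
    static class Wrapper {
        private String fs = "unauthenticated-client";
        String current() { return fs; }
    }

    /** Look up the private field by name and replace its value. */
    static void swap(Wrapper w, String replacement) throws Exception {
        Field f = Wrapper.class.getDeclaredField("fs");
        f.setAccessible(true); // bypass private access
        f.set(w, replacement);
    }

    public static void main(String[] args) throws Exception {
        Wrapper w = new Wrapper();
        swap(w, "kerberos-client");
        System.out.println(w.current()); // kerberos-client
    }
}
```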

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Objects;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.security.UserGroupInformation;

public class ExtendPath extends Path {

    public ExtendPath(String path) {
        super(path);
    }

    public ExtendPath(URI uri) {
        super(uri);
    }

    @Override
    public FileSystem getFileSystem() throws IOException {
        HadoopFileSystem fileSystem = (HadoopFileSystem) FileSystem.get(this.toUri());
        org.apache.hadoop.conf.Configuration conf = fileSystem.getHadoopFileSystem().getConf();
        String authType = conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION);
        if (Objects.equals(authType, "kerberos")) {
            UserGroupInformation.setConfiguration(conf);
            // Fall back to the DataNode principal if the NameNode principal is absent
            String principal = conf.get("dfs.namenode.kerberos.principal",
                    conf.get("dfs.datanode.kerberos.principal"));
            File krb5Conf = new File("krb5.conf");
            File userKeyTab = new File("xx.keytab");
            System.setProperty("java.security.krb5.conf", krb5Conf.getAbsolutePath());
            try {
                UserGroupInformation ugi = UserGroupInformation
                        .loginUserFromKeytabAndReturnUGI(principal, userKeyTab.getAbsolutePath());
                // Open the Hadoop FileSystem as the authenticated user
                org.apache.hadoop.fs.FileSystem client = ugi.doAs(
                        (PrivilegedExceptionAction<org.apache.hadoop.fs.FileSystem>) () ->
                                org.apache.hadoop.fs.FileSystem.get(conf));
                // Swap the authenticated client into Flink's HadoopFileSystem wrapper
                Field fsField = HadoopFileSystem.class.getDeclaredField("fs");
                fsField.setAccessible(true);
                fsField.set(fileSystem, client);
            } catch (Exception e) {
                throw new RuntimeException("Kerberos authentication failed: " + principal, e);
            }
        }
        return fileSystem;
    }
}

Original article: https://www.haomeiwen.com/subject/lcnpnjtx.html