代码
如果需要加载HDFS的配置, 则需要将core-site.xml和hdfs-site.xml中的内容加载到Flink的Configuration中, 并且Hadoop相关的参数名需要加上前缀flink.hadoop. (例如dfs.nameservices应写为flink.hadoop.dfs.nameservices)
/**
 * Runs a Flink batch job that reads the text files under the HDFS input directory,
 * descending into nested directories, and writes the records to {@code result.txt}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    // Assumed file layout: hdfs://192.168.44.71:8020/metric_data/yyyy-MM-dd/xx.json
    // The root directory is used as input; the dated sub-directories are reached through
    // recursive file enumeration. NOTE(review): adjust the address/path to the real cluster.
    String inputDir = "hdfs://192.168.44.71:8020/metric_data";

    ExecutionEnvironment exeEnv = ExecutionEnvironment.getExecutionEnvironment();

    // The constructor already registers the input path, so no extra setFilePaths call is needed.
    // An empty Path (no URI) would leave the format without any input location.
    TextInputFormat format = new TextInputFormat(new Path(inputDir));

    // Enable enumeration of files in nested directories.
    Configuration conf = new Configuration();
    conf.setBoolean("recursive.file.enumeration", true);
    format.configure(conf);

    exeEnv.createInput(format)
        .output(new TextOutputFormat<>(new Path("result.txt")));
    exeEnv.execute();
}
自定义FileSystem, 用于完成Kerberos登录认证
继承org.apache.flink.core.fs.Path, 重写getFileSystem()方法, 在创建TextInputFormat 的时候使用下面的ExtendPath来代替原生的Path
下面代码中通过反射来重置FileSystem, 当然也可以通过new HadoopFileSystem(org.apache.hadoop.fs.FileSystem.get(new Configuration()))来重置
/**
 * A {@link Path} whose {@link #getFileSystem()} performs a Kerberos keytab login when the
 * Hadoop configuration requests Kerberos authentication, and then swaps the Hadoop client
 * inside the returned {@code HadoopFileSystem} for one created under the logged-in user.
 *
 * <p>Use it in place of the plain {@code Path} when constructing the input format.
 */
public class ExtendPath extends Path {

    /** Kerberos client configuration file, resolved against the working directory. */
    private static final String KRB5_CONF_FILE = "krb5.conf";

    /** Keytab used for the login, resolved against the working directory. */
    private static final String USER_KEYTAB_FILE = "xx.keytab";

    /**
     * Creates a path from its string form.
     *
     * @param path the path string
     */
    public ExtendPath(String path) {
        super(path);
    }

    /**
     * Creates a path from a URI.
     *
     * @param uri the path URI
     */
    public ExtendPath(URI uri) {
        super(uri);
    }

    /**
     * Returns the file system for this path, logging in through Kerberos first when the
     * Hadoop configuration sets the authentication type to {@code kerberos}.
     *
     * @return the file system for this path; when Kerberos is enabled its underlying Hadoop
     *     client has been replaced by one created as the logged-in user
     * @throws IOException if the file system cannot be obtained
     * @throws RuntimeException if the Kerberos login or the client replacement fails
     */
    @Override
    public FileSystem getFileSystem() throws IOException {
        URI fsUri = this.toUri();
        // NOTE(review): this cast assumes the path always resolves to a HadoopFileSystem.
        HadoopFileSystem fileSystem = (HadoopFileSystem) FileSystem.get(fsUri);
        org.apache.hadoop.conf.Configuration conf = fileSystem.getHadoopFileSystem().getConf();

        String authType = conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION);
        if (!Objects.equals(authType, "kerberos")) {
            return fileSystem;
        }

        UserGroupInformation.setConfiguration(conf);
        // Fall back to the datanode principal when no namenode principal is configured.
        String principal = conf.get(
            "dfs.namenode.kerberos.principal", conf.get("dfs.datanode.kerberos.principal"));
        File krb5Conf = new File(KRB5_CONF_FILE);
        File userKeyTab = new File(USER_KEYTAB_FILE);
        System.setProperty("java.security.krb5.conf", krb5Conf.getAbsolutePath());

        try {
            UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                principal, userKeyTab.getAbsolutePath());
            // Resolve the client from this path's URI rather than from fs.defaultFS, so the
            // replacement points at the same file system the path refers to.
            org.apache.hadoop.fs.FileSystem client = ugi.doAs(
                (PrivilegedExceptionAction<org.apache.hadoop.fs.FileSystem>)
                    () -> org.apache.hadoop.fs.FileSystem.get(fsUri, conf));

            // HadoopFileSystem offers no setter for its client, so it is replaced reflectively.
            Field fsField = HadoopFileSystem.class.getDeclaredField("fs");
            fsField.setAccessible(true);
            fsField.set(fileSystem, client);
        } catch (Exception e) {
            throw new RuntimeException("认证失败," + principal, e);
        }
        return fileSystem;
    }
}











网友评论