美文网首页
flink-on-k8s Application-mode 问题

flink-on-k8s Application-mode 问题

作者: 邵红晓 | 来源:发表于2021-07-13 15:34 被阅读0次

问题1

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that 
implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an 
error: Table sink 'default_catalog.default_database.fs_table' doesn't support consuming update changes 
which is 
produced by node GroupAggregate(groupBy=[id, channel], select=[id, channel, COUNT(*) AS cnt])
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-
dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.j
ava:222) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(App
licationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.0.jar:1.13.0]

解决

1、注意引用,flink-connector-kafka_2.12和flink-sql-connector-kafka_2.12不要同时出现在pom.xml

      <!--flink 算子使用-->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-kafka_2.12</artifactId>
           <version>${flink.version}</version>
       </dependency>
       <!--flink sql 算子使用-->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-sql-connector-kafka_2.12</artifactId>
           <version>${flink.version}</version>
       </dependency>

2、Flink 加载 table Factory 使用的时SPI机制,而我们打的的flink jar包是不包含META-INF.services


image.png

目录自己建好,并且要打入jar中
maven打包插件,将META-INF.services目录下的文件打入jar中,一下是maven插件

         <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <transformers>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.handlers</resource>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.fxc.rpc.impl.member.MemberProvider</mainClass>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.schemas</resource>
                        </transformer>
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

org.apache.flink.table.factories.TableFactory内容

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

org.apache.flink.table.factories.Factory内容

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory

问题2

Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink 
and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems,
 please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/. 
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. 

Unable to mount volumes for pod "my-second-cluster-taskmanager-1-2_flink(e6164522-e2f6-11eb-b173-
eeeeeeeeeeee)": timeout expired waiting for volumes to attach or mount 
for pod "flink"/"my-second-cluster-taskmanager-1-2". list of unmounted volumes=[hadoop-config-volume]. 

list of unattached volumes=[hadoop-config-volume flink-config-volume default-token-qbpfh]
MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-my-second-cluster" not found

解决

1、1.11版本以后可以直接在Flink Client的机器上(提交作业的机器上,该机器上需要有kubectl客户端环境以及flink环境)
export HADOOP_CONF_DIR然后运行flink run-application启动Flink任务,这样Flink Client会自动通过ConfigMap将Hadoop配置ship到JobManager和TaskManager pod并且加到classpath的
flink-1.13.0/bin/config.sh新增一行export HADOOP_CONF_DIR=/var/lib/jenkins/flink-k8s/file/hadoopconf
内容如下:$ ls hadoopconf/
core-site.xml hdfs-site.xml yarn-site.xml

2、打包镜像Dockerfile也需要指定
COPY hadoopconf/*.xml $FLINK_HOME/hadoopconf/
ENV HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf/
3、依赖的hadoop jar有问题
下载(也可以自己编译)flink Pre-bundled Hadoop 2.7.5 https://flink.apache.org/downloads.html
jar导入到$FLINK_HOME/lib中

问题3

Caused by: org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: No task slot 
allocated for job ID 7b1c33d4ebbe8d47623c52d33b838f4a and allocation ID 56306de50eed492c961f884c131b2f9e. 

解决

直接加大taskmanager内存解决-Dtaskmanager.memory.process.size=1024m

问题4

java.net.UnknownHostException: xxx-hadoop

解决

修改k8s组件中coredns confmap配置,重新部署pod

.:53 {
errors
health
hosts {
ip0 hostname0
ip1 hostname1
...
}
...
}

问题5
flink 命令参数 -C 使用问题
直接使用maven仓库http://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.0/flink-sql-connector-kafka_2.12-1.13.0.jar
拉取jar包,发现无法拉取成功,验证不是网络不通问题

解决
修改为linux httpd服务,将jar放入/var/www/html/jar/ 目录下可以实现自动拉取

参考
https://blog.csdn.net/qq_31866793/article/details/114883944
https://blog.csdn.net/u013516966/article/details/106536525
https://segmentfault.com/a/1190000039198813
https://segmentfault.com/a/1190000023280126
http://apache-flink.147419.n8.nabble.com/Flink-on-k8s-1-11-3-hdfs-taskmanager-td9907.html
https://blog.csdn.net/cenjianteng/article/details/102654070

相关文章

  • flink-on-k8s Application-mode 问题

    问题1 解决 1、注意引用,flink-connector-kafka_2.12和flink-sql-connec...

  • 问题,不是问题;问题,还是问题

    问题,不是问题 今天,是到新校舍的第一天。没水没电没床铺,教室里连黑板都没有。面对诸多问题,幸运的是...

  • 问题问题还是问题?

    问题实在是太多了!菜要这样做,不这样做是问题;饭没煮好,也是问题;自己不知道学会搞吃的也是问题;生活好像只剩下无尽...

  • 问题不是问题,如何对待问题才是问题。

    这几天身体状态一直不太好,反复头疼,但仍然坚持上课,我知道我要什么。我渴望找到那个自由绽放的自己。 早上怀着期待的...

  • “问题”不是问题,认为“是问题”才是问题

    大概从一个多星期前,腹部就开始有间断的疼痛感了,有时甚至会很难受。这种感觉在今天上午坐车时达到了最强烈,我简直不知...

  • 问题不是问题,怎样看问题才是问题。

    焦点幸福教师 坚持分享第1349天2020.11.17周二 小风波 1.购物 最近布置班级文化,我从网上购...

  • 问题的问题

    “你别总给我说问题、问题,给我提一些建设性的建议行不?” “你咋变成了意见篓子?” 满腔热血地跑到领导面前表功,结...

  • 问题的问题

    “你别总给我说问题、问题,给我提些建设性的建议行不?” “你咋变成了意见篓子?” 满腔热血地跑到领导面前表功,结果...

  • 问题不是问题!?

    创45:3-5 约瑟对他弟兄们说:“我是约瑟,我的父亲还在吗?”他弟兄不能回答,因为在他面前都惊惶。约瑟又对他弟兄...

  • 问题的问题

    问题家庭的问题是:家长本身用着有问题的方法教育孩子,没问题的孩子也变得有问题。 而且更严重的问题是:家长并没有意识...

网友评论

      本文标题:flink-on-k8s Application-mode 问题

      本文链接:https://www.haomeiwen.com/subject/ddcspltx.html