Flink 使用介绍相关文档目录
环境信息
- HBase 2.x
- Flink 1.17.2
- Hadoop 3.1.0
- Hudi 0.15.0(非必须,例子中作为数据源)
依赖配置
下载flink-sql-connector-hbase-2.2-1.17.2.jar并将其放到$FLINK_HOME/lib目录中。
cd $FLINK_HOME/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.17.2/flink-sql-connector-hbase-2.2-1.17.2.jar
除了HBase之外的依赖本篇默认已经配置完毕。Hudi的环境配置方式请参考:Flink 使用之 Hudi 编译部署、配置和使用
使用示例
下面以环境Hudi表同步入HBase场景为例,说明Flink写入和读取HBase的使用方法。
首先进入hbase shell创建hbase表,例如表名为demo-table具有两个column family。
create 'demo-table', 'family1', 'family2'
然后进入Flink sql client。SQL Client的使用方法参见:Flink 使用之 SQL Client
首先创建Hudi表。
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///hudi/t1',
'table.type' = 'MERGE_ON_READ'
);
然后插入示例数据:
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
接着创建HBase表。
HBase集群未开启Kerberos的情况创建方式如下:
create table hb (
rowkey STRING,
family1 ROW<name STRING>,
family2 ROW<age INT, ts TIMESTAMP(3)>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'demo-table',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'zookeeper.znode.parent' = '/hbase-unsecure',
'properties.hbase.security.authentication' = 'simple'
);
HBase集群已开启Kerberos的情况创建方式如下:
create table hb (
rowkey STRING,
family1 ROW<name STRING>,
family2 ROW<age INT, ts TIMESTAMP(3)>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'demo-table',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'zookeeper.znode.parent' = '/hbase-secure',
'properties.hbase.security.authentication' = 'kerberos',
'properties.hbase.regionserver.kerberos.principal' = 'hbase/_HOST@PAUL.COM'
);
定义HBase表时:
- 所有的column family都必须定义为ROW类型。字段名映射为column family名称(例如上面的family1),ROW中级联的字段类型对应当前column family中的column qualifier名称(例如name)。不需要声明所有的column family,只需要声明用到的。
- 除了ROW类型字段外,需要定义一个原生类型的字段,例如STRING,BIGINT等。这个字段将被视为HBase表的rowkey。Rowkey字段的名称没有强制要求。
配置项的解释如下:
- connector:Flink支持HBase 1.4和HBase 2.2。这里使用hbase 2.x版本,因此需要
hbase-2.2连接器。 - table-name:HBase的表名。写入HBase数据前该表必须现在HBase中创建,否则Flink任务也会正常退出,但数据并未写入。查看日志可见表不存在的报错。
- zookeeper.quorum:HBase集群的zookeeper集群地址。
- zookeeper.znode.parent:保存HBase信息的ZNode根路径。这个很重要,有些系统使用的并非默认路径。例如开启Kerberos之后HBase会使用
/hbase-secure和未开启时候的/hbase-unsecure不同,因此需要显式配置。 - properties.hbase.security.authentication:HBase集群认证方式。开启Kerberos时配置为
kerberos,未开启时配置simple。 - properties.hbase.regionserver.kerberos.principal:HBase region server认证principal。根据集群实际情况配置。例如'hbase/_HOST@PAUL.COM'。
最后启动同步作业,将Hudi表数据插入到HBase中:
insert into hb select uuid, ROW(name), ROW(age, ts) FROM t1;
等待作业运行完毕之后,可通过select * from hb;查看同步过去的数据。
问题和解答
Flink HBase和Hudi配合使用的时候会存在连接Region Server被拒绝的问题。可能的日志如下:
2025-09-28 15:51:59,564 INFO org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RpcRetryingCallerImpl [] - Call exception, tries=6, retries=16, started=4933 ms ago, cancelled=false, msg=Call to master.paul.com/192.168.0.10:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Connection closed, details=row 'demo-table,id8,99999999999999' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=master.bigdata.com,16020,1757669350953, seqNum=-1, see https://s.apache.org/timeout
经过调查和翻阅相关资料后,发现原因是Hudi Flink的依赖hudi-flink1.17-bundle-0.15.0.jar中包含有hbase-site.xml文件,这个文件并不对应集群中的HBase真实配置。这是产生问题的根因。
[root@manager lib]# jar -tf hudi-flink1.17-bundle-0.15.0.jar | grep hbase-site
hbase-site.xml
参考https://www.modb.pro/db/1734025945613934592之后博主建议使用的解决方案为在创建HBase表的时候显式提供Kerberos的相关配置。例如(和前面例子相同):
create table hb (
rowkey STRING,
family1 ROW<name STRING>,
family2 ROW<age INT, ts TIMESTAMP(3)>,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'demo-table',
'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
'zookeeper.znode.parent' = '/hbase-secure',
'properties.hbase.security.authentication' = 'kerberos',
'properties.hbase.regionserver.kerberos.principal' = 'hbase/_HOST@PAUL.COM'
);
此方式在创建表的时候显式指定了认证相关的配置。配置生效的优先级最高,可以避免其他干扰因素影响结果。
参考文献
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hbase/







网友评论