美文网首页Java玩转大数据
Flink 使用之 SQL 连接 HBase

Flink 使用之 SQL 连接 HBase

作者: AlienPaul | 来源:发表于2025-09-28 09:22 被阅读0次

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

环境信息

  • HBase 2.x
  • Flink 1.17.2
  • Hadoop 3.1.0
  • Hudi 0.15.0(非必须,例子中作为数据源)

依赖配置

下载flink-sql-connector-hbase-2.2-1.17.2.jar并将其放到$FLINK_HOME/lib目录中。

cd $FLINK_HOME/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.17.2/flink-sql-connector-hbase-2.2-1.17.2.jar

除了HBase之外的依赖本篇默认已经配置完毕。Hudi的环境配置方式请参考:Flink 使用之 Hudi 编译部署、配置和使用

使用示例

下面以环境Hudi表同步入HBase场景为例,说明Flink写入和读取HBase的使用方法。

首先进入hbase shell创建hbase表,例如表名为demo-table具有两个column family。

create 'demo-table',  'family1', 'family2'

然后进入Flink sql client。SQL Client的使用方法参见:Flink 使用之 SQL Client

首先创建Hudi表。

CREATE TABLE t1(  
  uuid VARCHAR(20),  
  name VARCHAR(10),  
  age INT,  
  ts TIMESTAMP(3),  
  `partition` VARCHAR(20)  
)  
PARTITIONED BY (`partition`)  
WITH (  
  'connector' = 'hudi',  
  'path' = 'hdfs:///hudi/t1',  
  'table.type' = 'MERGE_ON_READ'  
);

然后插入示例数据:

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

接着创建HBase表。

HBase集群未开启Kerberos的情况创建方式如下:

create table hb (
 rowkey STRING,
 family1 ROW<name STRING>,
 family2 ROW<age INT, ts TIMESTAMP(3)>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'demo-table',
 'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
 'zookeeper.znode.parent' = '/hbase-unsecure',
 'properties.hbase.security.authentication' = 'simple'
);

HBase集群已开启Kerberos的情况创建方式如下:

create table hb (
 rowkey STRING,
 family1 ROW<name STRING>,
 family2 ROW<age INT, ts TIMESTAMP(3)>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'demo-table',
 'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
 'zookeeper.znode.parent' = '/hbase-secure',
 'properties.hbase.security.authentication' = 'kerberos',
 'properties.hbase.regionserver.kerberos.principal' = 'hbase/_HOST@PAUL.COM'
);

定义HBase表时:

  • 所有的column family都必须定义为ROW类型。字段名映射为column family名称(例如上面的family1),ROW中级联的字段类型对应当前column family中的column qualifier名称(例如name)。不需要声明所有的column family,只需要声明用到的。
  • 除了ROW类型字段外,需要定义一个原生类型的字段,例如STRING,BIGINT等。这个字段将被视为HBase表的rowkey。Rowkey字段的名称没有强制要求。

配置项的解释如下:

  • connector:Flink支持HBase 1.4和HBase 2.2。这里使用hbase 2.x版本,因此需要hbase-2.2连接器。
  • table-name:HBase的表名。写入HBase数据前该表必须现在HBase中创建,否则Flink任务也会正常退出,但数据并未写入。查看日志可见表不存在的报错。
  • zookeeper.quorum:HBase集群的zookeeper集群地址。
  • zookeeper.znode.parent:保存HBase信息的ZNode根路径。这个很重要,有些系统使用的并非默认路径。例如开启Kerberos之后HBase会使用/hbase-secure和未开启时候的/hbase-unsecure不同,因此需要显式配置。
  • properties.hbase.security.authentication:HBase集群认证方式。开启Kerberos时配置为kerberos,未开启时配置simple
  • properties.hbase.regionserver.kerberos.principal:HBase region server认证principal。根据集群实际情况配置。例如'hbase/_HOST@PAUL.COM'。

最后启动同步作业,将Hudi表数据插入到HBase中:

insert into hb select uuid, ROW(name), ROW(age, ts) FROM t1;

等待作业运行完毕之后,可通过select * from hb;查看同步过去的数据。

问题和解答

Flink HBase和Hudi配合使用的时候会存在连接Region Server被拒绝的问题。可能的日志如下:

2025-09-28 15:51:59,564 INFO  org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RpcRetryingCallerImpl [] - Call exception, tries=6, retries=16, started=4933 ms ago, cancelled=false, msg=Call to master.paul.com/192.168.0.10:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Connection closed, details=row 'demo-table,id8,99999999999999' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=master.bigdata.com,16020,1757669350953, seqNum=-1, see https://s.apache.org/timeout

经过调查和翻阅相关资料后,发现原因是Hudi Flink的依赖hudi-flink1.17-bundle-0.15.0.jar中包含有hbase-site.xml文件,这个文件并不对应集群中的HBase真实配置。这是产生问题的根因。

[root@manager lib]# jar -tf hudi-flink1.17-bundle-0.15.0.jar | grep hbase-site
hbase-site.xml

参考https://www.modb.pro/db/1734025945613934592之后博主建议使用的解决方案为在创建HBase表的时候显式提供Kerberos的相关配置。例如(和前面例子相同):

create table hb (
 rowkey STRING,
 family1 ROW<name STRING>,
 family2 ROW<age INT, ts TIMESTAMP(3)>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'demo-table',
 'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181',
 'zookeeper.znode.parent' = '/hbase-secure',
 'properties.hbase.security.authentication' = 'kerberos',
 'properties.hbase.regionserver.kerberos.principal' = 'hbase/_HOST@PAUL.COM'
);

此方式在创建表的时候显式指定了认证相关的配置。配置生效的优先级最高,可以避免其他干扰因素影响结果。

参考文献

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hbase/

https://www.modb.pro/db/1734025945613934592

相关文章

网友评论

    本文标题:Flink 使用之 SQL 连接 HBase

    本文链接:https://www.haomeiwen.com/subject/vmwttstx.html