很期待用纯sql的形式来处理流式数据,flink 1.10推出了生产可用的 Hive 集成,拥有了更强的流式 SQL 处理能力。这次我们就来尝试一下啦~~
【本文大纲】
1、环境准备
2、SQL Client与hive集成配置
3、用SQL Client读取kafka数据
1、环境准备
相关软件版本:
linux版本:centos 6.5
Java版本:jdk1.8
Hive版本:hive-2.3.4
Hadoop版本:hadoop-2.7.3
flink: flink-1.10.0
scala:scala-2.11
kafka:kafka_2.11-2.3.0
有关java、hive、hadoop的安装之前写过了: Hive源码系列(一)hive2.1.1+hadoop2.7.3环境搭建
下面准备一下flink,scala,kafka环境
1.1 scala安装
下载 scala-2.11.12.tgz
tar-zxvf scala-2.11.12.tgz##解压scalaln -s flink-1.10.0flink##软链接vim /etc/profile##设置环境变量
source/etc/profile##生效
测试:
1.2 kafka安装
下载kafka_2.11-2.3.0.tgz
tar-zxvfkafka_2.11-2.3.0.tgz##解压kafkaln-skafka_2.11-2.3.0kafka##软链接vim/etc/profile##设置环境变量
source/etc/profile##生效
启动kafka服务:
zookeeper-server-start.sh$KAFKA_HOME/config/zookeeper.properties &kafka-server-start.sh$KAFKA_HOME/config/server.properties &
创建测试的topic(flinktest):
kafka-topics.sh--create--bootstrap-serverlocalhost:9092--replication-factor1--partitions1--topicflinktestkafka-topics.sh--list--bootstrap-serverlocalhost:9092##查看创建的topic
分别启动生产者和 消费者测试一下:
kafka-console-producer.sh--broker-listlocalhost:9092--topicflinktest##生产者kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicflinktest--from-beginning##消费者
如下图,说明生产数据没有问题:
以上,准备好了实时数据源,方便后面做测试用
1.3 flink安装
下载flink-1.10.0-bin-scala_2.11.tgz
tar-zxvf flink-1.10.0-bin-scala_2.11.tgz##解压flinkln -s flink-1.10.0flink##软链接vim /etc/profile##设置环境变量
source/etc/profile##生效
配置flink–Standalone模式:
## flink-conf.yaml 文件配置vim$FLINK_HOME/conf/flink-conf.yaml##配置主节点的ip
## slavesvim$FLINK_HOME/conf/slaves##配置从节点ip##写入dataming
以上 flink单例模型配置完毕
2、SQL Client与hive集成配置
2.1 配制yaml文件
cp$FLINK_HOME/conf/sql-client-defaults.yaml sql-client-hive.yamlvim$FLINK_HOME/conf/sql-client-hive.yaml
2.2 加入依赖包
这一块是遇到问题最多的了
依赖hive相关包:
$HIVE_HOME/lib/hive-exec-2.3.4.jar$HIVE_HOME/lib/hive-common-2.3.4.jar$HIVE_HOME/lib/hive-metastore-2.3.4.jar$HIVE_HOME/lib/hive-shims-common-2.3.4.jar$HIVE_HOME/lib/antlr-runtime-3.5.2.jar$HIVE_HOME/lib/datanucleus-api-jdo-4.2.4.jar$HIVE_HOME/lib/datanucleus-core-4.1.17.jar$HIVE_HOME/lib/datanucleus-rdbms-4.1.19.jar$HIVE_HOME/lib/javax.jdo-3.2.0-m3.jar$HIVE_HOME/lib/libfb303-0.9.3.jar$HIVE_HOME/lib/jackson-core-2.6.5.jar
其它包:
commons-cli-1.3.1.jarflink-connector-hive_2.11-1.10.0.jarflink-hadoop-compatibility_2.11-1.10.0.jarflink-shaded-hadoop2-uber-blink-3.2.4.jarflink-table-api-java-bridge_2.11-1.10.0.jarmysql-connector-java-5.1.9.jar
将以上jar放入目录 $FLINK_HOME/lib
2.3 启动
start-cluster.sh
3、用SQL Client读取kafka数据
3.1 启动sql client
sql-client.sh embedded -d conf/sql-client-hive.yaml
3.2 创建表
CREATETABLEmykafka (nameString, ageInt)WITH('connector.type'='kafka','connector.version'='universal','connector.topic'='flinktest','connector.properties.zookeeper.connect'='localhost:2181','connector.properties.bootstrap.servers'='localhost:9092','format.type'='csv','update-mode'='append');
此时在hive中也能看到用flink sql client 新创建的表啦:
3.3 写数据
此时,用kafka生产端写入几条数据,可以从flink端查到了:
这样以来,就可以通过SQL Client这种纯SQL的方式来操作实时数据了
SQL Client 未来可期呀~~
如果觉得有用,欢迎关注【数据仓库践行者】公众号哦
网友评论