当前测试FLINK版本是1.9.1 RELEASE
1. 使用FLINK SQL描述如下:
CREATE TABLE WORD_COUNT_SINK(`word` varchar, score INT) WITH ('connector.type'='elasticsearch','connector.version'='6'
,'connector.hosts.0.hostname'='localhost'
,'connector.hosts.0.port'='9200'
,'connector.hosts.0.protocol'='http'
,'connector.index'='word_cnt_idx'+
,'connector.document-type'='wc'
,'format.type'='json'
,'update-mode'='append'
,'format.derive-schema'='true'
,'connector.bulk-flush.max-actions'='1')
问题一: 无法sink出数据
分析代码发现,开源的essink为了性能优化使用了bulk提交。
默认需要1000条或大于5M提交一次,或遇到checkpoint时候进行sink。
本次实现小于这个数据,只有几条,所以很悲剧的没有写入到es。
同理,未来产品环境也会存在该问题,无法实现无损的写出。因此需要实现其他策略
问题二: 无法覆盖同样的数据,因此无法实现可重复的情况。(需要支持Primary KEY)









网友评论