一般来说,我们使用Flink Table API的时候,将Kafka中的topic映射为Table的时候,都是采用json的格式。形如如下的定义:
CREATE TABLE user_behavior(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH(
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json'
);
json中的每个字段对应表中的一列。绝大多数的情况下是没有任何问题的。但是我们有这样的一个需求:我们需要增加一个新字段,而且可以控制这个字段是否在json中出现。如果采用上面的建表方式,无法满足这样的需求,输出的json会始终包含这个属性。考虑后续的扩展性,总不能每加一个字段就要重新构建表吧。于是乎,我们计划采用csv的格式,只定义一个字段,类型为string来存储整个json结构体,DDL如下:
CREATE TABLE user_behavior(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH(
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.quote-character' = 'true'
);
然后写一个自定义函数,将字段拼接为json格式的字符串。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.ScalarFunction;
public class JsonFunction extends ScalarFunction {
public String eval(String userId,String itemId,String categoryId,String behavior,String ts){
JSONObject json = new JSONObject();
json.put("userId",userId);
json.put("itemId",itemId);
json.put("categoryId",categoryId);
json.put("behavior",behavior);
json.put("ts",ts);
System.out.println(json);
return json.toJSONString();
}
}
这样一来,我可以很方便的在函数中增加业务逻辑,比如根据你的配置决定哪些属性不拼接上去。
函数使用是比较简单的:
tableEnv.createTemporarySystemFunction("toJson", JsonFunction.class);
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO user_behavior select toJson(userId,itemId,categoryId,behavior,ts) FROM UserBehavior");










网友评论