美文网首页
Apache Flink 学习笔记(三)

Apache Flink 学习笔记(三)

作者: 憨人Zoe | 来源:发表于2018-09-20 13:53 被阅读0次

本篇将演示如何用Table API 实现上一篇demo3的功能。上一篇传送门 Apache Flink 学习笔记(二)
FlinkDataSetDataStream 都能与Table 互转,每一种操作也都有相对应的 api

补充:使用Table API 以及下一章的SQL,请添加以下依赖项

<!-- Table API和SQL捆绑在flink-tableMaven工件中。必须将以下依赖项添加到项目中才能使用Table API和SQL -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.6.0</version>
</dependency>

<!-- 为Flink的Scala批处理或流API添加依赖项 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.6.0</version>
</dependency>

<!-- 对于流式查询 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.6.0</version>
</dependency>

首先我把 pojo Bean3 抽离出来作为公用,使用pojo记住这四点

  • pojo 必须声明为public,如果是内部类必须是static
  • 必须为pojo创建一个无参的构造函数
  • 必须声明pojo的字段为public,或者生成publicgetset方法
  • 必须使用Flink 支持的数据类型
import java.io.Serializable;

/**
 * pojo
 */
public class Bean3 implements Serializable{
    public Long timestamp;
    public String appId;
    public String module;

    public Bean3() {
    }

    public Bean3(Long timestamp, String appId, String module) {
        this.timestamp = timestamp;
        this.appId = appId;
        this.module = module;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public String getAppId() {
        return appId;
    }

    public void setAppId(String appId) {
        this.appId = appId;
    }

    public String getModule() {
        return module;
    }

    public void setModule(String module) {
        this.module = module;
    }

    @Override
    public String toString() {
        return "Bean3{" +
                "timestamp=" + timestamp +
                ", appId='" + appId + '\'' +
                ", module='" + module + '\'' +
                '}';
    }
}

demo5 代码部分

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.Tumble;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.Date;

/**
 * Table API
 */
public class Demo5 {
    private static final String APP_NAME = "app_name";

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //设置窗口的时间单位为process time
        env.setParallelism(1);//全局并发数

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
        //设置topic和 app name
        //FlinkKafkaManager 源码见笔记二
        FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", APP_NAME, properties);
        FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
        consumer.setStartFromLatest();

        //获取DataStream,并转成Bean3
        DataStream<Bean3> stream = env.addSource(consumer).map(new FlatMap());

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
        //timestamp,appId,module 是pojo的字段名,最后的tt是随意指定的扩展字段,.proctime用来标识process time
        Table table = tableEnvironment.fromDataStream(stream, "timestamp,appId,module,tt.proctime");
        tableEnvironment.registerTable("common", table);//注册表名

        //或者使用 registerDataStream
        //tableEnvironment.registerDataStream("common", stream, "timestamp,appId,module,tt.proctime");//注册表名

        Table query =
                tableEnvironment
                        .scan("common") //等价from
                        .window(Tumble.over("10.seconds").on("tt").as("dd"))// 每10s执行一次,必须要取别名,且不能和tt相同,这里还没有搞清楚原理
                        .groupBy("dd,appId")//必须要用window找那个指定的dd别名聚合
                        .select("appId,COUNT(module) as totals") //COUNT(module)也可以写成 module.count
                        .where("appId == '100007336' || appId == '100013668'"); //等价于 filter(); 用or 报错。奇葩的是用=,==,=== 都能通过

        DataStream<Row> result = tableEnvironment.toAppendStream(query, Row.class);
        result.process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                System.out.println(String.format("AppId:%s, Module Count:%s", value.getField(0).toString(), value.getField(1).toString()));
            }
        });

        try {
            env.execute(APP_NAME);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class FlatMap implements MapFunction<JSONObject, Bean3> {
        @Override
        public Bean3 map(JSONObject jsonObject) throws Exception {
            return new Bean3(new Date().getTime(), jsonObject.getString("appId"), jsonObject.getString("module"));
        }
    }
}

使用Table API 需要先创建StreamTableEnvironment 对象,然后调用fromDataStream(如果是流处理)创建Table。或者直接调用registerDataStream同时指定表名和字段mapping

本例中我使用的是process time定义窗口event time,所以消息中的timestamp字段并没有使用。而是通过额外扩展一个自定义字段tt来作为process timestamp,该字段只能放在最后,此时还需要在tt后面加上.proctime后缀。

同样的,可以将process time改造成event time,改动如下:

 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置窗口的时间单位为event time

指定事件时间戳,同demo3的改造

 DataStream<Bean3> bean3DataStreamWithAssignTime 
        = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Bean3>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Bean3 element) {
                return element.getTimestamp();
            }
        });

使用timestamp 作为出发时间时间戳,此时必须添加.rowtime后缀

Table table = tableEnvironment.fromDataStream(bean3DataStreamWithAssignTime, "timestamp.rowtime,appId,module");

//window 内指定timestamp 即可

  .window(Tumble.over("10.seconds").on("timestamp").as("dd"))

相关文章

  • Apache Flink 学习笔记(三)

    本篇将演示如何用Table API 实现上一篇demo3的功能。上一篇传送门 Apache Flink 学习笔记(...

  • 2019-02-26

    《从 1 到 100 深入学习 Flink》--- Apache Flink 介绍 目录: 1,flink 流介绍...

  • Flink

    本文主要参考自: Apache Flink 漫谈Apache Flink 漫谈系列 - 序Apache Flink...

  • Apache Flink笔记

    Apache Flink笔记原文链接 :http://timeyang.com/articles/28/2018/...

  • BI系统套装

    flink 文档https://flink.apache.org/[https://flink.apache.or...

  • Flink —— 基本组件与 WordCount

    小白的新手学习笔记,请大佬轻喷本文归档于GitHub,欢迎大家批评指正 Apache Flink is a fra...

  • Apache Flink 学习笔记(一)

    最近在项目中需要用到Flink,关于Flink的基本介绍就不啰嗦了,官方文档传送门 。 由于是第一次接触,我花了一...

  • Apache Flink 学习笔记(四)

    本篇将演示如何使用 Flink SQL 实现上一篇demo5的功能,上一篇传送门 Apache Flink 学习笔...

  • Apache Flink 学习笔记(二)

    上一篇 Apache Flink 学习笔记(一) 简单示范了批处理的使用,本篇展示流式处理的使用方法。 流处理也叫...

  • Apache Flink 学习笔记(五)

    通过阅读官方文档,整理一些前面demo中没有用到但是很有用的东西。排序不分先后。 2018.11.1补充 Flin...

网友评论

      本文标题:Apache Flink 学习笔记(三)

      本文链接:https://www.haomeiwen.com/subject/dsognftx.html