简介:在某些金融类场景中,必须保证数据被精确一次(exactly-once)处理,计算结果不多不少。下面展示 Flink 在该场景下的代码实现。
前言
Flink 提供了基于两阶段提交(2PC)的 SinkFunction —— TwoPhaseCommitSinkFunction,它封装了两阶段提交的基础流程,使用者只需实现 beginTransaction、invoke、preCommit、commit 和 abort 这几个方法(见下文的 MysqlSink)。
- 主逻辑
package com.dpf.flink;
import com.dpf.flink.sink.MysqlSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
 * Demo job: reads strings from Kafka, maps each one to a (value, 1) tuple, prints it, and writes it
 * to MySQL through the two-phase-commit {@link MysqlSink}.
 */
public class MysqlTwoPhaseCommit {
    /** Kafka topic to read; the same string is reused as the consumer group id. */
    private static final String TOPIC_EXACTLY_ONCE = "TwoPhaseCommit";

    /**
     * Builds and runs the demo job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps message order easy to inspect while testing; it can be raised.
        env.setParallelism(1);

        // ---- Checkpoint settings ----
        // Trigger a checkpoint every 30 s (the checkpoint interval).
        env.enableCheckpointing(30000);
        // Exactly-once checkpointing mode.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 1 s between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must complete within 10 s or it is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Allow only one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoint data when the job is cancelled so it can be restored later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // This demo stores checkpoints on the local file system; a shared file system
        // (e.g. an hdfs:// URI) would be used in a real deployment.
        env.setStateBackend(new FsStateBackend("file:///Users/david.dong/tmp/flink/checkpoint"));

        // ---- Kafka source ----
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_EXACTLY_ONCE);
        // SimpleStringSchema yields only the message value; JSONKeyValueDeserializationSchema would
        // also expose the key and metadata (topic, partition, offset).
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(TOPIC_EXACTLY_ONCE, new SimpleStringSchema(), properties);
        DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);

        // Explicit result type: the lambda's Tuple2 generics are erased, so Flink cannot infer them.
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = streamSource
                .map(str -> Tuple2.of(str, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        tupleStream.print();

        // ---- Sink: MysqlSink inserts inside a transaction and commits it in its commit() callback ----
        tupleStream.addSink(new MysqlSink()).name("MySqlTwoPhaseCommitSink");

        env.execute("StreamDemoKafka2Mysql");
    }
}
- 自定义 Sink 实现(MysqlSink)
package com.dpf.flink.sink;
import com.dpf.flink.utils.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
/**
 * Two-phase-commit sink that inserts (value, total) tuples into MySQL table {@code t_test}.
 *
 * <p>Each Flink transaction is one JDBC {@link Connection} with auto-commit disabled
 * (see {@code DBConnectUtil.getConnection}). Rows are inserted in {@link #invoke}, the
 * transaction is committed in {@link #commit}, and it is rolled back in {@link #abort}.
 *
 * <p>NOTE(review): the transaction handle (a live JDBC Connection) is stored in Flink state through
 * a KryoSerializer. After a failover, recoverAndCommit/recoverAndAbort receive a deserialized
 * Connection that is unlikely to be usable. Confirm this, and consider XA transactions or
 * idempotent writes if end-to-end exactly-once across restarts is required.
 */
public class MysqlSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, Connection, Void> {
    private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);

    /** JDBC URL used by the no-arg constructor (local demo database). */
    private static final String DEFAULT_URL = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
    private static final String DEFAULT_USER = "root";
    private static final String DEFAULT_PASSWORD = "12345678";

    private static final String INSERT_SQL = "insert into `t_test` (`value`,`total`,`insert_time`) values (?,?,?)";

    private final String url;
    private final String user;
    private final String password;

    /** Creates a sink that writes to the built-in local demo database. */
    public MysqlSink() {
        this(DEFAULT_URL, DEFAULT_USER, DEFAULT_PASSWORD);
    }

    /**
     * Creates a sink that writes to the given database.
     *
     * @param url      JDBC URL of the target MySQL database
     * @param user     database user
     * @param password database password
     */
    public MysqlSink(String url, String user, String password) {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        this.url = url;
        this.user = user;
        this.password = password;
    }

    /**
     * Inserts one record inside the currently open transaction. Called once per incoming record.
     *
     * @param connection the transaction handle returned by {@link #beginTransaction()}
     * @param tuple      f0 is written to {@code value}, f1 to {@code total}
     * @param context    sink context (unused)
     * @throws Exception if the insert fails
     */
    @Override
    protected void invoke(Connection connection, Tuple2<String, Integer> tuple, Context context) throws Exception {
        log.info("start invoke...");
        String value = tuple.f0;
        Integer total = tuple.f1;
        log.info("====执行SQL:{}===", INSERT_SQL);
        // One statement is created per record, so close it here. The connection and its open
        // transaction stay alive until commit/abort.
        try (PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            ps.setString(1, value);
            ps.setInt(2, total);
            ps.setLong(3, System.currentTimeMillis());
            log.info("要插入的数据:{}----{}", value, total);
            if (log.isDebugEnabled()) {
                log.debug("执行的SQL语句:{}", renderStatement(ps));
            }
            ps.execute();
        }
    }

    /**
     * Best-effort text of the bound SQL, taken from the driver's {@code toString()}.
     * Presumably the driver prints a prefix, then ": ", then the SQL (the format is
     * driver-specific). If no such prefix is found, the raw text is returned.
     */
    private static String renderStatement(PreparedStatement ps) {
        String text = ps.toString();
        int colon = text.indexOf(':');
        return (colon >= 0 && colon + 2 <= text.length()) ? text.substring(colon + 2) : text;
    }

    /**
     * Opens a new transaction: a fresh connection with auto-commit disabled, so every insert made
     * in {@link #invoke} joins it.
     *
     * @return the connection acting as the transaction handle
     * @throws Exception if the connection cannot be opened
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        log.info("start beginTransaction.......");
        return DBConnectUtil.getConnection(url, user, password);
    }

    /**
     * Pre-commit phase. Inserts were already executed in {@link #invoke} and nothing is buffered,
     * so there is nothing to flush here.
     *
     * @param connection the transaction handle
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        log.info("start preCommit...");
    }

    /**
     * Commits the transaction and closes its connection. A null handle is ignored.
     *
     * <p>A commit failure is rethrown rather than swallowed. Flink's two-phase-commit contract
     * expects a failed commit to fail the job; silently ignoring it would drop the transaction's
     * rows and break exactly-once.
     *
     * @param connection the transaction handle
     * @throws IllegalStateException if the JDBC commit fails
     */
    @Override
    protected void commit(Connection connection) {
        log.info("start commit...");
        if (connection == null) {
            return;
        }
        try {
            connection.commit();
        } catch (java.sql.SQLException e) {
            throw new IllegalStateException("提交事务失败,Connection:" + connection, e);
        } finally {
            DBConnectUtil.close(connection);
        }
    }

    /**
     * Rolls back the transaction and closes its connection. This is best-effort:
     * {@code DBConnectUtil.rollback} logs rollback failures instead of throwing them.
     *
     * @param connection the transaction handle
     */
    @Override
    protected void abort(Connection connection) {
        log.info("start abort rollback...");
        DBConnectUtil.rollback(connection);
    }
}
- JDBC工具类
package com.dpf.flink.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* 数据库连接工具类
*/
/**
 * Small JDBC helper: opens connections with auto-commit disabled and commits, rolls back, or
 * closes them.
 */
public class DBConnectUtil {
    private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);

    /** MySQL driver class that is loaded explicitly before connecting. */
    private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

    /**
     * Opens a connection and disables auto-commit on it.
     *
     * @param url      JDBC URL
     * @param user     database user
     * @param password database password
     * @return an open connection with auto-commit disabled (never null)
     * @throws SQLException if the connection cannot be opened or auto-commit cannot be disabled;
     *                      in the latter case the connection is closed before rethrowing
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            // Best effort: JDBC 4+ drivers on the classpath register themselves with DriverManager,
            // so the connection attempt below may still succeed.
            log.error("获取mysql.jdbc.Driver失败", e);
        }

        final Connection conn;
        try {
            conn = DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            // Propagate instead of returning null: callers would otherwise hit an NPE later.
            log.error("获取连接失败,url:{},user:{}", url, user);
            throw e;
        }
        log.info("获取连接:{} 成功...", conn);

        // Manual commit, so statements run on this connection form a single transaction.
        try {
            conn.setAutoCommit(false);
        } catch (SQLException e) {
            close(conn);
            throw e;
        }
        return conn;
    }

    /**
     * Commits the connection's transaction and then closes the connection. A null connection is
     * ignored.
     *
     * <p>A commit failure is only logged, not rethrown. Callers that must react to a failed commit
     * should call {@link Connection#commit()} themselves.
     *
     * @param conn the connection to commit and close
     */
    public static void commit(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.commit();
        } catch (SQLException e) {
            log.error("提交事务失败,Connection:{}", conn, e);
        } finally {
            close(conn);
        }
    }

    /**
     * Rolls back the connection's transaction and then closes the connection. A null connection is
     * ignored. A rollback failure is only logged.
     *
     * @param conn the connection to roll back and close
     */
    public static void rollback(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (SQLException e) {
            log.error("事务回滚失败,Connection:{}", conn, e);
        } finally {
            close(conn);
        }
    }

    /**
     * Closes the connection. A null connection is ignored, and a close failure is only logged.
     *
     * @param conn the connection to close
     */
    public static void close(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            log.error("关闭连接失败,Connection:{}", conn, e);
        }
    }
}
总结
本示例以 MySQL 为例,展示了 Flink 两阶段提交(TwoPhaseCommitSinkFunction)的完整流程。












网友评论