Real-time data synchronization (canal, FlinkCDC, ClickHouse)


Author: 笨鸡 | Published: 2022-03-03 11:03

1.canal

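When canal listens to MySQL 8.0, the connection fails and example.log shows:
Caused by: java.io.IOException: caching_sha2_password Auth failed

MySQL 8.0 uses the caching_sha2_password plugin by default, and canal fails to authenticate against it here. Switch the canal user to mysql_native_password:

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';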

2.flink cdc

package com.ctgu;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing (uncomment, and import CheckpointingMode and FsStateBackend)
//        env.enableCheckpointing(5000);
//        env.getCheckpointConfig().setCheckpointTimeout(10000);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        // 2. Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .serverTimeZone("Asia/Shanghai")
                .username("root")
                .password("password")
                .databaseList("binlog_test")
//                .tableList("binlog_test.user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        // 3. Print the change records
        dataStreamSource.print();

        // 4. Start the job
        env.execute("FlinkCDC");

    }

}

3.clickhouse

Add the following to the MySQL configuration file (my.ini), under [mysqld]:

gtid-mode=on
enforce-gtid-consistency=1
log-slave-updates=1
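
Before creating the ClickHouse database, it can help to confirm that the three settings above really are in the config file. The following is only a minimal sketch: the class name GtidConfigCheck and its methods are hypothetical, it reads simple key=value lines only, and it assumes that on, 1 and true all mean "enabled". It does not query the running MySQL server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of MySQL, canal, Flink CDC or ClickHouse.
final class GtidConfigCheck {

    // The three options listed above, normalized to underscores.
    private static final List<String> REQUIRED =
            Arrays.asList("gtid_mode", "enforce_gtid_consistency", "log_slave_updates");

    // Parses simple "key=value" lines; comments (# or ;) and [section] headers are skipped.
    static Map<String, String> parse(String iniText) {
        Map<String, String> options = new HashMap<>();
        for (String raw : iniText.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")
                    || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            // Assumption: an option given without a value counts as enabled.
            String value = eq < 0 ? "1" : line.substring(eq + 1).trim();
            // MySQL treats '-' and '_' in option names as equivalent.
            options.put(key.replace('-', '_').toLowerCase(Locale.ROOT),
                    value.toLowerCase(Locale.ROOT));
        }
        return options;
    }

    // Returns the required options that are missing or not enabled.
    static List<String> missing(String iniText) {
        Map<String, String> options = parse(iniText);
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = options.get(name);
            boolean enabled = value != null
                    && (value.equals("on") || value.equals("1") || value.equals("true"));
            if (!enabled) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String ini = "[mysqld]\n"
                + "gtid-mode=on\n"
                + "enforce-gtid-consistency=1\n"
                + "log-slave-updates=1\n";
        System.out.println("Missing or disabled: " + missing(ini));
    }
}
```

Pass the contents of my.ini to missing(); an empty list means all three options are present and enabled in the file. MySQL still has to be restarted for the changes to take effect.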

Then, in ClickHouse, create a database that replicates from MySQL:

CREATE DATABASE test_binlog ENGINE = 
MaterializeMySQL('<actual host IP>:3306', 'binlog_test', 'root', 'password');

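Problems encountered

1.On Windows 10, mounting the Docker data files fails with permission denied

 In Docker Desktop, turn off the "Use the WSL 2 based engine" option.

2.Code: 100. DB::Exception: Access denied for user root.
The root user's password is not stored with the mysql_native_password plugin.

Add this to my.ini:
default_authentication_plugin = mysql_native_password
This setting only applies to accounts created or changed afterwards, so reset the root password, or create a new user and grant it privileges.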
