1.canal
canal 监听mysql8.0
example.log
Caused by: java.io.IOException: caching_sha2_password Auth failed
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
2.flink cdc
package com.ctgu;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1.获取Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 开启CK
// env.enableCheckpointing(5000);
// env.getCheckpointConfig().setCheckpointTimeout(10000);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
// env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));
//2.通过FlinkCDC构建SourceFunction
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.serverTimeZone("Asia/Shanghai")
.username("root")
.password("password")
.databaseList("binlog_test")
// .tableList("binlog_test.user_info")
.deserializer(new StringDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
//3.数据打印
dataStreamSource.print();
//4.启动任务
env.execute("FlinkCDC");
}
}
3.clickhouse
mysql.ini 配置文件增加
gtid-mode=on
enforce-gtid-consistency=1
log-slave-updates=1
clickhouse
CREATE DATABASE test_binlog ENGINE =
MaterializeMySQL('本机实际IP:3306', 'binlog_test', 'root', 'password');
遇到的问题
1.windows10 docker data 的文件挂载异常 permission denied
把windows use wsl2 engine 关闭
2.Code: 100. DB::Exception: Access denied for user root.
root 用户不是 plugin mysql_native_password的密码
my.ini文件配置加入
default_authentication_plugin = mysql_native_password
修改root密码或者新建用户并赋权
网友评论