110.1 演示环境介绍
- CM版本:5.13.1
 - CDH版本:5.13.1
 - MariaDB版本:5.5.56
 - StreamSets版本:3.1.2.0
 
110.2 操作演示
1.环境布置
- 把MariaDB的Binlog日志开启
- 修改/etc/my.conf文件,在配置文件mysqld下增加如下配置:
- 建议使用Row模式的Binlog格式
 
 
 - 修改/etc/my.conf文件,在配置文件mysqld下增加如下配置:
 
server-id=1
log-bin=mysql-bin
binlog_format=ROW
- 重启服务
 
[root@ip-168-31-16-68 ~]# systemctl restart mariadb
[root@ip-168-31-16-68 ~]# systemctl status mariadb
- 创建同步账号
 
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;
- 安装MySQL驱动在StreamSets中
- 
把MySQL的JDBC驱动拷贝至/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录
 
 - 
 - 创建测试表
 
create database test;
create table cdc_test (
       id int,
       name varchar(32)
);
- 创建Kudu表
 
create table cdc_test (
       id int,
       name String,
       primary key(id)
)
       PARTITION BY HASH PARTITIONS 16
STORED AS KUDU; 
2.创建Pipline
- 
创建一个新的Pipline
 - 
选择Origins类别,搜索MySQL Binary Log
 - 
高级配置,根据需要进行配置
 - 
添加表过滤的Stream Selector
 - 
添加插入类型分流的Stream Selector
 - 添加处理Delete类型日志的JavaScript Evaluator
- 
JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志
 
 - 
 - 配置JavaScript脚本
 
for(var i = 0; i < records.length; i++) {
  try { 
    var newRecord = sdcFunctions.createRecord(true);
    newRecord.value = records[i].value['OldData'];
    newRecord.value.Type = records[i].value['Type'];
    newRecord.value.Database = records[i].value['Database'];
    newRecord.value.Table = records[i].value['Table'];
    log.info(records[i].value['Type'])
    output.write(newRecord);
  } catch (e) {
    // Send record to error
    error.write(records[i], e);
  }
}
- 添加处理INSRET和UPDATE类型日志的JavaScript Evaluator
- 
JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志
 
 - 
 - 配置JavaScript脚本
 
for(var i = 0; i < records.length; i++) {
  try { 
    var newRecord = sdcFunctions.createRecord(true);
    newRecord.value = records[i].value['Data'];
    newRecord.value.Type = records[i].value['Type'];
    newRecord.value.Database = records[i].value['Database'];
    newRecord.value.Table = records[i].value['Table'];
    log.info(records[i].value['Type'])
    output.write(newRecord);
  } catch (e) {
    // Send record to error
    error.write(records[i], e);
  }
}
- 
在JavaScript Evaluator-DELETE添加Kudu
 - 
在JavaScript Evaluator-UPSERT添加Kudu
 - 
启动Pipelines
 
3.Pipeline测试
- 向cdc_test表中插入数据
 
insert into cdc_test values(1, 'fayson');
- 
查看Pipeline实时状态
 - 查看Kudu表数据
- 
数据成功的插入到Kudu的cdc_test表中
 
 - 
 - 修改cdc_test表中数据
 
update cdc_test set name='fayson-update' where id=1;
- 查看Pipeline实时状态
- 
Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE
 
 - 
 - 
查看Kudu的cdc_test表
 - 删除cdc_test表中数据
 
delete from cdc_test where id=1;
- 查看Pipeline实时状态
- 
Kudu-Delete成功处理一条日志
 
 - 
 - 
查看Kudu的cdc_test表,id为1的数据已不存在
 
4.总结
1.在Kudu插入数据时指定Kudu表名需要注意,如使用Impala创建的表,则需要加上impala的前缀格式impala:<database>:<table>
2.实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave
3.向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据
4.需要去确保组装的Map数据中Key与Kudu表中的column字段一致
大数据视频推荐:
腾讯课堂
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通













网友评论