美文网首页
Flume采集日志到Kafka

Flume采集日志到Kafka

作者: Yobhel | 来源:发表于2023-10-31 09:07 被阅读0次
image.png

1.日志采集Flume安装

详情见:https://www.jianshu.com/p/a2590b997e8e?v=1698800791912

集群规划:

服务器hadoop101 服务器hadoop102 服务器hadoop103
Flume(采集日志) Flume Flume

2.项目经验之Flume组件选型

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势
TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
Spooling Directory Source监控目录,支持断点续传。

(2)batchSize大小如何设置?
答:Event 1K左右时,500-1000合适(默认为100)

2)Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。
注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

3.日志采集Flume配置

image.png

Flume直接读log日志的数据,log日志的格式是app.yyyy-MM-dd.log。

2)Flume的具体配置如下:
(1)在 /opt/module/flume 目录下创建 job 目录

[yobhel@hadoop101 flume]$ mkdir job

进入 job 目录,创建 file-flume-kafka.conf 文件

[yobhel@hadoop101 job]$ vim file-flume-kafka.conf

在文件配置如下内容

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/log/app.*
a1.sources.r1.positionFile = /opt/module/data_mocker/log_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.ETLInterceptor$Builder

#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

注意:com.yobhel.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

4 Flume拦截器

https://github.com/Yobhel121/edu-flume-interceptor

1)需要先将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下面。

[yobhel@hadoop101 lib]$ ls | grep edu-flume-interceptor
edu-flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

2)分发Flume到hadoop102、hadoop103

[yobhel@hadoop101 module]$ xsync flume/

3)在hadoop101上启动Flume

[yobhel@hadoop101 flume]$ bin/flume-ng agent --name a1 --conf-file job/file-flume-kafka.conf &

5.日志采集Flume启动停止脚本

1)在/home/yobhel/bin目录下创建脚本f1.sh

[yobhel@hadoop101 bin]$ vim f1.sh

在脚本中填写如下内容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop101 hadoop102
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
        done
};; 
"stop"){
        for i in hadoop101 hadoop102
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac
  • 说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
  • 说明2:awk 默认分隔符为空格。
  • 说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。
    2)增加脚本执行权限
[yobhel@hadoop101 bin]$ chmod +x f1.sh

3)f1集群启动脚本

[yobhel@hadoop101 module]$ f1.sh start

4)f1集群停止脚本

[yobhel@hadoop101 module]$ f1.sh stop

相关文章

网友评论

      本文标题:Flume采集日志到Kafka

      本文链接:https://www.haomeiwen.com/subject/mxpbidtx.html