Case 1: Collecting data from a network port
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2) Run the following command from $FLUME_HOME/conf to start the agent (--name is the agent name used in the config file, --conf is Flume's conf directory, --conf-file is the config file we wrote, and the last option sends the agent's log output to the console):
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console
In another terminal, send log lines to Flume with telnet hadoop000 44444.
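A smoke-test session looks roughly like this (assuming the agent from step (2) is already running and `hadoop000` resolves; the netcat source acknowledges each received line with "OK" by default):

```shell
$ telnet hadoop000 44444
Connected to hadoop000.
Escape character is '^]'.
hello flume
OK
```

Each typed line then appears in the agent's console as one logger event.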
Case 2: Collecting data from a file
(1) The configuration file is named exec-memory-logger.conf; as the name suggests, it uses an exec source, a memory channel, and a logger sink.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2) First create data.log under /home/hadoop/data, then start the agent; as data is written to the log, you can watch Flume collect it.
(3) The startup command is:
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console
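To see the exec source pick up data, append a few lines to the monitored file. A minimal sketch (using `./data.log` as a stand-in for `/home/hadoop/data/data.log` so it can run anywhere):

```shell
# Append sample events to the monitored file. With the agent running,
# the exec source's `tail -F` turns each new line into one Flume event.
LOG=./data.log    # stand-in for /home/hadoop/data/data.log
echo "hello flume" >> "$LOG"
echo "hello world" >> "$LOG"
tail -n 2 "$LOG"  # prints the two lines just appended
```

Because the source runs `tail -F` (not `tail -f`), it keeps following the file even if the log is rotated and recreated.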

Case 3: Shipping log data from server A to server B
(1) This case uses avro for the hop between machines. The configuration files are exec-memory-avro.conf and avro-memory-logger.conf. On side A, an exec source tails data.log and an avro sink forwards the events; on side B, an avro source receives them and a logger sink prints them to the console, each via a memory channel. (Note: start the listening side, B, first.)

# Side A: exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
# The avro sink sends events to this hostname and port on B
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
# Side B: avro-memory-logger.conf
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
# The avro source listens on this hostname and port
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
(2) The startup commands follow the same pattern as above, but note that both the config file and the agent name (--name) change, and the listening side (B) must be started first:
flume-ng agent --name avro-memory-logger --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-logger.conf -Dflume.root.logger=INFO,console
flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
(3) How the log collection works
- On machine A, Flume monitors a file: when users visit the site, user-behavior logs are appended to data.log (written by nginx).
- The avro sink forwards each newly produced log event to the hostname and port that the corresponding avro source listens on.
- The agent hosting the avro source then prints the logs to the console via its logger sink.