Flume

作者: 博弈史密斯 | 来源:发表于2018-06-07 15:30 被阅读0次

flume 是一个数据搜集框架,负责采集各个服务器上的日志文件,并上传到 hdfs、kafka、其他服务器等。

flume由三部分组成:

  • source:用来搜集数据
  • channel: 用来暂时存放数据(可选择存到 内存 或文件 中 等)
  • sink: 将数据写入到某种介质中(hdfs、oracle等)

source

1. avro source

flume 通过avro方式在两台机器之间进行数据传输。

比如,192.168.17.18 上的数据 传到 192.168.17.17,
首先要再两台机器上都部署 flume,17 下的配置文件:

a2.sources.r1.type = avro  
a2.sources.r1.channels = c1  
a2.sources.r1.bind = 192.168.17.17  
a2.sources.r1.port = 1234  

在17上启动 flume:

flume-ng agent --conf ./conf/ -f conf/avro-flume.conf -Dflume.root.logger=DEBUG,console -n a2

在18上运行:

flume-ng avro-client -c ./conf -H 192.168.17.17 -p 1234 -F logs/test_data.log

这样 18 上的logs/test_data.log 的数据直接在 17上打印出来了,即 17 的控制台可以看到 18 的内容

2. exec source

exec 在启动时运行给定的Unix命令。

tail -f filename
说明:监视filename文件的尾部内容(默认10行)

对文件实时监控,如果发现文件有新日志,立刻收集并发送。
缺点:当你的服务器宕机重启后,此时数据读取还是从头开始。

agent.sources.s1.type=exec            
agent.sources.s1.command=tail -f /tmp/logs/kafka.log
3. spooldir source

对指定目录进行实时监控,如发现目录新增文件,立刻收集并发送

缺点:不能对目录文件进行修改,如果有追加内容的文本文件,不允许

a1.sources.s1.type = spooldir    
a1.sources.s1.spoolDir = /opt/data/flume/log
4. taildir source

可以监控一个目录下的多个文件,可以实现 实时读取文件。
实现了 ExecSource + SpoolDirectorySource 的功能。

taildirSource 对于log4j 日志有BUG:log4j 日志会自动切分,log4j 切分日志其实就是新建一个文件,然后把原来的日志文件都改名。但是 taildirSource 组件不支持文件改名。如果文件改名会认为是新文件,就会重新读取,这就导致了日志文件重读。
可以通过修改源码解决,网上有教程。

channel

1. Memory Channel

Memory Channel把Event保存在内存队列中,有最好的性能,不过也有数据可能会丢失的风险,如果Flume崩溃或者重启,那么保存在Channel中的Event都会丢失

#具体定义channel,容纳1000条数据,100条发送一次
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
2. File Channel

File Channel把Event保存在本地硬盘中,比Memory Channel提供更好的可靠性和可恢复性,不过要操作本地文件,性能要差一些。

配置项 默认值 说明
type 值为file
dataDir 保存路径

File Channel参考配置,a1为Agent实例名称。

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Kafka Channel

Kafka Channel把Event保存在Kafka集群中,能提供比File Channel更好的性能和比Memory Channel更高的可靠性。

配置项 默认值 说明
type org.apache.flume.channel.kafka.KafkaChannel
brokerList Kafka集群Broker列表
zookeeperConnect Kafka集群的ZooKeeper路径

Kafka Channel参考配置,a1为Agent实例名称。

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000
a1.channels.channel1.brokerList=kafka-2:9092, kafka-3:9092
a1.channels.channel1.topic=channel1
a1.channels.channel1.zookeeperConnect=kafka-1:2181

Spillable Memory Channel

Spillable Memory Channel把Event保存到内存队列和本地文件中,数据优先存储在内存队列中,当内存队列满了以后会保存到文件中。Spillable Memory Channel在保持较高性能的同时,又能兼顾可靠性。

配置项 默认值 说明
type SPILLABLEMEMORY
memoryCapacity 10000 内存队列中数据最大数量
overflowCapacity 100000000 文件中数据最大数量

参考配置如下,a1为Agent实例名称。

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

sink

1. HDFS sink

接收器将事件写入Hadoop分布式文件系统(HDFS)

#具体定义sink,filePrefix:数据前缀。 hdfs://ns1/flume/%Y%m%d:指定path,%Y%m%d 指按照时间生成
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://ns1/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
# 使用文本文件,不使用sequenceFile  
a4.sinks.k1.hdfs.fileType = DataStream

#不按照条数生成文件,rollCount:多少条数据产生一个文件
a4.sinks.k1.hdfs.rollCount = 0

#HDFS上的文件达到128M时生成一个文件,即文件已经写了多次次之后,写新文件
a4.sinks.k1.hdfs.rollSize = 134217728
2. hive sink
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
3. hbase sink
4. avro sink

avro sink形成了Flume分层收集支持的一半。 发送到此接收器的Flume事件将转换为Avro事件并发送到配置的 主机名/端口对。

a1.sinks.k1.type = avro  
a1.sinks.k1.channel = c1  
a1.sinks.k1.hostname = rainbow.com.cn  
a1.sinks.k1.port = 4545

5. kafka sink

这是一个Flume Sink实现,可以将数据发布到Kafka主题。 其中一个目标是将Flume与Kafka集成

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
a1.sinks.k1.brokerList = rainbow.com.cn:9092  
a1.sinks.k1.topic = testTopic

可以在 .conf 文件中配置多个 source、channel、sink

1.多sink

channel 的内容只输出一次,同一个event 如果sink1 输出,sink2 不输出;如果sink1 输出,sink1 不输出。 最终 sink1+sink2=channel 中的数据。

配置文件如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c1
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp

2.多 channel 多sink ,每个sink 输出内容一致

(memory channel 用于kafka操作,实时性高,file channel 用于 sink file 数据安全性高)
(多channel 单 sink 的情况没有举例,个人感觉用处不广泛。)

配置文件如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
#多个channel 的数据相同
a1.sources.r1.selector.type=replicating

# channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/apps/flume-1.7.0/checkpoint
a1.channels.c2.dataDirs = /opt/apps/flume-1.7.0/data

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp

3. 多source 单 channel 单 sink

多个source 可以读取多种信息放在一个channel 然后输出到同一个地方
配置文件如下:

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# source1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c

a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# source2
a1.sources.r2.type = exec
a1.sources.r2.shell = /bin/bash -c
a1.sources.r2.channels = c1
a1.sources.r2.command = tail -F /opt/apps/logs/tail2.log

# channel1  in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

相关文章

  • Flume01

    Flume架构组成 Flume 负载均衡 Flume Agent内部原理 启动 Flume 监听

  • Flume

    总结 一、Flume的定义 1、flume的优势 2、flume的组成 3、flume的架构 二、 flume部署...

  • 玩转大数据计算之Flume

    Flume版本:我们使用Flume最新的版本:Flume NG 1.7.0 Flume架构Flume是一个分布式的...

  • Flume 入门

    一:Flume是什么: 二:特点: 三:Flume版本介绍 四:Flume NG基本架构 五:Flume NG核心...

  • flume的部署和测试

    1 flume 安装 flume下载:http://flume.apache.org/download.htmlf...

  • 091-BigData-19Flume与Flume之间数据传递

    上一篇:090-BigData-18Flume Flume与Flume之间数据传递 一、单Flume多Channe...

  • java大数据之flume

    一、Flume简介 1.1 Flume的位置 1.2 Flume是什么 (1)Flume提供一种分布式的,可靠地,...

  • Flume(一)概述

    Flume图标 Flume图标 Flume定义 Apache Flume是一个分布式,可靠且可用的系统,用于有效地...

  • Flume pull方式和push方式整合

    Pull方式 Flume Agent 编写 启动Flume Push方式 Flume Agent的编写 启动flu...

  • 4.Flume1.9安装

    1 Flume安装部署 1.1 安装地址 (1) Flume官网地址:http://flume.apache.or...

网友评论

      本文标题:Flume

      本文链接:https://www.haomeiwen.com/subject/rssgsftx.html