美文网首页
Flume学习二(数据采集场景模拟)

Flume学习二(数据采集场景模拟)

作者: 刘子栋 | 来源:发表于2018-08-07 12:24 被阅读0次

1、单Agent模式

场景说明:

source采用netcat(可以直接通过Telnet命令做数据测试),channel统一采用memory,sink在这里采用HDFS sink

配置(netcat-memory-hdfs.conf):

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 33333

# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://192.168.205.131:9000/data/%Y%m%d%H%M

a1.sinks.k1.hdfs.filePrefix = app_name

a1.sinks.k1.hdfs.fileSuffix = .log

a1.sinks.k1.hdfs.inUseSuffix = .tmp

a1.sinks.k1.hdfs.rollInterval = 30

a1.sinks.k1.hdfs.rollSize = 10485760

a1.sinks.k1.hdfs.rollCount = 100000

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动:

flume-ng agent \

--name a1 \

--conf $FLUME_HOME/conf \

--conf-file /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/agents/netcat-memory-hdfs.conf \

-Dflume.root.logger=INFO,console

测试:

telnet localhost 33333 ==>输入测试数据+Enter

2、多Agent场景

场景说明:

多Agent串联工作,这里采用avro作为两个Agent之间的数据传输,foo的source采用netcat,两个Agent的channel都是memory,bar的sink采用的是logger(测试方便)

配置1(netcat-memory-avro.conf):

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 33333

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 44444

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置2(avro-memory-logger.conf):

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.ipFilter = true

a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = test_key

a1.sources.r1.interceptors.i3.value = test_value

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动(其他启动逻辑一致,都是先启动消费者,再启动生产者):

先启动消费者(即avro-memory-logger的Agent(bar))

flume-ng agent \

--name a1 \

--conf $FLUME_HOME/conf \

--conf-file /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/agents/avro-memory-logger.conf \

-Dflume.root.logger=INFO,console

再启动生产者(即netcat-memory-avro的agent(foo))

flume-ng agent \

--name a1 \

--conf $FLUME_HOME/conf \

--conf-file /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/agents/netcat-memory-avro.conf \

-Dflume.root.logger=INFO,console

测试:

telnet  localhost 33333  ==>输入测试数据+Enter

3、多Agent(多对一)

场景说明:

为了测试方便,这里采用2对1的模式,Agent1采用netcat-memory-avro模式,Agent2采用avro-memory-avro模式,Agent4采用avro-memory-logger模式

配置Agent1:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 33333

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 44444

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置Agent2:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.ipFilter = true

a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = test_key

a1.sources.r1.interceptors.i3.value = test_value

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 22222

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置Agent4:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.ipFilter = true

a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = test_key

a1.sources.r1.interceptors.i3.value = test_value

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

测试:

telnet  localhost  33333==>输入测试数据+Enter

avro测试:

4、一对多场景

场景说明:

为了方便演示,我才用1对2模式,source为avro,Log4jAppender采用不同的logger对象轮流发送数据,然后测试channel selector的作用。channel都是memory,sink1是logger,sink3是avro,对应另一个Agent。

配置(avro-memorys-logger_avro):

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 22222

a1.sources.r1.selector.type = multiplexing

a1.sources.r1.selector.header = flume.client.log4j.logger.name

a1.sources.r1.selector.mapping.logger-1 = c1

a1.sources.r1.selector.mapping.logger-0 = c2

# Describe the sink

a1.sinks.k1.type = logger

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = localhost

a1.sinks.k2.port = 44444

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1 c2

a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c2

配置(avro-memory-logger):

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.ipFilter = true

a1.sources.r1.ipFilterRules = allow:ip:192.*,allow:name:localhost

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = test_key

a1.sources.r1.interceptors.i3.value = test_value

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

测试:

相关文章

  • Flume学习二(数据采集场景模拟)

    1、单Agent模式 场景说明: source采用netcat(可以直接通过Telnet命令做数据测试),chan...

  • Flume架构与实践

    Flume架构与实践 Flume是一款在线数据采集的系统,典型的应用场景是作为数据的总线,在线的进行日志的采集、分...

  • 数据平台实践①——Flume+Kafka+SparkStream

    蜻蜓点水 Flume——数据采集 如果说,爬虫是采集外部数据的常用手段的话,那么,Flume就是采集内部数据的常用...

  • 项目技术选型

    数据采集传输 FLUME,DATAHUB,RDS FLUME,KAFKA,SQOOP,DATAX 数据存储 MAX...

  • Kafka学习笔记二:Flume+Kafka安装

    Flume介绍 Flume是流式日志采集工具,FLume提供对数据进行简单处理并且写到各种数据接收方(可定制)的能...

  • (十)大数据学习之sqoop

    Sqoop 1.架构: (1)flume数据采集 采集日志数据(2)sqoop数据迁移 hdfs->mysql(3...

  • 大数据开发:Flume日志采集框架简介

    在大数据学习阶段,Flume作为日志采集的重要组件之一,是必学的一个部分。在Hadoop生态当中,Flume负责日...

  • flume数据采集

    简介 flume官网里面有user guide。作用:日志采集、聚合、传输核心组件:Agentagent内部组件:...

  • 大数据学习之:Flume

    flume作用 从磁盘采集文件发送到HDFS 数据采集来源:系统日志文件、Python爬虫数据、端口数据 数据发送...

  • Flume基础学习

    Flume是一款非常优秀的日志采集工具。支持多种形式的日志采集,作为apache的顶级开源项目,Flume再大数据...

网友评论

      本文标题:Flume学习二(数据采集场景模拟)

      本文链接:https://www.haomeiwen.com/subject/doxnvftx.html