Integrating Flume and Kafka for Real-Time Data Collection

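Here the Kafka sink acts as a Kafka producer.
Configuration parameters differ between Flume versions.
If your Flume build comes from the CDH5 site, write the agent configuration file according to the user guide for that exact Flume version. I am using flume-ng-1.6.0-cdh5.7.0, so the guide I need is http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html
Find the Kafka sink section in that guide.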

The test-avro-memory-kafka.conf configuration file:
## Name the source, sink, and channel of this agent
test-avro-memory-kafka.sources=avro-source
test-avro-memory-kafka.sinks=kafka-sink
test-avro-memory-kafka.channels=memory-channel
## Avro source: listens for events on the given address and port
test-avro-memory-kafka.sources.avro-source.type=avro
test-avro-memory-kafka.sources.avro-source.bind=10.101.x.x
test-avro-memory-kafka.sources.avro-source.port=44444
## Memory channel: buffers events between the source and the sink
test-avro-memory-kafka.channels.memory-channel.type=memory
## Kafka sink: publishes events to a Kafka topic
## Each commented-out line shows the property name used by newer Flume releases;
## the active line below it is the name from the flume-ng-1.6.0-cdh5.7.0 guide.
test-avro-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
#test-avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers=10.101.x.x:9092
test-avro-memory-kafka.sinks.kafka-sink.brokerList=10.101.x.x:9092
#test-avro-memory-kafka.sinks.kafka-sink.kafka.topic=test
test-avro-memory-kafka.sinks.kafka-sink.topic=test
#test-avro-memory-kafka.sinks.kafka-sink.kafka.producer.acks=1
test-avro-memory-kafka.sinks.kafka-sink.requiredAcks=1
#test-avro-memory-kafka.sinks.kafka-sink.flumeBatchSize=5
test-avro-memory-kafka.sinks.kafka-sink.batchSize=5
## Bind the source and sink to the channel
test-avro-memory-kafka.sources.avro-source.channels=memory-channel
test-avro-memory-kafka.sinks.kafka-sink.channel=memory-channel
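Because the channel name is typed by hand in several places, a mismatch between the declared channel and the one a source or sink is bound to is easy to miss. The following is a minimal sketch of a pre-flight check, not part of Flume itself: `check_channels` is a hypothetical helper name, and it assumes the simple `key=value` layout used in the file above, with a single `<agent>.channels=` line.

```shell
#!/bin/sh
# check_channels (hypothetical helper, not shipped with Flume):
# report channels that a source or sink is bound to but that are
# missing from the "<agent>.channels=" declaration.
# Usage: check_channels <agent-name> <conf-file>
check_channels() {
    agent="$1"
    conf="$2"

    # Channels declared for the agent (space-separated on one line).
    declared=$(grep -v '^[[:space:]]*#' "$conf" \
        | sed -n "s/^$agent\.channels[[:space:]]*=[[:space:]]*//p")

    # Channels referenced by "sources.<name>.channels" and "sinks.<name>.channel".
    referenced=$(grep -v '^[[:space:]]*#' "$conf" | sed -n \
        -e "s/^$agent\.sources\.[^.]*\.channels[[:space:]]*=[[:space:]]*//p" \
        -e "s/^$agent\.sinks\.[^.]*\.channel[[:space:]]*=[[:space:]]*//p")

    status=0
    for ch in $referenced; do
        case " $declared " in
            *" $ch "*) ;;                                   # declared, fine
            *) echo "undeclared channel: $ch"; status=1 ;;  # likely a typo
        esac
    done
    return $status
}

# Example call (path taken from the start command in this post):
# check_channels test-avro-memory-kafka "$FLUME_HOME/conf/test-avro-memory-kafka.conf"
```

The function prints nothing and returns 0 when every binding points at a declared channel, so it can be run before starting the agent.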
Start the Flume agent:
./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/test-avro-memory-kafka.conf --name test-avro-memory-kafka -Dflume.root.logger=INFO,console
Start a Kafka console consumer:
./kafka-console-consumer.sh --bootstrap-server 10.101.x.x:9092 --topic test --from-beginning
At this point, the Kafka consumer should receive whatever events are sent to the Avro source.
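To give the consumer something to receive, events have to reach the Avro source first. One way to do that is the `avro-client` mode of `flume-ng`, which sends the contents of a file to an Avro source. The sketch below assumes that `flume-ng` is on your `PATH`, that the agent above is already running, and that the host and port match the `bind` and `port` settings of `avro-source`. The sample file path and the `AVRO_HOST`, `AVRO_PORT`, and `SAMPLE_FILE` variable names are illustrative, not from the original setup.

```shell
#!/bin/sh
# Assumed values: replace the host placeholder with the avro-source bind address.
AVRO_HOST="${AVRO_HOST:-10.101.x.x}"
AVRO_PORT="${AVRO_PORT:-44444}"
SAMPLE_FILE="${SAMPLE_FILE:-/tmp/flume-kafka-sample.txt}"   # hypothetical path

# Write a few test lines to send through the pipeline.
printf '%s\n' 'hello kafka 1' 'hello kafka 2' 'hello kafka 3' > "$SAMPLE_FILE"

# Send the file to the Avro source only if flume-ng is available.
if command -v flume-ng >/dev/null 2>&1; then
    flume-ng avro-client --conf "$FLUME_HOME/conf" \
        -H "$AVRO_HOST" -p "$AVRO_PORT" -F "$SAMPLE_FILE"
else
    echo "flume-ng not found on PATH; skipping send" >&2
fi
```

If everything is wired correctly, the test lines should show up in the console consumer started earlier.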
