美文网首页我爱编程
数据采集之Flume+Kafka

数据采集之Flume+Kafka

作者: 吃橘子的冬天 | 来源:发表于2017-12-05 10:43 被阅读243次

Flume简介

1. Flume特点

flume是收集日志的开源软件解决方案之一,相对于其他同类软件他具有高可用的,高可靠的,分布式等特性。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

2. Flume核心概念

  • Agent 使用JVM运行Flume 每台机器运行一个agent , 但是可以在一个agent中包含多个sources和sinks
  • Client 生产数据 , 运行在一个独立的线程
  • Source 从Client收集数据 , 传递给Channel
  • Sink 从Channel收集数据 , 运行在一个独立线程
  • Channel 连接 sources 和 sinks ,这个有点像一个队列
  • Events 可以是日志记录、 avro 对象等

Flume快速开发

1. 安装

  • yum 方式下载安装 :
[mis-ecif@hadoop10-4-0-226 ~]$ yum install flume 

解压文件,若打印如下信息,解压缩报错 ,可能是包没下载完全,重新下载重试即可

[mis-ecif@hadoop10-4-0-226 ~]$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
gzip: stdin: unexpected end of file  
tar: Unexpected EOF in archive  
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now

若解压成功,可检测安装是否成功:/usr/local/flume/bin/flume-ng version
打印以下信息,则表示安装成功了

[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr  7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164

若打印以下信息,可能是因为安装了hbase,将Hbase的hbase-env.sh文件中HBASE_CLASS注释掉即可

[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Could not find or load main class org.apache.flume.tools.GetJavaProperty #加载不了该类
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr  7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164

2. 开发

  • 更改Flume配置文件
cd /usr/local/flume/conf/
cp flume-env.sh.template flume-env.sh
vi flume-env.sh # 修改flume-env.sh中JAVA_HOME变量的值
  • 创建Flume启动使用到的配置文件 exec_tail.conf
[root@hadoop10-1-0-144 conf]# vi /local/flume/conf/exec_tail.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. 测试

  • 启动Flume
flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
  • 往Flume监控日志中添加数据
echo 'phoneinfo||223.104.7.66||OPPO R9sk||6.0.1||天津市||2017-07-25 06:53:23||中国移动||yingyongbao
' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

echo 'phoneinfo||101.38.64.172||iPhone 6 Plus||10.3.2||北京市||2017-07-25 07:11:40||中国联通' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

控制台若有数据打印,则表示测试成功

4. 更改配置,与kafka集成

  • 将消息传给 kafka
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_demo
#a1.sinks.k1.brokerList = 10.1.0.141:9092,10.1.0.142:9092,10.1.0.143:9092,10.1.0.144:9092
#Kafka集群Broker列表
a1.sinks.k1.brokerList = 10.1.0.144:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
  • 将消息缓存在本地文件系统中 --建议将消息缓存在本地文件系统
# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpoint = /mnt/disk1/flume/checkpoint #检查点文件存储路径
a1.channels.c1.dataDirs = /mnt/disk1/flume/data #消息数据存储路径
  • 创建kafka topic
./kafka-topics.sh --zookeeper 10.1.0.144:2181 --create --topic flume_demo --partition 3 --replication-factor 1
  • 查看topic
./kafka-topics.sh  --list --zookeeper 10.1.0.144:2181
  • 启动kafka consumer,接收flume消息
./kafka-console-consumer.sh --topic flume_demo  --bootstrap-server 10.1.0.144:9092
  • 重启Flume
flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
  • 往flume监控文件中添加日志
echo 'phoneinfo||116.227.248.47||HUAWEI MT7-CL00||6.0||||2017-07-25 07:21:36||中国移动||yingyongbao'  >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

查看kafka consumer窗口,若能够正常接收消息,则表示集成kafka成功。

相关文章

  • 数据采集之Flume+Kafka

    Flume简介 1. Flume特点 flume是收集日志的开源软件解决方案之一,相对于其他同类软件他具有高可用的...

  • Python网络数据采集之图像识别与文字处理

    网络采集系列文章 Python网络数据采集之创建爬虫Python网络数据采集之HTML解析Python网络数据采集...

  • 大数据之数据采集

    大数据体系一般分为:数据采集、数据计算、数据服务、以及数据应用 几大层次。 在数据采集层,主要分为 日志采集 和 ...

  • 大数据系列之Flume+kafka 整合

    关于Flume 的 一些核心概念: 组件名称 功能介绍 Agent代理 使用JVM 运行Flume。每台机器运...

  • 数据仓库搭建

    全流程:数据采集->数据存储->数据分析->数据呈现 数据采集 首先我们从数据采集来说,数据采集的数据主要来自于日...

  • Flume+Kafka双剑合璧玩转大数据平台日志采集

    概述 大数据平台每天会产生大量的日志,处理这些日志需要特定的日志系统。 一般而言,这些系统需要具有以下特征: 构建...

  • 产品要懂点数据分析(三)- 分析过程

    数据采集 数据分析的对象就是数据,通过数据采集来获得数据。数据采集在《产品要懂点数据分析(一)-数据采集和数据指标...

  • 数据采集之websocket

    这里有一篇很好的解释websocket的网站供大家参考 https://segmentfault.com/a/11...

  • 用数据驱动产品和运营 之 数据处理流程

    数据处理流程: 数据金字塔:数据采集——数据建模——数据分析 (一)数据采集 数据采集问题:不准确、不完备、不细致...

  • 第一章 总述

    大数据系统体系:数据采集、数据计算、数据服务、数据应用 一. 数据采集层: 1)web端日志采集技术方案:Aplu...

网友评论

    本文标题:数据采集之Flume+Kafka

    本文链接:https://www.haomeiwen.com/subject/bjpqixtx.html