美文网首页
日志采集Agent

日志采集Agent

作者: john不哭 | 来源:发表于2018-11-24 11:43 被阅读161次

前言

日志采集这一领域,好像没多少技术含量。但是如果往下挖技术细节,可以涉及到Linux文件系统的工作原理(文件回收规则);进程管理(如何保证进程不会被杀死,杀死后如何恢复等问题);如何保证日志不漏,然后到日志不重复;从轮询收集,到inotify事件收集;如何处理日志接收方背压的问题等等。

业界方案

采集agent的方案有以下两种代表:

  • 以Flume为代表
  • 以Elastic公司为代表的logstash ,beat系列。其中本文重点研究了并且参考Filebeat。

参考资料

https://yq.aliyun.com/articles/204554?spm=5176.10695662.1996646101.searchclickresult.6ed6ff98OMv6De
http://www.man7.org/linux/man-pages/man7/inotify.7.html
https://www.elastic.co/guide/en/beats/filebeat/current/index.html

核心问题

  • 轮转时,文件引用次数为零(压缩完成后,发生服务器断电)。导致日志丢失 (通过硬连接hold住文件解决)
  • 如何判断为一条完整日志 (通过\N等)
  • 采集进程被杀如何恢复工作环境

进阶问题:

  • 配置管理问题:如何下放配置,如何热更新配置等问题。
  • 资源限制问题,如何限制资源占用上限。
    完整代码地址:
    https://github.com/Whojohn/log_demo

基本概念

Demo

Version0.1(模仿Tail 实现日志采集,拥有断点续传功能。)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time


class Agent(object):
    def __init__(self):
        pass

    def time_count(fun):  # fun = world
        #  @functools.wraps(fun) 的作用就是保留原函数信息如__name__, __doc__, __module__
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            """
            this is wrapper function
            :param args:
            :param kwargs:
            :return:
            """
            start_time = time.time()
            temp = fun(*args, **kwargs)  # world(a=1, b=2)
            end_time = time.time()
            print("%s函数运行时间为%s" % (fun.__name__, end_time - start_time))
            return temp

        return wrapper

    @time_count
    def collect(self, file_name):
        f = open(file_name, "r")
        length = 0

        #Recover from the lastest readed.
        try:
            loc = open("tail-seek", "r")
            pre_location = loc.readline()[:-1]
            if pre_location != "":
                f.seek(int(pre_location))
                length += int(pre_location)
                loc.close()
        except:
            pass
        loc = open("tail-seek", "w", buffering=0)

        # Avoid the overload the agent cpu and disk.
        # __loop = 0
        while 1:
            temp = f.readline()
            print temp[:-1]

            # Avoid the overload the agent cpu and disk.
            # if __loop%1000 == 0:
            #   time.sleep(0.003)

            # Push the data to the server.
            if temp != "":
                 print temp
 
            length += len(temp)
            loc.seek(0)
            loc.write(str(length) + '\n')
            if temp == "":
                f.close()
                loc.close()
                break

if __name__ == "__main__":
    s = Agent()
    s.collect("./show")

坑:

  • 参考:https://stackoverflow.com/questions/620367/how-to-jump-to-a-particular-line-in-a-huge-text-file
    windows中当前行字符总长度,不等于当前行文件偏移位置(linux暂时没发现这个问题)。
    解决方案:
    1 使用f.tell()确定当前文件偏移位。

  • 已知进程恢复,可能会导致agent重复上传(上传完成后,准备同步写保存文件偏移位时,同步写还未被写入文件。)。如果先同步写,再上传又会导致日志丢失。
    解决方案:
    1 可以通过双方模拟tcp的ack机制进行,第一版不打算实现这个功能。即通过服务端进行一个版本号(其实就是一个顺序号)进行处理。(完美方案,保证双方都能不重,不漏,但是有一定消耗。)
    2 Agent端逻辑不变,服务端自行对文件版本号进行对比,落后就丢弃。优雅一点服务端告知Agent已经收到版本号(行号),然后指定Agent按照最新版本号上传,类似方案1。

  • 记录文件偏移位,同步写的消耗问题。简单测试了一下,没有同步写大概能快10%左右的样子。
    优化方案:
    1 Agent同步写的逻辑改为1000行写一次offset地址。这样恢复的时候重复问题会变得严重,可以通过服务器丢弃落后的日志解决(也可以引入版本号解决)。

2018/11/26更新

Version1.0(引入网络部分)

特性:

  • 硬连接保证日志收集完成才释放文件。
  • 引入网络部分,简单的server接收者,作为Demo。
    代码见https://github.com/Whojohn/log_demo(version1.0部分)

问题:

  1. 依旧为单进程,但是已经写好各种基础模块。
  2. Server端作为展示,没有把数据落盘,数据落盘时,机器断电,可能存在丢数据的风险。(依旧是版本号解决问题)。
  3. 收集方式依旧为轮询方式。
  4. 没有背压感知功能(可以通过参考Mysql 刷脏的做法,通过引入版本号,对比发送版本号和确认版本号,当发现确认版本号落后发送版本号10%时,只接收,不发送,直至服务器版本号追赶至5%。)。

服务端数据落盘设计

方案:
  • 同一主题的日志放置在同一内存中(deque)。
  • 为了能够利用顺序写,尽可能压榨性能,不同主题的日志合并写入到同一文件中。

如何写:
1 轮询所有不同主题的内存,满足一定条件(一定条数,一定时间等多种条件下)写入。并且写入对应的文件映射关系(如:主题,1~100000条,开始\结束文件offset)。

如何读:
1 找到对应的文件映射表,找到对于的条目如31081在1~100000之间,读取offset,然后循环找到对应的条目。

如何恢复:
1 检查映射表。
2 利用写入的offset等信息重新从Agent拉取数据。

版本号初步设计

方案一:
  • 机器唯一标识(ip)+log文件唯一标识+offset作为版本号, Server端需要重传时候,需要发对应的版本号以及特殊的重传标识到对应的Agent上即可。Agent提取出对应的offset,继续上传。
    坑:
  • offset可能数字可能会很大。

2018/11/27更新

Version1.1 (优化采集性能,网络)

特性:

  • Server数据落盘雏形(没有实现版本号)。
  • Agent通过多条日志打包成一条TCP报文,Server端通过弃用rfile.readline(Python循环太慢,短报文时间完全浪费在循环上。)。大幅度提高Agent采集性能,Server数据落盘性能。

性能如下:

  1. 环境:Aliyun轻量服务器,1Core2G,40G SSD。
  2. 基准性能:
    通过dd测出磁盘性能连续读写性能大概在130MB/s之间。
  3. 日志类型:
    1. 短日志。如(787897797987897)
    2. 正常日志。如Nginx日志。
      优化前:
      1 1亿条,880Mb短日志。大概需要50秒。
      2 一千万条,3.7G日志。大概需要120秒。

优化记录:

方案:

  1. 优化Socket buff大小。
    • 修改Tcp buff为32k。

2.尝试引入压缩机制。

  1. 优化文件读取方式。
    • 每一次读取更多的字节数,减低日志收集细粒度,以8k的细粒度进行读取文件。Python函数具体实现中readlines(size),指定缓存最小就是8k,他会接近于8k以保证数据完整性。(读的粒度越大,文件check_point同步写次数越少,变相提高整体性能。)
    • 后期可以参照readlines,利用read做一个类似的功能(以4k的方式读入,4k刚好能够吻合ssd的一次读写)。
  2. 调整Queue的大小等细节,在保证性能的前提下,减少内存使用,以防止OOM。

优化后:

1 1亿条,880Mb短日志。大概需要14秒(受益于文件读取优化)。
2 1千万条,3.7G日志。大概需要55秒。

2018/11/29更新

考虑到单核存在进程调度的消耗。

在腾讯云再次测试:
条件:
Agent amd 2c4g 50g ssd
Server amd 2c4g 50g ssd
通过dd测出磁盘性能连续读写性能大概在130MB/s之间。

结果:

  1. 1亿条,880Mb短日志。大概需要15秒(受益于文件读取优化)。
  2. 1千万条,3.7G日志。大概需要45秒。

2018/12/3更新

大幅度减少Cpu占用,磁盘性能成为系统瓶颈,引入LZ4作为提高性能选择。(ps:LZ4算法需要依赖相应的python packet。)

彻底解决网络bug问题,由于Socket中 recv(len)不保证接收完整的len长度的data, 会出现数据异常问题。(通过判断每一次recv大小,直至达到len大小作为一次传输完整的数据。查了4天bug~~~)

由于毕业问题,暂停埋坑2周,任何问题请联系我18689235591@163.com,我叫John即可。

相关文章

网友评论

      本文标题:日志采集Agent

      本文链接:https://www.haomeiwen.com/subject/qrmrqqtx.html