
0x05. Importing CSV Data with Logstash

Author: 0x70e8 | Published 2018-07-05 18:05

Install and Configure Logstash

  • Download Logstash; unpacking the archive is all the installation needed
  • Check the JDK version
  • The logical structure of a Logstash data pipeline

[Diagram from the official documentation: the Logstash pipeline, input → filter → output]
  • Check that Logstash works
# Enter the installation directory
cd /home/test/dev/logstash-6.2.4
# Test whether stdin input is echoed to stdout
bin/logstash -e 'input { stdin { } } output { stdout {} }'

Wait for output similar to the following:

$ bin/logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash's logs to /home/test/dev/logstash-6.2.4/logs which is now configured via log4j2.properties
[2018-07-05T18:50:53,944][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/test/dev/logstash-6.2.4/modules/fb_apache/configuration"}
[2018-07-05T18:50:53,966][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/test/dev/logstash-6.2.4/modules/netflow/configuration"}
[2018-07-05T18:50:54,724][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-07-05T18:50:55,545][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-05T18:50:56,171][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-05T18:51:00,195][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-05T18:51:00,622][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4c49822d sleep>"}
The stdin plugin is now waiting for input:
[2018-07-05T18:51:00,756][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}

Type hellologstash:

hellologstash
{
      "@version" => "1",
    "@timestamp" => 2018-07-05T09:51:55.280Z,
          "host" => "host01",
       "message" => "hellologstash"
}

Output like the above means Logstash is working correctly.
Press Ctrl+D to exit the process.

Prepare the Data File

  • user.csv
$ cat user.csv
"1","李","俊明","0","88","1"
"2","高","励","1","54","2"
"3","郑","景诚","0","55","3"
"4","杨","景诚","1","98","4"
"5","阮","俊依","0","41","5"
"6","江","明","0","53","6"
"7","赵","十莉","1","87","7"
"8","彭","十莉","0","28","8"
"9","王","恬","0","54","9"
"10","林","恬","1","63","10"

Rename the .csv extension to .txt (for whatever reason, Logstash never responded to the .csv file).
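A minimal sketch of the rename; the /tmp path is an assumption here, adjust it to wherever your user.csv actually lives:

```shell
# Hypothetical path; create a one-line sample so the rename has something to act on.
printf '"1","李","俊明","0","88","1"\n' > /tmp/user.csv
# Rename .csv to .txt so the file input picks it up.
mv /tmp/user.csv /tmp/user.txt
```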

Configure Logstash Input and Output

As with the earlier 'input { stdin { } } output { stdout {} }', except this configuration is longer, so it goes in a file:

  • Create a file csv.conf in Logstash's config directory
input {
    file {
        path => ["/tmp/data/*.txt"]
        start_position => "beginning"
        # discover new files every 5s
        discover_interval => 5
        # only scan files modified within the last 15s; ignore older ones
        ignore_older => 15
        # close a file 1s after it was last read
        close_older => 1
        # maximum number of open files
        max_open_files => 32
        # interval between file stat checks, default 1s
        stat_interval => 5
    }
}
filter {
    csv {
        separator => ","
        columns => ["id","first_name","last_name","gender","score","no"]
    }
}
output {
    # stdout {}
    elasticsearch {
        index => "user"
        # document_id => "%{id}"  # left commented out so Elasticsearch generates ids automatically
        hosts => ["127.0.0.1:9200"]
    }
}
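What the csv filter does to each event can be sketched in plain Python: split the line on the separator (respecting quotes) and zip the values with the column names from the config above.

```python
import csv
import io

# Column names, matching the csv filter config above.
columns = ["id", "first_name", "last_name", "gender", "score", "no"]

# One line of user.csv, quoted exactly as in the data file.
line = '"1","李","俊明","0","88","1"'

# The stdlib csv module handles the quoting; zip values with column names.
values = next(csv.reader(io.StringIO(line)))
event = dict(zip(columns, values))
print(event["first_name"], event["score"])  # 李 88
```

Note that every field arrives as a string; the typing to integer/float happens later, on the Elasticsearch side, via the mapping defined below.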

Simulate File Generation Automatically

Use a Python script to copy the data file into the folder Logstash watches, once every 10 seconds:

  • autoGen.py
#!/usr/bin/env python3
# Copy the data file into the watched directory every 10 seconds.
# (The original used the Python 2-only `commands` module; shutil.copy is portable.)
import time
import shutil

if __name__ == '__main__':
    index = 0
    datafile = "/tmp/user.txt"
    dst = "/tmp/data/{file}.txt"
    while True:
        index += 1
        target = dst.format(file="data" + str(index))
        print("cp {src} {dst}".format(src=datafile, dst=target))
        shutil.copy(datafile, target)
        time.sleep(10)

Create the Index

PUT user_dev

Configure the Mapping

PUT user_dev/_mapping/doc
{
  "properties":{
    "id":{
      "type":"integer"
    },
    "first_name":{
      "type":"text",
      "fields": {
          "keyword":{
            "type": "keyword",
            "ignore_above": 256
        }
      }
    },
    "last_name":{
      "type":"text",
      "fields": {
        "keyword":{
        "type": "keyword",
          "ignore_above": 256
        }
      }
    },
    "gender":{
      "type":"short"
    },
    "score":{
    "type":"float"
    },
    "no":{
    "type":"integer"
    }
  }
}
# View the mapping
GET user_dev/_mapping
# Set an alias
PUT user_dev/_alias/user
# View aliases
GET user_dev/_alias
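The commands above use Kibana Dev Tools syntax. Against a live cluster the same calls could be issued with curl; a sketch assuming Elasticsearch listens on localhost:9200 (not runnable without a running node):

```shell
# Create the index, set the alias, then inspect it.
curl -XPUT "http://localhost:9200/user_dev"
curl -XPUT "http://localhost:9200/user_dev/_alias/user"
curl "http://localhost:9200/user_dev/_alias?pretty"
```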

Start Everything

  • Make sure Elasticsearch is running: $ ps -ef | grep elastic
  • Start Logstash: $ bin/logstash -f config/csv.conf
  • Start the Python script: $ ./autoGen.py

Check the Results

$ curl "http://localhost:9200/user/_search?pretty&_source=false&size=0"
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 180290,
    "max_score" : 0.0,
    "hits" : [ ]
  }
}
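To read that document count programmatically, parse the response as JSON; a sketch using only the standard library, with the response body abridged to the fields we read:

```python
import json

# The search response shown above, abridged.
response = '''
{
  "took": 6,
  "timed_out": false,
  "hits": { "total": 180290, "max_score": 0.0, "hits": [] }
}
'''

doc = json.loads(response)
print(doc["hits"]["total"])  # 180290
```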


Original article: https://www.haomeiwen.com/subject/pwopuftx.html