Logstash的输入流有多种方式,如标准输入流、Redis、Kafka、文件、数据库、Filebeat等,下面分章节简单介绍更类输入流。
1、Stdin
从标准输入读取事件。
默认情况下,每一行读取为一个事件。如果要多行读取为一个事件,则需要使用[multiline解码器]
input {
stdin {
}
}
2、Redis
通过配置Redis的连接地址及key值,获取value值。
input{
redis {
host =>"192.168.200.21"
port =>" 6379"
db =>"6"
data_type =>"list"
key="demo"
}
}
output {
elasticsearch {
hosts => [ "192.168.200.21:9200" ]
index => "redis-demo-%{+YYYY.MM.dd}"
}
}
使用Redis作为输入,data_type 可以有三种类型,分别是:
1)list,表示的redis命令为blpop,代表从redis list的左边获取第一个元素,如无元素则阻塞;
2)channel,表示的redis命令为subscribe,代表从redis频道获取最新的数据;
3)pattern_channel,表示的redis命令为psubscribe,代表通过pattern正则表达式匹配频道,获取最新的数据。
其中:
1)channel与pattern_channel的区别在于,pattern_channel可以通过正则表达式匹配多个频道,而channel是单一频道;
2)list与另外两个channel的区别在于,1个channel的数据会被多个订阅的logstash重复获取,1个list的数据被多个logstash获取时不会重复,会被分摊在各个Logstash中。
3、Kafka
通过配置Redis的连接地址及key值,获取value值。
input{
kafka{
bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
client_id => "test"
group_id => "test"
auto_offset_reset => "latest" //从最新的偏移量开始消费
consumer_threads => 5
decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中
topics => ["logq","loge"] //数组类型,可配置多个topic
type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用
}
}
此处使用了decorate_events属性,注意看logstash控制台打印的信息,会输出如下
"kafka":{"consumer_group":"test","partition":0,"offset":10430232,"topic":"logq","key":null}
另外一个input里面可设置多个kafka,从而读取多个kafka的消息。
4、Beats
在FileBeat中已经配置好了将日志输出到Logstash,在Logstash中,只需要接收数据即可。
input {
beats {
port => 5044
}
}










网友评论