Ingest Node is a new node type introduced in ES 5.0. Under the default configuration, every node is an Ingest Node.
1. It can pre-process data by intercepting Index or Bulk API requests.
2. It transforms the data and hands it back to the Index or Bulk API.
Pipelines give ES a degree of real-time processing capability.
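Since every node carries the ingest role out of the box, a quick way to check which nodes have it is the cat nodes API (on 7.x+ clusters the node.role column marks ingest-capable nodes with an i):
GET _cat/nodes?v&h=name,node.role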
The following example splits the tags string into an array and returns the transformed document.
# Simulate a pipeline
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "test",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "introducing big data",
        "tags": "hadoop,es,spark,flink"
      }
    }
  ]
}
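The simulate call returns the transformed documents. Abbreviated, the response looks roughly like this (metadata fields vary slightly by ES version), with tags now an array:
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_id": "id",
        "_source": {
          "title": "introducing big data",
          "tags": ["hadoop", "es", "spark", "flink"]
        },
        "_ingest": { "timestamp": "..." }
      }
    }
  ]
}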
# Create the pipeline; to process multiple fields, configure additional objects in the processors array
PUT _ingest/pipeline/blog_pipeline
{
  "description": "test",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}
# View the pipeline
GET _ingest/pipeline/blog_pipeline
# Test the pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "introducing big data",
        "tags": "hadoop,es,spark,flink"
      }
    }
  ]
}
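blog_pipeline runs two processors, so it can help to see the document after each step. The simulate API supports a verbose flag that returns per-processor results:
POST _ingest/pipeline/blog_pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "title": "introducing big data",
        "tags": "hadoop,es,spark,flink"
      }
    }
  ]
}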
Add a document without going through the pipeline:
PUT blogs/_doc/1
{
  "title": "introducing big data",
  "tags": "hadoop,es,spark,flink"
}
# Index a document through blog_pipeline
PUT blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "introducing dataware",
  "tags": "adb,doris,presto"
}
GET blogs/_search
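In the hits, doc 1 (indexed without the pipeline) still has tags as the original string and no views field, while doc 2 has tags split into an array and views set to 0; roughly:
# doc 1 _source (not processed)
{ "title": "introducing big data", "tags": "hadoop,es,spark,flink" }
# doc 2 _source (processed by blog_pipeline)
{ "title": "introducing dataware", "tags": ["adb", "doris", "presto"], "views": 0 }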
Next, update the documents that were indexed before the pipeline existed.
# Running _update_by_query across all documents fails: for doc 2 tags is already an array, and splitting an array again raises an error
POST blogs/_update_by_query?pipeline=blog_pipeline
{}
# Add a query condition so _update_by_query only touches documents the pipeline has not yet processed
POST blogs/_update_by_query?pipeline=blog_pipeline
{
  "query": {
    "bool": {
      "must_not": [
        {
          "exists": {
            "field": "views"
          }
        }
      ]
    }
  }
}
# Check the data again
GET blogs/_search
# Set a default pipeline at index creation, so writes no longer need ?pipeline on every request
# (PUT blogs only works for a new index; for the existing blogs index, see the settings update below)
PUT blogs
{
  "settings": {
    "default_pipeline": "blog_pipeline"
  }
}
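Since the blogs index already exists in this walkthrough, the same setting can be applied dynamically instead (index.default_pipeline is an updatable index setting). After that, a plain write runs through blog_pipeline automatically; the doc 3 payload below is just an illustrative example, and a single request can bypass the default pipeline with ?pipeline=_none.
PUT blogs/_settings
{
  "index.default_pipeline": "blog_pipeline"
}
# Indexed through blog_pipeline without specifying ?pipeline
PUT blogs/_doc/3
{
  "title": "introducing streaming",
  "tags": "kafka,flink"
}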