这篇文章主要介绍“怎么使用enrich processor”,在日常操作中,相信很多人在怎么使用enrich processor问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么使用enrich processor”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
ingest pipeline 可以在传入的文档被索引之前,对文档进行预处理,通过 processor 中定义的一系列规则来修改文档的内容(例如大小写转换等)。
在 Elasticsearch 7.5 版本引入了 enrich processor,可以将现有索引(source index)中的数据添加到传入的文档(incoming document)中。
比如,你可以在如下的场景中用到:
根据已知的 IP 地址识别 Web 服务或供应商。
根据产品 ID 将产品信息添加到零售订单中。
根据电子邮件地址补充联系信息。
根据用户坐标添加邮政编码。
使用 enrich processor 有如下几个步骤:
1.添加 enrich data:添加 document (enrich data)到一个或者多个的 source index 中,这些 document 中应包含之后要添加到 incoming documents 中的数据。
2.创建 enrich policy:enrich policy 中应至少包含如下参数:
指定source index的。
指定 incoming documents 和 source index 用于匹配的属性。
指定要添加到 incoming documents 中的属性。
3.执行 enrich policy:执行完后会自动创建相应的 enrich index, enrich index 和普通索引不同,进行了优化。
4.在 ingest pipeline 使用 enrich processor:enrich processor 使用 enrich index 来查询。
source index 的内容如下:
loc | num | company |
---|---|---|
广东省 | A1001 | 腾讯 |
上海市 | B1001 | Bilibili |
浙江省 | C1001 | 阿里巴巴 |
incoming document 传入的文档如下,通过 num 字段查到对应 source index 中的 loc 的值,添加到 incoming document 中新增 enrich_loc 属性中。
num | company |
---|---|
A1001 | 腾讯 |
B1001 | Bilibili |
C1001 | 阿里巴巴 |
通过 _bulk API 批量添加文档到 location 索引,这些文档和普通的文档一样。
POST _bulk {"index": {"_index":"location"}} {"loc":"广东省","company":"腾讯","num":"A1001"} {"index": {"_index":"location"}} {"loc":"上海市","company":"Bilibili","num":"B1001"} {"index": {"_index":"location"}} {"loc":"浙江省","company":"阿里巴巴","num":"C1001"}
enrich policy 一旦创建,就不能更新或者修改。
PUT /_enrich/policy/my-policy { "match": { "indices": "location", #source index 索引名,就是前面创建的 enrich data 对应的索引 "match_field": "num", #source index 中的属性名,用于incoming documents 和 source index 匹配的属性,属性名一样都是 num "enrich_fields": ["loc"], #添加到 incoming documents 中的属性 # 可选,过滤 source index 的文档,只有 loc.keyword 是上海市的 enrich data 才能将属性添加到 incoming documents 中 "query": { "match": { "loc.keyword": "上海市" } } } }
当创建了 enrich policy 后,你可以通过 execute enrich policy API 去执行 enrich policy。当执行 enrich policy 后,会自动创建 enrich index。
直接将 incoming document 与 source index 中的文档匹配可能会很慢且占用大量资源。 为了加快处理速度,enrich processor 使用了 enrich index。 enrich index 包含来自 source index 的 enrich data,enrich index 具有一些特殊属性可帮助简化它们:
它们是系统索引,这意味着它们由 Elasticsearch 在内部进行管理,仅适用于 enrich processor。
它们始终以 .enrich- * 开头。
它们是只读的,这意味着你不能直接更改它们。
它们被强制合并以便快速检索。
当 source index 中新增或者修改了数据,只需要重新执行 enrich policy 就可以更改 enrich index,从而更新 enrich processor。
通过以下命令执行 enrich policy:
PUT /_enrich/policy/my-policy/_execute
查看自动创建的 enrich index:
GET _cat/indices/.enrich* # 返回结果 green open .enrich-my-policy-1616136526661 Vxal9lLBSlKS5lmzMpFfwQ 1 3 1 0 13.4kb 3.3kb
我感觉 enrich policy 这里有个小 bug,当删除 enrich policy 时,例如删除的 enrich policy 为 my-policy-1,会同时删除 my-policy-1 的 enrich index 和 enrich policy ,但是如果原先还有个 my-policy-2(两个 enrich policy 在-
之前是一样的),会把 my-policy-2 的 enrich index 也误删了(enrich policy 不删)。
PUT _ingest/pipeline/loc-pipeline { "processors": [ { "enrich": { "policy_name": "my-policy", #引用前面创建的 enrich policy "field": "num", # incoming document 中的属性名,用于和 source index 中的属性匹配值 #在incoming document 中新增的属性, #包含在 enrich policy 中定义的 match_field 和 enrich_fields 的值 "target_field": "enrich_loc" } } ] }
使用 simulate 用来调试 ingest pipeline的效果,由于 source index 中匹配到的 loc.keyword 不是上海市,不会对这个文档进行处理:
POST _ingest/pipeline/loc-pipeline/_simulate { "docs": [ { "_source": { "num": "A1001", "company": "腾讯" } } ] } # 返回结果 { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "company" : "腾讯", "num" : "A1001" }, "_ingest" : { "timestamp" : "2021-03-19T06:56:45.754486259Z" } } } ] }
这个文档的 loc.keyword 是上海市,因此会添加上 enrich data 中指定的属性:
POST _ingest/pipeline/loc-pipeline/_simulate { "docs": [ { "_source": { "num": "B1001", "company": "Bilibili" } } ] } # 返回结果 { "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "company" : "Bilibili", "enrich_loc" : { "loc" : "上海市", "num" : "B1001" }, "num" : "B1001" }, "_ingest" : { "timestamp" : "2021-03-19T06:56:29.393585306Z" } } } ] }
在 simulate 调试成功之后,我们在插入文档的时候指定 ingest pipeline:
# 方式一:单条插入 POST origin-location/_doc?pipeline=loc-pipeline { "num": "A1001", "company": "腾讯" } POST origin-location/_doc?pipeline=loc-pipeline { "num": "B1001", "company": "Bilibili" } # 方式二:批量插入 POST _bulk?pipeline=loc-pipeline {"index":{"_index":"origin-location"}} {"num":"A1001","company":"腾讯"} {"index":{"_index":"origin-location"}} {"num":"B1001","company":"Bilibili"}
查看插入的结果:
GET origin-location/_search #返回结果 { "took" : 12, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "origin-location", "_type" : "_doc", "_id" : "zXxLSXgBUc4opBV-QiOv", "_score" : 1.0, "_source" : { "num" : "A1001", "company" : "腾讯" } }, { "_index" : "origin-location", "_type" : "_doc", "_id" : "znxLSXgBUc4opBV-SCPk", "_score" : 1.0, "_source" : { "num" : "B1001", "company" : "Bilibili", "enrich_loc" : { "loc" : "上海市", "num" : "B1001" } } } ] } }
也可以指定索引默认使用的 ingest pipeline ,这样就不用每次在插入文档的时候指定 ingest pipeline了:
# 指定索引默认使用的 ingest pipeline PUT origin-location2 { "settings": { "default_pipeline": "loc-pipeline" } } # 插入数据 POST _bulk {"index":{"_index":"origin-location2"}} {"num":"A1001","company":"腾讯"} {"index":{"_index":"origin-location2"}} {"num":"B1001","company":"Bilibili"} # 查看结果 GET origin-location2/_search # 输出结果 { "took" : 8, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "origin-location2", "_type" : "_doc", "_id" : "CXxPSXgBUc4opBV-oyTJ", "_score" : 1.0, "_source" : { "num" : "A1001", "company" : "腾讯" } }, { "_index" : "origin-location2", "_type" : "_doc", "_id" : "CnxPSXgBUc4opBV-oyTJ", "_score" : 1.0, "_source" : { "num" : "B1001", "company" : "Bilibili", "enrich_loc" : { "loc" : "上海市", "num" : "B1001" } } } ] } }
另外还可以使用 index template,通过正则表达式的方式匹配多个索引,来指定索引使用的 ingest pipeline:
# 使用 index template PUT _template/my-template { "index_patterns": ["origin-*"], "settings": { "default_pipeline": "loc-pipeline" } } # 插入数据 POST _bulk {"index":{"_index":"origin-location3"}} {"num":"A1001","company":"腾讯"} {"index":{"_index":"origin-location3"}} {"num":"B1001","company":"Bilibili"} # 查看结果 GET origin-location3/_search # 输出结果 { "took" : 2, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "origin-location3", "_type" : "_doc", "_id" : "XnxVSXgBUc4opBV-1yRp", "_score" : 1.0, "_source" : { "num" : "A1001", "company" : "腾讯" } }, { "_index" : "origin-location3", "_type" : "_doc", "_id" : "X3xVSXgBUc4opBV-1yRp", "_score" : 1.0, "_source" : { "num" : "B1001", "company" : "Bilibili", "enrich_loc" : { "loc" : "上海市", "num" : "B1001" } } } ] } }
到此,关于“怎么使用enrich processor”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。