Elasticsearch: How to handle exceptions in an ingest pipeline
In my earlier article "How to use the pipeline API in Elasticsearch to process events", I described in detail how to create and use an ingest pipeline. In short, a pipeline is a definition of a series of processors that execute in the order in which they are declared. A pipeline has two main fields: a description and a list of processors.
It is worth pointing out that pipelines run on ingest nodes, and all ingest pipelines are stored in the cluster state.
How a pipeline works
Here is an example of defining a pipeline:
PUT _ingest/pipeline/apache-log
{
  "description": "This is an example for apache logs",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{COMMONAPACHELOG}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}
The processors above are executed one after another. We can invoke the pipeline like this:
PUT logs/_doc/1?pipeline=apache-log
{
  "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
}
We can then search the logs index (for example with GET logs/_search) to see the processed document:
{"took" : 20,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 1,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "logs","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"}}]}
}
We can also retrieve the document processed by the apache-log pipeline directly:
GET logs/_doc/1
The command above returns:
{"_index" : "logs","_type" : "_doc","_id" : "1","_version" : 2,"_seq_no" : 1,"_primary_term" : 1,"found" : true,"_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"}
}
From the output we can see that the processors in the apache-log pipeline have structured and enriched our log data: the grok processor parses the raw line into fields, the date processor sets @timestamp from the timestamp field, and the remove processor drops the message field.
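As a rough illustration of what the grok processor does here, the %{COMMONAPACHELOG} pattern can be approximated with a plain regular expression. This is a simplified sketch that only covers the example line, not the real grok pattern definition:

```python
import re

# Simplified approximation of grok's %{COMMONAPACHELOG} pattern;
# the real pattern is far more permissive than this sketch.
COMMON_APACHE_LOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>\S+)" '
    r'(?P<response>\d+) (?P<bytes>\S+)'
)

message = '83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24'
fields = COMMON_APACHE_LOG.match(message).groupdict()
print(fields["clientip"], fields["response"], fields["bytes"])  # -> 83.149.9.216 200 24
```

Note that every captured field is a string, which is why a later conversion step is needed for numeric fields such as bytes.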
When designing a pipeline, we rarely want to run it against real documents right away. In most cases we first verify the pipeline with test documents; otherwise a faulty pipeline can corrupt our data. We can use the _simulate API for this. For our case:
POST _ingest/pipeline/apache-log/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
The docs field is an array, so we can define several test documents of different shapes and test them all at once. The command above returns:
{"docs" : [{"doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"timestamp" : "2020-11-17T11:09:35.351117Z"}}}]
}
We can see the simulated result. However, after the document has passed through the whole chain of processors, we cannot tell what each individual processor did. To inspect each step, we can add the verbose parameter:
POST _ingest/pipeline/apache-log/_simulate?verbose
{
  "docs": [
    {
      "_source": {
        "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
The result is:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "apache-log","timestamp" : "2020-11-17T11:11:43.039149Z"}}},{"processor_type" : "date","status" : "success","doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "apache-log","timestamp" : "2020-11-17T11:11:43.039149Z"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "apache-log","timestamp" : "2020-11-17T11:11:43.039149Z"}}}]}]
}
This records the result of every processor in detail. It makes it very easy to break the pipeline down processor by processor and track down errors.
How to handle pipeline errors
When a pipeline processes documents, not all of them are well formed, so some documents cannot be parsed or processed correctly.
By default, when a document cannot be processed, an error is returned to the client indicating the failure. The alternative is to handle the error ourselves with on_failure: when an error occurs, we define another list of processors to deal with it. A common approach is to use a set processor to record the failing document and route it to a separate index; we can later inspect that index and fix the problem based on the error information.
When the on_failure handlers run, the remaining processors in the pipeline are skipped and the client no longer receives a failure. Inside on_failure we can define a whole list of processors, for example a remove and a set processor to clean up the failed document. We can even attach an additional on_failure handler to those failure processors themselves. Another option is to use a set processor to supply a substitute value, for instance a default date, or the current date, when the original date is invalid, and then let the document continue through ingestion.
How you handle failures depends entirely on your business requirements.
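The control flow just described, run processors in order and, on the first failure, divert to the on_failure handlers instead of returning an error, can be sketched in Python. This is a conceptual model of the semantics, not Elasticsearch code:

```python
def run_pipeline(doc, processors, on_failure=None):
    """Run processors in order; on the first exception, skip the
    remaining processors and run the on_failure handlers instead."""
    for proc in processors:
        try:
            proc(doc)
        except Exception as err:
            if on_failure is None:
                raise  # default behavior: the error goes back to the client
            doc["_ingest_error"] = str(err)  # analogous to _ingest.on_failure_message
            for handler in on_failure:
                handler(doc)
            break  # remaining processors are not executed
    return doc

def bad_date(doc):
    # stands in for a date processor hitting an unparseable value
    raise ValueError("unable to parse date")

def route_to_failed(doc):
    # stands in for: {"set": {"field": "_index", "value": "failed"}}
    doc["_index"] = "failed"

doc = run_pipeline({"_index": "my_index"}, [bad_date], on_failure=[route_to_failed])
print(doc["_index"])  # -> failed
```

The function names here are made up for illustration; only the ordering and skip-on-failure behavior mirror what the pipeline does.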
Let me demonstrate with an example:
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
In the request above, the test document targets the my_index index, even though we are only simulating:
{"docs" : [{"doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"timestamp" : "2020-11-17T11:55:43.679709Z"}}}]
}
From the output we can see that after grok processing, bytes is a string. We can use a convert processor to turn this field into an integer:
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
The result is now:
{"docs" : [{"doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"timestamp" : "2020-11-17T12:01:38.662559Z"}}}]
}
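What the convert processor does here amounts to a plain string-to-integer cast. In Python terms (just an analogy, not what Elasticsearch runs internally):

```python
# grok extracts every field as a string; convert casts the value in place
source = {"bytes": "24", "response": "200"}
source["bytes"] = int(source["bytes"])  # the "integer" conversion
print(source["bytes"], type(source["bytes"]).__name__)  # -> 24 int
```

This also hints at the failure mode explored later: a value like "-" cannot be cast and raises an error.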
Now bytes is an integer. We can of course add the verbose parameter to the same call to inspect each processor's execution. To make debugging even easier, we can add a tag to a processor, so that in the verbose output we can easily tell which processor is which:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Here we added a tag called convert_reponse to the convert processor that handles response. This makes it much easier to find in the output; otherwise, with two convert processors, they would be hard to tell apart, even though they execute in declared order.
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "date","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "convert","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "convert","status" : 
"success","tag" : "convert_reponse","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : 200,"bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}}]}]
}
In the output above we can see the convert_reponse tag.
Next, let's simulate a malformed document that the processors cannot parse correctly. We remove the "5" from 2015 in the document:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
This obviously produces a document that cannot be parsed. The returned error is:
{
  "docs" : [
    {
      "processor_results" : [
        {
          "processor_type" : "grok",
          "status" : "error",
          "error" : {
            "root_cause" : [
              {
                "type" : "illegal_argument_exception",
                "reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24]"""
              }
            ],
            "type" : "illegal_argument_exception",
            "reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24]"""
          }
        }
      ]
    }
  ]
}
With an error like this, the problem is easy to spot: the output shows that the grok pattern does not match. Now let's modify the document as follows:
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
Here we changed the +0000 in the timestamp to +000, dropping one zero. We then run:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
The command returns:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:20:43.098763Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}}]}]
}
This time the result is clearly different: the grok pattern parses the document correctly, but the date processor fails to parse the time.
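The format dd/MMM/yyyy:HH:mm:ss Z expects a four-digit zone offset like +0000, so +000 cannot be parsed. The same strictness can be reproduced with Python's strptime, whose %z directive behaves similarly (an analogy; Elasticsearch uses Java's date parser, not this one):

```python
from datetime import datetime

FMT = "%d/%b/%Y:%H:%M:%S %z"  # roughly dd/MMM/yyyy:HH:mm:ss Z

ok = datetime.strptime("17/May/2015:10:05:03 +0000", FMT)
print(ok.isoformat())  # -> 2015-05-17T10:05:03+00:00

try:
    datetime.strptime("17/May/2015:10:05:03 +000", FMT)  # three-digit offset
except ValueError as err:
    print("parse failed:", err)
```

A single missing digit in the offset is enough to turn a clean parse into an exception, which is exactly the failure mode the pipeline has to cope with.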
There are two ways to handle this kind of problem:
- at the pipeline level
- at the processor level
Handling at the pipeline level
We append an on_failure block to the pipeline:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ],
    "on_failure": [
      { "set": { "field": "_index", "value": "failed" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Here I added the following code:
"on_failure": [
  {
    "set": {
      "field": "_index",
      "value": "failed"
    }
  }
]
This routes the document to a different index called failed. Running the pipeline gives:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:25:50.517958Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:25:50.517958Z","on_failure_processor_type" : "date"}}}]}]
}
Clearly the first step succeeded and the second failed, at which point on_failure ran and its set processor changed the index to failed. You can then inspect the failed index directly.
"_ingest" : {
  "pipeline" : "_simulate_pipeline",
  "on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21",
  "on_failure_processor_tag" : null,
  "timestamp" : "2020-11-17T12:25:50.517958Z",
  "on_failure_processor_type" : "date"
}
The _ingest metadata above records the error message from ingestion. We can go a step further and persist this information on the document itself:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ],
    "on_failure": [
      { "set": { "field": "_index", "value": "failed" } },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Here we set a failure field containing an object. Running the pipeline:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:39:09.206999Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:39:09.206999Z","on_failure_processor_type" : "date"}}},{"processor_type" : "set","status" : "success","tag" : "mark_failure","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] 
"GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","failure" : {"message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"},"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:39:09.206999Z","on_failure_processor_type" : "date"}}}]}]
}
The _source now contains a new field called failure with the corresponding error message. Since failure is an object, we can in fact add more fields to it, for example:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ],
    "on_failure": [
      { "set": { "field": "_index", "value": "failed" } },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "processor": "{{_ingest.on_failure_processor_type}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
We added a processor field, which makes it easier to see which processor failed:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:42:27.811805Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:42:27.811805Z","on_failure_processor_type" : "date"}}},{"processor_type" : "set","status" : "success","tag" : "mark_failure","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] 
"GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","failure" : {"message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","processor" : "date"},"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:42:27.811805Z","on_failure_processor_type" : "date"}}}]}]
}
The handling above is at the pipeline level.
Handling at the processor level
We can also catch and handle errors on each individual processor. For example, for the date processor:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      {
        "date": {
          "field": "timestamp",
          "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
          "on_failure": [
            { "set": { "tag": "set_default_date", "field": "@timestamp", "value": "{{_ingest.timestamp}}" } }
          ]
        }
      },
      { "remove": { "field": "message" } },
      { "convert": { "field": "bytes", "type": "integer" } },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ],
    "on_failure": [
      { "set": { "field": "_index", "value": "failed" } },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "processor": "{{_ingest.on_failure_processor_type}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""
      },
      "_index": "my_index"
    }
  ]
}
Here we added the following on_failure block to the date processor:
{
  "date": {
    "field": "timestamp",
    "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
    "on_failure": [
      {
        "set": {
          "tag": "set_default_date",
          "field": "@timestamp",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ]
  }
}
When an error occurs, we simply use _ingest.timestamp as the value of @timestamp. Running the pipeline:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","tag" : "set_default_date","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:49:49.720153Z","on_failure_processor_type" : "date"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : 
"GET","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}},{"processor_type" : "convert","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : "200","bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}},{"processor_type" : "convert","status" : "success","tag" : "convert_reponse","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : 200,"bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}}]}]
}
In this run, when the error occurred, set_default_date was invoked and @timestamp became "2020-11-17T12:49:49.720153Z", which is the time the ingest pipeline ran; that is far from the time in the original document. Whether this is acceptable depends entirely on your business design and which value you want to use.
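The processor-level on_failure above amounts to "try to parse; if that fails, fall back to the ingest time". A Python sketch of that fallback logic (field names mirror the pipeline, but this is only an illustration, not Elasticsearch code):

```python
from datetime import datetime, timezone

def set_timestamp(doc, fmt="%d/%b/%Y:%H:%M:%S %z"):
    """Date processor with an on_failure fallback to the ingest timestamp."""
    try:
        parsed = datetime.strptime(doc["timestamp"], fmt)
        doc["@timestamp"] = parsed.isoformat()
    except ValueError:
        # on_failure branch: fall back to "now", i.e. {{_ingest.timestamp}}
        doc["@timestamp"] = datetime.now(timezone.utc).isoformat()
    return doc

doc = set_timestamp({"timestamp": "17/May/2015:10:05:03 +000"})  # bad offset
print(doc["@timestamp"])  # falls back to the current time
```

The trade-off is the same one noted above: the document always gets a @timestamp, but on the fallback path it is the processing time, not the event time.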
Next, suppose we have fixed the time back to +0000, but change the bytes value to something that cannot be converted to a number, such as "-":
"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""
When we rerun the pipeline, we see the following error:
"error" : {
  "root_cause" : [
    {
      "type" : "illegal_argument_exception",
      "reason" : "field [bytes] not present as part of path [bytes]"
    }
  ],
  "type" : "illegal_argument_exception",
  "reason" : "field [bytes] not present as part of path [bytes]"
}
In the same way, we can attach a custom on_failure handler to this processor:
GET _ingest/pipeline/_simulate?verbose
{
  "pipeline": {
    "processors": [
      { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
      {
        "date": {
          "field": "timestamp",
          "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
          "on_failure": [
            { "set": { "tag": "set_default_date", "field": "@timestamp", "value": "{{_ingest.timestamp}}" } }
          ]
        }
      },
      { "remove": { "field": "message" } },
      {
        "convert": {
          "field": "bytes",
          "type": "integer",
          "on_failure": [
            { "set": { "field": "bytes", "value": -1 } }
          ]
        }
      },
      { "convert": { "tag": "convert_reponse", "field": "response", "type": "integer" } }
    ],
    "on_failure": [
      { "set": { "field": "_index", "value": "failed" } },
      {
        "set": {
          "tag": "mark_failure",
          "field": "failure",
          "value": {
            "message": "{{_ingest.on_failure_message}}",
            "processor": "{{_ingest.on_failure_processor_type}}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""
      },
      "_index": "my_index"
    }
  ]
}
We added the following code:
{
  "convert": {
    "field": "bytes",
    "type": "integer",
    "on_failure": [
      {
        "set": {
          "field": "bytes",
          "value": -1
        }
      }
    ]
  }
}
That is, when the error occurs, we simply set bytes to -1:
{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","response" : "200","clientip" : "83.149.9.216","verb" : "GET","httpversion" : "1.1","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -""","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}},{"processor_type" : "date","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -""","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","@timestamp" : "2015-05-17T10:05:03.000Z","auth" : "-","ident" : "-","response" : "200","clientip" : "83.149.9.216","verb" : "GET","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}},{"processor_type" : "convert","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "field [bytes] not present as part of path [bytes]"}],"type" : "illegal_argument_exception","reason" : "field [bytes] not present as part of path [bytes]"}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : 
-1,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "field [bytes] not present as part of path [bytes]","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:59:19.385189Z","on_failure_processor_type" : "convert"}}},{"processor_type" : "convert","status" : "success","tag" : "convert_reponse","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : 200,"bytes" : -1,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}}]}]
}
From the output we can see that bytes has been set to -1.
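The same fallback idea for the bytes field can be sketched as follows. Note that in the Elasticsearch run the field was missing entirely (grok treats "-" as no bytes value), so this sketch covers both a missing field and an unparseable value; it illustrates the on_failure logic, not ES internals:

```python
def convert_bytes(doc, default=-1):
    """Convert processor with on_failure: set bytes to -1 when the cast fails."""
    try:
        doc["bytes"] = int(doc["bytes"])
    except (KeyError, ValueError):
        # on_failure branch: field missing or not a number
        doc["bytes"] = default
    return doc

print(convert_bytes({"bytes": "24"})["bytes"])  # -> 24
print(convert_bytes({"bytes": "-"})["bytes"])   # -> -1
print(convert_bytes({})["bytes"])               # -> -1
```

Using a sentinel like -1 keeps the field mapping numeric while still marking the documents whose original value was unusable.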
That's it for today. I hope you now know how to handle errors in a pipeline and deal with them appropriately.