In my previous article, "How to use the pipeline API in Elasticsearch to process events", I described in detail how to create and use an ingest pipeline. In short, a pipeline is a definition of a series of processors that are executed in the order in which they are declared. A pipeline has two main fields: a description and a list of processors.
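A minimal skeleton looks like the following; the pipeline name my-pipeline and the set processor inside it are placeholders of mine for illustration, not part of the original example:

PUT _ingest/pipeline/my-pipeline
{
  "description": "a short note on what this pipeline does",
  "processors": [
    { "set": { "field": "environment", "value": "production" } }
  ]
}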

It is worth pointing out here that pipelines run on ingest nodes, and that all ingest pipelines are stored in the cluster state.
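Because they live in the cluster state, stored pipelines can be listed or fetched from any node. For example:

GET _ingest/pipeline
GET _ingest/pipeline/apache-log

The first call returns every stored pipeline; the second returns only the apache-log pipeline that we define below.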

How pipelines work

Below is an example that defines a pipeline:

PUT _ingest/pipeline/apache-log
{"description": "This is an example for apache logs","processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}}]
}

The processors above are executed one after another. We can invoke the pipeline like this:

PUT logs/_doc/1?pipeline=apache-log
{"message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"
}

If we then search the logs index, for example with GET logs/_search, we can see the processed document:

{"took" : 20,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 1,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "logs","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"}}]}
}

We can retrieve the document processed by the apache-log pipeline with the following command:

GET logs/_doc/1

The command above returns:

{"_index" : "logs","_type" : "_doc","_id" : "1","_version" : 2,"_seq_no" : 1,"_primary_term" : 1,"found" : true,"_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"}
}

From the above we can see that, through the group of processors in the apache-log pipeline, we successfully structured the log and enriched our data: the grok processor parsed the raw line into fields, the date processor set @timestamp from the value of timestamp, and the remove processor dropped the message field.

When designing a pipeline, we rarely let it act directly on our documents right away. Much more often we first verify the pipeline's correctness against some test documents, since an incorrect pipeline can corrupt our data. We can test it with _simulate. For our case:

POST _ingest/pipeline/apache-log/_simulate
{"docs": [{"_source": {"message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"}}]
}

The docs array can contain test documents of various possible shapes, and we can test several documents at the same time (a sketch follows the output below). The command above returns:

{"docs" : [{"doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"timestamp" : "2020-11-17T11:09:35.351117Z"}}}]
}

We can see the simulated result.
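Here is a quick sketch of simulating two documents in one call; the second log line is a standard Apache sample line that I added for illustration:

POST _ingest/pipeline/apache-log/_simulate
{
  "docs": [
    { "_source": { "message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24" } },
    { "_source": { "message": "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" 200 2326" } }
  ]
}

The response contains one entry in docs per input document, in the same order.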

The document above went through the whole group of pipeline processors, but we cannot see the result of each individual processor. In that case, we can add the verbose parameter to inspect what each processor did:

POST _ingest/pipeline/apache-log/_simulate?verbose
{"docs": [{"_source": {"message": "83.149.9.216 - - [17/May/2015:10:05:03 +0000] \"GET / HTTP/1.1\" 200 24"}}]
}

The result returned is:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "apache-log","timestamp" : "2020-11-17T11:11:43.039149Z"}}},{"processor_type" : "date","status" : "success","doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "apache-log","timestamp" : "2020-11-17T11:11:43.039149Z"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "apache-log","timestamp" : "2020-11-17T11:11:43.039149Z"}}}]}]
}

The output above records in detail the result of each processor. This makes it very easy to break the run down processor by processor and to track down our errors.

How to handle pipeline errors

When we use a pipeline to process documents, not every document is well formed, so some documents may fail to be parsed or processed correctly.

When a document cannot be parsed, an error message is returned to the client indicating that the document could not be processed correctly. This is the default behaviour.
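To see the default behaviour, we can try to index a line that the grok processor cannot match; the document below is a made-up example of mine. The whole request fails with an illegal_argument_exception, and nothing is indexed:

PUT logs/_doc/2?pipeline=apache-log
{
  "message": "this is not an apache log line"
}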

The other way is to handle the error ourselves with on_failure: when an error occurs, we can define another group of processors to deal with it. In the usual case, we use a set processor to do something relevant with the failed document. For example, it can record the failed document's error information and store the document in a separate index; later, we can inspect that index and resolve the problem based on the error information.

In this case, the processors after the failing one are not executed, and the client no longer receives a failure response. Inside on_failure we can define a group of processors that put the error information into another index, for example as sketched below.
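The concrete snippet is not reproduced in this article, but a minimal sketch of such a pipeline could look like the following; the pipeline name apache-log-safe and the failed-logs destination index are hypothetical names of mine:

PUT _ingest/pipeline/apache-log-safe
{
  "description": "apache-log with a pipeline-level failure handler",
  "processors": [
    { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } },
    { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
    { "remove": { "field": "message" } }
  ],
  "on_failure": [
    { "remove": { "field": "timestamp", "ignore_missing": true } },
    { "set": { "field": "_index", "value": "failed-logs" } }
  ]
}

Here remove drops the partially parsed timestamp field and set reroutes the document, so the failure lands in failed-logs for later inspection instead of being reported back to the client.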

Above, we handle the failed document with the two processors remove and set. We can even attach an additional on_failure handler to this group of failure processors. And with a set processor we can do things such as: if the current date is invalid, set a default date or use the current date, and then let the document continue through processing; we will demonstrate exactly this at the processor level further below.

How you handle this depends entirely on your own business requirements.

Below, I will demonstrate with an example:

GET _ingest/pipeline/_simulate
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Above, we direct the document to my_index, even though we are only simulating. The result is:

{"docs" : [{"doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"timestamp" : "2020-11-17T11:55:43.679709Z"}}}]
}

We can see that after grok's processing, bytes is a string. We can turn this field into an integer with the convert processor:

GET _ingest/pipeline/_simulate
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Now the result is:

{"docs" : [{"doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"timestamp" : "2020-11-17T12:01:38.662559Z"}}}]
}

From the above, we can see that bytes has become an integer value. Of course, we can again add the verbose parameter to this call to see how each processor behaves. To make debugging easier, we can even add a tag to a processor, so that when we use verbose we can easily tell which processor is which:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Above, we gave the convert processor for response a tag called convert_response. This makes it much easier to find in the output; otherwise, with two convert processors, they would be hard to tell apart, even though they execute in order.

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "date","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 24""","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "convert","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}},{"processor_type" : "convert","status" : "success","tag" : "convert_reponse","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : 200,"bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:07:36.432606Z"}}}]}]
}

In the output above we can see the convert_response tag.

Next, let's simulate an erroneous document so that a processor cannot parse it correctly. We remove the "5" from "2015" in the document:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Obviously this produces a document that cannot be parsed correctly. The error returned is:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24]"""}],"type" : "illegal_argument_exception","reason" : """Provided Grok expressions do not match field value: [83.149.9.216 - - [17/May/201:10:05:03 +0000] "GET / HTTP/1.1" 200 24]"""}}]}]
}

When we get an error like this, the problem is easy to spot: the output shows that the grok pattern does not match. Let's now modify the document as follows:

 "message": """83.149.9.216 - - [17/May/2015:10:05:03 +200] "GET / HTTP/1.1" 200 24"""

This time we change +0000 in the timestamp to +000, that is, one 0 short. Then we run:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

The command above returns:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:20:43.098763Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}}]}]
}

This is clearly different from before: the grok pattern parses our document correctly, but the date processor runs into a problem parsing the time.

We have two ways to deal with this problem:

  • handle it at the pipeline level
  • handle it at the processor level

Handling at the pipeline level

We append an on_failure block to the pipeline:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}],"on_failure": [{"set": {"field": "_index","value": "failed"}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Above, I added the following code:

    "on_failure": [{"set": {"field": "_index","value": "failed"}}]

Here we point failed documents at another index called failed. Running the pipeline above gives:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:25:50.517958Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:25:50.517958Z","on_failure_processor_type" : "date"}}}]}]
}

Clearly, the first step succeeds and the second step fails; the pipeline then runs on_failure, where the set processor changes the index to failed. Afterwards you can look at the failed documents directly in the failed index.
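Once real (non-simulated) documents have gone through such a pipeline, the rerouted failures can be queried like any other index:

GET failed/_search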

            "_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:25:50.517958Z","on_failure_processor_type" : "date"}

The _ingest metadata above records the error message produced during ingestion. We can go on and store this error information in the document itself:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}],"on_failure": [{"set": {"field": "_index","value": "failed"}},{"set": {"tag": "mark_failure","field": "failure","value": {"message": "{{_ingest.on_failure_message}}"     }}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Above, we set a failure field and record an object in it. Running the pipeline gives:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:39:09.206999Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:39:09.206999Z","on_failure_processor_type" : "date"}}},{"processor_type" : "set","status" : "success","tag" : "mark_failure","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","failure" : {"message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"},"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:39:09.206999Z","on_failure_processor_type" : "date"}}}]}]
}

The _source above now contains a new field called failure with the corresponding error message. Since failure is an object, we can in fact add several fields to it, for example:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}],"on_failure": [{"set": {"field": "_index","value": "failed"}},{"set": {"tag": "mark_failure","field": "failure","value": {"message": "{{_ingest.on_failure_message}}","processor": "{{_ingest.on_failure_processor_type}}"}}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

We added a processor field, which makes it easier to know which processor went wrong:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:42:27.811805Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:42:27.811805Z","on_failure_processor_type" : "date"}}},{"processor_type" : "set","status" : "success","tag" : "mark_failure","doc" : {"_index" : "failed","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","failure" : {"message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","processor" : "date"},"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:42:27.811805Z","on_failure_processor_type" : "date"}}}]}]
}

The handling above is at the pipeline level.

Handling at the processor level

We can also catch and handle errors directly on each individual processor, for example on the date processor:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"on_failure": [{"set": {"tag": "set_default_date","field": "@timestamp","value": "{{_ingest.timestamp}}"}}]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer"}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}],"on_failure": [{"set": {"field": "_index","value": "failed"}},{"set": {"tag": "mark_failure","field": "failure","value": {"message": "{{_ingest.on_failure_message}}","processor": "{{_ingest.on_failure_processor_type}}"}}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24"""},"_index": "my_index"}]
}

Above, we added the following on_failure code to the date processor:

      {"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"on_failure": [{"set": {"tag": "set_default_date","field": "@timestamp","value": "{{_ingest.timestamp}}"}}]}}

When the error occurs, we simply use _ingest.timestamp as the value of @timestamp. Running the pipeline gives:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}},{"processor_type" : "date","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]"}],"type" : "illegal_argument_exception","reason" : "unable to parse date [17/May/2015:10:05:03 +000]","caused_by" : {"type" : "illegal_argument_exception","reason" : "failed to parse date field [17/May/2015:10:05:03 +000] with format [dd/MMM/yyyy:HH:mm:ss Z]","caused_by" : {"type" : "date_time_parse_exception","reason" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21"}}}},{"processor_type" : "set","status" : "success","tag" : "set_default_date","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +000] "GET / HTTP/1.1" 200 24""","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "Text '17/May/2015:10:05:03 +000' could not be parsed at index 21","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:49:49.720153Z","on_failure_processor_type" : "date"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : "200","bytes" : "24","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}},{"processor_type" : "convert","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : "200","bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}},{"processor_type" : "convert","status" : "success","tag" : "convert_reponse","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2020-11-17T12:49:49.720153Z","response" : 200,"bytes" : 24,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:49:49.720153Z"}}}]}]
}

Clearly, in this run, when the error occurs, set_default_date is invoked and @timestamp becomes "2020-11-17T12:49:49.720153Z". This is the time the ingest pipeline was executed, which is far from the time in the original document. Whether that is acceptable depends entirely on your own business design and on which value you actually want to use.

Next, suppose we have corrected the time back to +0000, but we change the bytes value to a character that cannot be converted to a number, such as "-":

    "message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""

If we re-run the pipeline, we see the following error:

          "error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "field [bytes] not present as part of path [bytes]"}],"type" : "illegal_argument_exception","reason" : "field [bytes] not present as part of path [bytes]"}

Note that the error says bytes is missing rather than unconvertible: the %{COMMONAPACHELOG} pattern only captures bytes when the value is numeric, so for "-" no bytes field is created at all. In the same fashion as before, we can give this processor its own on_failure handler:

GET _ingest/pipeline/_simulate?verbose
{"pipeline": {"processors": [{"grok": {"field": "message","patterns": ["%{COMMONAPACHELOG}"]}},{"date": {"field": "timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"on_failure": [{"set": {"tag": "set_default_date","field": "@timestamp","value": "{{_ingest.timestamp}}"}}]}},{"remove": {"field": "message"}},{"convert": {"field": "bytes","type": "integer","on_failure":[{"set": {"field": "bytes","value": -1}}]}},{"convert": {"tag": "convert_reponse","field": "response","type": "integer"}}],"on_failure": [{"set": {"field": "_index","value": "failed"}},{"set": {"tag": "mark_failure","field": "failure","value": {"message": "{{_ingest.on_failure_message}}","processor": "{{_ingest.on_failure_processor_type}}"}}}]},"docs": [{"_source": {"message": """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -"""},"_index": "my_index"}]
}

We added the following code:

      {"convert": {"field": "bytes","type": "integer","on_failure":[{"set": {"field": "bytes","value": -1}}]}}

That is, when the error occurs, we simply set bytes to -1:

{"docs" : [{"processor_results" : [{"processor_type" : "grok","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","response" : "200","clientip" : "83.149.9.216","verb" : "GET","httpversion" : "1.1","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -""","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}},{"processor_type" : "date","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","message" : """83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET / HTTP/1.1" 200 -""","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}},{"processor_type" : "remove","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","@timestamp" : "2015-05-17T10:05:03.000Z","auth" : "-","ident" : "-","response" : "200","clientip" : "83.149.9.216","verb" : "GET","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}},{"processor_type" : "convert","status" : "error","error" : {"root_cause" : [{"type" : "illegal_argument_exception","reason" : "field [bytes] not present as part of path [bytes]"}],"type" : "illegal_argument_exception","reason" : "field [bytes] not present as part of path [bytes]"}},{"processor_type" : "set","status" : "success","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : "200","bytes" : -1,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","on_failure_message" : "field [bytes] not present as part of path [bytes]","on_failure_processor_tag" : null,"timestamp" : "2020-11-17T12:59:19.385189Z","on_failure_processor_type" : "convert"}}},{"processor_type" : "convert","status" : "success","tag" : "convert_reponse","doc" : {"_index" : "my_index","_type" : "_doc","_id" : "_id","_source" : {"request" : "/","auth" : "-","ident" : "-","verb" : "GET","@timestamp" : "2015-05-17T10:05:03.000Z","response" : 200,"bytes" : -1,"clientip" : "83.149.9.216","httpversion" : "1.1","timestamp" : "17/May/2015:10:05:03 +0000"},"_ingest" : {"pipeline" : "_simulate_pipeline","timestamp" : "2020-11-17T12:59:19.385189Z"}}}]}]
}

From the output above we can see that bytes has been set to -1.

That's it for today. I hope you now know how to handle errors in an ingest pipeline and deal with them appropriately.
