logstash

etl 工具

concepts
pipeline
input - filter -output 阶段
插件生命周期管理
队列管理

logstash event
数据进入成为一个event , 可进行增删改

架构
input插件
jdbc

filter插件
date

output插件
es

codec plugins
将原始数据decode 成event ; 将event encode成目标数据

内置的codec plugins
line / multiline 解析行, 多行
json / avro 解析json

filter plugins
mutate 操作event字段
metrics agg
ruby 执行ruby代码

pipeline
支持多pipeline , 可调整pipeline的并发处理线程数 和 一批数据量, 默认125条event

queue
codec 之后放入queue中, 防止丢失数据

有内存的队列和 持久化队列 , 内存队列宕机丢数据 , 持久化可保证数据不丢失被消费

安装
linux logstash 启动太慢 , 有说是熵值低,或者堆内存低, 都调了 ,不好使, 用windows版的logstash演示吧
logstash 7.1.0
wget
jdk

input plugin
解析单行 , 解析多行 ,
通过匹配正则, 判断多行是否为一条数据 , 比如java的异常堆栈日志 , 判断下一行开头非字母, 则跟上一行为同一条
可对文件读取, logstash记录读取到的位置, 接着上次的位置读取

filter plugin
日期解析
分隔符解析
正则匹配
mutate 处理字段 , 重命名
ruby代码来修改event

案例
logstash读取csv文件, 内容为

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance

logstash.conf中配置内容

input 的file 定义了文件位置 ,
之后event会经过filter中每个plugin ,
csv 定义逗号进行分割, 并产生新的3个字段
mutate 对genre字段 | 进行分隔 , 移除4个多余字段
最后output 到es中

input {file {path => "D:/Develop/software/logstash-7.10.1/bin/movies.csv"start_position => "beginning"}
}
filter {csv {separator => ","columns => ["id","content","genre"]}mutate {split => { "genre" => "|" }remove_field => ["path", "host","@timestamp","message"]}mutate {split => ["content", "("]add_field => { "title" => "%{[content][0]}"}add_field => { "year" => "%{[content][1]}"}}mutate {convert => {"year" => "integer"}strip => ["title"]remove_field => ["path", "host","@timestamp","message","content"]}}
output {elasticsearch {hosts => "http://127.0.0.1:9200"index => "movies"document_id => "%{id}"}stdout {}
}

利用jdbc插件 导入数据到es

案例
将mysql数据同步到es中
mysql中的数据变动将同步到es中,
支持增量更新
用户注销后, 不能被搜索到

环境
安装mysql ,https://downloads.mysql.com/archives/installer/
创建db_example表;
mysqlapp工程 , 使用springboot,通过gradle/ maven构建, 项目启动后 hibernate 通过实体创建对应表;
启动es , kibana , 删除users索引

DELETE users

mysql-demo.yaml

input {jdbc {jdbc_driver_library => "D:/Develop/software/logstash-7.10.1/lib/jars/mysql-connector-java-8.0.17.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"jdbc_user => rootjdbc_password => *******#启用追踪,如果为true,则需要指定tracking_columnuse_column_value => true#指定追踪的字段,tracking_column => "last_updated"#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型tracking_column_type => "numeric"#记录最后一次运行的结果record_last_run => true#上面运行结果的保存位置last_run_metadata_path => "jdbc-position.txt"statement => "SELECT * FROM user where last_updated >:sql_last_value;"schedule => " * * * * * *"}
}
output {elasticsearch {document_id => "%{id}"document_type => "_doc"index => "users"hosts => ["http://localhost:9200"]}stdout{codec => rubydebug}
}

# 新增用户
curl localhost:8080/demo/add -d name=Mike -d email=mike@xyz.com -d tags=Elasticsearch,IntelliJ
curl localhost:8080/demo/add -d name=Jack -d email=jack@xyz.com -d tags=Mysql,IntelliJ
curl localhost:8080/demo/add -d name=Bob -d email=bob@xyz.com -d tags=Mysql,IntelliJ

看logstash日志 , 输出到了es中

记录了更新的时间 用来查询之后变动的数据

es中自动创建了users的索引 , 并索引了数据

之后修改 , 删除数据 , 看日志和kibana中的变化


# 更新用户
curl -X PUT localhost:8080/demo/update -d id=6 -d name=Bob2 -d email=bob2@xyz.com -d tags=Mysql,IntelliJ# 删除用户
curl -X DELETE localhost:8080/demo/delete -d id=15


修改后, 同步到es中的version仍然为1 , 说明是删除后新增的

针对未删除的数据,在es中创建视图 , 使用别名并过滤出未逻辑删除的数据

# 创建 alias,只显示没有被标记 deleted的用户
POST /_aliases
{"actions": [{"add": {"index": "users","alias": "view_users","filter" : { "term" : { "is_deleted" : false } }}}]
}# 通过 Alias查询,查不到被标记成 deleted的用户
POST view_users/_search
{}POST view_users/_search
{"query": {"term": {"name.keyword": {"value": "Jack"}}}
}

beats介绍

日志, 指标收集

metricbeat

组成
module 用来收集指标对象, 例如不同的操作系统 , 数据库

metricset:
一个module由多个set组成
具体的指标集合 , 不同的set可以设置不同的抓取时长

module
提供大量 开箱即用的module, 默认都是关闭的

metricbeat官方文档

案例
windows版操作

查看当前module

.\metricbeat modules list

开启mysql module

.\metricbeat modules enable mysql

开启kibana中metricbeat 的控制面板

.\metricbeat setup --dashboards

修改modules.d文件夹中的mysql.yml内容 , 连接mysql

启动metricbeat

 .\metricbeat

可以在dashboard搜索mysql, 查询几次, 看到mysql相关的

packetbeat 用来抓包

其他beats

index pattern配置数据

环境准备
创建索引

PUT /logstash-2015.05.18
{"mappings": {"properties": {"geo": {"properties": {"coordinates": {"type": "geo_point"}}}}}
}PUT /logstash-2015.05.19
{"mappings": {"properties": {"geo": {"properties": {"coordinates": {"type": "geo_point"}}}}}
}PUT /logstash-2015.05.20
{"mappings": {"properties": {"geo": {"properties": {"coordinates": {"type": "geo_point"}}}}}
}

开启es ,
在logs.jsonl所在目录中启动cmd, 导入文件中的文档

curl命令

curl localhost:9200/_bulk?pretty -X POST -H "Content-Type: application/x-ndjson" --data-binary @logs.jsonlcurl -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/bank/account/_bulk?pretty --data-binary @accounts.json

导入完成后, 创建该index pattern , 用来方便查看批量索引

选择时间戳字段 , timestamp
列表示能否被聚合, 搜索
scripted filed 增加脚本字段 , 还可以过滤字段

kibana discover

对刚创建 的logstash index pattern 进行展示 ,
过滤出文档的事件发生时间 , 过滤字段值 , 点击字段可以看到聚合的指标信息

基本可视化组件介绍

visualize 选择创建饼图 , 选择ecommerce , 分析消费账单

选择时间 , 创建了指标为count , 分桶中选择消费总额, 性别 和区域 , 效果为不同消费区间下不同性别的前几个地区的消费者

点击inspect 可以看到请求体

dashboard

了解kibana的样例怎么配置的

用 monitoring 和 alerting 监控es集群

monitoring xpack 免费功能 , 生产生部署额外集群监控业务集群 , 如果业务集群自身监控自己获取不到自己的信息

watcher for alerting
需要gold 账户 , 不是免费的功能

一个watcher由5个部分组成
trigger 多久被触发一次
input 查询条件
condition 查询是否满足
action 相关操作 , 报警 ,邮件等

7.10版本alert是x-pack , 好像不收费了
https://www.elastic.co/guide/en/kibana/7.10/alert-action-settings-kb.html#general-alert-action-settings

apm进行程序性能监控

apm 应用表现管理, 通过将agent部署在应用端作为客户端, 采集运行指标并上报给apm server端, 完成应用指标的收集

环境准备

下载 apm

windows版演示 , 启动server

.\ apm-server

kibana上检查server状态 , 可以在apm server 的log目录查看日志

agent运行原理
https://www.elastic.co/guide/en/apm/agent/java/1.x/faq.html#faq-how-does-it-work

agent与其他es组件版本不绑定, 建议去下载最新的agent
https://search.maven.org/artifact/co.elastic.apm/elastic-apm-agent

打开之前mysql的springboot 应用, 用mvn打包 , 用命令运行
指定了apm需要的参数

java -javaagent:./elastic-apm-agent-1.6.1.jar -Delastic.apm.service_name=my-application -Delastic.apm.server_url=http://localhost:8200  -Delastic.apm.application_packages=hello -jar ./target/gs-mysql-data-0.1.0.jar

或者配置在idea的project里 ,用idea运行

最后检查agent 状态可能会显示没有数据 , 直接点之后的按钮 , 把数据加载进kibana , 最后看ui

机器学习实现时序数据的异常检测

机器学习付费功能 , 可配置单一职责机器学习节点 , 防止影响业务资源;

案例
通过现有数据的聚合计算, 定义正常/异常标准 , 预测未来走势 ,

single-mertric : 单字段指标聚合,检测一系列时间中异常指标, 针对异常指标人工订正, 排除脏数据

multi-mertric : 多字段指标 ,
平铺多个指标的折线图 , 选择时间间隔, 设置key分组, 出现不同的 app分栏

population
检测个体表现与群体不同的 , 例如用户多日的发包数量

指定用户作为区分数据标识 和 发包数量为指标字段 ,

elk日志管理

收集 , 分析 , 检索可视化 , 风险告警

filebeat

简介
prospector 抓取 日志文件
harvester 发送至 libbeat , output到下游

发送到es中 , es需要提前创建index template
filebeat对数据分析处理不好, 需要增加logstash 或者ingest pipeline额外处理
可视化由kibana dashboard提供

filebeat 也有很多组件相关的moudle用来快速配置抓取日志文件

下载

windows 版filebeat演示

查看 modules  列表
./filebeat modules list
开启指定模块 windows没有system这个
~~./filebeat modules enable system~~ ./filebeat modules enable elasticsearch

进 modules.d 编辑相应的文件,修改log路径
var.paths: 中的内容格式要一直

var.paths: ["D:\\Develop\\software\\elasticsearch-7.10.1-windows-x86_64\\elasticsearch-7.10.1\\logs\\elasticsearch_server.json"]

开启kibana的dashboard
启动

./filebeat setup./filebeat

看到新建了filebeat索引

和 index pattern 还有 index life cycle

log中能看到实时日志

canvas数据演示

表现力更强 ,

不同版本kibana canvas的语法不同 ,

电影应用搜索案例

项目架构分析及架构设计
需求 一个电影搜索类应用 , 前台可视化配置 , 后台运营人员 可以通过埋点进行数据分析, 同义词配置, 帮助增加搜索体验

es app search 是一个商业收费应用 , 基于es系统之上 , 后端管理应用 , 能做到数据分析 ,同义词配置等, 前端生成node.js应用

https://www.elastic.co/guide/en/app-search/8.2/index.html

导入数据至es
使用tmdb数据
搭建电影搜索服务
es app search 的reference ui , 快速搭建生成node.js前端程序

用户问卷案例

需求分析架构设计
使用Stack Overflow 开源问卷调查结果 , 通过logstash 导入es , kibana 做展示

数据 extract & enrichment
下载问卷统计 https://insights.stackoverflow.com/survey

修改logstash 文件

input {file {path => "D:/Develop/software/logstash-7.10.1/survey_results_public.csv"start_position => "beginning"}
}filter {csv {autogenerate_column_names => falseskip_empty_columns => truecolumns => ["Respondent","MainBranch","Hobbyist","OpenSourcer","OpenSource","Employment","Country","Student","EdLevel","UndergradMajor","EduOther","OrgSize","DevType","YearsCode","Age1stCode","YearsCodePro","CareerSat","JobSat","MgrIdiot","MgrMoney","MgrWant","JobSeek","LastHireDate","LastInt","FizzBuzz","JobFactors","ResumeUpdate","CurrencySymbol","CurrencyDesc","CompTotal","CompFreq","ConvertedComp","WorkWeekHrs","WorkPlan","WorkChallenge","WorkRemote","WorkLoc","ImpSyn","CodeRev","CodeRevHrs","UnitTests","PurchaseHow","PurchaseWhat","LanguageWorkedWith","LanguageDesireNextYear","DatabaseWorkedWith","DatabaseDesireNextYear","PlatformWorkedWith","PlatformDesireNextYear","WebFrameWorkedWith","WebFrameDesireNextYear","MiscTechWorkedWith","MiscTechDesireNextYear","DevEnviron","OpSys","Containers","BlockchainOrg","BlockchainIs","BetterLife","ITperson","OffOn","SocialMedia","Extraversion","ScreenName","SOVisit1st","SOVisitFreq","SOVisitTo","SOFindAnswer","SOTimeSaved","SOHowMuchTime","SOAccount","SOPartFreq","SOJobs","EntTeams","SOComm","WelcomeChange","SONewContent","Age","Gender","Trans,Sexuality","Ethnicity","Dependents","SurveyLength","SurveyEase"]}if ([collector] == "collector") {drop {}}mutate { remove_field => ["message", "@version", "@timestamp", "host"] }
}
output {stdout { codec => "dots" }elasticsearch {hosts => ["http://localhost:9200"]  index => "stackoverflow-survey-raw"document_type => "_doc"}
}

启动 logstash

logstash -f logstash-stackoverflow-survey.conf

导入完成后 , kibana查看索引映射,

GET stackoverflow-survey-raw/_mapping

其中的有很多text字段, 对聚合分析只需要keyword就可以了
之后要进行reindex , 设定新索引映射修改字段类型

PUT final-stackoverflow-survey
{"mappings": {"dynamic_templates": [{"strings_as_keywords": {"match_mapping_type": "string","mapping": {"type": "keyword"}}}]},"settings": {"number_of_replicas": 0}
}

查看具体数据, 有不少数字型字段被自动映射成字符类 , 还有字符类要订正成数组型字段, 方便进行term agg

GET stackoverflow-survey-raw/_search

定义reindex 用的ingest , 字符转数字型考虑转换失败后给默认值0


PUT _ingest/pipeline/stackoverflow_pipeline
{"description": "Pipeline for stackoverflow survey","processors": [{"split": {"field": "DatabaseDesireNextYear","separator": ";"}},{"split": {"field": "DatabaseWorkedWith","separator": ";"}},{"split": {"field": "DevEnviron","separator": ";"}},{"split": {"field": "LanguageWorkedWith","separator": ";"}},{"split": {"field": "MiscTechDesireNextYear","separator": ";"}},{"split": {"field": "PlatformWorkedWith","separator": ";"}},{"split": {"field": "PlatformDesireNextYear","separator": ";"}},{"split": {"field": "WebFrameWorkedWith","separator": ";"}},{"split": {"field": "WebFrameDesireNextYear","separator": ";"}},{"split": {"field": "Containers","separator": ";"}},{"script": {"source": """try{ctx.YearsCode = Integer.parseInt(ctx.YearsCode);}catch(Exception e){ctx.YearsCode = 0;}
"""}},{"script": {"source": """try{ctx.WorkWeekHrs = Integer.parseInt(ctx.WorkWeekHrs);}catch(Exception e){ctx.WorkWeekHrs = 0;}
"""}},{"script": {"source": """try{ctx.Age = Integer.parseInt(ctx.Age);}catch(Exception e){ctx.Age = 0;}
"""}},{"script": {"source": """try{ctx.Age1stCode = Integer.parseInt(ctx.Age1stCode);}catch(Exception e){ctx.Age1stCode = 0;}
"""}},{"script": {"source": """try{ctx.YearsCodePro = Integer.parseInt(ctx.YearsCodePro);}catch(Exception e){ctx.YearsCodePro = 0;}
"""}}]
}

reindex 后查看映射

POST _reindex?wait_for_completion=false
{"source": {"index": "stackoverflow-survey-raw"},"dest": {"index": "final-stackoverflow-survey","pipeline": "stackoverflow_pipeline"}
}GET final-stackoverflow-survey/_mapping

集群数据备份

集群升级前对数据进行备份 , 备份支持本地 , hdfs 云盘
快照是基于索引进行的 , 所以可以增量备份 , 指定索引备份/恢复

配置备份地址后 启动es

path.repo: ["D:/Develop/software/elasticsearch-7.10.1-windows-x86_64/elasticsearch-7.10.1/back_up"]

kibana中操作

#创建一个 repositoty
PUT /_snapshot/my_fs_backup
{"type": "fs","settings": {"location": "D:/Develop/software/elasticsearch-7.10.1-windows-x86_64/elasticsearch-7.10.1/back_up","compress": true}
}# 创建一个snapshot
PUT /_snapshot/my_fs_backup/snapshot_1?wait_for_completion=trueDELETE testPUT test/_doc/1
{"key":"value1"
}
#指定索引创建快照
PUT /_snapshot/my_fs_backup/snapshot_2?wait_for_completion=true
{"indices": "test","ignore_unavailable": true,"include_global_state": false,"metadata": {"taken_by": "yiming","taken_because": "backup before upgrading"}
}#查看所有的快照
GET /_snapshot/my_fs_backup/_all
# 删除快照
DELETE /_snapshot/my_fs_backup/snapshot_2# 指定快照恢复 , 存在与当前重名的索引 , 会失败
POST /_snapshot/my_fs_backup/snapshot_1/_restore
{}
# 删除索引后 , 确认 , 再指定索引恢复 , 再查看效果
DELETE stackoverflow-survey-rawGET stackoverflow-survey-raw# 指定索引进行 restore
POST /_snapshot/my_fs_backup/snapshot_1/_restore
{"indices": "stackoverflow-survey-raw","index_settings": {"index.number_of_replicas": 1},"ignore_index_settings": ["index.refresh_interval"]
}

备份的目录存在两个快照文件 , 全部索引有400MB

# 删除快照后, 不可查询 , 但实际仍存在, 属于逻辑删除 , 延迟删除
DELETE /_snapshot/my_fs_backup

基于java和es构建应用

es java client 工具 6.0 前 low-level , 之后为high-level, high-level 增加了异步调用

springdata 支持 es的版本

演示用win 的 es 6.2.2 , 用jdk1.8

java工程的 配置文件中声明es集群配置

spring:application:name: sample-spring-elasticsearchdata:elasticsearch:cluster-name: springboot-democluster-nodes: 127.0.0.1:9300elasticsearch:rest:uris: http://127.0.0.1:9200initial-import:enabled: true

initial-import变量用来控制初始化 , 启动类加载了bean , bean中定义了PostConstruct 构造后执行, 向es造了些数据,



实体声明了索引 和映射相关的

#访问 挑一个 name
http://localhost:9200/employees/_search#验证java应用查询
http://localhost:8080/employees/calandra.keeling

high level 案例

ps: es client 高版本将 low 和 high进行了统一 为 java api client

用教程中 7.3.2 演示 , pom中指定相同版本号

   <dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.3.2</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.3.2</version></dependency>

启动项目后 , 通过不同请求方式 调用不同接口

elasticsearch学习 part4相关推荐

  1. ElasticSearch学习总结(三):查询总结

    ElasticSearch学习总结(三):查询总结 本文主要对Elasticsearch中查询相关的知识做一个简单的总结,内容主要包括查询的评分机制,查询改写,过滤器,以及对常见的查询做一个简单的分类 ...

  2. Elasticsearch 学习(二).实战使用

    Elasticsearch 学习(二).实战使用 参考:http://www.passjava.cn/#/01.PassJava/02.PassJava_Architecture/15.Elastic ...

  3. Elasticsearch学习第二篇--常用的几种搜索方式

    Elasticsearch学习第二篇--常用的几种搜索方式 一.Query String Search 查询全部 条件查询 二.Query DSL 查询全部 条件查询 三.Query Filter 四 ...

  4. elasticSearch学习笔记04-同义词,停用词,拼音,高亮,拼写纠错

    由于elasticSearch版本更新频繁,此笔记适用ES版本为 7.10.2 此笔记摘录自<Elasticsearch搜索引擎构建入门与实战>第一版 文中涉及代码适用于kibana开发工 ...

  5. ElasticSearch学习之Kibana(一)

    文章目录 前言 安装elasticsearch elasticsearch基础 定义 基础 安装Kibana 安装 查看服务状态 Kibana索引管理 增加索引(数据库) 获取索引(数据库) 删除索引 ...

  6. Elasticsearch学习(四) - 查询①

    title: Elasticsearch学习(四)-查询① date: 2020-10-29 tags: Elasticsearch Elasticsearch学习(四)-查询① categories ...

  7. Elasticsearch学习-Doc与Segment原理

    Elasticsearch学习-Doc与Segment原理 0x00 系列文章目录 Elasticsearch学习-关于倒排索引.DocValues.FieldData和全局序号 Elasticsea ...

  8. ElasticSearch 学习笔记:Multi Search

    本文目录 1 简介 2 格式 3 header格式 4 body格式 5 返回格式 6 性能 7 相关文章 1 简介 批量查询接口(Multi Search API)允许在一次请求中执行多个查询操作, ...

  9. ElasticSearch学习笔记-ngram、中文拼音、简繁体搜索记录

    ElasticSearch版本:elasticsearch-7.3.0 ElasticSearch相关插件安装可以参考: ElasticSearch学习笔记-插件安装记录_人生偌只如初见的博客-CSD ...

最新文章

  1. 初识广度优先搜索与解题套路
  2. Windows核心编程 第十八章 堆栈
  3. echarts雷达图线的样式_echarts 雷达图的个性化设置
  4. 实现自己的“单页”博客,只需要一个指令 (Moka)
  5. Linux Shell基础 - 流程控制 - for循环 - while 循环 - until循环
  6. weblogic双机热备部署linux,WebLogic应用在集群环境下的一些基本知识【转载】
  7. 逻辑分析推理(戴帽子问题)博弈
  8. java用接口实例化对象_[求助]迷茫中,接口可以直接实例化对象吗?
  9. python ftp 设置代理_用Python搭建一个简单的代理池
  10. 用计算机进行有理数除法时,有理数的乘除法怎么算?,什么是有理数的乘除法。越详细越好。...
  11. 心理学实验必备 | 脑电实验流程及注意事项
  12. android 4.4 投屏,安卓投屏助手官方
  13. 利用中文维基百科训练词向量模型
  14. python数据结构题目_python数据结构_递归python数据结构_python数据结构 面试题 - 云+社区 - 腾讯云...
  15. 微信小程序电子签名及图片生成
  16. 金士顿服务器内存条怎么看型号,Win10怎么查看内存条型号?
  17. 28个在线游戏编程学习网站
  18. 极速office(Word)怎么调出标尺
  19. PyTorch中的torch.clamp()实现矩阵裁剪
  20. 谷氨酰胺合成酶(Glutamine synthetase,GS)试剂盒说明书

热门文章

  1. byval 和byref的区别,今天刚明白。
  2. 智慧教室—基于人脸表情识别的考试防作弊系统
  3. 猪八戒java开发,猪八戒--Java开发
  4. Java图片嵌套图片
  5. mysql uftb8mb4 储存 emoji 表情失败
  6. 计算机学校宣传片创意和构思,【差一步就会02】用PPT做影视宣传片
  7. python协程池_python3下multiprocessing、threading和gevent性能对比—-暨进程池、线程池和协程池性能对比 | 学步园...
  8. 加速编码的17款最棒的CSS工具
  9. 支付宝和微信支付合作伙伴RiverPay加速全球化战略布局
  10. android连接小票打印机,打印小票数据的两种模式