Elasticsearch 是一个分布式的 RESTful 搜索和分析引擎,能够解决越来越多的用例。 作为Elastic Stack 的核心,它集中存储你的数据,以实现闪电般的快速搜索,微调的相关性以及易于扩展的强大分析。Elasticsearch 在很多的情况下可以帮我们解决实时的商业数据的分析及统计。 在很多的实时事件处理中,Websocket 经常会用到。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。那么我们如何把 Websocket 数据导入到 Elasticsearch 中呢?

在今天的展示中,我们将使用一个 Python 应用作为 Websocket 的 router。我们的架构如下:

在上面,我们使用 Websocket 来采集数据并把它转为可以被导入的数据结果。这些实时的商业数据可以是股票等信息。这个原理和我们之前摄入 MQTT 的方法是一样的。

准备工作

如果你还没有安装好自己的 Elasticsearch 及  Kibana 的话,那么请你参照我之前的文章 “Elastic:菜鸟上手指南” 来安装 Elasticsearch 及 Kibana。

在今天的练习中,我将使用 https://finnhub.io/ 网站提供的 REST API 接口来进行展示。我们必须申请 API key 以得到数据:

当你签名过后,你可以看到如下所示的 API key:

我们点击上面的 API Documentation:

在左边我们点击 Trades,然后拷贝自己的代码并保存到本地文件 finnhub-websockets.py 中:

finnhub-websockets.py

#https://pypi.org/project/websocket_client/
import websocketdef on_message(ws, message):print(message)def on_error(ws, error):print(error)def on_close(ws):print("### closed ###")def on_open(ws):ws.send('{"type":"subscribe","symbol":"AAPL"}')ws.send('{"type":"subscribe","symbol":"AMZN"}')ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')if __name__ == "__main__":websocket.enableTrace(True)ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=c091aan48v6tm13rku80",on_message = on_message,on_error = on_error,on_close = on_close)ws.on_open = on_openws.run_forever()

我们接下来安装 websocket-client:

pip3 install websocket-client

然后,我们直接运行上面的 Python 应用:

python3 finnhub-websockets.py

我们将看到如下的类似的输出:

好了,看起来我们的 API 是成功的。

导入数据到 Elasticsearch

我们接下来把我们的数据导入到 Elasticsearch 中去。我们访问网址 https://elasticsearch-py.readthedocs.io/en/v7.10.1/。首先,我们必须安装 elasticsearch 安装包:

pip3 install elasticsearch

我们可以参考我之前的文章 “Elasticsearch 开发入门 - Python”。我们可以安装那篇文章中介绍的方式来进行 ES 的连接,并进一步修改我们的 finnhub-websocket.py 文件:

finnhub-websocket.py


#https://pypi.org/project/websocket_client/
import json
import datetime
import websocket
from elasticsearch import Elasticsearches = Elasticsearch([{'host':'localhost','port':9200}])def on_message(ws, message):message_json = json.loads(message)message_json["@timestamp"] = datetime.datetime.utcnow().isoformat()res = es.index(index="websockets-data", body=message_json)print(message_json)def on_error(ws, error):print(error)def on_close(ws):print("### closed ###")def on_open(ws):ws.send('{"type":"subscribe","symbol":"AAPL"}')ws.send('{"type":"subscribe","symbol":"AMZN"}')ws.send('{"type":"subscribe","symbol":"TSLA"}')ws.send('{"type":"subscribe","symbol":"ESTC"}')if __name__ == "__main__":websocket.enableTrace(True)ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=<my-finnhub-token>",on_message = on_message,on_error = on_error,on_close = on_close)ws.on_open = on_openws.run_forever()

在上面你需要把自己的 token 填入到上面的代码中。这里我说明一下。

  1. es 变量是建立和 Elasticsearch 的连接。你需要根据自己的部署而修改上面的地址及端口。如果必要,你还需要提供相应的账号信息来进行连接
  2. on_message 中,我们添加了当前的时间戳,这样可以使得我们我们的数据具有时效性,从而可以更精准地分析数据
  3. 我们的数据在 Elasticsearch 中被保存在 websockets-data 索引中。

我们重新运行我们的应用:

python3 finnhub-websockets.py 

同样地,我们可以看到数据源源不断地导入到 Elasticsearch 中。

我们可以在 Kibana 中通过如下的命令来查看新生产的 websockets-data 索引:

GET  _cat/indices
yellow open websockets-data                 n3RU2Ze8Rj-hVi3a8H3-zw 1 1  1   0    4kb    4kb
green  open .apm-custom-link                Fqq-lxCiQHKib8kxdO0uoQ 1 0  0   0   208b   208b
green  open .kibana_task_manager_1          29ilRYTkSOSx1aFtR0DUWQ 1 0  5 213 89.3kb 89.3kb
green  open .apm-agent-configuration        yY8-Sbn8TbWac4R_l1-8qQ 1 0  0   0   208b   208b
green  open .kibana-event-log-7.10.0-000001 g7vkPKUHQiqxfpJDGnvKmw 1 0  1   0  5.6kb  5.6kb
green  open .kibana_1                       oF471rX0R8Cu4H1tvE813Q 1 0 18   2 10.4mb 10.4mb

我们可以为这个索引创建一个索引模式:

在目前写这篇文章的时候,不是美国的交易时间,所以 websocket 里暂时没有数据。在交易的时间,Websocket 会自己向 Elasticsearch 推送数据。我们会发现如下的这些字段:

上面的字段的定义,我们可以在地址找到:

我们可以使用  Kibana 中的 Lens 为我们的 Stock 数据进行实时的数据分析:

总结

在本篇文章中,我们介绍了如何使用 Python 语言作为一个 router 把 websocket 所生成的实时数据导入到 Elasticsearch 中,并在 Elasticsearch 中进行分析。

Elastic:如何摄入 Websocket 数据到 Elasticsearch相关推荐

  1. Observability:我们该选 Beats 还是 Elastic Agents 来采集数据?

    除了 Logstash 之外,Elastic 提供了两种主要的方式来向 Elasticsearch 发送数据: 我们可以选择直接把数据从 Beats 发送至 Elasticsearch.当然我们也可以 ...

  2. elastic如何和mysql同步数据_MySQL数据库之mysql 同步数据到 ElasticSearch 的方案

    本文主要向大家介绍了MySQL数据库之mysql 同步数据到 ElasticSearch 的方案 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. MySQL Binlog 要通过 ...

  3. 【技术实验】表格存储Tablestore准实时同步数据到Elasticsearch

    实验背景 图书馆Q是一家大型图书馆,图书馆藏书众多,纸质图书600多万册,电子图书7000多万册,总数有八千多万册,这些图书之前都是人工检索维护的,现在需要做一个系统来存储管理这些图书信息. 需求如下 ...

  4. 导入json数据到Elasticsearch(bulk方法)

    一.前言 在前面几章,基本把本地的环境给配置好了,那么配置好了之后,要做的第一件事当然就是导入数据进去.我这边准备的是一份json数据,这里通过ES的bulk API给导入进去. 二.导入数据 1.批 ...

  5. Day 4 - PB级规模数据的Elasticsearch分库分表实践

    Day 4 - PB级规模数据的Elasticsearch分库分表实践 从2018年7月在开始在某阿里云数据中心部署Elasticsearch软件,到2018年12月共创建了15个集群,服务于客户的文 ...

  6. 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

    前言 前面 FLink 的文章中我们已经介绍了说 Flink 已经有很多自带的 Connector. 1.<从0到1学习Flink>-- Data Source 介绍 2.<从0到1 ...

  7. NodeJS同步MySQL上游数据到ElasticSearch数据库中

    NodeJS同步MySQL上游数据到ElasticSearch数据库中 项目地址: https://github.com/Miazzy/xdata-elasticsearchs-service.git ...

  8. MySQL从零到一解读增量同步数据到elasticsearch canal adapter方式(binlog)实现

    本文是作者在单机上面从零到一实现增量同步MySQL数据到elasticsearch canal adapter方式(binlog)实现. 实现步骤 (1)安装MySQL (2)开启MySQL binl ...

  9. AWS强烈反击Elastic,欲打造自己的Elasticsearch开源产品OpenSearch

    近期,Elastic开发者向Elasticsearch-py提交了一个PR,增加了一个验证逻辑,限制用户使用Elasticsearch客户端连接到由AWS创建维护的Elasticsearch分支Ope ...

最新文章

  1. 2018会考计算机成绩查询入口,2018年山东会考成绩查询时间及入口
  2. python 密度聚类 使用_使用wgd进行全基因组复制分析
  3. spark.kubernetes.file.upload.path的作用
  4. linux清空redis命令,使用Linux管道批量删除Redis的key
  5. 发际线不符合有经验形象,程序员面试遭拒绝,网友:只招秃子?
  6. PHP 7.3 比 PHP 7.0 快 22%,即将进入特性冻结阶段
  7. 深度学习_pytorch中的forward()的使用与解释
  8. 《实施Cisco统一通信管理器(CIPT1)》一1.3 总结
  9. 信号与线性系统管致中第六版pdf_【对讲机的那点事】无线电天馈系统中载频合路器的作用...
  10. 【游戏开发实战】(完结)使用Unity制作水果消消乐游戏教程(九):使用UGUI显示游戏UI
  11. 【泡咖啡1】linux下caffe编译以及python环境配置手记
  12. (48)性能测试——聚合报告
  13. 豆瓣电影评分(微信小程序)——Day1
  14. 苹果7p服务器维护中,苹果7p无服务怎么解决
  15. 什么是浏览器隐私模式?浏览器隐私模式是否安全?
  16. mysql数据库更改密码
  17. python利用有道词典翻译_Python利用有道词典接口制作即时翻译的工具
  18. 谈谈步进电机的优点与缺点
  19. 表关联之内关联用法案例详解
  20. 根据图片名字生成文件夹并归类+批量重命名照片名

热门文章

  1. android 手机短信恢复,安卓手机短信删除了怎么恢复?简单恢复的方法
  2. 小测试整理(含T1 T2)
  3. 双排桩弯矩Matlab求解程序,考虑开挖过程椅式双排桩内力及变形分析
  4. 【单片机】【数码管】数码管显示
  5. 信号量sem_wait()函数的学习
  6. 立秋了,愿天下人天寒心不寒,快快乐乐
  7. Backtrader:用feather格式股票数据代替tushare进行数据回测
  8. python 为女神编朵玫瑰花的代码,python绘制玫瑰的代码
  9. python求斜边上的高是多少厘米_已知一个直角三角形的两条直角边,如何求斜边上的高的长度...
  10. [教程分享]锐族MP3刷固件教程