本节书摘来华章计算机《Storm实时数据处理》一书中的第2章 ,第2.3节,(澳)Quinton Anderson 著 卢誉声 译更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.3 创建日志Spout

日志Topology通过Redis通道读取所有logstash产生的日志,这些日志数据会通过本章介绍的Spout发送到Topology中。由于这是一个全新的Topology,因此我们先来新建一个Topology项目。

2.3.1 实战

我们先来创建项目目录和标准的Maven目录结构(标准结构可以参考:http://maven.apache.org/guides/introduction/introduction-to-the-standard-directory-layout.html)。

Step01 参照第1章中创建“Hello World” Topology的方法创建POM文件,修改和标签,然后添加以下依赖项:



Step02 通过以下命令生成Eclipse工程文件,然后将工程文件导入Eclipse:

Step03 日志Topology中的Tuple会携带日志域对象,该对象封装了数据和解析日志文件中单个记录的逻辑。在我们的项目中创建这个域对象:

这个代码片段中并没有包含getter、setter和equals方法,但还是有必要依次实现它们。对单元测试来说,equals方法有很大用处。
Step04 接下来创建Logspout类,并继承BaseRichSpout接口。参照第1章的方法实现这个类,声明下面这个字段:

然后发送接收到的日志记录到Topology中,代码如下:

应尽量避免直接在代码中使用字符串。虽说Tuple能在运行时高效地处理这些局部变量,但在编译链接的时候,初始化带有字符串元素的代码并没有任何意义。因此,我们还是建议使用静态变量定义代替这种直接使用字符串的方法。

2.3.2 解析

Redis Spout的实现我们已经熟谙于心,本节重点阐述LogEntry类域对象中的解析逻辑。logstash会以单个JSON值的形式向Redis通道发送日志。这些JSON值的格式如下所示:

2.3.3 补充内容

JSON格式包含两种关键结构,分别是JSON对象(JSON Object)和JSON数组(JSON Array)。JSON主页(www.json.org)提供了针对这两种结构的简要定义,为了方便起见,我们就直接在这里罗列出这两种结构的定义。对象是一个无序的名/值对(name/value pair)集合。一个对象以“{”(左括号)开始,以“}”(右括号)结束。每个名称后跟一个“:”(冒号),名/值对之间使用“,”(逗号)分隔,如图2-2所示。

数组是值(value)的有序集合。一个数组以“[”(左中括号)开始,以“]”(右中括号)结束。值之间使用“,”(逗号)分隔,如图2-3所示。

值(value)可以是双引号括起来的字符串(string)、数值(number)、true、false、 null、对象(object)或者数组(array)。这些结构可以嵌套。
LogEntry对象的构造函数包含一个JSONObject对象作为参数,并根据该对象的成员对自身进行初始化。可以通过toJSON()方法把LogEntry对象转换成对应的JSONObject对象,以备不时之需。LogEntry通过com.googlecode.json-simple库中的工具方法,将字符串转换成可用的JSON结构。
虽然结构定义清晰,但“日期-时间”却有多种不同的格式。parseDate()方法采取最优的方法解析各种日期格式。FORMATS类成员变量定义了支持的“日期-时间”格式。

《Storm实时数据处理》一2.3 创建日志Spout相关推荐

  1. 《Storm实时数据处理》一2.7 为日志流集群创建集成测试

    本节书摘来华章计算机<Storm实时数据处理>一书中的第2章 ,第2.7节,(澳)Quinton Anderson 著 卢誉声 译更多章节内容可以访问云栖社区"华章计算机&quo ...

  2. 《Storm实时数据处理》一2.6 统计与持久化日志统计信息

    本节书摘来华章计算机<Storm实时数据处理>一书中的第2章 ,第2.6节,(澳)Quinton Anderson 著 卢誉声 译更多章节内容可以访问云栖社区"华章计算机&quo ...

  3. 美团点评基于Storm的实时数据处理实践

    背景 目前美团点评已累计了丰富的线上交易与用户行为数据,为商家赋能需要我们有更强大的专业化数据加工能力,来帮助商家做出正确的决策从而提高用户体验.目前商家端产品在数据应用上主要基于离线数据加工,数据生 ...

  4. 使用 Kafka 和 Spark Streaming 构建实时数据处理系统

    使用 Kafka 和 Spark Streaming 构建实时数据处理系统  来源:https://www.ibm.com/developerworks,这篇文章转载自微信里文章,正好解决了我项目中的 ...

  5. Apache Pulsar:实时数据处理中消息,计算和存储的统一

    本文转载自"AI前线",整理自翟佳在 QCon2018 北京站的演讲,在本次演讲中,翟佳介绍了 Apache Pulsar 的架构.特性和其生态系统的组成,并展示了 Apache ...

  6. 如何使用实时计算 Flink 搞定实时数据处理难题?

    简介:如何使用实时计算 Flink 搞定实时数据处理难题?本文由阿里巴巴高级技术专家邓小勇老师分享,从实时计算的历史回顾着手,详细介绍了阿里云实时计算 Flink 的核心优势与应用场景,文章内容主要分 ...

  7. 使用storm 实时计算_使用Storm进行可扩展的实时状态更新

    使用storm 实时计算 在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态. Storm是基于事件的数据处理引擎. 它的模型依赖于基本原语,例如事 ...

  8. 基于 MaxCompute 的实时数据处理实践

    简介: MaxCompute 通过流式数据高性能写入和秒级别查询能力(查询加速),提供EB级云原生数仓近实时分析能力:高效的实现对变化中的数据进行快速分析及决策辅助.当前Demo基于近实时交互式BI分 ...

  9. 浅析Kafka实时数据处理系统

    Kafka是啥?用Kafka官方的话来说就是: Kafka is used for building real-time data pipelines and streaming apps. It i ...

  10. Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)

    Flink实时数据处理实践经验 文章目录 Flink实时数据处理实践经验 1. 数据输入与预处理 2. 实时数据处理 3. 实时数仓架构 4. 优化方案 Java.大数据开发学习要点(持续更新中-) ...

最新文章

  1. 请问:这里的空应怎么填呀?
  2. C语言-宏定义#define的用法
  3. AndroidVerifyBoot
  4. html里面onclick属性是什么,html中onclick事件属性定义与用法
  5. 【收藏】CentOS 7 安装NFS
  6. 阿里疯传!Python+Tableau+Excel数分教程(附内部资源)
  7. mysql删除记录后id不连续_小水玩转Mysql---Mysql跟踪sql记录
  8. gnome 3.4 评测
  9. 3D游戏引擎的Web化
  10. 在IDEA里jsp项目图片显示不出来(图文解答)
  11. 如何提升自己(一) 谈学习
  12. Excel/WPS做数据透视表,即对变量做交叉汇总(列联表)
  13. Byte学堂:共享单车数据处理原理及分析方法
  14. 大一计算机试题答案,大一计算机期末考试试题及答案
  15. WORD设置从开始页数算总页数
  16. MySQL增删改查常用语句命令
  17. Metasploit技术(一)——Metasploit简介与基础
  18. HDU-1111解题报告
  19. c聊天室系统asp ajax,利用AJAX和ASP.NET实现简单聊天室
  20. C#调用matlab时,类型初始值设定项引发异常,如何解决?

热门文章

  1. python getmenu不到菜单句柄_Python and Menu[编程点滴1]
  2. Win10系统开机后任务栏卡死解决方法
  3. 单片机8255c语言程序,51单片机8255驱动C程序
  4. 第三方支付平台:微信支付接口
  5. 一些服务器常见漏洞的修复方法
  6. 数组元素全排列、组合 C语言代码
  7. 二、8【FPGA】Verilog中锁存器(Latch)原理、危害及避免
  8. IM即时通讯需要解决的问题
  9. IceSword V1.22 Final 冰刃
  10. 腾讯QQ会员中心g_tk32算法【C#版】