1. 基础组件及其API

  • storm中有关spout类的层次

在本例中,spout基于github的API监控某指定项目仓库的动态,并将变动情况发射为元组,每个元组包含针对该仓库的全部提交消息。紧接着,spout类文件的changelog.txt文件包含了所期望格式的提交消息,如下所示:

代码实现:

import org.apache.logging.log4j.core.util.IOUtils;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.spout.SpoutOutputCollector; //发射元组
import org.apache.storm.topology.OutputFieldsDeclarer; // 为spout发射的所有元组定义字段命名
import org.apache.storm.tuple.Fields;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.lang.String;
import java.util.Map;public class CommitFeedListener extends BaseRichSpout {private SpoutOutputCollector outputCollector; // 负责发射元组或者让元组失效private List<String> commits; //从 changelog.txt文件中读取提交的消息列表// 为spout发射的所有元组定义字段命名public void declareOutputFields(OutputFieldsDeclarer declarer){// Field类构造函数中的命名顺序,必须与发射元组中值的顺序匹配,而发射元值的顺序由Value类来定义declarer.declare(new Fields("commit")); // spout发射一个命名为commit的字段}// storm准备运行spout时候调用,连接数据源public void open(Map configMap,TopologyContext context,SpoutOutputCollector outputCollector){this.outputCollector = outputCollector;try{ // 读取changelog.txt到Listcommits = org.apache.storm.shade.org.apache.commons.io.IOUtils.readLines(ClassLoader.getSystemResourceAsStream("changelog.txt"),Charset.defaultCharset().name());} catch(IOException e) {throw new RuntimeException(e);}}public void nextTuple(){ // 当spout读取下一个元组时,由storm定时调用,数据源准备好一个完整的数据之后才会触发for (String commit:commits) {// 为每个提交消息发射一个元组outputCollector.emit(new Values(commit));}}
}
  • storm中有关bolt的类层次

这里我们需要用到两个bolt,可以直接继承BaseBasicBolt,一个负责从元组中接受完整的提交消息,并提取提交Github代码用户的email地址,然后发射包含email地址的元组;另一个bolt维护一个内存映射表,并在映射表中更新用户提交的次数。

代码实现:

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;public class EmailExtractor extends BaseBasicBolt{//public void declareOutputFields(OutputFieldsDeclarer declearer){// 用于声明bolt发射的元组中字段的命名为emaildeclearer.declare(new Fields("email"));}// 当一个tuple被发射到该bolt上时被调用public void execute(Tuple tuple,BasicOutputCollector outputCollector) {// 获取字段为commit的值String commit = tuple.getStringByField("commit");String[] parts = commit.split(" ");// 发射一个字段为email的新元组outputCollector.emit(new Values(parts[1]));}
}
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;import java.util.HashMap;
import java.util.Map;public class EmailCounter extends BaseBasicBolt{private Map<String, Integer> counts;public void declareOutputFields(OutputFieldsDeclarer declarer){// does not emit anything}private Integer countFor(String email){Integer count = counts.get(email);return count == null? 0:count;}private void printCounts(){for(String email:counts.keySet()){System.out.println(String.format("s% has count of %s", email, counts.get(email)));}}// 获取字段为email的值public void execute(Tuple tuple,BasicOutputCollector outputCollector){String email = tuple.getStringByField("email");counts.put(email, countFor(email) + 1);printCounts();}// storm在bolt准备运行时调用public void prepare(Map config,TopologyContext context){counts = new HashMap<String, Integer>();}
}

2. Storm实现

完成spout和bolt部分,我们需要告诉storm数据流的位置以及每个流的分组策略,并构建spout-bolt的拓扑计算图。作为统筹的环节,需要完成三项工作:

  • 构建拓扑计算图,并告诉storm数据流的位置,以及指明每个数据流的流分组策略
  • 创建配置,建议打开日志
  • 生成拓扑,连同配置提交到storm集群,最后kill并关闭

示例代码:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;public class LocalTopologyRunner {private static final int TEN_MINUTES = 600000;// 构建拓扑计算图,将spout-bolt链接起来// 本地运行拓扑的mainpublic static void main (String[] args){TopologyBuilder builder = new TopologyBuilder();// 添加spout到拓扑图,并指定idbuilder.setSpout("commit-feed-listener", new CommitFeedListener());// 添加listener-bolt到拓扑图,并链接commit-spoutbuilder.setBolt("email-extractor", new EmailExtractor()).shuffleGrouping("commit-feed-listener");// 添加count-bolt到拓扑图,并链接extractoe-boltbuilder.setBolt("email-counter", new EmailCounter()).shuffleGrouping("email-extractor");// 拓扑层的配置类,为了保持调试,开启了debugConfig config = new Config();config.setDebug(true);// 创建拓扑计算图StormTopology topology = builder.createTopology();// 定义本地集群LocalCluster cluster = new LocalCluster();// 提交拓扑计算图,并配置到本地集群cluster.submitTopology("github-commit-count", config, topology);Utils.sleep(TEN_MINUTES);cluster.killTopology("github-commit-count");cluster.shutdown();}
}

3. 小结

  • 一个拓扑计算图是一个结点图集,图中每个节点代表一个进程或者计算处理,每条边代表上个节点计算的输出,下个结点计算的输入
  • 元组是一个有序地数值序列,其中每个数值都被赋予一个命名
  • spout是数据流的源头,目的就是为了从数据源读取数据,并且发射元组作为输出流到数据流中
  • bolt是实现核心业务逻辑的地方,执行过滤/聚合/连接或者数据库交互等操作
  • spout/bolt都可以执行一个或者多个实例,这个是线程控制的
  • main中需要完成统筹的工作,包括添加节点到拓扑图,配置拓扑图,创建拓扑图,最后将拓扑图提交到storm集群
  • 所有的结点都有id唯一识别,所有的元组都有自己的字段名,并且结点上声明的Fieds类型的字段名必须与同节点发射的Value类型字段名相同

Sorm进阶(1):storm实现github提交数监控看板相关推荐

  1. Linux 发布全新 6.0 版; 谷歌超微软开源贡献第一; GitHub 遭数万恶意攻击 | 开源月报 Vol.9...

    「WeOpen Insight」是腾源会推出的「开源趋势与开源洞见」内容专栏,不定期为读者呈现开源圈内的第一手快讯.优质工具盘点等,洞察开源技术发展的风向标,预见未来趋势. 1 开源企业新闻 1.涉嫌 ...

  2. linux手机刷机包制作工具_刷GitHub提交记录工具制作

    跳转至专题目录 专题推荐文章: localPosition与anchoredPosition转化 unity Scene View扩展之编辑器扩展总结 又到了一个坑,下个坑还没想好怎么挖的环节. 就随 ...

  3. 从0开始学习GitHub系列之「向GitHub 提交代码」

    DevStore首页 >文章 >文章详情 从0开始学习GitHub系列之「向GitHub 提交代码」 糖果果| 2016-06-15 10:57    浏览量(500)    评论(1) ...

  4. Vue.js示例:GitHub提交(watch数据,created钩子,filters过滤); 网格组件(功能:1.检索,2排序);...

    GitHub提交 codePen:   https://codepen.io/chentianwei411/pen/wEVPZo 注意:频繁看案例,可能会被限制. 重点: 表单输入绑定, 单选按钮的使 ...

  5. 【错误记录】GitHub 提交报错 ( OpenSSL SSL_connect: SSL_ERROR_SYSCALL in connection to github.com:443 )

    文章目录 一.报错信息 二.解决方案 一.报错信息 GitHub 提交报错 : 16:37:19.781: [ClassLoader_Demo] git -c credential.helper= - ...

  6. 【错误记录】Android Studio 向 GitHub 提交代码报错 ( Push failed: Failed with error: Could not read | 使用命令行提交代码 )

    文章目录 一.报错信息 二.解决方案 一.报错信息 在 Android Studio 中首次向 GitHub 提交代码 , 报错 : Push failed: Failed with error: C ...

  7. 【错误记录】Android Studio 向 GitHub 提交代码报错 ( Push failed: Failed with error: Could not read from remote )

    文章目录 一.报错信息 二.解决方案 一.报错信息 在 Android Studio 中首次向 GitHub 提交代码 , 报错 : Push failed: Failed with error: C ...

  8. 【开发环境】PyCharm 配置 GitHub ( 在 PyCharm 中向 GitHub 提交代码 )

    文章目录 一.PyCharm 配置 GitHub 二.在 PyCharm 中向 GitHub 提交代码 一.PyCharm 配置 GitHub 选择 " 菜单栏 / File / Setti ...

  9. 【OpenGL】五、Visual Studio 2019 配置 GitHub ( 提交代码 )

    文章目录 一.源代码修改及标识 二.向 GitHub 提交代码 三.查看提交结果 一.源代码修改及标识 以 OpenGL.cpp 为例 , 没有更改的代码 , 前面都有一把蓝色的锁 , 在代码中添加一 ...

最新文章

  1. Swift String字符串版本更新特性
  2. linux php 版本切换,linux更换PHP版本,多个PHP版本切换
  3. 瑞士轮(洛谷-P1309)
  4. 当有多个设备online时,命令行窗口通过adb连接指定设备方法
  5. Java自学和培训的区别
  6. 如何让自己的CS水平更进一步?(二)了解武器
  7. 共享文件夹没有权限访问
  8. fullcalendar 课程表 js 插件 日程安排操作 js
  9. 购物车中选择物品结算功能的实现
  10. FC炸弹人 java源码下载
  11. python html 补全标签_补充:HTML标签和CSS
  12. Revit建模中如何快速画好幕墙?
  13. img不拉伸图片 对图片保留原始比例
  14. 读英语计算机书籍读后感,英文书读后感范文(精选4篇)
  15. 类似LINUX上的ldd,MAC使用otool查看库的链接库
  16. velocity插件安装
  17. 点击a标签改变链接字体颜色,点击其他标签,返回默认字体颜色
  18. win10 破解远程桌面用户连接数限制
  19. ASP(Attentive Statistics Pooling for Deep Speaker Embedding)
  20. Python+Vue计算机毕业设计基于web的智慧养老平台8w982(程序+LW+源码+部署)

热门文章

  1. dependencies与devDependencies之间的区别
  2. Javascript模块化编程require.js的用法
  3. 爱上MVC3系列~同步与异步提交,在过滤器里如何进行重定向~续
  4. hdu 5481(数学期望+区间合并)
  5. NYOJ 150 Train Problem I STL栈
  6. vue-cli3项目优化首页加载过慢的一些心得
  7. Linux 磁盘分区、格式化、目录挂载
  8. 【bzoj1486】【[HNOI2009]梦幻布丁】启发式链表合并(详解)
  9. Dictionary To Dynamic
  10. C# App.config全攻略