本文来自网易云社区

作者:汪建伟

  • 举个栗子

1 实现的目标

设计一个系统,来实现对一个文本里面的单词出现的频率进行统计。

2 设计Topology结构:

这是一个简单的例子,topology也非常简单。整个topology如下:

整个topology分为三个部分:

WordReader:数据源,负责发送sentence

WordNormalizer:负责将sentence切分

Wordcounter:负责对单词的频率进行累加

3 代码实现

1. 构建maven环境,添加storm依赖

    <repositories>  <!-- Repository where we can found the storm dependencies  -->  <repository>  <id>clojars.org</id>  <url>http://clojars.org/repo</url>  </repository>  </repositories>  <dependencies>  <dependency>   <groupId>storm</groupId>  <artifactId>storm</artifactId>  <version>0.7.1</version>  </dependency>  </dependencies>复制代码

2. 定义Topology

  public class TopologyMain {  public static void main(String[] args) throws InterruptedException {  //Topology definition  TopologyBuilder builder = new TopologyBuilder();  builder.setSpout("word-reader",new WordReader());  builder.setBolt("word-normalizer", new WordNormalizer())  .shuffleGrouping("word-reader");  builder.setBolt("word-counter", new WordCounter(),1)  .fieldsGrouping("word-normalizer", new Fields("word"));  //Configuration  Config conf = new Config();  conf.put("wordsFile", args[0]);  conf.setDebug(false);  //Topology run  conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  LocalCluster cluster = new LocalCluster();  cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());  Thread.sleep(1000);  cluster.shutdown();  }  }复制代码

3. 实现WordReader Spout

    public class WordReader extends BaseRichSpout {  private SpoutOutputCollector collector;  private FileReader fileReader;  private boolean completed = false;  public void ack(Object msgId) {  System.out.println("OK:"+msgId);  }  public void close() {}  public void fail(Object msgId) {  System.out.println("FAIL:"+msgId);  }  public void nextTuple() {  if(completed){  try {  Thread.sleep(1000);  } catch (InterruptedException e) {  }  return;  }  String str;  BufferedReader reader = new BufferedReader(fileReader);  try{  while((str = reader.readLine()) != null){  this.collector.emit(new Values(str),str);  }  }catch(Exception e){  throw new RuntimeException("Error reading tuple",e);  }finally{  completed = true;  }  }  public void open(Map conf, TopologyContext context,  SpoutOutputCollector collector) {  try {  this.fileReader = new FileReader(conf.get("wordsFile").toString());  } catch (FileNotFoundException e) {  throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");  }  this.collector = collector;  }  public void declareOutputFields(OutputFieldsDeclarer declarer) {  declarer.declare(new Fields("line"));  }  }复制代码

第一个被调用的spout方法都是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下参数:配置对象,在定义topology对象是创建;TopologyContext对象,包含所有拓扑数据;还有SpoutOutputCollector对象,它能让我们发布交给bolts处理的数据。

4. 实现WordNormalizer bolt

  public class WordNormalizer extends BaseBasicBolt {  public void cleanup() {}  public void execute(Tuple input, BasicOutputCollector collector) {  String sentence = input.getString(0);  String[] words = sentence.split(" ");  for(String word : words){  word = word.trim();  if(!word.isEmpty()){  word = word.toLowerCase();  collector.emit(new Values(word));  }  }  }  public void declareOutputFields(OutputFieldsDeclarer declarer) {  declarer.declare(new Fields("word"));  }  }复制代码

bolt最重要的方法是void execute(Tuple input),每次接收到元组时都会被调用一次,还会再发布若干个元组。

5. 实现WordCounter bolt

   public class WordCounter extends BaseBasicBolt {  Integer id;  String name;  Map counters;  @Override  public void cleanup() {  System.out.println("-- Word Counter ["+name+"-"+id+"] --");  for(Map.Entry entry : counters.entrySet()){  System.out.println(entry.getKey()+": "+entry.getValue());  }  }  @Override  public void prepare(Map stormConf, TopologyContext context) {  this.counters = new HashMap();  this.name = context.getThisComponentId();  this.id = context.getThisTaskId();  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {}  @Override  public void execute(Tuple input, BasicOutputCollector collector) {  String str = input.getString(0);  if(!counters.containsKey(str)){  counters.put(str, 1);  }else{  Integer c = counters.get(str) + 1;  counters.put(str, c);  }  }  }复制代码

6. 使用本地模式运行Topology

在这个目录下面创建一个文件,/src/main/resources/words.txt,一个单词一行,然后用下面的命令运行这个拓扑:mvn exec:java -Dexec.main -Dexec.args=”src/main/resources/words.txt”。

如果你的words.txt文件有如下内容: Storm test are great is an Storm simple application but very powerful really Storm is great 你应该会在日志中看到类似下面的内容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在这个例子中,每类节点只有一个实例。

  • 附-Storm记录级容错的基本原理

首先来看一下什么叫做记录级容错?storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每一个消息单元是否在指定时间内被完全处理了。那什么叫做完全处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple经过了topology中每一个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了。

在storm的topology中有一个系统级组件,叫做acker。这个acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了,相反则会告知spout该消息处理成功了。在刚才的描述中,我们提到了”记录tuple的处理路径”,如果曾经尝试过这么做的同学可以仔细地思考一下这件事的复杂程度。但是storm中却是使用了一种非常巧妙的方法做到了。在说明这个方法之前,我们来复习一个数学定理。

A xor A = 0.

A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。

storm中使用的巧妙方法就是基于这个定理。具体过程是这样的:在spout中系统会为用户指定的message id生成一个对应的64位整数,作为一个root id。root id会传递给acker及后续的bolt作为该消息单元的唯一标识。同时无论是spout还是bolt每次新生成一个tuple的时候,都会赋予该tuple一个64位的整数的id。Spout发射完某个message id对应的源tuple之后,会告知acker自己发射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一个输入tuple处理完之后,也会告知acker自己处理的输入tuple的id及新生成的那些tuple的id。Acker只需要对这些id做一个简单的异或运算,就能判断出该root id对应的消息单元是否处理完成了。下面通过一个图示来说明这个过程。

上图 spout中绑定message 1生成了两个源tuple,id分别是0010和1011.

上图 bolt1处理tuple 0010时生成了一个新的tuple,id为0110.

上图 bolt2处理tuple 1011时生成了一个新的tuple,id为0111.

上图 bolt3中接收到tuple 0110和tuple 0111,没有生成新的tuple.

容错过程存在一个可能出错的地方,那就是,如果生成的tuple id并不是完全各异的,acker可能会在消息单元完全处理完成之前就错误的计算为0。这个错误在理论上的确是存在的,但是在实际中其概率是极低极低的,完全可以忽略。

相关阅读:流式处理框架storm浅析(上篇)

网易云免费体验馆,0成本体验20+款云产品!

更多网易研发、产品、运营经验分享请访问网易云社区。

相关文章:
【推荐】 3分钟带你了解负载均衡服务
【推荐】 非对称加密与证书(上篇)
【推荐】 手游破解手段介绍及易盾保护方案

流式处理框架storm浅析(下篇)相关推荐

  1. 分布式流式计算框架Storm

    Storm用于实时处理,就好比 Hadoop 用于批处理.         --> 离线计算:批量获取数据,批量传输数据,周期性比量计算数据,数据展示(Sqoop-->HDFS--> ...

  2. Storm精华问答 | 最火的流式处理框架——Storm

    戳蓝字"CSDN云计算"关注我们哦! Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop.  今天就为大家带来Storm诞生到发展再到实践,赶快 ...

  3. 流式计算框架Storm 编程案例部署Linux结果演示及pom依赖

    使用maven方式创建storm项目: <?xml version="1.0" encoding="UTF-8"?> <project xml ...

  4. 流式计算框架Storm编程案例:实时给手机品牌转大写并加上时间戳后缀代码示例

    导入jar包,保险起见,直接从storm安装目录拷贝,maven方式可能会因版本问题出现纰漏. 结果演示:

  5. Intellij IDEA 导入或运行流式处理框架storm以及java.lang.NoClassDefFoundError报错的解决方案

    网上有很多使用eclipse编辑storm的,但是我觉得eclipse界面不太友好,于是毅然使用intellij IDEA来编辑 但是直接无脑导入会有各种各样奇葩的报错,于是我在解决了问题之后与大家分 ...

  6. 流式计算框架Storm网站访问来源实时统计及存储到redis代码示例

  7. 流式计算框架Storm后台启动命令(避免新开窗口)

    ​​

  8. python 流式计算框架_流式计算的三种框架:Storm、Spark和Flink

    我们知道,大数据的计算模式主要分为批量计算(batch computing).流式计算(stream computing).交互计算(interactive computing).图计算(graph ...

  9. Storm:最火的流式处理框架

    Storm:最火的流式处理框架 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样.更加便捷,同时对于信息的时效性要求也越来越高.举个搜索场景中的例子,当一个卖家发布了 ...

最新文章

  1. [ZJOI2019]线段树
  2. react 调用组件方法_React源码分析1 — 组件和对象的创建(createClass,createElement)...
  3. spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成
  4. BeanDefinition构建示例
  5. Android之6.0上的重要变化(一)
  6. cmake字符串转数组_JS 数组中你或许不知道的操作
  7. cookie不正确 请重新登录_小学多陪,初中少陪,高中不陪!小学6年陪孩子写作业的正确方式,请家长收下!...
  8. 转-python面试题目集锦(100道部分附答案)
  9. 弥散阴影html,三步制作出这种精美弥散阴影
  10. 文字处理技术:形状绕排的难点
  11. Excel高级应用高频使用函数汇总
  12. 车载双目摄像头,为什么特斯拉还在迟疑?
  13. 使用LSTM生成序列、自动问答使用?分割即可!
  14. 测试过程中印象最深刻的bug?| 万能回答必杀技
  15. 概率逗号分号_概率里面的逗号
  16. 2019.6.24 校内测试 NOIP模拟 Day 2 分析+题解
  17. python里面的pip是什么意思_为什么您应该使用`python -m pip`
  18. IP67 | IP58 防护等级理解
  19. DataWhale_Matplotlib_艺术画笔见乾坤
  20. 找不到com.mchange.v2.c3p0.ComboPooledDataSource

热门文章

  1. python需要php吗-C、C+、Java、PHP、Python分别用来开发什么?
  2. python发明者叫什么-近 50 年来最具影响力的 10 种编程语言,都是谁发明的?
  3. opencv、matplotlib、pillow和pytorch读取数据的通道顺序
  4. LeetCode Number of Islands(flood fill)
  5. libqrencode生成二维码图片的问题
  6. Linux虚拟地址空间布局
  7. jquery-migrate.js
  8. Python网络爬虫与信息提取(三)(正则表达式的基础语法)
  9. python json数据的转换
  10. [BZOJ1068/Luogu2470][SCOI2007]压缩