相关概念
1、Topologies
一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来。

2、Streams
消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定义类型(只要实现相应的序列化器)。
每个消息流在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id 。
Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。

3、Spouts
消息源spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple, 但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。
Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。

4.Bolts
所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。
Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
Bolts的主要方法是execute, 它以一个tuple作为输入,bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

5、Stream groupings
定义一个topology的其中一步是定义每个bolt接收什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配数据给bolts上面的多个tasks。
Storm里面有7种类型的stream grouping

  1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
  2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts里的一个task, 而不同的userid则会被分配到不同的bolts里的task。
  3. All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
  4. Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
  5. Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
  7. Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

下面建立项目需要的topology

public class AreaAmtTopo {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic), 5);builder.setBolt("filter", new AreaFilterBolt() , 5).shuffleGrouping("spout") ;builder.setBolt("areabolt", new AreaAmtBolt() , 2).fieldsGrouping("filter", new Fields("area_id")) ;builder.setBolt("rsltBolt", new AreaRsltBolt(), 1).shuffleGrouping("areabolt");Config conf = new Config() ;conf.setDebug(false);if (args.length > 0) {try {               StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}}else {LocalCluster localCluster = new LocalCluster();            localCluster.submitTopology("mytopology", conf, builder.createTopology());}       }
}

下面来看看spout,连接Kafka的topic,把数据发送到bolt

public class OrderBaseSpout implements IRichSpout {String topic = null;public OrderBaseSpout(String topic){this.topic = topic ;}private static final long serialVersionUID = 1L;Integer TaskId = null;SpoutOutputCollector collector = null;Queue<String> queue = new ConcurrentLinkedQueue<String>() ;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("order")) ;}@Overridepublic void nextTuple() {if (queue.size() > 0) {String str = queue.poll() ;collector.emit(new Values(str)) ;}}@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector = collector ;TaskId = context.getThisTaskId() ;OrderConsumer consumer = new OrderConsumer(topic) ;consumer.start() ;queue = consumer.getQueue() ;}

第一个bolt “AreaFilterBolt”,实现Map功能

public class AreaFilterBolt implements IBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {// order_id,order_amt,create_time,area_idString order = input.getString(0) ;if (order != null) {String orderArr[] = order.split("\\t") ;collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short))) ;// ared_id,order_amt,create_time}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("area_id","order_amt","order_date")) ;}}

第二个bolt “AreaAmtBolt”,在内存里保存各区域的sum值,隔天要清零,重新计算.

public class AreaAmtBolt implements IBasicBolt {String today = null;HBaseDAO dao = null;Map <String,Double> countsMap = null ;@Overridepublic void cleanup() {// TODO Auto-generated method stubcountsMap.clear() ;}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {// area_id,countif (input != null) {String area_id = input.getString(0) ;double order_amt = 0.0;try {order_amt = Double.parseDouble(input.getString(1)) ;} catch (Exception e) {System.out.println(input.getString(1)+":---------------------------------");e.printStackTrace() ;}String order_date = input.getStringByField("order_date") ;if (! order_date.equals(today)) {//跨天处理countsMap.clear() ;}Double count = countsMap.get(order_date+"_"+area_id) ;if (count == null) {count = 0.0 ;}count += order_amt ;countsMap.put(order_date+"_"+area_id, count) ;System.err.println("areaAmtBolt:"+order_date+"_"+area_id+"="+count);collector.emit(new Values(order_date+"_"+area_id,count)) ;}}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stubdao = new HBaseDAOImp() ;//根据HBase里初始值进行初始化 countsMaptoday = DateFmt.getCountDate(null, DateFmt.date_short);countsMap = this.initMap(today, dao);for(String key:countsMap.keySet()){System.err.println("key:"+key+"; value:"+countsMap.get(key));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date_area","amt")) ;}public Map<String, Double> initMap(String rowKeyDate, HBaseDAO dao){Map <String,Double> countsMap = new HashMap<String, Double>() ;List<Result> list = dao.getRows("area_order", rowKeyDate, new String[]{"order_amt"});for(Result rsResult : list){String rowKey = new String(rsResult.getRow());for(KeyValue keyValue : rsResult.raw()){if("order_amt".equals(new String(keyValue.getQualifier()))){countsMap.put(rowKey, Double.parseDouble(new String(keyValue.getValue()))) ;break;}}}return countsMap;}}

第三个bolt “AreaRsltBolt“,每隔5秒保存数据到HBase

public class AreaRsltBolt implements IBasicBolt {/*** */private static final long serialVersionUID = 1L;Map <String,Double> countsMap = null ;@Overridepublic void cleanup() {// TODO Auto-generated method stub}HBaseDAO dao = null;long beginTime = System.currentTimeMillis() ;long endTime = 0L ;@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {// TODO Auto-generated method stubString date_areaid = input.getString(0) ;double order_amt = input.getDouble(1) ;countsMap.put(date_areaid, order_amt) ;endTime = System.currentTimeMillis() ;if (endTime - beginTime >= 5 * 1000) {for(String key : countsMap.keySet()){// put into hbase// 2014-05-05_1,amtdao.insert("area_order", key, "cf", "order_amt", countsMap.get(key)+"") ;System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));}beginTime = System.currentTimeMillis() ;}}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stubdao = new HBaseDAOImp() ;countsMap = new HashMap<String, Double>() ;}}

Kafka+Storm+HBase项目Demo(5)--topology,spout,bolt使用相关推荐

  1. nodejs+kafka+storm+hbase 开发

    1.环境介绍 如图所示,NODEJS做为数据源的的产生者产生消息,发到Kafka队列,然后参见红线,表示本地开发的环境下数据的流向(本地开发时,storm topology运行在本地模式) 2.搭建环 ...

  2. HBase项目实战:HBase+Flume+Kafka+Hive+SSM实现电信大数据通话信息实时读写定位系统

    内容简介 一.项目内容深度分析 1. 项目内容概览 2.数据的大致流向分析 3. 涉及的知识难点分析 二.项目所用到的框架清单 三.项目实战代码 1. 后端开发 1. 构建工程项目模块 2.开发通话记 ...

  3. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  4. Kafka+Storm+HDFS整合实践

    2019独角兽企业重金招聘Python工程师标准>>> 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但 ...

  5. kfaka storm写入mysql_flume+kafka+storm+mysql架构设计

    序言 前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考.这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql如果有需要测 ...

  6. mysql storm_flume+kafka+storm+mysql架构设计

    前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考. 这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql(项目是mave ...

  7. Storm精华问答 | 如何理解spout/bolt的生命周期?

    戳蓝字"CSDN云计算"关注我们哦! Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop.随着越来越多的场景对Hadoop的MapReduce高 ...

  8. Storm的WordCount案例(spout bolt详细总结)

    spout介绍 一个spout是由流组成的数据源在storm的拓扑里,通常情况下会读取外部的数据源  然后emit(发射)到拓扑里面,比如是kafka,MySQL或者redis等等,Spout有两种实 ...

  9. 【十三】景区人流量统计:python日志生成+logstash+kafka+storm+mysql+springBoot+高德地图

    storm+kafka+logstash+springBoot+高德地图 项目概述: 作用:交通信息化,智慧城市 需求:实时统计人流量并通过热力图展示. 类似于腾讯热力图的景区人流量统计 如何采集某个 ...

最新文章

  1. 彭旭老师《项目管理中的领导力与团队建设》
  2. R语言广义线性模型函数GLM、R中有几种logistic回归扩展和变异、robust包中的glmRob函数鲁棒logistic回归、ms包中的lrm函数拟合序数逻辑回归
  3. mysql 9.0创建数据库_数据库基础学习——MySQL数据库知识小结(9)
  4. BZOJ-3289-Mato的文件管理-莫队+树状数组
  5. 使用Hot Chocolate创建ASP.NET Core GraphQL服务
  6. IBM 确认裁员约 1700 人;华为新款操作系统来了!开通 5G 服务不换卡不换号 | 极客头条...
  7. Windows10安装IIS服务器
  8. 上下文无关文法的组成
  9. 五金模具设计统赢外挂提升效率技巧、外挂模具设计流程的问题归纳
  10. 【MD5加密算法能被破解么?】
  11. 基础的重要性(程序员之路)
  12. c++中类的private的static变量实现类对象的数据共享
  13. tableau 集动作_举个栗子!Tableau技巧(59):学做两个集合的维恩图(文氏图)Venn diagram...
  14. 常成员函数函数和返回值为常量的函数
  15. iview table 导出csv文件错行问题
  16. VB获得迅雷资讯弹出网页的源代码
  17. 最全的TypeScript学习指南
  18. STM32夺命100问,你知道几个?
  19. Datatable 转换成Json
  20. 电镜三维重构中多构象的辨别方法

热门文章

  1. 高性能mysql学习笔记--索引
  2. HX711 24位A/D模块计算公式
  3. 2.4.1 HMC连接状态
  4. 微信公众号涨粉技巧与微信公众号掉粉取关的原因
  5. 描述计算机主机,上海交大计算机第一次作业
  6. 马云说聪明的人都离开了阿里,剩下的成了富翁
  7. 国信证券学习系列(2)
  8. Android开发之漫漫长途 XII——Fragment详解
  9. Python爬虫模拟登录京东获取个人信息
  10. nvm You do not have sufficient privilege to perform this operation.