[align=center][size=large]Trident[/size][/align]

一、Storm 保证性

1.数据一定会发送
通过 ack / fail 方法确认,若失败,则提供重新发送的机制

2.数据一定只会统计一次
数据发送后有一个唯一性的标识,通过判断此标识,若存在,则不处理

3.数据一定会按照顺序进行处理
数据发送后有一个唯一性的标识,按照标识编号的顺序进行处理

二、Storm 保证性实现

1.逐个发送,逐个处理

如果这样处理,则原有的并行处理会变成穿行处理,不可取

2.批量发送,批量处理

如果这样处理,则如果当前这批数据处理完毕但未发送,则无法处理下一批数据,且这一批数据之间的处理顺序是并发的在进行的

3.分成两个步骤
一个处理数据,一个发送数据;
数据处理完毕,则继续处理下一批数据;数据是否发送到下一个缓解,由发送数据的步骤决定
采用此方式

三、Trident

1.Spout

package com.study.storm.trident.wordcount;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;

/** * @description * 数据来源 * 模拟批量数据发送 * <br/> * @remark * Storm 的保证及实现 * 1.数据一定被发送 * 通过 ack() 、 fail() 的确认机制,若发送失败,则重新发送 * 2.数据只被处理一次 * 数据发送时带有唯一的编号,判断此编号是否被处理过,若是,则忽略,不处理 * 3.数据被按照一定的顺序处理 * 数据发送时带有唯一的编号,按照编号的顺序进行处理,若数据不是按照顺序到达,则等待 *  * <br/> *  * Trident 处理批量数据 *  */public class SentenceSpout extends BaseRichSpout {

  /**    *     */   private static final long serialVersionUID = 2122598284858356171L;

 private SpoutOutputCollector collector = null ;

    /**    * 模拟批量数据发送    * key : name      * value : sentence    */   private Values [] valuesArray = new Values[] {           new Values("a","111111111111"),           new Values("b","222222222222"),           new Values("c","333333333333"),           new Values("d","444444444444"),           new Values("e","555555555555"),           new Values("f","666666666666"),           new Values("g","777777777777"),           new Values("h","888888888888")    };

  @SuppressWarnings("rawtypes")  @Override    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {     this.collector = collector ; }

   // 发送的顺序,即数据组合的下标,标识数据发送到哪个位置   private int index = 0 ;

    @Override    public void nextTuple() {

       if(index >= valuesArray.length){          return ;      }     index = index == valuesArray.length ? 0 : index++ ;      this.collector.emit(valuesArray[index]);  }

   @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("name","sentence"));  }

}

简化实现

package com.study.storm.trident.wordcount;

import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.generated.StormTopology;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import storm.trident.Stream;import storm.trident.TridentTopology;import storm.trident.testing.FixedBatchSpout;

public class TridentTopologyDemo {

   public static void main(String[] args) {

        // 相当于原有的 Spout 实现        @SuppressWarnings("unchecked")     FixedBatchSpout tridentSpout = new FixedBatchSpout(new Fields("name","sentence"),                1,                new Values("a","111111111111"),               new Values("b","222222222222"),               new Values("c","333333333333"),               new Values("d","444444444444"),               new Values("e","555555555555"),               new Values("f","666666666666"),               new Values("g","777777777777"),               new Values("h","888888888888"));      // 是否循环发送,false 不      tridentSpout.setCycle(false);

       TridentTopology topology = new TridentTopology();        /**        *  1.本地过滤器设置      */       // 设置数据源      Stream initStream = topology.newStream("tridentSpout", tridentSpout);      // 设置过滤器  -- 过滤name : d 的数据       initStream = initStream.each(new Fields("name"),new RemovePartDataFilter());       // 添加函数,输出字母对应的位置      initStream = initStream.each(new Fields("name"),new NameIndexFunction() ,new Fields("indexNum"));

      // 设置过滤器  -- 拦截数据并打印      Stream filterPrintStream = initStream.each(new Fields("name","sentence"), new PrintFilter());

        //--提交Topology给集群运行       Config conf = new Config();      LocalCluster cluster = new LocalCluster();       cluster.submitTopology("MyTopology", conf, topology.build());

     //--运行10秒钟后杀死Topology关闭集群     Utils.sleep(1000 * 10);       cluster.killTopology("MyTopology");     cluster.shutdown();   }

}

package com.study.storm.trident.wordcount;

import java.util.Iterator;

import backtype.storm.tuple.Fields;import storm.trident.operation.BaseFilter;import storm.trident.tuple.TridentTuple;

/** * @description  * 打印:key 与 value ,fields 与  fields 对应传输的内容 */public class PrintFilter extends BaseFilter {

 /**    *     */   private static final long serialVersionUID = 4393484291178519442L;

 @Override    public boolean isKeep(TridentTuple tuple) {       Fields fields = tuple.getFields();       Iterator<String> iterator = fields.iterator();     while(iterator.hasNext()){            String key = iterator.next();            Object valueByField = tuple.getValueByField(key);            System.out.println("fields : "+ key + " values : "+valueByField);      }

       return true;  }

}

package com.study.storm.trident.wordcount;

import storm.trident.operation.BaseFilter;import storm.trident.tuple.TridentTuple;

/** * 过滤name = d 的数据 * return false 过滤 * return true  继续传递 */public class RemovePartDataFilter extends BaseFilter {

   /**    *     */   private static final long serialVersionUID = 8639858690618579558L;

 @Override    public boolean isKeep(TridentTuple tuple) {       String stringByField = tuple.getStringByField("name");     return !stringByField.equals("d");  }

}

package com.study.storm.trident.wordcount;

import java.util.HashMap;import java.util.Map;

import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;

public class NameIndexFunction extends BaseFunction {

   /**    *     */   private static final long serialVersionUID = 9085021905838331812L;

 static Map<String,Integer> indexMap = new HashMap<String,Integer>(); static {      indexMap.put("a", 1);       indexMap.put("b", 2);       indexMap.put("c", 3);       indexMap.put("d", 4);       indexMap.put("e", 5);       indexMap.put("f", 6);       indexMap.put("g", 7);       indexMap.put("h", 8);       indexMap.put("i", 9);   }

   @Override    public void execute(TridentTuple tuple, TridentCollector collector) {     String name = tuple.getStringByField("name");      collector.emit(new Values(indexMap.get(name)));   }

}

Storm - Trident相关推荐

  1. Storm Trident API

    在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...

  2. Storm Trident拓扑中的错误处理

    这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...

  3. Storm Trident示例shuffleparallelismHint

    本例包括Storm Trident中shuffle与parallelismHint的使用. 代码当中包括注释 maven <dependency><groupId>org.ap ...

  4. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  5. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  6. storm trident mysql_Trident-MySQL

    使用事物TridentTopology 持久化数据到MySQL1.构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays; ...

  7. Storm Trident 详细介绍

    一.概要 1.1 Storm(简介)      Storm是一个实时的可靠地分布式流计算框架.      具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...

  8. storm trident

    Trident是在storm基础上,一个以实时计算为目标的高度抽象. 它在提供处理大吞吐量数据能力(每秒百万次消息)的同时,也提供了低延时分布式查询和有状态流式处理的能力. 如果你对Pig和Casca ...

  9. storm trident mysql,storm_Trident

    简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...

最新文章

  1. mobilefacenet
  2. python读取excel一列-Python从Excel中读取日期一列的方法
  3. 计算机操作系统课后题答案第三章,计算机操作系统教程习题与实验指导(第3版)...
  4. json 在后天怎么接_长相显老怎么办?以同岁的马伊琍和刘敏涛为例,解析显年轻的技巧...
  5. 【EOJ Monthly 2019.02 - F】方差(数学,前缀和,积的前缀和)
  6. my40_MySQL锁概述之意向锁
  7. AOE网与关键路径简介
  8. [AWDwR4] No JQuery call matches [:html, #cart]
  9. 32. My Experiences in the Factories 我在工厂的经历
  10. css中的clip:rect() 只能在绝对定位的元素上使用
  11. Windows编程入门
  12. 【Spring基础】CGLIB动态代理实现原理
  13. linux终端无法输入大写字母,linux不能打大写字母
  14. Flash制作大雪纷飞效果动画
  15. 【题解】2019,7.14 模拟赛(阿鲁巴)
  16. 1063 Set Similarity (25 分) java 题解
  17. 安装SQL server出现“服务没有及时响应启动或控制请求”
  18. win10系统提示:“重新启动以修复驱动器错误”解决办法
  19. 【航线运输驾驶员理论考试】人的行为能力
  20. Windows AIK+Imagex+DISM+U盘PE+U盘安装win7+PE

热门文章

  1. iPhone(IOS10)忘记了访问限制的密码该怎么办?
  2. GPLT练习集L1 25--32
  3. 系统svchost占用内存特别大的处理
  4. 程序员做外包能转正吗?外包员工能变正式员工吗?
  5. java 时间字符串比较大小_java中如何比较两个时间字符串的大小
  6. trunk端口配置错误导致环路
  7. Ubuntu 16.10 禁用 Guest 访客模式
  8. 2019网络购车平台易车的发展
  9. 炎黄传媒人事大变 投资方查账上现金引发恐慌
  10. 狗的品种识别实战(tf2.0)