本例包括Storm Trident中shuffle与parallelismHint的使用。

代码当中包括注释

maven

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.2.1</version><scope>provided</scope>
</dependency>

import java.util.Date;
import java.util.List;
import java.util.Map;  import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;  public class TridentTest {  public static class Debug extends BaseFilter {  private static final long serialVersionUID = -3136720361960744881L;  private final String name;  private int partitionIndex;    public Debug() {  this(false);  }  public Debug(boolean useLogger) {  this.name = "DEBUG: ";  }  public Debug(String name) {  this.name = "DEBUG(" + name + "): ";  }  @Override  public void prepare(Map conf, TridentOperationContext context) {  this.partitionIndex = context.getPartitionIndex();    super.prepare(conf, context);  }  @Override  public boolean isKeep(TridentTuple tuple) {  System.out.println("<"+new Date()+"[partition"+partitionIndex+"-"+Thread.currentThread().getName()+"]"+"> "+name + tuple.toString());  return true;  }  }  public static class MyFixedBatchSpout extends FixedBatchSpout {  public MyFixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {  super(fields, maxBatchSize, outputs);  }  @Override  public Map<String, Object> getComponentConfiguration() {  Config conf = new Config();  // 设置此组件的最大并发度  //conf.setMaxTaskParallelism(1);  return conf;  }  }  public static StormTopology buildTopology() {  //FixedBatchSpout, 发射出两个字段,user与score, 一个batch中包括3个tuples  FixedBatchSpout spout = new MyFixedBatchSpout(new Fields("user", "score"), 3,   new Values("john1", 4),  new Values("john2", 7),   new Values("john3", 8),  new Values("john4", 9),   new Values("john5", 7),  new Values("john6", 11),  new Values("john7", 5)  );  spout.setCycle(false);  TridentTopology topology = new TridentTopology();  topology.newStream("spout1", spout)  .parallelismHint(2)//设置spout的并行度为2,因为上面数据jonh1到john7一共有7条数据,则1共会发射2*7=14条数据  .shuffle()  .each(new Fields("user"),new Debug("print:"))  .parallelismHint(5);//设置Debug并行度为5,由于使用了shuffle,14个tuple会随机分步到5个partion当中  return topology.build();  }  public static void main(String[] args) throws Exception {  Config conf = new Config();  conf.setMaxSpoutPending(200);  conf.setNumWorkers(30);  conf.setMessageTimeoutSecs(100000);  LocalCluster local = new LocalCluster();    local.submitTopology("test-topology", conf, buildTopology());   }
}

  

输出结果如下:一共14条 tuples,分布上0-4的partition里

<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john5]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john6]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john7]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john4]
<Fri Mar 23 14:17:13 CST 2018[partition3-Thread-116-b-0-executor[36 36]]> DEBUG(print:): [john4]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john3]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john7]
<Fri Mar 23 14:17:13 CST 2018[partition0-Thread-78-b-0-executor[33 33]]> DEBUG(print:): [john1]
<Fri Mar 23 14:17:13 CST 2018[partition0-Thread-78-b-0-executor[33 33]]> DEBUG(print:): [john3]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john1]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john5]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john6]

转载于:https://www.cnblogs.com/nickt/p/8630124.html

Storm Trident示例shuffleparallelismHint相关推荐

  1. Storm Trident示例function, filter, projection

    以下代码演示function, filter, projection的使用,可结合注释 省略部分代码,省略部分可参考:https://blog.csdn.net/nickta/article/deta ...

  2. Storm Trident拓扑中的错误处理

    这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...

  3. Storm Trident API

    在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...

  4. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  5. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  6. storm trident mysql_Trident-MySQL

    使用事物TridentTopology 持久化数据到MySQL1.构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays; ...

  7. Storm Trident 详细介绍

    一.概要 1.1 Storm(简介)      Storm是一个实时的可靠地分布式流计算框架.      具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...

  8. Storm - Trident

    [align=center][size=large]Trident[/size][/align] 一.Storm 保证性 1.数据一定会发送 通过 ack / fail 方法确认,若失败,则提供重新发 ...

  9. storm trident mysql,storm_Trident

    简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...

最新文章

  1. Properties相关
  2. @requestbody 接受int参数_C++之指针作为函数参数
  3. 对计算机辅助英语的看法,论我国计算机辅助英语笔译的必要性
  4. VueJS样式绑定v-bind:class
  5. springboot10 Web开发静态资源
  6. 如何在本机安装mysql_机器人之如何在本机安装MySQL,并配置电脑为数据库服务器...
  7. 微型计算机系统视频适配器为,计算机硬件基础1-微型计算机系统组成(含教材6,8,9章内容).ppt...
  8. arduino通过串口监视器读取一行字符
  9. 1.4 px30驱动移植-网卡驱动找不到网卡解决
  10. D511 外置功放软件烧录方法
  11. 常用的颜色色值(转)
  12. 使用电脑风扇控制软件Macs Fan Control Pro更好的管理电风扇
  13. ValueError: Wrong number of items passed 2, placement implies 1
  14. PS轻松打造低多边形风格图像
  15. Java中多线程、多线程的实现方式、同步代码块的方式
  16. [Codeforces Round #516][Codeforces 1063C/1064E. Dwarves, Hats and Extrasensory Abilities]
  17. k8s(三):命令行工具kubectl与核心技术Pod
  18. GE核磁共振常见术语
  19. WOS(SCI)爬虫案例
  20. CP2102国产替代DPU02— USB 转 UART 桥接芯片

热门文章

  1. 山东传媒职业学院计算机专业,山东传媒职业学院计算机多媒体技术专业2017年在内蒙古文科高考录取最低分数线...
  2. mysql log error_MySQL日志之error_log
  3. java 做猜数字小游戏_Java Properties类:利用Properties类制作猜数字小游戏
  4. 企业级内网的域控环境搭建教程
  5. [Web 前端] SuperAgent中文使用文档
  6. 面向切面的Spring
  7. Apache的流处理技术概述
  8. MySQL Create Table创建表
  9. android 成长日记 3.关于Activity的用户体验提升办法和使用技巧说明
  10. SVN 的安装与配置