This article takes a look at Storm's PartialKeyGrouping.

Example

    @Test
    public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String spoutId = "wordGenerator";
        String counterId = "counter";
        String aggId = "aggregator";
        String intermediateRankerId = "intermediateRanker";
        String totalRankerId = "finalRanker";
        int TOP_N = 5;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new TestWordSpout(), 5);
        // NOTE: partialKeyGrouping replaces fieldsGrouping so that load is spread more evenly across the counter bolt's tasks
        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
        builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
        submitRemote(builder);
    }
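  • Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a RollingCountAggBolt with fieldsGrouping is still needed to merge the partial counts.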
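To make the need for the aggregation stage concrete, here is a minimal sketch in plain Java with no Storm dependency. The class name `PartialCountMerge` and the sample counts are hypothetical; the sketch only illustrates that each counter task ends up with a partial count for a word, and that a downstream stage keyed on that word has to sum the partials.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Storm code.
class PartialCountMerge {

    // Sum the partial counts reported by several counter tasks, per word.
    static Map<String, Long> merge(List<Map<String, Long>> partialCountsPerTask) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partialCountsPerTask) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                total.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // With partial key grouping, "storm" may be counted on two tasks.
        Map<String, Long> task0 = new HashMap<>();
        task0.put("storm", 3L);
        task0.put("bolt", 1L);
        Map<String, Long> task1 = Collections.singletonMap("storm", 2L);

        Map<String, Long> total = merge(Arrays.asList(task0, task1));
        System.out.println("storm=" + total.get("storm") + ", bolt=" + total.get("bolt"));
        // prints "storm=5, bolt=1"
    }
}
```

In the topology above this role is played by RollingCountAggBolt, which receives the partial counts through fieldsGrouping on "obj".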

PartialKeyGrouping (version 1.2.2)

storm-core-1.2.2-sources.jar!/org/apache/storm/grouping/PartialKeyGrouping.java

    public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
        private static final long serialVersionUID = -447379837314000353L;
        private List<Integer> targetTasks;
        private long[] targetTaskStats;
        private HashFunction h1 = Hashing.murmur3_128(13);
        private HashFunction h2 = Hashing.murmur3_128(17);
        private Fields fields = null;
        private Fields outFields = null;

        public PartialKeyGrouping() {
            //Empty
        }

        public PartialKeyGrouping(Fields fields) {
            this.fields = fields;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            targetTaskStats = new long[this.targetTasks.size()];
            if (this.fields != null) {
                this.outFields = context.getComponentOutputFields(stream);
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            List<Integer> boltIds = new ArrayList<>(1);
            if (values.size() > 0) {
                byte[] raw;
                if (fields != null) {
                    List<Object> selectedFields = outFields.select(fields, values);
                    ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                    for (Object o: selectedFields) {
                        if (o instanceof List) {
                            out.putInt(Arrays.deepHashCode(((List)o).toArray()));
                        } else if (o instanceof Object[]) {
                            out.putInt(Arrays.deepHashCode((Object[])o));
                        } else if (o instanceof byte[]) {
                            out.putInt(Arrays.hashCode((byte[]) o));
                        } else if (o instanceof short[]) {
                            out.putInt(Arrays.hashCode((short[]) o));
                        } else if (o instanceof int[]) {
                            out.putInt(Arrays.hashCode((int[]) o));
                        } else if (o instanceof long[]) {
                            out.putInt(Arrays.hashCode((long[]) o));
                        } else if (o instanceof char[]) {
                            out.putInt(Arrays.hashCode((char[]) o));
                        } else if (o instanceof float[]) {
                            out.putInt(Arrays.hashCode((float[]) o));
                        } else if (o instanceof double[]) {
                            out.putInt(Arrays.hashCode((double[]) o));
                        } else if (o instanceof boolean[]) {
                            out.putInt(Arrays.hashCode((boolean[]) o));
                        } else if (o != null) {
                            out.putInt(o.hashCode());
                        } else {
                            out.putInt(0);
                        }
                    }
                    raw = out.array();
                } else {
                    raw = values.get(0).toString().getBytes(); // assume key is the first field
                }
                int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
                int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
                int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
                boltIds.add(targetTasks.get(selected));
                targetTaskStats[selected]++;
            }
            return boltIds;
        }
    }
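  • PartialKeyGrouping is a CustomStreamGrouping. In prepare it initializes long[] targetTaskStats, which counts how many tuples this grouping instance has sent to each target task.
  • If no fields are specified, partialKeyGrouping uses the first value of the tuple as the key (values.get(0).toString().getBytes()).
  • It builds two HashFunction instances with Guava's Hashing.murmur3_128 (seeds 13 and 17), takes the absolute value of each hash, and takes the remainder modulo targetTasks.size() to get two candidate indexes into targetTasks.
  • It then picks whichever of the two candidates has the smaller count in targetTaskStats (the first choice wins ties) and increments the count of the selected task.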
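The selection steps above can be sketched without Storm or Guava. The following is a simplified illustration under stated assumptions, not the Storm class: `TwoChoiceSketch` and its `hash` helper are hypothetical, and the helper is only a stand-in for the two seeded murmur3_128 functions. The sketch also uses Math.floorMod instead of Math.abs(...) % n, because Math.abs returns a negative value for the minimum int or long.

```java
import java.nio.charset.StandardCharsets;

// Simplified sketch of the "two choices" selection; NOT the Storm class.
class TwoChoiceSketch {
    // Tuples sent so far to each target task, indexed by position.
    private final long[] targetTaskStats;

    TwoChoiceSketch(int numTasks) {
        this.targetTaskStats = new long[numTasks];
    }

    // Stand-in for a seeded hash function (assumption: any two deterministic,
    // reasonably independent hashes are enough to illustrate the idea).
    private static int hash(byte[] raw, int seed) {
        int h = seed;
        for (byte b : raw) {
            h = 31 * h + b;
        }
        // Final mixing step so that different seeds give unrelated results.
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Returns the index of the chosen task and updates its counter.
    int choose(String key) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        int n = targetTaskStats.length;
        int first = Math.floorMod(hash(raw, 13), n);
        int second = Math.floorMod(hash(raw, 17), n);
        // Prefer the less loaded candidate; the first choice wins ties.
        int selected = targetTaskStats[first] > targetTaskStats[second] ? second : first;
        targetTaskStats[selected]++;
        return selected;
    }

    long[] stats() {
        return targetTaskStats.clone();
    }

    public static void main(String[] args) {
        TwoChoiceSketch grouping = new TwoChoiceSketch(5);
        for (int i = 0; i < 1000; i++) {
            grouping.choose("hot-key");
        }
        // A single hot key is spread over at most two tasks.
        System.out.println(java.util.Arrays.toString(grouping.stats()));
    }
}
```

When the two candidates for a key differ, a stream consisting of that key alone alternates between them, so each receives about half of the tuples.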

PartialKeyGrouping (version 2.0.0)

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
     * Tuples from a given partition to multiple downstream tasks.
     *
     * Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
     * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
     *
     * Notes: - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible - the default
     * AssignmentCreator hashes the key and produces an assignment of two tasks
     */
    public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
        private static final long serialVersionUID = -1672360572274911808L;
        private List<Integer> targetTasks;
        private Fields fields = null;
        private Fields outFields = null;
        private AssignmentCreator assignmentCreator;
        private TargetSelector targetSelector;

        public PartialKeyGrouping() {
            this(null);
        }

        public PartialKeyGrouping(Fields fields) {
            this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
        }

        public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
            this(fields, assignmentCreator, new BalancedTargetSelector());
        }

        public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
            this.fields = fields;
            this.assignmentCreator = assignmentCreator;
            this.targetSelector = targetSelector;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
            if (this.fields != null) {
                this.outFields = context.getComponentOutputFields(stream);
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            List<Integer> boltIds = new ArrayList<>(1);
            if (values.size() > 0) {
                final byte[] rawKeyBytes = getKeyBytes(values);
                final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
                final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
                boltIds.add(selectedTask);
            }
            return boltIds;
        }

        /**
         * Extract the key from the input Tuple.
         */
        private byte[] getKeyBytes(List<Object> values) {
            byte[] raw;
            if (fields != null) {
                List<Object> selectedFields = outFields.select(fields, values);
                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
                for (Object o : selectedFields) {
                    if (o instanceof List) {
                        out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                    } else if (o instanceof Object[]) {
                        out.putInt(Arrays.deepHashCode((Object[]) o));
                    } else if (o instanceof byte[]) {
                        out.putInt(Arrays.hashCode((byte[]) o));
                    } else if (o instanceof short[]) {
                        out.putInt(Arrays.hashCode((short[]) o));
                    } else if (o instanceof int[]) {
                        out.putInt(Arrays.hashCode((int[]) o));
                    } else if (o instanceof long[]) {
                        out.putInt(Arrays.hashCode((long[]) o));
                    } else if (o instanceof char[]) {
                        out.putInt(Arrays.hashCode((char[]) o));
                    } else if (o instanceof float[]) {
                        out.putInt(Arrays.hashCode((float[]) o));
                    } else if (o instanceof double[]) {
                        out.putInt(Arrays.hashCode((double[]) o));
                    } else if (o instanceof boolean[]) {
                        out.putInt(Arrays.hashCode((boolean[]) o));
                    } else if (o != null) {
                        out.putInt(o.hashCode());
                    } else {
                        out.putInt(0);
                    }
                }
                raw = out.array();
            } else {
                raw = values.get(0).toString().getBytes(); // assume key is the first field
            }
            return raw;
        }

        //......
    }
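  • Version 2.0.0 moves this logic into RandomTwoTaskAssignmentCreator and BalancedTargetSelector, the default implementations of the pluggable AssignmentCreator and TargetSelector interfaces.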

RandomTwoTaskAssignmentCreator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface is responsible for choosing a subset of the target tasks to use for a given key.
     *
     * NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
     * each of them needs to come up with the same assignment for a given key.
     */
    public interface AssignmentCreator extends Serializable {
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /*========== Implementations ==========*/

    /**
     * This implementation of AssignmentCreator chooses two arbitrary tasks.
     */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        /**
         * Creates a two task assignment by selecting random tasks.
         */
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // ensure that choice1 and choice2 are not the same task
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }
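  • Version 2.0.0 no longer uses Guava's Hashing.murmur3_128. Instead it seeds a java.util.Random with the hash of the key bytes (Arrays.hashCode(key)) and draws two indexes into the task list, moving the second index to the next position if it equals the first. The two corresponding task ids are returned as the candidates between which the TargetSelector balances the load.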
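Because the assignment has to be the same on every worker, it helps to check the determinism in isolation. The sketch below repeats the seeding logic shown above in a standalone class using only the JDK; the class name `SeededAssignmentSketch` and the sample task ids are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Standalone illustration of a key-seeded two-task assignment; not the Storm class.
class SeededAssignmentSketch {

    static int[] createAssignment(List<Integer> tasks, byte[] key) {
        // Seeding Random from the key makes the "random" picks repeatable per key.
        final long seed = Arrays.hashCode(key);
        final Random random = new Random(seed);
        final int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // If both picks land on the same index, move the second to the next index.
        choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
        return new int[] { tasks.get(choice1), tasks.get(choice2) };
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(10, 11, 12, 13);
        byte[] key = "storm".getBytes(StandardCharsets.UTF_8);
        // Both calls print the same pair, because the seed depends only on the key.
        System.out.println(Arrays.toString(createAssignment(tasks, key)));
        System.out.println(Arrays.toString(createAssignment(tasks, key)));
    }
}
```

Note that with a single target task the two candidates are necessarily the same task, since (choice2 + 1) % 1 is 0.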

BalancedTargetSelector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/PartialKeyGrouping.java

    /**
     * This interface chooses one element from a task assignment to send a specific Tuple to.
     */
    public interface TargetSelector extends Serializable {
        Integer chooseTask(int[] assignedTasks);
    }

    /**
     * A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
     * overall from this instance of the grouping.
     */
    public static class BalancedTargetSelector implements TargetSelector {
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
         */
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            Long minTaskLoad = Long.MAX_VALUE;
            for (Integer currentTaskId : assignedTasks) {
                final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }
            targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
            return taskIdWithMinLoad;
        }
    }
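  • BalancedTargetSelector takes the candidate task ids for a key, looks up each one's count in targetTaskStats, returns the one with the lowest count (taskIdWithMinLoad), and increments that count.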
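The effect of this least-loaded rule is easiest to see on a fixed pair of candidates. The following sketch mirrors the selection logic with JDK collections only; the class name `BalancedSelectorSketch` and the task ids 3 and 7 are hypothetical, and it assumes a non-empty candidate array.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of least-loaded selection; not the Storm class.
class BalancedSelectorSketch {
    // Number of times each task id has been selected by this instance.
    private final Map<Integer, Long> targetTaskStats = new HashMap<>();

    // Assumes assignedTasks is non-empty.
    int chooseTask(int[] assignedTasks) {
        int taskIdWithMinLoad = -1;
        long minTaskLoad = Long.MAX_VALUE;
        for (int currentTaskId : assignedTasks) {
            long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
            // Strict comparison: on a tie the earlier candidate is kept.
            if (currentTaskLoad < minTaskLoad) {
                minTaskLoad = currentTaskLoad;
                taskIdWithMinLoad = currentTaskId;
            }
        }
        targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
        return taskIdWithMinLoad;
    }

    public static void main(String[] args) {
        BalancedSelectorSketch selector = new BalancedSelectorSketch();
        int[] candidates = { 3, 7 };
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            sb.append(selector.chooseTask(candidates)).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints "3 7 3 7"
    }
}
```

The counts live in the selector instance, so the balancing is local to each upstream task that holds its own copy of the grouping.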

FieldsGrouper

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java

    public static class FieldsGrouper implements CustomStreamGrouping {
        private Fields outFields;
        private List<List<Integer>> targetTasks;
        private Fields groupFields;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<List<Integer>>();
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }
    }
  • FieldsGrouper's chooseTasks method uses TupleUtils.chooseTaskIndex to pick the index of the target task.

TupleUtils.chooseTaskIndex

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static <T> int chooseTaskIndex(List<T> keys, int numTasks) {
        return Math.floorMod(listHashCode(keys), numTasks);
    }

    private static <T> int listHashCode(List<T> alist) {
        if (alist == null) {
            return 1;
        } else {
            return Arrays.deepHashCode(alist.toArray());
        }
    }
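  • chooseTaskIndex first computes listHashCode of the keys and then applies Math.floorMod with numTasks, that is, a floored modulus whose result is never negative for a positive numTasks.
  • listHashCode delegates to Arrays.deepHashCode(alist.toArray()) to compute the hash, and returns 1 for a null list.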
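The choice of Math.floorMod matters because hash codes can be negative, and Java's % operator keeps the sign of the dividend. A small JDK-only sketch of the difference follows; the class name `FloorModSketch` is hypothetical and the helper mirrors the two methods shown above.

```java
import java.util.Arrays;
import java.util.List;

// Standalone illustration of index selection with a floored modulus.
class FloorModSketch {

    // Same idea as the methods above: hash the key list, then floorMod by the task count.
    static int chooseTaskIndex(List<?> keys, int numTasks) {
        int hash = (keys == null) ? 1 : Arrays.deepHashCode(keys.toArray());
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 4);                // prints -3, unusable as a list index
        System.out.println(Math.floorMod(-7, 4));  // prints 1
        // The same key always maps to the same index in [0, numTasks).
        System.out.println(chooseTaskIndex(Arrays.asList("storm"), 4));
    }
}
```

This fixed mapping from key to index is what makes fieldsGrouping deterministic per key, and also what concentrates a hot key on a single task.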

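Summary

  • Storm's PartialKeyGrouping addresses the skewed load that fieldsGrouping can put on bolt tasks.
  • fieldsGrouping hashes the selected fields and takes the floored modulus by the number of target tasks to get the index of the target task.
  • In version 1.2.2, PartialKeyGrouping computes two hashes with Guava's Hashing.murmur3_128, takes their absolute values, and takes the remainder modulo the number of target tasks to get two candidate indexes. In version 2.0.0 it instead seeds a Random with the hash of the key and draws two indexes. Unlike fieldsGrouping, both versions produce two candidates per key so that load can be balanced between them. PartialKeyGrouping also keeps a per-task usage count: each time it picks the candidate used less often and then updates that candidate's count.
  • Note that because the word-count bolt uses PartialKeyGrouping, the same word is no longer always sent to the same task, so a RollingCountAggBolt with fieldsGrouping is still needed to merge the partial counts.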

doc

  • Common Topology Patterns
  • The Power of Both Choices: Practical Load Balancing for Distributed Stream Processing Engines
  • Storm source code analysis: Streaming Grouping (backtype.storm.daemon.executor)
