
This article is a summary of my earlier study of Trident. Material on Trident is relatively scarce, so working through the pitfalls was fairly painful (or maybe I was just going about it the wrong way). It walks through the main uses of the Trident API, organized into the following parts:

  • Overview: the requirement
  • Implementing WordCount with Trident
  • RedisState in practice
  • Trident DRPC in practice
  • TridentKafkaSpout in practice

The code itself is not explained at length here; the explanations live in the inline comments. For a deeper treatment, see my earlier posts or the official documentation.

Overview

Requirement: use Trident to count the frequency of each word in a body of text.
In this example, sentences are read as input from an unbounded input stream like the following:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"));
spout.setCycle(true); // set to true to make it easier to see how tuples are processed in batches

This spout cycles through the sentences dataset over and over, emitting an endless sentence stream.
Trident turns the input stream into batches (each containing a number of tuples) for processing. For example, the input sentence stream might be split into batches like the following:
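(The sketch below is purely illustrative rather than actual output: with a maximum batch size of 3, each batch carries at most three sentence tuples, and the exact boundaries depend on FixedBatchSpout's internals.)

    batch 1:
        ["the cow jumped over the moon"]
        ["the man went to the store and bought some candy"]
        ["four score and seven years ago"]
    batch 2:
        ["how many apples can you eat"]
    batch 3:
        ...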

pom.xml
Component and plugin versions differ considerably across Storm releases, so the main Maven dependencies used on my machine are listed here. (Note that the ${storm.version} property referenced below also needs to be defined in <properties>, alongside kafka.version.)

    <properties>
        <kafka.version>1.1.1</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>${storm.version}</version>
            <type>jar</type>
        </dependency>
    </dependencies>

Basic processing

public class WordCountApp {

    // build the topology
    private static StormTopology buildTopology() {
        FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout1.setCycle(false);

        TridentTopology topology = new TridentTopology();
        Stream sentenceStream = topology.newStream("spout1", spout1);
        Stream wordStream = sentenceStream.each(new Fields("sentence"), new Split(), new Fields("word"));
        TridentState wordCountState = wordStream.groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
        // print the counted results
        wordCountState.newValuesStream().peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                System.out.println("【count】--------" + input);
            }
        });
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("app0", conf, buildTopology());
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }
}
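The Split used above is the helper from org.apache.storm.trident.testing. Conceptually it is just a BaseFunction that splits each sentence on spaces and emits one word per token; a roughly equivalent sketch:

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;

    // roughly what org.apache.storm.trident.testing.Split does:
    // emit one "word" tuple per space-separated token of the input sentence
    public class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for (String word : tuple.getString(0).split(" ")) {
                if (word.length() > 0) {
                    collector.emit(new Values(word));
                }
            }
        }
    }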

RedisState

package cn.jhs.storm.trident.example.redis;

import com.google.common.collect.Lists;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.*;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.*;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.testing.Split;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;

import java.util.List;

/**
 * @author Trump
 * @desc
 * @date 2018/10/15 14:12
 */
public class RedisTridentStateApp1 {

    // build the topology
    private static StormTopology buildTopology() {
        // 1. build the source spout
        FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout1.setCycle(false);

        TridentTopology topology = new TridentTopology();
        // 2. build the sentence stream
        Stream sentenceStream = topology.newStream("spout1", spout1);
        // 3. build the word stream
        Stream wordStream = sentenceStream.each(new Fields("sentence"), new Split(), new Fields("word"));
        // 4. build the RedisState factory
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("192.168.129.157")
                .setPort(6379)
                .build();
        RedisState.Factory factory = new RedisState.Factory(poolConfig);
        // 5. build the store (update) and lookup (query) mappers
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
        // 6. count words and keep the counts in a MemoryMapState
        Stream wordCountStream = wordStream.groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("word"), new Count(), new Fields("count"))
                .newValuesStream();
        // 7. persist the word counts into RedisState
        wordCountStream.partitionPersist(factory, new Fields("word", "count"),
                new RedisStateUpdater(storeMapper).withExpire(86400000), new Fields());
        TridentState state = topology.newStaticState(factory);
        // 8.1 build the query stream
        FixedBatchSpout queryParamSpout = new FixedBatchSpout(new Fields("param"), 1, new Values("the"));
        // 8.2 convert the query field to the field name that exists in the state
        Stream fmtedQueryParamStream = topology.newStream("querySpout", queryParamSpout)
                // rename the received "param" field to "word"
                .map(new MapFunction() {
                    @Override
                    public Values execute(TridentTuple input) {
                        return new Values(input.getString(0));
                    }
                }, new Fields("word"));
        // 8.3 query the count of the given param in the state
        fmtedQueryParamStream.stateQuery(state, new Fields("word"),
                new RedisStateQuerier(lookupMapper),
                new Fields("columnName", "columnValue"))
                .peek(new Consumer() {
                    @Override
                    public void accept(TridentTuple input) {
                        String columnName = input.getStringByField("columnName");
                        String columnValue = input.getStringByField("columnValue");
                        System.out.println("lookupMapper=========" + columnName + ":" + columnValue);
                    }
                });
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("app1", conf, buildTopology());
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }
}

class WordCountLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountLookupMapper() {
        description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getLongByField("count").toString();
    }
}
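After the topology has run, the word counts live in a Redis hash named wordCount (the hashKey used by both mappers). A minimal verification sketch, assuming the Jedis client that storm-redis already pulls in and the same host/port configured above:

    import redis.clients.jedis.Jedis;

    import java.util.Map;

    public class WordCountRedisCheck {
        public static void main(String[] args) {
            // connect to the Redis instance configured in the topology and dump the "wordCount" hash
            try (Jedis jedis = new Jedis("192.168.129.157", 6379)) {
                Map<String, String> counts = jedis.hgetAll("wordCount");
                counts.forEach((word, count) -> System.out.println(word + " -> " + count));
            }
        }
    }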

Using a Redis cluster

    Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
    for (String hostPort : redisHostPort.split(",")) {
        String[] host_port = hostPort.split(":");
        nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
    }
    JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes).build();
    RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);

    wordCountStream.partitionPersist(factory, fields,
            new RedisClusterStateUpdater(storeMapper).withExpire(86400000), new Fields());

    fmtedQueryParamStream.stateQuery(state, new Fields("word"),
            new RedisClusterStateQuerier(lookupMapper),
            new Fields("columnName", "columnValue"));

TridentDrpc

public class WordCountDrpcApp2 {

    // split the input sentence into words on " "
    static class MySplit extends BaseFunction {
        private String label;

        public MySplit(String label) {
            this.label = label;
        }

        public MySplit() {
            this("sentence");
        }

        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            String sentence = input.getString(0);
            log(label, sentence);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    // simply prints the tuple
    static class PrintPeekConsumer implements Consumer {
        private String label;

        public PrintPeekConsumer(String label) {
            this.label = label;
        }

        @Override
        public void accept(TridentTuple input) {
            log(label, input);
        }
    }

    // logging helper
    static void log(String label, Object input) {
        System.out.println("【" + label + "】--------" + input);
    }

    // a FixedBatchSpout that can be replayed; the number of loops is configurable
    static class LoopFixedBatchSpout extends FixedBatchSpout {
        private int loop = 1;

        public LoopFixedBatchSpout(int loop, Fields fields, int maxBatchSize, List<Object>... outputs) {
            super(fields, maxBatchSize, outputs);
            this.loop = loop;
        }

        public LoopFixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
            super(fields, maxBatchSize, outputs);
        }

        @Override
        public void emitBatch(long batchId, TridentCollector collector) {
            while (loop > 0) {
                super.emitBatch(batchId, collector);
                loop--;
            }
        }
    }

    // build the topology
    private static StormTopology buildTopology(LocalDRPC drpc) {
        int loop = new Random().nextInt(5) + 1;
        System.out.println("=============initial loop: " + loop + "=============");
        FixedBatchSpout spout1 = new LoopFixedBatchSpout(loop, new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout1.setCycle(false);

        TridentTopology topology = new TridentTopology();
        Stream sentenceStream = topology.newStream("spout1", spout1);
        Stream wordStream = sentenceStream
                .each(new Fields("sentence"), new MySplit(), new Fields("word"))
                .peek(new PrintPeekConsumer("word"));
        TridentState wordCountState = wordStream.groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
        // print the counted results
        wordCountState.newValuesStream().peek(new PrintPeekConsumer("count"));

        topology.newDRPCStream("countWordFunc", drpc)
                .each(new Fields("args"), new MySplit("drpc"), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCountState, new Fields("word"), new MapGet(), new Fields("count"))
                .peek(new PrintPeekConsumer("drpc-count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("app2", conf, buildTopology(drpc));
            for (int i = 0; i < 20; i++) {
                Thread.sleep(180L);
                // with LocalDRPC the query is only correct once wordCountState has finished processing,
                // so the results below may be e.g. "[[0]]" or "[[5 * loop]]" ...
                System.out.println("############## DRPC RESULT: " + drpc.execute("countWordFunc", "cat dog the man"));
            }
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }
}

Note: when using LocalDRPC, the returned result may differ from what you expect, because Storm may still be in the middle of the computation, or may not have started it yet.
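On a real cluster the query would go through a DRPC client instead of LocalDRPC. A minimal sketch, assuming a DRPC server reachable at the placeholder host drpc-host on the default port 3772 (the exact DRPCClient constructor differs slightly between Storm versions):

    import org.apache.storm.utils.DRPCClient;
    import org.apache.storm.utils.Utils;

    import java.util.Map;

    public class WordCountDrpcClient {
        public static void main(String[] args) throws Exception {
            // load storm.yaml defaults; "drpc-host" and port 3772 are placeholders for your DRPC server
            Map conf = Utils.readStormConfig();
            DRPCClient client = new DRPCClient(conf, "drpc-host", 3772);
            // the function name must match the one registered via newDRPCStream("countWordFunc", ...)
            System.out.println(client.execute("countWordFunc", "cat dog the man"));
        }
    }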

TridentKafkaSpout

private static StormTopology buildTopology() {
    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts("s163:2181", "/brokers");
    // subscribe to topic "testTopic1"
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "testTopic1", "cosumerGroup9");
    // the scheme's default output field is "str"
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    // read latest messages from Kafka
    // -1: LatestTime, -2: EarliestTime
    spoutConf.startOffsetTime = -1;
    OpaqueTridentKafkaSpout spout2 = new OpaqueTridentKafkaSpout(spoutConf);

    Stream stream = topology.newStream("spout2", spout2)
            .each(new Fields("str"), new BaseFunction() {
                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                    String msg = tuple.getStringByField("str");
                    try {
                        Map info = new ObjectMapper().readValue(msg, Map.class);
                        // extract latnId from the message and emit it
                        String latnId = MapUtils.getString(info, "latnId");
                        collector.emit(new Values(msg, latnId));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, new Fields("msg", "latnId"))
            .groupBy(new Fields("latnId"))
            // count how many tuples each latnId partition contributes per batch
            .aggregate(new Fields("latnId"), new Count(), new Fields("count"))
            .parallelismHint(5)
            .peek(new Consumer() {
                @Override
                public void accept(TridentTuple input) {
                    System.out.println("=========" + input.getStringByField("latnId")
                            + "::" + input.getLongByField("count"));
                }
            });
    return topology.build();
}

The msg payload is JSON, like the following:

{"key4" : "value4","latnId" : "010","msg" : "xxxx","msgId" : "1"
}
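To push some test data into the topic, a minimal producer sketch like the one below will do; the broker address s163:9092 is only an assumption to match the ZooKeeper host used above, so adjust it to your environment:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class TestMessageProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "s163:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            String json = "{\"key4\":\"value4\",\"latnId\":\"010\",\"msg\":\"xxxx\",\"msgId\":\"1\"}";
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // the topic must match the one configured in TridentKafkaConfig ("testTopic1")
                producer.send(new ProducerRecord<>("testTopic1", json));
            }
        }
    }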
