trident State应用指南
trident State应用指南
@(STORM)[storm, 大数据]
- trident State应用指南
- 一State基础示例
- 1主类
- 2Aggregator的用法
- 1Aggregator接口
- 2init方法
- 3aggregate方法
- 4complete方法
- 3state的用法
- 1拓扑定义
- 2工厂类NameSumStateFactory
- 3更新类NameSumUpdater
- 4状态类NameSumState
- 4state应用步骤总结
- 5state应用的一些注意事项
- 6state与MapState的差异
- 二MapState
- 1persistentAggregate
- 2MapStates
- 3Demo
- 1创建一个实现IBackingMap的类实现multiGet和multiPut方法
- 2创建实现StateFactory的类
- 3定义Count函数
- 4在拓扑中写入state或者查询state
- 4关于MapState的总结
- 1基本步骤
- 2全流程逻辑
- 3复杂的情况
- 4其它思考
- 5MapState读写mysql示例
- 1MysqlMapStateFactory
- 2MysqlMapStateBacking
- 三以HBaseMapState为例分析MapState代码调用全过程
- 零概述 MapState被调用的全流程代码
- 1调用过程
- TOTO按着这个流程把代码重头读一遍先了解ITridentBatchBolt
- 2内容概述
- 一如何使用MapState
- 二如何实现一个MapStateHBaseMapState源码分析
- 1Option内部类
- 2Factory内部类
- 1构造函数
- 2makeState方法
- 3构造函数
- 4返回StateFactory的方法
- 5multiGet
- 6multiPut
- 7序列化器
- 三MapState框架
- TODO补充各个类的关系图参考P323
- 1build方法
- 2构造方法
- 3beginCommit
- TODO CachedBatchReadsMap分析
- 4commit
- 5multiGet
- 6multiPut
- 7multiUpdate
- 四storm如何调用MapState的代码
- 1GroupedStream类
- 零概述 MapState被调用的全流程代码
Trident及State的原理请见另一篇文章:http://blog.csdn.net/lujinhong2/article/details/47132305
简单总结:
1、最简单的情况使用IBacking的逻辑,很容易实现k-v格式的state。
2、如果IBacking不够灵活(不能取得txid,不是kv而是多列的格式),则直接实现MapState的接口。
3、最复杂的是使用State接口,最灵活,但真有必要吗?
第一二种方法比较:persistenceAggregate 第一个参数关键定义了如何去更新state(如mysql中的内容),比如先取出数据,更新txid,再写回去之类的,而第二个参数定义了以什么逻辑去更新数据,如求和、计算、还是平均之类的。 因此,反正第一个参数都只是返回一个MapState对象,那使用IBacking接口还是直接使用MapState接口都可以了,只是前者作了一些txid逻辑的封装,对应于几种state的类型,因此使用方便了一点,便事实上,它的代码是很简单的,它就是通过判断txid的关系来定义了update是如何使用get和put的,所以,可以直接实现MapState接口的update方法即可。
一、State基础示例
trident通过spout的事务性与state的事务处理,保证了恰好一次的语义。这里介绍了如何使用state。
完整代码请见 https://github.com/lujinhong/tridentdemo
1、主类
主类定义了拓扑的整体逻辑,这个拓扑通过一个固定的spout循环产生数据,然后统计消息中每个名字出现的次数。
拓扑中先将消息中的内容提取出来成name, age, title, tel4个field,然后通过project只保留name字段供统计,接着按照name分区后,为每个分区进行聚合,最后将聚合结果通过state写入map中。
storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其实没什么必要,上面就不需要发射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name")); //根据name的值作分区Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());
2、Aggregator的用法
这里涉及了一些trident常用的API,但project等相对容易理解,这里只介绍partitionAggregate的用法。
再看看上面代码中对partitionAggregate的使用:
Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))
第一,三个参数分别表示输入流的名称与输出流的名称。中间的NameCountAggregator是一个Aggregator的对象,它定义了如何对输入流进行聚合。我们看一下它的代码:
public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判断某个名字是否已经存在于map中,若无,则put,若有,则递增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//将聚合后的结果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}
(1)Aggregator接口
它实现了Aggregator接口,这个接口有3个方法:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector);
}
init方法:在处理batch之前被调用。init的返回值是一个表示聚合状态的对象,该对象会被传递到aggregate和complete方法。
aggregate方法:为每个在batch分区的输入元组所调用,更新状态
complete方法:当batch分区的所有元组已经被aggregate方法处理完后被调用。
除了实现Aggregator接口,还可以实现ReducerAggregator或者CombinerAggregator,它们使用更方便。详见《从零开始学storm》或者官方文档
https://storm.apache.org/documentation/Trident-API-Overview.html
下面我们看一下这3个方法的实现。
(2)init方法
@Override
public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();
}
仅初始化了一个HashMap对象,这个对象会作为参数传给aggregate和complete方法。对一个batch只执行一次。
(3)aggregate方法
aggregate方法对于batch内的每一个tuple均执行一次。这里将这个batch内的名字出现的次数放到init方法所初始化的map中。
@Override
public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}
}
(4)complete方法
这里在complete将aggregate处理完的结果发送出去,实际上可以在任何地方emit,比如在aggregate里面。
这个方法对于一个batch也只执行一次。
@Override
public void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();}
}
3、state的用法
(1)拓扑定义
先看一下主类中如何将结果写入state:
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());
它的定义为:
TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)
其中的第二个参数比较容易理解,就是输入流的名称,这里是名字与它出现的个数。下面先看一下Facotry。
(2)工厂类:NameSumStateFactory
很简单,它实现了StateFactory,只有一个方法makeState,返回一个State类型的对象。
public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState(); }
}
(3)更新类:NameSumUpdater
这个类继承自BaseStateUpdater,它的updateState对batch的内容进行处理,这里是将batch的内容放到一个map中,然后调用setBulk方法
public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);}
}
(4)状态类:NameSumState
这是state最核心的类,它实现了大部分的逻辑。NameSumState实现了State接口:
public interface State {void beginCommit(Long txid); void commit(Long txid);
}
分别在提交之前与提交成功的时候调用,在这里只打印了一些信息。
另外NameSumState还定义了如何处理NameSumUpdater传递的消息:
public void setBulk(Map<String, Integer> map) {// 将新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 将map中的当前状态打印出来。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);}
}
即将NameSumUpdater传送过来的内容写入一个HashMap中,并打印出来。
此处将state记录在一个HashMap中,如果需要记录在其它地方,如mysql,则使用jdbc写入mysql代替下面的map操作即可。
事实上,这个操作不一定要在state中执行,可以在任何类中,但建议还是在state类中实现。
4、state应用步骤总结
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());
state的应用步骤相当简单,原理也很简单:
(1)NameSumStateFactory()指定了将结果保存在哪里,如本例中的hashset,还可以是mysql/hbase等。当然还有更新逻辑,
(2)NameSumUpdater()指定了更新state的逻辑,如将当前数据和原有数据相加等。
5、state应用的一些注意事项
(1)使用state,你不再需要比较事务id,在数据库中同时写入多个值等内容,而是专注于你的逻辑实现
(2)除了实现State接口,更常用的是实现MapState接口,下次补充。
(3)在拓扑中指定了StateFactory,这个工厂类找到相应的State类。而Updater则每个批次均会调用它的方法。State中则定义了如何保存数据,这里将数据保存在内存中的一个HashMap,还可以保存在mysql, hbase等等。
(4)trident会自动比较txid的值,如果和当前一样,则不更改状态,如果是当前txid的下一个值,则更新状态。这种逻辑不需要用户处理。
(5)如果需要实现透明事务状态,则需要保存当前值与上一个值,在update的时候2个要同时处理。即逻辑由自己实现。在本例子中,大致思路是在NameSumState中创建2个HashMap,分别对应当前与上一个状态的值,而NameSumUpdater每次更新这2个Map。
6、state与MapState的差异
(1)由上面可以看出,state需要自己指定如何更新数据
if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}
}
这里是将原有的值,加上新到的值。而MapState会根据你选择的类型(Transactional, Opaque, NonTransactional)定义好逻辑,只要定义如果向state中读写数据即可。
(2)MapState将State的aggreate与persistent 2部分操作合在一起了,由方法名也可以看出。在State中最后2步是partitionAggregate()
与partitionPersistent()
,而在MapState中最后1步是persistentAggregate()
事实上,查看persistentAggregate()的实现,它最终也是分成aggregate和persistent 2个步骤的。
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return aggregate(inputFields, agg, functionFields).partitionPersist(spec,TridentUtils.fieldsUnion(_groupFields, functionFields),new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),TridentUtils.fieldsConcat(_groupFields, functionFields));
}
二、MapState
1、persistentAggregate
Trident有另外一种更新State的方法叫做persistentAggregate。如下:
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals);
}
当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o);
}
MemoryMapState 和 MemcachedState 都实现了上面的2个接口。
2、MapStates
在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals);
}
OpaqueMap’s会用OpaqueValue的value来调用multiPut方法,TransactionalMap’s会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。
Trident还提供了一种CachedMap类来进行自动的LRU cache。
另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象.
大家可以看看 MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。
实现一个MapState,可以实现IBackingMap接口(mutliGet()/multiPut),并且实现StateFactory接口(makeState()),返回一个State对象,这是常见的用法
* 但如果有一引起高级需求,可以直接实现MapState接口,这样可以覆盖一些如beginCommit(Long txid);commit(Long txid);这些方法,还有multiUpdate()。*
3、Demo
完整代码请见 https://github.com/lujinhong/tridentdemo
- 更详细的可以参考trident-memcached(很全面,但较旧)
https://github.com/nathanmarz/trident-memcached - 或者storm-hbase的State实现等
在Trident中实现MapState是非常简单的,它和单纯的State不同点在于:OpaqueMap, TransactionalMap 和 NonTransactionalMap会实现相关的容错逻辑,只需为这些类提供一个IBackingMap接口实现,调用multiGet和multiPut方法访问各自的K/V值。
public interface IBackingMap<T> {List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals);
}
详细的步骤如下:
(1)创建一个实现IBackingMap的类,实现multiGet和multiPut方法
主要实现multiGet和multiPut的方法,实现如何从state中读写数据。
multiGet 的参数是一个List,可以根据key来查询数据,key本身也是一个List,以方便多个值组合成key的情形。
multiPut的参数是一个List类型的keys和一个List类型的values,它们的size应该是相等的,把这些值写入state中。
public class MemoryMapStateBacking<T> implements IBackingMap<T> {Map<List<Object>, T> db = new HashMap<List<Object>, T>();@Overridepublic List<T> multiGet(List<List<Object>> keys) {List<T> ret = new ArrayList();for (List<Object> key : keys) {ret.add(db.get(key));}return ret;}@Overridepublic void multiPut(List<List<Object>> keys, List<T> vals) {for (int i = 0; i < keys.size(); i++) {List<Object> key = keys.get(i);T val = vals.get(i);db.put(key, val);}}
}
这里将k/v写入了一个HashMap中,如果需要写入mysql,则只需要使用jdbc,把db.put改为写入mysql即可,查询类似。
(2)创建实现StateFactory的类
public class MemoryMapStateFacotry implements StateFactory{@Overridepublic State makeState(Map conf, IMetricsContext metrics,int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MemoryMapStateBacking());}}
很简单,就返回一个实现了MapState接口的类对象,通过把上面定义的MemoryMapStateBacking对象传入TransactionalMap.build作参数即可。当然还可以使用:
NonTransactionalMap.build(state);b
OpaqueMap.build(state);
(3)定义Count函数
用于说明如果将新来的数据与原来state中的数据组合更新。这里使用了storm提供的一个工具类,它将新来到的值与原有的值相加。
public class Count implements CombinerAggregator<Long> {@Overridepublic Long init(TridentTuple tuple) {return 1L;}@Overridepublic Long combine(Long val1, Long val2) {return val1 + val2;}@Overridepublic Long zero() {return 0L;}}
(4)在拓扑中写入state,或者查询state
//这个流程用于统计单词数据,结果将被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapStateFacotry(), new Count(),new Fields("count")).parallelismHint(16);//这个流程用于查询上面的统计结果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();
4、关于MapState的总结
(1)基本步骤
(1)创建一个实现IBackingMap的类,实现multiGet和multiPut方法
(2)创建实现StateFactory的类,它的makeState返回一个实现了MapState接口的对象,可以通过:
mapState = TransactionalMap.build(_iBacking);
其中_iBacking就是第一步实现类的对象。当然还可以使用
mapState = NonTransactionalMap.build(state);mapState = OpaqueMap.build(state);
TransactionalMap,OpaqueMap, NonTransactionalMap已经通过判断txid的值实现了相应的事务逻辑,以TransactionalMap为例,它的源码中会判断batch中的txid与state中已经存储的是否相同,或者同的话则新值等于旧值即可:
if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached)
(3)在拓扑中使用persistentAggregate写入state
(2)全流程逻辑
以事务型状态为例,我们看一下整个存储过程的逻辑:
* 首先,persistentAggregate收到一批数据,它的第一个参数返回的是事务型的MapState
* 然后,TransactionalMap在multiUpdate中会判断这个事务的txid与当前state中的txid是否一致。
* 如果txid一致的话,则保持原来的值即可,如果txid不一致,则更新数值。
* 如果更新数据呢?它是拿新来的值和state中的原有的值,使用persistentAggregate中第2个参数定义的类方法作聚合计算。
* 第一个参数关键定义了如何去更新state(如mysql中的内容),比如先取出数据,更新txid,再写回去之类的,而第二个参数定义了以什么逻辑去更新数据,如求和、计算、还是平均之类的。* 因此,反正第一个参数都只是返回一个MapState对象,那使用IBacking接口还是直接使用MapState接口都可以了,只是前者作了一些txid逻辑的封装,对应于几种state的类型,因此使用方便了一点,便事实上,它的代码是很简单的,它就是通过判断txid的关系来定义了update是如何使用get和put的,所以,可以直接实现MapState接口的update方法即可。
persistentAggregate的第2个参数定义了数据是如何更新的,而IBackingMap中的multiGet和multiPut只定义了如何向state中存取数据。
比如此处的Count,它会将将2个数据相加:
@Override
public Long combine(Long val1, Long val2) {return val1 + val2;
}
因此新来的统计次数与原有的统计次数加起来即是新的总和。
而对于透明事务状态,不管txid是否一致,都需要修改state,同时将当前state保存一下,成为preState。非事务型就简单了,不管你来什么,我都直接更新。
(3)复杂的情况
当然,如果觉得TransactionalMap,OpaqueMap, NonTransactionalMap不能满足业务需求,则可以自定义一个实现了MapState接口的类,而不是直接使用它们。
反正这三个类的实现逻辑非常简单,当不能满足业务需要时,看一下源码,然后参考它创建自己的类即可,此时,关键是multiUpdate的实现。
(4)其它思考
key可以是一个很复杂的List,包括多个字段。
5、MapState读写mysql示例
(1)MysqlMapStateFactory
public class MysqlMapStateFactory<T> implements StateFactory {private static final long serialVersionUID = 1987523234141L;@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MysqlMapStateBacking());}
}
很简单,就一行,返回一个IBacking对象。这里使用的Transactioal,当然还可以使用NonTransactional和Opaque。
(2)MysqlMapStateBacking
最核心的还是multiGet()和multiPut:
@Overridepublic List<TransactionalValue> multiGet(List<List<Object>> keys) {if (stmt == null) {stmt = getStatment();}List<TransactionalValue> values = new ArrayList<TransactionalValue>();for (List<Object> key : keys) {String sql = "SELECT req_count FROM edt_analysis where id='" + key.get(0) + "'";LOG.debug("============sql: " + sql);try (ResultSet rs = stmt.executeQuery(sql)) {if (rs.next()) {LOG.info("Get value:{} by key:{}", rs.getObject(1), key);values.add(derialize(rs.getObject(1)));} else {values.add(null);}} catch (SQLException e) {e.printStackTrace();}}return values;}@Overridepublic void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) {if (stmt == null) {stmt = getStatment();}for (int i = 0; i < keys.size(); i++) {String sql = "replace into edt_analysis values('" + keys.get(i).get(0) + "','" + serialize(vals.get(i))+ "')";LOG.debug("===================put sql " + sql);try {stmt.execute(sql);} catch (SQLException e) {e.printStackTrace();}}}
但mysql与redis之类的不同,它需要将一个TransactionalValue对象转换为mysql中的一行数据,同理,需要将mysql中的一行数据转换为一个TransactionalValue对象:
// 将数据库中的varchar转换为TransactionalValue对象
private TransactionalValue derialize(Object object) {String value[] = object.toString().split(",");return new TransactionalValue(Long.parseLong(value[0]), Long.parseLong(value[1]));
}// 将TransactionalValue转换为String
private String serialize(TransactionalValue transactionalValue) {return transactionalValue.getTxid() + "," + transactionalValue.getVal();
}
这是使用了最简单的方式,只有2列,一行是key,一列是value,value中保存了txid及真实的value,之间以逗号分隔。
三、以HBaseMapState为例分析MapState代码调用全过程
(零)概述 & MapState被调用的全流程代码
1、调用过程
(1)SubtopologyBolt implements ITridentBatchBolt
这个bolt在完成一个batch的处理后会调用finishBatch(BatchInfo batchInfo)
(2)然后调用PartitionPersistProcessor implements TridentProcessor
这个处理器的finishBatch(ProcessorContext processorContext)
(3)接着调用MapCombinerAggStateUpdater implements StateUpdater<MapState>
的updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector)
(4)再接着调用TransactionalMap<T> implements MapState<T>
的 multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)
(5)最后就是调用用户定义的MapState类(如HBaseMapState)的multiGet()和multiPut()方法了。
TOTO:按着这个流程把代码重头读一遍,先了解ITridentBatchBolt。
简单的说就是一个blot被处理完后,会调用finishBatch()方法,然后这个方法会调用MapState()框架的updateState(),接着调用mutliUpdate(),最后调用用户定义的multiGet()和multiPut()。
2、内容概述
本部分我们将以HBaseMapState为例,介绍使用MapState保证数据完整唯一的全流程代码调用,主要分成这几个部分:
(1)我们先介绍用户如何在构建代码中使用这个MapState
(2)然后介绍HBaseMapState的源代码,这也是用户需要实现一个MapState的基本方法。
(3)接着介绍MapState框架如何调用用户定义的代码形成事务性。
(4)最后介绍storm的内部机制,如何调用MapState。
这也是用户如何要看源码的逐步深入的过程。
(一)如何使用MapState
详细DEMO请见:https://github.com/lujinhong/stormhbasedemo
1、指定一些配置
HBaseMapState.Options option = new HBaseMapState.Options();option.tableName = "ljhtest2";option.columnFamily = "f1";option.mapMapper = new SimpleTridentHBaseMapMapper("ms");
SimpleTridentHBaseMapMapper主要用于获取Rowkey和qualifier。Option的完整选项见下面的源码分析。
2、指定state
topology.newStream("kafka", kafkaSpout).shuffle().each(new Fields("str"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);
这里使用Option对象来构建一个HBaseMapStateFactory。
还可以通过
HBaseMapState.nonTransactional(option)
HBaseMapState.opaque(option)
分别创建非事务与透明的state。
这里使用了storm内建的Count()方法,如果使用Sum,用法如下:
.persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);
当然还可以自定义方法,这里自定义方法也就可以自定义保存在hbase的数据类型了。
(二)如何实现一个MapState:HBaseMapState源码分析
HBaseMapState的主要代码都在HBaseMapState类中。一个MapState的实现关键在于
* 构建一个实现StateFactory的类,实现makeState() 方法,返回一个State对象。
* 一个MapState,实现IBackingMap接口的multiGet()和multiPut(),指定如何从hbase中读写数据。
关于mapstate的基础介绍请参考上面。
1、Option内部类
HBaseMapState有一个内部类:Option,用于指定一些配置项。
public static class Options<T> implements Serializable {public Serializer<T> serializer = null;public int cacheSize = 5000;public String globalKey = "$HBASE_STATE_GLOBAL$";public String configKey = "hbase.config";public String tableName;public String columnFamily;public TridentHBaseMapMapper mapMapper;
}
分别意思为:
* 序列化器,即以什么格式写入hbase,storm-hbase自带了JSON格式的序列化实现。
* 缓冲大小
* 未知
* 指定hbase-site.xml位置的变量
* 表名
* family名
* 用于获取rowkey和qualifier,创建对象时需要指定一个参数作为qualifier。
2、Factory内部类
(1)构造函数
构造函数接收2个参数,分别为state的类型以及Option对象。
除些以外,还指定了序列化器:
if (this.options.serializer == null) {this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
}
(2)makeState()方法
就是返回一个State对象。
3、构造函数
构造函数用于加载配置文件,安全机制等。
4、返回StateFactory的方法
没什么好介绍的,就是返回各种类型的staStateFactory,具体的说就是返回上面Factory的一个对象。这里只保留了透明型的。
@SuppressWarnings("rawtypes")
public static StateFactory opaque() {Options<OpaqueValue> options = new Options<OpaqueValue>();return opaque(options);
}@SuppressWarnings("rawtypes")
public static StateFactory opaque(Options<OpaqueValue> opts) {return new Factory(StateType.OPAQUE, opts);
}
5、multiGet
根据一个List<List<Object>> keys
列表获取到一个返回值的列表List。注意key本身也是一个List<Object>
。
代码主要是三部分:
(1)创建List<Get> gets
List<Get> gets = new ArrayList<Get>();for(List<Object> key : keys){byte[] hbaseKey = this.options.mapMapper.rowKey(key);String qualifier = this.options.mapMapper.qualifier(key);LOG.info("Partition: {}, GET: {}", this.partitionNum, new String(hbaseKey));Get get = new Get(hbaseKey);get.addColumn(this.options.columnFamily.getBytes(), qualifier.getBytes());gets.add(get);}
(2)查询hbase:根据gets获取Result[]
List<T> retval = new ArrayList<T>();Result[] results = this.table.get(gets);
(3)将results封装成一个List<T> retval
并返回
for (int i = 0; i < keys.size(); i++) {String qualifier = this.options.mapMapper.qualifier(keys.get(i));Result result = results[i];byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());if(value != null) {retval.add(this.serializer.deserialize(value));} else {retval.add(null);}}
return retval;
当返回值为空时,则加上null。
6、multiPut
它将一个List<List<Object>> keys, List<T> values
的数据写入hbase,注意keys.size()与values.size()必须相等。
List<Put> puts = new ArrayList<Put>(keys.size());for (int i = 0; i < keys.size(); i++) {byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));String qualifier = this.options.mapMapper.qualifier(keys.get(i));LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});Put put = new Put(hbaseKey);T val = values.get(i);put.add(this.options.columnFamily.getBytes(),qualifier.getBytes(),this.serializer.serialize(val));puts.add(put);this.table.put(puts);
7、序列化器
序列化器指定了以何种格式将数据写入hbase(序列化),以及取出数据后如何进行解释(反序列化),即关键是serialize()与deserialize()这2个方法。
storm默认提供了json的实现,以Transactional为例:
public class JSONTransactionalSerializer implements Serializer<TransactionalValue>
它的内部只有2个方法:
@Override
public byte[] serialize(TransactionalValue obj) {List toSer = new ArrayList(2);toSer.add(obj.getTxid());toSer.add(obj.getVal());try {return JSONValue.toJSONString(toSer).getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}
}
它将一个TransactionalValue转化为json格式,TransactionalValue只有2个变量,是一个典型的bean:
T val;
Long txid;
而另一个方法deserialize()则刚好相反,它将一个json格式字节流解释为一个TransactionalValue对象:
@Override
public TransactionalValue deserialize(byte[] b) {try {String s = new String(b, "UTF-8");List deser = (List) JSONValue.parse(s);return new TransactionalValue((Long) deser.get(0), deser.get(1));} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}
}
(三)MapState框架
//TODO:补充各个类的关系图,参考P323
上述介绍了用户如何通过实现IBackingMap接口来创建自己的MapState实现,这里我们将介绍MapState框架是如何调用用户写的mutliGet()和multiPut方法的。
* 另外,如果上述实现iBackingMap的方法不能满足你的要求,你可以实现自己的MapState框架,按照这里介绍的方法即可 *
我们主要以Transactional为例,再简单介绍一下NonTransactional和Opaque的情形。在上面的Factory.makeState()方法中:
IBackingMap state = new HBaseMapState(options, conf, partitionIndex);
mapState = TransactionalMap.build(state);
state就是用户代码定义的MapState实现,此此处是HBaseMapState。我们下面看一下TransactionalMap是如何调用HBaseMapState的mutliGet()和multiPut方法的。
1、build()方法
我们从build方法开始,因为这是用户创建MapState所调用的API。
public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {return new TransactionalMap<T>(backing);
}
它使用用户定义的IBackingMap对象创建一个MapState对象,主要通过构造方法来实现。
2、构造方法
protected TransactionalMap(IBackingMap<TransactionalValue> backing) {_backing = new CachedBatchReadsMap(backing);
}
3、beginCommit()
@Override
public void beginCommit(Long txid) {_currTx = txid;_backing.reset();
}
当开始处理一个事务时,设置当前正在处理的txid,reset()是CachedBatchReadsMap类中清空缓存的方法。
TODO: CachedBatchReadsMap分析
4、commit()
@Override
public void commit(Long txid) {_currTx = null;_backing.reset();
}
当一个事务处理完成后,将txid设置为null。
5、multiGet
@Override
public List<T> multiGet(List<List<Object>> keys) {List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);List<T> ret = new ArrayList<T>(vals.size());for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {TransactionalValue v = retval.val;if(v!=null) {ret.add((T) v.getVal());} else {ret.add(null);}}return ret;
}
通过调用用户的_backing.multiGet(keys)来实现具体逻辑,作了一些类型转换。
6、multiPut()
@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());for(T val: vals) {newVals.add(new TransactionalValue<T>(_currTx, val));}_backing.multiPut(keys, newVals);
}
同样只是调用用户定位的multiPut()。
7、multiUpdate()
核心的逻辑在于这几行:
if(val==null) {newVal = new TransactionalValue<T>(_currTx, updater.update(null));changed = true;} else {if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {newVal = val;} else {newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));changed = true;}}ret.add(newVal.getVal());if(changed) {newVals.add(newVal);newKeys.add(keys.get(i));}}
在这之前,先把数据get出来,然后判断:
- 如果key对应的value为空,则changed为true
- 如果key对应的value不为空,而且当前的txid与value中的txid相同,则changed保持为false。
- 如果key对应的value不为空,但当前的txid与value中的txid不同,则changed为true。
这部分逻辑就是Transactional, NonTransactional和Opaque的差别。
NonTransactional不会判断txid,只要来一批就更新一次。
Opaque基于之前的值作更新。
(四)storm如何调用MapState的代码
根据前面的分析,用户在拓扑定义中通过以下类似的代码来指定state:
topology.newStream("wordsplit", spout).shuffle().each(new Fields("sentence"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);
主要看第3、4行,先对数据根据”word”这个流进行分组,然后再调用persistentAggregate()方法。再简单解释一下这个方法,3个参数分别为:
* 返回一个StateFactory对象,它有一个makeState()方法,返回一个State对象。这个state对象就是用户定义的MapState,主要定义了如何从state中读写数据。
* 第二个参数表示如何对取出的数据进行什么操作,这里使用的是Count,如是其它类,如Sum,则多一个参数:
persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);
* 发送的消息流。
好,我们下面开始分析GroupedStream#persistentAggregate()做了什么东西。
1、GroupedStream类
public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
}public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(spec, null, agg, functionFields);
}public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}
很简单的代码逻辑,先使用StateFactory对象创建一个StateSpec对象,然后继续调用,从第3个方法可以看出,这里还有一个参数是表示inputFields,即输入的field,即对哪个field执行CombinerAggregator的操作。StateSpec类的定义非常简单:
public class StateSpec implements Serializable {public StateFactory stateFactory;public Integer requiredNumPartitions = null;public StateSpec(StateFactory stateFactory) {this.stateFactory = stateFactory;}
}
最终真正调用的方法是这个:
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return aggregate(inputFields, agg, functionFields).partitionPersist(spec,TridentUtils.fieldsUnion(_groupFields, functionFields),new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),TridentUtils.fieldsConcat(_groupFields, functionFields));
}
这个方法主要分成2个步骤
* 第一个是调用aggregate()方法,主要如何对数据进行操作。这部分我们以后再分析,反正把它理解为一个数据的更新就好了。
* 第二个是调用partitionPersist()方法,如何将数据写入state。
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id = _topology.getUniqueStateId();ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer = true;n.stateInfo = new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);}
构建一个ProcessorNode,然后将它添加进_topology中。
trident State应用指南相关推荐
- Trident State译文
Trident State 译文 Trident针对状态化的数据源的读写进行了一流的分装.State可以包含在拓扑中-例如,保存在内存中,有HDFS提供备份-也可以保存在一个外部的数据库中,像Memc ...
- storm mysql trident_storm trident实战 trident state
一.认识storm trident trident可以理解为storm批处理的高级抽象,提供了分组.分区.聚合.函数等操作,提供一致性和恰好一次处理的语义. 1)元祖被作为batch处理 2)每个ba ...
- Trident state
State in Trident Trident有对有状态数据源的抽象.state要么在topology内部如内存和HDFS中, 或外部存储于数据库如 Memcached或Cassandr.在Trid ...
- Flink Broadcast State实用指南
从1.5.0开始,Flink提供了一种新的State类型,称为Broadcast State.在这篇文章中,我们将解释什么是Broadcast State,并展示如何将其应用于评估事件流上的动态模式的 ...
- trident API指南
trident API指南 @(STORM)[storm] trident API指南 零 概述 1 本地分区操作 2 重新分区操作 3 聚合操作 4 流分组操作 5合并与连接 一 本地分区操作 一 ...
- [Trident] Storm Trident 教程,state详解、trident api详解及实例
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- Trident API 概览
Trident API 概览 在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来:翻译完以后,也发现 在自己的翻译中也有很多地方 ...
- Apache Storm 官方文档 —— Trident API 概述
转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 -- Trident API 概述 窗口部分的内容是我自己翻译的 Trident 的核心数据模型是" ...
- storm trident mysql_Trident-MySQL
使用事物TridentTopology 持久化数据到MySQL1.构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays; ...
最新文章
- HDU2019 数列有序
- 图像处理——Edge Boxes边缘检测
- c语言编程中%g是什么格式
- 从文本分类来看图卷积神经网络
- css怎么把横向菜单变纵向_CSS 布局模式 + 居中布局
- WPF的ComboBox 数据模板自定义
- 数据运营小白如何搭建“初期用户生命周期体系”?
- vue监听用户在页面的浏览时间需在beforeDestroy()里面进行销毁
- mysql 字符串出现问题_MYSQL 中字符串函数 归纳总结
- 函数嵌套和nonlocal声明
- Unix操作系统发展历史
- 程序员计算器HEX、EDC、OCT的意思
- 计算机网络的创新创业计划书,互联网创新创业计划书.doc
- python txt追加写入_python 实现在txt指定行追加文本的方法
- spring使用之旅 ---- bean的装配
- excel区别奇偶行(删除、过滤)
- Feign报错java.lang.NoSuchFieldError: MULTIPART_RELATED
- Kaldi声学模型训练
- PotPlayer不支持S/W HEVC(H.265)解码怎么办?一招解决所有的不支持解码
- Photoshop 2018 学习笔记 目录
热门文章
- 【自定义排序规则】剑指 Offer 45. 把数组排成最小的数
- 【双100%解法】剑指 Offer 24. 反转链表
- 【已解决】Exception in thread “Thread-0“ redis.clients.jedis.exceptions.JedisConnectionException: java.n
- 43行代码AC_HDU-2604 Queuing(矩阵快速幂,附详细的知识讲解、模板例题)
- 实验详解——DNS反向解析、DNS主服务器和从服务器的配置
- python日志模块_Python之日志处理(logging模块)
- Vmware中mac snow leopard蘋果雪豹系統驅動程式安裝方法
- mysql 优化器不准_mysql 优化器有哪些可选开关
- mongodb mysql 写_MongoDB与MySQL关于写确认的异同
- 接受map_[译] 图解 Map、Reduce 和 Filter 数组方法