trident State应用指南

@(STORM)[storm, 大数据]

  • trident State应用指南
  • 一State基础示例
    • 1主类
    • 2Aggregator的用法
      • 1Aggregator接口
      • 2init方法
      • 3aggregate方法
      • 4complete方法
    • 3state的用法
      • 1拓扑定义
      • 2工厂类NameSumStateFactory
      • 3更新类NameSumUpdater
      • 4状态类NameSumState
    • 4state应用步骤总结
    • 5state应用的一些注意事项
    • 6state与MapState的差异
  • 二MapState
    • 1persistentAggregate
    • 2MapStates
    • 3Demo
      • 1创建一个实现IBackingMap的类实现multiGet和multiPut方法
      • 2创建实现StateFactory的类
      • 3定义Count函数
      • 4在拓扑中写入state或者查询state
    • 4关于MapState的总结
      • 1基本步骤
      • 2全流程逻辑
      • 3复杂的情况
      • 4其它思考
    • 5MapState读写mysql示例
      • 1MysqlMapStateFactory
      • 2MysqlMapStateBacking
  • 三以HBaseMapState为例分析MapState代码调用全过程
    • 零概述 MapState被调用的全流程代码

      • 1调用过程
      • TOTO按着这个流程把代码重头读一遍先了解ITridentBatchBolt
      • 2内容概述
    • 一如何使用MapState
    • 二如何实现一个MapStateHBaseMapState源码分析
      • 1Option内部类
      • 2Factory内部类
        • 1构造函数
      • 2makeState方法
      • 3构造函数
      • 4返回StateFactory的方法
      • 5multiGet
      • 6multiPut
      • 7序列化器
    • 三MapState框架
      • TODO补充各个类的关系图参考P323
      • 1build方法
      • 2构造方法
      • 3beginCommit
      • TODO CachedBatchReadsMap分析
      • 4commit
      • 5multiGet
      • 6multiPut
      • 7multiUpdate
    • 四storm如何调用MapState的代码
      • 1GroupedStream类

Trident及State的原理请见另一篇文章:http://blog.csdn.net/lujinhong2/article/details/47132305

简单总结:
1、最简单的情况使用IBacking的逻辑,很容易实现k-v格式的state。
2、如果IBacking不够灵活(不能取得txid,不是kv而是多列的格式),则直接实现MapState的接口。
3、最复杂的是使用State接口,最灵活,但真有必要吗?

第一二种方法比较:persistenceAggregate 第一个参数关键定义了如何去更新state(如mysql中的内容),比如先取出数据,更新txid,再写回去之类的,而第二个参数定义了以什么逻辑去更新数据,如求和、计算、还是平均之类的。 因此,反正第一个参数都只是返回一个MapState对象,那使用IBacking接口还是直接使用MapState接口都可以了,只是前者作了一些txid逻辑的封装,对应于几种state的类型,因此使用方便了一点,便事实上,它的代码是很简单的,它就是通过判断txid的关系来定义了update是如何使用get和put的,所以,可以直接实现MapState接口的update方法即可。

一、State基础示例

trident通过spout的事务性与state的事务处理,保证了恰好一次的语义。这里介绍了如何使用state。

完整代码请见 https://github.com/lujinhong/tridentdemo

1、主类

主类定义了拓扑的整体逻辑,这个拓扑通过一个固定的spout循环产生数据,然后统计消息中每个名字出现的次数。

拓扑中先将消息中的内容提取出来成name, age, title, tel4个field,然后通过project只保留name字段供统计,接着按照name分区后,为每个分区进行聚合,最后将聚合结果通过state写入map中。

    storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其实没什么必要,上面就不需要发射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name"));   //根据name的值作分区Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());

2、Aggregator的用法

这里涉及了一些trident常用的API,但project等相对容易理解,这里只介绍partitionAggregate的用法。

再看看上面代码中对partitionAggregate的使用:

Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))

第一,三个参数分别表示输入流的名称与输出流的名称。中间的NameCountAggregator是一个Aggregator的对象,它定义了如何对输入流进行聚合。我们看一下它的代码:

public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判断某个名字是否已经存在于map中,若无,则put,若有,则递增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//将聚合后的结果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}

(1)Aggregator接口

它实现了Aggregator接口,这个接口有3个方法:

public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector);
}

init方法:在处理batch之前被调用。init的返回值是一个表示聚合状态的对象,该对象会被传递到aggregate和complete方法。
aggregate方法:为每个在batch分区的输入元组所调用,更新状态
complete方法:当batch分区的所有元组已经被aggregate方法处理完后被调用。

除了实现Aggregator接口,还可以实现ReducerAggregator或者CombinerAggregator,它们使用更方便。详见《从零开始学storm》或者官方文档
https://storm.apache.org/documentation/Trident-API-Overview.html

下面我们看一下这3个方法的实现。

(2)init方法

@Override
public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();
}

仅初始化了一个HashMap对象,这个对象会作为参数传给aggregate和complete方法。对一个batch只执行一次。

(3)aggregate方法

aggregate方法对于batch内的每一个tuple均执行一次。这里将这个batch内的名字出现的次数放到init方法所初始化的map中。

@Override
public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}
}

(4)complete方法

这里在complete将aggregate处理完的结果发送出去,实际上可以在任何地方emit,比如在aggregate里面。
这个方法对于一个batch也只执行一次。

@Override
public void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();}
}

3、state的用法

(1)拓扑定义

先看一下主类中如何将结果写入state:

partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());

它的定义为:

TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)

其中的第二个参数比较容易理解,就是输入流的名称,这里是名字与它出现的个数。下面先看一下Facotry。

(2)工厂类:NameSumStateFactory

很简单,它实现了StateFactory,只有一个方法makeState,返回一个State类型的对象。

public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState();  }
}

(3)更新类:NameSumUpdater

这个类继承自BaseStateUpdater,它的updateState对batch的内容进行处理,这里是将batch的内容放到一个map中,然后调用setBulk方法

public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);}
}

(4)状态类:NameSumState

这是state最核心的类,它实现了大部分的逻辑。NameSumState实现了State接口:

public interface State {void beginCommit(Long txid); void commit(Long txid);
}

分别在提交之前与提交成功的时候调用,在这里只打印了一些信息。

另外NameSumState还定义了如何处理NameSumUpdater传递的消息:

public void setBulk(Map<String, Integer> map) {// 将新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 将map中的当前状态打印出来。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);}
}

即将NameSumUpdater传送过来的内容写入一个HashMap中,并打印出来。
此处将state记录在一个HashMap中,如果需要记录在其它地方,如mysql,则使用jdbc写入mysql代替下面的map操作即可。

事实上,这个操作不一定要在state中执行,可以在任何类中,但建议还是在state类中实现。

4、state应用步骤总结

partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());

state的应用步骤相当简单,原理也很简单:
(1)NameSumStateFactory()指定了将结果保存在哪里,如本例中的hashset,还可以是mysql/hbase等。当然还有更新逻辑,
(2)NameSumUpdater()指定了更新state的逻辑,如将当前数据和原有数据相加等。

5、state应用的一些注意事项

(1)使用state,你不再需要比较事务id,在数据库中同时写入多个值等内容,而是专注于你的逻辑实现
(2)除了实现State接口,更常用的是实现MapState接口,下次补充。
(3)在拓扑中指定了StateFactory,这个工厂类找到相应的State类。而Updater则每个批次均会调用它的方法。State中则定义了如何保存数据,这里将数据保存在内存中的一个HashMap,还可以保存在mysql, hbase等等。
(4)trident会自动比较txid的值,如果和当前一样,则不更改状态,如果是当前txid的下一个值,则更新状态。这种逻辑不需要用户处理。
(5)如果需要实现透明事务状态,则需要保存当前值与上一个值,在update的时候2个要同时处理。即逻辑由自己实现。在本例子中,大致思路是在NameSumState中创建2个HashMap,分别对应当前与上一个状态的值,而NameSumUpdater每次更新这2个Map。

6、state与MapState的差异

(1)由上面可以看出,state需要自己指定如何更新数据

if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}
}

这里是将原有的值,加上新到的值。而MapState会根据你选择的类型(Transactional, Opaque, NonTransactional)定义好逻辑,只要定义如果向state中读写数据即可。
(2)MapState将State的aggreate与persistent 2部分操作合在一起了,由方法名也可以看出。在State中最后2步是partitionAggregate()partitionPersistent(),而在MapState中最后1步是persistentAggregate()
事实上,查看persistentAggregate()的实现,它最终也是分成aggregate和persistent 2个步骤的。

public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return aggregate(inputFields, agg, functionFields).partitionPersist(spec,TridentUtils.fieldsUnion(_groupFields, functionFields),new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),TridentUtils.fieldsConcat(_groupFields, functionFields));
}

二、MapState

1、persistentAggregate

Trident有另外一种更新State的方法叫做persistentAggregate。如下:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =  topology.newStream("spout1", spout)  .each(new Fields("sentence"), new Split(), new Fields("word"))  .groupBy(new Fields("word"))  .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:

public interface MapState<T> extends State {  List<T> multiGet(List<List<Object>> keys);  List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);  void multiPut(List<List<Object>> keys, List<T> vals);
}

当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:

public interface Snapshottable<T> extends State {  T get();  T update(ValueUpdater updater);  void set(T o);
}

MemoryMapState 和 MemcachedState 都实现了上面的2个接口。

2、MapStates

在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:

public interface IBackingMap<T> {  List<T> multiGet(List<List<Object>> keys);   void multiPut(List<List<Object>> keys, List<T> vals);
}

OpaqueMap’s会用OpaqueValue的value来调用multiPut方法,TransactionalMap’s会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。

Trident还提供了一种CachedMap类来进行自动的LRU cache。

另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象.

大家可以看看 MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。

实现一个MapState,可以实现IBackingMap接口(mutliGet()/multiPut),并且实现StateFactory接口(makeState()),返回一个State对象,这是常见的用法
* 但如果有一引起高级需求,可以直接实现MapState接口,这样可以覆盖一些如beginCommit(Long txid);commit(Long txid);这些方法,还有multiUpdate()。*

3、Demo

完整代码请见 https://github.com/lujinhong/tridentdemo

  • 更详细的可以参考trident-memcached(很全面,但较旧)
    https://github.com/nathanmarz/trident-memcached
  • 或者storm-hbase的State实现等

在Trident中实现MapState是非常简单的,它和单纯的State不同点在于:OpaqueMap, TransactionalMap 和 NonTransactionalMap会实现相关的容错逻辑,只需为这些类提供一个IBackingMap接口实现,调用multiGet和multiPut方法访问各自的K/V值。

public interface IBackingMap<T> {List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals);
}

详细的步骤如下:

(1)创建一个实现IBackingMap的类,实现multiGet和multiPut方法

主要实现multiGet和multiPut的方法,实现如何从state中读写数据。
multiGet 的参数是一个List,可以根据key来查询数据,key本身也是一个List,以方便多个值组合成key的情形。
multiPut的参数是一个List类型的keys和一个List类型的values,它们的size应该是相等的,把这些值写入state中。

public class MemoryMapStateBacking<T> implements IBackingMap<T> {Map<List<Object>, T> db = new HashMap<List<Object>, T>();@Overridepublic List<T> multiGet(List<List<Object>> keys) {List<T> ret = new ArrayList();for (List<Object> key : keys) {ret.add(db.get(key));}return ret;}@Overridepublic void multiPut(List<List<Object>> keys, List<T> vals) {for (int i = 0; i < keys.size(); i++) {List<Object> key = keys.get(i);T val = vals.get(i);db.put(key, val);}}
}

这里将k/v写入了一个HashMap中,如果需要写入mysql,则只需要使用jdbc,把db.put改为写入mysql即可,查询类似。

(2)创建实现StateFactory的类

public class MemoryMapStateFacotry implements StateFactory{@Overridepublic State makeState(Map conf, IMetricsContext metrics,int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MemoryMapStateBacking());}}

很简单,就返回一个实现了MapState接口的类对象,通过把上面定义的MemoryMapStateBacking对象传入TransactionalMap.build作参数即可。当然还可以使用:

NonTransactionalMap.build(state);b
OpaqueMap.build(state);

(3)定义Count函数

用于说明如果将新来的数据与原来state中的数据组合更新。这里使用了storm提供的一个工具类,它将新来到的值与原有的值相加。

public class Count implements CombinerAggregator<Long> {@Overridepublic Long init(TridentTuple tuple) {return 1L;}@Overridepublic Long combine(Long val1, Long val2) {return val1 + val2;}@Overridepublic Long zero() {return 0L;}}

(4)在拓扑中写入state,或者查询state

    //这个流程用于统计单词数据,结果将被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapStateFacotry(), new Count(),new Fields("count")).parallelismHint(16);//这个流程用于查询上面的统计结果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();

4、关于MapState的总结

(1)基本步骤

(1)创建一个实现IBackingMap的类,实现multiGet和multiPut方法

(2)创建实现StateFactory的类,它的makeState返回一个实现了MapState接口的对象,可以通过:

 mapState = TransactionalMap.build(_iBacking);

其中_iBacking就是第一步实现类的对象。当然还可以使用

 mapState = NonTransactionalMap.build(state);mapState = OpaqueMap.build(state);

TransactionalMap,OpaqueMap, NonTransactionalMap已经通过判断txid的值实现了相应的事务逻辑,以TransactionalMap为例,它的源码中会判断batch中的txid与state中已经存储的是否相同,或者同的话则新值等于旧值即可:
if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached)
(3)在拓扑中使用persistentAggregate写入state

(2)全流程逻辑

以事务型状态为例,我们看一下整个存储过程的逻辑:
* 首先,persistentAggregate收到一批数据,它的第一个参数返回的是事务型的MapState
* 然后,TransactionalMap在multiUpdate中会判断这个事务的txid与当前state中的txid是否一致。
* 如果txid一致的话,则保持原来的值即可,如果txid不一致,则更新数值。
* 如果更新数据呢?它是拿新来的值和state中的原有的值,使用persistentAggregate中第2个参数定义的类方法作聚合计算。

* 第一个参数关键定义了如何去更新state(如mysql中的内容),比如先取出数据,更新txid,再写回去之类的,而第二个参数定义了以什么逻辑去更新数据,如求和、计算、还是平均之类的。* 因此,反正第一个参数都只是返回一个MapState对象,那使用IBacking接口还是直接使用MapState接口都可以了,只是前者作了一些txid逻辑的封装,对应于几种state的类型,因此使用方便了一点,便事实上,它的代码是很简单的,它就是通过判断txid的关系来定义了update是如何使用get和put的,所以,可以直接实现MapState接口的update方法即可。

persistentAggregate的第2个参数定义了数据是如何更新的,而IBackingMap中的multiGet和multiPut只定义了如何向state中存取数据。
比如此处的Count,它会将将2个数据相加:

@Override
public Long combine(Long val1, Long val2) {return val1 + val2;
}

因此新来的统计次数与原有的统计次数加起来即是新的总和。

而对于透明事务状态,不管txid是否一致,都需要修改state,同时将当前state保存一下,成为preState。非事务型就简单了,不管你来什么,我都直接更新。

(3)复杂的情况

当然,如果觉得TransactionalMap,OpaqueMap, NonTransactionalMap不能满足业务需求,则可以自定义一个实现了MapState接口的类,而不是直接使用它们。

反正这三个类的实现逻辑非常简单,当不能满足业务需要时,看一下源码,然后参考它创建自己的类即可,此时,关键是multiUpdate的实现。

(4)其它思考

key可以是一个很复杂的List,包括多个字段。

5、MapState读写mysql示例

(1)MysqlMapStateFactory

public class MysqlMapStateFactory<T> implements StateFactory {private static final long serialVersionUID = 1987523234141L;@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return  TransactionalMap.build((IBackingMap<TransactionalValue>) new MysqlMapStateBacking());}
}

很简单,就一行,返回一个IBacking对象。这里使用的Transactioal,当然还可以使用NonTransactional和Opaque。

(2)MysqlMapStateBacking

最核心的还是multiGet()和multiPut:

    @Overridepublic List<TransactionalValue> multiGet(List<List<Object>> keys) {if (stmt == null) {stmt = getStatment();}List<TransactionalValue> values = new ArrayList<TransactionalValue>();for (List<Object> key : keys) {String sql = "SELECT req_count FROM edt_analysis where id='" + key.get(0) + "'";LOG.debug("============sql: " + sql);try (ResultSet rs = stmt.executeQuery(sql)) {if (rs.next()) {LOG.info("Get value:{} by key:{}", rs.getObject(1), key);values.add(derialize(rs.getObject(1)));} else {values.add(null);}} catch (SQLException e) {e.printStackTrace();}}return values;}@Overridepublic void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) {if (stmt == null) {stmt = getStatment();}for (int i = 0; i < keys.size(); i++) {String sql = "replace into edt_analysis values('" + keys.get(i).get(0) + "','" + serialize(vals.get(i))+ "')";LOG.debug("===================put sql " + sql);try {stmt.execute(sql);} catch (SQLException e) {e.printStackTrace();}}}

但mysql与redis之类的不同,它需要将一个TransactionalValue对象转换为mysql中的一行数据,同理,需要将mysql中的一行数据转换为一个TransactionalValue对象:

// 将数据库中的varchar转换为TransactionalValue对象
private TransactionalValue derialize(Object object) {String value[] = object.toString().split(",");return new TransactionalValue(Long.parseLong(value[0]), Long.parseLong(value[1]));
}// 将TransactionalValue转换为String
private String serialize(TransactionalValue transactionalValue) {return transactionalValue.getTxid() + "," + transactionalValue.getVal();
}

这是使用了最简单的方式,只有2列,一行是key,一列是value,value中保存了txid及真实的value,之间以逗号分隔。

三、以HBaseMapState为例分析MapState代码调用全过程

(零)概述 & MapState被调用的全流程代码

1、调用过程

(1)SubtopologyBolt implements ITridentBatchBolt这个bolt在完成一个batch的处理后会调用finishBatch(BatchInfo batchInfo)
(2)然后调用PartitionPersistProcessor implements TridentProcessor这个处理器的finishBatch(ProcessorContext processorContext)
(3)接着调用MapCombinerAggStateUpdater implements StateUpdater<MapState>updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector)
(4)再接着调用TransactionalMap<T> implements MapState<T>multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)
(5)最后就是调用用户定义的MapState类(如HBaseMapState)的multiGet()和multiPut()方法了。

TOTO:按着这个流程把代码重头读一遍,先了解ITridentBatchBolt。

简单的说就是一个blot被处理完后,会调用finishBatch()方法,然后这个方法会调用MapState()框架的updateState(),接着调用mutliUpdate(),最后调用用户定义的multiGet()和multiPut()。

2、内容概述

本部分我们将以HBaseMapState为例,介绍使用MapState保证数据完整唯一的全流程代码调用,主要分成这几个部分:
(1)我们先介绍用户如何在构建代码中使用这个MapState
(2)然后介绍HBaseMapState的源代码,这也是用户需要实现一个MapState的基本方法。
(3)接着介绍MapState框架如何调用用户定义的代码形成事务性。
(4)最后介绍storm的内部机制,如何调用MapState。
这也是用户如何要看源码的逐步深入的过程。

(一)如何使用MapState

详细DEMO请见:https://github.com/lujinhong/stormhbasedemo

1、指定一些配置

    HBaseMapState.Options option = new HBaseMapState.Options();option.tableName = "ljhtest2";option.columnFamily = "f1";option.mapMapper = new SimpleTridentHBaseMapMapper("ms");

SimpleTridentHBaseMapMapper主要用于获取Rowkey和qualifier。Option的完整选项见下面的源码分析。

2、指定state

    topology.newStream("kafka", kafkaSpout).shuffle().each(new Fields("str"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);

这里使用Option对象来构建一个HBaseMapStateFactory。
还可以通过

HBaseMapState.nonTransactional(option)
HBaseMapState.opaque(option)

分别创建非事务与透明的state。

这里使用了storm内建的Count()方法,如果使用Sum,用法如下:

.persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);

当然还可以自定义方法,这里自定义方法也就可以自定义保存在hbase的数据类型了。

(二)如何实现一个MapState:HBaseMapState源码分析

HBaseMapState的主要代码都在HBaseMapState类中。一个MapState的实现关键在于
* 构建一个实现StateFactory的类,实现makeState() 方法,返回一个State对象。
* 一个MapState,实现IBackingMap接口的multiGet()和multiPut(),指定如何从hbase中读写数据。
关于mapstate的基础介绍请参考上面。

1、Option内部类

HBaseMapState有一个内部类:Option,用于指定一些配置项。

public static class Options<T> implements Serializable {public Serializer<T> serializer = null;public int cacheSize = 5000;public String globalKey = "$HBASE_STATE_GLOBAL$";public String configKey = "hbase.config";public String tableName;public String columnFamily;public TridentHBaseMapMapper mapMapper;
}

分别意思为:
* 序列化器,即以什么格式写入hbase,storm-hbase自带了JSON格式的序列化实现。
* 缓冲大小
* 未知
* 指定hbase-site.xml位置的变量
* 表名
* family名
* 用于获取rowkey和qualifier,创建对象时需要指定一个参数作为qualifier。

2、Factory内部类

(1)构造函数

构造函数接收2个参数,分别为state的类型以及Option对象。
除些以外,还指定了序列化器:

if (this.options.serializer == null) {this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
}

(2)makeState()方法

就是返回一个State对象。

3、构造函数

构造函数用于加载配置文件,安全机制等。

4、返回StateFactory的方法

没什么好介绍的,就是返回各种类型的staStateFactory,具体的说就是返回上面Factory的一个对象。这里只保留了透明型的。

@SuppressWarnings("rawtypes")
public static StateFactory opaque() {Options<OpaqueValue> options = new Options<OpaqueValue>();return opaque(options);
}@SuppressWarnings("rawtypes")
public static StateFactory opaque(Options<OpaqueValue> opts) {return new Factory(StateType.OPAQUE, opts);
}

5、multiGet

根据一个List<List<Object>> keys列表获取到一个返回值的列表List。注意key本身也是一个List<Object>
代码主要是三部分:
(1)创建List<Get> gets

    List<Get> gets = new ArrayList<Get>();for(List<Object> key : keys){byte[] hbaseKey = this.options.mapMapper.rowKey(key);String qualifier = this.options.mapMapper.qualifier(key);LOG.info("Partition: {}, GET: {}", this.partitionNum, new String(hbaseKey));Get get = new Get(hbaseKey);get.addColumn(this.options.columnFamily.getBytes(), qualifier.getBytes());gets.add(get);}

(2)查询hbase:根据gets获取Result[]

List<T> retval = new ArrayList<T>();Result[] results = this.table.get(gets);

(3)将results封装成一个List<T> retval并返回

for (int i = 0; i < keys.size(); i++) {String qualifier = this.options.mapMapper.qualifier(keys.get(i));Result result = results[i];byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());if(value != null) {retval.add(this.serializer.deserialize(value));} else {retval.add(null);}}
return retval;

当返回值为空时,则加上null。

6、multiPut

它将一个List<List<Object>> keys, List<T> values的数据写入hbase,注意keys.size()与values.size()必须相等。

List<Put> puts = new ArrayList<Put>(keys.size());for (int i = 0; i < keys.size(); i++) {byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));String qualifier = this.options.mapMapper.qualifier(keys.get(i));LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});Put put = new Put(hbaseKey);T val = values.get(i);put.add(this.options.columnFamily.getBytes(),qualifier.getBytes(),this.serializer.serialize(val));puts.add(put);this.table.put(puts);

7、序列化器

序列化器指定了以何种格式将数据写入hbase(序列化),以及取出数据后如何进行解释(反序列化),即关键是serialize()与deserialize()这2个方法。

storm默认提供了json的实现,以Transactional为例:

public class JSONTransactionalSerializer implements Serializer<TransactionalValue>

它的内部只有2个方法:

@Override
public byte[] serialize(TransactionalValue obj) {List toSer = new ArrayList(2);toSer.add(obj.getTxid());toSer.add(obj.getVal());try {return JSONValue.toJSONString(toSer).getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}
}

它将一个TransactionalValue转化为json格式,TransactionalValue只有2个变量,是一个典型的bean:

T val;
Long txid;

而另一个方法deserialize()则刚好相反,它将一个json格式字节流解释为一个TransactionalValue对象:

@Override
public TransactionalValue deserialize(byte[] b) {try {String s = new String(b, "UTF-8");List deser = (List) JSONValue.parse(s);return new TransactionalValue((Long) deser.get(0), deser.get(1));} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}
}

(三)MapState框架

//TODO:补充各个类的关系图,参考P323

上述介绍了用户如何通过实现IBackingMap接口来创建自己的MapState实现,这里我们将介绍MapState框架是如何调用用户写的mutliGet()和multiPut方法的。

* 另外,如果上述实现iBackingMap的方法不能满足你的要求,你可以实现自己的MapState框架,按照这里介绍的方法即可 *

我们主要以Transactional为例,再简单介绍一下NonTransactional和Opaque的情形。在上面的Factory.makeState()方法中:

IBackingMap state = new HBaseMapState(options, conf, partitionIndex);
mapState = TransactionalMap.build(state);

state就是用户代码定义的MapState实现,此此处是HBaseMapState。我们下面看一下TransactionalMap是如何调用HBaseMapState的mutliGet()和multiPut方法的。

1、build()方法

我们从build方法开始,因为这是用户创建MapState所调用的API。

public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {return new TransactionalMap<T>(backing);
}

它使用用户定义的IBackingMap对象创建一个MapState对象,主要通过构造方法来实现。

2、构造方法

protected TransactionalMap(IBackingMap<TransactionalValue> backing) {_backing = new CachedBatchReadsMap(backing);
}

3、beginCommit()

@Override
public void beginCommit(Long txid) {_currTx = txid;_backing.reset();
}

当开始处理一个事务时,设置当前正在处理的txid,reset()是CachedBatchReadsMap类中清空缓存的方法。

TODO: CachedBatchReadsMap分析

4、commit()

@Override
public void commit(Long txid) {_currTx = null;_backing.reset();
}

当一个事务处理完成后,将txid设置为null。

5、multiGet

@Override
public List<T> multiGet(List<List<Object>> keys) {List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);List<T> ret = new ArrayList<T>(vals.size());for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {TransactionalValue v = retval.val;if(v!=null) {ret.add((T) v.getVal());} else {ret.add(null);}}return ret;
}

通过调用用户的_backing.multiGet(keys)来实现具体逻辑,作了一些类型转换。

6、multiPut()

@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());for(T val: vals) {newVals.add(new TransactionalValue<T>(_currTx, val));}_backing.multiPut(keys, newVals);
}

同样只是调用用户定位的multiPut()。

7、multiUpdate()

核心的逻辑在于这几行:

        if(val==null) {newVal = new TransactionalValue<T>(_currTx, updater.update(null));changed = true;} else {if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {newVal = val;} else {newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));changed = true;}}ret.add(newVal.getVal());if(changed) {newVals.add(newVal);newKeys.add(keys.get(i));}}

在这之前,先把数据get出来,然后判断:

  • 如果key对应的value为空,则changed为true
  • 如果key对应的value不为空,而且当前的txid与value中的txid相同,则changed保持为false。
  • 如果key对应的value不为空,但当前的txid与value中的txid不同,则changed为true。

这部分逻辑就是Transactional, NonTransactional和Opaque的差别。
NonTransactional不会判断txid,只要来一批就更新一次。
Opaque基于之前的值作更新。

(四)storm如何调用MapState的代码

根据前面的分析,用户在拓扑定义中通过以下类似的代码来指定state:

    topology.newStream("wordsplit", spout).shuffle().each(new Fields("sentence"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);

主要看第3、4行,先对数据根据”word”这个流进行分组,然后再调用persistentAggregate()方法。再简单解释一下这个方法,3个参数分别为:
* 返回一个StateFactory对象,它有一个makeState()方法,返回一个State对象。这个state对象就是用户定义的MapState,主要定义了如何从state中读写数据。
* 第二个参数表示如何对取出的数据进行什么操作,这里使用的是Count,如是其它类,如Sum,则多一个参数:

    persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);

* 发送的消息流。

好,我们下面开始分析GroupedStream#persistentAggregate()做了什么东西。

1、GroupedStream类

public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
}public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(spec, null, agg, functionFields);
}public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}

很简单的代码逻辑,先使用StateFactory对象创建一个StateSpec对象,然后继续调用,从第3个方法可以看出,这里还有一个参数是表示inputFields,即输入的field,即对哪个field执行CombinerAggregator的操作。StateSpec类的定义非常简单:

public class StateSpec implements Serializable {public StateFactory stateFactory;public Integer requiredNumPartitions = null;public StateSpec(StateFactory stateFactory) {this.stateFactory = stateFactory;}
}

最终真正调用的方法是这个:

public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return aggregate(inputFields, agg, functionFields).partitionPersist(spec,TridentUtils.fieldsUnion(_groupFields, functionFields),new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),TridentUtils.fieldsConcat(_groupFields, functionFields));
}

这个方法主要分成2个步骤
* 第一个是调用aggregate()方法,主要如何对数据进行操作。这部分我们以后再分析,反正把它理解为一个数据的更新就好了。
* 第二个是调用partitionPersist()方法,如何将数据写入state。

       public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id = _topology.getUniqueStateId();ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer = true;n.stateInfo = new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);}

构建一个ProcessorNode,然后将它添加进_topology中。

trident State应用指南相关推荐

  1. Trident State译文

    Trident State 译文 Trident针对状态化的数据源的读写进行了一流的分装.State可以包含在拓扑中-例如,保存在内存中,有HDFS提供备份-也可以保存在一个外部的数据库中,像Memc ...

  2. storm mysql trident_storm trident实战 trident state

    一.认识storm trident trident可以理解为storm批处理的高级抽象,提供了分组.分区.聚合.函数等操作,提供一致性和恰好一次处理的语义. 1)元祖被作为batch处理 2)每个ba ...

  3. Trident state

    State in Trident Trident有对有状态数据源的抽象.state要么在topology内部如内存和HDFS中, 或外部存储于数据库如 Memcached或Cassandr.在Trid ...

  4. Flink Broadcast State实用指南

    从1.5.0开始,Flink提供了一种新的State类型,称为Broadcast State.在这篇文章中,我们将解释什么是Broadcast State,并展示如何将其应用于评估事件流上的动态模式的 ...

  5. trident API指南

    trident API指南 @(STORM)[storm] trident API指南 零 概述 1 本地分区操作 2 重新分区操作 3 聚合操作 4 流分组操作 5合并与连接 一 本地分区操作 一 ...

  6. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  7. Trident API 概览

    Trident API 概览 在网上看到了很多有TRIDENT相关API的翻译,看来看去,总觉得没有说清楚很多东西,所以自己结合使用的经验翻译了一篇出来:翻译完以后,也发现 在自己的翻译中也有很多地方 ...

  8. Apache Storm 官方文档 —— Trident API 概述

    转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 -- Trident API 概述 窗口部分的内容是我自己翻译的 Trident 的核心数据模型是" ...

  9. storm trident mysql_Trident-MySQL

    使用事物TridentTopology 持久化数据到MySQL1.构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays; ...

最新文章

  1. HDU2019 数列有序
  2. 图像处理——Edge Boxes边缘检测
  3. c语言编程中%g是什么格式
  4. 从文本分类来看图卷积神经网络
  5. css怎么把横向菜单变纵向_CSS 布局模式 + 居中布局
  6. WPF的ComboBox 数据模板自定义
  7. 数据运营小白如何搭建“初期用户生命周期体系”?
  8. vue监听用户在页面的浏览时间需在beforeDestroy()里面进行销毁
  9. mysql 字符串出现问题_MYSQL 中字符串函数 归纳总结
  10. 函数嵌套和nonlocal声明
  11. Unix操作系统发展历史
  12. 程序员计算器HEX、EDC、OCT的意思
  13. 计算机网络的创新创业计划书,互联网创新创业计划书.doc
  14. python txt追加写入_python 实现在txt指定行追加文本的方法
  15. spring使用之旅 ---- bean的装配
  16. excel区别奇偶行(删除、过滤)
  17. Feign报错java.lang.NoSuchFieldError: MULTIPART_RELATED
  18. Kaldi声学模型训练
  19. PotPlayer不支持S/W HEVC(H.265)解码怎么办?一招解决所有的不支持解码
  20. Photoshop 2018 学习笔记 目录

热门文章

  1. 【自定义排序规则】剑指 Offer 45. 把数组排成最小的数
  2. 【双100%解法】剑指 Offer 24. 反转链表
  3. 【已解决】Exception in thread “Thread-0“ redis.clients.jedis.exceptions.JedisConnectionException: java.n
  4. 43行代码AC_HDU-2604 Queuing(矩阵快速幂,附详细的知识讲解、模板例题)
  5. 实验详解——DNS反向解析、DNS主服务器和从服务器的配置
  6. python日志模块_Python之日志处理(logging模块)
  7. Vmware中mac snow leopard蘋果雪豹系統驅動程式安裝方法
  8. mysql 优化器不准_mysql 优化器有哪些可选开关
  9. mongodb mysql 写_MongoDB与MySQL关于写确认的异同
  10. 接受map_[译] 图解 Map、Reduce 和 Filter 数组方法