storm mysql trident_storm trident实战 trident state
一、认识storm trident
trident可以理解为storm批处理的高级抽象,提供了分组、分区、聚合、函数等操作,提供一致性和恰好一次处理的语义。
1)元祖被作为batch处理
2)每个batch的元祖都被指定唯一的一个事物id,如果因为处理失败导致batch重发,也和保证和重发前一样的事物id
3)数据更新操作严格有序,比如batch1必须在batch2之前被成功处理,且如果batch1失败了,后面的处理也会失败。
假如: batch1处理1--20
batch2处理21--40
batch1处理失败,那么batch2也会失败
虽然数据更新操作严格有序,但是数据处理阶段也可以并行的,只是最后的持久化操作必须有序。
1.1 trident state
trident的状态具有仅仅处理一次,持续聚合的语义,使用trident来实现恰好一次的语义不需要开发人员去处理事务相关的工作,因为trident state已经帮我们封装好了,只需要编写类似于如下的代码:
topology.newStream("sentencestream", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count"))
.parallelismHint(3);
所有处理事务逻辑都在MyHbaseState.HbaseFactory中处理了(这个是我自己定义的,trident支持在内存里面处理,类似于MemachedState.opaque)。
trident提供了一个StateFactory用来创建State对象的实例,行如:
public final class XFactory implements StateFactory{
public State makeState(Map conf,int partitonIndex,int numPartitions){
return new State();
}
}
1.2 persistentAggregate
persistentAggregate是trident中用来更新来源的状态,如果前面是一个分好组的流,trident希望你提供的状态实现MapState接口,其中key是分组的字段,
而聚合结果是状态的值。
1.3 实现MapStates
trident中实现MapState非常简单,只需要为这个类提供一个IBackingMap的接口实现接口。
二、实战
首先搭建好zk,storm,hadoop,hbase的分布式环境
master:
slave1:
slave2:
main方法:
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
TridentTopology topology = new TridentTopology();
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("tanjie is a good man"), new Values(
"what is your name"), new Values("how old are you"),
new Values("my name is tanjie"), new Values("i am 18"));
spout.setCycle(false);
tridentStreamToHbase(topology,spout);
Config config = new Config();
config.setDebug(false);
StormSubmitter.submitTopologyWithProgressBar("word_count_trident_state_HbaseState", config, topology.build());
}
tridentStreamToHbase方法:
private static TridentState tridentStreamToHbase(TridentTopology topology,
FixedBatchSpout spout) {
MyHbaseState.Options options = new MyHbaseState.Options();
options.setTableName("storm_trident_state");
options.setColumFamily("colum1");
options.setQualifier("q1");
/**
* 根据数据源拆分单词后,然后分区操作,在每个分区上又进行分组(hash算法),然后在每个分组上进行聚合
* 所以这里可能有多个分区,每个分区有多个分组,然后在多个分组上进行聚合
* 用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中
*/
return topology.newStream("sentencestream", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MyHbaseState.HbaseFactory(options), new Count(), new Fields("count"))
.parallelismHint(3);
}
MyHbaseState实现:
package com.storm.trident.state.hbase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
import org.apache.storm.trident.state.JSONOpaqueSerializer;
import org.apache.storm.trident.state.JSONTransactionalSerializer;
import org.apache.storm.trident.state.Serializer;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.MapState;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.SnapshottableMap;
import org.apache.storm.tuple.Values;
import com.google.common.collect.Maps;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class MyHbaseState implements IBackingMap {
private static final Map DEFAULT_SERIALZERS = Maps
.newHashMap();
private int partitionNum;
private Options options;
private Serializer serializer;
private Connection connection;
private Table table;
static {
DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL,
new JSONNonTransactionalSerializer());
DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL,
new JSONTransactionalSerializer());
DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
}
public MyHbaseState(final Options options, Map conf, int partitionNum) {
this.options = options;
this.serializer = options.serializer;
this.partitionNum = partitionNum;
try {
connection = ConnectionFactory.createConnection(HBaseConfiguration
.create());
table = connection.getTable(TableName.valueOf(options.tableName));
} catch (IOException e) {
e.printStackTrace();
}
}
public static class Options implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
public Serializer serializer = null;
public String globalkey = "$HBASE_STATE_GLOBAL$";
/**
* 表名
*/
public String tableName;
/**
* 列族
*/
public String columFamily;
/**
*
*/
public String qualifier;
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getColumFamily() {
return columFamily;
}
public void setColumFamily(String columFamily) {
this.columFamily = columFamily;
}
public String getQualifier() {
return qualifier;
}
public void setQualifier(String qualifier) {
this.qualifier = qualifier;
}
}
protected static class HbaseFactory implements StateFactory {
private static final long serialVersionUID = 1L;
private Options options;
public HbaseFactory(Options options) {
this.options = options;
if (this.options.serializer == null) {
this.options.serializer = DEFAULT_SERIALZERS
.get(StateType.OPAQUE);
}
}
@Override
public State makeState(Map conf, IMetricsContext metrics,
int partitionIndex, int numPartitions) {
System.out.println("partitionIndex:" + partitionIndex
+ ",numPartitions:" + numPartitions);
IBackingMap state = new MyHbaseState(options, conf, partitionIndex);
MapState mapState = OpaqueMap.build(state);
return new SnapshottableMap(mapState, new Values(options.globalkey));
}
}
@Override
public void multiPut(List> keys, List values) {
List puts = new ArrayList(keys.size());
for (int i = 0; i < keys.size(); i++) {
Put put = new Put(toRowKey(keys.get(i)));
T val = values.get(i);
System.out.println("partitionIndex: " + this.partitionNum
+ ",key.get(i):" + keys.get(i) + "value值:" + val);
put.addColumn(this.options.columFamily.getBytes(),
this.options.qualifier.getBytes(),
this.options.serializer.serialize(val));
puts.add(put);
}
try {
this.table.put(puts);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public List multiGet(List> keys) {
List gets = new ArrayList();
for (final List key : keys) {
// LOG.info("Partition: {}, GET: {}", this.partitionNum, key);
Get get = new Get(toRowKey(key));
get.addColumn(this.options.columFamily.getBytes(),
this.options.qualifier.getBytes());
gets.add(get);
}
List retval = new ArrayList();
try {
// 批量获取所有rowKey的数据
Result[] results = this.table.get(gets);
for (final Result result : results) {
byte[] value = result.getValue(
this.options.columFamily.getBytes(),
this.options.qualifier.getBytes());
if (value != null) {
retval.add(this.serializer.deserialize(value));
} else {
retval.add(null);
}
}
} catch (IOException e) {
e.printStackTrace();
}
return retval;
}
private byte[] toRowKey(List keys) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
for (Object key : keys) {
bos.write(String.valueOf(key).getBytes());
}
bos.close();
} catch (IOException e) {
throw new RuntimeException("IOException creating HBase row key.", e);
}
return bos.toByteArray();
}
}
运行结果:
查看supervisor日志:
2016-12-23 11:34:25.576 STDIO [INFO] partitionIndex: 0,key.get(i):[good]value值:org.apache.storm.trident.state.OpaqueValue@6498fd6a[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@81e227f[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.582 STDIO [INFO] partitionIndex: 1,key.get(i):[are]value值:org.apache.storm.trident.state.OpaqueValue@726ac402[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[what]value值:org.apache.storm.trident.state.OpaqueValue@2667735e[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[your]value值:org.apache.storm.trident.state.OpaqueValue@51c73404[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@6d281c8d[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.585 STDIO [INFO] partitionIndex: 2,key.get(i):[old]value值:org.apache.storm.trident.state.OpaqueValue@646aa4f7[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@157487a2[currTxid=1,prev=,curr=2]
2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[a]value值:org.apache.storm.trident.state.OpaqueValue@1574a7af[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.586 STDIO [INFO] partitionIndex: 2,key.get(i):[how]value值:org.apache.storm.trident.state.OpaqueValue@1dacdd2a[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[you]value值:org.apache.storm.trident.state.OpaqueValue@3febff9e[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.587 STDIO [INFO] partitionIndex: 2,key.get(i):[man]value值:org.apache.storm.trident.state.OpaqueValue@1edafedb[currTxid=1,prev=,curr=1]
2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[tanjie]value值:org.apache.storm.trident.state.OpaqueValue@38a106df[currTxid=2,prev=1,curr=2]
2016-12-23 11:34:25.812 STDIO [INFO] partitionIndex: 2,key.get(i):[is]value值:org.apache.storm.trident.state.OpaqueValue@53ca3784[currTxid=2,prev=2,curr=3]
2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[am]value值:org.apache.storm.trident.state.OpaqueValue@5261a4c8[currTxid=2,prev=,curr=1]
2016-12-23 11:34:25.815 STDIO [INFO] partitionIndex: 0,key.get(i):[my]value值:org.apache.storm.trident.state.OpaqueValue@88970b9[currTxid=2,prev=,curr=1]
2016-12-23 11:34:25.826 STDIO [INFO] partitionIndex: 1,key.get(i):[i]value值:org.apache.storm.trident.state.OpaqueValue@78b27ff6[currTxid=2,prev=,curr=1]
2016-12-23 11:34:25.827 STDIO [INFO] partitionIndex: 1,key.get(i):[name]value值:org.apache.storm.trident.state.OpaqueValue@eef2d62[currTxid=2,prev=1,curr=2]
2016-12-23 11:34:25.828 STDIO [INFO] partitionIndex: 1,key.get(i):[18]value值:org.apache.storm.trident.state.OpaqueValue@788c8496[currTxid=2,prev=,curr=1]
查看hbase表
storm mysql trident_storm trident实战 trident state相关推荐
- storm mysql trident_Storm Trident详解
Trident是基于Storm进行实时留处理的高级抽象,提供了对实时流4的聚集,投影,过滤等操作,从而大大减少了开发Storm程序的工作量.Trident还提供了针对数据库或则其他持久化存储的有状态的 ...
- storm mysql trident_Storm入门(十三)Storm Trident 教程
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- storm mysql trident_Storm Trident状态
Trident中有对状态数据进行读取和写入操作的一流抽象工具.状态既可以保存在拓扑内部,比如保存在内容中并由HDFS存储,也可以通过外部存储(比如Memcached或Cassandra)存储在数据库中 ...
- [Trident] Storm Trident 教程,state详解、trident api详解及实例
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- [实战]MVC5+EF6+MySql企业网盘实战(2)——用户注册
写在前面 上篇文章简单介绍了项目的结构,这篇文章将实现用户的注册.当然关于漂亮的ui,这在追后再去添加了,先将功能实现.也许代码中有不合适的地方,也只有在之后慢慢去优化了. 系列文章 [EF]vs15 ...
- 从前慢-Mysql高级及实战
Mysql高级及实战 1 Linux 系统安装MySQL 1.1 下载Linux 安装包 https://dev.mysql.com/downloads/mysql/5.7.html#download ...
- 开发人员MySQL调优-实战篇2-让SQL使用索引详解
2019独角兽企业重金招聘Python工程师标准>>> 建议先看看开发人员MySQL调优-实战篇0 让执行的SQL使用索引 虽然DBA给我们建了很多索引,但没有经验的开发人员往往只看 ...
- [实战]MVC5+EF6+MySql企业网盘实战(16)——逻辑重构3
写在前面 本篇文章将新建文件夹的逻辑也进行一下修改. 系列文章 [EF]vs15+ef6+mysql code first方式 [实战]MVC5+EF6+MySql企业网盘实战(1) [实战]MVC5 ...
- 《Storm企业级应用:实战、运维和调优》——1.4 Storm的特性
本节书摘来自华章计算机<Storm企业级应用:实战.运维和调优>一书中的第1章,第1.4节,作者:马延辉 陈书美 雷葆华著, 更多章节内容可以访问云栖社区"华章计算机" ...
最新文章
- apache2.4中layout模块和ssi模块的冲突
- ORACLE常用性能监控SQL【一】
- Ubuntu系统下桌面卡死,但是鼠标键盘可以动
- 切记:只有肯吃苦才能赚大钱!
- Java面向对象(12)--对象类型转换 (Casting )
- php echo 前后有字符串,php echo 输出字符串函数详解、多行输出方法
- 一张图看清自然语言处理脉络
- 【学习笔记】深入理解js原型和闭包(9)—— 简述【执行上下文】下
- java 基础知识3
- 什么是操作系统啊 | 战术后仰
- 获评优秀案例!IMG光线追踪技术实现卓越云游戏体验
- Java实现 蓝桥杯VIP 算法提高 彩票
- 4个高质量站点推荐值得收藏
- SDTM submission - 如何处理split domain
- ps抠出图像的透明阴影
- 华为Nova8缺货怎么办?这款手机也不错
- nvme固态硬盘开机慢_6个固态硬盘优化设置技巧 让你的SSD速度飞起来 (全文)
- CentOS 7 升级内核
- 2021年从事Linux运维云计算前景如何?
- OpenCV中拆分通道、合并通道、alpha通道的讲解及实战演示(附python源码 超详细)
热门文章
- OA协同办公系统对企业有什么作用?
- c语言control函数,C语言05-ControlFl.ppt
- 咖说 | ​数字基建狂潮中:区块链处于什么位置?
- plm软件试用测试报告,ANSYS 15.0系列测试报告——FLUENT Meshing 15.0
- 重学 Java 设计模式:实战备忘录模式「模拟互联网系统上线过程中,配置文件回滚场景」
- GooglePlay OAuth使用
- Excel如何将阿拉伯数字转为中文数字
- 两年经验斩获蚂蚁/头条/PingCAP Offer,牛逼了
- 一文搞懂测试左移和测试右移
- 数据库外键级联修改删除