storm trident mysql_Trident-MySQL
使用事物TridentTopology 持久化数据到MySQL1、构建拓扑JDBCTopology类packagestorm.trident.mysql;importjava.util.Arrays;importjava.util.Map;importorg.apache.storm.Config;importorg.apache.storm.LocalCluster;importorg.apache.storm.trident.TridentState;importorg.apache.storm.trident.TridentTopology;importorg.apache.storm.trident.operation.BaseFunction;importorg.apache.storm.trident.operation.CombinerAggregator;importorg.apache.storm.trident.operation.TridentCollector;importorg.apache.storm.trident.spout.IBatchSpout;importorg.apache.storm.trident.state.StateType;importorg.apache.storm.trident.testing.FixedBatchSpout;importorg.apache.storm.trident.testing.MemoryMapState;importorg.apache.storm.trident.tuple.TridentTuple;importorg.apache.storm.tuple.Fields;importorg.apache.storm.tuple.Values;/*** 事物Trident-MySQL Topology
*@authormengyao
**/@SuppressWarnings("all")public classJDBCTopology {public static voidmain(String[] args) {
TridentTopology topology= newTridentTopology();//Spout数据源
FixedBatchSpout spout = new FixedBatchSpout(new Fields("tels"), 7,new Values("189111 3"),new Values("135111 7"),new Values("189111 2"),new Values("158111 5"),new Values("159111 6"),new Values("159111 3"),new Values("158111 5")
);
spout.setCycle(false);//State持久化配置属性
JDBCStateConfig config = newJDBCStateConfig();
config.setDriver("com.mysql.jdbc.Driver");
config.setUrl("jdbc:mysql://localhost:3306/test");
config.setUsername("root");
config.setPassword("123456");
config.setBatchSize(10);
config.setCacheSize(10);
config.setType(StateType.TRANSACTIONAL);
config.setCols("tel");
config.setColVals("sum");
config.setTable("tbl_tel");
topology.newStream("spout", spout)
.each(new Fields("tels"), new KeyValueFun(), new Fields("tel", "money"))
.groupBy(new Fields("tel"))
.persistentAggregate(JDBCState.getFactory(config),new Fields("money"), new SumCombinerAgg(), new Fields("sum"));
LocalCluster cluster= newLocalCluster();
cluster.submitTopology("test1", newConfig(), topology.build());
}
}
@SuppressWarnings("all")class KeyValueFun extendsBaseFunction {
@Overridepublic voidexecute(TridentTuple tuple, TridentCollector collector) {
String record= tuple.getString(0);
collector.emit(new Values(record.split("\t")[0], record.split("\t")[1]));
}
}
@SuppressWarnings("all")class SumCombinerAgg implements CombinerAggregator{
@OverridepublicLong init(TridentTuple tuple) {return Long.parseLong(tuple.getString(0));
}
@OverridepublicLong combine(Long val1, Long val2) {
Long val= val1+val2;
System.out.println(val);returnval;
}
@OverridepublicLong zero() {return 0L;
}
}2、构建基于IBackingMap的JDBCState类packagestorm.trident.mysql;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importorg.apache.storm.task.IMetricsContext;importorg.apache.storm.trident.state.OpaqueValue;importorg.apache.storm.trident.state.State;importorg.apache.storm.trident.state.StateFactory;importorg.apache.storm.trident.state.StateType;importorg.apache.storm.trident.state.TransactionalValue;importorg.apache.storm.trident.state.map.CachedMap;importorg.apache.storm.trident.state.map.IBackingMap;importorg.apache.storm.trident.state.map.NonTransactionalMap;importorg.apache.storm.trident.state.map.OpaqueMap;importorg.apache.storm.trident.state.map.TransactionalMap;
@SuppressWarnings("all")public class JDBCState implements IBackingMap{private staticJDBCStateConfig config;
JDBCState(JDBCStateConfig config){this.config =config;
}
@Overridepublic List multiGet(List>keys) {
StringBuilder sqlBuilder= new StringBuilder("SELECT ").append(config.getCols())
.append(","+config.getColVals())
.append(",txid")
.append(" FROM "+config.getTable())
.append(" WHERE ")
.append(config.getCols())
.append("='");
JDBCUtil jdbcUtil= newJDBCUtil(config.getDriver(),config.getUrl(),config.getUsername(),config.getPassword());
List result = new ArrayList();
Map map = null;for (Listlist : keys) {
Object key= list.get(0);
map= jdbcUtil.queryForMap(sqlBuilder.toString()+key+"'");
System.out.println(sqlBuilder.toString()+key+"'"+" 【"+map);
Bean itemBean=(Bean)map.get(key);long txid=0L;long val=0L;if (itemBean!=null) {
val=itemBean.getSum();
txid=itemBean.getTxid();
}if (config.getType()==StateType.OPAQUE) {
result.add(newOpaqueValue(txid, val));
}else if (config.getType()==StateType.TRANSACTIONAL) {
result.add(newTransactionalValue(txid, val));
}else{
result.add(val);
}
}return (List) result;
}
@Overridepublic void multiPut(List> keys, Listvals) {//构建新增SQL
StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ").append(config.getTable())
.append("("+config.getCols())
.append(","+config.getColVals())
.append(",txid")
.append(",time")
.append(") VALUES ");for (int i = 0; i < keys.size(); i++) {
List key =keys.get(i);if (config.getType()==StateType.TRANSACTIONAL) {
TransactionalValue val=(TransactionalValue)vals.get(i);
sqlBuilder.append("(");
sqlBuilder.append(key.get(0));
sqlBuilder.append(",");
sqlBuilder.append(val.getVal());
sqlBuilder.append(",");
sqlBuilder.append(val.getTxid());
sqlBuilder.append(",NOW()");
sqlBuilder.append("),");
}
}
sqlBuilder.setLength(sqlBuilder.length()-1);
System.out.println(sqlBuilder.toString());//新增数据
JDBCUtil jdbcUtil = newJDBCUtil(config.getDriver(),config.getUrl(),config.getUsername(),config.getPassword());
jdbcUtil.insert(sqlBuilder.toString());
}public staticFactory getFactory(JDBCStateConfig config) {return newFactory(config);
}static class Factory implementsStateFactory {private staticJDBCStateConfig config;publicFactory(JDBCStateConfig config) {this.config =config;
}
@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, intnumPartitions) {final CachedMap map = new CachedMap(newJDBCState(config), config.getCacheSize());
System.out.println(config);if(config.getType()==StateType.OPAQUE) {returnOpaqueMap.build(map);
}else if(config.getType()==StateType.TRANSACTIONAL){returnTransactionalMap.build(map);
}else{returnNonTransactionalMap.build(map);
}
}
}
}3、构建基于IBackingMap的JDBCStateConfig配置类packagestorm.trident.mysql;importjava.util.List;importorg.apache.storm.trident.state.StateType;
@SuppressWarnings("all")public classJDBCStateConfig {privateString url;privateString driver;privateString username;privateString password;privateString table;private intbatchSize;privateString cols;privateString colVals;private int cacheSize = 100;private StateType type =StateType.OPAQUE;publicString getUrl() {returnurl;
}public voidsetUrl(String url) {this.url =url;
}publicString getDriver() {returndriver;
}public voidsetDriver(String driver) {this.driver =driver;
}publicString getUsername() {returnusername;
}public voidsetUsername(String username) {this.username =username;
}publicString getPassword() {returnpassword;
}public voidsetPassword(String password) {this.password =password;
}publicString getTable() {returntable;
}public voidsetTable(String table) {this.table =table;
}public intgetBatchSize() {returnbatchSize;
}public void setBatchSize(intbatchSize) {this.batchSize =batchSize;
}publicString getCols() {returncols;
}public voidsetCols(String cols) {this.cols =cols;
}publicString getColVals() {returncolVals;
}public voidsetColVals(String colVals) {this.colVals =colVals;
}public intgetCacheSize() {returncacheSize;
}public void setCacheSize(intcacheSize) {this.cacheSize =cacheSize;
}publicStateType getType() {returntype;
}public voidsetType(StateType type) {this.type =type;
}
@OverridepublicString toString() {return "Test2StateConfig [url=" + url + ", driver=" + driver + ", username=" + username + ", password="
+ password + ", table=" + table + ", batchSize=" + batchSize + ", cols=" +cols+ ", colVals=" + colVals + ", cacheSize=" + cacheSize + ", type=" + type + "]";
}
}4、构建JDBC工具类和实体Beanpackagestorm.trident.mysql;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.util.HashMap;importjava.util.Map;public classJDBCUtil {privateString driver;privateString url;privateString username;privateString password;privateConnection connection;privatePreparedStatement ps;privateResultSet rs;publicJDBCUtil(String driver, String url, String username, String password) {this.driver =driver;this.url =url;this.username =username;this.password =password;
init();
}voidinit(){try{
Class.forName(driver);
}catch(ClassNotFoundException e) {
e.printStackTrace();
}
}public booleaninsert(String sql) {int state = 0;try{
connection=DriverManager.getConnection(url, username, password);
ps=connection.prepareStatement(sql);
state=ps.executeUpdate();
}catch(SQLException e) {
e.printStackTrace();
}finally{try{
ps.close();
connection.close();
}catch(SQLException e) {
e.printStackTrace();
}
}if (state>0) {return true;
}return false;
}public MapqueryForMap(String sql) {
Map result = new HashMap();try{
connection=DriverManager.getConnection(url, username, password);
ps=connection.prepareStatement(sql);
rs=ps.executeQuery();if(rs.next()){
Bean iteBean=new Bean(rs.getString("tel"), rs.getLong("sum"), rs.getLong("txid"), null);
result.put(rs.getString("tel"), iteBean);
}
}catch(SQLException e) {
e.printStackTrace();
}finally{try{
ps.close();
connection.close();
}catch(SQLException e) {
e.printStackTrace();
}
}returnresult;
}publicString getDriver() {returndriver;
}public voidsetDriver(String driver) {this.driver =driver;
}publicString getUrl() {returnurl;
}public voidsetUrl(String url) {this.url =url;
}publicString getUsername() {returnusername;
}public voidsetUsername(String username) {this.username =username;
}publicString getPassword() {returnpassword;
}public voidsetPassword(String password) {this.password =password;
}
}packagestorm.trident.mysql;public classBean {privateString tel;private longsum;private longtxid;privateString time;publicBean(){
}public Bean(String tel, long sum, longtxid, String time) {super();this.tel =tel;this.sum =sum;this.txid =txid;this.time =time;
}publicString getTel() {returntel;
}public voidsetTel(String tel) {this.tel =tel;
}public longgetSum() {returnsum;
}public void setSum(longsum) {this.sum =sum;
}public longgetTxid() {returntxid;
}public void setTxid(longtxid) {this.txid =txid;
}publicString getTime() {returntime;
}public voidsetTime(String time) {this.time =time;
}
@OverridepublicString toString() {return "Bean [tel=" + tel + ", sum=" + sum + ", txid=" + txid + ", time=" + time + "]";
}
}
storm trident mysql_Trident-MySQL相关推荐
- Storm Trident拓扑中的错误处理
这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...
- Storm Trident API
在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...
- Storm精华问答 | Storm如何连接MySQL?
Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop.随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,大数据实时处理解决方案的应用日趋广泛,目前已 ...
- Storm Trident示例shuffleparallelismHint
本例包括Storm Trident中shuffle与parallelismHint的使用. 代码当中包括注释 maven <dependency><groupId>org.ap ...
- Storm Trident简介
转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...
- [Trident] Storm Trident 教程,state详解、trident api详解及实例
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- Storm Trident 详细介绍
一.概要 1.1 Storm(简介) Storm是一个实时的可靠地分布式流计算框架. 具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...
- Storm - Trident
[align=center][size=large]Trident[/size][/align] 一.Storm 保证性 1.数据一定会发送 通过 ack / fail 方法确认,若失败,则提供重新发 ...
- storm trident mysql,storm_Trident
简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...
最新文章
- golang install/build 生成的文件命名和路径
- Antd组件中单选框、复选框自定义样式的优雅实现
- LBP(局部二值模式)特征提取原理
- 分享一个有趣的学习方法,欢迎一起探讨如何提高学习兴趣作者:1-2-3 来源:博客园 发布时间:2009-03-09 16:20 阅读:2820 次 原文链接 [收藏]
- 职场‘下班沉默症’调查
- 分布式缓存服务是什么?
- 『天涯杂谈』走的地方越多,越觉的中国的狭隘
- java国际化程序创建属性文件_Java国际化(i18n)ResourceBundle类
- ubuntu 20.04中火狐浏览器安装flash插件
- java j2se下载_J2SE基础之下载eclipse并创建项目
- 计算机怎么算折扣价格,5折怎么算-商场打折背后的猫腻:满100减50和打5折哪个更划算?...
- 从play_mp3例程出发理解ESP32-ADF的使用方法
- 金泰克/tigo S300 240G SM2256K H27QFG8PEM5R 完整开盘教程
- 内容推荐Mahout
- Camera Tuning
- 09年第一次团队活动感想
- 优盘制作服务器引导盘,USBOS V3.0彪悍版U盘启动盘制作工具-用于PC/工控机/服务器/Surface/Mac...
- spring-boot-starter-custom
- 让计算变简单 华为RH2288HV3服务器评测
- SVG的国旗图标集合flag-icon-css在vue中的使用
热门文章
- 云队友丨李善友:人生为一件大事而来!
- C#,使用office组件Microsoft.Office.Interop.Word,将网页内容下载为word的demo及权限配置要点。
- 【快代理】独享代理使用教程
- FME支持的Autodesk Revit 格式概要
- 为什么没有参数的函数(与实际函数定义相比)会编译?
- 智能家居论文文献_智能家居控制系统界面设计结论与参考文献
- 大气快速辐射传输模型RTTOV12.2安装教程及心得体会
- 5G时代下的室内定位技术--精准室内定位--新导智能
- 如何制作一个可以自动更新的Github个人主页
- 白帽子黑客教你:如何下载Windows原版操作系统?(2种方式)