使用事务TridentTopology 持久化数据到MySQL

1、构建拓扑JDBCTopology类

package storm.trident.mysql;

import java.util.Arrays;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * 事务Trident-MySQL Topology
 *
 * @author mengyao

**/@SuppressWarnings("all")public classJDBCTopology {public static voidmain(String[] args) {

TridentTopology topology= newTridentTopology();//Spout数据源

FixedBatchSpout spout = new FixedBatchSpout(new Fields("tels"), 7,new Values("189111 3"),new Values("135111 7"),new Values("189111 2"),new Values("158111 5"),new Values("159111 6"),new Values("159111 3"),new Values("158111 5")

);

spout.setCycle(false);//State持久化配置属性

JDBCStateConfig config = newJDBCStateConfig();

config.setDriver("com.mysql.jdbc.Driver");

config.setUrl("jdbc:mysql://localhost:3306/test");

config.setUsername("root");

config.setPassword("123456");

config.setBatchSize(10);

config.setCacheSize(10);

config.setType(StateType.TRANSACTIONAL);

config.setCols("tel");

config.setColVals("sum");

config.setTable("tbl_tel");

topology.newStream("spout", spout)

.each(new Fields("tels"), new KeyValueFun(), new Fields("tel", "money"))

.groupBy(new Fields("tel"))

.persistentAggregate(JDBCState.getFactory(config),new Fields("money"), new SumCombinerAgg(), new Fields("sum"));

LocalCluster cluster= newLocalCluster();

cluster.submitTopology("test1", newConfig(), topology.build());

}

}

@SuppressWarnings("all")class KeyValueFun extendsBaseFunction {

@Overridepublic voidexecute(TridentTuple tuple, TridentCollector collector) {

String record= tuple.getString(0);

collector.emit(new Values(record.split("\t")[0], record.split("\t")[1]));

}

}

/**
 * Combiner aggregator that sums Long values: each tuple contributes the
 * number parsed from its first field, and partial sums are added together.
 */
@SuppressWarnings("all")
class SumCombinerAgg implements CombinerAggregator<Long> {

    /** Parses the tuple's first field (a string) as this tuple's contribution. */
    @Override
    public Long init(TridentTuple tuple) {
        String money = tuple.getString(0);
        return Long.valueOf(money);
    }

    /** Adds two partial sums, printing the new total to standard output. */
    @Override
    public Long combine(Long val1, Long val2) {
        final long total = val1.longValue() + val2.longValue();
        System.out.println(total);
        return Long.valueOf(total);
    }

    /** Identity value used when there is nothing to aggregate. */
    @Override
    public Long zero() {
        return Long.valueOf(0L);
    }
}

2、构建基于IBackingMap的JDBCState类

package storm.trident.mysql;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.TransactionalValue;
import org.apache.storm.trident.state.map.CachedMap;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.NonTransactionalMap;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.TransactionalMap;

/**
 * {@link IBackingMap} implementation that reads and writes aggregated values
 * through {@code JDBCUtil}, using the table and column names supplied by a
 * {@code JDBCStateConfig}. Besides the configured key and value columns the
 * SQL built here references columns named {@code txid} and {@code time}.
 *
 * @param <T> value type handled by the wrapping map (raw value,
 *            {@code TransactionalValue} or {@code OpaqueValue})
 */
@SuppressWarnings("all")
public class JDBCState<T> implements IBackingMap<T> {

    /** Per-instance settings (previously a static field assigned through {@code this}). */
    private final JDBCStateConfig config;

    JDBCState(JDBCStateConfig config) {
        this.config = config;
    }

    /**
     * Looks up the stored value for each key, one query per key.
     *
     * @param keys key tuples; only the first element of each is used
     * @return one entry per key, in order: an {@code OpaqueValue},
     *         a {@code TransactionalValue} or the bare value depending on the
     *         configured state type; missing rows yield value 0 with txid 0
     */
    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        // Everything up to the opening quote of the key literal is loop-invariant.
        final String sqlPrefix = new StringBuilder("SELECT ").append(config.getCols())
                .append(",").append(config.getColVals())
                .append(",txid")
                .append(" FROM ").append(config.getTable())
                .append(" WHERE ")
                .append(config.getCols())
                .append("='")
                .toString();
        JDBCUtil jdbcUtil = newJdbcUtil();
        List<Object> result = new ArrayList<Object>(keys.size());
        for (List<Object> list : keys) {
            Object key = list.get(0);
            // Escape the key so quotes in the data cannot break out of the literal.
            String sql = sqlPrefix + escape(key) + "'";
            Map<?, ?> map = jdbcUtil.queryForMap(sql);
            System.out.println(sql + " 【" + map);
            // The map is keyed by the key column's string value, so look up by string form.
            Bean itemBean = (Bean) map.get(String.valueOf(key));
            long txid = 0L;
            long val = 0L;
            if (itemBean != null) {
                val = itemBean.getSum();
                txid = itemBean.getTxid();
            }
            if (config.getType() == StateType.OPAQUE) {
                result.add(new OpaqueValue(txid, val));
            } else if (config.getType() == StateType.TRANSACTIONAL) {
                result.add(new TransactionalValue(txid, val));
            } else {
                result.add(val);
            }
        }
        return (List<T>) (List<?>) result;
    }

    /**
     * Writes the given values with a single multi-row statement.
     * Only the TRANSACTIONAL state type is persisted, as before; for any other
     * type, or for an empty key list, no statement is issued (previously a
     * truncated, invalid INSERT was sent to the database).
     *
     * @param keys key tuples; only the first element of each is used
     * @param vals values aligned with {@code keys}; cast to {@code TransactionalValue}
     */
    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        if (config.getType() != StateType.TRANSACTIONAL || keys.isEmpty()) {
            return;
        }
        StringBuilder rows = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            TransactionalValue val = (TransactionalValue) vals.get(i);
            if (rows.length() > 0) {
                rows.append(",");
            }
            // Key is written as a quoted, escaped literal so it matches the
            // quoted comparison used by multiGet (and keeps leading zeros).
            rows.append("('").append(escape(keys.get(i).get(0))).append("',")
                    .append(val.getVal()).append(",")
                    .append(val.getTxid())
                    .append(",NOW())");
        }
        // Upsert: without this a key that already has a row can never be updated.
        // It only takes effect when the table has a PRIMARY/UNIQUE key on the
        // key column - TODO confirm schema; otherwise it behaves as a plain INSERT.
        String sql = new StringBuilder("INSERT INTO ").append(config.getTable())
                .append("(").append(config.getCols())
                .append(",").append(config.getColVals())
                .append(",txid")
                .append(",time")
                .append(") VALUES ")
                .append(rows)
                .append(" ON DUPLICATE KEY UPDATE ")
                .append(config.getColVals()).append("=VALUES(").append(config.getColVals()).append(")")
                .append(",txid=VALUES(txid)")
                .append(",time=NOW()")
                .toString();
        System.out.println(sql);
        newJdbcUtil().insert(sql);
    }

    /** Creates a JDBC helper from the configured connection settings. */
    private JDBCUtil newJdbcUtil() {
        return new JDBCUtil(config.getDriver(), config.getUrl(), config.getUsername(), config.getPassword());
    }

    /** Renders a key as the inside of a MySQL string literal (backslashes and quotes escaped). */
    private static String escape(Object key) {
        return String.valueOf(key).replace("\\", "\\\\").replace("'", "''");
    }

    /** Returns a state factory bound to the given configuration. */
    public static Factory getFactory(JDBCStateConfig config) {
        return new Factory(config);
    }

    /** Builds the map state matching the configured state type. */
    static class Factory implements StateFactory {

        // NOTE(review): kept static as in the original. A static field is not
        // part of an object's serialized form, so this presumably only works
        // when the factory is used in the JVM that created it (e.g. a local
        // cluster) - confirm before running on a remote cluster.
        private static JDBCStateConfig config;

        public Factory(JDBCStateConfig config) {
            Factory.config = config;
        }

        /**
         * Wraps a cached JDBCState in an OpaqueMap, TransactionalMap or
         * NonTransactionalMap according to the configured state type.
         */
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            final CachedMap map = new CachedMap(new JDBCState(config), config.getCacheSize());
            System.out.println(config);
            if (config.getType() == StateType.OPAQUE) {
                return OpaqueMap.build(map);
            } else if (config.getType() == StateType.TRANSACTIONAL) {
                return TransactionalMap.build(map);
            } else {
                return NonTransactionalMap.build(map);
            }
        }
    }
}

3、构建基于IBackingMap的JDBCStateConfig配置类

package storm.trident.mysql;

import java.util.List;

import org.apache.storm.trident.state.StateType;

/**
 * Mutable settings holder for the JDBC-backed state: connection details,
 * target table and columns, cache size and state type.
 * Defaults: cache size 100, state type OPAQUE.
 */
@SuppressWarnings("all")
public class JDBCStateConfig {

    private String url;
    private String driver;
    private String username;
    private String password;
    private String table;
    private int batchSize;
    private String cols;
    private String colVals;
    private int cacheSize = 100;
    private StateType type = StateType.OPAQUE;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public String getCols() {
        return cols;
    }

    public void setCols(String cols) {
        this.cols = cols;
    }

    public String getColVals() {
        return colVals;
    }

    public void setColVals(String colVals) {
        this.colVals = colVals;
    }

    public int getCacheSize() {
        return cacheSize;
    }

    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    public StateType getType() {
        return type;
    }

    public void setType(StateType type) {
        this.type = type;
    }

    /**
     * Describes the configuration for diagnostics. The password is masked so
     * that printing the configuration does not expose the credential, and the
     * label now names this class.
     */
    @Override
    public String toString() {
        String maskedPassword = (password == null) ? "null" : "******";
        return "JDBCStateConfig [url=" + url + ", driver=" + driver + ", username=" + username + ", password="
                + maskedPassword + ", table=" + table + ", batchSize=" + batchSize + ", cols=" + cols
                + ", colVals=" + colVals + ", cacheSize=" + cacheSize + ", type=" + type + "]";
    }
}

4、构建JDBC工具类和实体Bean

package storm.trident.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal JDBC helper: loads the driver class once and opens a fresh
 * connection for every statement. JDBC handles are method-local and closed
 * with try-with-resources, so a failed connect can no longer trigger a
 * NullPointerException during cleanup and result sets are always closed.
 */
public class JDBCUtil {

    private String driver;
    private String url;
    private String username;
    private String password;

    public JDBCUtil(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
        init();
    }

    /** Loads the JDBC driver class; a missing driver is reported but not rethrown. */
    void init() {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes an update statement.
     *
     * @param sql complete SQL text to execute
     * @return {@code true} if at least one row was affected; {@code false}
     *         otherwise, including when an SQLException occurred (the
     *         exception is printed, not propagated)
     */
    public boolean insert(String sql) {
        try (Connection connection = DriverManager.getConnection(url, username, password);
             PreparedStatement ps = connection.prepareStatement(sql)) {
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Runs a query and maps its first row, if any, to a {@code Bean} keyed by
     * the row's "tel" value. The result set must expose columns labelled
     * "tel", "sum" and "txid".
     *
     * @param sql complete SQL text to execute
     * @return a map with zero or one entry; empty when there is no row or an
     *         SQLException occurred (the exception is printed, not propagated)
     */
    public Map queryForMap(String sql) {
        // Returned as a raw Map because the declared type arguments are not
        // visible in this file; callers cast the value to Bean.
        Map<String, Bean> result = new HashMap<String, Bean>();
        try (Connection connection = DriverManager.getConnection(url, username, password);
             PreparedStatement ps = connection.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                String tel = rs.getString("tel");
                result.put(tel, new Bean(tel, rs.getLong("sum"), rs.getLong("txid"), null));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return result;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

package storm.trident.mysql;

/** Row holder for one stored aggregate: key, sum, transaction id and time. */
public class Bean {

    private String tel;
    private long sum;
    private long txid;
    private String time;

    public Bean() {
    }

    public Bean(String tel, long sum, long txid, String time) {
        this.tel = tel;
        this.sum = sum;
        this.txid = txid;
        this.time = time;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public long getSum() {
        return sum;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }

    public long getTxid() {
        return txid;
    }

    public void setTxid(long txid) {
        this.txid = txid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Bean [tel=" + tel + ", sum=" + sum + ", txid=" + txid + ", time=" + time + "]";
    }
}

storm trident mysql_Trident-MySQL相关推荐

  1. Storm Trident拓扑中的错误处理

    这篇文章总结了我在设计Storm Trident拓扑时当前的错误处理方法. 我在这里关注代码设计,而不是监督或冗余之类的部署良好实践. 由于Storm的实时流性质,当面对大多数错误时,我们最终将不得不 ...

  2. Storm Trident API

    在Storm Trident中有五种操作类型 Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯的改变数据流向,不会 ...

  3. Storm精华问答 | Storm如何连接MySQL?

    Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop.随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,大数据实时处理解决方案的应用日趋广泛,目前已 ...

  4. Storm Trident示例shuffleparallelismHint

    本例包括Storm Trident中shuffle与parallelismHint的使用. 代码当中包括注释 maven <dependency><groupId>org.ap ...

  5. Storm Trident简介

    转载自:[翻译][Trident] Storm Trident 教程 英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial -- ...

  6. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  7. Storm Trident 详细介绍

    一.概要 1.1 Storm(简介)      Storm是一个实时的可靠的分布式流计算框架.      具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息( ...

  8. Storm - Trident

    [align=center][size=large]Trident[/size][/align] 一.Storm 保证性 1.数据一定会发送 通过 ack / fail 方法确认,若失败,则提供重新发 ...

  9. storm trident mysql,storm_Trident

    简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascadi ...

最新文章

  1. golang install/build 生成的文件命名和路径
  2. Antd组件中单选框、复选框自定义样式的优雅实现
  3. LBP(局部二值模式)特征提取原理
  4. 分享一个有趣的学习方法,欢迎一起探讨如何提高学习兴趣作者:1-2-3 来源:博客园 发布时间:2009-03-09 16:20 阅读:2820 次 原文链接 [收藏]
  5. 职场‘下班沉默症’调查
  6. 分布式缓存服务是什么?
  7. 『天涯杂谈』走的地方越多,越觉的中国的狭隘
  8. java国际化程序创建属性文件_Java国际化(i18n)ResourceBundle类
  9. ubuntu 20.04中火狐浏览器安装flash插件
  10. java j2se下载_J2SE基础之下载eclipse并创建项目
  11. 计算机怎么算折扣价格,5折怎么算-商场打折背后的猫腻:满100减50和打5折哪个更划算?...
  12. 从play_mp3例程出发理解ESP32-ADF的使用方法
  13. 金泰克/tigo S300 240G SM2256K H27QFG8PEM5R 完整开盘教程
  14. 内容推荐Mahout
  15. Camera Tuning
  16. 09年第一次团队活动感想
  17. 优盘制作服务器引导盘,USBOS V3.0彪悍版U盘启动盘制作工具-用于PC/工控机/服务器/Surface/Mac...
  18. spring-boot-starter-custom
  19. 让计算变简单 华为RH2288HV3服务器评测
  20. SVG的国旗图标集合flag-icon-css在vue中的使用

热门文章

  1. 云队友丨李善友:人生为一件大事而来!
  2. C#,使用office组件Microsoft.Office.Interop.Word,将网页内容下载为word的demo及权限配置要点。
  3. 【快代理】独享代理使用教程
  4. FME支持的Autodesk Revit 格式概要
  5. 为什么没有参数的函数(与实际函数定义相比)会编译?
  6. 智能家居论文文献_智能家居控制系统界面设计结论与参考文献
  7. 大气快速辐射传输模型RTTOV12.2安装教程及心得体会
  8. 5G时代下的室内定位技术--精准室内定位--新导智能
  9. 如何制作一个可以自动更新的Github个人主页
  10. 白帽子黑客教你:如何下载Windows原版操作系统?(2种方式)