1、需求场景:spout从ftp列表中拿到未读取的文件读取并发射行到Bolt,bolt进行业务处理后提交下一Bolt入库。用redis主要是:保存文件列表对象,使用分布锁来同步互斥访问共享对象,使文件处理不重复。

2、topo主函数代码:

package ct.topo;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;import ct.tool.ComUtil;public class TopoMain {public static void main(String[] args) throws InterruptedException {//Configuration  Config conf = new Config();  //conf.setMaxSpoutPending(2); //缓存tuple   //conf.setMessageTimeoutSecs(5);  //  消息处理延时//conf.setNumAckers(2);           //  消息处理ackerconf.setNumWorkers(3);//设置个进程  //提取参数conf.put("city", args[0]); //地址,如深圳输入szconf.put("date", args[1]); //日期,如20160808//ftp服务器字符串格式:IP|port|username|passwordString strFtpSrv="127.0.0.1|21|name|pwd";conf.put("FtpSrv", strFtpSrv);//Topology definition  TopologyBuilder builder = new TopologyBuilder();  builder.setSpout("FTPReader",new FTPReader(),3);  //根据IMSI字段汇聚到同一task执行builder.setBolt("IMSICounter",new IMSICounter(),300).fieldsGrouping("FTPReader", new Fields("imsi"));builder.setBolt("DBWriter", new DBWriter(),50).shuffleGrouping("IMSICounter");  //集群生产模式  try {  //storm jar /mnt/dis.jar ct.topo.TopoMain sz 20160825 > /data/storm/log/debug.logStormSubmitter.submitTopology("O2OTopo", conf, builder.createTopology());  } catch (AlreadyAliveException e) {  // TODO Auto-generated catch block  e.printStackTrace();  } catch (InvalidTopologyException e) {  // TODO Auto-generated catch block  e.printStackTrace();  } catch (AuthorizationException e) {  // TODO Auto-generated catch block  e.printStackTrace();  }  //本地调试模式 //storm jar /mnt/dis.jar ct.topo.TopoMain sz 20160825 > /data/storm/log/debug.log &//conf.setMaxTaskParallelism(1);    //LocalCluster cluster = new LocalCluster();    //cluster.submitTopology("O2OTopo", conf, builder.createTopology()); }
}

3、spout代码:

package ct.topo;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;import ct.tool.ComUtil;
import ct.tool.FTPHandle;
import ct.tool.RedisDisLock;
import redis.clients.jedis.Jedis;public class FTPReader extends BaseRichSpout{private SpoutOutputCollector collector;private String FtpSrv=null;private String FtpCD=null;private Jedis jd;public void ack(Object msgId) {//成功处理tuple//ComUtil.writeLogs("OK:"+msgId);}public void close() {this.jd.close();}public void fail(Object msgId) {//失败处理tupleComUtil.writeLogs("FTPReader.fail:"+msgId+"emit fail!");}/*** We will create the file and get the collector object* 三个参数,第一个是创建Topology时的配置,第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt *  */public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {  this.jd = new Jedis("127.0.0.1",6379); //jedis长连接//初始化发射器this.collector = collector;//获取ftp服务器参数和文件读取路径FtpSrv=conf.get("FtpSrv").toString();String city=conf.get("city").toString();String date=conf.get("date").toString();FtpCD="/"+city+"/"+date;//文件目录}/*** ftp下载文件并解压,去除每一行发射* 下载的文件需要过滤已下载*/public void nextTuple() { //Utils.sleep(200);String fileName=null;try{//从redis的FtpFileMap对象中获取未读取的文件名//业务处理,获取一份未读取的文件名字列表            Iterator<String> iter=jd.hkeys("FtpFileMap").iterator(); while (iter.hasNext()){ String key = iter.next(); List<String> list=jd.hmget("FtpFileMap",key);String value=list.get(0);if(value.equals("n")) {fileName=key;//如果文件未读取,则提取        //非阻塞拿锁,nexttuple不能阻塞,本身就是循环,没拿到锁直接返回if (RedisDisLock.acquireLockNonBlocking(jd,"FtpFileLock")){return;}             jd.hset("FtpFileMap", key,"y");//设置为已读,原子性更新操作//释放锁RedisDisLock.releaseLock(jd, "FtpFileLock");break;}}    jd.close(); }catch(Exception e){ComUtil.writeLogs("FTPReader.nextTuple:"+e.getMessage());}try{//对未读取的文件列表下载并解压、读取并发射if(fileName!=null){String localFileName=FTPHandle.FtpFileDownLoad(FtpSrv,FtpCD,fileName);BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(localFileName)));String line=null;List<String> baselist=jd.lrange("BaseList",0,-1);//返回所有元素while((line = reader.readLine()) != null){if (line != null){String[] fields=line.split("\\|",-1);if(fields.length>=12){String imsi=fields[4];String userno=fields[5];String lac=fields[9];String ci=fields[10];String starttime=fields[11];this.collector.emit(new Values(imsi,userno,lac,ci,starttime),imsi);                  }}}}}reader.close();ComUtil.writeLogs("FTPReader.nextTuple:"+localFileName+":File has been processed successfully!");}        }catch(Exception e){ComUtil.writeLogs("FTPReader.nextTuple:"+e.getMessage());}}/*** Declare the output field */public void declareOutputFields(OutputFieldsDeclarer declarer) {String[] fields=new String[]{"imsi","userno","lac","ci","starttime"};declarer.declare(new Fields(fields));}
}

4、Bolt代码的业务逻辑就不体现;

5、总结:集成redis,主要是保存共享的对象,然后在对象访问时使用分布锁来互斥操作,锁内是原子性操作,nexttuple要使用非阻塞锁。但加锁会影响性能,在调试过程中出现部分记录发射失败且spout/bolt任务停止情况,需要进一步优化。

本文提供了一种解决思路,就是集成reids,应用其分布锁,解决多spout重复处理输入文件的情况。

在线实时大数据平台Storm集成redis开发(分布锁)相关推荐

  1. 在线实时大数据平台Storm集群组件学习

    Hadoop常用于离线的复杂的大数据处理,Spark常用于离线的快速(轻量级)的大数据处理, Storm常用于在线的实时的大数据处理:这句话一定程度上反应了三套大数据平台的鲜明特征.Storm是一套实 ...

  2. 在线实时大数据平台Storm单机部署

    centos单机下部署storm,主要用于开发测试用.部署的IP地址用zoo1代替,要注意/etc/hosts下对应ip和主机名. 1.Zookeeper部署 Apache官网http://zooke ...

  3. 在线实时大数据平台Storm并行和通信机制理解

    1.storm系统角色和应用组件基本理解: 和Hadoop一起理解,清晰点. 1)物理节点Nimubus,负责资源分配和任务调度: 2)物理节点Supervisor负责接受nimbus分配的任务,启动 ...

  4. 在线实时大数据平台Storm输入源共享试验

    1.背景:topology程序提交集群模式运行试验,验证在同一文件输入源情况下,worker之间是否会重复输入处理,以及数据变量能否在不同worker之间共享,如果文件新增数据,topology会不会 ...

  5. 在线实时大数据平台Storm并行度试验

    集群模式试验:同一文件输入数据如何处理,数据变量共享 1)集群模式一个worker内一个spout一个Bolt jps:1个worker storm list:1个wokers,4个tasks 2)集 ...

  6. 在线实时大数据平台Storm开发之wordcount

    可以在Eclipse下通过Maven引入storm-starter项目,这里直接将storm目录下lib中的jar包引入到工程中. 由于storm-core-1.0.1.jar中带有default.y ...

  7. 在线实时大数据平台Storm本地模式运行的一个小发现

    1.现象:生产中分别部署了两台服务器,独立运行storm,然后拓扑程序提交是本地模式,发现不用启动storm和zookeeper也可以运行: #jps  没有下面进程  QuorumPeerMain ...

  8. 在线实时大数据平台Storm版本兼容的问题

    部署了storm1.0.1最新版,但原来生产的程序是storm0.8.2版本并在该版本环境中运行,直接将程序放到1.0.1环境中storm jar运行失败. 重构程序,引入storm-core-1.0 ...

  9. 工商银行实时大数据平台建设历程及展望

    ‍ 摘要:本文整理自中国工商银行大数据平台负责人袁一在 Flink Forward Asia 2021 的分享.主要内容包括: 工行实时大数据平台建设历程 工行实时大数据平台建设思路 展望 Tips: ...

最新文章

  1. 九度oj 题目1411:转圈
  2. 虚拟机及VmBasic编译引擎实现
  3. 胡言乱语集锦-大数据,手机,传统,养生
  4. python阿里巴巴排名_python使用urllib模块和pyquery实现阿里巴巴排名查询
  5. 利用滞后——超前系统解决高阶随动系统(课程设计)
  6. java时间戳龙_Java时间戳与日期格式字符串的互转
  7. r语言c50算法的过程,【机器学习与R语言】5-规则学习算法
  8. Charles进行弱网测试
  9. 软件工程实践 Blog17
  10. 深圳宝安学区房_查查吧深圳学区地图
  11. linux 检测SSD寿命
  12. 1.1 windows环境安装Perl
  13. 《headfirst设计模式》读书笔记9-迭代器和组合模式
  14. 2021年vmware安装archlinux
  15. 【OpenCV入门指南】第十篇 彩色直方图均衡化
  16. 你不知道的USB知识二——USB认证
  17. 大二物竞金牌转北大计算机,2011年第28届全国中学生物理竞赛决赛金牌选手去向表...
  18. 三方登录---新浪微博登录
  19. 理解GAM和SGAM页
  20. Ubuntu 安装 wine (使用windows下软件)

热门文章

  1. ios html双击下移,H5页面在ios上双击div,导致屏幕上移的js解决办法
  2. javascript 权威指南第7版_免费领书 | 气相色谱与质谱实用指南(原著第2版)
  3. BZOJ 1852 [MexicoOI06]最长不下降序列(贪心+DP+线段树+离散化)
  4. MapReduce源码分析之JobSplitWriter
  5. C#调试中,在VS的输出窗口或即时窗口显示消息
  6. Android设备唯一性判断
  7. WINCE应用BINFS
  8. AA065VD数据线连接错位的现象及分析总结
  9. 基于WINCE6.0的nandflash驱动(基于K9F1G08U0B)
  10. 接口测试工具Postman(转)