目录

  • 前言
  • Storm基础
    • Storm到底是什么
      • mysql,hadoop与storm关系
      • 自己搞一套storm?
      • storm的特点
  • Storm的集群架构以及核心概念
    • Storm的集群架构
    • Storm核心概念
  • Storm的并行度以及流分组
    • 什么是并行度,什么是流分组
    • 流分组策略
      • 常用
      • 不常用
  • storm 单词计数demo
    • 依赖
    • 代码
    • 测试
  • storm集群
    • 部署storm集群
    • storm集群运行拓扑作业

前言

  1. 接下来热数据的处理,缓存雪崩问题的解决 --> stormhystrix

对于这两个技术,都是关键性的会去影响你的热数据,缓存雪崩时的系统可用性和稳定性,学以致用,用到我们之前搭建的缓存系统架构中。

  1. storm,在做热数据这块,如果要做复杂的热数据的统计和分析,亿流量,高并发的场景下,最合适的技术就是storm
  2. hystrix,分布式系统的高可用性的限流,熔断,降级,还有缓存雪崩的方案,限流的技术

Storm基础

Storm到底是什么

mysql,hadoop与storm关系

  1. mysql:事务性系统,面临海量数据的尴尬
  2. hadoop:离线批处理,今天数据,明天计算
  3. storm:实时计算,来一条,计算结果

自己搞一套storm?

  1. 如果自己搞一套实时流系统出来,也是可以的,但是。。。。
  • 花费大量的时间在底层技术细节上:如何部署各种中间队列,节点间的通信,容错,资源调配,计算节点的迁移和部署,等等
  • 花费大量的时间在系统的高可用上问题上:如何保证各种节点能够高可用稳定运行
  • 花费大量的时间在系统扩容上:吞吐量需要扩容的时候,你需要花费大量的时间去增加节点,修改配置,测试,等等
  1. 国内,国产的实时大数据计算系统,唯一做得影响力特别大,特别牛逼的,就是JStorm,阿里国内顶尖
  2. JStorm,clojure编程预压,Java重新写了一遍,Galaxy流式计算的系统,百度,腾讯,也都自己做了,也能做得很好
  3. 普通公司非常困难的…

storm的特点

  1. 支撑各种实时类的项目场景:实时处理消息以及更新数据库,基于最基础的实时计算语义和API(实时数据处理领域);对实时的数据流持续的进行查询或计算,同时将最新的计算结果持续的推送给客户端展示,同样基于最基础的实时计算语义和API(实时数据分析领域);对耗时的查询进行并行化,基于DRPC,即分布式RPC调用,单表30天数据,并行化,每个进程查询一天数据,最后组装结果

  2. 高度的可伸缩性:如果要扩容,直接加机器,调整storm计算作业的并行度就可以了,storm会自动部署更多的进程和线程到其他的机器上去,无缝快速扩容

  3. 数据不丢失的保证:storm的消息可靠机制开启后,可以保证一条数据都不丢,也不重复计算

  4. 超强的健壮性:从历史经验来看,storm比hadoop、spark等大数据类系统,健壮的多的多,因为元数据全部放zookeeper,不在内存中,随便挂都不要紧,重启简单,稳定性和可用性很高

  5. 使用的便捷性:核心语义非常的简单,开发起来效率很高开发API还是很简单的

大白话讲解

Storm的集群架构以及核心概念

Storm的集群架构


Nimbus,Supervisor,ZooKeeper,Worker,Executor,Task

Storm核心概念

  1. Topology:拓扑,务虚的一个概念
  2. Spout:数据源的一个代码组件,就是可以实现一个spout接口,写一个java类,在这个spout代码中,可以自己尝试去数据源获取数据,比如说从kafka中消费数据
  3. bolt:一个业务处理的代码组件,可以认为spout会将数据传送给bolt,各种bolt还可以串联成一个计算链条,也可以认为写java类实现一个bolt接口

一堆spout+bolt,就会组成一个topology,就是一个拓扑,实时计算作业,spout+bolt,一个拓扑涵盖数据源获取/生产+数据处理的所有的代码逻辑

  1. tuple:就是一条数据,每条数据都会被封装在tuple中,在多个spout和bolt之间传递
  2. stream:就是一个流,务虚的一个概念,抽象的概念,源源不断过来的tuple,就组成了一条数据流

Storm的并行度以及流分组

  1. 对storm的核心的基本原理,门儿清,你都很清楚,集群架构、核心概念、并行度和流分组
  2. 掌握最常见的storm开发范式,spout消费kafka,后面跟一堆bolt,bolt之间设定好流分组的策略
  3. 在bolt中填充各种代码逻辑
  4. 了解如何将storm拓扑打包后提交到storm集群上去运行
  5. 掌握如何能够通过storm ui去查看你的实时计算拓扑的运行现状
  6. 如果说,恰巧没人负责维护storm集群,也没什么大数据的团队,那么你可能需要说再去深入学习一下storm
  7. 当然了,如果你的场景不是特别复杂,整个数据量也不是特别大,其实自己主要研究一下,怎么部署storm集群

什么是并行度,什么是流分组

比如设想一个拓扑结构,几个spout,几个bolt,各种流分组情况下,数据是怎么流向的,要求具体画出集群架构中的流向

worker,executor,task,supervisor,数据流向

  1. 并行度:Worker->Executor->Task,没错,是Task

  2. 流分组:Task与Task之间的数据流向关系,流分组策略

图中两个executor,bolt1,数据流向bolt2的策略,bolt1的每个task流向bolt2的那个task的策略

流分组策略

常用

  1. Shuffle Grouping:随机发射,负载均衡
  2. Fields Grouping:根据某一个或者某些个fields,进行分组,如果这一个或者多个fields如果值完全相同的话,那么这些tuple,就会发送给下游bolt的其中固定的一个task

你发射的每条数据是一个tuple,每个tuple中有多个field作为字段,比如tuple,3个字段,name,age,salary,数据{“name”: “tom”, “age”: 25, “salary”: 10000} -> 1个tuple -> 3个field,name,age,salary

不常用

基本上用不到

All Grouping
Global Grouping
None Grouping
Direct Grouping
Local or Shuffle Grouping

storm 单词计数demo

storm wordcount demo 程序,蕴含了很多的知识点:

  1. 写一个Spout
  2. 写一个Bolt
  3. api怎么用的:OutputCollectorDeclarer
  4. 写一个Topology
  5. 怎么设置workerexecutortask流分组

资料地址:https://blog.csdn.net/qq_34246646/article/details/104596143
项目地址:eshop-study
切换到相应分支:

依赖

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.1</version>
</dependency>
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId>
</dependency>

代码

功能:storm 源源不断的接收到一些句子,然后需要实时的统计出句子中每个单词的出现次数。

/*** @Author luohongquan* @Description storm demo -- 单词计数拓扑* @Date 21:38 2020/3/16*/
public class WordCountTopology {/*** spout* * spout,继承一个基类,实现接口,这个里面主要是负责从数据源获取数据* * 这里作为一个简化,就不从外部的数据源去获取数据了,只是自己内部不断发射一些句子* * @author Administrator**/public static class RandomSentenceSpout extends BaseRichSpout {private static final long serialVersionUID = 3699352201538354417L;private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);private SpoutOutputCollector collector;private Random random;/*** open方法* * open方法,是对spout进行初始化的* * 比如说,创建一个线程池,或者创建一个数据库连接池,或者构造一个httpclient* */@SuppressWarnings("rawtypes")public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {// 在open方法初始化的时候,会传入进来一个东西,叫做SpoutOutputCollector// 这个SpoutOutputCollector就是用来发射数据出去的this.collector = collector;// 构造一个随机数生产对象this.random = new Random();}/*** nextTuple方法* * 这个spout类,之前说过,最终会运行在task中,某个worker进程的某个executor线程内部的某个task中* 那个task会负责去不断的无限循环调用nextTuple()方法* 无限循环调用,可以不断发射最新的数据出去,形成一个数据流* */public void nextTuple() {Utils.sleep(100); String[] sentences = new String[]{"the cow jumped over the moon","an apple a day keeps the doctor away","four score and seven years ago","snow white and the seven dwarfs","i am at two with nature"};// 随机发送数组中的一个句子String sentence = sentences[random.nextInt(sentences.length)];LOGGER.info("【发射句子】sentence=" + sentence);  // 这个values,你可以认为就是构建一个tuple// tuple是最小的数据单位,无限个tuple组成的流就是一个streamcollector.emit(new Values(sentence)); }/*** 很重要,这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么* */public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));   }}/*** bolt,直接继承一个BaseRichBolt基类* * 实现里面的所有的方法即可,每个bolt代码,同样是发送到worker某个executor的task里面去运行* * @author Administrator**/public static class SplitSentence extends BaseRichBolt {private static final long serialVersionUID = 6604009953652729483L;private OutputCollector collector;/*** 对于bolt来说,第一个方法,就是prepare方法* * OutputCollector,这个也是Bolt的这个tuple的发射器* */public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}/*** execute方法* * 就是说,每次接收到一条数据后,就会交给这个executor方法来执行* */public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for(String word : words) {collector.emit(new Values(word)); }}/*** 定义发射出去的tuple,每个field的名称*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));   }}/** * @Author luohongquan * @Description 第二个 bolt * @Date 21:21 2020/3/16* @Param  * @return  */ public static class WordCount extends BaseRichBolt {private static final long serialVersionUID = 7208077706057284643L;private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);private OutputCollector collector;private Map<String, Long> wordCounts = new HashMap<String, Long>();@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = wordCounts.get(word);if(count == null) {count = 0L;}count++;wordCounts.put(word, count);LOGGER.info("【单词计数】" + word + "出现的次数是" + count);  collector.emit(new Values(word, count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));    }}public static void main(String[] args) {// 将spout和bolts组合起来,构建成一个拓扑TopologyBuilder builder = new TopologyBuilder();// 这里的第一个参数的意思,就是给这个spout设置一个名字// 第二个参数的意思,就是创建一个spout的对象// 第三个参数的意思,就是设置spout的executor有几个builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);builder.setBolt("SplitSentence", new SplitSentence(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");// 相同的单词,从SplitSentence发射出来时,一定会进入到下游的指定的同一个task中// 只有这样子,才能准确的统计出每个单词的数量// 比如你有个单词,hello,下游task1接收到3个hello,task2接收到2个hello,这样计算结果就不对了// 所以选择流分组策略: fieldsGrouping,5个hello,全都进入一个taskbuilder.setBolt("WordCount", new WordCount(), 10).setNumTasks(20).fieldsGrouping("SplitSentence", new Fields("word"));Config config = new Config();// 说明是在命令行执行,打算提交到storm集群上去if(args != null && args.length > 0) {// 设置3个workerconfig.setNumWorkers(3);try {// 参数1:拓扑名称// 参数2:拓扑配置StormSubmitter.submitTopology(args[0], config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 说明是在idea/eclipse里面本地运行config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology("WordCountTopology", config, builder.createTopology());Utils.sleep(60000);cluster.shutdown();}}}

测试

执行main方法后, 根据单词分别计数:

storm集群

storm集群部署,怎么将storm的拓扑扔到storm集群上去跑

部署storm集群

  1. 安装Java 7和Pythong 2.6.6

  2. 下载storm安装包,解压缩,重命名,配置环境变量

cd /usr/local
tar -zxvf apache-storm-1.1.0.tar.gz
mv apache-storm-1.1.0 stormvi ~/.bashrc
# 添加下面内容
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$SCALA_HOME/bin:$STORM_HOME/bin

最后 source ~/.bashrc一下。

  1. 修改storm配置文件
mkdir /var/storm
vi /usr/local/storm/conf/storm.yaml
# 修改添加下面内容
storm.zookeeper.servers:     - "192.168.0.106"- "192.168.0.107"- "192.168.0.108"
storm.local.dir: "/var/storm"
nimbus.seeds: ["192.168.0.106"]
supervisor.slots.ports:- 6700- 6701- 6702- 6703

supervisor.slots.ports: 指定每个机器上可以启动多少个worker,一个端口号代表一个worker,这里我们相当于启动4个worker

  1. eshop-cache01[192.168.0.106]各配置copy到其他机器上
scp ~/.bashrc root@eshop-cache02:~/
scp -r /usr/local/storm root@eshop-cache02:/usr/local/
scp ~/.bashrc root@eshop-cache03:~/
scp -r /usr/local/storm root@eshop-cache03:/usr/local/
# 进入eshop02/03中,
mkdir -p /var/storm
source ~/.bashrc
  1. 启动storm集群和ui界面
  • eshop-cache01启动一个节点nimbusstorm nimbus >/dev/null 2>&1 &
  • eshop-cache01/02/03分别启动一个节点supervisorstorm supervisor >/dev/null 2>&1 &
  • eshop-cache01启动节点ui界面storm ui >/dev/null 2>&1 &
  • eshop-cache02/03两个supervisor节点上,启动logviewer,才能看到日志:storm logviewer >/dev/null 2>&1 &
  1. 访问一下ui界面,8080端口
http://192.168.0.106:8080/index.html

storm集群运行拓扑作业

  1. 将java工程storm-helloworld,进行打包:根目录下mvn clean package -DskipTests

  2. 提交作业到storm集群

storm jar /usr/local/storm-helloworld-0.0.1-SNAPSHOT.jar com.roncoo.eshop.storm.WordCountTopology WordCountTopology

打包参考文档: https://blog.csdn.net/m0_37809146/article/details/91128298?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

  1. 在storm ui上观察storm作业的运行

  2. kill掉某个storm作业,比如我们上面部署的单词计数作业: WordCountTopology

storm kill WordCountTopology

42. 通俗易懂大数据实时计算Storm教程相关推荐

  1. 【大数据实时计算框架】Storm框架

    一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...

  2. 大数据实时计算工程师/Hadoop工程师/数据分析师职业路线图

    大数据实时计算工程师/Hadoop工程师/数据分析师职业路线图 http://edu.51cto.com/roadmap/view/id-29.html http://my.oschina.net/i ...

  3. 接近淘宝 80%的大数据实时计算平台,从0搭建的经验和坑

    上周一,来自武汉的直播平台斗鱼TV宣布C轮融资,腾讯领投的 15 亿人民币,距其获得 B 轮1亿美元不到半年,也是大写的牛逼. 但小寻更关心他们的大数据架构,作为一个在 2 年多时间里崛起的公司,其流 ...

  4. .NET 大数据实时计算--学习笔记

    摘要 纯 .Net 自研大数据实时计算平台,在中通快递服务数百亿包裹,处理数据万亿计!将分享大数据如何落地以及设计思路,技术重难点. 目录 背景介绍 计算平台架构 项目实战 背景介绍 计算平台架构 分 ...

  5. Flink大数据实时计算系列-案例初体验:HotPages

    Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...

  6. 1. 大数据实时计算介绍

    Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架.它的底层,其实,也是基于我们之前讲解的Spark Core的.基本的计算模型,还是基于内存的大数据实时 ...

  7. Flink大数据实时计算系列-Flink的Operator Chains的优化机制

    Flink大数据实时计算系列-Flink的Operator Chains的优化机制 目录 Flink改变并行度 并行度改为3 并行度改为2 Flink Operator Chains Flink gr ...

  8. Flink大数据实时计算系列-Flink写出多个parquet小文件处理方法、Presto的介绍与使用场景

    Flink大数据实时计算系列-Flink写出多个parquet小文件处理方法.Presto的介绍与使用场景 Presto的安装与使用 目录 Flink写出多个parquet小文件处理方法 Presto ...

  9. Flink大数据实时计算系列-列式存储parquet文件格式介绍、Flink进行rowformat格式文件保存

    Flink大数据实时计算系列-列式存储parquet文件格式介绍 Flink进行rowformat格式文件保存 列式存储parquet文件格式介绍

最新文章

  1. 根据关键字检索相关视频
  2. 史上最低估自己的天才科学家!预言自己的发现无用,没想到影响全世界,可他却在37岁..........
  3. c语言第四作业答案,C语言第一次作业及答案
  4. git 提交代码的步骤
  5. 机器学习模型融合stack详解及代码实战
  6. 计算机单词 硬件类、软件类、网络类、其他
  7. linux读取ads1115ADC例程
  8. 婚宴座位图html5,婚宴座位图模版欣赏【婚礼纪】
  9. 拥抱趋势,蓄能跃迁——2018慧点科技企业协同及治理创新论坛圆满举行
  10. 《华杉讲透孙子兵法》分享
  11. 惠普星 TP01-055ccn电脑重装系统步骤
  12. 王者荣耀战力查询小程序源码下载-支持安卓ios微信和QQ战力查询支持打包成APP
  13. dellr420部署os_dell r720服务器OS部署
  14. VTK移动立方体法创建多个等值面的透视3D模型
  15. 从request获取各种路径总结 转载:http://blog.csdn.net/piaoxuan1987/article/details/8541839 equest.getRealPath()
  16. 常见电子邮件英文缩写
  17. javacv实现屏幕录制(一)
  18. MYSQL的sqlca详解_sql数据库如何使用
  19. 1.python自动化登录12306
  20. android音量界面,android 音量调节以及媒体音量界面

热门文章

  1. 常用DOS命令和Path环境变量的配置
  2. 真实评测 rtx3080ti对比rx6800xt选哪个好
  3. 使用bat批量删除修改子目录文件名后缀,超简单
  4. excel冻结窗格--冻结行列标题
  5. 微信多开软件苹果版_快手充值快币微信充值苹果版;
  6. 有关队列的操作 python
  7. WPF自定义控件与样式(13)-自定义窗体Window 自适应内容大小消息框MessageBox
  8. 苹果状态栏HTML,iphoneX 适配客户端H5页面的方法教程
  9. 一分钟教会你音频配音乐怎么制作
  10. NOIP2018 NearlyAFO