大数据课程——Storm综合应用

实验内容以及要求

假设在某一搜索应用中,需要实时统计搜索次数最多的热门关键词,并按照实时统计搜索次数输出最热门的20大热门关键词及被搜索次数。用户搜索的日志通过Flume采集,并写入Kafka,Storm从Kafka中实时读取数据,并完成实时统计工作,并进行输出。

提示:
(1)搜索日志可以采用实验5的数据(搜狗搜索数据),一行代表一次用户搜索;
(2)Flume Agent的Source可以配置为syslogudp类型(端口5640),监控搜索日志;
(3)输出形式自定。

问题总结

Agent配置时的Source类型

Client包是模拟数据包的产生的,将数据发往5640端口。因此Flume的Agent配置要写明Source是syslogudp类型,并且监控5640端口。

Storm的UI界面端口问题

Storm的UI界面端口是可以改动的,默认为8080。但在实验室里做实验的话,就发现老师已经吧端口改了,改成了8099,因此要访问localhost:8099才可以进入Storm的UI界面,否则会出现404错误,这个要注意。

Key、Value问题

写代码的时候,一开始照着书上的Kafka整合Storm的代码来写,但其实获取数据的时候,其实Key是没东西的,Value才能获得具体数据。Key得到的全是null。如下图所示。

吐槽一下

顺便吐槽一下,实验数据输出的排行榜略少儿不宜。但没办法,只能说数据太真实了哈哈哈。
另外具体代码放在本文最后面。

实验步骤

Flume Agent 配置


本次将Centos01作为Flume的Agent,负责监控5640端口,拦截收到的数据包,并将其中的数据存储到Kafka集群的topictest话题中。
所以将Source的类型设置为syslogudp,监控5640端口,接受其中产生的数据包。
Sink设置为Kafka集群,将得到的数据包传入kafka集群中。

Storm代码编写思路


如上图所示,我的项目中主体由KafkaSpout、SplitDataBolt、WordCountBolt、ReportBolt四项组成,数据在其中依次流动,StormTopology对他们进行设置、调用。(代码在文末)。
KafkaSpout作为数据源,从kafka集群中提取需要的数据,并且发送给SplitDataBolt进行处理。
SplitOutBolt作为一个处理Bolt,将接收到的数据进行分割处理,从中提取出关键字字段,并且设置值为1(因为出现一次),发送给WordCountBolt进行统计。
WordCountBolt作为第二个处理Bolt,对关键词的出现次数进行统计和存储,并将统计结果发送给ReportBolt。
ReportBolt作为第三个Bolt,将接收到的“关键词-出现次数”数据进行存储,并且按照“出现次数”进行排序,将排序结果进行输出,得到最终的结果排行榜。

代码展示

首先依次开启Zookeeper、HDFS、HA集群、Kafka、Flume、Storm。并且可以进入Storm的UI界面来测试Storm是否开启成功,其他的集群开启这里不予以赘述。


系统开启完毕后,就可以运行Client.jar包,该JAR包的作用是负责从数据集中提取数据,整合成数据包,并且随机间隔一定时间向5640端口发送该数据包。这样就可以模拟真实的数据产生,并且被Flume抓住该数据包,发送到Kafka集群中。


如果一切顺利的话,可以开启一个消费者,查看topictest话题,能看到数据正常被发送到Kafka集群中。如下图所示。

项目代码写好后,打包成Storm.jar包,在Centos01中运行,成功运行后如下图所示。


每间隔一段时间,都会打印出排行榜信息,如下图所示。可以看到随着时间的增加,排行榜是在根据数据不断变化的,不断输出排名前20的关键词排行信息,可以看到关键词以及其出现次数统计。

代码

KafkaSpout

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @author: 冰冷灬泡面* @date: 2021/5/7 13:31* @description:* @modifiedBy:*/
public class KafkaSpout extends BaseRichSpout {private static final long serialVersionUID = 7582771881226024741L;private KafkaConsumer<String, String> consumer;SpoutOutputCollector collector;/*---------------设置Kafka配置-------------*/public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;//Kafka属性信息Properties props = new Properties();props.put("bootstrap.servers", "centos01:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//实例化消费者,设置消费主题consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("topictest"));}/*------------从Kafka集群中获取数据-------------*/public void nextTuple() {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));for (ConsumerRecord<String, String> record : records) {String key = record.key();String value = record.value();//其实KEY值是没有的,Value值才是有的//所以这里吧value的值赋给了Keykey = value;System.out.println("key " + key);
//            System.out.println("value " + value);collector.emit(new Values(key));}}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("key"));}
}

SplitDataBolt

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** @author: 冰冷灬泡面* @date: 2021/5/7 13:23* @description:* @modifiedBy:*/
public class SplitDataBolt extends BaseRichBolt {private static final long serialVersionUID = 1L;private OutputCollector outputcollector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.outputcollector = outputCollector;}/*--------------对接收到的Tuple进行处理--------------------*/@Overridepublic void execute(Tuple tuple) {String key =tuple.getStringByField("key");//分割数据String[] words =key.split("\t");//提取关键词字段String keyword = words[2].trim();Integer cnt = 1;//发送给下一boltthis.outputcollector.emit(new Values(keyword, cnt));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("keyword", "cnt"));}
}

WordCountBolt

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;/*** @author: 冰冷灬泡面* @date: 2021/5/7 13:23* @description:* @modifiedBy:*/
public class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 2374950653902413273L;private OutputCollector outputcollector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.outputcollector = outputCollector;this.keywordMap = new HashMap<String, Integer>();}/*----------存储、统计关键词和出现次数------------*///定义存放单词与词频的Mapprivate HashMap<String, Integer> keywordMap = null;@Overridepublic void execute(Tuple tuple) {String keyword = tuple.getStringByField("keyword");int cnt = tuple.getIntegerByField("cnt");Integer sum = keywordMap.get(keyword);if (sum == null) {sum = 0;}sum++;keywordMap.put(keyword, sum);this.outputcollector.emit(new Values(keyword, sum));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("keyword", "sum"));}
}

ReportBolt


import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;import java.util.*;
import java.util.Map.Entry;/*** @author: 冰冷灬泡面* @date: 2021/5/7 13:23* @description:* @modifiedBy:*/
public class ReportBolt extends BaseRichBolt {private static final long serialVersionUID = -1512537746316594950L;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.keywordMap = new HashMap<String, Integer>();}/*-------------对存储的数据进行排序,并且输出出现次数最多的前20位关键词------------*/private HashMap<String, Integer> keywordMap = null;@Overridepublic void execute(Tuple tuple) {String keyword = tuple.getStringByField("keyword");int count = tuple.getIntegerByField("sum");keywordMap.put(keyword, count);List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(keywordMap.entrySet());Collections.sort(list, new Comparator<Entry<String, Integer>>() {//升序排列@Overridepublic int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {return (o2.getValue() - o1.getValue());}});//输出排名前20的数据int n = list.size() <= 20 ? list.size() : 20;String result = "";for (int i = 0; i < n; i++) {Entry<String, Integer> entry = list.get(i);String sortWord = entry.getKey();Integer sortCount = entry.getValue();result += sortWord + " ------- " + sortCount +"\n";}System.out.println("--------(关键词)--------搜索关键词排行榜--------(搜索次数)--------");System.out.println(result);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}
}

StormTopology

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;/*** @author: 冰冷灬泡面* @date: 2021/5/7 14:25* @description:* @modifiedBy:*/
public class StormTopology {public static void main(String[] args) throws Exception {KafkaSpout kafkaSpout = new KafkaSpout();SplitDataBolt splitDataBolt = new SplitDataBolt();WordCountBolt wordCountBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();//创建一个拓扑TopologyBuilder topologyBuilder = new TopologyBuilder();//TODO 这些命名还有fields的参数可能会出问题,可以看看//设置Spout,名称为"kafka-spout",并行度为2(也就是线程数),任务数为4(也就是实例数)。默认是1个线程,1个任务。  如果不指定Task数量,则一个线程执行一个Task,Task数量与线程数量一样。topologyBuilder.setSpout("kafka-spout", kafkaSpout,2).setNumTasks(4);//设置bolt,名称为"split-bolt",数据来源是名称为"sentence-spout"的spout,//ShuffleGrouping:随机选择一个Task来发送,对Task的分配比较均匀。topologyBuilder.setBolt("split-bolt", splitDataBolt,2).setNumTasks(4).shuffleGrouping("kafka-spout");//FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。topologyBuilder.setBolt("count-bolt", wordCountBolt,2).setNumTasks(4).fieldsGrouping("split-bolt", new Fields("keyword"));//GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task,此时不管有多少个Task,只发往一个TasktopologyBuilder.setBolt("report-bolt", reportBolt,2).setNumTasks(4).globalGrouping("count-bolt");Config config = new Config();LocalCluster cluster = new LocalCluster();//本地模式 ,第一个参数为定义拓扑名称
//      cluster.submitTopology("word-count-topology", config, topologyBuilder.createTopology());/* Utils.sleep(5000);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();  *///集群模式,需要打包jar上传到集群,然后使用命令 :storm jar storm_demo-0.0.1-SNAPSHOT.jar com.zwy.storm.demo.wordcount.WordCountTopologyconfig.setNumWorkers(2); //设置Worker进程数量config.setNumAckers(0);//设置acker并发数,0代表取消acker任务。Acker任务默认是每个worker进程启动一个executor线程来执行,该实例启动了2个worker,则默认会启动2个executor线程,2个acker任务StormSubmitter.submitTopology("keyword-rank-topology",config,topologyBuilder.createTopology());}
}

大数据课程——Storm综合应用相关推荐

  1. 大数据课程综合实验案例:网站用户行为分析

    大数据课程综合实验案例 1 案例简介 1.1 案例目的 1.2 适用对象 1.3 时间安排 1.4 预备知识 1.5 硬件要求 1.6 软件工具 1.7 数据集 1.8 案例任务 1.9 实验步骤 2 ...

  2. 大数据课程体系-学习笔记概要

    目录 目录 大数据课程体系 简介 学习阶段不定时更新 大数据课程体系 简介 作为一名物联网工程专业的学生,对于大数据有着不同寻常的热情,在有了一定的Android基础和J2EE基础后,希望学习更多的数 ...

  3. 不错的大数据课程体系(感谢某机构,希望不属于侵权)

    2019独角兽企业重金招聘Python工程师标准>>> 阶段一.大数据.云计算 - Hadoop大数据开发技术 课程一.大数据运维之Linux基础 本部分是基础课程,帮大家进入大数据 ...

  4. 大数据开发工程师都需要学什么大数据课程?

    学习大数据需要的基础:java SE.EE(SSM).MySQL.Linux等,大数据的框架安装在Linux操作系统上. 大数据开发工程师都需要学什么大数据课程? 第一.需要学习Java基础 很多人好 ...

  5. 大数据课程培训大纲及详细说明(全)

    一.大数据处理技术-基于Hadoop/Yarn的实战(含Spark.Storm和Docker应用介绍 ) 本课程从大数据技术以及Hadoop/Yarn实战的角度,结合理论和实践,全方位地介绍Hadoo ...

  6. 大数据要学什么?看看这份大数据课程大纲

    大数据领域每年都会涌现出大量新的技术,成为大数据获取.存储.处理分析或可视化的有效手段.大数据技术能够将大规模数据中隐藏的信息和知识挖掘出来,为人类社会经济活动提供依据,提高各个领域的运行效率,甚至整 ...

  7. 大数据课程培训大纲详解,大数据培训学习内容

    大数据助力成就非凡.大数据正在改变着商业游戏规则,为企业解决传统业务问题带来变革的机遇.毫无疑问,当未来企业尝试分析现有海量信息以推动业务价值增值时,必定会采用大数据技术.那么大数据培训哪家好呢?今天 ...

  8. python大数据课程培训大纲

    一.大数据处理技术-基于Hadoop/Yarn的实战(含Spark.Storm和Docker应用介绍 ) 本课程从大数据技术以及Hadoop/Yarn实战的角度,结合理论和实践,全方位地介绍Hadoo ...

  9. 学习大数据不知从何学起?看看这份大数据课程大纲

    大数据领域每年都会涌现出大量新的技术,成为大数据获取.存储.处理分析或可视化的有效手段.大数据技术能够将大规模数据中隐藏的信息和知识挖掘出来,为人类社会经济活动提供依据,提高各个领域的运行效率,甚至整 ...

最新文章

  1. 备忘之--apache下为站点添加错误页面
  2. js 读取php页面内容,js读取html文件 js获取html页面显示内容
  3. leetcode 300. 最长上升子序列
  4. [源码和文档分享]基于C#和MYSQL数据库实现的课程自动考试系统
  5. Scrapy:学习笔记(2)——Scrapy项目
  6. 第三百七十二天 how can I 坚持
  7. Python数据库sqlite3详解
  8. 基于低代码平台的OA系统,更灵活高效!
  9. VC++使用OD反汇编引擎(非BC做DLL或LIB包装)
  10. 统计(statistic)(二分查找+离散化)
  11. 做LED照明类产品有感
  12. 送书活动还有最后一本书,怎么办呢?
  13. 微信小程序请求wx.request报400(Bad Request)解决
  14. python学习笔记(九)异常处理
  15. sklearn提示cannot import name ‘MaskedArray‘ from ‘sklearn.utils.fixes‘的解决方法
  16. 从键盘读入一个字符串,若遇到字母,则输出0;若遇到数字则输出1;否则不输出。例如:输入ab@12c,输出00110
  17. 计算机培训中学语文研修计划,中学2018教师培训研修计划
  18. php面向对象之tian,php之面向对象
  19. 04 JDK环境安装
  20. 2023年可见光通信(LiFi)研究新进展

热门文章

  1. 我拼命买的学区房,现在可能要凉了
  2. 使用shell下载查找对应的电影
  3. 推荐几个epub转mobi的软件给你
  4. AWS认证是什么?怎么报考?
  5. ptcms自动采集小说系统电脑版+手机版源码免费下载
  6. 吉多·范罗苏姆 --python创始人
  7. 美国亚马逊最新要求ASTM F2641 电动滑板车、自平衡踏板车UL2272认证办理流程
  8. Blender:Lowpoly人物模型
  9. GAT:图注意力模型介绍及PyTorch代码分析
  10. layui 点击行事件 修改当前行颜色