【Storm】Spout的storm-starter及Grouping策略、并发度讲解、网站浏览量和用户数统计
maven先安装好。
以下讲storm-starter的使用。
1、从github下载官方的storm-starter例子包,是maven工程,
地址 https://github.com/nathanmarz/storm-starter
2、把文件解压复制到workspace目录下,用cmd命令行,在该文件目录下运行mvn eclipse:eclipse,生成eclipse所用的文件,使得maven工程变成eclipse可用的工程。
3、导入到eclipse。新建源码文件夹lesson。把上一节storm入门案例工程的lesson包,整个复制到storm-starter-master的lesson源码文件夹下。
4、选中项目右键,Run as maven package,用maven打包。在target文件夹下有2个jar包。
storm-starter-0.0.1-SNAPSHOT.jar是不含依赖,只含有工程代码,较小。
storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar 包含依赖,较大,通常需要由依赖的包。
5、官方提供的例子
分组策略(Stream Grouping)
stream grouping 用来定义一个 stream 应该如何分配给 Bolts 上面的多个Tasks,也就是分配给Bolts上面的多个Executors(多线程,并发度)。
1、Storm 里面有 6种类型的 stream grouping:
注:1)、2)、5)最常用,其他基本不用。
单线程等于All Grouping
1)Shuffle Grouping:随机分组,随机派发 stream 里面的 tuple,保证每个 bolt 接收到的 tuple 数目大致相同。通过轮询实现,保证平均分配。
2)Fields Grouping:按字段分组,比如按 userid 来分组,具有同样 userid 的 tuple 会被分到相同的 bolts,而不同的 userid 则会被分配到不同的 bolts。
3)All Grouping:广播发送,对于每一个 tuple,所有的 bolts 都会收到。
4)Global Grouping:全局分组,这个 tuple 被分配到 storm 中的一个 bolt 的其中一个 task。再具体一点就是分配给 id 值最低的那个 task。
5)None Grouping:不分组,这个分组的意思是说 stream 不关心到底谁会收到它的 tuple。目前这种分组和 Shuffle Grouping 是一样的效果,但是多线程下不平均分配。
6)Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个 task 处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的 task 的 id (OutputCollector.emit 方法也会返回 task 的 id)。
7)Local or Shuffle Grouping:如果目标 bolt 有一个或者多个 task 在同一个工作进程中,tuple 将会被随机发送给这些 tasks。否则,和普通的 Shuffle Grouping 行为一致。
2、测试
1)Shuffle Grouping 轮循的方式。
在lesson的Main.java改代码。把bolt并发数改为2,也就是bolt会有2个线程。
// bolt的方法有3个参数,// bolt的id(String类型),实例,并发数。大量数据场景并行数设置大一些// bolt的数据来源于spout,名称要和上文setSpout的id一致,否则不能获取到数据// shuffle发射规则,后续详讲topoBuilder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");
lesson的MyBolt.java改代码。打印的内容增加当前的线程名。
if (null != valStr) {num++;System.out.println(Thread.currentThread().getName() + ", lines : " + num + ", sessionId: " + valStr.split("\t")[1]);}
控制台打印的内容,显示有2个bolt线程,每个线程接收到的个数相同。每个线程打印出来的总行数相加,才等于track.log文件的总行数。
Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 2, sessionId: 5C3FBA728FD7D264B80769B23
Thread-22-bolt, lines : 3, sessionId: 5D16C1F5191CF9371Y32B58CF
Thread-50-bolt, lines : 3, sessionId: 5C16BC4MB91B85661FE22F413
.
.
.
Thread-22-bolt, lines : 23, sessionId: 5GFBAT3D3100A7A7255027A70
Thread-50-bolt, lines : 23, sessionId: 5D16C1EB1C7A751AE03201C3F
Thread-50-bolt, lines : 24, sessionId: 5B16C0F7215109AG43528BA2D
Thread-22-bolt, lines : 24, sessionId: 5N16C2FE51E5619C2A1244215
Thread-22-bolt, lines : 25, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-50-bolt, lines : 25, sessionId: 5C3FBA728FD7D264B80769B23
2)spout的并发数为2,bolt并发数为1.
topoBuilder.setSpout("spout", new MySpout(), 2);
topoBuilder.setBolt("bolt", new MyBolt(), 1).shuffleGrouping("spout");
控制台打印的内容显示,只有1个bolt线程,符合预期。但是bolt读取到的总行数是track.log文件行数的2倍。这是因为spout有2个线程,所以track.log被读取了2次。说明把文件当做spout数据来源是,shout的线程数只能是1.
Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-23-bolt, lines : 2, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-23-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-23-bolt, lines : 4, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-23-bolt, lines : 5, sessionId: 5D16C7A886E2P2AE3EA29FC3E
.
.
.
Thread-23-bolt, lines : 96, sessionId: 5N16C2FE51E5619C2A1244215
Thread-23-bolt, lines : 97, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 98, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 99, sessionId: 5C3FBA728FD7D264B80769B23
Thread-23-bolt, lines : 100, sessionId: 5C3FBA728FD7D264B80769B23
3)Non Grouping
spout并发数为1,bolt并发数为2.
topoBuilder.setSpout("spout", new MySpout(), 1);
topoBuilder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");
控制台打印内容如下,bolt有2个线程,线程名为Thread-23-bolt的计数器是20,线程名为Thread-50-bolt的计数器是30。说明在non grouping模式下,是不平均分配的。
Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-23-bolt, lines : 3, sessionId: 5C3FBA728FD7D264B80769B23
Thread-23-bolt, lines : 4, sessionId: 5D16C1F5191CF9371Y32B58CF
Thread-50-bolt, lines : 2, sessionId: 5C16BC4MB91B85661FE22F413
.
.
.
Thread-23-bolt, lines : 17, sessionId: 5D16C1EB1C7A751AE03201C3F
Thread-23-bolt, lines : 18, sessionId: 5B16C0F7215109AG43528BA2D
Thread-50-bolt, lines : 30, sessionId: 5N16C2FE51E5619C2A1244215
Thread-23-bolt, lines : 19, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 20, sessionId: 5C3FBA728FD7D264B80769B23
4)Fields Grouping 策略,
回顾 fields grouping 策略的作用:
(1)过滤,从源端(spout或上一级bolt)多输出Fields中选择某些field;
(2)相同的tuple会分发给同一个Executor或task处理。
典型场景:去重操作,join(企业用得不多,需要用到2个数据源,且2个数据源要同时,不能相差太久)
1个spout,2个bolt。
topoBuilder.setSpout("spout", new MySpout(), 1);
// Field grouping有2个参数。第一个是spout名称,第二个是field名称
topoBuilder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("log"));
效果和Non Grouping差不多。
Thread-50-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-50-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 4, sessionId: 5C3FBA728FD7D264B80769B23
Thread-50-bolt, lines : 5, sessionId: 5D16C1F5191CF9371Y32B58CF
.
.
.
Thread-50-bolt, lines : 26, sessionId: 5B16C0F7215109AG43528BA2D
Thread-21-bolt, lines : 22, sessionId: 5N16C2FE51E5619C2A1244215
Thread-21-bolt, lines : 23, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-50-bolt, lines : 27, sessionId: 5C3FBA728FD7D264B80769B23
5)All grouping策略模式
1个spout,2个bolt。
topoBuilder.setSpout("spout", new MySpout(), 1);
// all grouping 广播方式
topoBuilder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");
spout会把每个数据分发给每一个下级的bolt,每个bolt线程获取到的行数都是一样的。开发时广播方式不常用。
Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-49-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-49-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
.
.
.
Thread-23-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-49-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23
Thread-49-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23
6)Global Grouping 全局分组
1的spout,2个bolt。
topoBuilder.setSpout("spout", new MySpout(), 1);
// Global Grouping 全局分组
topoBuilder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");
控制台打印的内容,有2个线程,名称分别为Thread-22-bolt 和 Thread-50-bolt。但是只有序号小的有接收到数据,序号大的没有接收到数据。Global Grouping 全局分组是把数据分配给id值最低的task。
[Thread-22-bolt] INFO backtype.storm.daemon.executor - Prepared bolt bolt:(5)
[Thread-50-bolt] INFO backtype.storm.daemon.executor - Prepared bolt bolt:(6)
.
.
.
Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-22-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
.
.
.
Thread-22-bolt, lines : 47, sessionId: 5B16C0F7215109AG43528BA2D
Thread-22-bolt, lines : 48, sessionId: 5N16C2FE51E5619C2A1244215
Thread-22-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-22-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23
思考:读取文件案例思考
示例中用的是storm读取文件,把文件作为数据源,在企业中很少见。storm是分布式应用,数据会分发到每一台supervisor执行,读本地文件只在一台机器上。
1)Spout数据源可以是数据库、文件、MQ(比如:Kafka)。
2)数据源是数据库:只适合读取数据库的配置文件,但不能读取增量数据。
3)数据源是文件:只适合测试、讲课用(因为集群是分布式集群),其他无用。
4)企业产生的 log 文件处理步骤:
(1)读出内容写 入MQ
(2)Storm 再处理
读文件案例说明:
1)分布式应用无法读文件;
2)spout无法并发读,开并发会重复读。
并发度场景分析
场景分析:
单线程下:加减乘除(其实什么都可以做),和任何类进行操作。
多线程下:可以做局部加减乘除,不适合做全部加减乘除。
多线程下适合:
a、局部加减乘除
b、做处理类Operate,如split
c、持久化,如入DB
官方案例
1、统计单词 WordCountTopology,在storm-starter-master工程的目录如下。
为了便于理解,对官方的案例进行改写。在源码文件夹lesson下,创建包WordCount,复制WordCountTopology.java到WordCount包。
2、程序分析:
(1)有1个spout,有2个bolt。
(2)MyRandomSentenceSpout多次发送数据,nextTuple函数的实现是,每次发送的数据相同。
(3)MySplit,接收到spout的数据,数据都是字符串,对字符串进行分割。
(4)WordCount,接收从split发射的数据,都是单个字符,统计每个字符的个数。
3、各个程序代码和运行结果
(1)主程序WordCountTopology,用于统计单词个数的bolt程序WordCount
package WordCount;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;import java.util.HashMap;
import java.util.Map;/*** This topology demonstrates Storm's stream groupings and multilang capabilities.*/
public class WordCountTopology {public static class SplitSentence extends ShellBolt implements IRichBolt {public SplitSentence() {super("python", "splitsentence.py");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}// 统计每个单词出现的次数public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null)count = 0;count++;counts.put(word, count);//打印出当前线程名称,单词 和 个数System.out.println(Thread.currentThread().getName() + ", word = " + word + ", count = " + count);collector.emit(new Values(word, count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new MyRandomSentenceSpout(), 1);builder.setBolt("split", new MySplit(" "), 8).shuffleGrouping("spout");builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());// Thread.sleep(10000);
//
// cluster.shutdown();}}
}
(2)spout程序,MyRandomSentenceSpout。定义一个字符串数组,{ "a b c d", "e f g h", "i j k l"},为了便于观察,所以每个字符都不重复。每个字符串的字符由空格隔开,每个字符串逐个发送。
package WordCount;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;import java.util.Map;
import java.util.Random;public class MyRandomSentenceSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;SpoutOutputCollector _collector;Random _rand;// 单词字符串,由空格隔开String[] sentences = new String[]{ "a b c d", "e f g h", "i j k l"};@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_rand = new Random();}@Overridepublic void nextTuple() {for (String sentence : sentences) {_collector.emit(new Values(sentence));}// 睡眠10秒钟Utils.sleep(10 * 1000);}@Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}
(3)bolt程序,MySplit,接收字符串,分割成单个单词。
package WordCount;import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** IBasic开头的不需要写ack方法。会自动调用ack方法* @description bolt,分割单词. * @author whiteshark* @date 2019年6月30日 下午11:46:02*/
public class MySplit implements IBasicBolt{private String pattern;public MySplit(String pattern) {this.pattern = pattern;}/*** 每个bolt 和 spout 最好序列化,免得开高并发出错*/private static final long serialVersionUID = 1L;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {try {String sentence = input.getString(0);if (null != sentence){// 字符串由空格隔开,用split分割成单个字符for (String word : sentence.split(pattern)) {collector.emit(new Values(word));}}// IBasic开头的不需要写ack方法。执行成功会自动调用ack方法// 如果抛出FailedException异常,失败了也会通知} catch (FailedException e) {e.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}
(4)运行主程序WordCountTopology,在控制台有打印bolt线程名称,单词,单词个数。spout运行了2次,所有发送了2次数据。可以看到,由于在bolt程序WordCount使用了fields,每个线程两次处理的字符都相同。
验证了上文在Field Grouping策略中提到的,相同的tuple会分发给同一个Executor或task处理。
Thread-16-count, word = e, count = 1
Thread-16-count, word = h, count = 1
Thread-16-count, word = k, count = 1
Thread-16-count, word = b, count = 1
Thread-18-count, word = f, count = 1
Thread-18-count, word = i, count = 1
Thread-18-count, word = l, count = 1
Thread-18-count, word = c, count = 1
Thread-20-count, word = g, count = 1
Thread-20-count, word = j, count = 1
Thread-20-count, word = a, count = 1
Thread-20-count, word = d, count = 1Thread-16-count, word = b, count = 2
Thread-16-count, word = e, count = 2
Thread-16-count, word = h, count = 2
Thread-16-count, word = k, count = 2
Thread-18-count, word = f, count = 2
Thread-18-count, word = c, count = 2
Thread-18-count, word = i, count = 2
Thread-18-count, word = l, count = 2
Thread-20-count, word = a, count = 2
Thread-20-count, word = g, count = 2
Thread-20-count, word = d, count = 2
Thread-20-count, word = j, count = 2
并发度
在Storm中,一个task可以简单的理解为在集群某节点上运行的一个spout或者bolt实例。在集群运行运行中,topology主要有四个组成部分:他们从低到高分别是:task(bolt/spout实例)、Executor(线程)、Workers(JVM虚拟机)、Nodes(服务器)
各个部分的含义如下:
(1)Nodes(服务器):是指配置在一个Storm集群中的服务器,会执行topology的一部分运算。一个Storm集群可以包括一个或者多个工作node。
(2)Workers(JVM虚拟机):是指一个node节点服务器上相互独立运行的JVM进程。每一个node可以配置运行一个或者多个worker。一个topology会分配到一个或者多个worker上运行。
(3)Executor(线程):是指一个worker的JVM进程中运行的Java线程。多个task可以指派给同一个executor来执行。除非是明确指定,Storm默认会给每一个executor分配一个task。
(4)Task(bolt/spout实例):task是spout和bolt的实例,里面的nextTuple()和execute()方法会被executors线程调用执行。
并发度:用户指定一个任务,可以被多个线程执行,并发度的数量等于线程 executor 的数量
。
task 就是具体的处理逻辑对象,一个 executor 线程可以执行一个或多个 tasks,但一般默认每个 executor 只执行一个 task,所以我们往往认为 task 就是执行线程,其实不是。
task 代表最大并发度
,一个 component 的 task 数是不会改变的,但是一个 componet 的 executer 数目是会发生变化的(storm rebalance 命令),task 数 >= executor 数,executor 数代表实际并发数
。
结构图如下:
WordCountTopology 统计单词的案例,包含的的spout和bolt如下,
(1)spout,类名为SentenceSpout,产生字符串。
(2)bolt,类名为SplitSentence,分割字符串为单词。
(3)bolt,类名为WordCount,统计单词。
(4)bolt,类名为ReportBolt,报告单词统计。
设置不同的线程数和任务数,看并发图
(1)默认情况下,每个 spout / bolt 的并发度(executor)是1,任务(task)也是1。
builder.setSpout(SENTENCE_SPOUT_ID, spout);
// SentenceSpout --> SplitSentenceBolt
builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
并发图如下。唯一的并发机制出现在线程级,每个任务在同一个JVM的不同线程中运行。如何增加并发度以充分利用硬件能力?让我们来增加分配给topology 的 worker 和 executer 的数量。
(2)把 SentenceSpout 的并发度设置为2,worker不变。
//这个2指的是有两个executor,虽然没有显示指定task的数量,
//1个executor至少有1个task。因为executor为2,默认task也就是2
builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
并发图如下。SentenceSpout 有2个线程,每个线程有1个任务。
(3)配置worker数量为2,SplitSentence设置为4个task和2个executor。WordCount设置为4个executor。
Config config = new Config();
config.setNumWorkers(2);
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
并发图如下:
网站浏览量和用户数统计
PV-UV案例需求分析
网站最常用的2个指标:
PV(page views):count(session_id)
UV(user views):count(distinct session_id)
多线程下,注意线程安全问题。
一、PV统计
方案分析
如下是否可行?
1、定义static long pv,Synchronized控制累计操作。
Synchronized和Lock在单JVM下有效,但在多JVM下无效。
可行的两个方案:
1、shuffleGrouping下,pv * executor并发数。比较简单,但只能局限于shuffleGrouping,且会有中间数据。
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总。这种方式可行,推荐这种方式。
线程安全:多线程处理的结果和单线程一致,就是线程安全。否则不安全。
案例代码
准备数据:track.log文件有50行数据。每行有3列,分别为网站,sessionId,时间。列与列由tab隔开。如下图。
www.taobao.com 5CFBA5BD76BACF436ACA9DCC8 2019-06-29 11:01:20
www.taobao.com 5D16C7A886E2P2AE3EA29FC3E 2019-06-29 08:01:36
www.taobao.com 5D16C7A886E2P2AE3EA29FC3E 2019-06-29 10:51:27
(1)程序的启动类。1个spout,4个bolt1,1个汇总的sumbolt。
package visits;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;/*** 程序的启动类* @description * @author whiteshark* @date 2019年7月6日 下午5:13:42*/
public class Main {public static void main(String[] args) {// 1.创建topology, 拓扑对象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 设置spout,bolt// setSpout方法的3个参数分别为,// spout的id(string类型),实例,并发数。大量数据场景并行数设置大一些topoBuilder.setSpout("spout", new MySpout(), 1);// shuffle Grouping 分组topoBuilder.setBolt("bolt", new PVBolt1(), 4).shuffleGrouping("spout");// 汇总topoBuilder.setBolt("sumbolt", new PVSumBolt(), 1).shuffleGrouping("bolt");// 3. 设置works个数Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3个参数分别为:拓扑名称,stormconfig配置,拓扑实例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}}
}
(2)spout,从track.log读取每一行,发送到下一级bolt。
package visits;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;/*** * @description Spout,读取数据,一行一行的发送到下一级处理* @author whiteshark* @date 2019年6月30日 上午9:28:17*/
public class MySpout implements IRichSpout{private static final long serialVersionUID = 1L;private FileInputStream fis;private InputStreamReader isr;private BufferedReader br;private String str = null;SpoutOutputCollector collector = null;@Overridepublic void nextTuple() {try {while ((str = this.br.readLine()) != null) {/*在这可以对数据进行加工或过滤*/// 发射数据collector.emit(new Values(str));// 为了在控制台观察打印出来的数据,这里暂停Thread.sleep(500);}} catch (Exception e) {e.printStackTrace();}}// 在提交作业时,会把storm.yaml的所有项读取到Map中,在open方法中如果修改map的值,// 会覆盖原来storm.yaml所定义的值,@Overridepublic void open(Map conf, TopologyContext content, SpoutOutputCollector collector) {this.collector = collector;try {this.fis = new FileInputStream("track.log");this.isr = new InputStreamReader(fis, "UTF-8");this.br = new BufferedReader(isr);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 定义发射数据的格式declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}// 下一级成功应答ack,打印内容@Overridepublic void ack(Object msgId) {System.out.println("spout ack: " + msgId.toString());}@Overridepublic void activate() {// TODO Auto-generated method stub}// 资源关闭@Overridepublic void close() {try {br.close();isr.close();fis.close();} catch (Exception e) {e.printStackTrace();}}@Overridepublic void deactivate() {// TODO Auto-generated method stub}// 下一级失败应答ack,打印内容@Overridepublic void fail(Object msgId) {System.out.println("spout fail: " + msgId.toString());}
}
(3)bolt1,在main中设置有4个线程。接收从spout发射的每一行,分割成3个字段。并统计第二个字段(sessionId)的个数,sessionId的个数和当前线程序号,一起传递到sumBolt做最后的汇总。
package visits;import java.util.Map;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class PVBolt1 implements IRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector;// 初始化方法@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;}String logString = null;String sessionId = null;Integer pv = 0;@Overridepublic void execute(Tuple input) {logString = input.getString(0);sessionId = logString.split("\t")[1];if (null != sessionId) {pv++;}// 第一个值是当前线程ID,第二个值是浏览次数collector.emit(new Values(Thread.currentThread().getId(), pv));System.out.println(Thread.currentThread().getName() + ", pv = " + pv);}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId", "pv"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}
}
(4)PVSumBolt,做最后的汇总。从上一级接收线程序号和sessionId个数,保存和更新到map中。遍历map,累加所有的个数。得出总的个数。
package visits;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;public class PVSumBolt implements IRichBolt{private static final long serialVersionUID = 1L;Map<Long, Integer> counts = new HashMap<Long, Integer>();@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {}@Overridepublic void execute(Tuple input) {Long threadId = input.getLong(0);Integer pv = input.getInteger(1);counts.put(threadId, pv);long wordSum =0;// 获取总数,遍历counts的values,进行累加for (Map.Entry<Long, Integer> count : counts.entrySet()) {wordSum += count.getValue();}System.out.println(Thread.currentThread().getName() + ", wordSum = " + wordSum);}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}
}
(5)输出结果:可以看出,有4个bolt1线程,只有1个sumbolt汇总结果。
Thread-26-bolt, pv = 1
Thread-57-sumbolt, wordSum = 1
Thread-53-bolt, pv = 1
Thread-57-sumbolt, wordSum = 2
Thread-23-bolt, pv = 1
Thread-57-sumbolt, wordSum = 3
Thread-55-bolt, pv = 1
Thread-57-sumbolt, wordSum = 4
.
.
.
Thread-53-bolt, pv = 11
Thread-57-sumbolt, wordSum = 41
Thread-23-bolt, pv = 11
Thread-57-sumbolt, wordSum = 42
Thread-55-bolt, pv = 11
Thread-57-sumbolt, wordSum = 43
Thread-26-bolt, pv = 11
Thread-57-sumbolt, wordSum = 44
Thread-23-bolt, pv = 12
Thread-57-sumbolt, wordSum = 45
Thread-55-bolt, pv = 12
Thread-57-sumbolt, wordSum = 46
Thread-26-bolt, pv = 12
Thread-57-sumbolt, wordSum = 47
Thread-53-bolt, pv = 12
Thread-57-sumbolt, wordSum = 48
Thread-26-bolt, pv = 13
Thread-57-sumbolt, wordSum = 49
Thread-53-bolt, pv = 13
Thread-57-sumbolt, wordSum = 50
PV-UV案例优化引入Zookeeper锁控制线程操作
汇总型方案:
1、在shuffleGrouping下,pv(单线程结果)*Executer并发数,一个Executer默认一个task,如果设置Task数大于1,公式应该是 pv(单线程结果)*Task数。同一个Executer下task的线程ID相同,taskId不同。
优点:简单、计算量小
缺点:存在有一点误差,但大部分场景能接受。
优化:
案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值。利用Zookeeper锁来做到只有一个Task输出汇总值,而且每5秒输出一次。
2、bolt1进行多次并发局部汇总,bolt2单线程进行全局汇总。
优点:
(1)计算绝对准确;
(2)如果用fieldGrouping可以得到中间值,如单个user的访问PV(访问深度,也是有用的指标)
缺点:计算量稍大,且多一个Bolt。
预处理:现在虚拟机hadoop-senior和hadoop-senior02启动zookeeper集群。先创建"/lock”目录,再创建"/lock/storm”目录,保存的值均为空。
案例代码:
(1)SourceSpout用于产生源数据。每次产生100行字符串,每行字符串的格式均为:域名 + "\t" + sessionId + "\t" +时间
package lock;import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;/*** * @description 生成数据* @author whiteshark* @time 2019年6月29日 上午9:45:40* @version*/
public class SourceSpout implements IRichSpout{private static final long serialVersionUID = 1L;SpoutOutputCollector collector;Queue<String> queue = new ConcurrentLinkedQueue<String>();String str = null;@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");this.collector = collector;Random random = new Random();// 登录的网站是taobaoString hosts = "www.taobao.com";//每次登录的session idString[] sessionId = {"5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8", "5D16C3E0209C16DEAA28L1824","5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413","5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E","5C3FBA728FD7D264B80769B23", "5B16C0F7215109AG43528BA2D","5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};//登录的时间String[] times = {"2019-06-29 08:01:36", "2019-06-29 08:11:37", "2019-06-29 08:31:38", "2019-06-29 09:23:07", "2019-06-29 10:51:27", "2019-06-29 10:51:56","2019-06-29 11:01:07", "2019-06-29 11:01:20", "2019-06-29 11:45:30","2019-06-29 12:31:49", "2019-06-29 12:41:51", "2019-06-29 12:51:37", "2019-06-29 13:11:27", "2019-06-29 13:20:40", "2019-06-29 13:31:38",};for (int i = 0; i < 100; i++) {queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);}System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}@Overridepublic void nextTuple() {// System.out.println("SourceSpout nextTuple");if (queue.size() >= 0) {this.collector.emit(new Values(queue.poll()));}try {Thread.sleep(200);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void ack(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void fail(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}
(2)PVBolt,在prepare中,会创建一个zookeeper 临时目录 "/lock/storm/pv",保存的值为 IP + ":" + taskId,所以只有一个任务能创建成功该临时目录。在execute方法中,每次读取一行字符串,取sessionId累加。如果是创建zookeeper目录的任务task,每隔5秒会输出总的sessionId个数。
package lock;import java.net.InetAddress;
import java.util.Map;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;public class PVBolt implements IRichBolt{private static final long serialVersionUID = 1L;//在zk中创建锁的路径public static final String zkPath = "/lock/storm/pv";ZooKeeper zKeeper = null;String lockData = null;OutputCollector collector;String logString = null;String sessionId = null;Integer pv = 0;long beginTime = System.currentTimeMillis();long endTime = 0;// 初始化方法@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
// this.collector = collector;try {System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 1");// 新建zk客户端到zk集群。3秒连不到集群算超时. ip地址是虚拟机的地址,2台虚拟机都有装zk,并且已经启动,存在目录"/lock/storm"zKeeper = new ZooKeeper("192.168.178.131:2181,192.168.178.132:2181", 20000, new Watcher(){@Overridepublic void process (WatchedEvent event) {System.out.println("event : " + event.getType());}});System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 2");//如果没有连接到集群,休眠1秒,让其重连。如果没有连接成功,一直等待while (zKeeper.getState() != ZooKeeper.States.CONNECTED) {Thread.sleep(1000);}System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 3");//获得本机的ipInetAddress address = InetAddress.getLocalHost();//保存在zk路径中的值,IP + ":" + taskIdlockData = address.getHostAddress() + ":" + context.getThisTaskId();System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 4");// 只是判断目录是否存在,不放监控watchif (null == zKeeper.exists(zkPath, false)) {//如果不存在该目录,创建一个临时目录节点zKeeper.create(zkPath, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - create " + zkPath);}System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 5");} catch (Exception e) {try {zKeeper.close();} catch (InterruptedException e1) {e1.printStackTrace();}}}@Overridepublic void execute(Tuple input) {try {logString = input.getString(0);if (logString != null) {endTime = System.currentTimeMillis();sessionId = logString.split("\t")[1];if (null != sessionId) {pv++;}// 第一个值是当前线程ID,第二个值是浏览次数
// collector.emit(new Values(Thread.currentThread().getId(), pv));// 每5秒打印一次if (endTime - beginTime >= 5000) {System.err.println(lockData + "=============================");
// System.out.println("lockData = " + new String(zKeeper.getData(zkPath, false, null)));if (lockData.equals(new String(zKeeper.getData(zkPath, false, null)))) {System.out.println("pv ====================== " + pv * 4);}beginTime = System.currentTimeMillis();}}} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {
// declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}
}
(3)PVTopo,设置spout并发数为1,bolt并发数为4,并且策略为shuffle Grouping,使得数据均匀分配到每个bolt。
package lock;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;/*** 程序的启动类* @description * @author whiteshark* @date 2019年7月6日 下午5:13:42*/
public class PVTopo {public static void main(String[] args) {// 1.创建topology, 拓扑对象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 设置spout,bolt// setSpout方法的3个参数分别为,// spout的id(string类型),实例,并发数。大量数据场景并行数设置大一些topoBuilder.setSpout("spout", new SourceSpout(), 1);// shuffle Grouping 分组,并发数为4topoBuilder.setBolt("bolt", new PVBolt(), 4).shuffleGrouping("spout");// 3. 设置works个数Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3个参数分别为:拓扑名称,stormconfig配置,拓扑实例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}}
}
输入的结果为:
192.168.43.105:6=============================
192.168.43.105:8=============================
192.168.43.105:5=============================
pv ====================== 16
192.168.43.105:7=============================
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 44
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 72
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 100
如果程序不关闭,在zookeeper能看到临时目录"/lock/storm/pv"保存的值(IP + ":" + taskId):
[zk: localhost:2181(CONNECTED) 18] get /lock/storm/pv
192.168.43.105:5
cZxid = 0x2000000050
ctime = Tue Jul 30 00:02:39 CST 2019
mZxid = 0x2000000050
mtime = Tue Jul 30 00:02:39 CST 2019
pZxid = 0x2000000050
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16c3e672803000f
dataLength = 16
numChildren = 0
打包发布到虚拟机storm集群运行
上面的案例是在本地机子上运行。接下来要在storm集群运行。
1、工程打包,工程右键 Run AS,点击Maven install,最终控制台显示“BUILD SUCCESS”,说明打包成功。
在target目录下,有2个包,名称带有depedencies的是有依赖,需要的是这个包。
用Filezilla 把包发送到虚拟机hadoop-senior的目录/opt/datas/stormjars下
2、启动虚拟机的storm集群。
先启动hadoop-senior虚拟机和hadoop-senior02虚拟机上的zookeeper。
在hadoop-senior虚拟机启动storm作为主节点,并启动ui。nohup是后台启动,关闭窗口不会停止该storm节点
nohup ./storm nimbus &
nohup ./storm ui &
在hadoop-senior02虚拟机启动storm作为从节点。
nohup ./storm supervisor &
在浏览器http://hadoop-senior:8081,可以看到集群的监控界面
3、提交拓扑任务
在hadoop-senior的storm安装目录下,进入bin目录。输入命令提交拓扑任务到集群
./storm jar /opt/datas/stormjars/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar lock.PVTopo PVTopo
包是在/opt/datas/stormjars下,所以要加上前缀。后面带有2个参数,参数1是类名,类名前缀要加上具体的包,参数2是拓扑名,拓扑名必须唯一。这里参数1是lock.PVTopo,参数2是PVTopo。
输入提交拓扑任务的命令后,截取部分控制台的输出脚本,可以看到已经完成提交。
在监控界面可以看到,新增一个拓扑,名称为PVTopo,状态为active。
点击拓扑名称,进入查看拓扑详情,可以看到有1个spout,4个bolt。spout发送了2060条数据,4个bolt总共接收了2060条数据。
4、查看运行结果
点击bolt进入查看bolt详情,4个bolt都在hadoop-senior02虚拟机的storm上运行,端口是work占用。我们在bolt程序中,有输出总的sessionid个数,有4个bolt,只有1个bolt有输出总数。如果要查看每一个bolt的log文件,这样要耗时。
我们在zookeeper中,保存在"/lock/storm/pv"的值是 IP + taskId,所以找到taskid,就知道在哪个虚拟机和端口。
从图片中可以看到,有输出sessionid总数的taskid是6,输出的log日志在hadoop-senior02虚拟机logs目录下,端口为6707.
因为4个bolt都是在hadoop-senior02虚拟机上运行,所以在hadoop-senior02虚拟机storm目录logs目录下,可以看到各个bolt的log文件。查看logs目录下的文件列表。4个bolt对应的log文件。每个bolt的log文件由端口号来识别。
有输出sessionid总数的log是 worker-6706.log,查看文件,有输出总数。上文中,soput发送和bolt接收的数量都远大于这个数,是因为发送的有空字符串。sessionid统计的是非空。
5、停止拓扑任务
有2中方法。分别是监控界面操作,命令行操作。
(1)界面操作(没有权限控制,只要能登陆进这个界面就能操作,所以不推荐),所有点deactivate,使任务停止(并没有立即停止,而是处理完正在运行中的程序),此时任务状态为inactivate。再点击kill,任务停止运行。
此时任务状态为inactivate,inactivate的下一个操作可以是activate,或者kill。顾名思义,activate是使得任务继续运行,kill是杀死任务,使得任务完全停止。
点击kill,完全停止任务。状态变为killed
(2)命令行操作,在storm的bin目录下,输入命令
./storm kill PVTopo
案例升级 计算网站UV(去重计算模式)
方案分析
1、把sessionId放入set实现自动去重,set.size()获得UV
可行的方案(类似wordcount的计算去重word总数):
bolt1通过fieldGrouping进行多线程局部汇总,下一级bolt2进行单线程保存sessionId和count数到map且进行遍历,可以得到:
PV,UV,访问深度(每个sessionId的浏览数)
简单、快速,但比较耗内存,要求集群资源内存多。
适用于中小企业,不适合大企业。
适用于小数据量,如订单。
2、no-sql分布式数据库,如HBase
通过rowkey实现去重,统计行数得到去重后的sessionId总数。
适用于大数据量,如统计流量。
storm的局限性:
storm应用场景广泛,但能做的复杂度有限,通常为汇总型。
对源数据做预处理,写入数据库。
下文的案例采用第一种方案。
程序分析:
(1)DateFmt,工具类,用于把长日期字符串,转化为短日期字符串(只含有年月日);
(2)SourceSpout ,spout类,1个线程,用于产生源数据。每次产生100行字符串,每行字符串的格式均为:域名 + "\t" + sessionId + "\t" +时间;
(3)FmtLogBolt,bolt类,4个线程,接收spout的数据,截取日期和sessionId发送到下一级;
(4)DeepVisitBolt,bolt类,4个线程,局部汇总,接收FmtLogBolt的数据,把接收到的日期和sessionId拼接成以下形式的字符串:日期+ "_" + sessionId。定义map,把日期+ "_" + sessionId作为key,每次进来相同的key,value加1;把日期+ "_" + sessionId,以及对应的value发送到下一级;
(5)UVSumBolt,bolt类,1个线程,全局汇总,接收DeepVisitBolt的数据。统计PV:用户的浏览数(对用户不去重),UV:用户的浏览数(对用户去重)
(6)UVTopo,topology类,定义spout,bolt类。
案例代码:
(1)DateFmt,工具类
package user_visit;import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;/*** 工具类* @author silva**/
public class DateFmt {public static final String DATE_LONG = "yyyy-MM-dd HH:mm:ss";public static final String DATE_SHORT = "yyyy-MM-dd";public static SimpleDateFormat sdf = new SimpleDateFormat(DATE_SHORT);// 返回指定格式的字符串public static String getCountDate(String date, String pattern) {SimpleDateFormat sdf = new SimpleDateFormat(pattern);Calendar cal = Calendar.getInstance();if (null != date) {try {cal.setTime(sdf.parse(date));} catch (Exception e) {e.printStackTrace();}}return sdf.format(cal.getTime());}// 字符串转化为datepublic static Date parseDate(String dateStr) throws Exception {return sdf.parse(dateStr);}public static void main(String[] args) {String dateStr = "2019-06-29 13:11:27";System.out.println("date = " + getCountDate(dateStr, DATE_SHORT));}
}
(2)SourceSpout ,spout类,1个线程,用于产生源数据。日期要和运行程序为同一个日期,否则最后的出来的结果是0.
package user_visit;import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;/*** * @description 生成数据* @author whiteshark* @time 2019年6月29日 上午9:45:40* @version*/
public class SourceSpout implements IRichSpout{private static final long serialVersionUID = 1L;SpoutOutputCollector collector;Queue<String> queue = new ConcurrentLinkedQueue<String>();String str = null;@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");this.collector = collector;Random random = new Random();// 登录的网站是taobaoString hosts = "www.taobao.com";//每次登录的session idString[] sessionId = {"5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8", "5D16C3E0209C16DEAA28L1824","5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413","5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E","5C3FBA728FD7D264B80769B23", "5B16C0F7215109AG43528BA2D","5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};//登录的时间String[] times = {"2019-08-04 08:01:36", "2019-08-04 08:11:37", "2019-08-04 08:31:38", "2019-08-04 09:23:07", "2019-08-04 10:51:27", "2019-08-04 10:51:56","2019-08-04 11:01:07", "2019-08-04 11:01:20", "2019-08-04 11:45:30","2019-08-04 12:31:49", "2019-08-04 12:41:51", "2019-08-04 12:51:37", "2019-08-04 13:11:27", "2019-08-04 13:20:40", "2019-08-04 13:31:38",};for (int i = 0; i < 100; i++) {queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);}System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}@Overridepublic void nextTuple() {// System.out.println("SourceSpout nextTuple");if (queue.size() >= 0) {this.collector.emit(new Values(queue.poll()));}try {Thread.sleep(200);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void ack(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void fail(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}
(3)FmtLogBolt,bolt类,4个线程
package user_visit;import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class FmtLogBolt implements IBasicBolt{/*** */private static final long serialVersionUID = 1L;private String eachLog = null;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date", "sessionId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {eachLog = input.getString(0);if (null != eachLog && eachLog.length() > 0) {// 分别发送日期,sessionIdcollector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2], DateFmt.DATE_SHORT), eachLog.split("\t")[1]));}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}
}
(4)DeepVisitBolt,bolt类,4个线程
package user_visit;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** 局部汇总* @author silva**/
public class DeepVisitBolt implements IBasicBolt{/*** */private static final long serialVersionUID = 1L;private Map<String, Integer> counts = new HashMap<>();@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date_sessionId", "count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String dateStr = input.getString(0);String sessionId = input.getString(1);Integer count = counts.get(dateStr + "_" + sessionId);if (null == count) {count = 0;}count++;counts.put(dateStr + "_" + sessionId, count);// 发送到下一级,做全局汇总collector.emit(new Values(dateStr + "_" + sessionId, count));}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}
(5)UVSumBolt,bolt类,1个线程
package user_visit;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;/** 全局汇总*/
public class UVSumBolt implements IBasicBolt{private static final long serialVersionUID = 1L;private Map<String, Integer> counts = new HashMap<String, Integer>();private String curDate = null;private long beginTime;@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {try {long PV = 0; // 总数long UV = 0; // 去重后的个数long endTime = System.currentTimeMillis();String dateSessionId = input.getString(0);Integer count = input.getInteger(1);// 如果不是以当前日期开头,并且比当前日期大,说明已经跨天到第二天// 需要把map清空,curDate 设置为新的日期if (!dateSessionId.startsWith(curDate) && DateFmt.parseDate(dateSessionId.split("_")[0]).after(DateFmt.parseDate(curDate))) {curDate = dateSessionId.split("_")[0];counts.clear();}counts.put(dateSessionId, count);if (endTime - beginTime > 1 * 1000) {for (Map.Entry<String, Integer> map : counts.entrySet()) {if (map.getKey().startsWith(curDate)) { // 只统计今天的数据,过滤非今天的数据UV++; // 用户总数+1PV += map.getValue(); // 浏览数累加}}System.out.println("UV = " + UV + ", PV = " + PV);}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// 预处理,设置为当天时间,格式为 yyyy-MM-ddcurDate = DateFmt.getCountDate(null, DateFmt.DATE_SHORT);beginTime = System.currentTimeMillis();}
}
(6)UVTopo,topology类,定义spout,bolt类。
package user_visit;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;/*** 程序的启动类* @description * @author whiteshark* @date 2019年8月2日 下午1:03:02*/
public class UVTopo {public static void main(String[] args) {// 1.创建topology, 拓扑对象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 设置spout,bolt// setSpout方法的3个参数分别为,// spout的id(string类型),实例,并发数。大量数据场景并行数设置大一些topoBuilder.setSpout("spout", new SourceSpout(), 1);// shuffle Grouping 分组,并发数为4topoBuilder.setBolt("fmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");// fields Grouping 分组,并发数为4。根据 date + sessionId 分组,相同 date + sessionId 的被放到同一个bolt线程处理topoBuilder.setBolt("deepVisitBolt", new DeepVisitBolt(), 4).fieldsGrouping("fmtLogBolt", new Fields("date", "sessionId"));// shuffle Grouping 分组,并发数为1topoBuilder.setBolt("sumBolt", new UVSumBolt(), 1).shuffleGrouping("deepVisitBolt");// 3. 设置works个数Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3个参数分别为:拓扑名称,stormconfig配置,拓扑实例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}}
}
运行程序打印的结果。用户的浏览数为100次,有12个不同的用户访问。与spout发送的数据一致。
.
.
.
UV = 12, PV = 97
UV = 12, PV = 98
UV = 12, PV = 99
UV = 12, PV = 100
【Storm】Spout的storm-starter及Grouping策略、并发度讲解、网站浏览量和用户数统计相关推荐
- 用这4种策略提高你的Facebook广告浏览量
明智的是,你已经开始涉足视频营销.无论你是用视频来制作教育内容还是以产品为中心的广告,你都做了正确的决定.毕竟,现在是2020年:任何缺少视频的数字营销策略都是不完整的. 现在你已经进入了游戏中,是时 ...
- Storm部分:Storm Grouping -- 数据流分组(各种数据分发策略的练习)【Java版纯代码】
1.源数据 www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 12:40:49 www.taobao.com XXYH6YCGFJYERT ...
- 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度
大数据技术之_17_Storm学习 一 Storm 概述 1.1 离线计算是什么? 1.2 流式计算是什么? 1.3 Storm 是什么? 1.4 Storm 与 Hadoop 的区别 1.5 Sto ...
- storm spout mysql_storm+mysql集成
使用storm自带的JdbcInsertBolt插入数据 maven xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http:/ ...
- Storm 01之 Storm基本概念及第一个demo
2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies :[tə'pɑ:lədʒɪ]拓扑结构 Streams Spouts:[spaʊt]喷出; 喷射; 滔 ...
- storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs
您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...
- 【Storm】Storm简介及Storm集群的安装部署
1.Storm概述 (1)Storm简介 Storm最早是由BackType公司开发的实时处理系统,底层由Clojure实现.Clojure也是一门基于JVM的高级面向函数式的编程语言.2011年Tw ...
- BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略
BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...
- 用实例的方式去理解storm的并发度
什么是storm的并发度 一个topology(拓扑)在storm集群上最总是以executor和task的形式运行在suppervisor管理的worker节点上.而worker进程都是运行在jvm ...
最新文章
- Solidworks2017安装与破解
- c语言1到20找最大和最小相邻,一组数据里面怎样查找相邻和相同的整数算法设计解决方案...
- PMCAFF问答精选 | 滴滴跟UBER最主要的区别是什么?
- 幼儿园调查过程怎么写_深圳全托幼儿园哪个好 幼儿园寄宿怎么报名
- 学好英语网首页制作_没有美术基础的新手小白,如何学好淘宝美工?
- java集合是wftc_java的集合是什么?
- mybatis update 返回值
- GIS_gdal geotiff文件与C# 数组array之间的转换
- linux 程序发包,软件测试常用linux发包命令
- 初学Linux的简单命令(一)
- 网络规划设计师教程第二版目录
- matlab 画图 方程,matlab 画图与解方程
- 用html写游戏,Html5写一个简单的俄罗斯方块小游戏
- Problem A. Snapper Chain 问题A.按扣链条 解决办法
- Windows11设置共享打印机
- 使用 Ceph 作为 OpenStack 的统一存储解决方案
- Vue项目的打包\部署\优化
- mysql日期时间类型
- Java中正则表达式的使用
- chatgpt赋能Python-python_hanning
热门文章
- 服务器怎么u盘装麒麟系统,U盘安装优麒麟(ubuntukylin)系统方法
- 2018nit计算机应用基础分数分布,2018考研数学各科目分值比例及题型分布
- Autox.js与vscode的连接,并实现自动打卡功能(ccit)
- 赘婿背后的IP变革,“爽”文正在分化网文变现的方式
- 计算机课教学档案,档案专业计算机课改革项目 计算机课程教学大纲.doc
- 发那科机器人程序备份及导入
- C#连接云服务器MySql数据库
- 命令行 控制 易微联 wifi通断器
- EC20 不常用AT 命令 整理
- EC2創建新用戶用秘钥登录