maven先安装好。

以下讲storm-starter的使用。

1、从github下载官方的storm-starter例子包,是maven工程,

地址 https://github.com/nathanmarz/storm-starter

2、把文件解压复制到workspace目录下,用cmd命令行,在该文件目录下运行mvn eclipse:eclipse,生成eclipse所用的文件,使得maven工程变成eclipse可用的工程。

3、导入到eclipse。新建源码文件夹lesson。把上一节storm入门案例工程的lesson包,整个复制到storm-starter-master的lesson源码文件夹下。

4、选中项目右键,Run as maven package,用maven打包。在target文件夹下有2个jar包。

storm-starter-0.0.1-SNAPSHOT.jar是不含依赖,只含有工程代码,较小。

storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar 包含依赖,较大,通常需要由依赖的包。

5、官方提供的例子

分组策略(Stream Grouping)

stream grouping 用来定义一个 stream 应该如何分配给 Bolts 上面的多个Tasks,也就是分配给Bolts上面的多个Executors(多线程,并发度)。

1、Storm 里面有 6种类型的 stream grouping:

注:1)、2)、5)最常用,其他基本不用。

单线程等于All Grouping

1)Shuffle Grouping:随机分组,随机派发 stream 里面的 tuple,保证每个 bolt 接收到的 tuple 数目大致相同。通过轮询实现,保证平均分配。

2)Fields Grouping:按字段分组,比如按 userid 来分组,具有同样 userid 的 tuple 会被分到相同的 bolts,而不同的 userid 则会被分配到不同的 bolts。

3)All Grouping:广播发送,对于每一个 tuple,所有的 bolts 都会收到。

4)Global Grouping:全局分组,这个 tuple 被分配到 storm 中的一个 bolt 的其中一个 task。再具体一点就是分配给 id 值最低的那个 task。

5)None Grouping:不分组,这个分组的意思是说 stream 不关心到底谁会收到它的 tuple。目前这种分组和 Shuffle Grouping 是一样的效果,但是多线程下不平均分配。

6)Direct Grouping:直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个 task 处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的 task 的 id (OutputCollector.emit 方法也会返回 task 的 id)。

7)Local or Shuffle Grouping:如果目标 bolt 有一个或者多个 task 在同一个工作进程中,tuple 将会被随机发送给这些 tasks。否则,和普通的 Shuffle Grouping 行为一致。

2、测试

1)Shuffle Grouping 轮循的方式。

在lesson的Main.java改代码。把bolt并发数改为2,也就是bolt会有2个线程。

     // bolt的方法有3个参数,// bolt的id(String类型),实例,并发数。大量数据场景并行数设置大一些// bolt的数据来源于spout,名称要和上文setSpout的id一致,否则不能获取到数据// shuffle发射规则,后续详讲topoBuilder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");

lesson的MyBolt.java改代码。打印的内容增加当前的线程名。

         if (null != valStr) {num++;System.out.println(Thread.currentThread().getName() + ", lines : " + num + ", sessionId: " + valStr.split("\t")[1]);}

控制台打印的内容,显示有2个bolt线程,每个线程接收到的个数相同。每个线程打印出来的总行数相加,才等于track.log文件的总行数。

Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 2, sessionId: 5C3FBA728FD7D264B80769B23
Thread-22-bolt, lines : 3, sessionId: 5D16C1F5191CF9371Y32B58CF
Thread-50-bolt, lines : 3, sessionId: 5C16BC4MB91B85661FE22F413
.
.
.
Thread-22-bolt, lines : 23, sessionId: 5GFBAT3D3100A7A7255027A70
Thread-50-bolt, lines : 23, sessionId: 5D16C1EB1C7A751AE03201C3F
Thread-50-bolt, lines : 24, sessionId: 5B16C0F7215109AG43528BA2D
Thread-22-bolt, lines : 24, sessionId: 5N16C2FE51E5619C2A1244215
Thread-22-bolt, lines : 25, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-50-bolt, lines : 25, sessionId: 5C3FBA728FD7D264B80769B23

2)spout的并发数为2,bolt并发数为1.

topoBuilder.setSpout("spout", new MySpout(), 2);
topoBuilder.setBolt("bolt", new MyBolt(), 1).shuffleGrouping("spout");

控制台打印的内容显示,只有1个bolt线程,符合预期。但是bolt读取到的总行数是track.log文件行数的2倍。这是因为spout有2个线程,所以track.log被读取了2次。说明把文件当做spout数据来源是,shout的线程数只能是1.

Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-23-bolt, lines : 2, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-23-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-23-bolt, lines : 4, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-23-bolt, lines : 5, sessionId: 5D16C7A886E2P2AE3EA29FC3E
.
.
.
Thread-23-bolt, lines : 96, sessionId: 5N16C2FE51E5619C2A1244215
Thread-23-bolt, lines : 97, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 98, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 99, sessionId: 5C3FBA728FD7D264B80769B23
Thread-23-bolt, lines : 100, sessionId: 5C3FBA728FD7D264B80769B23

3)Non Grouping

spout并发数为1,bolt并发数为2.

topoBuilder.setSpout("spout", new MySpout(), 1);
topoBuilder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");

控制台打印内容如下,bolt有2个线程,线程名为Thread-23-bolt的计数器是20,线程名为Thread-50-bolt的计数器是30。说明在non grouping模式下,是不平均分配的。

Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 1, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-23-bolt, lines : 3, sessionId: 5C3FBA728FD7D264B80769B23
Thread-23-bolt, lines : 4, sessionId: 5D16C1F5191CF9371Y32B58CF
Thread-50-bolt, lines : 2, sessionId: 5C16BC4MB91B85661FE22F413
.
.
.
Thread-23-bolt, lines : 17, sessionId: 5D16C1EB1C7A751AE03201C3F
Thread-23-bolt, lines : 18, sessionId: 5B16C0F7215109AG43528BA2D
Thread-50-bolt, lines : 30, sessionId: 5N16C2FE51E5619C2A1244215
Thread-23-bolt, lines : 19, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 20, sessionId: 5C3FBA728FD7D264B80769B23

4)Fields Grouping 策略,

回顾 fields grouping 策略的作用:

(1)过滤,从源端(spout或上一级bolt)多输出Fields中选择某些field;

(2)相同的tuple会分发给同一个Executor或task处理。

典型场景:去重操作,join(企业用得不多,需要用到2个数据源,且2个数据源要同时,不能相差太久)

1个spout,2个bolt。

topoBuilder.setSpout("spout", new MySpout(), 1);
// Field grouping有2个参数。第一个是spout名称,第二个是field名称
topoBuilder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("log"));

效果和Non Grouping差不多。

Thread-50-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-50-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-50-bolt, lines : 4, sessionId: 5C3FBA728FD7D264B80769B23
Thread-50-bolt, lines : 5, sessionId: 5D16C1F5191CF9371Y32B58CF
.
.
.
Thread-50-bolt, lines : 26, sessionId: 5B16C0F7215109AG43528BA2D
Thread-21-bolt, lines : 22, sessionId: 5N16C2FE51E5619C2A1244215
Thread-21-bolt, lines : 23, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-50-bolt, lines : 27, sessionId: 5C3FBA728FD7D264B80769B23

5)All grouping策略模式

1个spout,2个bolt。

topoBuilder.setSpout("spout", new MySpout(), 1);
// all grouping 广播方式
topoBuilder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");

spout会把每个数据分发给每一个下级的bolt,每个bolt线程获取到的行数都是一样的。开发时广播方式不常用。

Thread-23-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-49-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-23-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-49-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
.
.
.
Thread-23-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-49-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-23-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23
Thread-49-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23

6)Global Grouping 全局分组

1的spout,2个bolt。

topoBuilder.setSpout("spout", new MySpout(), 1);
// Global Grouping 全局分组
topoBuilder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");

控制台打印的内容,有2个线程,名称分别为Thread-22-bolt 和 Thread-50-bolt。但是只有序号小的有接收到数据,序号大的没有接收到数据。Global Grouping 全局分组是把数据分配给id值最低的task。

[Thread-22-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt bolt:(5)
[Thread-50-bolt] INFO  backtype.storm.daemon.executor - Prepared bolt bolt:(6)
.
.
.
Thread-22-bolt, lines : 1, sessionId: 5CFBA5BD76BACF436ACA9DCC8
Thread-22-bolt, lines : 2, sessionId: 5D16C7A886E2P2AE3EA29FC3E
Thread-22-bolt, lines : 3, sessionId: 5D16C7A886E2P2AE3EA29FC3E
.
.
.
Thread-22-bolt, lines : 47, sessionId: 5B16C0F7215109AG43528BA2D
Thread-22-bolt, lines : 48, sessionId: 5N16C2FE51E5619C2A1244215
Thread-22-bolt, lines : 49, sessionId: 5X16BCA8823AC4BD9CD196A5D
Thread-22-bolt, lines : 50, sessionId: 5C3FBA728FD7D264B80769B23

思考:读取文件案例思考

示例中用的是storm读取文件,把文件作为数据源,在企业中很少见。storm是分布式应用,数据会分发到每一台supervisor执行,读本地文件只在一台机器上。

1)Spout数据源可以是数据库、文件、MQ(比如:Kafka)。
2)数据源是数据库:只适合读取数据库的配置文件,但不能读取增量数据。
3)数据源是文件:只适合测试、讲课用(因为集群是分布式集群),其他无用。
4)企业产生的 log 文件处理步骤:
  (1)读出内容写 入MQ
  (2)Storm 再处理

读文件案例说明:

1)分布式应用无法读文件;

2)spout无法并发读,开并发会重复读。

并发度场景分析

场景分析:

单线程下:加减乘除(其实什么都可以做),和任何类进行操作。

多线程下:可以做局部加减乘除,不适合做全部加减乘除。

多线程下适合:

a、局部加减乘除

b、做处理类Operate,如split

c、持久化,如入DB

官方案例

1、统计单词 WordCountTopology,在storm-starter-master工程的目录如下。

为了便于理解,对官方的案例进行改写。在源码文件夹lesson下,创建包WordCount,复制WordCountTopology.java到WordCount包。

2、程序分析:

(1)有1个spout,有2个bolt。

(2)MyRandomSentenceSpout多次发送数据,nextTuple函数的实现是,每次发送的数据相同。

(3)MySplit,接收到spout的数据,数据都是字符串,对字符串进行分割。

(4)WordCount,接收从split发射的数据,都是单个字符,统计每个字符的个数。

3、各个程序代码和运行结果

(1)主程序WordCountTopology,用于统计单词个数的bolt程序WordCount

package WordCount;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;import java.util.HashMap;
import java.util.Map;/*** This topology demonstrates Storm's stream groupings and multilang capabilities.*/
public class WordCountTopology {public static class SplitSentence extends ShellBolt implements IRichBolt {public SplitSentence() {super("python", "splitsentence.py");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}// 统计每个单词出现的次数public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);Integer count = counts.get(word);if (count == null)count = 0;count++;counts.put(word, count);//打印出当前线程名称,单词 和 个数System.out.println(Thread.currentThread().getName() + ", word = " + word + ", count = " + count);collector.emit(new Values(word, count));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new MyRandomSentenceSpout(), 1);builder.setBolt("split", new MySplit(" "), 8).shuffleGrouping("spout");builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());//      Thread.sleep(10000);
//
//      cluster.shutdown();}}
}

(2)spout程序,MyRandomSentenceSpout。定义一个字符串数组,{ "a b c d", "e f g h", "i j k l"},为了便于观察,所以每个字符都不重复。每个字符串的字符由空格隔开,每个字符串逐个发送。

package WordCount;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;import java.util.Map;
import java.util.Random;public class MyRandomSentenceSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;SpoutOutputCollector _collector;Random _rand;// 单词字符串,由空格隔开String[] sentences = new String[]{ "a b c d", "e f g h", "i j k l"};@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {_collector = collector;_rand = new Random();}@Overridepublic void nextTuple() {for (String sentence : sentences) {_collector.emit(new Values(sentence));}// 睡眠10秒钟Utils.sleep(10 * 1000);}@Overridepublic void ack(Object id) {}@Overridepublic void fail(Object id) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

(3)bolt程序,MySplit,接收字符串,分割成单个单词。

package WordCount;import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** IBasic开头的不需要写ack方法。会自动调用ack方法* @description bolt,分割单词. * @author whiteshark* @date 2019年6月30日 下午11:46:02*/
public class MySplit implements IBasicBolt{private String pattern;public MySplit(String pattern) {this.pattern = pattern;}/*** 每个bolt 和 spout 最好序列化,免得开高并发出错*/private static final long serialVersionUID = 1L;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {try {String sentence = input.getString(0);if (null != sentence){// 字符串由空格隔开,用split分割成单个字符for (String word : sentence.split(pattern)) {collector.emit(new Values(word));}}// IBasic开头的不需要写ack方法。执行成功会自动调用ack方法// 如果抛出FailedException异常,失败了也会通知} catch (FailedException e) {e.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}

(4)运行主程序WordCountTopology,在控制台有打印bolt线程名称,单词,单词个数。spout运行了2次,所有发送了2次数据。可以看到,由于在bolt程序WordCount使用了fields,每个线程两次处理的字符都相同。

验证了上文在Field Grouping策略中提到的,相同的tuple会分发给同一个Executor或task处理。

Thread-16-count, word = e, count = 1
Thread-16-count, word = h, count = 1
Thread-16-count, word = k, count = 1
Thread-16-count, word = b, count = 1
Thread-18-count, word = f, count = 1
Thread-18-count, word = i, count = 1
Thread-18-count, word = l, count = 1
Thread-18-count, word = c, count = 1
Thread-20-count, word = g, count = 1
Thread-20-count, word = j, count = 1
Thread-20-count, word = a, count = 1
Thread-20-count, word = d, count = 1Thread-16-count, word = b, count = 2
Thread-16-count, word = e, count = 2
Thread-16-count, word = h, count = 2
Thread-16-count, word = k, count = 2
Thread-18-count, word = f, count = 2
Thread-18-count, word = c, count = 2
Thread-18-count, word = i, count = 2
Thread-18-count, word = l, count = 2
Thread-20-count, word = a, count = 2
Thread-20-count, word = g, count = 2
Thread-20-count, word = d, count = 2
Thread-20-count, word = j, count = 2

并发度

在Storm中,一个task可以简单的理解为在集群某节点上运行的一个spout或者bolt实例。在集群运行运行中,topology主要有四个组成部分:他们从低到高分别是:task(bolt/spout实例)、Executor(线程)、Workers(JVM虚拟机)、Nodes(服务器)

各个部分的含义如下:

(1)Nodes(服务器):是指配置在一个Storm集群中的服务器,会执行topology的一部分运算。一个Storm集群可以包括一个或者多个工作node。

(2)Workers(JVM虚拟机):是指一个node节点服务器上相互独立运行的JVM进程。每一个node可以配置运行一个或者多个worker。一个topology会分配到一个或者多个worker上运行。

(3)Executor(线程):是指一个worker的JVM进程中运行的Java线程。多个task可以指派给同一个executor来执行。除非是明确指定,Storm默认会给每一个executor分配一个task。

(4)Task(bolt/spout实例):task是spout和bolt的实例,里面的nextTuple()和execute()方法会被executors线程调用执行。

并发度:用户指定一个任务,可以被多个线程执行,并发度的数量等于线程 executor 的数量
task 就是具体的处理逻辑对象,一个 executor 线程可以执行一个或多个 tasks,但一般默认每个 executor 只执行一个 task,所以我们往往认为 task 就是执行线程,其实不是。
task 代表最大并发度,一个 component 的 task 数是不会改变的,但是一个 componet 的 executer 数目是会发生变化的(storm rebalance 命令),task 数 >= executor 数,executor 数代表实际并发数

结构图如下:

WordCountTopology 统计单词的案例,包含的的spout和bolt如下,

(1)spout,类名为SentenceSpout,产生字符串。

(2)bolt,类名为SplitSentence,分割字符串为单词。

(3)bolt,类名为WordCount,统计单词。

(4)bolt,类名为ReportBolt,报告单词统计。

设置不同的线程数和任务数,看并发图

(1)默认情况下,每个 spout / bolt 的并发度(executor)是1,任务(task)也是1。

builder.setSpout(SENTENCE_SPOUT_ID, spout);
// SentenceSpout --> SplitSentenceBolt
builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

并发图如下。唯一的并发机制出现在线程级,每个任务在同一个JVM的不同线程中运行。如何增加并发度以充分利用硬件能力?让我们来增加分配给topology 的 worker 和 executer 的数量。

(2)把 SentenceSpout 的并发度设置为2,worker不变。


//这个2指的是有两个executor,虽然没有显示指定task的数量,
//1个executor至少有1个task。因为executor为2,默认task也就是2
builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

并发图如下。SentenceSpout 有2个线程,每个线程有1个任务。

(3)配置worker数量为2,SplitSentence设置为4个task和2个executor。WordCount设置为4个executor。

Config config = new Config();
config.setNumWorkers(2);
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

并发图如下:

网站浏览量和用户数统计

PV-UV案例需求分析

网站最常用的2个指标:

PV(page views):count(session_id)

UV(user views):count(distinct session_id)

多线程下,注意线程安全问题。

一、PV统计

方案分析

如下是否可行?

1、定义static long pv,Synchronized控制累计操作。

Synchronized和Lock在单JVM下有效,但在多JVM下无效。

可行的两个方案:

1、shuffleGrouping下,pv * executor并发数。比较简单,但只能局限于shuffleGrouping,且会有中间数据。

2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总。这种方式可行,推荐这种方式。

线程安全:多线程处理的结果和单线程一致,就是线程安全。否则不安全。

案例代码

准备数据:track.log文件有50行数据。每行有3列,分别为网站,sessionId,时间。列与列由tab隔开。如下图。

www.taobao.com   5CFBA5BD76BACF436ACA9DCC8   2019-06-29 11:01:20
www.taobao.com  5D16C7A886E2P2AE3EA29FC3E   2019-06-29 08:01:36
www.taobao.com  5D16C7A886E2P2AE3EA29FC3E   2019-06-29 10:51:27

(1)程序的启动类。1个spout,4个bolt1,1个汇总的sumbolt。

package visits;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;/*** 程序的启动类* @description * @author whiteshark* @date 2019年7月6日 下午5:13:42*/
public class Main {public static void main(String[] args) {// 1.创建topology, 拓扑对象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 设置spout,bolt// setSpout方法的3个参数分别为,// spout的id(string类型),实例,并发数。大量数据场景并行数设置大一些topoBuilder.setSpout("spout", new MySpout(), 1);// shuffle Grouping 分组topoBuilder.setBolt("bolt", new PVBolt1(), 4).shuffleGrouping("spout");// 汇总topoBuilder.setBolt("sumbolt", new PVSumBolt(), 1).shuffleGrouping("bolt");// 3. 设置works个数Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3个参数分别为:拓扑名称,stormconfig配置,拓扑实例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}}
}

(2)spout,从track.log读取每一行,发送到下一级bolt。

package visits;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;/*** * @description Spout,读取数据,一行一行的发送到下一级处理* @author whiteshark* @date 2019年6月30日 上午9:28:17*/
public class MySpout implements IRichSpout{private static final long serialVersionUID = 1L;private FileInputStream fis;private InputStreamReader isr;private BufferedReader br;private String str  = null;SpoutOutputCollector collector = null;@Overridepublic void nextTuple() {try {while ((str = this.br.readLine()) != null) {/*在这可以对数据进行加工或过滤*/// 发射数据collector.emit(new Values(str));// 为了在控制台观察打印出来的数据,这里暂停Thread.sleep(500);}} catch (Exception e) {e.printStackTrace();}}// 在提交作业时,会把storm.yaml的所有项读取到Map中,在open方法中如果修改map的值,// 会覆盖原来storm.yaml所定义的值,@Overridepublic void open(Map conf, TopologyContext content, SpoutOutputCollector collector) {this.collector = collector;try {this.fis = new FileInputStream("track.log");this.isr = new InputStreamReader(fis, "UTF-8");this.br = new BufferedReader(isr);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// 定义发射数据的格式declarer.declare(new Fields("log"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}// 下一级成功应答ack,打印内容@Overridepublic void ack(Object msgId) {System.out.println("spout ack: " + msgId.toString());}@Overridepublic void activate() {// TODO Auto-generated method stub}// 资源关闭@Overridepublic void close() {try {br.close();isr.close();fis.close();} catch (Exception e) {e.printStackTrace();}}@Overridepublic void deactivate() {// TODO Auto-generated method stub}// 下一级失败应答ack,打印内容@Overridepublic void fail(Object msgId) {System.out.println("spout fail: " + msgId.toString());}
}

(3)bolt1,在main中设置有4个线程。接收从spout发射的每一行,分割成3个字段。并统计第二个字段(sessionId)的个数,sessionId的个数和当前线程序号,一起传递到sumBolt做最后的汇总。

package visits;import java.util.Map;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class PVBolt1 implements IRichBolt{private static final long serialVersionUID = 1L;OutputCollector collector;// 初始化方法@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {this.collector = collector;}String logString = null;String sessionId = null;Integer pv = 0;@Overridepublic void execute(Tuple input) {logString = input.getString(0);sessionId = logString.split("\t")[1];if (null != sessionId) {pv++;}// 第一个值是当前线程ID,第二个值是浏览次数collector.emit(new Values(Thread.currentThread().getId(), pv));System.out.println(Thread.currentThread().getName() + ", pv = " + pv);}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId", "pv"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}
}

(4)PVSumBolt,做最后的汇总。从上一级接收线程序号和sessionId个数,保存和更新到map中。遍历map,累加所有的个数。得出总的个数。

package visits;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;public class PVSumBolt implements IRichBolt{private static final long serialVersionUID = 1L;Map<Long, Integer> counts = new HashMap<Long, Integer>();@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {}@Overridepublic void execute(Tuple input) {Long threadId = input.getLong(0);Integer pv = input.getInteger(1);counts.put(threadId, pv);long wordSum =0;// 获取总数,遍历counts的values,进行累加for (Map.Entry<Long, Integer> count : counts.entrySet()) {wordSum += count.getValue();}System.out.println(Thread.currentThread().getName() + ", wordSum = " + wordSum);}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}
}

(5)输出结果:可以看出,有4个bolt1线程,只有1个sumbolt汇总结果。

Thread-26-bolt, pv = 1
Thread-57-sumbolt, wordSum = 1
Thread-53-bolt, pv = 1
Thread-57-sumbolt, wordSum = 2
Thread-23-bolt, pv = 1
Thread-57-sumbolt, wordSum = 3
Thread-55-bolt, pv = 1
Thread-57-sumbolt, wordSum = 4
.
.
.
Thread-53-bolt, pv = 11
Thread-57-sumbolt, wordSum = 41
Thread-23-bolt, pv = 11
Thread-57-sumbolt, wordSum = 42
Thread-55-bolt, pv = 11
Thread-57-sumbolt, wordSum = 43
Thread-26-bolt, pv = 11
Thread-57-sumbolt, wordSum = 44
Thread-23-bolt, pv = 12
Thread-57-sumbolt, wordSum = 45
Thread-55-bolt, pv = 12
Thread-57-sumbolt, wordSum = 46
Thread-26-bolt, pv = 12
Thread-57-sumbolt, wordSum = 47
Thread-53-bolt, pv = 12
Thread-57-sumbolt, wordSum = 48
Thread-26-bolt, pv = 13
Thread-57-sumbolt, wordSum = 49
Thread-53-bolt, pv = 13
Thread-57-sumbolt, wordSum = 50

PV-UV案例优化引入Zookeeper锁控制线程操作

汇总型方案:

1、在shuffleGrouping下,pv(单线程结果)*Executer并发数,一个Executer默认一个task,如果设置Task数大于1,公式应该是 pv(单线程结果)*Task数。同一个Executer下task的线程ID相同,taskId不同。

优点:简单、计算量小

缺点:存在有一点误差,但大部分场景能接受。

优化:

案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值。利用Zookeeper锁来做到只有一个Task输出汇总值,而且每5秒输出一次。

2、bolt1进行多次并发局部汇总,bolt2单线程进行全局汇总。

优点:

(1)计算绝对准确;

(2)如果用fieldGrouping可以得到中间值,如单个user的访问PV(访问深度,也是有用的指标)

缺点:计算量稍大,且多一个Bolt。

预处理:现在虚拟机hadoop-senior和hadoop-senior02启动zookeeper集群。先创建"/lock”目录,再创建"/lock/storm”目录,保存的值均为空。

案例代码:

(1)SourceSpout用于产生源数据。每次产生100行字符串,每行字符串的格式均为:域名 + "\t" + sessionId + "\t" +时间

package lock;import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;/*** * @description 生成数据* @author whiteshark* @time 2019年6月29日 上午9:45:40* @version*/
public class SourceSpout implements IRichSpout{private static final long serialVersionUID = 1L;SpoutOutputCollector collector;Queue<String> queue = new ConcurrentLinkedQueue<String>();String str = null;@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");this.collector = collector;Random random = new Random();// 登录的网站是taobaoString hosts = "www.taobao.com";//每次登录的session idString[] sessionId = {"5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8", "5D16C3E0209C16DEAA28L1824","5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413","5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E","5C3FBA728FD7D264B80769B23", "5B16C0F7215109AG43528BA2D","5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};//登录的时间String[] times = {"2019-06-29 08:01:36", "2019-06-29 08:11:37", "2019-06-29 08:31:38", "2019-06-29 09:23:07", "2019-06-29 10:51:27", "2019-06-29 10:51:56","2019-06-29 11:01:07", "2019-06-29 11:01:20", "2019-06-29 11:45:30","2019-06-29 12:31:49", "2019-06-29 12:41:51", "2019-06-29 12:51:37", "2019-06-29 13:11:27", "2019-06-29 13:20:40", "2019-06-29 13:31:38",};for (int i = 0; i < 100; i++) {queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);}System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}@Overridepublic void nextTuple() {// System.out.println("SourceSpout nextTuple");if (queue.size() >= 0) {this.collector.emit(new Values(queue.poll()));}try {Thread.sleep(200);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void ack(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void fail(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}

(2)PVBolt,在prepare中,会创建一个zookeeper 临时目录 "/lock/storm/pv",保存的值为 IP + ":" + taskId,所以只有一个任务能创建成功该临时目录。在execute方法中,每次读取一行字符串,取sessionId累加。如果是创建zookeeper目录的任务task,每隔5秒会输出总的sessionId个数。

package lock;import java.net.InetAddress;
import java.util.Map;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;public class PVBolt implements IRichBolt{private static final long serialVersionUID = 1L;//在zk中创建锁的路径public static final String zkPath = "/lock/storm/pv";ZooKeeper zKeeper = null;String lockData = null;OutputCollector collector;String logString = null;String sessionId = null;Integer pv = 0;long beginTime = System.currentTimeMillis();long endTime = 0;// 初始化方法@Overridepublic void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
//      this.collector = collector;try {System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 1");// 新建zk客户端到zk集群。3秒连不到集群算超时. ip地址是虚拟机的地址,2台虚拟机都有装zk,并且已经启动,存在目录"/lock/storm"zKeeper = new ZooKeeper("192.168.178.131:2181,192.168.178.132:2181", 20000, new Watcher(){@Overridepublic void process (WatchedEvent event) {System.out.println("event : " + event.getType());}});System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 2");//如果没有连接到集群,休眠1秒,让其重连。如果没有连接成功,一直等待while (zKeeper.getState() != ZooKeeper.States.CONNECTED) {Thread.sleep(1000);}System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 3");//获得本机的ipInetAddress address = InetAddress.getLocalHost();//保存在zk路径中的值,IP + ":" + taskIdlockData = address.getHostAddress() + ":" + context.getThisTaskId();System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 4");// 只是判断目录是否存在,不放监控watchif (null == zKeeper.exists(zkPath, false)) {//如果不存在该目录,创建一个临时目录节点zKeeper.create(zkPath, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - create " + zkPath);}System.out.println("PVBolt prepare (taskId = " + context.getThisTaskId() + ") - 5");} catch (Exception e) {try {zKeeper.close();} catch (InterruptedException e1) {e1.printStackTrace();}}}@Overridepublic void execute(Tuple input) {try {logString = input.getString(0);if (logString != null) {endTime = System.currentTimeMillis();sessionId = logString.split("\t")[1];if (null != sessionId) {pv++;}// 第一个值是当前线程ID,第二个值是浏览次数
//              collector.emit(new Values(Thread.currentThread().getId(), pv));// 每5秒打印一次if (endTime - beginTime >= 5000) {System.err.println(lockData + "=============================");
//                  System.out.println("lockData = " + new String(zKeeper.getData(zkPath, false, null)));if (lockData.equals(new String(zKeeper.getData(zkPath, false, null)))) {System.out.println("pv ====================== " + pv * 4);}beginTime = System.currentTimeMillis();}}} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {
//      declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}
}

(3)PVTopo,设置spout并发数为1,bolt并发数为4,并且策略为shuffle Grouping,使得数据均匀分配到每个bolt。

package lock;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;/*** 程序的启动类* @description * @author whiteshark* @date 2019年7月6日 下午5:13:42*/
public class PVTopo {public static void main(String[] args) {// 1.创建topology, 拓扑对象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 设置spout,bolt// setSpout方法的3个参数分别为,// spout的id(string类型),实例,并发数。大量数据场景并行数设置大一些topoBuilder.setSpout("spout", new SourceSpout(), 1);// shuffle Grouping 分组,并发数为4topoBuilder.setBolt("bolt", new PVBolt(), 4).shuffleGrouping("spout");// 3. 设置works个数Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3个参数分别为:拓扑名称,stormconfig配置,拓扑实例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}}
}

输入的结果为:

192.168.43.105:6=============================
192.168.43.105:8=============================
192.168.43.105:5=============================
pv ====================== 16
192.168.43.105:7=============================
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 44
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 72
192.168.43.105:8=============================
192.168.43.105:6=============================
192.168.43.105:7=============================
192.168.43.105:5=============================
pv ====================== 100

如果程序不关闭,在zookeeper能看到临时目录"/lock/storm/pv"保存的值(IP + ":" + taskId):

[zk: localhost:2181(CONNECTED) 18] get /lock/storm/pv
192.168.43.105:5
cZxid = 0x2000000050
ctime = Tue Jul 30 00:02:39 CST 2019
mZxid = 0x2000000050
mtime = Tue Jul 30 00:02:39 CST 2019
pZxid = 0x2000000050
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16c3e672803000f
dataLength = 16
numChildren = 0

打包发布到虚拟机storm集群运行

上面的案例是在本地机子上运行。接下来要在storm集群运行。

1、工程打包,工程右键 Run AS,点击Maven install,最终控制台显示“BUILD SUCCESS”,说明打包成功。

在target目录下,有2个包,名称带有depedencies的是有依赖,需要的是这个包。

用Filezilla 把包发送到虚拟机hadoop-senior的目录/opt/datas/stormjars下

2、启动虚拟机的storm集群。

先启动hadoop-senior虚拟机和hadoop-senior02虚拟机上的zookeeper。

在hadoop-senior虚拟机启动storm作为主节点,并启动ui。nohup是后台启动,关闭窗口不会停止该storm节点

nohup ./storm nimbus &
nohup ./storm ui &

在hadoop-senior02虚拟机启动storm作为从节点。

nohup ./storm supervisor &

在浏览器http://hadoop-senior:8081,可以看到集群的监控界面

3、提交拓扑任务

在hadoop-senior的storm安装目录下,进入bin目录。输入命令提交拓扑任务到集群

./storm jar /opt/datas/stormjars/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar lock.PVTopo PVTopo

包是在/opt/datas/stormjars下,所以要加上前缀。后面带有2个参数,参数1是类名,类名前缀要加上具体的包,参数2是拓扑名,拓扑名必须唯一。这里参数1是lock.PVTopo,参数2是PVTopo。

输入提交拓扑任务的命令后,截取部分控制台的输出脚本,可以看到已经完成提交。

在监控界面可以看到,新增一个拓扑,名称为PVTopo,状态为active。

点击拓扑名称,进入查看拓扑详情,可以看到有1个spout,4个bolt。spout发送了2060条数据,4个bolt总共接收了2060条数据。

4、查看运行结果

点击bolt进入查看bolt详情,4个bolt都在hadoop-senior02虚拟机的storm上运行,端口是work占用。我们在bolt程序中,有输出总的sessionid个数,有4个bolt,只有1个bolt有输出总数。如果要查看每一个bolt的log文件,这样要耗时。

我们在zookeeper中,保存在"/lock/storm/pv"的值是 IP + taskId,所以找到taskid,就知道在哪个虚拟机和端口。

从图片中可以看到,有输出sessionid总数的taskid是6,输出的log日志在hadoop-senior02虚拟机logs目录下,端口为6707.

因为4个bolt都是在hadoop-senior02虚拟机上运行,所以在hadoop-senior02虚拟机storm目录logs目录下,可以看到各个bolt的log文件。查看logs目录下的文件列表。4个bolt对应的log文件。每个bolt的log文件由端口号来识别。

有输出sessionid总数的log是 worker-6706.log,查看文件,有输出总数。上文中,soput发送和bolt接收的数量都远大于这个数,是因为发送的有空字符串。sessionid统计的是非空。

5、停止拓扑任务

有2中方法。分别是监控界面操作,命令行操作。

(1)界面操作(没有权限控制,只要能登陆进这个界面就能操作,所以不推荐),所有点deactivate,使任务停止(并没有立即停止,而是处理完正在运行中的程序),此时任务状态为inactivate。再点击kill,任务停止运行。

此时任务状态为inactivate,inactivate的下一个操作可以是activate,或者kill。顾名思义,activate是使得任务继续运行,kill是杀死任务,使得任务完全停止。

点击kill,完全停止任务。状态变为killed

(2)命令行操作,在storm的bin目录下,输入命令

./storm kill PVTopo

案例升级 计算网站UV(去重计算模式)

方案分析

1、把sessionId放入set实现自动去重,set.size()获得UV

可行的方案(类似wordcount的计算去重word总数):

bolt1通过fieldGrouping进行多线程局部汇总,下一级bolt2进行单线程保存sessionId和count数到map且进行遍历,可以得到:

PV,UV,访问深度(每个sessionId的浏览数)

简单、快速,但比较耗内存,要求集群资源内存多。

适用于中小企业,不适合大企业。

适用于小数据量,如订单。

2、no-sql分布式数据库,如HBase

通过rowkey实现去重,统计行数得到去重后的sessionId总数。

适用于大数据量,如统计流量。

storm的局限性:

storm应用场景广泛,但能做的复杂度有限,通常为汇总型。

对源数据做预处理,写入数据库。

下文的案例采用第一种方案。

程序分析:

(1)DateFmt,工具类,用于把长日期字符串,转化为短日期字符串(只含有年月日);

(2)SourceSpout ,spout类,1个线程,用于产生源数据。每次产生100行字符串,每行字符串的格式均为:域名 + "\t" + sessionId + "\t" +时间;

(3)FmtLogBolt,bolt类,4个线程,接收spout的数据,截取日期和sessionId发送到下一级;

(4)DeepVisitBolt,bolt类,4个线程,局部汇总,接收FmtLogBolt的数据,把接收到的日期和sessionId拼接成以下形式的字符串:日期+ "_" + sessionId。定义map,把日期+ "_" + sessionId作为key,每次进来相同的key,value加1;把日期+ "_" + sessionId,以及对应的value发送到下一级;

(5)UVSumBolt,bolt类,1个线程,全局汇总,接收DeepVisitBolt的数据。统计PV:用户的浏览数(对用户不去重),UV:用户的浏览数(对用户去重)

(6)UVTopo,topology类,定义spout,bolt类。

案例代码:

(1)DateFmt,工具类

package user_visit;import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;/*** 工具类* @author silva**/
public class DateFmt {public static final String DATE_LONG = "yyyy-MM-dd HH:mm:ss";public static final String DATE_SHORT = "yyyy-MM-dd";public static SimpleDateFormat sdf = new SimpleDateFormat(DATE_SHORT);// 返回指定格式的字符串public static String getCountDate(String date, String pattern) {SimpleDateFormat sdf = new SimpleDateFormat(pattern);Calendar cal = Calendar.getInstance();if (null != date) {try {cal.setTime(sdf.parse(date));} catch (Exception e) {e.printStackTrace();}}return sdf.format(cal.getTime());}// 字符串转化为datepublic static Date parseDate(String dateStr) throws Exception {return sdf.parse(dateStr);}public static void main(String[] args) {String dateStr = "2019-06-29 13:11:27";System.out.println("date = " + getCountDate(dateStr, DATE_SHORT));}
}

(2)SourceSpout ,spout类,1个线程,用于产生源数据。日期要和运行程序为同一个日期,否则最后的出来的结果是0.

package user_visit;import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;/*** * @description 生成数据* @author whiteshark* @time 2019年6月29日 上午9:45:40* @version*/
public class SourceSpout implements IRichSpout{private static final long serialVersionUID = 1L;SpoutOutputCollector collector;Queue<String> queue = new ConcurrentLinkedQueue<String>();String str = null;@Overridepublic void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {try {System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 1");this.collector = collector;Random random = new Random();// 登录的网站是taobaoString hosts = "www.taobao.com";//每次登录的session idString[] sessionId = {"5GFBAT3D3100A7A7255027A70", "5X16BCA8823AC4BD9CD196A5D", "5CFBA5BD76BACF436ACA9DCC8", "5D16C3E0209C16DEAA28L1824","5I16CB309251CCF6CE6223BA1", "5C16BC4MB91B85661FE22F413","5D16C1F5191CF9371Y32B58CF", "5D16C7A886E2P2AE3EA29FC3E","5C3FBA728FD7D264B80769B23", "5B16C0F7215109AG43528BA2D","5N16C2FE51E5619C2A1244215", "5D16C1EB1C7A751AE03201C3F"};//登录的时间String[] times = {"2019-08-04 08:01:36", "2019-08-04 08:11:37", "2019-08-04 08:31:38", "2019-08-04 09:23:07", "2019-08-04 10:51:27", "2019-08-04 10:51:56","2019-08-04 11:01:07", "2019-08-04 11:01:20", "2019-08-04 11:45:30","2019-08-04 12:31:49", "2019-08-04 12:41:51", "2019-08-04 12:51:37", "2019-08-04 13:11:27", "2019-08-04 13:20:40", "2019-08-04 13:31:38",};for (int i = 0; i < 100; i++) {queue.add(hosts + "\t" + sessionId[random.nextInt(12)] + "\t" + times[random.nextInt(15)]);}System.out.println("SourceSpout prepare (taskId = " + context.getThisTaskId() + ") - 2");} catch (Exception e) {e.printStackTrace();}}@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}@Overridepublic void nextTuple() {// System.out.println("SourceSpout nextTuple");if (queue.size() >= 0) {this.collector.emit(new Values(queue.poll()));}try {Thread.sleep(200);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void ack(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void fail(Object msgId) {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("threadId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}

(3)FmtLogBolt,bolt类,4个线程

package user_visit;import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class FmtLogBolt implements IBasicBolt{/*** */private static final long serialVersionUID = 1L;private String eachLog = null;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date", "sessionId"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {eachLog = input.getString(0);if (null != eachLog && eachLog.length() > 0) {// 分别发送日期,sessionIdcollector.emit(new Values(DateFmt.getCountDate(eachLog.split("\t")[2], DateFmt.DATE_SHORT), eachLog.split("\t")[1]));}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}
}

(4)DeepVisitBolt,bolt类,4个线程

package user_visit;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** 局部汇总* @author silva**/
public class DeepVisitBolt implements IBasicBolt{/*** */private static final long serialVersionUID = 1L;private Map<String, Integer> counts = new HashMap<>();@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date_sessionId", "count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String dateStr = input.getString(0);String sessionId = input.getString(1);Integer count = counts.get(dateStr + "_" + sessionId);if (null == count) {count = 0;}count++;counts.put(dateStr + "_" + sessionId, count);// 发送到下一级,做全局汇总collector.emit(new Values(dateStr + "_" + sessionId, count));}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}

(5)UVSumBolt,bolt类,1个线程

package user_visit;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;/** 全局汇总*/
public class UVSumBolt implements IBasicBolt{private static final long serialVersionUID = 1L;private Map<String, Integer> counts = new HashMap<String, Integer>();private String curDate = null;private long beginTime;@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {try {long PV = 0;        // 总数long UV = 0;      // 去重后的个数long endTime = System.currentTimeMillis();String dateSessionId = input.getString(0);Integer count = input.getInteger(1);// 如果不是以当前日期开头,并且比当前日期大,说明已经跨天到第二天// 需要把map清空,curDate 设置为新的日期if (!dateSessionId.startsWith(curDate) && DateFmt.parseDate(dateSessionId.split("_")[0]).after(DateFmt.parseDate(curDate))) {curDate = dateSessionId.split("_")[0];counts.clear();}counts.put(dateSessionId, count);if (endTime - beginTime > 1 * 1000) {for (Map.Entry<String, Integer> map : counts.entrySet()) {if (map.getKey().startsWith(curDate)) {        // 只统计今天的数据,过滤非今天的数据UV++;                    // 用户总数+1PV += map.getValue();    // 浏览数累加}}System.out.println("UV = " + UV + ", PV = " + PV);}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map stormConf, TopologyContext context) {// 预处理,设置为当天时间,格式为 yyyy-MM-ddcurDate = DateFmt.getCountDate(null, DateFmt.DATE_SHORT);beginTime = System.currentTimeMillis();}
}

(6)UVTopo,topology类,定义spout,bolt类。

package user_visit;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;/*** 程序的启动类* @description * @author whiteshark* @date 2019年8月2日 下午1:03:02*/
public class UVTopo {public static void main(String[] args) {// 1.创建topology, 拓扑对象TopologyBuilder topoBuilder = new TopologyBuilder();// 2. 设置spout,bolt// setSpout方法的3个参数分别为,// spout的id(string类型),实例,并发数。大量数据场景并行数设置大一些topoBuilder.setSpout("spout", new SourceSpout(), 1);// shuffle Grouping 分组,并发数为4topoBuilder.setBolt("fmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");// fields Grouping 分组,并发数为4。根据 date + sessionId 分组,相同 date + sessionId 的被放到同一个bolt线程处理topoBuilder.setBolt("deepVisitBolt", new DeepVisitBolt(), 4).fieldsGrouping("fmtLogBolt", new Fields("date", "sessionId"));// shuffle Grouping 分组,并发数为1topoBuilder.setBolt("sumBolt", new UVSumBolt(), 1).shuffleGrouping("deepVisitBolt");// 3. 设置works个数Config conf = new Config();conf.put(Config.TOPOLOGY_WORKERS, 4);if (args.length > 0) {try {// 4. 分布式提交// 3个参数分别为:拓扑名称,stormconfig配置,拓扑实例StormSubmitter.submitTopology(args[0], conf, topoBuilder.createTopology());} catch (AlreadyAliveException e) {e.printStackTrace();} catch (InvalidTopologyException e) {e.printStackTrace();}} else {// 5. 本地模式提交LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("mytopology", conf, topoBuilder.createTopology());}}
}

运行程序打印的结果。用户的浏览数为100次,有12个不同的用户访问。与spout发送的数据一致。

.
.
.
UV = 12, PV = 97
UV = 12, PV = 98
UV = 12, PV = 99
UV = 12, PV = 100

【Storm】Spout的storm-starter及Grouping策略、并发度讲解、网站浏览量和用户数统计相关推荐

  1. 用这4种策略提高你的Facebook广告浏览量

    明智的是,你已经开始涉足视频营销.无论你是用视频来制作教育内容还是以产品为中心的广告,你都做了正确的决定.毕竟,现在是2020年:任何缺少视频的数字营销策略都是不完整的. 现在你已经进入了游戏中,是时 ...

  2. Storm部分:Storm Grouping -- 数据流分组(各种数据分发策略的练习)【Java版纯代码】

    1.源数据 www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 12:40:49 www.taobao.com XXYH6YCGFJYERT ...

  3. 大数据技术之_17_Storm学习_Storm 概述+Storm 基础知识+Storm 集群搭建+Storm 常用 API+Storm 分组策略和并发度

    大数据技术之_17_Storm学习 一 Storm 概述 1.1 离线计算是什么? 1.2 流式计算是什么? 1.3 Storm 是什么? 1.4 Storm 与 Hadoop 的区别 1.5 Sto ...

  4. storm spout mysql_storm+mysql集成

    使用storm自带的JdbcInsertBolt插入数据 maven xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http:/ ...

  5. Storm 01之 Storm基本概念及第一个demo

    2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies :[tə'pɑ:lədʒɪ]拓扑结构 Streams Spouts:[spaʊt]喷出; 喷射; 滔 ...

  6. storm apache java_Apache Storm 示例 Java 拓扑 - Azure HDInsight | Microsoft Docs

    您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 以 Java 语言创建 ...

  7. 【Storm】Storm简介及Storm集群的安装部署

    1.Storm概述 (1)Storm简介 Storm最早是由BackType公司开发的实时处理系统,底层由Clojure实现.Clojure也是一门基于JVM的高级面向函数式的编程语言.2011年Tw ...

  8. BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略

    BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...

  9. 用实例的方式去理解storm的并发度

    什么是storm的并发度 一个topology(拓扑)在storm集群上最总是以executor和task的形式运行在suppervisor管理的worker节点上.而worker进程都是运行在jvm ...

最新文章

  1. Solidworks2017安装与破解
  2. c语言1到20找最大和最小相邻,一组数据里面怎样查找相邻和相同的整数算法设计解决方案...
  3. PMCAFF问答精选 | 滴滴跟UBER最主要的区别是什么?
  4. 幼儿园调查过程怎么写_深圳全托幼儿园哪个好 幼儿园寄宿怎么报名
  5. 学好英语网首页制作_没有美术基础的新手小白,如何学好淘宝美工?
  6. java集合是wftc_java的集合是什么?
  7. mybatis update 返回值
  8. GIS_gdal geotiff文件与C# 数组array之间的转换
  9. linux 程序发包,软件测试常用linux发包命令
  10. 初学Linux的简单命令(一)
  11. 网络规划设计师教程第二版目录
  12. matlab 画图 方程,matlab 画图与解方程
  13. 用html写游戏,Html5写一个简单的俄罗斯方块小游戏
  14. Problem A. Snapper Chain 问题A.按扣链条 解决办法
  15. Windows11设置共享打印机
  16. 使用 Ceph 作为 OpenStack 的统一存储解决方案
  17. Vue项目的打包\部署\优化
  18. mysql日期时间类型
  19. Java中正则表达式的使用
  20. chatgpt赋能Python-python_hanning

热门文章

  1. 服务器怎么u盘装麒麟系统,U盘安装优麒麟(ubuntukylin)系统方法
  2. 2018nit计算机应用基础分数分布,2018考研数学各科目分值比例及题型分布
  3. Autox.js与vscode的连接,并实现自动打卡功能(ccit)
  4. 赘婿背后的IP变革,“爽”文正在分化网文变现的方式
  5. 计算机课教学档案,档案专业计算机课改革项目 计算机课程教学大纲.doc
  6. 发那科机器人程序备份及导入
  7. C#连接云服务器MySql数据库
  8. 命令行 控制 易微联 wifi通断器
  9. EC20 不常用AT 命令 整理
  10. EC2創建新用戶用秘钥登录