注:我是用一起写office写的,发到博客上格式就变了,,,变了,,,

1.微批处理可以根据数据的条数或者间隔时间来定。

实时处理有两种方式。

一是持续流处理,

二是微批处理。

2数据纪录处理情况

一是至少一次,  存在重复处理

二是有且仅有一次 严格

三是至多一次    存在漏出里

3.配置

stom.yaml(python格式,注意文件的配置)

# Licensed to the Apache Software Foundation (ASF) under one

# or more contributor license agreements.  See the NOTICE file

# distributed with this work for additional information

# regarding copyright ownership.  The ASF licenses this file

# to you under the Apache License, Version 2.0 (the

# "License"); you may not use this file except in compliance

# with the License.  You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

########### These MUST be filled in for a storm configuration

storm.zookeeper.servers:

- "hadoop02-linux.alibaba.com"

#     - "server2"

#

nimbus.host: "hadoop02-linux.alibaba.com"

#

#

storm.local.dir: "/opt/modules/apache-storm-0.9.6/worspace"

# ##### These may optionally be filled in:

#

## List of custom serializations

# topology.kryo.register:

#     - org.mycompany.MyType

#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer

#

## List of custom kryo decorators

# topology.kryo.decorators:

#     - org.mycompany.MyDecorator

#

## Locations of the drpc servers

# drpc.servers:

#     - "server1"

#     - "server2"

supervisor.slots.ports:

- 6700

- 6701

- 6702

- 6703

ui.port: 8082

## Metrics Consumers

# topology.metrics.consumer.register:

#   - class: "backtype.storm.metric.LoggingMetricsConsumer"

#     parallelism.hint: 1

#   - class: "org.mycompany.MyMetricsConsumer"

#     parallelism.hint: 1

#     argument:

#       - endpoint: "metrics-collector.mycompany.org"

4.nimbus

当集群中没有人提交任务,没有supervisor节点挂掉,主节点短时间挂掉是没有问题的,只要配合运维,监控重新启动就可以。

启动,并查看进程

运行官方示例程序

bin/storm jar  examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

注意的是: 停止Storm集群之前,一定要先将运行在Storm集群上的Topology先停掉,否则下次重启Storm集群,Storm会将这些Topology都启动起来

5.停止集群,没有脚本文件,可以自行编写,或者使用kill

跑bolt是线程跑的。

6.编写word topology


SentenceSpout:

package com.ibeifeng.bigdata.storm.topo;

import java.util.Map;

import java.util.Random;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichSpout;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

/**

* spout采集数据源

* @author lm

*

*/

public class SentenceSpout implements IRichSpout{

//抽象类 BaseRichSpout

//BaseRichSpout

private static final Logger log = LoggerFactory.getLogger(SentenceSpout.class);

private SpoutOutputCollector collector;

private static final String[] Sentence =

{"hadoop yarn mapreduce",

"flume error liangman yunkjhhj",

"abc dec abc abd fff",

"dhjfh jkdshg hjdkfgh"};

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

this.collector = collector;

}

@Override

public void close() {

// TODO Auto-generated method stub

}

@Override

public void activate() {

// 和页面激活有关,一般不写

}

@Override

public void deactivate() {

//  和页面上的失效有关,一般不写

}

@Override

public void nextTuple() {

//random get the data

String tmp = Sentence[ new Random().nextInt(Sentence.length)];

if(tmp.contains("error")){

log.error("数据错误");

}else{

//告诉发送的value是什么,但是没告诉key是什么,declareOutputFields会告诉key的名称

//key and value 是一一对应的

this.collector.emit(new Values(tmp));

}

}

@Override

public void ack(Object msgId) {

// TODO Auto-generated method stub

}

@Override

public void fail(Object msgId) {

// TODO Auto-generated method stub

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("seq"));

}

@Override

public Map<String, Object> getComponentConfiguration() {

//针对本组件单独的配置

return null;

}

}


SplitBolt

package com.ibeifeng.bigdata.storm.topo;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

/**

* 开发一个bolt

* @author lm

*

*/

public class SplitBolt implements IRichBolt{

private OutputCollector collector;

@Override

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

this.collector = collector;

}

/**

*接收到前面的一条tuple就调用一次

*

*/

@Override

public void execute(Tuple input) {

//

String sentence = input.getStringByField("seq");

if(StringUtils.isNotBlank(sentence)){

String[] words = sentence.split(" ");

for(String word:words){

this.collector.emit(new Values(word));

}

}

}

@Override

public void cleanup() {

// TODO Auto-generated method stub

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

@Override

public Map<String, Object> getComponentConfiguration() {

// TODO Auto-generated method stub

return null;

}

}


CountBolt

package com.ibeifeng.bigdata.storm.topo;

import java.util.HashMap;

import java.util.Map;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

/**

* 计数bolt

* @author lm

*

*/

public class CountBolt  extends BaseRichBolt{

private  OutputCollector collector;

private Map<String ,Integer> map;

@Override

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

// TODO Auto-generated method stub

this.collector=collector;

map = new HashMap<String,Integer>();

}

@Override

public void execute(Tuple input) {

String word = input.getStringByField("word");

int count = 1;

if(map.containsKey(word)){

count = map.get(word) + 1;

}

this.collector.emit(new Values(word,count));

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word","count"));

}

}


PrintBolt

package com.ibeifeng.bigdata.storm.topo;

import java.util.Map;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt{

private static final Logger logger = LoggerFactory.getLogger(PrintBolt.class);

@Override

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

}

@Override

public void execute(Tuple input) {

String word = input.getStringByField("word");

Integer value = input.getIntegerByField("count");

logger.info("单次"+word+"数量"+value); //单词 数量

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// TODO Auto-generated method stub

}

}


主程序:

package com.ibeifeng.bigdata.storm.topo;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

/**

* 一个实时计数单词的例子

* @author lm

*

*/

public class WordCountToplogy {

public static final String SPOUT_ID ="sentenceSpout";

public static final String SPILT_ID ="spiltBolt";

public static final String COUNT_ID ="countBolt";

public static final String PRINT_ID ="printBolt";

public static void main(String[] args){

//点和边的连接

//边就是数据流分组的问题

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(SPOUT_ID, new SentenceSpout());

builder.setBolt(SPILT_ID, new SplitBolt()).shuffleGrouping(SPOUT_ID);

builder.setBolt(COUNT_ID, new CountBolt()).fieldsGrouping(SPILT_ID,new Fields("word"));

builder.setBolt(PRINT_ID, new PrintBolt()).globalGrouping(COUNT_ID);

Config conf = new Config();

if(args == null || args.length == 0){

//本地执行

LocalCluster lc = new LocalCluster();

lc.submitTopology("wordcount", conf, builder.createTopology());

}else{

//指定多少个进程执行

conf.setNumWorkers(1);

try {

StormSubmitter.submitTopology(args[0], conf,  builder.createTopology());

} catch (AlreadyAliveException | InvalidTopologyException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

打包:在集群上运行

工作节点:

打印节点的输出信息:

stom实时单词统计相关推荐

  1. Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

    1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集 ...

  2. 大数据阶段划分及案例单词统计

    大数据阶段的重要课程划分 离线分析 : hadoop生态圈 HDFS, MapReduce(概念偏多), hive(底层是MapReduce), 离线业务分析80%都是使用hive实时分析 : spa ...

  3. 【Spark分布式内存计算框架——Spark Streaming】11. 应用案例:百度搜索风云榜(下)实时窗口统计

    5.5 实时窗口统计 SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档: http://spark.apache.org/docs/2.4.5/streaming-pro ...

  4. 从单词统计问题看面试

    本文的很多内容来自网络.如有错误,欢迎指出. 问题描写叙述 首先这里对单词的界定是:以空白切割的字符序列. 单词统计的问题能够描写叙述为:在一篇正常格式的英文文档中(作为面试.这里并没有提及中文分词和 ...

  5. C语言怎么实现单词下落,如何用c语言实现单词统计

    如何用c语言实现单词统计 输入一串字符串,输出其中有多少个单词. 代码如下:#include #include #define SIZE 20 int main(){ char str[SIZE]={ ...

  6. SQL Server 2016:实时查询统计

    一个数据库查询超时了,而你并不知道原因.估计查询计划可以揭示问题所在,因此,你彻底地消除了超时.但一个小时后,查询还在运行,而你无法获得真正的执行计划.要是有一种方法可以找出服务器内部实际正在发生的事 ...

  7. spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成

    Spark 编程模型 在Spark 中, 我们通过对分布式数据集的操作来表达计算意图 ,这些计算会自动在集群上 井行执行 这样的数据集被称为弹性分布式数据集 Resilient Distributed ...

  8. 启动Spark Shell,在Spark Shell中编写WordCount程序,在IDEA中编写WordCount的Maven程序,spark-submit使用spark的jar来做单词统计

    1.启动Spark Shell spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序.要注意的是要启动Spark-S ...

  9. 指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计

    作者:吴云涛,腾讯 CSIG 高级工程师 导语 | 最近梳理了一下如何用 Flink 来实现实时的 UV.PV 指标的统计,并和公司内微视部门的同事交流.然后针对该场景做了简化,并发现使用 Flink ...

最新文章

  1. Docker、kubernetes、微服务、SpringBoot/Cloud...好乱!到底要不要学?
  2. python写出的程序如何给别人使用-如何用PYTHON代码写出音乐
  3. 如何在 Flutter 中禁用默认的 Widget 飞溅效果
  4. south eastern china at a scope of 100km
  5. 简述dijkstra算法原理_Dijkstra算法之 Java详解
  6. python 菜品识别_利用百度智能云结合Python体验图像识别(来自qylruirui)
  7. C语言实训作业PPT,C语言实训作业.docx
  8. VMware ESXi 6.7注入第三方RAID驱动
  9. 「白帽黑客成长记」Windows提权基本原理(上)
  10. 树莓派4B平台部署 kubeedge (cloudcore)+ isula
  11. cad化工设备绘图_auto cad在化工设备制图中的应用 ——致初学cad绘图者.ppt
  12. 纳什均衡/双人纯策略
  13. vue 实现打字机效果
  14. 【安全】被黑客要挟的一天,All your data is a backed up. You must pay 0.25BTC
  15. 浪潮刀片服务器型号,浪潮刀片服务器 NF600 Center
  16. 套接字I/O模型-WSAEventSelect(转载)
  17. 粤嵌实验板 linux 环境,粤嵌实习报告
  18. 迭代法求解非线性方程组(含python代码)
  19. GoSURF、MyIe2、小树浏览肉搏篇(转)
  20. 软考系统分析师-湖南省历年通过人数

热门文章

  1. python可以干什么
  2. keystore格式与pfx格式证书互转
  3. MES管理系统中的批次管理,贯穿了生产制造的整个流程
  4. 易经八卦原理图谱和记忆方法总结
  5. 在word上写博客直接发到CSDN ——失败哈哈
  6. [Swift]LeetCode1135. 最低成本联通所有城市 | Connecting Cities With Minimum Cost
  7. 2014 ACM亚洲区域赛 - 北京现场赛
  8. 分析响应时间ns级别的TVS管个中奥秘
  9. ThinkPHP5.0商城项目目录搭建(菜鸟)
  10. 理解伯德图-2/4什么是伯德图