代码结构如下:
pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.storm.kafkastormredis</groupId><artifactId>kafkastormredis</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><!--<scope>provided</scope>--><version>1.1.0</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>1.1.0</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!--告诉运行的主类是哪个,注意根据自己的情况,下面的包名做相应的修改--><mainClass>cn.toto.strom.wordcount.StormTopologyDriver</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build>
</project>

AckSpout如下:

package cn.toto.storm.ack;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import java.util.List;
import java.util.Map;
import java.util.UUID;/*** 代码说明** @author tuzq* @create 2017-06-21 14:27*/
public class AckSpout extends BaseRichSpout {private SpoutOutputCollector collector;@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}//每次调用一次就发送一条消息@Overridepublic void nextTuple() {//生产一条数据String uuid = UUID.randomUUID().toString().replace("_","");collector.emit(new Values(uuid),new Values(uuid));try{Thread.sleep(5 * 10000);} catch(Exception e) {e.printStackTrace();}}//的定义发送的字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}@Overridepublic void ack(Object msgId) {System.out.println("xiaoxi:" + msgId);}@Overridepublic void fail(Object msgId) {System.out.println("xiaoxi" + msgId);collector.emit((List)msgId,msgId);}
}

Bolt1的代码如下:

package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 代码说明** @author tuzq* @create 2017-06-21 14:56*/
public class Bolt1 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只调用一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循环调用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt1的execute方法被调用一次" + input.getString(0));collector.ack(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

Bolt2的代码如下:

package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 代码说明** @author tuzq* @create 2017-06-21 15:01*/
public class Bolt2 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只调动一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循环调用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt2的execute方法被调用一次" + input.getString(0));collector.ack(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

Bolt3的代码如下:

package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 代码说明** @author tuzq* @create 2017-06-21 15:04*/
public class Bolt3 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只调用一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循环调用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt3的execute方法被调用一次" + input.getString(0));collector.fail(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

Bolt4的代码如下:

package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 代码说明** @author tuzq* @create 2017-06-21 15:06*/
public class Bolt4 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只调用一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循环调用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt4的execute方法调用一次" + input.getString(0));collector.ack(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

右键运行项目


案例2
AckSpout代码如下:

package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import java.util.List;
import java.util.Map;
import java.util.UUID;/*** 代码说明** @author tuzq* @create 2017-06-21 15:23*/
public class AckSpout  extends BaseRichSpout {private SpoutOutputCollector collector;//初始化方法public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}//上帝之手,循环调用,每调用过一次就发送一条消息public void nextTuple() {//生产一条数据String uuid = UUID.randomUUID().toString().replace("_", "");collector.emit(new Values(uuid),new Values(uuid));try {Thread.sleep(10*1000);} catch (InterruptedException e) {e.printStackTrace();}}//定义发送的字段public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}@Overridepublic void ack(Object msgId) {System.out.println("消息处理成功" + msgId);}@Overridepublic void fail(Object msgId) {System.out.println("消息处理失败:重发" + msgId);collector.emit((List)msgId,msgId);}
}

Bolt1的配置如下:

package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;/*** 代码说明** @author tuzq* @create 20`这里写代码片`17-06-21 15:32*/
public class Bolt1 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

Bolt2的配置如下:

package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;/*** 代码说明** @author tuzq* @create 2017-06-21 15:33*/
public class Bolt2 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

Bolt3的代码配置如下:

package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;/*** 代码说明** @author tuzq* @create 2017-06-21 15:34*/
public class Bolt3 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

Bolt4的代码配置如下:

package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;/*** 代码说明** @author tuzq* @create 2017-06-21 15:35*/
public class Bolt4  extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}
}

AckTopologyDriver的代码如下:

package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import cn.toto.storm.ack.AckSpout;
import cn.toto.storm.ack.Bolt1;
import cn.toto.storm.ack.Bolt3;
import cn.toto.storm.ack.Bolt4;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;/*** 代码说明** @author tuzq* @create 2017-06-21 15:37*/
public class AckTopologyDriver {public static void main(String[] args) {//1、准备任务信息TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("mySpout", new AckSpout(), 1);topologyBuilder.setBolt("bolt1",new Bolt1(),1).shuffleGrouping("mySpout");topologyBuilder.setBolt("bolt2",new Bolt2(),1).shuffleGrouping("bolt1");topologyBuilder.setBolt("bolt3",new Bolt3(),1).shuffleGrouping("bolt2");topologyBuilder.setBolt("bolt4",new Bolt4(),1).shuffleGrouping("bolt3");Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("wordcount",config,stormTopology);}
}

当失败了的时候,抛出:throw FailedException ,然后可以实现类似fail()方法。

ack机制之代码实现,实现BaseRichBolt的方式,使用BaseBasicBolt的方式实现BaseRichBolt发ack和fail的功能相关推荐

  1. ack strom 保证只有一次_Storm容错机制(一):ACK机制

    前言 好久没有写文章了,然后一连就写了三篇, 前两篇文章 Storm入门(一):编程模型 Storm入门(二):架构模型和集群部署 都是一些比较简单的入门教程,这一篇我们来聊一聊稍微高级点的话题, 关 ...

  2. ack是什么,如何使用Ack机制,如何关闭Ack机制,基本实现,STORM的消息容错机制,Ack机制

    1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以让开发者采取动作.比如在Me ...

  3. java基础总结(八十七)--Ack机制

    转载的第一篇博客 原文链接 1.ack是什么 ack 机制是storm整个技术体系中非常闪亮的一个创新点. 通过Ack机制,spout发送出去的每一条消息,都可以确定是被成功处理或失败处理, 从而可以 ...

  4. Apache Storm 实时流处理系统ACK机制以及源码分析

    1.ACK机制简介 Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理.完全处理的意思是该MessageId绑定的源Tuple以及由该源Tupl ...

  5. Kafka的ack机制

    文章目Kafka的ack机制录 前言 一.ACK的三个可选值 1.ACK = 1 (默认) 2.ACK = 0 3.ACK = -1 前言 Kafka的ACK机制,指的是producer的消息发送确认 ...

  6. mysql jstorm_jstorm进阶-ack机制及KafkaSpout

    安装部署使用 ack机制 ack机制原理 这里不讲什么是ack机制,可以参考官网的文档Ack 机制 我们只要知道它是使用异或xor的原理即可: A xor A = 0 A xor B xor B xo ...

  7. 【转】ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...

  8. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  9. RabbitMQ的消息确认ACK机制

    1.什么是消息确认ACK. 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失.为了确保数据不会丢失,RabbitMQ支持消 ...

最新文章

  1. 科研指导:机器学习与自然语言处理
  2. Python时间戳转时间
  3. 收到计算机工程与应用的退修通知,《计算机工程与应用》退修意见
  4. BGP——本地始发选路+AS-PATH选路(讲解+命令)
  5. java的线程池的基础类
  6. u盘最大单个文件支持多少g_解决U盘拷贝时提示文件过大问题(不能拷贝超过4个g的文件)...
  7. idea切换工作空间_IntelliJIDEA使用技巧
  8. 为什么阿里,腾讯,百度和京东都是在开曼岛注册的?
  9. mysql 回归分析_统计科学之多元回归分析
  10. 【渝粤题库】广东开放大学 计算机应用基础(专科) 形成性考核
  11. What are Triangulation, Trilateration, and Multilateration?
  12. 拉格朗日乘子法:写得很通俗的文章
  13. Oracle数据库的基础
  14. 打造「可盈利个人品牌」终极指南,8个步骤开始建立你的个人品牌吧!
  15. c语言调用鼠标驱动函数,鼠标驱动程序
  16. 如果我们真的发现了外星人?
  17. UNP读书笔记第一章
  18. MicroBlaze软核处理器简介
  19. 3dMax中如何设置指定渲染器?
  20. c语言双精度小数点后取几位_c语言中怎么保留小数2位

热门文章

  1. Selenium3 + Python3自动化测试系列——多窗口切换
  2. 《高性能JavaScript》第五章 字符串和正则表达式
  3. C++学习笔记2[表达式与语句]
  4. Python学习笔记--数据类型
  5. OpenCASCADE:Mac OS X平台使用Xcode构建OCCT
  6. wxWidgets:内存检查示例
  7. boost::regex模块部分正则匹配相关的测试程序
  8. boost::multiprecision模块将 std::numeric_limits 用作 multiprecision.qbk 上的多精度文档片段的示例
  9. boost::math模块实现对贝塞尔函数的零点求和的测试程序
  10. boost::static_min_max_signed_type用法的测试程序