storm 实战及实例讲解(三)
                                     ——comaple.zhang

                                                                                              ——2012-09-13
本讲将接着上一讲,把一个完成的topology完成。上一节主要介绍了一个基本的topology的构造过程,以及每一步所对应的storm集群中分配的资源情况。要想开发storm应用必须对上一讲我提到的那些概念有完全的了解,否则开发出来的应用很有可能有这样那样的问题而无法工作。那么接下来我们来一起定义一个spot节点和bolt节点。

spot节点:在实际的开发中这个节点可以起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务(该方式我会在后续章节中谈到)。这里我们将开发一个简单的模拟数据喷发的节点。具体方式见代码:

package com.jd.comaple.storm.test.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * Created by IntelliJ IDEA.
 * User: comaple.zhang
 * Date: 12-8-28
 * Time: 下午2:11
 * To change this template use File | Settings | File Templates.
 */
public class SimpleSpout extends BaseRichSpout {

/**
     * 用来发射数据的工具类
     */
    private SpoutOutputCollector collector;
    private static String[] info = new String[]{
            "comaple\t,12424,44w46,654,12424,44w46,654,",
            "lisi\t,435435,6537,12424,44w46,654,",
            "lipeng\t,45735,6757,12424,44w46,654,",
            "hujintao\t,45735,6757,12424,44w46,654,",
            "jiangmin\t,23545,6457,2455,7576,qr44453",
            "beijing\t,435435,6537,12424,44w46,654,",
            "xiaoming\t,46654,8579,w3675,85877,077998,",
            "xiaozhang\t,9789,788,97978,656,345235,09889,",
            "ceo\t,46654,8579,w3675,85877,077998,",
            "cto\t,46654,8579,w3675,85877,077998,",
            "zhansan\t,46654,8579,w3675,85877,077998,"};

Random rd = new Random();

/**
     * 这里初始化collector
     * @param conf
     * @param context
     * @param collector
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

}

/**
     * 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组)
     * 该方法会被不停的调用
     */
    @Override
    public void nextTuple() {
        try {
            String msg = info[rd.nextInt(10)];
            //调用发射方法
            collector.emit(new Values(msg));
            //模拟等待100ms
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

/**
     * 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
     * 该declarer变量有很大作用,我们还可以调用  declarer.declareStream();  来定义stramId,该id可以用来定义
     * 更加复杂的流拓扑结构
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));

}
}

bolt节点: 处理节点,该节点接收喷发节点发送的数据进行简单的处理后,发射出去。
package com.jd.comaple.storm.test.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Created by IntelliJ IDEA.
 * User: comaple.zhang
 * Date: 12-8-28
 * Time: 下午2:11
 * To change this template use File | Settings | File Templates.
 */
public class SimpleBolt extends BaseBasicBolt {

@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields( "info"));
    }

@Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String mesg = input.getString(0);
            if (mesg != null)
                collector.emit(new Values( mesg+"mesg is processed!"));
        } catch (Exception e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }

}
}

storm 实战及实例讲解(三)相关推荐

  1. storm 实战及实例讲解(二)

    storm 实战及实例讲解(二)   --comaple.zhang      前面已近介绍了storm集群的搭建,和使用场景,那么现在让我们一起来探讨一下storm具体该怎么使用吧. 首先,我们要明 ...

  2. storm 实战及实例讲解(一)

    storm 实战及实例讲解一 --应用场景分析,drpc服务器配置 --by comaple  2012-08-27 先给大家打打气,看看效果.这是taobao对外公布的storm使用情况,请大家欣赏 ...

  3. Adroid游戏开发实例讲解(三)-小蝌蚪找妈妈附源码

    Adroid游戏开发实例讲解(三)-小蝌蚪找妈妈附源码 程序之美 从小就听着小蝌蚪找妈妈的故事长大,我相信小伙伴们一定都不陌生,因为小学课本中,我们早早的就学过了小蝌蚪找妈妈这篇文章,它既是一篇文章, ...

  4. 用Quartus II Timequest Timing Analyzer进行时序分析 :实例讲解 (三)

    上面已经把DAC7512控制器中所有的时钟都创建好了.下面我们再额外讨论一下关于时钟属性方面的一些问题和在做时序分析时的处理方法. 对于具有单一时钟的系统,设计和时序分析都相对简单.但是现在很多设计都 ...

  5. python判断密码是否正确三次机会_python密码错误三次锁定(实例讲解)

    程序需求: 输入用户名,密码 认证成功显示欢迎信息 输入错误三次后锁定用户 流程图: 好像画的不咋地 查看代码: #!/usr/bin/env python # _*_ coding:utf-8 _* ...

  6. java判断三位数的范围代码_java判断三位数的实例讲解

    java判断三位数的实例讲解 java怎么判断三位数 先定义个测试数字,如图 然后可以把数字转换成字符串来判断它的长度是否为3,如图 获取判断数字范围是否在100到1000之间的值,如图 在或者判断数 ...

  7. JAVA中几种循环结构的表示_本文通过实例讲解给大家介绍Java中for、while、do while三种循环语句的区别,具体详情如下所示:第一种:for循环 循环结构for语句的格式...

    本文通过实例讲解给大家介绍Java中for.while.do while三种循环语句的区别,具体详情如下所示: 第一种:for循环 循环结构for语句的格式: for(初始化表达式;条件表达式;循环后 ...

  8. Android实战简易教程-第三十九枪(第三方短信验证平台Mob和验证码自动填入功能结合实例)

    用户注册或者找回密码时一般会用到短信验证功能,这里我们使用第三方的短信平台进行验证实例. 我们用到第三方短信验证平台是Mob,地址为:http://mob.com/ 一.注册用户.获取SDK 大家可以 ...

  9. QTP对Excel的操作(三)之 Vbs对Excel的操作 实例讲解

    QTP对Excel的操作(一)与(二)中,分别讲解了对Excel的读与写操作,本讲我们以实例讲解,QTP执行测试过程中如何通过对Excel读写实现执行测试用例. 本例通过注册新浪会员页面(http:/ ...

最新文章

  1. ARTS打卡计划第三周-Tips
  2. python如何判断字符串是否包含某些汉字_Python如何判断一个字符串是否包含指定子字符串...
  3. SAP Hybris Commerce,CRM和C4C的登录语言选择
  4. 光伏电站清扫机器人_轻型光伏电站清扫机器人的制作方法
  5. C++:17---函数指针
  6. 2017.8.26 力 思考记录
  7. springcloud服务网关-gateway
  8. commit git 删除文件夹_从Git提交中删除文件
  9. 游戏开发之函数的重载(C++基础)
  10. 1.携程架构实践 --- 携程整体技术架构
  11. 太原理工java实验报告_太原理工大学-JAVA实验报告.doc
  12. LabVIEW_百度百科
  13. 【用户端】家庭医生高保真Axure原型模板
  14. 蔡学镛 java,Java该何去何从思考
  15. Hbase官方文档中文版
  16. 如何在BaseFragment中直接调用BaseActivity中相关属性代码的问题
  17. AMD:无限你我的无限
  18. 钉钉、企业微信平台发送工资条程序--燕春科技工资条管理系统2.0
  19. C语言中的strstr函数
  20. 考试周刊杂志考试周刊杂志社考试周刊编辑部2022年第39期目录

热门文章

  1. 分布式锁的实现- zookeeper
  2. jdk安装教程及java环境配置(Win11)
  3. 无人船也要来了!“陆海空”终将全部进入自驾模式
  4. 真香,理解记忆法学习Python基础语法
  5. 探寻AI未来式,AI Studio两周年惊喜活动开启
  6. 如何在eplan里面画一个伺服驱动器_如何设置伺服电机驱动器
  7. 886n虚拟服务器,联普TL-WR886N V4-V5如何设置虚拟服务器
  8. 桥接路由器总是掉线_Win7系统下tl-wr886n无线桥接上网总掉线如何解决
  9. 广义相对论基础【1】狭义相对论中的张量
  10. JLINK在ADS中的调试心得