1.Source data

www.taobao.com   XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 12:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 09:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:51
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 12:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:52
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 12:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 09:40:49
www.taobao.com  ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 08:40:53
www.taobao.com  ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 09:40:49
www.taobao.com  ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 10:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 12:40:49
www.taobao.com  ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 08:40:50
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:50
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:53
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 09:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 09:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:52
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 11:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:51
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:53
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:53
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:50
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 08:40:53
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 12:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 11:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:50
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:53
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:52
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 08:40:51
www.taobao.com  ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 10:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 09:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:50
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:52
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:50
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 10:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 12:40:49
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 10:40:49
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:52
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 12:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:51
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:52
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 08:40:53
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:53
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 08:40:50
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 08:40:50
www.taobao.com  CYYH6Y2345GHI899OFG4V9U567  2017-02-21 08:40:50
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  BBYH61456FGHHJ7JL89RG5VV9UYU7   2017-02-21 08:40:51
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
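Each record above carries three columns: host, session id, and timestamp. The spout further down splits on a tab character, so the original track.log is presumably tab-separated, even though the listing here shows runs of spaces. The following is a minimal sketch, not part of the original project, that splits on any whitespace so it tolerates both forms. The class name `TrackLogSketch` and its helper methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class for illustration only; it does not depend on Storm.
class TrackLogSketch {

    // Splits a record into host, session id, and timestamp.
    // The limit of 3 keeps the space inside the timestamp intact.
    static String[] parse(String line) {
        return line.trim().split("\\s+", 3);
    }

    // Counts how many records belong to each session id,
    // preserving the order in which session ids first appear.
    static Map<String, Integer> countBySession(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String sessionId = parse(line)[1];
            counts.merge(sessionId, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sample = {
            "www.taobao.com   XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 12:40:49",
            "www.taobao.com  VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 12:40:49",
            "www.taobao.com  XXYH6YCGFJYERTT834R52FDXV9U34   2017-02-21 09:40:49"
        };
        System.out.println(countBySession(sample));
    }
}
```

Counting per session id this way gives a reference to compare against the per-task line counts the bolt prints under each grouping.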

2.main method:

package com.sxt.storm.grouping;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class Main {

    /**
     * @param args
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MySpout(), 1);

        // shuffleGrouping sends tuples to downstream tasks at random,
        // which balances the load as a side effect
//      builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");

        // fieldsGrouping is like the shuffle in MapReduce:
        // it hashes the given fields and takes the result modulo the task count
//      builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id"));

        // globalGrouping sends everything to a single task, the one with the lowest task id
//      builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");

        // noneGrouping is equivalent to shuffleGrouping
//      builder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");

        // allGrouping broadcasts every tuple to all tasks
        builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");

        // Map conf = new HashMap();
        // conf.put(Config.TOPOLOGY_WORKERS, 4);
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMessageTimeoutSecs(30);

        if (args.length > 0) {
            // A topology name was passed in: submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
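The comments in the main method describe fieldsGrouping as "hash the fields, then take the modulo" and globalGrouping as "send to the task with the lowest task id". The sketch below models only those two rules in plain Java, without any Storm dependency. The class `GroupingSketch` and its methods are hypothetical, and Storm's internal hash function is not reproduced here, so real task assignments may differ from what this prints.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of two grouping rules; this is not Storm's own code.
class GroupingSketch {

    // Fields grouping, simplified: hash the grouping value and take it
    // modulo the number of tasks. floorMod keeps the result non-negative
    // even when hashCode() is negative.
    static int fieldsGroupingTask(String sessionId, int numTasks) {
        return Math.floorMod(sessionId.hashCode(), numTasks);
    }

    // Global grouping, simplified: always pick the lowest task id.
    static int globalGroupingTask(List<Integer> taskIds) {
        int lowest = taskIds.get(0);
        for (int id : taskIds) {
            lowest = Math.min(lowest, id);
        }
        return lowest;
    }

    public static void main(String[] args) {
        List<String> sessions = Arrays.asList(
            "XXYH6YCGFJYERTT834R52FDXV9U34",
            "VVVYH6Y4V4SFXZ56JIPDPB4V678",
            "BBYH61456FGHHJ7JL89RG5VV9UYU7");
        for (String s : sessions) {
            // The same session id always maps to the same task index.
            System.out.println(s + " -> task index " + fieldsGroupingTask(s, 2));
        }
        System.out.println("global -> task id " + globalGroupingTask(Arrays.asList(4, 3)));
    }
}
```

The property to check when running the real topology with fieldsGrouping is the same one this model has: every line with a given session_id should show up under one bolt thread only.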

3.Spout:

package com.sxt.storm.grouping;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    SpoutOutputCollector collector = null;
    String str = null;

    @Override
    public void nextTuple() {
        try {
            while ((str = this.br.readLine()) != null) {
                // Filtering step: emit the whole line plus its second
                // tab-separated column (the session id).
                // No message id is passed, so Storm will not call ack() or fail()
                // for these tuples.
                collector.emit(new Values(str, str.split("\t")[1]));
            }
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            br.close();
            isr.close();
            fis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            this.fis = new FileInputStream("track.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log", "session_id"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("spout ack:" + msgId.toString());
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId.toString());
    }
}

4.Bolt:

package com.sxt.storm.grouping;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class MyBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    int num = 0;
    String valueString = null;

    @Override
    public void cleanup() {
    }

    @Override
    public void execute(Tuple input) {
        try {
            valueString = input.getStringByField("log");
            if (valueString != null) {
                // Count the lines seen by this bolt instance and print which
                // thread handled which session id
                num++;
                System.err.println(input.getSourceStreamId() + " " + Thread.currentThread().getName() + "--id="
                        + Thread.currentThread().getId() + "   lines  :" + num + "   session_id:"
                        + valueString.split("\t")[1]);
            }
            collector.ack(input);
            // Thread.sleep(2000);
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing downstream; the declared field is a placeholder
        declarer.declare(new Fields(""));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
