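Storm: Storm Grouping -- stream grouping (practicing the various data distribution strategies) [Java, code only]
1. Source data (track.log; the spout code below splits each line on tab characters)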
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 12:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 09:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:51
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 12:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:52
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 12:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 09:40:49
www.taobao.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 08:40:53
www.taobao.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 09:40:49
www.taobao.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 10:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 12:40:49
www.taobao.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 08:40:50
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:50
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:53
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 09:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 09:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:52
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 11:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:51
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:53
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:53
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:50
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 08:40:53
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 12:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 11:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:50
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:53
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:52
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 08:40:51
www.taobao.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-02-21 10:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 09:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:50
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:52
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:50
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 10:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 12:40:49
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 10:40:49
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:52
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 12:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:51
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:52
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-02-21 08:40:53
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 08:40:53
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 08:40:50
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 08:40:50
www.taobao.com CYYH6Y2345GHI899OFG4V9U567 2017-02-21 08:40:50
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com BBYH61456FGHHJ7JL89RG5VV9UYU7 2017-02-21 08:40:51
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
www.taobao.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-02-21 11:40:49
2. The main method:
package com.sxt.storm.grouping;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class Main {

    /**
     * @param args optional topology name; when given, the topology is submitted to a cluster
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new MySpout(), 1);

        // shuffleGrouping: sends each tuple to a random downstream task,
        // which balances the load as a side effect
        // builder.setBolt("bolt", new MyBolt(), 2).shuffleGrouping("spout");

        // fieldsGrouping: comparable to the shuffle in MapReduce; the given fields
        // are hashed and the hash is taken modulo the number of tasks
        // builder.setBolt("bolt", new MyBolt(), 2).fieldsGrouping("spout", new Fields("session_id"));

        // globalGrouping: sends everything to a single task, the one with the lowest task id
        // builder.setBolt("bolt", new MyBolt(), 2).globalGrouping("spout");

        // noneGrouping: equivalent to shuffleGrouping
        // builder.setBolt("bolt", new MyBolt(), 2).noneGrouping("spout");

        // allGrouping: broadcast, every task receives every tuple
        builder.setBolt("bolt", new MyBolt(), 2).allGrouping("spout");

        // Map conf = new HashMap();
        // conf.put(Config.TOPOLOGY_WORKERS, 4);
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMessageTimeoutSecs(30);

        if (args.length > 0) {
            // A topology name was passed in: submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
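The comments in the main method describe how each grouping chooses its target tasks. The following is a minimal plain-Java sketch of those rules, not Storm's own implementation. The class name GroupingSketch and its methods are hypothetical, tasks are represented by indexes 0..numTasks-1, and the hash used for the fields rule is simply String.hashCode().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical illustration of the grouping rules; not part of the Storm API.
final class GroupingSketch {

    // Shuffle grouping: one downstream task, chosen at random.
    static List<Integer> shuffle(int numTasks, Random random) {
        return Collections.singletonList(random.nextInt(numTasks));
    }

    // Fields grouping: hash the grouping field and take it modulo the task count,
    // so tuples with the same session_id always reach the same task.
    static List<Integer> fields(String sessionId, int numTasks) {
        return Collections.singletonList(Math.floorMod(sessionId.hashCode(), numTasks));
    }

    // Global grouping: always the task with the lowest id (index 0 in this sketch).
    static List<Integer> global(int numTasks) {
        return Collections.singletonList(0);
    }

    // All grouping: broadcast, every task is a target.
    static List<Integer> all(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    public static void main(String[] args) {
        int numTasks = 2; // same parallelism as the bolt in the main method
        Random random = new Random();
        String[] sessionIds = {
            "XXYH6YCGFJYERTT834R52FDXV9U34",
            "VVVYH6Y4V4SFXZ56JIPDPB4V678",
            "XXYH6YCGFJYERTT834R52FDXV9U34"
        };
        for (String id : sessionIds) {
            System.out.println(id
                    + " shuffle=" + shuffle(numTasks, random)
                    + " fields=" + fields(id, numTasks)
                    + " global=" + global(numTasks)
                    + " all=" + all(numTasks));
        }
    }
}
```

Running it shows that the fields column is identical for the two lines that share a session id, while the shuffle column may differ from run to run.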
3. Spout:
package com.sxt.storm.grouping;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MySpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    SpoutOutputCollector collector = null;
    String str = null;

    @Override
    public void nextTuple() {
        try {
            // Read the remaining lines of the file and emit one tuple per line.
            // No message id is passed to emit(), so Storm does not track these
            // tuples and ack()/fail() below are never called for them.
            while ((str = this.br.readLine()) != null) {
                // Filtering could be done here
                collector.emit(new Values(str, str.split("\t")[1]));
            }
        } catch (Exception e) {
            // Report the error instead of silently swallowing it
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            br.close();
            isr.close();
            fis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            this.fis = new FileInputStream("track.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field 0 is the whole log line, field 1 is the session id taken from it
        declarer.declare(new Fields("log", "session_id"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("spout ack:" + msgId.toString());
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId.toString());
    }
}
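The spout takes the session id from the second tab-separated column with str.split("\t")[1], which throws if a line has no tab. As a small sketch of that extraction with a guard added, assuming the real track.log is tab-separated as the code implies (LogLineSketch and sessionIdOf are hypothetical names, not part of the original code):

```java
// Hypothetical helper showing the column extraction used by the spout.
final class LogLineSketch {

    // Returns the second tab-separated column, or null when the line has
    // fewer than two columns (the original code would throw in that case).
    static String sessionIdOf(String line) {
        if (line == null) {
            return null;
        }
        String[] parts = line.split("\t");
        return parts.length > 1 ? parts[1] : null;
    }

    public static void main(String[] args) {
        String line = "www.taobao.com\tXXYH6YCGFJYERTT834R52FDXV9U34\t2017-02-21 12:40:49";
        System.out.println(sessionIdOf(line));
        System.out.println(sessionIdOf("a line without any tab"));
    }
}
```

A spout could skip lines for which the helper returns null rather than letting the exception interrupt the read loop.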
4. Bolt:
package com.sxt.storm.grouping;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    int num = 0; // number of lines this bolt task has received
    String valueString = null;

    @Override
    public void cleanup() {
    }

    @Override
    public void execute(Tuple input) {
        try {
            valueString = input.getStringByField("log");
            if (valueString != null) {
                num++;
                // Print the stream id, the executing thread, the running line
                // count of this task, and the session id of the line
                System.err.println(input.getSourceStreamId() + " " + Thread.currentThread().getName()
                        + "--id=" + Thread.currentThread().getId() + " lines :" + num
                        + " session_id:" + valueString.split("\t")[1]);
            }
            collector.ack(input);
            // Thread.sleep(2000);
        } catch (Exception e) {
            collector.fail(input);
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing, so no output fields are declared
        // (the original declared a single field with an empty name)
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
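Each bolt task keeps its own num counter, so the "lines" value it prints depends on the grouping chosen in the main method. The sketch below imitates two bolt tasks in plain Java to show the difference between allGrouping and fieldsGrouping. It is an illustration under assumptions: BoltCounterSketch is a hypothetical class, and String.hashCode() stands in for the hash Storm applies to the grouping field.

```java
import java.util.Arrays;

// Hypothetical simulation of the per-task `num` counters in MyBolt.
final class BoltCounterSketch {

    // allGrouping: every task receives every tuple, so every counter
    // ends up equal to the number of input lines.
    static int[] countWithAllGrouping(String[] sessionIds, int numTasks) {
        int[] counts = new int[numTasks];
        for (int i = 0; i < sessionIds.length; i++) {
            for (int task = 0; task < numTasks; task++) {
                counts[task]++;
            }
        }
        return counts;
    }

    // fieldsGrouping on session_id: each tuple reaches exactly one task,
    // so the counters add up to the number of input lines.
    static int[] countWithFieldsGrouping(String[] sessionIds, int numTasks) {
        int[] counts = new int[numTasks];
        for (String id : sessionIds) {
            counts[Math.floorMod(id.hashCode(), numTasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] sessionIds = {
            "XXYH6YCGFJYERTT834R52FDXV9U34",
            "VVVYH6Y4V4SFXZ56JIPDPB4V678",
            "BBYH61456FGHHJ7JL89RG5VV9UYU7",
            "ABYH6Y4V4SCVXTG6DPB4VH9U123",
            "CYYH6Y2345GHI899OFG4V9U567"
        };
        System.out.println("all:    " + Arrays.toString(countWithAllGrouping(sessionIds, 2)));
        System.out.println("fields: " + Arrays.toString(countWithFieldsGrouping(sessionIds, 2)));
    }
}
```

With allGrouping and two tasks, the real topology therefore prints every log line twice, once per task.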