基于事务

static interface ITridentSpout.BatchCoordinator<X>

static interface ITridentSpout.Emitter<X>

接口类的实现和之前事务ITransactionalSpout 非常类似。


Topo例子

topology.newDRPCStream("top", drpc)
.each(new Fields("args"), new Split(“ ”), new Fields("time"))
.parallelismHint(5)
.stateQuery(myStates,newFields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt"))
.groupBy(new Fields("srcip"))
.chainedAgg()
.aggregate(new Fields("byt"), new Sum(), new Fields("yt"))
.aggregate(new Fields("pkt"), new Sum(), new Fields("kt"))
.chainEnd()
.applyAssembly(new FirstN(10, "yt", true));
调用链用于执行多个聚合

如果想同事执行多个聚合,可以使用如下的调用链

mystream.chainedAgg()

.partitionAggregate(new Count(), new Fields("count"))

.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

.chainEnd()

这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。
投影(projection)

投影操作是对数据上进行列裁剪。

如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:

mystream.project(new Fields("b","d"));

输出流将只有【“b”,“d”】两个字段。

重分区(repartition)操作

重分区操作是通过一个函数改变元组(tuple)在task之间的分布,   重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:

1.      Shuffle:

2.      Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。  如果你想做在每个分区上做一个statequery。

3.      paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字  段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发  字段(fields)分发到相同的目标分区。

4.      global:所有的tuple分发到相同的分区。

5.      batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分  区。

6.      patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。

合并和关联

合并(merge)多个流成为一个流,可以如下:

topology.merge(stream1, stream2, stream3);

Trident合并的流字段会以第一个流的字段命名。

另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。

Trident中的join只会在spout发出的每个批次间进行。

如一个流包含字段【“key”,“val1”,“val2”】,

另一个流包含字段【“x”,“val1”】:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));

Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。

1.      首先是join字段。例子中stream1中的“key”对应stream2中的“x”。

2.      接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。
FirstN

取Top N

用法:

stream.applyAssembly(new FirstN(TOP_N, "sortField", true));

小结

Trident适合做汇总型,不大适合做去重型

STORM之ITridentSpout、FirstN(取Top N)实现、流合并和join相关推荐

  1. 巧用row_number和partition by分组取top数据

    2019独角兽企业重金招聘Python工程师标准>>> 分组取TOP数据是T-SQL中的常用查询, 如学生信息管理系统中取出每个学科前3名的学生.这种查询在SQL Server 20 ...

  2. Python取top N相关的模块:heapq模块

    Python取top N相关的模块:heapq模块 原文:[Python]Python取top N相关的模块:heapq模块_小怪孩的成长之路-CSDN博客_python 取top 最近在程序中需要取 ...

  3. MariaDB Window Functions窗口函数分组取TOP N记录

    窗口函数在MariaDB10.2版本里实现,其简化了复杂SQL的撰写,提高了可读性. 在某些方面,窗口函数类似于聚集函数, 但它不像聚集函数那样每组只返回一个值,窗口函数可以为每组返回多个值. 作为一 ...

  4. 爬取B站直播流 - http+flv的相关研究

    参考链接 HTTP-FLV直播初探 HTTP-FLV的两种方式 Json将&符号转成了 \u0026 python requests提示警告InsecureRequestWarning 爬取B ...

  5. oracle分组_MySQL分组取TOP,实现Oracle的ROW_NUMBER函数的功能

    Excel中分组排序只需要对数据进行升序降序,再利用if函数添加排序序号,即可筛选出分组top数据. Oracle也有row_number()函数对数据进行分组排序,而MySQL并没有此类函数,那么如 ...

  6. hive udf 分组取top1_Hive分组取Top K数据

    1.ROW_NUMBER,RANK(),DENSE_RANK() 语法格式:row_number() OVER (partition by COL1 order by COL2 desc ) rank ...

  7. mysql group top_Mysql group by 后取 top N 问题

    测试数据 DROP TABLE IF EXISTS `t`; CREATE TABLE `t` ( `id` int(11) NOT NULL, `class` varchar(255) DEFAUL ...

  8. SQL:分组排序取top N

    1 法一 在hive上查询 select a.course,a.score from( select course,score,row_number() over(partition by cours ...

  9. Hive 分组取Top N

    成绩表S,字段sid,cid,score 求每门课程的前2名与后2名sid with a as (select sid,rank() over(distribute by cid sort by sc ...

  10. mysql 取top 10_我的mysql如何分组取top10?

    mysql 5.7.18 表DDL如下 CREATE TABLE 'roadnet_monitor_flowdata2' ( 'id' INT(10) DEFAULT NULL, '交通类型' VAR ...

最新文章

  1. Python:pip 和pip3的区别
  2. jdk是什么?jdk1.8安装配置方法
  3. 有小数点的补码怎么算_写给投资小白,指数基金,在哪买?怎么买?
  4. 计算机视觉与深度学习 | 对恐怖袭击事件记录数据的量化分析(附源代码):博主的机器学习首秀(研究生数学建模二等奖)
  5. @action 注解
  6. JBoss Data Virtualization 6.1 Beta现在可用
  7. postman接口自动化
  8. springboot api文档_SpringCloud/SpringBoot - 自动生成API文档
  9. Vue优化策略_项目上线_02
  10. 统计学习方法笔记(李航)———第六章(逻辑斯谛回归)
  11. delphi启动ie调用本地html传参数_年轻人不讲武德啊!了解下浏览器如何解析html、css,js
  12. nRF52840(一) 环境搭建
  13. 快速学会CC2530单片机基础点灯
  14. vs2015升级旧工程报snprintf已有主体及“int8_t” 重定义
  15. 【大咖说Ⅰ】北邮博导石川教授:图机器学习及其应用
  16. 高通平台ITS:sensor_fusion test_sensor_fusion.py Fail
  17. php-fpm安装,启动,重启
  18. 项目提测CheckList通用版
  19. 无法访问其他计算机共享打印机,想共享别人的打印机,但显示无法连接到打印机怎么处理?高手来!...
  20. matlab设计激光腔,激光原理课程设计--平行平面腔自再现模Fox-Li数值迭代解法及MATLAB实现...

热门文章

  1. 阶段3 3.SpringMVC·_07.SSM整合案例_02.ssm整合之搭建环境
  2. el-dialog 一些问题 局中滚动
  3. ZOJ3953 Intervals
  4. 好公司和差公司的对比
  5. 在64位系统中无法看到Microsoft Excel Application的问题
  6. Android Butterknife框架 注解攻略
  7. 【IPC通信】基于管道的popen和pclose函数
  8. linux下proc里关于磁盘性能的参数(转)
  9. 在应用程序级别之外使用注册为 allowDefinition='MachineToApplication'
  10. 使用Vue做评论+localStorage存储(js模块化)