Trident是Storm的延伸。像Storm一样,Trident也是由Twitter开发的。开发Trident的主要原因是在Storm之上提供高级抽象以及有状态流处理和低延迟分布式查询。

Trident使用喷嘴和螺栓,但这些底层组件在执行前由Trident自动生成。Trident具有功能,过滤器,连接,分组和聚合。

Trident处理流作为一系列被称为交易的批次。通常,这些小批量的大小将取决于数千或数百万个元组,取决于输入流。这样,Trident不同于Storm,它执行元组处理。

批处理概念与数据库事务非常相似。每笔交易都分配一个交易ID。一旦所有处理完成,交易即被视为成功。但是,处理一个事务元组失败将导致整个事务被重新传输。对于每个批次,Trident将在交易开始时调用beginCommit,并在结束时进行提交。

Trident拓扑

Trident API公开了使用“TridentTopology”类创建Trident拓扑的简单选项。基本上,Trident拓扑接收来自喷口的输入流并且在该流上执行有序的操作序列(过滤,聚合,分组等)。Storm元组被Trident元组取代,螺栓被操作取代。一个简单的Trident拓扑可以创建如下

TridentTopology topology = new TridentTopology();

Trident元组

Trident元组是一个已命名的值列表。TridentTuple接口是Trident拓扑的数据模型。TridentTuple接口是可以由Trident拓扑处理的基本数据单元。

Trident喷口

Trident喷口与Storm喷口相似,具有使用Trident功能的附加选项。实际上,我们仍然可以使用我们在Storm拓扑中使用的IRichSpout,但它本质上不具有事务性,我们将无法使用Trident提供的优势。

具有使用Trident功能的所有功能的基本喷嘴是“ITridentSpout”。它支持事务性和不透明事务语义。其他喷嘴是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。

除了这些通用喷嘴之外,Trident还有很多Trident喷嘴的实例。其中一个是FeederBatchSpout喷口,我们可以使用它轻松发送Trident元组的命名列表,而无需担心批处理,并行性等问题。

FeederBatchSpout的创建和数据馈送可以按照如下所示完成

TridentTopology topology = new TridentTopology();

FeederBatchSpout testSpout = new FeederBatchSpout(

ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));

topology.newStream("fixed-batch-spout", testSpout)

testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident操作

Trident依靠“Trident操作”来处理Trident元组的输入流。Trident

API具有许多内置操作来处理从简单到复杂的流处理。这些操作从简单验证到复杂的Trident元组分组和聚合。让我们来看看最重要和最常用的操作。

过滤

过滤器是用于执行输入验证任务的对象。Trident过滤器获取Trident元组字段的子集作为输入,并根据某些条件是否满足返回true或false。如果返回true,则元组保存在输出流中;

否则,该元组将从流中移除。过滤器将基本上继承自 BaseFilter 类并实现 isKeep 方法。以下是过滤器操作的示例实现

public class MyFilter extends BaseFilter {

public boolean isKeep(TridentTuple tuple) {

return tuple.getInteger(1) % 2 == 0;

}

}

输入

[1, 2]

[1, 3]

[1, 4]

输出

[1, 2]

[1, 4]

可以使用“每个”方法在拓扑中调用过滤器函数。“Fields”类可用于指定输入(Trident元组的子集)。示例代码如下

TridentTopology topology = new TridentTopology();

topology.newStream("spout", spout)

.each(new Fields("a", "b"), new MyFilter())

功能

函数 是用于在单个Trident元组上执行简单操作的对象。它需要Trident元组字段的子集并发出零个或更多新的Trident元组字段。

函数 基本上从 BaseFunction 类继承并实现了 execute 方法。下面给出了一个示例实现 -

public class MyFunction extends BaseFunction {

public void execute(TridentTuple tuple, TridentCollector collector) {

int a = tuple.getInteger(0);

int b = tuple.getInteger(1);

collector.emit(new Values(a + b));

}

}

输入

[1, 2]

[1, 3]

[1, 4]

输出

[1, 2, 3]

[1, 3, 4]

[1, 4, 5]

就像过滤器操作一样,可以使用 每种 方法在拓扑中调用函数操作。示例代码如下

TridentTopology topology = new TridentTopology();

topology.newStream("spout", spout)

.each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚集是用于对输入批处理或分区或流执行聚合操作的对象。Trident有三种类型的聚合。他们如下

聚集 - 孤立地聚集每批Trident元组。 在聚合过程中,元组最初使用全局分组重新分区,以将同一批次的所有分区合并到一个分区中。

partitionAggregate - 聚合每个分区,而不是整个批次的Trident元组。 分区聚合的输出完全替换了输入元组。分区聚合的输出包含单个字段元组。

persistentaggregate - 在所有批次的所有Trident元组上聚合并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation

topology.newStream("spout", spout)

.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

.aggregate(new Count(), new Fields(“count”))

// partitionAggregate operation

topology.newStream("spout", spout)

.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

.partitionAggregate(new Count(), new Fields(“count"))

// persistentAggregate - saving the count to memory

topology.newStream("spout", spout)

.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“count”聚合器是内置聚合器之一,它使用“CombinerAggregator”实现,具体实现如下

public class Count implements CombinerAggregator {

@Override

public Long init(TridentTuple tuple) {

return 1L;

}

@Override

public Long combine(Long val1, Long val2) {

return val1 + val2;

}

@Override

public Long zero() {

return 0L;

}

}

分组

分组操作是一种内置操作,可以通过 groupBy

方法调用。groupBy方法通过在指定的字段上执行partitionBy来重新分区流,然后在每个分区内将它的组字段相等的元组分组在一起。通常,我们使用“groupBy”和“persistentAggregate”来获得分组聚合。示例代码如下

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory

topology.newStream("spout", spout)

.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

.groupBy(new Fields(“d”)

.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和加入

合并和连接可以分别使用“合并”和“连接”方法完成。合并合并一个或多个流。连接类似于合并,除了连接使用来自双方的三叉形元组字段来检查和连接两个流。而且,加入只能在批次级别下工作。示例代码如下

TridentTopology topology = new TridentTopology();

topology.merge(stream1, stream2, stream3);

topology.join(stream1, new Fields("key"), stream2, new Fields("x"),

new Fields("key", "a", "b", "c"));

状态维护

Trident提供了状态维护机制。状态信息可以存储在拓扑本身中,否则可以将其存储在单独的数据库中。原因是维护一个状态,即如果任何元组在处理期间失败,则重试失败的元组。这在更新状态时会产生问题,因为您不确定此元组的状态是否已更新过。如果元组在更新状态之前失败了,那么重试元组将使状态稳定。但是,如果元组在更新状态后失败,则重试同一元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤来确保消息只处理一次

小批量处理元组。

为每个批次分配一个唯一的ID。如果批次重试,则会给出相同的唯一ID。

状态更新在批次中排序。例如,第二批次的状态更新将不可能,直到第一批次的状态更新完成。

分布式RPC

分布式RPC用于查询和检索Trident拓扑的结果。Storm有一个内置的分布式RPC服务器。分布式RPC服务器接收来自客户端的RPC请求并将其传递给拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,该服务器由分布式RPC服务器重定向到客户端。Trident的分布式RPC查询像普通的RPC查询一样执行,除了这些查询是并行运行的。

何时使用Trident?

和许多用例一样,如果需求只处理查询一次,我们可以通过在Trident中编写拓扑来实现。另一方面,在Storm的情况下,很难实现一次处理。因此Trident对那些需要精确处理一次的用例非常有用。Trident并非针对所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理了状态。

Trident的工作例子

我们将把我们在前一节中制定的呼叫日志分析器应用程序转换为Trident框架。由于其高级API,Trident应用相对简单Storm相对容易。Storm基本上需要执行Trident中的Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。最后,我们将使用

LocalDRPC 类启动DRPC服务器,并使用 LocalDRPC 类的 执行 方法搜索一些关键字。

格式化通话信息

FormatCall类的用途是格式化包含“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下所示 -

编码:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;

import storm.trident.operation.TridentCollector;

import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {

@Override

public void execute(TridentTuple tuple, TridentCollector collector) {

String fromMobileNumber = tuple.getString(0);

String toMobileNumber = tuple.getString(1);

collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));

}

}

CSVSplit

CSVSplit类的用途是根据“逗号(,)”分割输入字符串并发送字符串中的每个单词。该函数用于解析分布式查询的输入参数。完整的代码如下 -

编码:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;

import storm.trident.operation.TridentCollector;

import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {

@Override

public void execute(TridentTuple tuple, TridentCollector collector) {

for(String word: tuple.getString(0).split(",")) {

if(word.length() > 0) {

collector.emit(new Values(word));

}

}

}

}

日志分析器

这是主要的应用程序。最初,应用程序将使用 FeederBatchSpout

初始化TridentTopology并提供来电者信息。Trident拓扑流可以使用TridentTopology类的 newStream

方法创建。同样,可以使用TridentTopology类的 newDRCPStream

方法创建Trident拓扑DRPC流。一个简单的DRCP服务器可以使用LocalDRPC类创建。 LocalDRPC

具有执行搜索某个关键字的方法。完整的代码如下。

编码:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.utils.DRPCClient;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import storm.trident.TridentState;

import storm.trident.TridentTopology;

import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;

import storm.trident.operation.builtin.Count;

import storm.trident.operation.builtin.Sum;

import storm.trident.operation.builtin.MapGet;

import storm.trident.operation.builtin.Debug;

import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;

import storm.trident.testing.FeederBatchSpout;

import storm.trident.testing.Split;

import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {

public static void main(String[] args) throws Exception {

System.out.println("Log Analyser Trident");

TridentTopology topology = new TridentTopology();

FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",

"toMobileNumber", "duration"));

TridentState callCounts = topology

.newStream("fixed-batch-spout", testSpout)

.each(new Fields("fromMobileNumber", "toMobileNumber"),

new FormatCall(), new Fields("call"))

.groupBy(new Fields("call"))

.persistentAggregate(new MemoryMapState.Factory(), new Count(),

new Fields("count"));

LocalDRPC drpc = new LocalDRPC();

topology.newDRPCStream("call_count", drpc)

.stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

topology.newDRPCStream("multiple_call_count", drpc)

.each(new Fields("args"), new CSVSplit(), new Fields("call"))

.groupBy(new Fields("call"))

.stateQuery(callCounts, new Fields("call"), new MapGet(),

new Fields("count"))

.each(new Fields("call", "count"), new Debug())

.each(new Fields("count"), new FilterNull())

.aggregate(new Fields("count"), new Sum(), new Fields("sum"));

Config conf = new Config();

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("trident", conf, topology.build());

Random randomGenerator = new Random();

int idx = 0;

while(idx < 10) {

testSpout.feed(ImmutableList.of(new Values("1234123401",

"1234123402", randomGenerator.nextInt(60))));

testSpout.feed(ImmutableList.of(new Values("1234123401",

"1234123403", randomGenerator.nextInt(60))));

testSpout.feed(ImmutableList.of(new Values("1234123401",

"1234123404", randomGenerator.nextInt(60))));

testSpout.feed(ImmutableList.of(new Values("1234123402",

"1234123403", randomGenerator.nextInt(60))));

idx = idx + 1;

}

System.out.println("DRPC : Query starts");

System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));

System.out.println(drpc.execute("multiple_call_count", "1234123401 -

1234123402,1234123401 - 1234123403"));

System.out.println("DRPC : Query ends");

cluster.shutdown();

drpc.shutdown();

// DRPCClient client = new DRPCClient("drpc.server.location", 3772);

}

}

构建和运行应用程序

完整的应用程序有三个Java代码。他们如下 -

FormatCall.java

CSVSplit.java

LogAnalyerTrident.java

应用程序可以通过使用以下命令来构建 -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

该应用程序可以通过使用以下命令运行 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

应用程序启动后,应用程序将输出关于集群启动过程,操作处理,DRPC服务器和客户端信息以及集群关闭过程的完整详细信息。该输出将显示在控制台上,如下所示。

DRPC : Query starts

[["1234123401 - 1234123402",10]]

DEBUG: [1234123401 - 1234123402, 10]

DEBUG: [1234123401 - 1234123403, 10]

[[20]]

DRPC : Query ends

java操作storm_Storm Trident相关推荐

  1. java 操作 redis_java操作Redis

    10. java操作Redis 10.1 环境准备 1. 引入依赖 redis.clients jedis 2.9.0 2.创建jedis对象 package org.example; import ...

  2. HBase安装配置以及Java操作hbase

    2019独角兽企业重金招聘Python工程师标准>>> Apache HBase Apache HBase™是Hadoop数据库,是一个分布式,可扩展的大数据存储. 当您需要对大数据 ...

  3. Java操作Kafka执行不成功

    使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢! 环境及依赖 <dependency><groupId>org ...

  4. java操作elasticsearch实现query String

    1.CommonTersQuery: 指定字段进行模糊查询 //commonTermsQuery @Test public void test35() throws UnknownHostExcept ...

  5. java操作dom节点的添加_java操作DOM节点的添加,删除,修改

    java操作DOM节点的添加,删除,修改 下面我们开始对此xml添加,删除,修改:方法一 import java.io.File; import java.io.IOException; import ...

  6. rocketmq(三 java操作rocket API, rocketmq 幂等性)

    JAVA操作rocketmq: 1.导入rocketmq所需要的依赖: <dependency><groupId>com.alibaba.rocketmq</groupI ...

  7. redis入门及java操作

    redis 命令可以去菜鸟教程http://www.runoob.com/redis/redis-tutorial.html 或者以下地址去学习http://www.cnblogs.com/huang ...

  8. java excel读取操作,Java 操作 Excel (读取Excel2003 2007,Poi兑现)

    Java 操作 Excel (读取Excel2003 2007,Poi实现) 一. Apache POI 简介( http://poi.apache.org/) 使用Java程序读写Microsoft ...

  9. hadoop java操作hdfs

    hfds 是一种文件系统,用于存储hadoop将要处理的数据.适用于大规模分布式数据处理,是一个可扩展行的文件分布式系统: 优点 1.如果出现节点宕机,hdfs,可以持续监视,错误检查,容错处理,文档 ...

最新文章

  1. [Java]Stack栈和Heap堆的区别(终结篇)[转]
  2. 如何通过一个类名找到它属于哪个jar包?
  3. 机器学习的练功心法(一)——机器学习概述
  4. 工作日志20150202
  5. WSL 1 运行 Ubuntu 20.04 将会出现问题
  6. CVPR2022车道线检测Efficient Lane Detection via Curve Modeling
  7. Nginx SSL 性能调优
  8. 生物聚集细胞生物化学反应的组织者Biomolecular condensates: organizers of cellular biochemistry
  9. 爬虫python下载视频_利用python爬虫通过m3u8文件下载ts视频
  10. Ubuntu18.04终端里,随意拖动或双击会出现ctrl+C的效果,解决
  11. 服装计算机辅助设计(CAD)技能证书
  12. python成绩统计_巧用python对学生成绩计算总分并排序
  13. Linux 配置 VNC 远程桌面
  14. 无线信道特性分析及建模仿真
  15. openwrt 3G上网功能配置(联通版本)
  16. 微信小程序navigateBack返回数据
  17. 深度学习提高模型准确率方法
  18. Mac文件编码格式转换
  19. 【cmake学习】cmake 引入第三方库(头文件目录、库目录、库文件)
  20. 腾讯云从业者资格认证考试模拟题

热门文章

  1. CCF系列题解--2016年9月第二题 火车购票
  2. unity 完美像素_完美风暴过后– Unity Hack Week XII
  3. 《C Primer Plus》第五章 编程习题
  4. ubuntu18.04使用蓝牙适配器
  5. usart串口发送与接收问题
  6. 神经网络训练会释放60多万磅二氧化碳?MIT的方法让它降到1/1300
  7. K3S - 轻量级Kubernetes集群
  8. python 读写csv文件(创建、追加、覆盖)_python 读写csv文件(创建,追加,覆盖)...
  9. PLM系统应用案例:重庆通用工业(集团)有限责任公司
  10. 迅雷ios版下载beta