
Create an Apache Storm topology in Java

04/27/2020

In this article

Learn how to create a Java-based topology for Apache Storm. You create a Storm topology that implements a word-count application. You use Apache Maven to build and package the project. Then you learn how to define the topology using the Apache Storm Flux framework.

After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Prerequisites

Apache Maven, properly installed according to Apache's instructions. Maven is a project build system for Java projects.

Test environment

The environment used for this article was a computer running Windows 10. The commands were executed in a command prompt, and the various files were edited with Notepad.

From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI

cd C:\HDI

Create a Maven project

Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount

mkdir resources

This command creates a directory named WordCount at the current location, which contains a basic Maven project. The second command changes the present working directory to WordCount. The third command creates a new directory, resources, which is used later. The WordCount directory contains the following items:

pom.xml: Contains settings for the Maven project.

src\main\java\com\microsoft\example: Contains your application code.

src\test\java\com\microsoft\example: Contains tests for your application.

Remove the generated example code

Delete the generated test and application files AppTest.java and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java

DEL src\test\java\com\microsoft\example\AppTest.java

Add Maven repositories

HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Open pom.xml by entering the command below:

notepad pom.xml

Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Add properties

Maven allows you to define project-level values called properties. In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

You can now use this value in other sections of the pom.xml. For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard-coding a value.

Add dependencies

Add a dependency for Storm components. In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

At compile time, Maven uses this information to look up storm-core in the Maven repository. It first looks in the repository on your local computer. If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.
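If you want to see how these dependencies resolve, Maven's standard dependency plugin can print the resolved dependency tree (run from the WordCount directory):

mvn dependency:tree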

Note

Notice the provided line in this section. This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

Build configuration

Maven plug-ins allow you to customize the build stages of the project, for example, how the project is compiled or how it's packaged into a JAR file. In pom.xml, add the following text directly above the </project> line.
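The build section starts as an empty skeleton; the sections that follow add entries to its plugins and resources elements. A minimal sketch matching those steps:

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>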

This section is used to add plug-ins, resources, and other build configuration options. For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Add plug-ins

Exec Maven Plugin

For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.6.0</version>
    <executions>
        <execution>
            <goals>
                <goal>exec</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <mainClass>${storm.topology}</mainClass>
        <cleanupDaemonThreads>false</cleanupDaemonThreads>
    </configuration>
</plugin>

Apache Maven Compiler Plugin

Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. It changes the Java version that Maven uses for the source and target of your application.

For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

For HDInsight 3.5, set the source and target Java version to 1.8.

Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. This example specifies 1.8, so the target HDInsight version is 3.5.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>

Configure resources

The resources section allows you to include non-code resources such as configuration files needed by components in the topology. For this example, add the following text in the <resources> section of the pom.xml file. Then save and close the file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

This example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. This file is used to configure what information is logged by the topology.

Create the topology

A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency:

Spouts: Read data from external sources and emit streams of data into the topology.

Bolts: Process the streams emitted by spouts or other bolts, and emit one or more streams.

Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Create the spout

To reduce the requirements for setting up external data sources, the following spout simply emits random sentences. It's a modified version of a spout that's provided with the Storm-Starter examples. Although this topology uses one spout, others may have several that feed data from different sources into the topology.

Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Then copy and paste the Java code below into the new file, and close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
    //Collector used to emit output
    SpoutOutputCollector _collector;
    //Used to generate a random number
    Random _rand;

    //Open is called when an instance of the class is created
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //Set the instance collector to the one passed in
        _collector = collector;
        //For randomness
        _rand = new Random();
    }

    //Emit data to the stream
    @Override
    public void nextTuple() {
        //Sleep for a bit
        Utils.sleep(100);
        //The sentences that are randomly emitted
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
            "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        //Randomly pick a sentence
        String sentence = sentences[_rand.nextInt(sentences.length)];
        //Emit the sentence
        _collector.emit(new Values(sentence));
    }

    //Ack is not implemented since this is a basic example
    @Override
    public void ack(Object id) {
    }

    //Fail is not implemented since this is a basic example
    @Override
    public void fail(Object id) {
    }

    //Declare the output fields. In this case, a sentence
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

Note

For an example of a spout that reads from an external data source, see one of the following examples:

TwitterSampleSpout: An example spout that reads from Twitter.

Storm-Kafka: A spout that reads from Kafka.

Create the bolts

Bolts handle the data processing. Bolts can do anything, for example, computation, persistence, or talking to external components. This topology uses two bolts:

SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

WordCount: Counts how many times each word has occurred.

SplitSentence

Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Then copy and paste the Java code below into the new file, and close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

    //Execute is called to process tuples
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        //Get the sentence content from the tuple
        String sentence = tuple.getString(0);
        //An iterator to get each word
        BreakIterator boundary = BreakIterator.getWordInstance();
        //Give the iterator the sentence
        boundary.setText(sentence);
        //Find the beginning of the first word
        int start = boundary.first();
        //Iterate over each word and emit it to the output stream
        for (int end = boundary.next(); end != BreakIterator.DONE; start = end, end = boundary.next()) {
            //Get the word
            String word = sentence.substring(start, end);
            //If the word is only whitespace characters, replace it with the empty string
            word = word.replaceAll("\\s+", "");
            //If it's an actual word, emit it
            if (!word.equals("")) {
                collector.emit(new Values(word));
            }
        }
    }

    //Declare that emitted tuples contain a word field
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordCount

Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Then copy and paste the Java code below into the new file, and close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
    //Create logger for this class
    private static final Logger logger = LogManager.getLogger(WordCount.class);
    //For holding words and counts
    Map<String, Integer> counts = new HashMap<String, Integer>();
    //How often to emit a count of words, in seconds
    private Integer emitFrequency;

    // Default constructor
    public WordCount() {
        emitFrequency = 5; // Default to 5 seconds
    }

    // Constructor that sets emit frequency
    public WordCount(Integer frequency) {
        emitFrequency = frequency;
    }

    //Configure frequency of tick tuples for this bolt
    //This delivers a 'tick' tuple on a specific interval,
    //which is used to trigger certain actions
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
        return conf;
    }

    //Execute is called to process tuples
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        //If it's a tick tuple, emit all words and counts
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            for (String word : counts.keySet()) {
                Integer count = counts.get(word);
                collector.emit(new Values(word, count));
                logger.info("Emitting a count of " + count + " for word " + word);
            }
        } else {
            //Get the word contents from the tuple
            String word = tuple.getString(0);
            //Have we counted any already?
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            //Increment the count and store it
            count++;
            counts.put(word, count);
        }
    }

    //Declare that this bolt emits a tuple containing two fields: word and count
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

Define the topology

The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

The following image is a basic diagram of the graph of components for this topology.

To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Then copy and paste the Java code below into the new file, and close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

    //Entry point for the topology
    public static void main(String[] args) throws Exception {
        //Used to build the topology
        TopologyBuilder builder = new TopologyBuilder();
        //Add the spout, with a name of 'spout'
        //and parallelism hint of 5 executors
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        //Add the SplitSentence bolt, with a name of 'split'
        //and parallelism hint of 8 executors
        //shufflegrouping subscribes to the spout, and equally distributes
        //tuples (sentences) across instances of the SplitSentence bolt
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        //Add the counter, with a name of 'count'
        //and parallelism hint of 12 executors
        //fieldsgrouping subscribes to the split bolt, and
        //ensures that the same word is sent to the same instance (group by field 'word')
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        //new configuration
        Config conf = new Config();
        //Set to false to disable debug information when
        //running in production on a cluster
        conf.setDebug(false);

        //If there are arguments, we are running on a cluster
        if (args != null && args.length > 0) {
            //parallelism hint to set the number of workers
            conf.setNumWorkers(3);
            //submit the topology
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        //Otherwise, we are running locally
        else {
            //Cap the maximum number of executors that can be spawned
            //for a component to 3
            conf.setMaxTaskParallelism(3);
            //LocalCluster is used to run locally
            LocalCluster cluster = new LocalCluster();
            //submit the topology
            cluster.submitTopology("word-count", conf, builder.createTopology());
            //sleep
            Thread.sleep(10000);
            //shut down the cluster
            cluster.shutdown();
        }
    }
}

Configure logging

Storm uses Apache Log4j 2 to log information. If you don't configure logging, the topology emits diagnostic information. To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Then copy and paste the XML text below into the new file, and close the file.
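The XML below is a minimal configuration that matches the description that follows: trace-level logging for com.microsoft.example and error-level logging for everything else. The console appender name and pattern are assumptions chosen to match the sample output shown later in this article.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <!-- Console appender; the pattern matches the example log lines below -->
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <!-- Trace-level logging for the components in this topology -->
        <Logger name="com.microsoft.example" level="trace" additivity="false">
            <AppenderRef ref="STDOUT"/>
        </Logger>
        <!-- Everything else only logs errors -->
        <Root level="error">
            <AppenderRef ref="STDOUT"/>
        </Root>
    </Loggers>
</Configuration>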

This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

The <Root> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Note

Storm version 0.10.0 and higher use Log4j 2.x. Older versions of Storm used Log4j 1.x, which used a different format for log configuration.

Test the topology locally

After you save the files, use the following command to test the topology locally:

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

As it runs, the topology displays startup information. The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 56 for word snow

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 56 for word white

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 112 for word seven

17:33:27 [Thread-16-count] INFO com.microsoft.example.WordCount - Emitting a count of 195 for word the

17:33:27 [Thread-30-count] INFO com.microsoft.example.WordCount - Emitting a count of 113 for word and

17:33:27 [Thread-30-count] INFO com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 57 for word snow

This example log indicates that the word 'and' has been emitted 113 times. The count continues to increase as long as the topology runs, because the spout continuously emits the same sentences.

There's a 5-second interval between emissions of words and counts. The WordCount component is configured to only emit information when a tick tuple arrives, and it requests that tick tuples be delivered only every five seconds.

Convert the topology to Flux

Flux is a new framework available with Storm 0.10.0 and higher that allows you to separate configuration from implementation. Your components are still defined in Java, but the topology is defined using a YAML file. You can package a default topology definition with your project, or use a standalone file when submitting the topology. When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.
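For example, Flux supports property and environment-variable substitution in the YAML file. The sketch below is illustrative: the file name dev.properties and the key kafka.zookeeper.hosts are invented for this example, and the --filter and --env-filter switches are assumptions based on the Flux documentation.

# dev.properties (illustrative)
kafka.zookeeper.hosts=localhost:2181

# In the YAML, reference ${kafka.zookeeper.hosts} (property substitution)
# or ${ENV-ZK_HOSTS} (environment-variable substitution), then submit with:
# storm jar WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local --filter dev.properties -R /topology.yaml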

The YAML file defines the components to use for the topology and the data flow between them. You can include the YAML file as part of the jar file, or you can use an external YAML file.

Previously, WordCountTopology.java defined the topology, but it isn't needed with Flux. Delete the file with the following command:

DEL src\main\java\com\microsoft\example\WordCountTopology.java

Enter the command below to create and open a new file topology.yaml:

notepad resources\topology.yaml

Then copy and paste the text below into the new file, and close the file.

name: "wordcount" # friendly name for the topology

config: # Topology configuration

topology.workers: 1 # Hint for the number of workers to create

spouts: # Spout definitions

- id: "sentence-spout"

className: "com.microsoft.example.RandomSentenceSpout"

parallelism: 1 # parallelism hint

bolts: # Bolt definitions

- id: "splitter-bolt"

className: "com.microsoft.example.SplitSentence"

parallelism: 1

- id: "counter-bolt"

className: "com.microsoft.example.WordCount"

constructorArgs:

- 10

parallelism: 1

streams: # Stream definitions

- name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)

from: "sentence-spout" # The stream emitter

to: "splitter-bolt" # The stream consumer

grouping: # Grouping type

type: SHUFFLE

- name: "Splitter -> Counter"

from: "splitter-bolt"

to: "counter-bolt"

grouping:

type: FIELDS

args: ["word"] # field(s) to group on

Enter the command below to open pom.xml and make the revisions described below:

notepad pom.xml

Add the following new dependency in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>flux-core</artifactId>
    <version>${storm.version}</version>
</dependency>

Add the following plugin to the <plugins> section. This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <configuration>
        <transformers>
            <!-- Keep us from getting a "can't overwrite file" error -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" />
            <!-- We're using the Flux framework, so refer to it as main -->
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.storm.flux.Flux</mainClass>
            </transformer>
        </transformers>
        <!-- Keep us from getting a bad signature error -->
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>

For the Exec Maven Plugin section, navigate to <configuration> > <mainClass> and change ${storm.topology} to org.apache.storm.flux.Flux. This setting allows Flux to handle running the topology locally in development.

In the <resources> section, add the following to <includes>. This XML includes the YAML file that defines the topology as part of the project.

<include>topology.yaml</include>

Test the Flux topology locally

Enter the following command to compile and execute the Flux topology using Maven:

mvn compile exec:java -Dexec.args="--local -R /topology.yaml"

Warning

If your topology uses Storm 1.0.1 bits, this command fails. Instead, install Storm in your development environment and use the following commands:

mvn compile package

storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml

The --local parameter runs the topology in local mode on your development environment. The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

As it runs, the topology displays startup information. The following text is an example of the output:

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 56 for word snow

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 56 for word white

17:33:27 [Thread-12-count] INFO com.microsoft.example.WordCount - Emitting a count of 112 for word seven

17:33:27 [Thread-16-count] INFO com.microsoft.example.WordCount - Emitting a count of 195 for word the

17:33:27 [Thread-30-count] INFO com.microsoft.example.WordCount - Emitting a count of 113 for word and

17:33:27 [Thread-30-count] INFO com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs

There's a 10-second delay between batches of logged information.

Create a new topology yaml from the project

Enter the command below to open topology.yaml:

notepad resources\topology.yaml

Find the following section and change the value of 10 to 5. This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

- id: "counter-bolt"

className: "com.microsoft.example.WordCount"

constructorArgs:

- 5

parallelism: 1

Save the file as newtopology.yaml.

To run the topology, enter the following command:

mvn exec:java -Dexec.args="--local resources/newtopology.yaml"

Or, if you have Storm in your development environment:

storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml

This command uses newtopology.yaml as the topology definition. Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Trident

Trident is a high-level abstraction provided by Storm that supports stateful processing. The primary advantage of Trident is that it guarantees that every message entering the topology is processed only once. Without Trident, your topology can only guarantee that messages are processed at least once. There are also other differences, such as built-in components that can be used instead of creating bolts. Bolts are replaced by less-generic components, such as filters, projections, and functions.

Trident applications can be created by using Maven projects. You use the same basic steps as presented earlier in this article; only the code is different. Trident also can't (currently) be used with the Flux framework.
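To illustrate the shape of such code, the sketch below outlines a Trident word count built on this article's RandomSentenceSpout, using the standard Trident 1.x APIs. It's a minimal sketch, not part of this project's build; the class and method names (TridentWordCountSketch, buildTopology) are illustrative.

package com.microsoft.example;

import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentWordCountSketch {
    //A Trident function that splits a sentence into words
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            for (String word : tuple.getString(0).split(" ")) {
                if (!word.isEmpty()) {
                    collector.emit(new Values(word));
                }
            }
        }
    }

    public static TridentTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        //Wrap the spout in a Trident stream, split sentences into words,
        //group by word, and keep the counts in managed (in-memory) state
        topology.newStream("sentences", new RandomSentenceSpout())
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
        return topology;
    }
}

Note the absence of a WordCount bolt: the groupBy and persistentAggregate calls replace it with Trident's built-in grouping and stateful aggregation, which is what gives Trident its exactly-once processing guarantee.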

For more information about Trident, see the Trident API Overview.

Next steps

You've learned how to create an Apache Storm topology by using Java. You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.
