pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kafkaspace</groupId><artifactId>kafkaWorkspace</artifactId><version>1.0-SNAPSHOT</version><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>6</source><target>6</target></configuration></plugin></plugins></build><!--设置依赖版本号--><properties><scala.version>2.11.8</scala.version><hadoop.version>2.7.4</hadoop.version><spark.version>2.3.2</spark.version></properties><dependencies><!--Scala--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!--Spark--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><!--Hadoop--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.46</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.0.0</version></dependency></dependencies>
</project>

LogProcessor.java

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.HashMap;public class LogProcessor implements Processor<byte[], byte[]> {private ProcessorContext processorContext;@Overridepublic void init(ProcessorContext processorContext) {this.processorContext = processorContext;}@Overridepublic void process(byte[] key, byte[] value) {String inputOri = new String(value);HashMap<String, Integer>map = new HashMap<String, Integer>();int times = 1;if (inputOri.contains(" ")){//截取字段String[] words = inputOri.split(" ");for (String word:words){if (map.containsKey(word)){map.put(word, map.get(word)+1);}else {map.put(word, times);}}}inputOri = map.toString();processorContext.forward(key, inputOri.getBytes());}@Overridepublic void close() {}
}

App.java

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;import java.util.Properties;public class App {public static void main(String[] args) {//声明来源主题String fromTopic = "testStreams1";//声明目标主题String toTopic = "testStreams2";//设置KafkaStreams参数信息Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");//实例化StreamsConfig对象StreamsConfig config = new StreamsConfig(props);//创建拓扑结构Topology topology = new Topology();//添加处理节点,为源处理节点指定名称和它订阅的主题topology.addSource("SOURCE", fromTopic)//添加自定义处理节点,指定处理器类和上一节点的名称.addProcessor("PROCESSOR", new ProcessorSupplier() {@Overridepublic Processor get() {return new LogProcessor();}}, "SOURCE")//添加目标处理节点,需要指定目标处理节点和上一节点的名称.addSink("SINK", toTopic, "PROCESSOR");//实例化KafkaStreams对象KafkaStreams streams = new KafkaStreams(topology, config);streams.start();}
}

各节点启动kafka和zookeeper集群

在hadoop01中创建两个主题

kafka-topics.sh --create \
--topic testStreams1 \
--partitions 3 \
--replication-factor 2 \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
kafka-topics.sh --create \
--topic testStreams2 \
--partitions 3 \
--replication-factor 2 \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

hadoop01中启动生产者服务

kafka-console-producer.sh \
--broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--topic testStreams1

Hadoop02中启动消费者服务

kafka-console-consumer.sh \
--from-beginning \
--topic testStreams2 \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092

运行App.java,在生产者服务中输入内容,统计后将在消费者中输出。

Kafka Streams开发单词计数应用相关推荐

  1. 学习笔记Flink(五)—— Flink开发环境配置及运行实例(单词计数)

    一.Intellij IDEA 环境配置 1.创建Maven工程 1.1.开发环境 Maven && JDK 1.2.Pom配置 Compiler Configuration 在pom ...

  2. java kafkastream_手把手教你写Kafka Streams程序

    一. 设置Maven项目 我们将使用Kafka Streams Maven Archetype来创建Streams项目结构: mvn archetype:generate \ -DarchetypeG ...

  3. kafka Streams

    目录 一.简介 1.概述 2.批处理和流计算 3.Kafka Streams介绍 特点 概念介绍 二.Kafka Streams示例 1.单词统计 2.求和 3.窗口操作 一.简介 1.概述 Kafk ...

  4. Kafka Streams(三十)

    Kafka Streams Kafka 一直被认为是一个强大的消息中间件,它实现了高吞吐.高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源.目前通用的一些流式处理框架如 Apach ...

  5. Kafka Streams流式原理解析

    前言 本篇文章会从Kafka的核心流式计算原理进行分析,Kafka Streams Low-level processor API 和 核心概念,以及常见的应用场景分析 流式计算 通过业务场景去分析流 ...

  6. 大数据【四】MapReduce(单词计数;二次排序;计数器;join;分布式缓存)

       前言: 根据前面的几篇博客学习,现在可以进行MapReduce学习了.本篇博客首先阐述了MapReduce的概念及使用原理,其次直接从五个实验中实践学习(单词计数,二次排序,计数器,join,分 ...

  7. MIT自然语言处理第二讲:单词计数(第三、四部分)

    MIT自然语言处理第二讲:单词计数(第三部分) 自然语言处理:单词计数 Natural Language Processing: (Simple) Word Counting 作者:Regina Ba ...

  8. 云计算情报局预告|告别 Kafka Streams,让轻量级流处理更加简单

    作者:不周 关键词:Kafka ETL,高弹性.免运维.低成本 阿里云消息队列 Kafka 版提供兼容 Apache Kafka 生态的全托管服务,彻底解决开源产品长期的痛点,是大数据生态中不可或缺的 ...

  9. Confluent Platform 3.0支持使用Kafka Streams实现实时的数据处理(最新版已经是3.1了,支持kafka0.10了)...

    来自 Confluent 的 Confluent Platform 3.0 消息系统支持使用 Kafka Streams 实现实时的数据处理,这家公司也是在背后支撑 Apache Kafka 消息框架 ...

最新文章

  1. Python教程WEB安全篇
  2. java e次方_java基础知识
  3. python:dataframe保存成csv文件和读取
  4. 从 2017 ChinaJoy 谈起,中国游戏如何数据化前行
  5. 技术实践 | 网易云信 QUIC 加速服务架构与实践
  6. Mysql配置优化浅谈
  7. springboot主要注解及其作用
  8. linux 命令速查手册之十
  9. 【实战】使用Job来修改Transform
  10. chromium关闭更新_你的Win10系统20H2了吗此乃Win10年度最靠谱的更新还有Win10优化大师助阵...
  11. Exsi6.5修改主机密码
  12. vscode右键没有open in browser
  13. 高数测试——3.29
  14. MAC修改.bashrc/.bash_profile无效,默认的用户配置文件是.zshrc,
  15. iOS 15 更新,图标改版
  16. 云和大数据,铺就宁夏特色“信息高速路”
  17. VC(Visual Studio C++)虚拟键VK值列表
  18. linux打开python3_号外:RIDE 可以在 linux+python3 的环境中运行啦!
  19. 燃爆朋友圈!中国设计师携手,用海报为武汉加油!
  20. DYNAMIC MOVEMENT PRIMITIVES PART 1: THE BASICS

热门文章

  1. c++ getline()详解
  2. HG30-3B型多功能校准仪
  3. 搬砖:数据结构之链表基本操作总结
  4. Ping IP时出现 request time out怎么解决?
  5. Pose for Everything: Towards Category-Agnostic Pose Estimation 阅读笔记
  6. Navicat Premium 12.0.22安装与激活
  7. input框点击时去掉默认的外层边框
  8. word2013表格文字上下居中
  9. 移动架构之MVP框架
  10. 构建kd树和kd树的搜索