Apache Spark技术实战之1 -- KafkaWordCount
欢迎转载,转载请注明出处,徽沪一郎。
概要
Spark应用开发实践性非常强,很多时候可能都会将时间花费在环境的搭建和运行上,如果有一个比较好的指导将会大大的缩短应用开发流程。Spark Streaming中涉及到和许多第三方程序的整合,源码中的例子如何真正跑起来,文档不是很多也不详细。
本篇主要讲述如何运行KafkaWordCount,这个需要涉及Kafka集群的搭建,还是说的越仔细越好。
搭建Kafka集群
步骤1:下载kafka 0.8.1及解压
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar zvxf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1
步骤2:启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
步骤3:修改配置文件config/server.properties,添加如下内容
host.name=localhost# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost
步骤4:启动Kafka server
bin/kafka-server-start.sh config/server.properties
步骤5:创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
检验topic创建是否成功
bin/kafka-topics.sh --list --zookeeper localhost:2181
如果正常返回test
步骤6:打开producer,发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
##启动成功后,输入以下内容测试
This is a message
This is another message
步骤7:打开consumer,接收消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
###启动成功后,如果一切正常将会显示producer端输入的内容
This is a message
This is another message
运行KafkaWordCount
KafkaWordCount源文件位置 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
尽管里面有使用说明,见下文,但如果不是事先对Kafka有一定的了解的话,决然不知道这些参数是什么意思,也不知道该如何填写。
/*** Consumes messages from one or more topics in Kafka and does wordcount.* Usage: KafkaWordCount * is a list of one or more zookeeper servers that make quorum* is the name of kafka consumer group* is a list of one or more kafka topics to consume from* is the number of threads the kafka consumer should use** Example:* `$ bin/run-example \* org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \* my-consumer-group topic1,topic2 1`*/
object KafkaWordCount {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordCount ")System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(zkQuorum, group, topics, numThreads) = argsval sparkConf = new SparkConf().setAppName("KafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(2))ssc.checkpoint("checkpoint")val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMapval lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)wordCounts.print()ssc.start()ssc.awaitTermination()}
}
讲清楚了写这篇博客的主要原因之后,来看一看该如何运行KafkaWordCount
步骤1:停止运行刚才的kafka-console-producer和kafka-console-consumer
步骤2:运行KafkaWordCountProducer
bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
解释一下参数的意思,localhost:9092表示producer的地址和端口, test表示topic,3表示每秒发多少条消息,5表示每条消息中有几个单词
步骤3:运行KafkaWordCount
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
解释一下参数, localhost:2181表示zookeeper的监听地址,test-consumer-group表示consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致,test表示topic,1表示线程数。
转载于:https://www.cnblogs.com/hseagle/p/3887507.html
Apache Spark技术实战之1 -- KafkaWordCount相关推荐
- Apache Spark技术实战之6 -- spark-submit常见问题及其解决
除本人同意外,严禁一切转载,徽沪一郎. 概要 编写了独立运行的Spark Application之后,需要将其提交到Spark Cluster中运行,一般会采用spark-submit来进行应用的提交 ...
- Apache Spark 技术团队开源机器学习平台 MLflow
开发四年只会写业务代码,分布式高并发都不会还做程序员? 近日,来自 Databricks 的 Matei Zaharia 宣布推出开源机器学习平台 MLflow .Matei Zaharia 是 ...
- Apache Storm技术实战之3 -- TridentWordCount
欢迎转载,转载请注明出处. 介绍TridentTopology的使用,重点分析newDRPCStream和stateQuery的实现机理. 使用TridentTopology进行数据处理的时候,经常会 ...
- Apache Storm技术实战之2 -- BasicDRPCTopology
欢迎转载,转载请注明出处,徽沪一郎. 本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容? BasicDRPCTopolo ...
- Apache Spark+PyTorch 案例实战
Apache Spark+PyTorch 案例实战 随着数据量和复杂性的不断增长,深度学习是提供大数据预测分析解决方案的理想方法,需要增加计算处理能力和更先进的图形处理器.通过深度学习,能够利用非结 ...
- #Apache Spark系列技术直播# 第六讲【 What's New in Apache Spark 2.4? 】
Apache Spark系列技术直播第六讲 [ What's New in Apache Spark 2.4? ] Abstract(简介): This talk will provide an ov ...
- 多云时代下数据管理技术_建立一个混合的多云数据湖并使用Apache Spark执行数据处理...
多云时代下数据管理技术 Azure / GCP / AWS / Terraform / Spark (Azure/GCP/AWS/Terraform/Spark) Five years back wh ...
- 中国Spark技术峰会(上):Spark与生态圈中组件结合实战
5月13日-15日,由全球最大中文IT社区CSDN主办的"2016中国云计算技术大会"(Cloud Computing Technology Conference 2016,简称C ...
- Spark机器学习实战 (十二) - 推荐系统实战
0 相关源码 将结合前述知识进行综合实战,以达到所学即所用.在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统. 1 ...
- Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l&qu ...
最新文章
- 《统计学习方法》-李航、《机器学习-西瓜书》-周志华总结+Python代码连载(一)--模型选择+误差评估
- 【转】 [C/OC的那点事儿]NSMutableArray排序的三种实现(依赖学生成绩管理系统).
- ubuntu16.04安装php5出现Package php5 have no installation candidate,解决方法
- vim之好用插件-ctrlp.vim
- 纪念第一次青海湖之行泡汤
- JavaScript的10种跨域共享的方法
- 梯度下降、随机梯度下降、方差减小的梯度下降(matlab实现)
- mysql,jdbc、连接池
- JOIN查询流程与驱动表
- 小程序源码:老人疯狂裂变引流视频推广
- 李白打酒 递归 C语言
- vscode打开项目从中文界面变成英文界面的问题
- CEF 、chromium源码下载前相关代理配置
- 考研政治(一)马克思原理
- 2月12日第壹简报,星期日,农历正月廿二
- Lattice FPGA ecp5u mico32软核使用
- 视听说教程(第三版)4 quiz 3
- 梦三国2显示服务器失败,windows10系统下梦三国2无法全屏如何解决
- 被判还钱5亿多 乐视移动年底被催账年关难过
- 16 种设计思想 – Design for failure
热门文章
- 高德地图显示多个气泡_arcgis api 4.x for js 地图加载多个气泡窗口展示(附源码下载)...
- linux查看cpt硬盘命令,Linux基础知识复习之命令篇
- vue host配置_从零开始部署一个 vue 项目
- java http 工具类_Java发送Http请求工具类
- 计算机制作乘法表格,excel表格乘法怎么用,excel表格怎么算乘法
- python3捕获异常_Python 异常处理和捕获信息教程|python3教程|python入门|python教程
- java - day006 - 构造方法
- php 数据库备份还原
- [ASP.NET AJAX]Function对象及Type类的方法介绍
- java基础-final