欢迎转载,转载请注明出处,徽沪一郎。

概要

Spark应用开发实践性非常强,很多时候可能都会将时间花费在环境的搭建和运行上,如果有一个比较好的指导将会大大的缩短应用开发流程。Spark Streaming中涉及到和许多第三方程序的整合,源码中的例子如何真正跑起来,文档不是很多也不详细。

本篇主要讲述如何运行KafkaWordCount,这个需要涉及Kafka集群的搭建,还是说的越仔细越好。

搭建Kafka集群

步骤1:下载kafka 0.8.1及解压

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
tar zvxf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1

步骤2:启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

步骤3:修改配置文件config/server.properties,添加如下内容

host.name=localhost# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost

步骤4:启动Kafka server

bin/kafka-server-start.sh config/server.properties

步骤5:创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test

检验topic创建是否成功

bin/kafka-topics.sh --list --zookeeper localhost:2181

如果正常返回test

步骤6:打开producer,发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
##启动成功后,输入以下内容测试
This is a message
This is another message

步骤7:打开consumer,接收消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
###启动成功后,如果一切正常将会显示producer端输入的内容
This is a message
This is another message

运行KafkaWordCount

KafkaWordCount源文件位置 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

尽管里面有使用说明,见下文,但如果不是事先对Kafka有一定的了解的话,决然不知道这些参数是什么意思,也不知道该如何填写

/*** Consumes messages from one or more topics in Kafka and does wordcount.* Usage: KafkaWordCount    *    is a list of one or more zookeeper servers that make quorum*    is the name of kafka consumer group*    is a list of one or more kafka topics to consume from*    is the number of threads the kafka consumer should use** Example:*    `$ bin/run-example \*      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \*      my-consumer-group topic1,topic2 1`*/
object KafkaWordCount {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordCount    ")System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(zkQuorum, group, topics, numThreads) = argsval sparkConf = new SparkConf().setAppName("KafkaWordCount")val ssc =  new StreamingContext(sparkConf, Seconds(2))ssc.checkpoint("checkpoint")val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMapval lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)wordCounts.print()ssc.start()ssc.awaitTermination()}
}

讲清楚了写这篇博客的主要原因之后,来看一看该如何运行KafkaWordCount

步骤1:停止运行刚才的kafka-console-producer和kafka-console-consumer

步骤2:运行KafkaWordCountProducer

bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5

解释一下参数的意思,localhost:9092表示producer的地址和端口, test表示topic,3表示每秒发多少条消息,5表示每条消息中有几个单词

步骤3:运行KafkaWordCount

 bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1

解释一下参数, localhost:2181表示zookeeper的监听地址,test-consumer-group表示consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致,test表示topic,1表示线程数。

转载于:https://www.cnblogs.com/hseagle/p/3887507.html

Apache Spark技术实战之1 -- KafkaWordCount相关推荐

  1. Apache Spark技术实战之6 -- spark-submit常见问题及其解决

    除本人同意外,严禁一切转载,徽沪一郎. 概要 编写了独立运行的Spark Application之后,需要将其提交到Spark Cluster中运行,一般会采用spark-submit来进行应用的提交 ...

  2. Apache Spark 技术团队开源机器学习平台 MLflow

    开发四年只会写业务代码,分布式高并发都不会还做程序员?   近日,来自 Databricks 的 Matei Zaharia 宣布推出开源机器学习平台 MLflow .Matei Zaharia 是 ...

  3. Apache Storm技术实战之3 -- TridentWordCount

    欢迎转载,转载请注明出处. 介绍TridentTopology的使用,重点分析newDRPCStream和stateQuery的实现机理. 使用TridentTopology进行数据处理的时候,经常会 ...

  4. Apache Storm技术实战之2 -- BasicDRPCTopology

    欢迎转载,转载请注明出处,徽沪一郎. 本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容? BasicDRPCTopolo ...

  5. Apache Spark+PyTorch 案例实战

    Apache Spark+PyTorch 案例实战  随着数据量和复杂性的不断增长,深度学习是提供大数据预测分析解决方案的理想方法,需要增加计算处理能力和更先进的图形处理器.通过深度学习,能够利用非结 ...

  6. #Apache Spark系列技术直播# 第六讲【 What's New in Apache Spark 2.4? 】

    Apache Spark系列技术直播第六讲 [ What's New in Apache Spark 2.4? ] Abstract(简介): This talk will provide an ov ...

  7. 多云时代下数据管理技术_建立一个混合的多云数据湖并使用Apache Spark执行数据处理...

    多云时代下数据管理技术 Azure / GCP / AWS / Terraform / Spark (Azure/GCP/AWS/Terraform/Spark) Five years back wh ...

  8. 中国Spark技术峰会(上):Spark与生态圈中组件结合实战

    5月13日-15日,由全球最大中文IT社区CSDN主办的"2016中国云计算技术大会"(Cloud Computing Technology Conference 2016,简称C ...

  9. Spark机器学习实战 (十二) - 推荐系统实战

    0 相关源码 将结合前述知识进行综合实战,以达到所学即所用.在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统. 1 ...

  10. Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l&qu ...

最新文章

  1. 《统计学习方法》-李航、《机器学习-西瓜书》-周志华总结+Python代码连载(一)--模型选择+误差评估
  2. 【转】 [C/OC的那点事儿]NSMutableArray排序的三种实现(依赖学生成绩管理系统).
  3. ubuntu16.04安装php5出现Package php5 have no installation candidate,解决方法
  4. vim之好用插件-ctrlp.vim
  5. 纪念第一次青海湖之行泡汤
  6. JavaScript的10种跨域共享的方法
  7. 梯度下降、随机梯度下降、方差减小的梯度下降(matlab实现)
  8. mysql,jdbc、连接池
  9. JOIN查询流程与驱动表
  10. 小程序源码:老人疯狂裂变引流视频推广
  11. 李白打酒 递归 C语言
  12. vscode打开项目从中文界面变成英文界面的问题
  13. CEF 、chromium源码下载前相关代理配置
  14. 考研政治(一)马克思原理
  15. 2月12日第壹简报,星期日,农历正月廿二
  16. Lattice FPGA ecp5u mico32软核使用
  17. 视听说教程(第三版)4 quiz 3
  18. 梦三国2显示服务器失败,windows10系统下梦三国2无法全屏如何解决
  19. 被判还钱5亿多 乐视移动年底被催账年关难过
  20. 16 种设计思想 – Design for failure

热门文章

  1. 高德地图显示多个气泡_arcgis api 4.x for js 地图加载多个气泡窗口展示(附源码下载)...
  2. linux查看cpt硬盘命令,Linux基础知识复习之命令篇
  3. vue host配置_从零开始部署一个 vue 项目
  4. java http 工具类_Java发送Http请求工具类
  5. 计算机制作乘法表格,excel表格乘法怎么用,excel表格怎么算乘法
  6. python3捕获异常_Python 异常处理和捕获信息教程|python3教程|python入门|python教程
  7. java - day006 - 构造方法
  8. php 数据库备份还原
  9. [ASP.NET AJAX]Function对象及Type类的方法介绍
  10. java基础-final