Spark2.0流式处理读Kafka并写ES
2019独角兽企业重金招聘Python工程师标准>>>
maven依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.2.0</version>
<exclusions>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
代码:
package com.suning.sevs.bussiness
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.slf4j.LoggerFactory
import org.elasticsearch.spark._
//测试kafka
object testKafka {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(testKafka.getClass)
val sparkconf = new SparkConf().setAppName("testKafka ")
.set("HADOOP_USER_NAME", “user”)
.set("HADOOP_GROUP_NAME", "user")
.set("es.nodes", "10.10.2.1,10.10.2.2")
.set("es.port", "9900")
// val spark = SparkSession
// .builder
// .appName("testKafka")
// .config(sparkconf)
// .getOrCreate()
// import spark.implicits._
// val topic = spark.readStream.format("kafka")
// .option("kafka.bootstrap.servers", "10.10.1.245:9092,10.10.1.246:9092")
// .option("subscribe", "mytopic")
// .option("startingOffsets", "latest")
// .option("minPartitions", "2")
// .load()
//
// val query=topic.writeStream.format("console").outputMode(OutputMode.Append()).start()
val ssc = new StreamingContext(sparkconf, Seconds(1))
val topicsSet = "mytopic".split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.10.1.245:9092,10.10.1.246:9092")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = directKafkaStream.map(_._2)
lines.foreachRDD(rdd=>{
val esRdd=rdd.map(line=>{
Map("sys"->line,"mycode" -> "1")
}
)
esRdd.saveToEs("indexName/typeName")
})
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
转载于:https://my.oschina.net/u/778683/blog/2996104
Spark2.0流式处理读Kafka并写ES相关推荐
- 流式计算之kafka Stream
流式计算之kafka Stream 概念 一般流式计算会与批量计算相比较.在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算.同时,计算结果是持续输出的, ...
- 流式计算新贵Kafka Stream设计详解--转
原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c4611436 ...
- TDengine3.0流式计算引擎语法规则介绍
小 T 导读:TDengine 3.0 引入了全新的流式计算引擎,既支持时间驱动的流式计算,也支持事件驱动的流式计算.本文将对新的流式计算引擎的语法规则进行详细介绍,方便开发者及企业使用. TDeng ...
- 124.Spark2Streaming读Kafka并写数据到Kudu
环境介绍 非Kerberos环境 CM和CDH均为:5.15 准备环境 Spark2Streaming示例 pom.xml依赖 # 使用maven创建scala语言的spark2demo工程 < ...
- 大数据凉了?No,流式计算浪潮才刚刚开始!
AI 前线导读:本文重点讨论了大数据系统发展的历史轨迹,行文轻松活泼,内容通俗易懂,是一篇茶余饭后用来作为大数据谈资的不严肃说明文.本文翻译自<Streaming System>最后一章& ...
- Java8 新特性之流式数据处理(转)
转自:https://www.cnblogs.com/shenlanzhizun/p/6027042.html 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作 ...
- html5语音听写流式,iOS 讯飞语音听写(流式版)
最近项目中用到了讯飞的语音识别,然后稍微看了一下,里面有几个值得注意的点,记录一下,先说语音听写(流式版),实时语音转写后期会附上 ,文末有 demo //语音听写(流式版) 语音听写流式版其实没设么 ...
- Java8 :流式数据处理
java8的流式处理极大了简化我们对于集合.数组等结构的操作,让我们可以以函数式的思想去操作,本篇文章将探讨java8的流式数据处理的基本使用. 一. 流式处理简介 在我接触到java8流式处理的时候 ...
- 移动web基础:视口(viewport),流式布局 JDM京东移动端开发
目标 能够理解视口的概念并进行视口的设置 能够说出流式布局的基本布局特征 能够使用 2倍图进行页面开发 能够实现 京东首页的 头部布局 移动web基础 移动端调试问题 模拟器调试 真机调试:使用手机进 ...
最新文章
- 关于手机的完美ROOT和一些问题的解决【OPPOx903亲测通过】
- Java 抛异常的两种方法
- 推荐两款 GTD 工具
- 获取周一_周一个股精选:光伏概念、央企改革:太阳能(000591)
- 如何去掉自动弹出IE9介绍页
- 压缩感知及应用 源代码_信言动态|学院成功举办2019年机器学习与压缩感知理论及其应用研讨会...
- 专利与论文-5:《专利说明书》的撰写与注意事项
- 天下武功唯快不破 实验吧(小结)
- 基因表达谱热图的绘制
- element-UI设置背景色和边框色
- Oracle数据库启动与关闭
- 芯片ECO的种类和修复方法介绍
- 雾霾“倾国倾城” 谣言肆虐你中招了么?
- 句柄的本质——拨乱反正篇 (转)
- java的网络协议学习_协议简史:如何学习网络协议?
- 录屏程序之屏幕实时录制保存成AVI视频文件
- 更好的子表单数据填充方式
- 高考录取查询一直显示服务器错误什么情况,今年高考再出“屏蔽生”,查分页面没成绩,反应过来坐等985录取...
- Matlab与外部接口:MAT文件基础
- vue实时显示日期时间星期几
热门文章
- native封装卡片 react_自己动手封装一个React Native多级联动
- git 克隆远端分支,关联到本地,修改代码并提交到远程分支
- 移植开源QT软件-SameGame
- 支付系统整体设计:整体架构设计以及注意要点(一)
- SpringBoot开发案例之CountDownLatch多任务并行处理
- Eclipse上Maven环境配置使用
- 004,配置文件详解:Properties和YAML
- SpringBoot 配置文件 application.properties(二)
- Yii2的MVC新特性
- yii 2.0 代码阅读 小记