2019独角兽企业重金招聘Python工程师标准>>>

maven依赖:

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>            
        </dependency>

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.2.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>            
         </dependency>

代码:

package com.suning.sevs.bussiness

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.slf4j.LoggerFactory
import org.elasticsearch.spark._

//测试kafka
object testKafka {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger(testKafka.getClass)

val sparkconf = new SparkConf().setAppName("testKafka ")
      .set("HADOOP_USER_NAME", “user”)
      .set("HADOOP_GROUP_NAME", "user")
            .set("es.nodes", "10.10.2.1,10.10.2.2")
            .set("es.port", "9900")

//     val spark = SparkSession
    //                 .builder
    //                 .appName("testKafka")
    //                 .config(sparkconf)
    //                 .getOrCreate()
    //       import spark.implicits._
    //        val topic = spark.readStream.format("kafka")
    //     .option("kafka.bootstrap.servers", "10.10.1.245:9092,10.10.1.246:9092")
    //    .option("subscribe", "mytopic")     
    //    .option("startingOffsets", "latest")  
    //    .option("minPartitions", "2") 
    //    .load()
    //    
    //    val query=topic.writeStream.format("console").outputMode(OutputMode.Append()).start()

val ssc = new StreamingContext(sparkconf, Seconds(1))

val topicsSet = "mytopic".split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "10.10.1.245:9092,10.10.1.246:9092")
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

val lines = directKafkaStream.map(_._2)

lines.foreachRDD(rdd=>{
      
      val esRdd=rdd.map(line=>{
        Map("sys"->line,"mycode" -> "1")
      }
      )
      esRdd.saveToEs("indexName/typeName")
        
    })
  
    // Start the computation
    ssc.start()
    ssc.awaitTermination()

}

}

转载于:https://my.oschina.net/u/778683/blog/2996104

Spark2.0流式处理读Kafka并写ES相关推荐

  1. 流式计算之kafka Stream

    流式计算之kafka Stream 概念 一般流式计算会与批量计算相比较.在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算.同时,计算结果是持续输出的, ...

  2. 流式计算新贵Kafka Stream设计详解--转

    原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c4611436 ...

  3. TDengine3.0流式计算引擎语法规则介绍

    小 T 导读:TDengine 3.0 引入了全新的流式计算引擎,既支持时间驱动的流式计算,也支持事件驱动的流式计算.本文将对新的流式计算引擎的语法规则进行详细介绍,方便开发者及企业使用. TDeng ...

  4. 124.Spark2Streaming读Kafka并写数据到Kudu

    环境介绍 非Kerberos环境 CM和CDH均为:5.15 准备环境 Spark2Streaming示例 pom.xml依赖 # 使用maven创建scala语言的spark2demo工程 < ...

  5. 大数据凉了?No,流式计算浪潮才刚刚开始!

    AI 前线导读:本文重点讨论了大数据系统发展的历史轨迹,行文轻松活泼,内容通俗易懂,是一篇茶余饭后用来作为大数据谈资的不严肃说明文.本文翻译自<Streaming System>最后一章& ...

  6. Java8 新特性之流式数据处理(转)

    转自:https://www.cnblogs.com/shenlanzhizun/p/6027042.html 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作 ...

  7. html5语音听写流式,iOS 讯飞语音听写(流式版)

    最近项目中用到了讯飞的语音识别,然后稍微看了一下,里面有几个值得注意的点,记录一下,先说语音听写(流式版),实时语音转写后期会附上 ,文末有 demo //语音听写(流式版) 语音听写流式版其实没设么 ...

  8. Java8 :流式数据处理

    java8的流式处理极大了简化我们对于集合.数组等结构的操作,让我们可以以函数式的思想去操作,本篇文章将探讨java8的流式数据处理的基本使用. 一. 流式处理简介 在我接触到java8流式处理的时候 ...

  9. 移动web基础:视口(viewport),流式布局 JDM京东移动端开发

    目标 能够理解视口的概念并进行视口的设置 能够说出流式布局的基本布局特征 能够使用 2倍图进行页面开发 能够实现 京东首页的 头部布局 移动web基础 移动端调试问题 模拟器调试 真机调试:使用手机进 ...

最新文章

  1. 关于手机的完美ROOT和一些问题的解决【OPPOx903亲测通过】
  2. Java 抛异常的两种方法
  3. 推荐两款 GTD 工具
  4. 获取周一_周一个股精选:光伏概念、央企改革:太阳能(000591)
  5. 如何去掉自动弹出IE9介绍页
  6. 压缩感知及应用 源代码_信言动态|学院成功举办2019年机器学习与压缩感知理论及其应用研讨会...
  7. 专利与论文-5:《专利说明书》的撰写与注意事项
  8. 天下武功唯快不破 实验吧(小结)
  9. 基因表达谱热图的绘制
  10. element-UI设置背景色和边框色
  11. Oracle数据库启动与关闭
  12. 芯片ECO的种类和修复方法介绍
  13. 雾霾“倾国倾城” 谣言肆虐你中招了么?
  14. 句柄的本质——拨乱反正篇 (转)
  15. java的网络协议学习_协议简史:如何学习网络协议?
  16. 录屏程序之屏幕实时录制保存成AVI视频文件
  17. 更好的子表单数据填充方式
  18. 高考录取查询一直显示服务器错误什么情况,今年高考再出“屏蔽生”,查分页面没成绩,反应过来坐等985录取...
  19. Matlab与外部接口:MAT文件基础
  20. vue实时显示日期时间星期几

热门文章

  1. native封装卡片 react_自己动手封装一个React Native多级联动
  2. git 克隆远端分支,关联到本地,修改代码并提交到远程分支
  3. 移植开源QT软件-SameGame
  4. 支付系统整体设计:整体架构设计以及注意要点(一)
  5. SpringBoot开发案例之CountDownLatch多任务并行处理
  6. Eclipse上Maven环境配置使用
  7. 004,配置文件详解:Properties和YAML
  8. SpringBoot 配置文件 application.properties(二)
  9. Yii2的MVC新特性
  10. yii 2.0 代码阅读 小记