文章目录

  • 1. spark 批量写入es
  • 2. java-spark写入elasticsearch
  • 3. es_hadoop的源码拓展
    • 1. MyEsSparkSQL
    • 2. MyEsDataFrameWriter

1. spark 批量写入es

正常情况下,我们的spark任务有写入es的需求的时候,我们都是使用ES_Hadoop。参考官方的这里,选择适合自己的版本,如果是hive,spark等都有用到的话可以直接配置

<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop</artifactId><version>7.1.1</version>
</dependency>

因为我们这里只是用到了spark,spark的版本是2.3 , scale 是2.11 ,elasticsearch是7.1.1所以只引入spark的包即可。

<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-20_2.11</artifactId><version>7.1.1</version></dependency>

2. java-spark写入elasticsearch

java写入es的代码可以这样

@Data
public class UserProfileRecord {public String uid;public String want_val;
}
 SparkConf sparkConf = new SparkConf().setAppName(JOB_NAME).set(ConfigurationOptions.ES_NODES, esHost).set(ConfigurationOptions.ES_PORT, esPort).set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser).set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPass).set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "500").set(ConfigurationOptions.ES_MAPPING_ID, "uid");SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();Dataset<Row> wantedCols = sparkSession.read().parquet(path);Dataset<UserProfileRecord> searchUserProfile = wantedCols.mapPartitions(new MapPartitionsFunction<Row, UserProfileRecord>() {@Overridepublic Iterator<UserProfileRecord> call(Iterator<Row> input) throws Exception {List<UserProfileRecord> cleanProfileList = new LinkedList<>();while (input.hasNext()) {UserProfileRecord aRecord = new UserProfileRecord();.........cleanProfileList.add(aRecord);}return cleanProfileList.iterator();}}, Encoders.bean(UserProfileRecord.class));EsSparkSQL.saveToEs(searchUserProfile.repartition(3), this.writeIndex);

  这里因为es当前只有3个节点,所以用了一个repartition来将写入es的task数变成3个,减小对es的压力,在实际的使用过程中主片的写入速度能够达到平均3w/s,但是当任务产出的数据量比较大的时候写入的时间会比较长,还是会对当前的es集群产生比较大的影响,导致部分查询超时。
  查找了很多官方的文档,发现能够调整的很有限,一般都是调整partition的数量和ConfigurationOptions.ES_BATCH_SIZE_ENTRIES 来throttle写入es的速度。我这边各种试探,收效甚微。
  本来想用elasticsearch的java-client直接做rest请求的(这样就可以控制速速了),但是翻了一下es_hadoop的源码,发现她用的是tranport-client(是es内部通信使用的基于tcp的协议封装)那肯定比http类型的rest更高效啊,而且还有很多partition和es索引的replica的映射关系,想着应该是做了很多优化。所以还是用es_hadoop来做吧,没有办法了,只能看看改改源码了。

3. es_hadoop的源码拓展

增加了两个scala文件(强上scala

spark写入elasticsearch限流相关推荐

  1. spark写入数据到elasticsearch

    本例中将DataFrame写入ES,之后利用kibana进行统计展示. import com.hm.util.SparkHelper import org.apache.spark.sql.{Data ...

  2. 智慧出行/spark Streaming-Dstream流优化:1.消费并行度,2.序列化,3.限流,压背,冷启4.cpu空转时间,5.不要在代码中判断这个表是否存在,6.推测执行7.开启动态资源分配

    1.设置合理的消费并行度 最优的方案是:kafka分区数:broker *3/6/9 kafka分区能不能增加,能不能减少? kafka分区数是可以增加的,但是不能减少 2.序列化 java的序列化, ...

  3. Spark Streaming + Elasticsearch构建App异常监控平台

    本文已发表在<程序员>杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App.到应用商店怒斥开发者等方式来表达不满.但开发者也同样感到头疼,因为崩溃可能意味着用户流失 ...

  4. ES限流导致ES数据更新不及时问题

    目录 一.事故经过 二.事故原因 三.问题总结 四.后续处理方案 一.事故经过 12月8日早上8点左右 发现系统不停报错,报错信息如下: amnOkN_kRXGMwU3qmULUxA, primary ...

  5. 分布式技术与实战第七课 高并发下高可用的熔断、降级、限流和负载均衡、监控以及统一的日志系统

    第39讲:从双十一看高可用的保障方式 从这一课时开始,专栏内容进入最后一个模块,即分布式高可用系列,这部分的内容,我将以电商大促为背景,讲解系统限流.降级熔断.负载均衡.稳定性指标.系统监控和日志系统 ...

  6. 不得不了解系列之限流

    点击关注公众号,Java干货及时送达 来源:https://my.oschina.net/qiangmzsx/blog/4277685 限流简介 现在说到高可用系统,都会说到高可用的保护手段:缓存.降 ...

  7. 轻松两步,我在 SpringBoot 服务上实现了接口限流

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! Sentinel是阿里巴巴开源的限流器熔断器,并且带有可视 ...

  8. Spark2.x写入Elasticsearch的性能测试

    为什么80%的码农都做不了架构师?>>> 一.Spark集成ElasticSearch的设计动机 ElasticSearch 毫秒级的查询响应时间还是很惊艳的.其优点有: 1.    ...

  9. 实战 Spring Cloud Gateway 之限流篇

    来源:https://www.aneasystone.com/archives/2020/08/spring-cloud-gateway-current-limiting.html 话说在 Sprin ...

最新文章

  1. C#计算两个日期的相隔天数
  2. Activity采用栈式管理的理解
  3. 苹果系统里面 dictionary 如何加入中文词典
  4. 数据驱动车主 App 产品优化,轻松搞定用户体验与转化
  5. Scala 基础(8)—— 占位符_和部分应用函数
  6. SAP C4C Adapt menu debugging
  7. c 连接mysql数据库查询_C语言实现访问及查询MySQL数据库的方法
  8. Objective-C 【从文件中读写字符串(直接读写/通过NSURL读写)】
  9. 设置相机的距离_讲对焦(四):相机对焦有哪些小技巧?
  10. swift 自定义TabBarItem
  11. CM/CMR/CMP防火等级有何区别?CM/CMR/CMP级电缆网线如何选?
  12. 解锁Insyde的BIOS隐藏设置
  13. 我家遥控器载波波形研究
  14. NFC技术 (二) -硬件设计
  15. IOS下,利用捏合手势实现图像缩放和显示
  16. word上怎么把图片拼接到一起_如何用Word把自己插入的两张图片合在一起?
  17. 关于谷哥传奇工程师Jeff Dean的笑话
  18. Scylladb学习笔记
  19. 云知声深耕智慧语音,让智能“听得见”
  20. Java接口的基本概念详解

热门文章

  1. 秒杀多线程第八篇 经典线程同步 信号量Semaphore
  2. 架构演进,后端开发进入微服务时代!
  3. 2020 JVM生态报告
  4. 这里有一份面筋请查收(二)
  5. 【今晚9点】:对话黄琦——从FB到快手,短视频领域里的“实习生”
  6. 1.RTMP流媒体服务器搭建
  7. warning: control reaches end of non-void function
  8. Tensorflow的高级封装
  9. undefined: resolver.BuildOption undefined: resolver.ResolveNowOption 报错的解决办法
  10. 大剑无锋之Hadoop的三个作业调度器【面试推荐】