spark写入elasticsearch限流
文章目录
- 1. spark 批量写入es
- 2. java-spark写入elasticsearch
- 3. es_hadoop的源码拓展
- 1. MyEsSparkSQL
- 2. MyEsDataFrameWriter
1. spark 批量写入es
正常情况下,我们的spark任务有写入es的需求的时候,我们都是使用ES_Hadoop。参考官方的这里,选择适合自己的版本,如果是hive,spark等都有用到的话可以直接配置
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop</artifactId><version>7.1.1</version>
</dependency>
因为我们这里只是用到了spark,spark的版本是2.3 , scale 是2.11 ,elasticsearch是7.1.1所以只引入spark的包即可。
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-20_2.11</artifactId><version>7.1.1</version></dependency>
2. java-spark写入elasticsearch
java写入es的代码可以这样
@Data
public class UserProfileRecord {public String uid;public String want_val;
}
SparkConf sparkConf = new SparkConf().setAppName(JOB_NAME).set(ConfigurationOptions.ES_NODES, esHost).set(ConfigurationOptions.ES_PORT, esPort).set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser).set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPass).set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "500").set(ConfigurationOptions.ES_MAPPING_ID, "uid");SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();Dataset<Row> wantedCols = sparkSession.read().parquet(path);Dataset<UserProfileRecord> searchUserProfile = wantedCols.mapPartitions(new MapPartitionsFunction<Row, UserProfileRecord>() {@Overridepublic Iterator<UserProfileRecord> call(Iterator<Row> input) throws Exception {List<UserProfileRecord> cleanProfileList = new LinkedList<>();while (input.hasNext()) {UserProfileRecord aRecord = new UserProfileRecord();.........cleanProfileList.add(aRecord);}return cleanProfileList.iterator();}}, Encoders.bean(UserProfileRecord.class));EsSparkSQL.saveToEs(searchUserProfile.repartition(3), this.writeIndex);
这里因为es当前只有3个节点,所以用了一个repartition来将写入es的task数变成3个,减小对es的压力,在实际的使用过程中主片的写入速度能够达到平均3w/s,但是当任务产出的数据量比较大的时候写入的时间会比较长,还是会对当前的es集群产生比较大的影响,导致部分查询超时。
查找了很多官方的文档,发现能够调整的很有限,一般都是调整partition的数量和ConfigurationOptions.ES_BATCH_SIZE_ENTRIES 来throttle写入es的速度。我这边各种试探,收效甚微。
本来想用elasticsearch的java-client直接做rest请求的(这样就可以控制速速了),但是翻了一下es_hadoop的源码,发现她用的是tranport-client(是es内部通信使用的基于tcp的协议封装)那肯定比http类型的rest更高效啊,而且还有很多partition和es索引的replica的映射关系,想着应该是做了很多优化。所以还是用es_hadoop来做吧,没有办法了,只能看看改改源码了。
3. es_hadoop的源码拓展
增加了两个scala文件(强上scala
spark写入elasticsearch限流相关推荐
- spark写入数据到elasticsearch
本例中将DataFrame写入ES,之后利用kibana进行统计展示. import com.hm.util.SparkHelper import org.apache.spark.sql.{Data ...
- 智慧出行/spark Streaming-Dstream流优化:1.消费并行度,2.序列化,3.限流,压背,冷启4.cpu空转时间,5.不要在代码中判断这个表是否存在,6.推测执行7.开启动态资源分配
1.设置合理的消费并行度 最优的方案是:kafka分区数:broker *3/6/9 kafka分区能不能增加,能不能减少? kafka分区数是可以增加的,但是不能减少 2.序列化 java的序列化, ...
- Spark Streaming + Elasticsearch构建App异常监控平台
本文已发表在<程序员>杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App.到应用商店怒斥开发者等方式来表达不满.但开发者也同样感到头疼,因为崩溃可能意味着用户流失 ...
- ES限流导致ES数据更新不及时问题
目录 一.事故经过 二.事故原因 三.问题总结 四.后续处理方案 一.事故经过 12月8日早上8点左右 发现系统不停报错,报错信息如下: amnOkN_kRXGMwU3qmULUxA, primary ...
- 分布式技术与实战第七课 高并发下高可用的熔断、降级、限流和负载均衡、监控以及统一的日志系统
第39讲:从双十一看高可用的保障方式 从这一课时开始,专栏内容进入最后一个模块,即分布式高可用系列,这部分的内容,我将以电商大促为背景,讲解系统限流.降级熔断.负载均衡.稳定性指标.系统监控和日志系统 ...
- 不得不了解系列之限流
点击关注公众号,Java干货及时送达 来源:https://my.oschina.net/qiangmzsx/blog/4277685 限流简介 现在说到高可用系统,都会说到高可用的保护手段:缓存.降 ...
- 轻松两步,我在 SpringBoot 服务上实现了接口限流
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! Sentinel是阿里巴巴开源的限流器熔断器,并且带有可视 ...
- Spark2.x写入Elasticsearch的性能测试
为什么80%的码农都做不了架构师?>>> 一.Spark集成ElasticSearch的设计动机 ElasticSearch 毫秒级的查询响应时间还是很惊艳的.其优点有: 1. ...
- 实战 Spring Cloud Gateway 之限流篇
来源:https://www.aneasystone.com/archives/2020/08/spring-cloud-gateway-current-limiting.html 话说在 Sprin ...
最新文章
- C#计算两个日期的相隔天数
- Activity采用栈式管理的理解
- 苹果系统里面 dictionary 如何加入中文词典
- 数据驱动车主 App 产品优化,轻松搞定用户体验与转化
- Scala 基础(8)—— 占位符_和部分应用函数
- SAP C4C Adapt menu debugging
- c 连接mysql数据库查询_C语言实现访问及查询MySQL数据库的方法
- Objective-C 【从文件中读写字符串(直接读写/通过NSURL读写)】
- 设置相机的距离_讲对焦(四):相机对焦有哪些小技巧?
- swift 自定义TabBarItem
- CM/CMR/CMP防火等级有何区别?CM/CMR/CMP级电缆网线如何选?
- 解锁Insyde的BIOS隐藏设置
- 我家遥控器载波波形研究
- NFC技术 (二) -硬件设计
- IOS下,利用捏合手势实现图像缩放和显示
- word上怎么把图片拼接到一起_如何用Word把自己插入的两张图片合在一起?
- 关于谷哥传奇工程师Jeff Dean的笑话
- Scylladb学习笔记
- 云知声深耕智慧语音,让智能“听得见”
- Java接口的基本概念详解
热门文章
- 秒杀多线程第八篇 经典线程同步 信号量Semaphore
- 架构演进,后端开发进入微服务时代!
- 2020 JVM生态报告
- 这里有一份面筋请查收(二)
- 【今晚9点】:对话黄琦——从FB到快手,短视频领域里的“实习生”
- 1.RTMP流媒体服务器搭建
- warning: control reaches end of non-void function
- Tensorflow的高级封装
- undefined: resolver.BuildOption undefined: resolver.ResolveNowOption 报错的解决办法
- 大剑无锋之Hadoop的三个作业调度器【面试推荐】