http://www.jianshu.com/p/cccc56e39429/comments/2022782 和 https://github.com/elastic/elasticsearch-hadoop/issues/745 都有提到通过自定义Spark Partitioner提升es-hadoop Bulk效率,但是无可运行代码,自己针对其思路在spark-shell里实现了一份。

思路:

spark streming监控/tmp/data下的新文件,并将文中每行内容存储到ES的web/blog索引里!

注意:代码里使用了doc ID来定制路由,该id为自动生成的uuid!因此在启动ES后,需要:

curl -s -XPUT localhost:9200/web -d '
{"mappings": {"blog": {"_id": {"path": "uuid"},"properties": {"title": {"type":   "string","index":  "analyzed"}}}}
}'

告诉ES使用blog document中的uuid字段作为_id。ES 2.0以后见 http://stackoverflow.com/questions/32334709/how-to-set-id-in-elasticsearch-2-0

下面是spark-shell代码:

import org.apache.spark._
import org.apache.spark.streaming._
import org.elasticsearch.spark._
import org.apache.spark.Partitioner
import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.cfg.Settings
import org.elasticsearch.hadoop.rest.RestRepository
import scala.collection.JavaConversions._// 为方便测试,下面是自己用scala实现的es hash函数
// 尤其注意:在生产环境下,使用ES jar包里的函数,位置为:
// https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/cluster/routing/Murmur3HashFunction.java
object Murmur3HashFunction {def hash(routing: String): Int = {val bytesToHash = Array.ofDim[Byte](routing.length * 2)for (i <- 0 until routing.length) {val c = routing.charAt(i)val b1 = c.toByteval b2 = (c >>> 8).toByteassert(((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c)bytesToHash(i * 2) = b1bytesToHash(i * 2 + 1) = b2}hash(bytesToHash, 0, bytesToHash.length)}def hash(bytes: Array[Byte], offset: Int, length: Int): Int = {murmurhash3_x86_32(bytes, offset, length, 0)}def murmurhash3_x86_32(data: Array[Byte], offset: Int, len: Int, seed: Int): Int = {val c1 = 0xcc9e2d51val c2 = 0x1b873593var h1 = seedval roundedEnd = offset + (len & 0xfffffffc)var i = offsetwhile (i < roundedEnd) {var k1 = (data(i) & 0xff) | ((data(i + 1) & 0xff) << 8) | ((data(i + 2) & 0xff) << 16) | (data(i + 3) << 24)k1 *= c1k1 = (k1 << 15) | (k1 >>> 17)k1 *= c2h1 ^= k1h1 = (h1 << 13) | (h1 >>> 19)h1 = h1 * 5 + 0xe6546b64i += 4}var k1 = 0len & 0x03 match {case 3 => k1 = (data(roundedEnd + 2) & 0xff) << 16case 2 => k1 |= (data(roundedEnd + 1) & 0xff) << 8case 1 => k1 |= (data(roundedEnd) & 0xff)k1 *= c1k1 = (k1 << 15) | (k1 >>> 17)k1 *= c2h1 ^= k1case _ => //break
    }h1 ^= lenh1 ^= h1 >>> 16h1 *= 0x85ebca6bh1 ^= h1 >>> 13h1 *= 0xc2b2ae35h1 ^= h1 >>> 16h1}
}// 自定义Partitioner
class ESShardPartitioner(settings: String) extends Partitioner {protected var _numPartitions = -1override def numPartitions: Int = {   val newSettings = new PropertiesSettings().load(settings)// 生产环境下,需要自行设置索引的 index/type,我是以web/blog作为实验的indexnewSettings.setResourceRead("web/blog") // ******************** !!! modify it !!! ******************** newSettings.setResourceWrite("web/blog") // ******************** !!! modify it !!! ******************** val repository = new RestRepository(newSettings)val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())repository.close()_numPartitions = targetShards.size()_numPartitions} override def getPartition(docID: Any): Int = {var shardId = Murmur3HashFunction.hash(docID.toString()) % _numPartitions;if (shardId < 0) {shardId += _numPartitions;}shardId}
}sc.getConf.setMaster("local").setAppName("RDDTest").set("es.nodes", "127.0.0.1").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("es.index.auto.create", "true");
val ssc = new StreamingContext(sc, Seconds(2));
val fileStream = ssc.textFileStream("/tmp/data");fileStream.foreachRDD { rdd => {def makeItem(content: String) : (String, Map[String,String]) = {val uuid = java.util.UUID.randomUUID.toString();(uuid, Map("content"->content, "uuid"->uuid))     }println("********************start*************************");var r2 = rdd.map(makeItem);val sparkCfg = new SparkSettingsManager().load(rdd.sparkContext.getConf)val settings = sparkCfg.save();var r3 = r2.partitionBy(new ESShardPartitioner(settings));    r3.map(x=>x._2).saveToEs("web/blog")println("data count: " + rdd.count.toString);println("*********************end************************");
}};ssc.start();
ssc.awaitTermination();

运行方法:

./spark-shell --jars ../lib/elasticsearch-spark-1.2_2.10-2.1.2.jar

然后在spark shell里运行上述代码。

通过shell 伪造数据:

mkdir /mmp/data
#rm -rf  /tmp/ ata"
rm -f "/tmp/data/*"
for ((j=0;j<30;j++)); do{for ((i=0;i<20;i++)); dofile_name=`python -c 'import random;print random.random()'`echo "$j $i is sad story." >"/tmp/data/$file_name.log"donesleep 1}
done
echo "OK, waiting..."
echo "done"

运行上述脚本,看到spark shell里显示:

见http://www.cnblogs.com/bonelee/p/6078956.html ES路由底层实现!

转载于:https://www.cnblogs.com/bonelee/p/6057211.html

自定义Spark Partitioner提升es-hadoop Bulk效率相关推荐

  1. 自定义Spark Partitioner提升es-hadoop Bulk效率——续

    对于es 2.4版本,要能定制spark partitioner需要如下方式启动spark shell: spark-2.0.0-bin-hadoop2.6/bin/spark-shell --jar ...

  2. Spark自定义分区(Partitioner)

    我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略(这两种分区的代码解析可以参见:<Spark分区器HashPartitioner和Ran ...

  3. 微信小程序开发05 研发加速:使用 Webpack 提升小程序研发效率

    你好,我是俊鹏.从今天开始,我会用四节课的时间带你学习微信小程序在工程化方面的实践方案. 小程序发展到今天已经成为很多产品的重要流量入口,随着用户量的增加,功能不断复杂化,小程序的体量不断增长,原始的 ...

  4. 免费良心软件整理,提升你的工作效率

    作为一个软件工具发烧友,我收集了超多的神器,分享给你,希望能提升你的工作效率. 陆续不断的更新着软件,不知不觉更新了好多软件,几乎涵盖了全网的好用软件,为了方便各位看官看,我做了一个小目录: 01 截 ...

  5. 如何提升各开发角色效率,既保证产品质量又能快速上线?

    前言 网易杭州研究院 · 叶锋 当一个产品或项目变得比较大的时候,开发效率就会提上议程.如果项目管理混乱或者某个环节脱节,往往会让项目进展缓慢,走弯路,甚至失败. 一个大的产品开发往往包含了项目经理, ...

  6. 腾讯再次开源三项技术,提升企业运维效率

    (2019年4月11日,深圳)在腾讯内部,工程师文化依然是主流,鼓励用代码.用技术说话--"talk is cheap, show me the code."而其中优质的技术,也正 ...

  7. word2003如何设置护眼模式_ERP系统上线,如何设置采购收货的模式,提升企业的采购效率...

    如何合理的规划采购计划 上次去拜访一个朋友,他们说公司既然出现没有下达采购订单,供应商也有送货过来的事情,对于公司来说,这个是非常严重的问题. 若用了ERP系统之后,如何避免类似的事情发生,今天我们来 ...

  8. typora 分割线_实战 | 五分钟,使用Typora+PicGo提升百倍写作效率

    学习和工作中,我们经常需要写作.如果你正在学习编程,可能会写篇学习笔记记录一下.如果你是一名开发者,可能会写篇技术文档或者接口文档.如果一是一名博主,可能会写篇文章分享知识.又或者写篇散文,抒发下内心 ...

  9. CSharpGL(30)用条件渲染(Conditional Rendering)来提升OpenGL的渲染效率

    CSharpGL(30)用条件渲染(Conditional Rendering)来提升OpenGL的渲染效率 当场景中有比较复杂的模型时,条件渲染能够加速对复杂模型的渲染. 条件渲染(Conditio ...

最新文章

  1. linux的运维管理UNIT3
  2. 动态绑定 dgvlist 列
  3. python使用界面-推荐8款常用的Python GUI图形界面开发框架
  4. python代码案例详解-我用Python抓取了7000 多本电子书案例详解
  5. 论文笔记 Bayesian Probabilistic Matrix Factorizationusing Markov Chain Monte Carlo (ICML 2008)
  6. 如何关闭Windows XP/Vista/Windows 7的DEP数据执行保护
  7. linux中_在 Linux 桌面中开始使用 Lumina | Linux 中国
  8. JavaScript 运行机制详解(理解同步、异步和事件循环)
  9. 安装eclipse及android,Eclipse Android 安装
  10. C++读取字符串中的数字的方法
  11. 关于降低锁的竞争程度------从奶爸的角度思考
  12. Sublime Text实现代码自动生成,快速编写HTML/CSS代码
  13. MBBR 物联网大数据监控系统
  14. GMM-HMM 详解
  15. 通过黑客代号带你回顾九位世界顶尖的黑客大咖
  16. Trusted Boot和Secure Boot的区别
  17. dolphinscheduler 2.0.5 告警组件-HTTP试用及改造
  18. arm linux源更新,[Linux] - Manjaro ARM 系统配置(更新镜像源,安装 Docker 和 Dotnet Core)...
  19. 【Spring】AOP实现日志记录
  20. 冷知识点:COLLATE 关键字是什么意思?

热门文章

  1. 【转】关于Apache与Nginx的优势比较
  2. mysql8解压版安装没有密码_MySQL8解压版安装
  3. java callback类_利用java8新特性实现类似javascript callback特性
  4. linux mysql5.7.11安装_centos 7 安装mysql 5.7.11
  5. egg mysql 连表查询_Egg中使用Sequelize框架关联查询Mysql数据库
  6. java override格式_Lambda表达式告别@override
  7. Spring事务原理,Java开发岗还不会这些问题
  8. 【2021年度训练联盟热身训练赛第四场】Game Map(python C++)
  9. 静态链表的插入和删除
  10. java spring redis订阅_spring中订阅redis键值过期消息通知