2019独角兽企业重金招聘Python工程师标准>>>

没什么好说的,直接粘贴代码吧

public class MySparkReadEs implements Serializable {private transient JavaSparkContext javaSparkContext = null;private transient SparkConf sparkConf = null;private transient Configuration esConf = null;private String esSource = "post/post";private String esNodes = "192.168.1.235,192.168.1.236";private static final Log LOG = LogFactory.getLog(MySparkReadEs.class);private MySparkReadEs() {}private MySparkReadEs(String esSource, String esNodes) {this.esSource = esSource;this.esNodes = esNodes;}//初始化Spark contextprivate void initSparkContext() {this.sparkConf = new SparkConf().setMaster("local[2]").setAppName("my spark rdd");this.javaSparkContext = new JavaSparkContext(this.sparkConf);}//初始化查询Es的配置文件private void initEsConfig(String urn) {this.esConf = new Configuration((Configuration) HBaseConfiguration.create());//指定读取的索引名称this.esConf.set("es.resource", this.esSource);//指定es节点this.esConf.set("es.nodes", this.esNodes);//加入ES查询条件this.esConf.set("es.query", buildQueryESCondition(urn));}//构建查询Es的条件private String buildQueryESCondition(String urn) {//支持lucence的写法 里面可以继续添加其他字段StringBuilder sb = new StringBuilder();sb.append("postUrn:").append("(").append(urn).append(")");//查询条件的构造ImmutableMap<String, ImmutableMap<String, ImmutableMap<String, String>>> conditionMap = ImmutableMap.of("query", ImmutableMap.of("query_string", ImmutableMap.of("query", sb.toString())));//SPARK查询ES的查询条件LOG.info("SPARK 查询 ES 的条件为:" + JSON.toJSONString(conditionMap));return JSON.toJSONString(conditionMap);}//开始任务private void startJob() {List<String> extensionCollect = this.javaSparkContext.newAPIHadoopRDD(this.esConf, EsInputFormat.class, NullWritable.class, MapWritable.class).map(new Function<Tuple2<NullWritable, MapWritable>, String>() {@Overridepublic String call(Tuple2<NullWritable, MapWritable> v1) throws Exception {MapWritable mapWritable = v1._2();Map<String, Object> o = (Map<String, Object>) WritableUtils.fromWritable(mapWritable);String extension = (String) o.get("extension");return extension;}}).collect();for (String extension : extensionCollect) {LOG.info("Extension : " + extension);}}//主方法public static void main(String[] args) {String esNodes = "192.168.1.235,192.168.1.236";String esSource = "post/post";String userurn = "3136949-bf30c1da6f8b47d3d93a5c4f75194447";MySparkReadEs mySparkRdd = new MySparkReadEs(esSource, esNodes);//初始化 spark contextmySparkRdd.initSparkContext();//初始化 es 配置mySparkRdd.initEsConfig(userurn);//开始作业mySparkRdd.startJob();}
}

转载于:https://my.oschina.net/momisabuilder/blog/605956

Spark 读 Elasticsearch相关推荐

  1. Spark整合ElasticSearch

    2019独角兽企业重金招聘Python工程师标准>>> spark整合elasticsearch两种方式 1.自己生成_id等元数据 2.使用ES默认生成 引入对应依赖 <de ...

  2. 数据湖应用解析:Spark on Elasticsearch一致性问题

    概述 Spark与Elasticsearch(es)的结合,是近年来大数据解决方案很火热的一个话题.一个是出色的分布式计算引擎,另一个是出色的搜索引擎.近年来,越来越多的成熟方案落地到行业产品中,包括 ...

  3. Spark 整合ElasticSearch

    Spark 整合ElasticSearch 因为做资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例. ...

  4. Spark读HBASE - shc方案

    shc是hortonworks出品的开源方案,基于spark的特性,分片处理,并通过谓词下推,提高处理性能. 1. 引入依赖包 <dependency><groupId>com ...

  5. Spark 与 Elasticsearch交互的一些配置和问题解决

    最近刚开始接触大数据,一个日志分析系统,需要用Spark开发,Elasticsearch作为数据库来使用.所以第一步要解决的就是怎么从Spark去取Elasticsearch上的数据,下面是软件的版本 ...

  6. spark读Hbase数据集成Hbase Filter(过滤器)

    文章目录 过滤器简介 spark 读Hbase集成Filter TableInputFormat 源码 代码示例 基于hbase版本2.3.5 过滤器简介 Hbase 提供了种类丰富的过滤器(filt ...

  7. Spark Streaming + Elasticsearch构建App异常监控平台

    本文已发表在<程序员>杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App.到应用商店怒斥开发者等方式来表达不满.但开发者也同样感到头疼,因为崩溃可能意味着用户流失 ...

  8. 用 Spark 为 Elasticsearch 导入搜索数据

    越来越健忘了,得记录下自己的操作才行! ES和spark版本: spark-1.6.0-bin-hadoop2.6 Elasticsearch for Apache Hadoop 2.1.2 如果是其 ...

  9. spark写入elasticsearch限流

    文章目录 1. spark 批量写入es 2. java-spark写入elasticsearch 3. es_hadoop的源码拓展 1. MyEsSparkSQL 2. MyEsDataFrame ...

最新文章

  1. [LeetCode题解] ZigZag Conversion
  2. stm32F105的can2问题
  3. 【linux】kill命令模板
  4. Vue Vuex todo举例
  5. C++基础知识(四)—— 操作符/运算符
  6. WPF 4 动态覆盖图标(Dynamic Overlay Icon)
  7. linux主题管理器,XFCE 主题管理器:一款单一的GUI主题管理器,更改任何XFCE主题(带预览)...
  8. [渝粤教育] 西南科技大学 供用电网络及变电所设备 在线考试复习资料
  9. 从六爻分析淘宝的发展
  10. seraph_256 写给自己的编程小事
  11. C++ and C# 从TLS握手二进制中获取SNI(服务器名称指示)域名
  12. 百度地图Javascript API 使用记录
  13. 使用Mysql Navcat导出查询数据excel时出现数据丢失
  14. excel高级功能-分级显示
  15. 085 《穷查理宝典》简记
  16. C语言实现3个数的最小公倍数和最大公约数
  17. 派生类成员的访问属性
  18. Druid连接池实现数据库加密
  19. 图灵完备 java_图灵完备
  20. electron仿微信截图工具(初学者的尝试笔记)

热门文章

  1. Linux安装mysql学习
  2. C#—使用InstallerProjects打包桌面应用程序
  3. [APIO2018]铁人两项——圆方树+树形DP
  4. HashSet 与TreeSet和LinkedHashSet的区别
  5. _ZNote_Qt_定时器的总结
  6. SRS学习笔记7-SrsHttpServer
  7. Swift - 文本输入框(UITextField)的用法
  8. 消除文法中一切左递归算法
  9. Editplus PHP版
  10. 测试集的构成比例对网络分类性能的影响cp