Spark 读 Elasticsearch
2019独角兽企业重金招聘Python工程师标准>>>
没什么好说的,直接粘贴代码吧
public class MySparkReadEs implements Serializable {private transient JavaSparkContext javaSparkContext = null;private transient SparkConf sparkConf = null;private transient Configuration esConf = null;private String esSource = "post/post";private String esNodes = "192.168.1.235,192.168.1.236";private static final Log LOG = LogFactory.getLog(MySparkReadEs.class);private MySparkReadEs() {}private MySparkReadEs(String esSource, String esNodes) {this.esSource = esSource;this.esNodes = esNodes;}//初始化Spark contextprivate void initSparkContext() {this.sparkConf = new SparkConf().setMaster("local[2]").setAppName("my spark rdd");this.javaSparkContext = new JavaSparkContext(this.sparkConf);}//初始化查询Es的配置文件private void initEsConfig(String urn) {this.esConf = new Configuration((Configuration) HBaseConfiguration.create());//指定读取的索引名称this.esConf.set("es.resource", this.esSource);//指定es节点this.esConf.set("es.nodes", this.esNodes);//加入ES查询条件this.esConf.set("es.query", buildQueryESCondition(urn));}//构建查询Es的条件private String buildQueryESCondition(String urn) {//支持lucence的写法 里面可以继续添加其他字段StringBuilder sb = new StringBuilder();sb.append("postUrn:").append("(").append(urn).append(")");//查询条件的构造ImmutableMap<String, ImmutableMap<String, ImmutableMap<String, String>>> conditionMap = ImmutableMap.of("query", ImmutableMap.of("query_string", ImmutableMap.of("query", sb.toString())));//SPARK查询ES的查询条件LOG.info("SPARK 查询 ES 的条件为:" + JSON.toJSONString(conditionMap));return JSON.toJSONString(conditionMap);}//开始任务private void startJob() {List<String> extensionCollect = this.javaSparkContext.newAPIHadoopRDD(this.esConf, EsInputFormat.class, NullWritable.class, MapWritable.class).map(new Function<Tuple2<NullWritable, MapWritable>, String>() {@Overridepublic String call(Tuple2<NullWritable, MapWritable> v1) throws Exception {MapWritable mapWritable = v1._2();Map<String, Object> o = (Map<String, Object>) WritableUtils.fromWritable(mapWritable);String extension = (String) o.get("extension");return extension;}}).collect();for (String extension : extensionCollect) {LOG.info("Extension : " + extension);}}//主方法public static void main(String[] args) {String esNodes = "192.168.1.235,192.168.1.236";String esSource = "post/post";String userurn = "3136949-bf30c1da6f8b47d3d93a5c4f75194447";MySparkReadEs mySparkRdd = new MySparkReadEs(esSource, esNodes);//初始化 spark contextmySparkRdd.initSparkContext();//初始化 es 配置mySparkRdd.initEsConfig(userurn);//开始作业mySparkRdd.startJob();}
}
转载于:https://my.oschina.net/momisabuilder/blog/605956
Spark 读 Elasticsearch相关推荐
- Spark整合ElasticSearch
2019独角兽企业重金招聘Python工程师标准>>> spark整合elasticsearch两种方式 1.自己生成_id等元数据 2.使用ES默认生成 引入对应依赖 <de ...
- 数据湖应用解析:Spark on Elasticsearch一致性问题
概述 Spark与Elasticsearch(es)的结合,是近年来大数据解决方案很火热的一个话题.一个是出色的分布式计算引擎,另一个是出色的搜索引擎.近年来,越来越多的成熟方案落地到行业产品中,包括 ...
- Spark 整合ElasticSearch
Spark 整合ElasticSearch 因为做资料搜索用到了ElasticSearch,最近又了解一下 Spark ML,先来演示一个Spark 读取/写入 ElasticSearch 简单示例. ...
- Spark读HBASE - shc方案
shc是hortonworks出品的开源方案,基于spark的特性,分片处理,并通过谓词下推,提高处理性能. 1. 引入依赖包 <dependency><groupId>com ...
- Spark 与 Elasticsearch交互的一些配置和问题解决
最近刚开始接触大数据,一个日志分析系统,需要用Spark开发,Elasticsearch作为数据库来使用.所以第一步要解决的就是怎么从Spark去取Elasticsearch上的数据,下面是软件的版本 ...
- spark读Hbase数据集成Hbase Filter(过滤器)
文章目录 过滤器简介 spark 读Hbase集成Filter TableInputFormat 源码 代码示例 基于hbase版本2.3.5 过滤器简介 Hbase 提供了种类丰富的过滤器(filt ...
- Spark Streaming + Elasticsearch构建App异常监控平台
本文已发表在<程序员>杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App.到应用商店怒斥开发者等方式来表达不满.但开发者也同样感到头疼,因为崩溃可能意味着用户流失 ...
- 用 Spark 为 Elasticsearch 导入搜索数据
越来越健忘了,得记录下自己的操作才行! ES和spark版本: spark-1.6.0-bin-hadoop2.6 Elasticsearch for Apache Hadoop 2.1.2 如果是其 ...
- spark写入elasticsearch限流
文章目录 1. spark 批量写入es 2. java-spark写入elasticsearch 3. es_hadoop的源码拓展 1. MyEsSparkSQL 2. MyEsDataFrame ...
最新文章
- [LeetCode题解] ZigZag Conversion
- stm32F105的can2问题
- 【linux】kill命令模板
- Vue Vuex todo举例
- C++基础知识(四)—— 操作符/运算符
- WPF 4 动态覆盖图标(Dynamic Overlay Icon)
- linux主题管理器,XFCE 主题管理器:一款单一的GUI主题管理器,更改任何XFCE主题(带预览)...
- [渝粤教育] 西南科技大学 供用电网络及变电所设备 在线考试复习资料
- 从六爻分析淘宝的发展
- seraph_256 写给自己的编程小事
- C++ and C# 从TLS握手二进制中获取SNI(服务器名称指示)域名
- 百度地图Javascript API 使用记录
- 使用Mysql Navcat导出查询数据excel时出现数据丢失
- excel高级功能-分级显示
- 085 《穷查理宝典》简记
- C语言实现3个数的最小公倍数和最大公约数
- 派生类成员的访问属性
- Druid连接池实现数据库加密
- 图灵完备 java_图灵完备
- electron仿微信截图工具(初学者的尝试笔记)