Elasticsearch和Hive整合,将hive数据同步到ES中
1 Elasticsearch整合Hive
1.1 软件环境
Hadoop软件环境
Hive软件环境
ES软件环境
1.2 ES-Hadoop介绍
1.2.1 官网
https://www.elastic.co/cn/products/hadoop
1.2.2 对 Hadoop 数据进行交互分析
Hadoop 是出色的批量处理系统,但是要想提供实时结果则颇具挑战。为了实现真正的交互式数据探索,您可以使用 ES-Hadoop 将 Hadoop 数据索引到 Elastic Stack,以充分利用快速的 Elasticsearch 引擎和Kibana精美的可视化效果。
有了 ES-Hadoop,您可以轻松构建动态的嵌入式搜索应用来处理您的 Hadoop 数据,或者使用全文本、空间地理查询和聚合,执行深度的低延时分析。从产品推荐到基因组测序,ES-Hadoop 开启了广泛而全新的应用领域。
1.2.3 让数据在 Elasticsearch 和 Hadoop 之间无缝移动
只有实现了数据的快速移动,才能让实时决策成为可能。凭借现有Hadoop API的动态扩展,ES-Hadoop让您能够在Elasticsearch和Hadoop之间轻松地双向移动数据,同时借助HDFS作为存储库,进行长期存档。分区感知、故障处理、类型转换和数据共享均可透明地完成。
1.2.4 本地对接Spark及其衍生技术
ES-Hadoop 完全支持 Spark、Spark Streaming 和 SparkSQL。此外,无论您使用 Hive、Pig、Storm、Cascading,还是标准 MapReduce,ES-Hadoop 都将提供本地对接,供您向 Elasticsearch 索引数据并从 Elasticsearch 查询数据。无论您用哪种技术,Elasticsearch 的所有功能任您支配。
1.2.5 随时随地确保数据安全
ES-Hadoop 拥有您需要的所有安全功能,包括 HTTP 身份验证和对 SSL/TLS 的支持。此外,它还能与支持 Kerberos 的 Hadoop 部署一起使用。
1.3 安装
1.3.1 常规安装
获取Elasticsearch-hadoop二进制文件可以通过从http://elastic.co/下载一个zip包(这个zip包中包含jars,sources,和documention),或者通过添加依赖文件:
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop</artifactId><version>7.1.1</version>
</dependency>
上面的这个jar包包含所有的Elasticsearch-Hadoop的特性,在运行时期间不需要任何其它的依赖。换句话说,它可以原样使用。
Elasticsearch-hadoop二进制适用于Hadoop 2.x(又叫做yarn)环境,在5.5版本之后支持hadoop 1.x版本环境的将会过时,在6.0之后将不会再进行测试。
1.3.2 最小版二进制包
Elasticsearch-hadoop提供最小版本的用于每个集成的jar包,
1.3.2.1 Map/Reduce集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop-mr</artifactId> <version>7.1.1</version>
</dependency>
1.3.2.2 Hive集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop-hive</artifactId> <version>7.1.1</version>
</dependency>
1.3.2.3 Pig集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-hadoop-pig</artifactId> <version>7.1.1</version>
</dependency>
1.3.2.4 Spark集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-spark-20_2.10</artifactId> <version>7.1.1</version>
</dependency>
注意:spark artifact。注意后缀中的这个-20表示spark的兼容版本。Spark 2.0+使用20,Spark 1.3 ~1.6使用13.
要注意的是,_2.10后缀表示scala的兼容版本。
以下是Spark version和ES-Hadoop Artifact ID的对应版本。
1.3.2.5 Strom集成
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch-storm</artifactId> <version>7.1.1</version>
</dependency>
1.3.3 配置
Elasticsearch-Hadoop的行为可以通过下面的属性来定制。
1.3.3.1 Required settings
es.resource
Elasticsearch资源的位置,数据读取和写入的位置。需要的格式是: /。
es.resource = twitter/tweet #index是’twitter’,type是’tweet’
es.resource.read(默认为es.resource)
Elasticsearch读取的数据资源(不是写)。在使用相同的job将数据读或写到不同的Elasticsearch的indices的时候将会很有用。通常设置成自动(除了Map/Reduce模块需要手动配置)。格式也是/,如artists/_doc。
支持多个index,如artists,bank/_doc,表示从artists和bank索引的_doc/读取数据。artists,bank/,表示从artists和bank索引中读取数据,type任意。_all/_doc表示从所有的_doc读取数据。
add jar elasticsearch-hadoop-6.1.2.jar;
add jar json-udf-1.3.8-jar-with-dependencies.jar;
add jar json-serde-1.3.8-jar-with-dependencies.jar;CREATE TABLE x (`es_metadata` string,`nested1` struct<item1:string, item2:int>,`nested2` struct<iterm3:double, iterm4:string>)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.output.json' = 'true',
'es.resource.read' = 'netsed/test',
'es.nodes'='${nodes}',
'es.read.metadata' = 'true',
'es.read.metadata.field' = 'es_metadata',
'es.field.read.empty.as.null'='false',
'es.mapping.names' = 'nested2:Nested2,nested1:nested1'
);
es.resource.write(默认为es.resource)
Elasticsearch用于写入的资源(而不是读),通常用于动态资源写入,或在同一作业中将数据写入和读取到不同的Elasticsearch索引时使用.通常设置成自动(除了Map/Reduce模块需要手动配置)。
要注意的是在上面的resource设置里面指定的多个indices、type.只允许在reading的时候使用。只有在使用dynamic resource的时候支持指定多个indices。
1.3.3.2 Dynamic/multi resource writes
对于编写,Elasticsearch -hadoop允许在运行时使用模式(通过使用{}格式)解析目标资源,并根据流到Elasticsearch的数据在运行时解析。也就是说,可以基于从要保存的文档解析的一个或多个字段将文档保存到某个索引或类型。
例如,假设有下面的文档集合
{"media_type":"game","title":"Final Fantasy VI","year":"1994"
},
{"media_type":"book","title":"Harry Potter","year":"2010"
},
{"media_type":"music","title":"Surfing With The Alien","year":"1987"
}
要根据它们的media_type为每个类建立索引,可以使用一下模式:
# 根据文档的类型来索引它们
es.resource.write = my-collection/{media_type}
通过上面的配置,将导致”Final Fantasy VI”在my-collection/game中,Harry Potter在my-collection/book,”Surfing With The Alien”在my-collection/music。想了解更多的信息,可以参考专门的dedicated集成章节。
1.3.3.3 Formatting dynamic/multi resource writes
当使用dynamic/multi写时,还可以指定字段返回值的格式。hadoop提供了日期/时间戳字段的开箱即用格式,这对于在相同索引下的特定时间范围内自动分组基于时间的数据(例如日志)非常有用。通过使用Java SimpleDataFormat语法,可以以一种对语言环境敏感的方式格式化和解析日期。
例如,假设数据包含@timestamp字段,可以使用以下配置将文档分组到每日索引中:
@timestamp field formatting - in this case yyyy.MM.dd
@timestamp字段格式,在本例中是yyyy.MM.dd格式。
同样是使用这个相同的配置(es.resource.write),然而,通过特殊的|字符指定格式化模式。请参考SimpleDateFormat的javadocs获取更多的关于这个的语法支持。在这种情况下,yyyy.MM.dd将日期转换为年份(由四位数字指定)、月份(由两位数字指定)和天(如2015.01.28)。
1.3.3.4 Essential Settings
网络相关
es.nodes(默认localhost)
列出要连接的Elasticsearch节点。当远程使用Elasticsearch的时候,请设置这个参数。要注意的是这个列表中不一定非要包含Elasticsearch集群中的每个节点;默认情况下,这些是由elasticsearch-hadoop自动发现的(参见下面)。每个节点还可以单独指定其HTTP/REST端口(例如:mynode:9600)。
es.port(默认9200)
用于连接到Elasticsearch的默认的HTTP/REST端口。这个设置用于在es.nodes中没有指定端口的情况下使用。
es.nodes.path.prefix(默认空)
前缀,以添加到向Elasticsearch发出的所有请求中。适用于集群在特定路径下代理/路由的环境。例如,如果es集群位于someaddress:someport/custom/path/prefix 下,可以设置es.nodes.path.prefix 为 /custom/path/prefix。
1.3.3.5 Querying
es.query(默认为none)
保存从指定的es.resource中读取的数据,默认情况下他不为设置,也不为空。意味着在指定的index/type下的整个数据都被返回。es.query可以有三个来源:
uri query
使用?uri_query的这种格式,可以设置一个query string。要注意的是这个前导’?’。
query dsl
使用query_dsl格式,注意这个query dsl前缀需要以{开始,以}结束。
external resource
如果上面两个都不匹配,elasticsearch-hadoop将尝试将该参数解释为HDFS文件系统中的路径。如果不是这样,它将尝试从类路径加载资源,如果失败,则尝试从Hadoop DistributedCache加载资源。资源应该包含uri查询或查询dsl。
下面是示例:
# uri (or parameter) query
es.query = ?q=costinl# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }# external resource
es.query = org/mypackage/myquery.json
其它参数:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
1.3.4 Apache Hive integration
1.3.4.1 Installation
要确保elasticsearch-hadoop的jar能够在Hive classpath能够访问到。取决于你的选择,有很多中方式可以实现这一点。使用add命令添加jar文件,或者归档类路径。
ADD JAR /path/elasticsearch-hadoop.jar;
作为一个替代方案,也可以使用过下面的命令行:
CLI配置
$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar
或者在命令行中使用hive.aux.jars.path属性、或者在hive-site.xml文件中,注册额外的jar(它也接收URI):
$ bin/hive -hiveconf hive.aux.jars.path=/path/elasticsearch-hadoop.jar
在hive-site.xml中也可以配置:
<property><name>hive.aux.jars.path</name><value>/path/elasticsearch-hadoop.jar</value><description>A comma separated list (with no spaces) of the jar files</description>
</property>
1.3.4.2 Configuration
当使用Hive,当声明支持Elasticsearch的外部表的时候,可以使用TBLPROPERTIES指定这个配置属性(作为Hadoop配置对象的可选配置),例如:
CREATE EXTERNAL TABLE artists (...)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists','es.index.auto.create' = 'false');
1.3.4.3 Mapping
默认情况下,elasticsearch-hadoop使用Hive的schema去映射在Elasticsearch中的数据。在这个过程中使用字段名称和类型。但是,在某些情况下,Hive中的名称不能与Elasticsearch一起使用(字段名可以包含Elasticsearch接受但Hive不接受的字符)。对于这种情况,可以使用es.mapping.names设置,接收以下的按照冒号分割的格式: Hive field name : Elasticsearch field name.
即:
CREATE EXTERNAL TABLE artists (...)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists','es.mapping.names' = 'date:@timestamp, url:url_123'); Hive 列 date映射到 Elasticsearch 中的 @timestamp; Hive的列url映射到Elasticsearch为url_123
注意:
1、Hive是大小写不敏感的然而Elasticsearch不是。数据丢失了可能产生无效的查询(因为在Hive中的列可能不能匹配Elasticsearch中的列)。为了避免这种问题,elasticsearch-hadoop将总是将Hive的列名称都转成小写。这就是说,建议使用默认的Hive样式,只对Hive命令使用大写名称,并避免混合大小写名称。
2、Hive通过一个特殊的NULL来对待丢失的值。这就意味着当运行一个不正确的查询(不正确或者名称不存在)时,Hive表将使用NULL填充,而不是抛出一个异常。确保验证你的数据,密切关注你的schema, 否则由于这种宽松的行为,更新将不会被注意到。
1.3.4.4 Writing data to Elasticsearch
有了elasticsearch-hadoop,Elasticsearch可以仅仅通过一个外部表load和读取数据:
CREATE EXTERNAL TABLE artists (id BIGINT,name STRING,links STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' (1)
TBLPROPERTIES('es.resource' = 'radio/artists'); (2)-- insert data to Elasticsearch from another table called 'source'
INSERT OVERWRITE TABLE artistsSELECT NULL, s.name, named_struct('url', s.url, 'picture', s.picture)FROM source s;
(1)Elasticsearch Hive StorageHandler
(2)Elasticsearch resource (index and type) associated with the given storage
当文档中需要指定id(或者其他的metadata字段如ttl和timestamp)的时候,可以设置适当的mapping,也就是 es.mapping.id.紧跟上面的例子,指示Elasticsearch使用id作为文档的id,更新表的属性:
CREATE EXTERNAL TABLE artists (id BIGINT,...)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.mapping.id' = 'id'...);
1.3.4.5 Writing existing JSON to Elasticsearch
对于job作业中输入数据已经在JSON中的场景,elasticsearch-hadoop允许直接索引,而不需要应用任何转换。数据直接按照原样直接发送到Elasticsearch.在这种情况下,需要创建为这个json创建索引通过设置es.input.json参数。同样地,elasticsearch-hadoop期望输出表只包含一个字段,这个内容用于作为json文档。就是说,这个library将识别指定的textual类型(例如:string 或 binary),或简单地调用(toString)。
注意:
确保数据以UTF-8正确的编码。字段内容被认为是发送到Elasticsearch的文档的最终形式。
CREATE EXTERNAL TABLE json (data STRING) (1)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = '...','es.input.json` = 'yes');
(1)这个表的声明中只有一个STRING类型的字段。
(2)表明elasticsearch-hadoop 表的内容是JSON格式。
1.3.4.6 Writing to dynamic/multi-resources
可以使用模式将数据索引到不同的资源,具体取决于读取的行, 回到前面提到的media例子,我们可以这样配置它:
CREATE EXTERNAL TABLE media (name STRING,type STRING, (1)year STRING,
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'my-collection-{type}/doc');
(1)表的字段被用于resource pattern. 可以使用任何声明的字段。
(2)资源的pattern 使用字段type
对于将要编写的每一行,elasticsearch-hadoop将提取type字段并使用其值确定目标资源。
在处理json数据的时候,同样适用,既然这样,这个值将从JSON文档中提取。假设有以下的JSON资源包含的文档结构如下:
{"media_type":"music", (1)"title":"Surfing With The Alien","year":"1987"
}
(1)将被用于pattern的json中的字段。
表的声明可以按照下面的方式声明:
CREATE EXTERNAL TABLE json (data STRING) (1)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'my-collection-{media_type}/doc', 'es.input.json` = 'yes'); (2)
1.3.4.7 Reading data from Elasticsearch
从ElasticSearch中读取数据,类似如下:
CREATE EXTERNAL TABLE artists (id BIGINT,name STRING,links STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' (1)
TBLPROPERTIES('es.resource' = 'radio/artists', (2)'es.query' = '?q=me*'); (3)-- stream data from Elasticsearch
SELECT * FROM artists;
Type conversion
Hive为定义数据提供了各种类型,并根据目标环境(从JDK本机类型到二进制优化的类型)在内部使用不同的实现。Elasticsearch集成了所有这些,包括和Serde2 lazy和lazy binary:
注意:
尽管Elasticsearch在2.0版之前可以理解Hive类型,但它向后兼容Hive 1.0
1.4 案例
将hive中的数据同步到ES中。
drop table if exists testhadoop;
CREATE EXTERNAL TABLE testhadoop (ID bigint,SUBJECT STRING,xxxxx)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes'='ip:10200',
'es.resource'='xxx/xxxx',
'es.index.auto.create'='false',
'es.mapping.id'='ID',
'es.mapping.names'= 'ID:ID,SUBJECT:SUBJECT,xxxxx',
'es.nodes.wan.only'='true',
'es.batch.write.retry.count'='10',
'es.batch.write.refresh'='true',
'es.batch.write.retry.wait'='60s',
'es.http.timeout'='100m',
'es.batch.size.entries'='100');INSERT INTO TABLE testhadoop
select 子句;
Elasticsearch和Hive整合,将hive数据同步到ES中相关推荐
- java中SpringBoot项目定时将MySql数据同步到ES中
项目所用依赖 <modelVersion>4.0.0</modelVersion><artifactId>tm-shop-model</artifactId& ...
- es 全量同步mysql_使用canal将mysql同步到es中
因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧 canal教程 根据 https://github.com/ ...
- 客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中
目录 将消费的kafka数据同步到Kudu中 一.导入表名映射关系类
- solr mysql 自动同步_MongoDB和Solr的整合以及实现数据同步功能
使用mongo-connector实现mongodb与solr数据同步: 1.solr搭建.这个我有记录,可以去找,这里不说了.(此次采用solr版本为4.7) 2.mongo搭建,要搭建集群,就是副 ...
- mysql数据同步到es
线上环境使用了logstash做mysql和es的数据同步.数据量过大时.可能会出现同步延时的问题. 一般同步方案有三种: 1:logstash等工具同步 2:数据库ES双写 3:消息机制 第一种有点 ...
- 使用elasticsearch-dump 复制ES数据到新ES中
业务需求:将ES的数据导入到一个新的ES中(由于之前的节点数为6个节点,现在为1一个节点,所以,不能直接把data下的数据copy到新的ES中,需要使用elasticsearch-dump来复制数据) ...
- java程序中hive数据推送es_Hive表数据同步到es
1.首先服务器节点,进入到对应的数据库. 2. 然后找到要同步的表,show create table + 表名查看一下 或者自己可以新建一个表,用来测试原表,如下 CREATE TABLE`wb_t ...
- Hive 不同存储格式表数据同步问题
本人在同步两个集群的hive数据时遇到了一个坑爹的问题,A集群X表为RCFILE格式,B集群Y表为ORCFILE格式,现需要将X表的历史数据同步到Y表中.当初天真的以为直接导出导入就能解决问题了,完全 ...
- MySQL数据同步到ES集群(MySQL数据库与ElasticSearch全文检索的同步)
简介:MySQL数据库与ElasticSearch全文检索的同步,通过binlog的设置对MySQL数据库操作的日志进行记录,利用Python模块对日志进行操作,再利用kafka的生产者消费者模式进行 ...
最新文章
- k means聚类算法_一文读懂K-means聚类算法
- linux下查看最消耗CPU、内存的进程
- 在做性能测试之前需要知道什么
- 【GAN优化】详解对偶与WGAN
- SQL 注入工具集合
- 解决eclipse中Mybatis框架下sql语句执行后控制台不显示日志问题
- kafka入门之broker--日志存储设计
- CodeForces - 1152B二进制+思维
- 重学java基础第二十四课:标识符合关键字
- 什么样的项目经历会让面试官眼前一亮
- 郫都区计算机老师周俊老师,教师节,带你走进郫都教师背后的故事
- NPM包管理器跟换国内镜像CNPM
- html下移,jQuery实现元素的上移下移删除
- 深度神经网络基本问题的原理详细分析和推导
- 华三交换机配置access命令_华三交换机配置中,shutdown这条命令怎么用?
- 线性系统大作业——0.一阶和二阶倒立摆建模与控制系统设计
- Ubuntu删除U盘分区,并格式化U盘
- Python3 利用阿里接口,根据银行卡号获取银行名称和logo
- bootstrap table合并单元格
- 一次性学会如何选择合适的APS系统
热门文章
- 史上最全 Python Re 模块讲解(二)
- 算法与数据结构(python):快速排序
- 虚拟电路网络与数据报网络
- VTK:选择像素用法实战
- VTK:相交线用法实战
- OpenCASCADE:Foundation Classes之插件管理
- boost::stl_interfaces模块实现重复字符迭代器的测试程序
- boost::signals2模块实现为类定义后构造函数的示例
- boost::process::windows相关的测试程序
- boost::mpl模块实现same_as相关的测试程序