基于Java、Kafka、ElasticSearch的搜索框架的设计与实现
Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用rest api用于文档搜索。
项目主页:https://github.com/chaokunyang/jkes
安装
可以参考jkes-integration-test
项目快速掌握jkes框架的使用方法。jkes-integration-test
是我们用来测试功能完整性的一个Spring Boot Application。
安装
jkes-index-connector
和jkes-delete-connector
到Kafka Connect类路径安装 Smart Chinese Analysis Plugin
sudo bin/elasticsearch-plugin install analysis-smartcn
配置
引入jkes-spring-data-jpa依赖
添加配置
@EnableAspectJAutoProxy@EnableJkes@Configurationpublic class JkesConfig {@Beanpublic PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) { return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);}
}
提供JkesProperties Bean
@Component@Configurationpublic class JkesConf extends DefaultJkesPropertiesImpl {@PostConstructpublic void setUp() {Config.setJkesProperties(this);} @Overridepublic String getKafkaBootstrapServers() { return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";} @Overridepublic String getKafkaConnectServers() { return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";} @Overridepublic String getEsBootstrapServers() { return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";} @Overridepublic String getDocumentBasePackage() { return "com.timeyang.jkes.integration_test.domain";} @Overridepublic String getClientId() { return "integration_test";}}
这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties
提供配置
增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在
Spring MVC
中,可以按照如下方式添加索引端点
@RestController@RequestMapping("/api/search")public class SearchEndpoint {private Indexer indexer; @Autowiredpublic SearchEndpoint(Indexer indexer) { this.indexer = indexer;} @RequestMapping(value = "/start_all", method = RequestMethod.POST) public void startAll() {indexer.startAll();} @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST) public void start(@PathVariable("entityClassName") String entityClassName) {indexer.start(entityClassName);} @RequestMapping(value = "/stop_all", method = RequestMethod.PUT) public Map<String, Boolean> stopAll() { return indexer.stopAll();} @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT) public Boolean stop(@PathVariable("entityClassName") String entityClassName) { return indexer.stop(entityClassName);} @RequestMapping(value = "/progress", method = RequestMethod.GET) public Map<String, IndexProgress> getProgress() { return indexer.getProgress();}}
快速开始
索引API
使用com.timeyang.jkes.core.annotation
包下相关注解标记实体
@lombok.Data@Entity@Documentpublic class Person extends AuditedEntity {// @Id will be identified automatically// @Field(type = FieldType.Long)@Id@GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @MultiFields(mainField = @Field(type = FieldType.Text),otherFields = { @InnerField(suffix = "raw", type = FieldType.Keyword), @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")}) private String name; @Field(type = FieldType.Keyword) private String gender; @Field(type = FieldType.Integer) private Integer age; // don't add @Field to test whether ignored// @Field(type = FieldType.Text)private String description; @Field(type = FieldType.Object) @ManyToOne(fetch = FetchType.EAGER) @JoinColumn(name = "group_id") private PersonGroup personGroup;}
@lombok.Data@Entity@Document(type = "person_group", alias = "person_group_alias")public class PersonGroup extends AuditedEntity {@Id@GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String name; private String interests; @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true) private List<Person> persons; private String description; @DocumentId@Field(type = FieldType.Long) public Long getId() { return id;} @MultiFields(mainField = @Field(type = FieldType.Text),otherFields = { @InnerField(suffix = "raw", type = FieldType.Keyword), @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")}) public String getName() { return name;} @Field(type = FieldType.Text) public String getInterests() { return interests;} @Field(type = FieldType.Nested) public List<Person> getPersons() { return persons;} /*** 不加Field注解,测试序列化时是否忽略*/public String getDescription() { return description;}
}
当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。
搜索API
启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。
URI query
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
Nested query
integration_test_person_group/person_group/_search?from=0&size=10{ "query": { "nested": { "path": "persons", "score_mode": "avg", "query": { "bool": { "must": [{ "range": { "persons.age": { "gt": 5}}}]}}}}
}
match query
integration_test_person_group/person_group/_search?from=0&size=10{ "query": { "match": { "interests": "Hadoop"}}
}
bool query
{"query": {"bool" : {"must" : {"match" : { "interests" : "Hadoop" } },"filter": {"term" : { "name.raw" : "name0" } },"should" : [{ "match" : { "interests" : "Flink" } },{"nested" : { "path" : "persons", "score_mode" : "avg", "query" : { "bool" : { "must" : [ { "match" : {"persons.name" : "name40"} }, { "match" : {"persons.interests" : "interests"} } ], "must_not" : { "range" : { "age" : { "gte" : 50, "lte" : 60 } } } } } } }],"minimum_should_match" : 1,"boost" : 1.0} }}
Source filtering
integration_test_person_group/person_group/_search
{ "_source": false, "query" : { "match" : { "name" : "name17" }}
}
integration_test_person_group/person_group/_search
{ "_source": { "includes": [ "name", "persons.*" ], "excludes": [ "date*", "version", "persons.age" ]}, "query" : { "match" : { "name" : "name17" }}
}
prefix
integration_test_person_group/person_group/_search
{ "query": { "prefix" : { "name" : "name" }}
}
wildcard
integration_test_person_group/person_group/_search
{ "query": { "wildcard" : { "name" : "name*" }}
}
regexp
integration_test_person_group/person_group/_search
{ "query": { "regexp":{ "name": "na.*17"}}
}
Jkes工作原理
索引工作原理:
应用启动时,Jkes扫描所有标注
@Document
注解的实体,为它们构建元数据。基于构建的元数据,创建
index
和mapping
Json格式的配置,然后通过ElasticSearch Java Rest Client
将创建/更新index
配置。为每个文档创建/更新
Kafka ElasticSearch Connector
,用于创建/更新文档为整个项目启动/更新
Jkes Deleter Connector
,用于删除文档拦截数据操作方法。将
* save(*)
方法返回的数据包装为SaveEvent
保存到EventContainer
;使用(* delete*(..)
方法的参数,生成一个DeleteEvent/DeleteAllEvent
保存到EventContainer
。拦截事务。在事务提交后使用
JkesKafkaProducer
发送SaveEvent
中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer
序列化指定的数据,然后发送到Kafka。与
SaveEvent
不同,DeleteEvent
会直接被序列化,然后发送到Kafka,而不是只发送一份数据与
SaveEvent
和DeleteEvent
不同,DeleteAllEvent
不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client
删除相应的index
,然后重建该索引,重启Kafka ElasticSearch Connector
查询工作原理:
查询服务通过rest api提供
我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度
查询服务是一个Spring Boot Application,使用docker打包为镜像
查询服务提供多版本API,用于API进化和兼容
查询服务解析
json
请求,进行一些预处理后,使用ElasticSearch Java Rest Client
转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。
流程图
模块介绍
jkes-core
jkes-core
是整个jkes
的核心部分。主要包括以下功能:
annotation
包提供了jkes的核心注解elasticsearch
包封装了elasticsearch
相关的操作,如为所有的文档创建/更新索引,更新mappingkafka
包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Clientmetadata
包提供了核心的注解元数据的构建与结构化模型event
包提供了事件模型与容器exception
包提供了常见的Jkes异常http
包基于Apache Http Client
封装了常见的http json请求support
包暴露了Jkes核心配置支持util
包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils
jkes-boot
jkes-boot
用于与一些第三方开源框架进行集成。
当前,我们通过jkes-spring-data-jpa
,提供了与spring data jpa
的集成。通过使用Spring的AOP机制,对Repository
方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent
保存到EventContainer
。通过使用我们提供的SearchPlatformTransactionManager
,对常用的事务管理器(如JpaTransactionManager
)进行包装,提供事务拦截功能。
在后续版本,我们会提供与更多框架的集成。
jkes-spring-data-jpa
说明:
ContextSupport
类用于从bean工厂获取Repository Bean
@EnableJkes
让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型EventSupport
处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer
,在事务提交和回滚时处理相应的事件SearchPlatformTransactionManager
包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook
audit
包提供了一个简单的AuditedEntity
父类,方便添加审计功能,版本信息可用于结合ElasticSearch
的版本机制保证不会索引过期文档数据exception
包封装了常见异常intercept
包提供了AOP切点和切面index
包提供了全量索引
功能。当前,我们提供了基于线程池
的索引机制和基于ForkJoin
的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列
的生产者-消费者
模式,提供并发性能
jkes-services
jkes-services
主要用来提供一些服务。 目前,jkes-services
提供了以下服务:
jkes-delete-connector
jkes-delete-connector
是一个Kafka Connector
,用于从kafka集群获取索引删除事件(DeleteEvent
),然后使用Jest Client
删除ElasticSearch中相应的文档。借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个
jkes-delete-connector
,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活
jkes-search-service
jkes-search-service
是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容jkes-search-service
目前支持URI风格的搜索和JSON请求体风格的搜索。我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度
查询服务是一个Spring Boot Application,使用docker打包为镜像
查询服务解析
json
请求,进行一些预处理后,使用ElasticSearch Java Rest Client
转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。
后续,我们将会基于zookeeper
构建索引集群,提供集群索引管理功能
jkes-integration-test
jkes-integration-test
是一个基于Spring Boot集成测试项目,用于进行功能测试
。同时测量一些常见操作的吞吐率
开发
To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.
Contribute
Source Code: https://github.com/chaokunyang/jkes
Issue Tracker: https://github.com/chaokunyang/jkes/issues
LICENSE
This project is licensed under Apache License 2.0.
基于Java、Kafka、ElasticSearch的搜索框架的设计与实现相关推荐
- 基于Java的开源 Carrot2 搜索结果聚合聚类引擎 2.0发布了
基于Java的开源 Carrot2 搜索结果聚合聚类引擎 2.0发布了 专注是不断前进的指南针 --- 题记 基于Java的开源 Carrot2 搜索结果聚合聚类引擎 2.0发布了. Carrot ...
- 基于Java的开源 Carrot2 搜索结果聚合聚类引擎
来自这里:http://blog.csdn.net/accesine960/archive/2006/09/25/1282935.aspx 基于Java的开源 Carrot2 搜索结果聚合聚类引擎 2 ...
- 毕业设计分享----基于Java的个性化博客空间的设计与实现
一.大宇想说的话 大宇大学毕业已经四个月了,这四个月有懒惰的时候,有努力的时候.回首自己走过的路,只有毕业设计最能浓缩我的所学.我早就有这个打算:把自己的毕业设计与大家分享,但一直没有用行动落实.这个 ...
- 基于java的千千影评网站的设计与实现(论文+程序设计源码+数据库文件)
摘要:信息技术高度发达的今天,新闻业已经在互联网行业中占越发主导地位.而我们的生活也跟新闻息息相关,尤其是在高度发达的精神文化社会,人们对于电影的喜爱也越来越热衷,但想挑到自己喜爱的片子,就需要影评网 ...
- 基于java的网络在线考试管理系统的设计与实现--毕业开题报告
基于java的网络在线考试管理系 统的设计与实现开题报告–毕业设计 最近grace刚完成毕业设计 通过了赶紧来给宝贝们分享我的成果哈哈 设计题目:基于java的网络在线考试管理系统的设计与实现 一.选 ...
- java毕业设计——基于Java+Bootstrap+Mysql的电影评论网站设计与实现(毕业论文+程序源码)——电影评论网站
基于Java+Bootstrap+Mysql的电影评论网站设计与实现(毕业论文+程序源码) 大家好,今天给大家介绍基于Java+Bootstrap+Mysql的电影评论网站设计与实现,文章末尾附有本毕 ...
- 基于Java的校园“研帮”系统的设计与实现 毕业设计-附源码201433
基于Java的校园"研帮"系统的设计与实现 摘 要 由于数据库和数据仓库技术的快速发展,学校共享信息系统建设越来越向模块化.智能化.自我服务和管理科学化的方向发展.学校共享信息系统 ...
- 【java毕业设计】基于java+swing+Eclipse的推箱子游戏设计与实现(毕业论文+程序源码)——推箱子游戏
基于java+swing+Eclipse的推箱子游戏设计与实现(毕业论文+程序源码) 大家好,今天给大家介绍基于java+swing+Eclipse的推箱子游戏设计与实现,文章末尾附有本毕业设计的论文 ...
- 【java毕业设计】基于java+swing+Eclipse的俄罗斯方块游戏GUI设计与实现(毕业论文+程序源码)——俄罗斯方块游戏
基于java+swing+Eclipse的俄罗斯方块游戏GUI设计与实现(毕业论文+程序源码) 大家好,今天给大家介绍基于java+swing+Eclipse的俄罗斯方块游戏GUI设计与实现,文章末尾 ...
最新文章
- 查看回调几个选项含义_C 盘总是莫名变大?更改这个文件位置至少腾出几个 G !...
- 近世代数--整环的商域--整环D扩充为域Q
- VTK:Math之PerpendicularVector
- 前后端分离Java后端跨越问题解决
- 用对 gitignore
- 河北软件职业技术学院计算机专业分数线,河北软件职业技术学院录取分数线2021是多少分(附历年录取分数线)...
- JavaScript自适应图片大小的弹出窗口
- python中int什么意思_python3中int(整型)的使用教程
- 20145129 《Java程序设计》第3周学习总结
- C#基础5:字符串操作
- scrapy框架Selector提取数据
- I/O模型+Nginx基本配置
- python实现微信接口——itchat模块
- 计算机专业实训图片,实训一图片的简单处理_计算机软件及应用_IT计算机_专业资料...
- 自建比赛服务器,王者荣耀自建比赛创建功能是什么 王者荣耀如何创建自建比赛...
- 全面提升转化率和客单价的方法和技巧
- time.h时间函数
- 【JAVA案例】判断电话号码运营商
- LCS(longest common sequence)算法的实现(十分详细)
- Google翻译API的使用
热门文章
- 软件测试工资高还是运维高,IT行业的6大热门岗位,薪酬都有多高?
- 计算机软件大专证,在哪报名大专证怎么报考
- 安徽工业大学计算机考研历年分数线,安徽工业大学历年考研分数线汇总[2012-2021]...
- 无符号有符号乘法_刘帅嵌入式系统-乘法指令
- 刚毕业的他仅用1年就拿下了年薪30W的阿里数据分析岗
- 做数学题比统一世界更爽,你会怎么做呢?
- Transform机制(1)
- java set循环取值_java循环遍历类属性 get 和set值方法
- java lambda max_在Java中使用Lambda表达式查找Max
- 如何打开java_怎样运行java