1、 需求

增量导入elasticsearch的数据到kafka。

2、 解决方式

1) 自定义一个flume的essource

2)使用spark 的 es rdd

3) 自定义flink的es source

3、解决问题

1) 思路:es中的数据有一个sendTime。也就是发送到es的时间。我们就根据这个时间来增量采集数据。使用es的

transport api。并且使用scorll api来分页。所以我们使用自定义es source 。首先我们是要继承SourceFunction这个类。在run方法中实现查找逻辑。

2)注意点

假如我们的程序挂掉了怎么办。怎么知道我们采集到了哪个时间段呢?~~

这个问题我是这样想的的 首先我是5分钟采集一次。然后记录好每五分钟采集的的条数,es的index,采集的时间段。采集成功了就写入到mysql表中做记录。失败也会写入记录失败。然后如果是因为异常采集失败了。那么就重新采集。采集三次还失败程序就直接退出。然后检查原因再重新启动程序。重新启动先去mysql读取上一次采集的位置。然后从下一次记录开始采集。

2)代码:es -source 是scala代码

package com.rongan.sourceimport java.util.Dateimport com.rongan.commos.{DateUtils, EsUtils, PropertiesUtil}
import com.rongan.constants.Constants
import com.rongan.dao.EsExportRecordDAO
import com.rongan.model.EsExportRecord
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.elasticsearch.search.SearchHitimport scala.util.control.Breaks.{break, breakable}/*** 自定义es的数据源** @param clusterName :集群名称* @param esNode      :集群节点* @param esPort      :es通信端口* @param index       :索引名字* @param type1       :tpye*/
class EsSource(val clusterName: String, val esNode: String, val esPort: Int, val index: String, val type1: String, var fromDate: String) extends SourceFunction[String] {//判断是否取消运行var isRunning = true//es的客户端EsUtils.getClient(clusterName, esNode, esPort)val properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH)override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {//定义一个标志位,标志这是第一次采集var flag = true;//创建客户端EsUtils.getClient(clusterName, esNode, esPort)var toDate = fromDatevar fromDate1 = fromDatevar errorCount = 0;//开始采集数据while (true && isRunning) {//判断是否是第一次采集。创建lastUpdateTime的采集时间if (flag) {fromDate1 = toDate;flag = false}else fromDate1 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(fromDate1)))toDate = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(fromDate1))))try {var startTime = DateUtils.targetFormat(new Date())println("start collection data  index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= "+ toDate + " currentTime" + startTime)val count: Int = collect(sourceContext, fromDate1, toDate)var endTime = DateUtils.targetFormat(new Date())EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, toDate, count, startTime, endTime, 1, index))errorCount = 0println("end of data collection index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= "+ toDate + " currentTime " + endTime + " count data =  " + count)Thread.sleep(properties.getProperty(Constants.ES_COLLECT_INTERVAL).toLong)} catch {case e: Exception => {e.printStackTrace()errorCount += 1println("采集数据出错 index = " + index + " send_time (开始)= " + fromDate1 + " send_time (结束) ")EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, "00000000", 0, "00000000", "00000000", 0, index))fromDate1 = DateUtils.targetFormat(DateUtils.subtraction5Minute(DateUtils.strToDate(fromDate1)))//如果采集三次失败那么就停止程序if (errorCount >= 3) {cancel()}}}}}//采集数据def collect(sourceContext: SourceFunction.SourceContext[String], fromDate: String, toDate: String) = {var count = 0;val tuple: (Array[SearchHit], String) = EsUtils.searchByScrollRangeQuery(index, type1, "send_time.keyword", fromDate, toDate)count = tuple._1.lengthfor (hit <- tuple._1) {sourceContext.collect(hit.getSourceAsString)}var scrollID = tuple._2// println(new Date().toString + " count= " + count)breakable {while (isRunning) {val result: (Array[SearchHit], String) = EsUtils.searchByScrollId(scrollID)if (result._1.length == 0) {break;}for (hit <- result._1) {sourceContext.collect(hit.getSourceAsString)}count += result._1.lengthscrollID = result._2}}EsUtils.clearScroll(scrollID)count}override def cancel(): Unit = {isRunning = false}}//kafkatopic :roi-center.incident.detail.topicobject EsCollect {}

4.整个项目代码请留言~。暂时就是实现这么多。如有更好的想法可以讨论讨论~

esutil代码:

package rongan.utilimport org.elasticsearch.action.search.{ClearScrollResponse, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.sort.SortOrder
import rongan.business.tornado.RsdTornadoIpcDeviceEsToHbase.properties
import rongan.config.Constansimport scala.util.control.Breaks.{break, breakable}object EsUtils {import java.net.InetAddressimport org.elasticsearch.common.settings.Settingsimport org.elasticsearch.transport.client.PreBuiltTransportClient//创建clientvar client: TransportClient = _def getClient(clusterName: String, host: String, port: Int) = {val settings: Settings = Settings.builder().put("cluster.name", clusterName).buildclient = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName(host), port))}/*** 该方法用于做范围查询** @param index    :索引名* @param `type`   :type 的名字* @param field    : 要根据哪个字段的范围来查询* @param fromData :开头的数据* @param toData   :结束的数据* @return scroollId*/def searchByScrollRangeQuery(index: String, `type`: String, field: String, fromData: Any, toData: Any) = {//1.创建搜索条件val searchRequestBuilder: SearchRequestBuilder = client.prepareSearch()searchRequestBuilder.setIndices(index)searchRequestBuilder.setTypes(`type`)searchRequestBuilder.setScroll(new TimeValue(30000))//2.设置根据范围查询searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(field).from(fromData).to(toData)).setSize(10000)searchRequestBuilder.addSort("send_time.keyword", SortOrder.ASC)//3.执行查询val searchResponse: SearchResponse = searchRequestBuilder.get//4获取scrollIdval scrollId: String = searchResponse.getScrollId//println("scrollID = " + scrollId)//將这一页的数据和scrollId返回val searchHits: Array[SearchHit] = searchResponse.getHits.getHits(searchHits, scrollId)}/*** 根據scrollId查询数据,只查询一页的数据** @param scrollId1* @return*/def searchByScrollId(scrollId1: String): (Array[SearchHit], String) = {if (scrollId1 == null) {return (Array[SearchHit](), null);}//   println(scrollId1)// 结果val searchScrollRequestBuilder = client.prepareSearchScroll(scrollId1)// 重新设定滚动时间searchScrollRequestBuilder.setScroll(new TimeValue(30000))// 请求val response = searchScrollRequestBuilder.get// 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时//if (response.getHits.getHits.length == 0) break(response.getHits.getHits, response.getScrollId)}/*** 清除scrollID** @param scrollId*/def clearScroll(scrollId: String) {if (scrollId == null) returnvar clearScrollRequestBuilder = client.prepareClearScrollclearScrollRequestBuilder.addScrollId(scrollId)val response: ClearScrollResponse = clearScrollRequestBuilder.getresponse.isSucceeded}def main(args: Array[String]): Unit = {//       searchByScrollPrefixQuery("a", "b", "c", "d")//    左闭合 右闭合。如果是下一个五分钟。最终的秒数要往后面退一位EsUtils.getClient(properties.getProperty(Constans.ES_CLUSTER_NAME), properties.getProperty(Constans.ES_NODE),properties.getProperty(Constans.ES_PORT).toInt)var count = 0;val tuple: (Array[SearchHit], String) = searchByScrollRangeQuery("firewall.ipc.info*","alert", "send_time.keyword", "2019-01-28 19:15:20", "2019-09-28 19:15:2")count = tuple._1.lengthvar scrollID = tuple._2println(count)for (hit <- tuple._1) {println(hit.getSourceAsString)}//        EsUtils.getClient("")breakable {while (true) {val result: (Array[SearchHit], String) = searchByScrollId(scrollID)count += result._1.lengthfor (hit <- result._1) {println(hit.getSourceAsString)}if (result._1.length == 0) {break;}scrollID = result._2}println(count)}clearScroll(scrollID)}}

自定义flink es source相关推荐

  1. Flume+kafka+flink+es 构建大数据实时处理

    大数据目前的处理方法有两种:一种是离线处理,一种是实时处理.如何构建我们自己的实时数据处理系统我们选用flume+kafka+flink+es来作为我们实时数据处理工具.因此我们的架构是: flume ...

  2. Flink的Source端和Sink端大全

    Flink和各种组件 enviroment Source flink + kafka (flink 消费 kafka 中的数据) Transform Transformation 的介绍 复杂的方法 ...

  3. Flink专题-Source

    Flink Source 进入flink的数据源大致分为以下几类: 集合 Collection 文件 File Kafka UDF 一般都是使用前三个source源即可,如果想要使用其他数据源就可以自 ...

  4. Flink之Source

    Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理.一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source).所以,Source 就是我们整个处理程序的 ...

  5. 【elasticsearch】 flink es sink Connection refused

    1.概述 java.net.ConnectException: Connection refusedat sun.nio.ch.SocketChannelImpl.checkConnect(Nativ ...

  6. 《从0到1学习Flink》—— 如何自定义 Data Source ?

    前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...

  7. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

  8. Flink SQL 自定义 redis connector

    一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connecto ...

  9. Flink编程三大组件(一)——Source

    Data Source 就是数据来源. Flink 作为一款流式计算框架,它可用来做批处理,即处理静态的数据集.历史的数据集: 也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要 ...

最新文章

  1. python动态心形代码-Python数学方程式画心型图案源码示例
  2. github创建项目,并提交本地文件
  3. OpenCV之imgproc 模块. 图像处理(1)图像平滑处理 腐蚀与膨胀(Eroding and Dilating) 更多形态学变换 图像金字塔 基本的阈值操作
  4. [Robot Framework] SikuliLibrary的关键字执行依赖java进程,但是上次的java进程如果没有杀掉,robot framework控制台的日志出不来,怎么办?...
  5. 怎么用python分析数据_如何用python进行数据分析?
  6. Python之路--Django--auth认证系统
  7. asp导出excel文件格式
  8. 如何玩转互联网金融大数据——征信
  9. 网站让浏览器崩溃的原因有哪些
  10. 【FFMPEG】解决截取MP4视频的中间段时,截取完成后前几帧视频卡住,但是有声音的情况
  11. 数组反转,Java实现
  12. 安徽大学在校生如何校外访问图书馆资源
  13. 40岁应该学会的是面对和取舍
  14. Cell Metabolism:碳水化合物限制饮食对人类肝脂肪变性的快速代谢益处的综合理解
  15. VBOX虚拟硬件修改
  16. arcgis终结时间表
  17. 永磁同步电机基本控制方法
  18. htk 语音识别 linux,【语音识别】HTK安装及学习
  19. java版原生app,追剧达人app
  20. 中兴F460 EPON v3.0 光猫获取超级密码、开启路由功能

热门文章

  1. python 转义字符 html 爬虫
  2. 贵金属跌跌不休竞相比惨黄金跌去二位数钯金跌去三位数
  3. 新基建语境下,网络安全也要换新打法
  4. 什么是闭包? 闭包有哪些优缺点?
  5. 天津科技大学Oracle试题,oracle期末考试题及答案
  6. Java中文处理学习笔记--Hello Unicode
  7. zui佳情侣身高差问题,专家通过多组情侣身高数据研究发现,zui佳的情侣身高差遵循着一个公式:(女方的身高)×1.09 =(男方的身高)。
  8. 【掌控板】3、向txt文件写入字符串、声音数据获取
  9. 网络端口采用了1000M速率时候出现网络通信丢包+IDC机房托管服务器之间通信不畅...
  10. SpringBoot内置Tomcat启动不了的原因