自定义flink es source
1、 需求
增量导入elasticsearch的数据到kafka。
2、 解决方式
1) 自定义一个flume的essource
2)使用spark 的 es rdd
3) 自定义flink的es source
3、解决问题
1) 思路:es中的数据有一个sendTime。也就是发送到es的时间。我们就根据这个时间来增量采集数据。使用es的
transport api。并且使用scorll api来分页。所以我们使用自定义es source 。首先我们是要继承SourceFunction这个类。在run方法中实现查找逻辑。
2)注意点
假如我们的程序挂掉了怎么办。怎么知道我们采集到了哪个时间段呢?~~
这个问题我是这样想的的 首先我是5分钟采集一次。然后记录好每五分钟采集的的条数,es的index,采集的时间段。采集成功了就写入到mysql表中做记录。失败也会写入记录失败。然后如果是因为异常采集失败了。那么就重新采集。采集三次还失败程序就直接退出。然后检查原因再重新启动程序。重新启动先去mysql读取上一次采集的位置。然后从下一次记录开始采集。
2)代码:es -source 是scala代码
package com.rongan.sourceimport java.util.Dateimport com.rongan.commos.{DateUtils, EsUtils, PropertiesUtil}
import com.rongan.constants.Constants
import com.rongan.dao.EsExportRecordDAO
import com.rongan.model.EsExportRecord
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.elasticsearch.search.SearchHitimport scala.util.control.Breaks.{break, breakable}/*** 自定义es的数据源** @param clusterName :集群名称* @param esNode :集群节点* @param esPort :es通信端口* @param index :索引名字* @param type1 :tpye*/
class EsSource(val clusterName: String, val esNode: String, val esPort: Int, val index: String, val type1: String, var fromDate: String) extends SourceFunction[String] {//判断是否取消运行var isRunning = true//es的客户端EsUtils.getClient(clusterName, esNode, esPort)val properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH)override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {//定义一个标志位,标志这是第一次采集var flag = true;//创建客户端EsUtils.getClient(clusterName, esNode, esPort)var toDate = fromDatevar fromDate1 = fromDatevar errorCount = 0;//开始采集数据while (true && isRunning) {//判断是否是第一次采集。创建lastUpdateTime的采集时间if (flag) {fromDate1 = toDate;flag = false}else fromDate1 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(fromDate1)))toDate = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(fromDate1))))try {var startTime = DateUtils.targetFormat(new Date())println("start collection data index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= "+ toDate + " currentTime" + startTime)val count: Int = collect(sourceContext, fromDate1, toDate)var endTime = DateUtils.targetFormat(new Date())EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, toDate, count, startTime, endTime, 1, index))errorCount = 0println("end of data collection index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= "+ toDate + " currentTime " + endTime + " count data = " + count)Thread.sleep(properties.getProperty(Constants.ES_COLLECT_INTERVAL).toLong)} catch {case e: Exception => {e.printStackTrace()errorCount += 1println("采集数据出错 index = " + index + " send_time (开始)= " + fromDate1 + " send_time (结束) ")EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, "00000000", 0, "00000000", "00000000", 0, index))fromDate1 = DateUtils.targetFormat(DateUtils.subtraction5Minute(DateUtils.strToDate(fromDate1)))//如果采集三次失败那么就停止程序if (errorCount >= 3) {cancel()}}}}}//采集数据def collect(sourceContext: SourceFunction.SourceContext[String], fromDate: String, toDate: String) = {var count = 0;val tuple: (Array[SearchHit], String) = EsUtils.searchByScrollRangeQuery(index, type1, "send_time.keyword", fromDate, toDate)count = tuple._1.lengthfor (hit <- tuple._1) {sourceContext.collect(hit.getSourceAsString)}var scrollID = tuple._2// println(new Date().toString + " count= " + count)breakable {while (isRunning) {val result: (Array[SearchHit], String) = EsUtils.searchByScrollId(scrollID)if (result._1.length == 0) {break;}for (hit <- result._1) {sourceContext.collect(hit.getSourceAsString)}count += result._1.lengthscrollID = result._2}}EsUtils.clearScroll(scrollID)count}override def cancel(): Unit = {isRunning = false}}//kafkatopic :roi-center.incident.detail.topicobject EsCollect {}
4.整个项目代码请留言~。暂时就是实现这么多。如有更好的想法可以讨论讨论~
esutil代码:
package rongan.utilimport org.elasticsearch.action.search.{ClearScrollResponse, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.sort.SortOrder
import rongan.business.tornado.RsdTornadoIpcDeviceEsToHbase.properties
import rongan.config.Constansimport scala.util.control.Breaks.{break, breakable}object EsUtils {import java.net.InetAddressimport org.elasticsearch.common.settings.Settingsimport org.elasticsearch.transport.client.PreBuiltTransportClient//创建clientvar client: TransportClient = _def getClient(clusterName: String, host: String, port: Int) = {val settings: Settings = Settings.builder().put("cluster.name", clusterName).buildclient = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName(host), port))}/*** 该方法用于做范围查询** @param index :索引名* @param `type` :type 的名字* @param field : 要根据哪个字段的范围来查询* @param fromData :开头的数据* @param toData :结束的数据* @return scroollId*/def searchByScrollRangeQuery(index: String, `type`: String, field: String, fromData: Any, toData: Any) = {//1.创建搜索条件val searchRequestBuilder: SearchRequestBuilder = client.prepareSearch()searchRequestBuilder.setIndices(index)searchRequestBuilder.setTypes(`type`)searchRequestBuilder.setScroll(new TimeValue(30000))//2.设置根据范围查询searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(field).from(fromData).to(toData)).setSize(10000)searchRequestBuilder.addSort("send_time.keyword", SortOrder.ASC)//3.执行查询val searchResponse: SearchResponse = searchRequestBuilder.get//4获取scrollIdval scrollId: String = searchResponse.getScrollId//println("scrollID = " + scrollId)//將这一页的数据和scrollId返回val searchHits: Array[SearchHit] = searchResponse.getHits.getHits(searchHits, scrollId)}/*** 根據scrollId查询数据,只查询一页的数据** @param scrollId1* @return*/def searchByScrollId(scrollId1: String): (Array[SearchHit], String) = {if (scrollId1 == null) {return (Array[SearchHit](), null);}// println(scrollId1)// 结果val searchScrollRequestBuilder = client.prepareSearchScroll(scrollId1)// 重新设定滚动时间searchScrollRequestBuilder.setScroll(new TimeValue(30000))// 请求val response = searchScrollRequestBuilder.get// 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时//if (response.getHits.getHits.length == 0) break(response.getHits.getHits, response.getScrollId)}/*** 清除scrollID** @param scrollId*/def clearScroll(scrollId: String) {if (scrollId == null) returnvar clearScrollRequestBuilder = client.prepareClearScrollclearScrollRequestBuilder.addScrollId(scrollId)val response: ClearScrollResponse = clearScrollRequestBuilder.getresponse.isSucceeded}def main(args: Array[String]): Unit = {// searchByScrollPrefixQuery("a", "b", "c", "d")// 左闭合 右闭合。如果是下一个五分钟。最终的秒数要往后面退一位EsUtils.getClient(properties.getProperty(Constans.ES_CLUSTER_NAME), properties.getProperty(Constans.ES_NODE),properties.getProperty(Constans.ES_PORT).toInt)var count = 0;val tuple: (Array[SearchHit], String) = searchByScrollRangeQuery("firewall.ipc.info*","alert", "send_time.keyword", "2019-01-28 19:15:20", "2019-09-28 19:15:2")count = tuple._1.lengthvar scrollID = tuple._2println(count)for (hit <- tuple._1) {println(hit.getSourceAsString)}// EsUtils.getClient("")breakable {while (true) {val result: (Array[SearchHit], String) = searchByScrollId(scrollID)count += result._1.lengthfor (hit <- result._1) {println(hit.getSourceAsString)}if (result._1.length == 0) {break;}scrollID = result._2}println(count)}clearScroll(scrollID)}}
自定义flink es source相关推荐
- Flume+kafka+flink+es 构建大数据实时处理
大数据目前的处理方法有两种:一种是离线处理,一种是实时处理.如何构建我们自己的实时数据处理系统我们选用flume+kafka+flink+es来作为我们实时数据处理工具.因此我们的架构是: flume ...
- Flink的Source端和Sink端大全
Flink和各种组件 enviroment Source flink + kafka (flink 消费 kafka 中的数据) Transform Transformation 的介绍 复杂的方法 ...
- Flink专题-Source
Flink Source 进入flink的数据源大致分为以下几类: 集合 Collection 文件 File Kafka UDF 一般都是使用前三个source源即可,如果想要使用其他数据源就可以自 ...
- Flink之Source
Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理.一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source).所以,Source 就是我们整个处理程序的 ...
- 【elasticsearch】 flink es sink Connection refused
1.概述 java.net.ConnectException: Connection refusedat sun.nio.ch.SocketChannelImpl.checkConnect(Nativ ...
- 《从0到1学习Flink》—— 如何自定义 Data Source ?
前言 在 <从0到1学习Flink>-- Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇 ...
- 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink
1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...
- Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connecto ...
- Flink编程三大组件(一)——Source
Data Source 就是数据来源. Flink 作为一款流式计算框架,它可用来做批处理,即处理静态的数据集.历史的数据集: 也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要 ...
最新文章
- python动态心形代码-Python数学方程式画心型图案源码示例
- github创建项目,并提交本地文件
- OpenCV之imgproc 模块. 图像处理(1)图像平滑处理 腐蚀与膨胀(Eroding and Dilating) 更多形态学变换 图像金字塔 基本的阈值操作
- [Robot Framework] SikuliLibrary的关键字执行依赖java进程,但是上次的java进程如果没有杀掉,robot framework控制台的日志出不来,怎么办?...
- 怎么用python分析数据_如何用python进行数据分析?
- Python之路--Django--auth认证系统
- asp导出excel文件格式
- 如何玩转互联网金融大数据——征信
- 网站让浏览器崩溃的原因有哪些
- 【FFMPEG】解决截取MP4视频的中间段时,截取完成后前几帧视频卡住,但是有声音的情况
- 数组反转,Java实现
- 安徽大学在校生如何校外访问图书馆资源
- 40岁应该学会的是面对和取舍
- Cell Metabolism:碳水化合物限制饮食对人类肝脂肪变性的快速代谢益处的综合理解
- VBOX虚拟硬件修改
- arcgis终结时间表
- 永磁同步电机基本控制方法
- htk 语音识别 linux,【语音识别】HTK安装及学习
- java版原生app,追剧达人app
- 中兴F460 EPON v3.0 光猫获取超级密码、开启路由功能
热门文章
- python 转义字符 html 爬虫
- 贵金属跌跌不休竞相比惨黄金跌去二位数钯金跌去三位数
- 新基建语境下,网络安全也要换新打法
- 什么是闭包? 闭包有哪些优缺点?
- 天津科技大学Oracle试题,oracle期末考试题及答案
- Java中文处理学习笔记--Hello Unicode
- zui佳情侣身高差问题,专家通过多组情侣身高数据研究发现,zui佳的情侣身高差遵循着一个公式:(女方的身高)×1.09 =(男方的身高)。
- 【掌控板】3、向txt文件写入字符串、声音数据获取
- 网络端口采用了1000M速率时候出现网络通信丢包+IDC机房托管服务器之间通信不畅...
- SpringBoot内置Tomcat启动不了的原因