Flink Stream Processing API: Sink
Sink
Unlike Spark, Flink has no foreach-style method that lets the user perform arbitrary per-record actions directly; all output to external systems must go through a Sink. The final output step of a job looks like this:
myDstream.addSink(new MySink(xxxx))
Flink ships with sinks for a number of common frameworks; for anything else, the user implements a custom sink.
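For systems without a bundled connector, a custom sink typically extends RichSinkFunction so it can manage a connection in its lifecycle methods. A minimal sketch (the class and its constructor parameter are illustrative, not from any library):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Skeleton of a custom sink: open/close manage the external connection,
// invoke is called once per incoming record
class MySink(connStr: String) extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // set up the client/connection using connStr
  }
  override def invoke(value: String): Unit = {
    // write one record to the external system
  }
  override def close(): Unit = {
    // release the connection
  }
}

Section 4 below is a full JDBC example of this pattern.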
To guarantee end-to-end exactly-once record delivery (on top of Flink's exactly-once state semantics), the data sink has to take part in the checkpointing mechanism. The delivery guarantees of the sinks bundled with Flink (assuming exactly-once state updates) are summarized below:
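In brief, per the connector documentation for this release line (worth re-checking against the docs for the exact version in use):

- HDFS BucketingSink: exactly-once
- Kafka producer: at-least-once (exactly-once with transactional producers on Kafka 0.11+)
- Elasticsearch: at-least-once
- Cassandra: at-least-once (exactly-once for idempotent updates)
- Redis (Bahir): at-least-once
- File, socket, and standard-output sinks: at-least-once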
1. Kafka
- pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!--<artifactId>flink-connector-kafka_2.12</artifactId>-->
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
- Add the following methods to FlinkKafkaUtil
package com.lxk.util

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}

object FlinkKafkaUtil {
  val prop = new Properties()
  val brokerList = "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092"
  prop.setProperty("bootstrap.servers", brokerList)
  prop.setProperty("zookeeper.connect", "192.168.18.103:2181,192.168.18.104:2181,192.168.18.105:2181")
  prop.setProperty("group.id", "gmall")

  // Consume from Kafka.
  // Flink's Kafka consumer is called FlinkKafkaConsumer08
  // (or 09 for Kafka 0.9.0.x versions, etc.,
  // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
  def getConsumer(topic: String): FlinkKafkaConsumer010[String] = {
    val myKafkaConsumer: FlinkKafkaConsumer010[String] =
      new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }

  def getProducer(topic: String): FlinkKafkaProducer010[String] = {
    new FlinkKafkaProducer010[String](brokerList, topic, new SimpleStringSchema())
  }
}
- Add the sink in the main method
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.FlinkKafkaUtil
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.scala._

object StartupApp {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node03", 6123, "F:\\10_code\\scala\\flink2020\\target\\flink-streaming-scala_2.11-1.0-SNAPSHOT.jar")

    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
    //val userLogStream: DataStream[UserLog] = dstream.map { userLog => JSON.parseObject(userLog, classOf[UserLog]) }

    // Write the raw stream back out to another Kafka topic
    val myKafkaProducer: FlinkKafkaProducer010[String] = FlinkKafkaUtil.getProducer("sink_kafka")
    dstream.addSink(myKafkaProducer)
    //dstream.print()

    environment.execute()
  }
}
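For the 0.10 producer, at-least-once delivery additionally requires checkpointing plus flushing pending records on each checkpoint. A sketch of those settings (setLogFailuresOnly/setFlushOnCheckpoint come from FlinkKafkaProducerBase; the interval is illustrative, and in recent releases these may already be the defaults):

// Sketch: at-least-once delivery for FlinkKafkaProducer010
environment.enableCheckpointing(5000)  // checkpoint every 5 s (illustrative)
val producer = FlinkKafkaUtil.getProducer("sink_kafka")
producer.setLogFailuresOnly(false)     // fail the job on write errors instead of only logging them
producer.setFlushOnCheckpoint(true)    // block checkpoints until in-flight records are acknowledged
dstream.addSink(producer)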
- Output: (screenshot omitted)
2. Redis
- pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
- Add the following methods to FlinkRedisUtil
package com.lxk.util

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object FlinkRedisUtil {
  val conf = new FlinkJedisPoolConfig.Builder()
    .setHost("192.168.18.151")
    .setPort(6379)
    .setPassword("redis_zy_passpwd")
    .build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](conf, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String, String)] {
    // Write each (channel, count) pair as a field of the Redis hash "channel_count"
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
      //new RedisCommandDescription(RedisCommand.SET)
    }

    override def getKeyFromData(t: (String, String)): String = t._1

    override def getValueFromData(t: (String, String)): String = t._2
  }
}
- Add the sink in the main method
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.{FlinkKafkaUtil, FlinkRedisUtil}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink

object StartupApp04 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    // Compute the running count per channel
    val userLogDstream: DataStream[UserLog] = dstream.map { JSON.parseObject(_, classOf[UserLog]) }
    val keyedStream: KeyedStream[(String, Int), Tuple] =
      userLogDstream.map(userLog => (userLog.channel, 1)).keyBy(0)
    //reduce //sum
    val sumStream: DataStream[(String, Int)] = keyedStream.reduce { (ch1, ch2) => (ch1._1, ch1._2 + ch2._2) }

    val redisSink: RedisSink[(String, String)] = FlinkRedisUtil.getRedisSink()
    sumStream.map(chCount => (chCount._1, chCount._2.toString)).addSink(redisSink)

    dstream.print()
    environment.execute()
  }
}
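Position-based keyBy(0) types the key as the generic Tuple; a key selector keeps it as String, and sum can replace the hand-written reduce. The same aggregation as a sketch:

// Equivalent aggregation with a key selector and sum
val sumStream: DataStream[(String, Int)] = userLogDstream
  .map(userLog => (userLog.channel, 1))
  .keyBy(_._1)  // KeyedStream[(String, Int), String]
  .sum(1)       // same result as the reduce above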
- Output: (screenshot omitted)
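To spot-check what the HSET mapper wrote, the hash can be read back with the Jedis client that flink-connector-redis pulls in transitively (a sketch; host and password as configured above, and the printed map is only an illustration):

import redis.clients.jedis.Jedis

// Read back the channel_count hash written by the sink
val jedis = new Jedis("192.168.18.151", 6379)
jedis.auth("redis_zy_passpwd")
println(jedis.hgetAll("channel_count")) // e.g. {appstore=42, huawei=17}
jedis.close()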
3. Elasticsearch
- pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
- Add the following methods to FlinkEsUtil
package com.lxk.util

import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object FlinkEsUtil {
  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("node03", 9200, "http"))
  httpHosts.add(new HttpHost("node04", 9200, "http"))
  httpHosts.add(new HttpHost("node05", 9200, "http"))

  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("Trying to save: " + element)
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest()
          .index(indexName)
          .`type`("userlog")
          .source(jsonObj)
        indexer.add(indexRequest)
        println("Saved 1 record")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
    // Maximum number of buffered actions before a bulk flush
    sinkBuilder.setBulkFlushMaxActions(10)
    sinkBuilder.build()
  }
}
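Besides the action-count threshold, the builder exposes time- and size-based flush triggers and a failure handler. A sketch (values are illustrative; names are from the elasticsearch6 connector of this release line):

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler

// Additional flush/failure knobs on ElasticsearchSink.Builder
sinkBuilder.setBulkFlushInterval(5000)  // also flush every 5 s, even with fewer than 10 actions
sinkBuilder.setBulkFlushMaxSizeMb(5)    // ...or once 5 MB of requests are buffered
// Re-enqueue requests that ES rejected because its bulk queue was full:
sinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler)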
- Call it in the main method
package com.lxk.service

import com.lxk.util.{FlinkEsUtil, FlinkKafkaUtil}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object StartupApp05 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    // Send the detail records to ES
    val esSink: ElasticsearchSink[String] = FlinkEsUtil.getElasticSearchSink("gmall_startup")
    dstream.addSink(esSink)
    //dstream.print()

    environment.execute()
  }
}
- Output (screenshots omitted): the source data, the main method's console output, and the documents persisted in ES
4. JDBC custom sink: extending RichSinkFunction
- pom.xml
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
- Add MyJdbcSink
package com.lxk.util

import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MyJdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://node04:3306/gmall2020?useSSL=false"
  val username = "root"
  val password = "123"
  val maxActive = "20"
  var connection: Connection = null

  // Called once when the sink is initialized: build the Druid pool and get a connection
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName", driver)
    properties.put("url", url)
    properties.put("username", username)
    properties.put("password", password)
    properties.put("maxActive", maxActive)
    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // Called once per record
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    println(values.mkString(","))
    for (i <- 0 until values.length) {
      ps.setObject(i + 1, values(i))
    }
    ps.executeUpdate()
    ps.close() // release the statement; the pooled connection stays open
  }

  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
  }
}
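Because invoke runs the statement once per record and a restart can replay records, this sink is at-least-once. If the table has a primary key, an idempotent statement makes replays harmless; for example (MySQL syntax, assuming a suitable primary key on userlog):

// REPLACE INTO overwrites the row with the same primary key instead of inserting a duplicate
val jdbcSink = new MyJdbcSink("replace into userlog values(?,?,?,?,?,?,?,?)")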
- Add the following to the main method (persist the detail records to MySQL)
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.{FlinkKafkaUtil, MyJdbcSink}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object StartupApp06 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    val userLogStream: DataStream[UserLog] = dstream.map { userLog => JSON.parseObject(userLog, classOf[UserLog]) }
    //val userLogStream: DataStream[UserLog] = dstream.map { JSON.parseObject(_, classOf[UserLog]) }

    // One placeholder per column of the userlog table
    val jdbcSink = new MyJdbcSink("insert into userlog values(?,?,?,?,?,?,?,?)")
    userLogStream
      .map(userLog => Array(userLog.dateToday, userLog.area, userLog.uid, userLog.os,
        userLog.channel, userLog.appid, userLog.ver, userLog.timestamp))
      .addSink(jdbcSink)

    //dstream.print()
    environment.execute()
  }
}
- Output (screenshots omitted): the source data, the main method's console output, and the rows persisted in MySQL