Sink

Flink does not have a foreach-style method like Spark's that lets users iterate over the data themselves; all output to external systems has to go through a Sink. The final output step of a job is typically wired up like this:

myDstream.addSink(new MySink(xxxx))

Flink ships with sinks for a number of frameworks out of the box; for anything else, users implement a custom sink themselves.
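A user-defined sink only has to implement the SinkFunction interface, or extend RichSinkFunction when open/close lifecycle hooks are needed, as in section 4 below. A minimal sketch, with a made-up class body behind the MySink placeholder above:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Minimal custom sink sketch: prints every record with a prefix.
class MySink(prefix: String) extends RichSinkFunction[String] {
  override def invoke(value: String): Unit = {
    println(prefix + value)
  }
}

// usage: myDstream.addSink(new MySink("record: "))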

  

To achieve exactly-once end-to-end record delivery (on top of exactly-once state semantics), the data sink has to take part in Flink's checkpointing mechanism. The Flink documentation lists the delivery guarantee of each bundled sink under that assumption; of the sinks used below, the Kafka 0.10 producer and the Elasticsearch and Redis sinks provide at-least-once delivery, while exactly-once Kafka writes require the 0.11+ producer.
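These guarantees only apply once checkpointing is enabled on the execution environment. A minimal sketch (the object name and the 5-second interval are just illustrative values):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // take a checkpoint every 5000 ms in exactly-once mode (the default mode)
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    // ... add sources, transformations and sinks as in the examples below, then call env.execute() ...
  }
}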

1. Kafka

  • pom.xml dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <!--<artifactId>flink-connector-kafka_2.12</artifactId>-->
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
  • Add methods to FlinkKafkaUtil
package com.lxk.util

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}

object FlinkKafkaUtil {
  val prop = new Properties()
  val brokerList = "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092"
  prop.setProperty("bootstrap.servers", brokerList)
  prop.setProperty("zookeeper.connect", "192.168.18.103:2181,192.168.18.104:2181,192.168.18.105:2181")
  prop.setProperty("group.id", "gmall")

  def getConsumer(topic: String): FlinkKafkaConsumer010[String] = {
    // consume data from Kafka
    // Flink's Kafka consumer is called FlinkKafkaConsumer08
    // (or 09 for Kafka 0.9.0.x versions, etc.,
    // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
    val myKafkaConsumer: FlinkKafkaConsumer010[String] =
      new FlinkKafkaConsumer010[String](topic, new SimpleStringSchema(), prop)
    myKafkaConsumer
  }

  def getProducer(topic: String): FlinkKafkaProducer010[String] = {
    new FlinkKafkaProducer010[String](brokerList, topic, new SimpleStringSchema())
  }
}
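Note that the FlinkKafkaProducer010 returned by getProducer gives at-least-once delivery at best (with checkpointing enabled). Exactly-once writes require a Kafka 0.11+ broker and the flink-connector-kafka-0.11 connector instead of the 0.10 one; the following is only a sketch of that variant, not part of the original example, and the object name is made up:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object ExactlyOnceKafkaSink {
  def getProducer(topic: String, brokerList: String): FlinkKafkaProducer011[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList)
    // must not exceed the broker's transaction.max.timeout.ms
    props.setProperty("transaction.timeout.ms", "60000")
    new FlinkKafkaProducer011[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      props,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)
  }
}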
  • Add the sink in the main method
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.FlinkKafkaUtil
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.scala._

object StartupApp {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("node03", 6123, "F:\\10_code\\scala\\flink2020\\target\\flink-streaming-scala_2.11-1.0-SNAPSHOT.jar")

    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
    //val userLogStream: DataStream[UserLog] = dstream.map { userLog => JSON.parseObject(userLog, classOf[UserLog]) }

    val myKafkaProducer: FlinkKafkaProducer010[String] = FlinkKafkaUtil.getProducer("sink_kafka")
    dstream.addSink(myKafkaProducer)
    //dstream.print()

    environment.execute()
  }
}
  • output:

 

2. Redis

  • pom.xml dependencies
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
  • Add methods to FlinkRedisUtil
package com.lxk.util

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object FlinkRedisUtil {
  val conf = new FlinkJedisPoolConfig.Builder()
    .setHost("192.168.18.151")
    .setPort(6379)
    .setPassword("redis_zy_passpwd")
    .build()

  def getRedisSink(): RedisSink[(String, String)] = {
    new RedisSink[(String, String)](conf, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String, String)] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
      //new RedisCommandDescription(RedisCommand.SET)
    }

    override def getKeyFromData(t: (String, String)): String = t._1

    override def getValueFromData(t: (String, String)): String = t._2.toString
  }
}
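For reference, the commented-out RedisCommand.SET alternative in getCommandDescription would store each channel count under its own Redis string key instead of as a field of the channel_count hash. A sketch with an illustrative class name; with SET no additional key is passed to the command description:

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Sketch: SET variant of the mapper above.
class MyRedisStringMapper extends RedisMapper[(String, String)] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(t: (String, String)): String = t._1

  override def getValueFromData(t: (String, String)): String = t._2
}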
  • Add the sink in the main method
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.{FlinkKafkaUtil, FlinkRedisUtil}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, SplitStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.connectors.redis.RedisSink

object StartupApp04 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    // accumulate a running count per channel
    val userLogDstream: DataStream[UserLog] = dstream.map { JSON.parseObject(_, classOf[UserLog]) }
    val keyedStream: KeyedStream[(String, Int), Tuple] = userLogDstream.map(userLog => (userLog.channel, 1)).keyBy(0)
    //reduce //sum
    val sumStream: DataStream[(String, Int)] = keyedStream.reduce { (ch1, ch2) => (ch1._1, ch1._2 + ch2._2) }

    val redisSink: RedisSink[(String, String)] = FlinkRedisUtil.getRedisSink()
    sumStream.map(chCount => (chCount._1, chCount._2 + "")).addSink(redisSink)

    dstream.print()
    environment.execute()
  }
}
  • output: 

3. Elasticsearch

  • pom.xml dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>
  • Add methods to FlinkEsUtil
package com.lxk.util

import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object FlinkEsUtil {
  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("node03", 9200, "http"))
  httpHosts.add(new HttpHost("node04", 9200, "http"))
  httpHosts.add(new HttpHost("node05", 9200, "http"))

  def getElasticSearchSink(indexName: String): ElasticsearchSink[String] = {
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("trying to save: " + element)
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("userlog").source(jsonObj)
        indexer.add(indexRequest)
        println("saved one record")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
    // maximum number of buffered actions before a bulk flush
    sinkBuilder.setBulkFlushMaxActions(10)
    sinkBuilder.build()
  }
}
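Besides setBulkFlushMaxActions, the ElasticsearchSink.Builder also accepts a failure handler. The lines below are a sketch of what could be added inside getElasticSearchSink before build(), reusing the httpHosts and esFunc values defined above; treat the RetryRejectedExecutionFailureHandler class (from the elasticsearch base connector) as an assumption against the 1.7.x connector:

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler

val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)
sinkBuilder.setBulkFlushMaxActions(10)
// retry bulk requests that Elasticsearch rejected because its request queue was full,
// instead of failing the whole job
sinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler)
sinkBuilder.build()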
  • Call it in the main method
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.{FlinkEsUtil, FlinkKafkaUtil, FlinkRedisUtil}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink

object StartupApp05 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    // send the detail records to Elasticsearch
    val esSink: ElasticsearchSink[String] = FlinkEsUtil.getElasticSearchSink("gmall_startup")
    dstream.addSink(esSink)
    //dstream.print()

    environment.execute()
  }
}
  • output: 

Data source:

Main program output:

Data persisted in Elasticsearch:

4. Custom JDBC sink -- implementing the RichSinkFunction interface

  • pom.xml dependencies
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
  • Add MyJdbcSink
package com.lxk.util

import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class MyJdbcSink(sql: String) extends RichSinkFunction[Array[Any]] {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://node04:3306/gmall2020?useSSL=false"
  val username = "root"
  val password = "123"
  val maxActive = "20"
  var connection: Connection = null

  // create the connection; called once per parallel sink instance
  override def open(parameters: Configuration): Unit = {
    val properties = new Properties()
    properties.put("driverClassName", driver)
    properties.put("url", url)
    properties.put("username", username)
    properties.put("password", password)
    properties.put("maxActive", maxActive)
    val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
    connection = dataSource.getConnection()
  }

  // called for every record: bind the values to the statement and execute it
  override def invoke(values: Array[Any]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(sql)
    println(values.mkString(","))
    for (i <- 0 until values.length) {
      ps.setObject(i + 1, values(i))
    }
    ps.executeUpdate()
  }

  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
  }
}
  • Add it in the main method (save the detail records to MySQL)
package com.lxk.service

import com.alibaba.fastjson.JSON
import com.lxk.bean.UserLog
import com.lxk.util.{FlinkKafkaUtil, MyJdbcSink}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object StartupApp06 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer010[String] = FlinkKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

    val userLogStream: DataStream[UserLog] = dstream.map { userLog => JSON.parseObject(userLog, classOf[UserLog]) }
    //val userLogStream: DataStream[UserLog] = dstream.map { JSON.parseObject(_, classOf[UserLog]) }

    val jdbcSink = new MyJdbcSink("insert into userlog values(?,?,?,?,?,?,?,?)")
    userLogStream
      .map(userLog => Array(userLog.dateToday, userLog.area, userLog.uid, userLog.os, userLog.channel, userLog.appid, userLog.ver, userLog.timestamp))
      .addSink(jdbcSink)
    //dstream.print()

    environment.execute()
  }
}
  • output: 

Data source:

Main program output:

Data persisted in MySQL:
