.

  • 一 .前言
  • 二 .类型
    • 2.1. 基于本地集合的sink
    • 2.2. 基于文件的sink
      • 2.2.1.将数据写入本地文件
      • 2.2.2.将数据写入HDFS
    • 2.3. Kafka Sink
    • 2.4. MySQL Sink

一 .前言

二 .类型

2.1. 基于本地集合的sink

目标:

基于下列数据,分别 进行打印输出,error输出,collect()

(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)

代码:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject SinkCollection {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List((19, "zhangsan", 178.8),  (17, "lisi", 168.8), (18, "wangwu", 184.8),  (21, "zhaoliu", 164.8)  ))// 3.处理数据// 4.打印输出data.print()data.printToErr()// data 的数据为批处理的时候可以使用collect// print(data.collect())// 5.执行任务env.execute()}}

2.2. 基于文件的sink

  • flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。

  • flink支持多种文件的存储格式,包括text文件,CSV文件等。

  • writeAsText():TextOuputFormat - 将元素作为字符串写入行。字符串是通过调用每个元素的toString()方法获得的。

2.2.1.将数据写入本地文件

目标:

基于下列数据,写入到文件中

List((1,"flink"),(2,"sink"))

代码:


import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject LocalFileSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List((1,"flink"),(2,"sink")))// 3.处理数据// 4.打印输出data.writeAsText("/opt/a/tmp/FileSink.txt",WriteMode.OVERWRITE)// 5.执行任务env.execute()}
}

2.2.2.将数据写入HDFS

写入数据到HDFS中


import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject HdfsFileSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List((1,"flink"),(2,"sink")))// 3.处理数据// 4.打印输出data.writeAsText("hdfs://h23:8020/tmp/test/tmp/FileSink.txt",WriteMode.OVERWRITE)// 5.执行任务env.execute()}
}

2.3. Kafka Sink

kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

示例

将数据落地到Kafka中

cd /usr/hdp/current/kafka-broker/bin查看topic列表 : ./kafka-topics.sh --zookeeper hxx:2181 list创建topic :./kafka-topics.sh --zookeeper hxx:2181 --create --topic test --partitions 3 --replication-factor 1生产topic数据
./kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test读取topic数据
./kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test --from-beginning

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}object KafkaSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List("flink","Spark"))// 3.构造Kafka Sinkval pro: Properties = new Propertiespro.setProperty("bootstrap.servers", " 192.168.xx.xx:9092")val kafkaSink = new FlinkKafkaProducer[String]("test",new SimpleStringSchema(),pro)// 4.打印输出data.addSink(kafkaSink)// 5.执行任务env.execute()}
}

2.4. MySQL Sink

示例

加载下列本地集合,导入MySql中

List((10, "dazhuang", "123456", "大壮"),(11, "erya", "123456", "二丫"),(12, "sanpang", "123456", "三胖"))

代码


import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object MySqlSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val data : DataStream[(Int, String, String, String)] = env.fromCollection(List((10, "dazhuang", "123456", "大壮"),(11, "erya", "123456", "二丫"),(12, "sanpang", "123456", "三胖")))// 3.设置sinkdata.addSink(new MySqlCustomSink)// 4.执行任务env.execute()}
}class MySqlCustomSink extends RichSinkFunction [(Int,String,String,String)] {private var connection: Connection = nullprivate var ps: PreparedStatement = nulloverride def open(parameters: Configuration): Unit = {//1:加载驱动Class.forName("com.mysql.jdbc.Driver")//2:创建连接connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root")//3:获得执行语句val sql = "insert into user(id , username , password , name) values(?,?,?,?);"ps = connection.prepareStatement(sql)}override def invoke(value: (Int, String, String, String), context: SinkFunction.Context): Unit = {try {//4.组装数据,执行插入操作ps.setInt(1, value._1)ps.setString(2, value._2)ps.setString(3, value._3)ps.setString(4, value._4)ps.executeUpdate()} catch {case e: Exception => println(e.getMessage)}}//关闭连接操作override def close(): Unit = {if (connection != null) {connection.close()}if (ps != null) {ps.close()}}}

github 地址: https://github.com/BoYiZhang/flink-demo

Flink实操 : Sink操作相关推荐

  1. Flink实操 : 算子操作

    . 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...

  2. Flink实操 : DataSource操作

    . 一 .前言 二 .四种读取类型 2.1. 基于本地集合的source(Collection-based-source) 2.2. 基于文件的source(File-based-source) 2. ...

  3. jQuery入门实操-css操作,鼠标点击事件,页面计算器

    前言 本文是学习jQuery中的一些实践,是jQuery入门的实操案例.更多语法可参考w3school的jQuery参考手册 jQuery是一个快速.简洁的JavaScript框架,是继Prototy ...

  4. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  5. Flink实操 : 状态管理

    . 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...

  6. 完整代码+实操!手把手教你操作Faster R-CNN和Mask R-CNN

    点击上方↑↑↑蓝字关注我们~ 「2019 Python开发者日」全日程揭晓,请扫码咨询 ↑↑↑ 机器视觉领域的核心问题之一就是目标检测(Object Detection),它的任务是找出图像当中所有感 ...

  7. rtk采点后如何导入cad_【干货】RTK实操视频:工程之星5.0操作攻略!(第五部分)...

    前期回顾:[干货]RTK实操视频:工程之星5.0操作攻略!(第一部分)[干货]RTK实操视频:工程之星5.0操作教程(第二部分) [干货]RTK实操视频:工程之星5.0操作攻略!(第三部分) [干货] ...

  8. 从实操教学到赛题演练,腾讯专家亲授TI-ONE平台操作攻略!

    ​ 5月10日,我们迎来了"视"界直播周的首场直播--"2021腾讯广告算法大赛赛题解析".直播现场,芦清林和熊江丰老师对本届赛事的两大赛题进行了深入浅出的解析 ...

  9. 2021年R1快开门式压力容器操作考试题及R1快开门式压力容器操作实操考试视频

    题库来源:安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通:R1快开门式压力容器操作考试题是安全生产模拟考试一点通生成的,R1快开门式压力容器操作证模拟考试题库是根据R1快开门式压力容器操作 ...

最新文章

  1. 中国程序员开发的远程桌面火了!Mac 可用,只有 9MB,支持自建中继器
  2. python输入列表方法_Python用input输入列表的方法
  3. Kconfig文件结构(图文)简介
  4. Open Infrastructure开启开放协作新时代
  5. ajax获得excel文件流在前端打开_主流前端技术讲解,面试必考!
  6. java8--IO(java疯狂讲义3复习笔记)
  7. 845B - Luba And The Ticket
  8. Java中的深拷贝(深复制)和浅拷贝(浅复制)
  9. 利用syslinux制作Dos、WinPE、Slax Linux集成u盘
  10. jQuery基础--选择器
  11. 开两个服务内存溢出_详解JVM内存区域
  12. linux vi 字符串替换
  13. c语言一本书的页码从自然数1开始顺序编码,算法设计与分析 1-1 统计数字问题(C语言版)...
  14. Ubuntu20.04 系统搭建 NetBox(开源 IPAM/DC 管理工具)
  15. 解决conda在Downloading and Extracting Packages时下载速度慢的问题
  16. to be ruled from Windows Space about C++
  17. 跨国项目要注意时区问题
  18. JavaScript实现在线websocket WSS测试工具 -toolfk程序员工具网
  19. 聊天的一点笔记--顺便浅谈技术服务公司的激励制度设计
  20. ipad出现support.apple.com

热门文章

  1. getline的用法
  2. 简析TCP的三次握手与四次分手原理
  3. 认认真真给你推荐一本书(附10本赠书)
  4. LoRa开发板升级的时候,为什么需要修改跳线帽?
  5. JAVA爬虫三剑客,JAVAWEB三剑客之Filter
  6. 基于Python的图像分类 项目实践——图像分类项目的指导文档
  7. linux superblock位置,Linux下恢复EXT3 Superblock的正确方法
  8. uni-app IOS 音乐无法自动播放
  9. go benchmark 基准测试
  10. 采购工作必备Excel实用技巧大全(收藏)