flink API之Sink入门
kafka sink
添加依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version>
</dependency>
将数据写入kafka,然后启动命令行消费者可以看下结果
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011import java.util.Propertiesobject KafkaSink extends App {val properties = new Properties()properties.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")//构建环境private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//读取文件private val stream: DataStream[String] = env.readTextFile("a.txt")//添加kafka sinkstream.addSink(new FlinkKafkaProducer011[String]("flink", new SimpleStringSchema(), properties))//执行env.execute()
}
在kafka集群上启动消费者查看结果即可
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic flink
Redis Sink
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
flink API之Sink入门相关推荐
- Flink API之Source入门
从集合构建 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object Sen ...
- 9.FLINK Sink\API\自定义sink
9.Sink 9.1.API 9.2.注意 9.3.自定义sink 9.Sink 9.1.API 1.ds.print 直接输出到控制台 2.ds.printToErr()直接输出到控制台,用红色 3 ...
- Flink之流处理API之Sink
Sink Flink没有类似于spark中foreach方法,让用户进行迭代的操作.虽有对外的输出操作都要利用Sink完成.最后通过类似如下方式完成整个任务最终输出操作. myDstream.addS ...
- flink Table API 与SQL入门实战
流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...
- Flink 教程 gitbook 从入门到入土(详细教程)
Flink从入门到入土(详细教程) 和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分 1.Environment Flink J ...
- Flink SQL 自定义 Sink
1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...
- Flink实操 : Sink操作
. 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...
- 【Flink】Flink 自定义 redis sink
1.概述 内部要做 Flink SQL 平台,本文以自定义 Redis Sink 为例来说明 Flink SQL 如何自定义 Sink 以及自定义完了之后如何使用 基于 Flink 1.11 2.步骤 ...
- 初识Django —Python API接口编程入门
初识Django -Python API接口编程入门 一.WEB架构的简单介绍 Django是什么? Django是一个开放源代码的Web应用框架,由Python写成.我们的目标是用Python语言, ...
最新文章
- android Theme启动APP闪屏处理
- python语言的解释性特点指的是编写的程序不需要编译_解释性与编译型 Python2和python3的区别...
- Java面试技巧之MySQL问题梳理
- ML二:python批量修改文件名-测试KDTree
- union的作用 c语言,C语言(union类型及应用)
- nginx源码学习资源
- 专题导读:高性能计算虚拟数据空间
- 研发团队绩效_如何在团队内部建立网络绩效文化
- 百度seo排名规则_百度seo排名优化要点讲解(已帮助5184人)
- 11-7 无底洞问题
- android+实时ping工具,安卓ping测试工具
- EasyDarwin —— ubuntu搭建rtsp服务,使用FFmpeg进行rtsp推拉流
- 清华大学计算机系本科课程,清华大学计算机系本科生全部课程详细介绍
- 高通设备进入高通9008模式
- dede 表单必填_dede自定义表单“必填项”设置方法
- 在vue中修改数组某个元素,值变了,渲染不了
- 分享老齐【学方法】宽信用周期对股市的影响!
- 无线路由器无法在计算机上设置,无线路由器设置管理地址无法打开解决方法
- NAACL'22 Findings | 社交媒体上的抱怨强度分析
- wordpress主题_选择完美的WordPress主题–您应该考虑的9件事
热门文章
- ai跟随路径_AI机器人掌勺 马桥豆腐干飘香,长三角文博会上的这些“马桥元素”大放异彩...
- win10家庭版安装iis 微软web服务器 windows安装IIS web服务器
- django view返回form error_Django(解决被钓鱼CSRF、Django中间件、反射)
- Tcl Tutorial 笔记7 ·for incr
- 二层交换机 不在同一子网_IP地址、子网掩码、网关之间有什么联系呢?各自用途有什么?...
- 禁用计算机服务,适当禁用系统服务 提升计算机运行速度
- 服务器电源控制芯片,服务器电源中的PFC控制芯片HA16141的应用.pdf
- python 对角阵_numpy创建单位矩阵和对角矩阵的实例
- windows7卸载linux系统,win7下安全卸载linux系统
- php网址图片怎么转based4,Ionic4 Base64 转化成图片插件-Base64 转化成图片Base64 To Gallery - Ionic Native...