kafka sink

添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.7.2</version>
</dependency>

将数据写入kafka,然后启动命令行消费者可以看下结果

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011import java.util.Propertiesobject KafkaSink extends App {val properties = new Properties()properties.setProperty("bootstrap.servers", "mypc01:9092,mypc02:9092,mypc03:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")//构建环境private val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//读取文件private val stream: DataStream[String] = env.readTextFile("a.txt")//添加kafka sinkstream.addSink(new FlinkKafkaProducer011[String]("flink", new SimpleStringSchema(), properties))//执行env.execute()
}

在kafka集群上启动消费者查看结果即可

kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic flink

Redis Sink

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

flink API之Sink入门相关推荐

  1. Flink API之Source入门

    从集合构建 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object Sen ...

  2. 9.FLINK Sink\API\自定义sink

    9.Sink 9.1.API 9.2.注意 9.3.自定义sink 9.Sink 9.1.API 1.ds.print 直接输出到控制台 2.ds.printToErr()直接输出到控制台,用红色 3 ...

  3. Flink之流处理API之Sink

    Sink Flink没有类似于spark中foreach方法,让用户进行迭代的操作.虽有对外的输出操作都要利用Sink完成.最后通过类似如下方式完成整个任务最终输出操作. myDstream.addS ...

  4. flink Table API 与SQL入门实战

    流处理和批处理都可以用,是非常的方便! 导入依赖 <dependency><groupId>org.apache.flink</groupId><artifa ...

  5. Flink 教程 gitbook 从入门到入土(详细教程)

    Flink从入门到入土(详细教程) 和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分 1.Environment Flink J ...

  6. Flink SQL 自定义 Sink

    1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...

  7. Flink实操 : Sink操作

    . 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...

  8. 【Flink】Flink 自定义 redis sink

    1.概述 内部要做 Flink SQL 平台,本文以自定义 Redis Sink 为例来说明 Flink SQL 如何自定义 Sink 以及自定义完了之后如何使用 基于 Flink 1.11 2.步骤 ...

  9. 初识Django —Python API接口编程入门

    初识Django -Python API接口编程入门 一.WEB架构的简单介绍 Django是什么? Django是一个开放源代码的Web应用框架,由Python写成.我们的目标是用Python语言, ...

最新文章

  1. android Theme启动APP闪屏处理
  2. python语言的解释性特点指的是编写的程序不需要编译_解释性与编译型 Python2和python3的区别...
  3. Java面试技巧之MySQL问题梳理
  4. ML二:python批量修改文件名-测试KDTree
  5. union的作用 c语言,C语言(union类型及应用)
  6. nginx源码学习资源
  7. 专题导读:高性能计算虚拟数据空间
  8. 研发团队绩效_如何在团队内部建立网络绩效文化
  9. 百度seo排名规则_百度seo排名优化要点讲解(已帮助5184人)
  10. 11-7 无底洞问题
  11. android+实时ping工具,安卓ping测试工具
  12. EasyDarwin —— ubuntu搭建rtsp服务,使用FFmpeg进行rtsp推拉流
  13. 清华大学计算机系本科课程,清华大学计算机系本科生全部课程详细介绍
  14. 高通设备进入高通9008模式
  15. dede 表单必填_dede自定义表单“必填项”设置方法
  16. 在vue中修改数组某个元素,值变了,渲染不了
  17. 分享老齐【学方法】宽信用周期对股市的影响!
  18. 无线路由器无法在计算机上设置,无线路由器设置管理地址无法打开解决方法
  19. NAACL'22 Findings | 社交媒体上的抱怨强度分析
  20. wordpress主题_选择完美的WordPress主题–您应该考虑的9件事

热门文章

  1. ai跟随路径_AI机器人掌勺 马桥豆腐干飘香,长三角文博会上的这些“马桥元素”大放异彩...
  2. win10家庭版安装iis 微软web服务器 windows安装IIS web服务器
  3. django view返回form error_Django(解决被钓鱼CSRF、Django中间件、反射)
  4. Tcl Tutorial 笔记7 ·for incr
  5. 二层交换机 不在同一子网_IP地址、子网掩码、网关之间有什么联系呢?各自用途有什么?...
  6. 禁用计算机服务,适当禁用系统服务 提升计算机运行速度
  7. 服务器电源控制芯片,服务器电源中的PFC控制芯片HA16141的应用.pdf
  8. python 对角阵_numpy创建单位矩阵和对角矩阵的实例
  9. windows7卸载linux系统,win7下安全卸载linux系统
  10. php网址图片怎么转based4,Ionic4 Base64 转化成图片插件-Base64 转化成图片Base64 To Gallery - Ionic Native...