内置source和sink

内置source包括从文件读取,从文件夹读取,从socket中读取、从集合或者迭代器中读取。内置的sink包括写文件、控制台输出、socket

内置connectors

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

HDFS Connector

这个connector提供了一个sink,可以写分区到任何一个文件系统(只要支持hadoop filesystem就可以)。

Kafka Connector

添加依赖

     <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency>

下载安装zookeeper

  1. 将zookeeper-3.4.5下面的conf文件夹下的zoo_sample.cfg复制一份zoo.cfg,修改dataDir=/home/vincent/tmp
  2. 运行zookeeper:./zkServer.sh start
  3. 输入jps可以看到QuorumPeerMain进程说明zk成功启动了。

下载安装kafka

  • 同样修改kafka_2.11-2.0.1/config下面的server.properties文件,修改:

log.dirs=/home/vincent/tmp/kafka-logs

zookeeper.connect=localhost:2181

  • 运行Kafka:
./bin/kafka-server-start.sh ./config/server.properties
  • 输入jps可以看到Kafka进程说明kafka成功启动了。
  • 创建一个topic
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest
  • 创建成功,查看所有topic
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
mytest
  • 启动生产者
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
  • 启动消费者
vincent@ubuntu:~/kafka_2.11-2.0.1$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest

Flink连接Kafka

source 从Kafka中读取数据

Scala:

object KafkaConnectorConsumerApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval properties = new Properties()properties.setProperty("bootstrap.servers", "192.168.227.128:9092")properties.setProperty("group.id", "test")env.addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties)).print()env.execute("KafkaConnectorConsumerApp")}

Java:

public class JavaKafkaConsumerApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.227.128:9092");properties.setProperty("group.id", "test");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("mytest", new SimpleStringSchema(), properties));stream.print();env.execute("JavaKafkaConsumerApp");}
}

sink 将数据输出到Kafka中

Scala:

object KafkaConnectorProducerApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 从socket接受数据,通过Flink,将数据Sink到kafkaval data=env.socketTextStream("192.168.227.128", 9999)val properties = new Properties()properties.setProperty("bootstrap.servers", "192.168.227.128:9092")properties.setProperty("group.id", "test")val kafkaSink = new FlinkKafkaProducer[String]("mytest", new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), properties)data.addSink(kafkaSink)env.execute("KafkaConnectorProducerApp")}
}

Java:

public class JavaKafkaProducerApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.socketTextStream("192.168.227.128", 9999);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.227.128:9092");properties.setProperty("group.id", "test");data.addSink(new FlinkKafkaProducer<String>("192.168.227.128:9092", "mytest", new SimpleStringSchema()));env.execute("JavaKafkaProducerApp");}
}

默认的flink kafka消费策略是setStartFromGroupOffsets(default behaviour),会自动从上一次未消费的数据开始

Apache Flink 零基础入门(二十)Flink kafka connector相关推荐

  1. Apache Flink 零基础入门(十二)Flink sink

    将DataSet中的数据Sink到哪里去.使用的是对应的OutPutFormat,也可以使用自定义的sink,有可能写到hbase中,hdfs中. writeAsText() / TextOutput ...

  2. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  3. Apache Flink 零基础入门(十九)Flink windows和Time操作

    Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...

  4. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  5. Apache Flink 零基础入门(十四)Flink 分布式缓存

    Apache Flink 提供了一个分布式缓存,类似于Hadoop,用户可以并行获取数据. 通过注册一个文件或者文件夹到本地或者远程HDFS等,在getExecutionEnvironment中指定一 ...

  6. Apache Flink 零基础入门(十六)Flink DataStream transformation

    Operators transform one or more DataStreams into a new DataStream. Operators操作转换一个或多个DataStream到一个新的 ...

  7. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  8. 罗马音平假名中文可复制_日语零基础入门五十音,日语零基础五十音图表

    日语零基础入门五十音,日语入门的最基本要求就是记住五十音图,但是这个记住不仅是你能背下来或是默写下来.而是你需对号入座! 下面是一张五十音图表. 即每个假名单独拿出来你要立马反应出来怎么读.其重要性甚 ...

  9. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

最新文章

  1. 【Bootstrap-插件使用】Jcrop+fileinput组合实现头像上传功能
  2. oracle创建分区表
  3. java线程睡眠一分钟_Java中的TimerTimerTask和线程睡眠
  4. C++Primer Plus (第六版)阅读笔记 + 源码分析【第一章:预备知识】
  5. 转-测试用例-基本控件
  6. mfc+mysql+创建表单_一个完整的c++ web框架(webserver + orm)
  7. 用Python分析了20万场吃鸡数据,有不少有趣的发现
  8. C# WebBrowser 设置独立的代理
  9. Android之使用MediaPlayer和SurfaceView组件播放一个简单的视频
  10. POJ 1006 同余方程组
  11. [转载] python中callable_Python callable() 函数
  12. 委托和事件的一些理解笔记
  13. 在线作图|如何绘制一张气泡图
  14. Typescript 类型推断
  15. 零基础学习CANoe Panel(16)—— Clock Control/Panel Control/Start Stop Control/Tab Control
  16. mysql 时间戳查询当天数据_mysql 时间戳查询 当天 本周 当月 数据
  17. java.lang.ClassNotFoundException:teat1问题和CentOS 8 jdk安装
  18. Connection has been closed BEFORE response异常
  19. Uber 和 Lyft 在德克斯萨大获全胜
  20. 一款免费的记忆卡:学习圈

热门文章

  1. php如何判断提交内容为空,php禁止提交空表单(php空值判断)的方法
  2. RabbitMQ的消息确认ACK机制
  3. [pytorch] 通过一个例子分析torch.matmul矩阵与向量相乘的维度
  4. Pycharm中代码自动换行(亲测)
  5. Bug邮件队列插入不了
  6. 职称计算机应用能力考试模拟题,2016年职称计算机考试模拟题
  7. 计算机性能在线测评,关于电脑性能测试的常见的几大方法
  8. 计算机保存的信息是模拟信号,信息技术考试样题(附解析).doc
  9. 火狐浏览器服务器意外响应,Firefox 火狐浏览器 83 发布,已修复任意代码执行漏洞...
  10. android镜像文件怎么命名,android镜像文件说明(示例代码)