使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema:

val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new SimpleStringSchema(), properties);

如果存入Kafka中的数据不是JSON,而是Protobuf类型的数据,需要用二进制的Schema进行接收,可以自己实现一个类,很简单,只有一行代码:

 class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{@throws[IOException]override def deserialize(message: Array[Byte]): Array[Byte] = message
}

然后使用时,如下所示:

val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new ByteArrayDeserializationSchema[Array[Byte]](), properties);

转载于:https://www.cnblogs.com/liugh/p/7448554.html

使用Flink时从Kafka中读取Array[Byte]类型的Schema相关推荐

  1. 【Kafka】从kafka中读取最新数据

    [Kafka]从kafka中读取最新数据 一.死循环无限拉取kafka数据 1.1 整体框架剖析 1.2 测试 二.@KafkaListener注解 实现监听kafka数据 三.参考资料 前情提要:我 ...

  2. 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例

    文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...

  3. 在启动时从配置文件中读取对象

    目录 介绍 背景 使用代码 如何从Web应用程序的配置文件反序列化对象 介绍 本技巧揭示了一种非常简单的方法,可以将Web应用程序的配置文件的各个部分作为对象读取,而无需选择模式或依赖注入. 背景 在 ...

  4. flink实时消费kafka中oracle的DML数据写入mysql

    1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...

  5. 深度拷贝时,类中的非简单类型字段

    2019独角兽企业重金招聘Python工程师标准>>> public class Test implements Cloneable {private String id;priva ...

  6. Flink Caused by:org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException

    Flink程序从kafka中读取数据进行计算,FLink程序一启动就报以下错误,看到错误很懵逼.加班到9点没解决,第二天提前来半小时,把如下错误信息又看了一遍.具体错误如下: 错误信息1. 20/12 ...

  7. Flink从Kafka 0.8中读取多个Topic时的问题

    Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,从Kafka中读取指定Topic的数据,如果要从多个Topic读取数据,可以如下操作: 1.appl ...

  8. 中读取数据_Flink入门实战 (中)

    一.Flink 流处理 API 1.Environment getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文. 如果程序是独立调用的,则 此方法返回本地执行环 ...

  9. ifstream 和 ofstream 文件中读取和写入操作

    导读 ofstream是从内存到硬盘,ifstream是从硬盘到内存,其实所谓的流缓冲就是内存空间. 在C++中,有一个stream这个类,所有的I/O都以这个"流"类为基础的,包 ...

最新文章

  1. Gartner2018新兴技术成熟度曲线:人机界线日益模糊!
  2. Java中 Iterable 和 Iterator 的区别
  3. Linux redhat下安装jdk-6u45-linux-x64.bin
  4. 深度学习:优化器工厂,各种优化器介绍,numpy实现深度学习(一)
  5. google的api key调用次数是多少_Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?...
  6. 怎么样能找到国外的群?
  7. 鹿邑2021高考成绩查询,鹿邑中考成绩查询2021
  8. 线程打印_经典面试题——两个线程交替打印奇数和偶数
  9. 计算机设备问题代码43,win10系统提示由于该设备有问题windows已将其停止(代码43)的修复方案...
  10. html打印预览空白,打印预览空白,网页打印空白原因及解决办法汇总
  11. 跨境电商必看:amazon账号关联因素
  12. Latex公式编号: 多行公式多编号,多行公式单编号
  13. 肝移植笔记1:论文阅读-对率回归预测移植物失功概率
  14. linux 1000 ask(转)
  15. 如何使用 transform 来跟踪你最近的客户订单
  16. android 揭示动画_揭示自动驾驶汽车第4级和第5级的真实含义
  17. 003day (css文本、列表、背景相关属性,精灵图的制作)
  18. 生活中的定律之马太效应
  19. 145只基金隐性重仓股现身中报
  20. Airtest+Poco+Pytest框架搭建1

热门文章

  1. 用css动画写一个下红包雨的效果
  2. 撩课-Web大前端每天5道面试题-Day7
  3. 浅析开源数据库MySQL架构
  4. C#温故而知新学习系列之.NET框架高级特性—概述.NET框架中的反射(一)
  5. JSP获得客服端MAC地址
  6. Spring.NET学习笔记9——打造简易的依赖注入框架(练习篇) Level 100
  7. ATLAS入门篇之HoverMenuExtender控件编程(2)
  8. linux7虚拟机修改主机名,centos 7 更改网卡名,主机名,虚拟机添加网卡
  9. C++前插法实现链表
  10. c语言funcode空格消失的函数,01北科大暑期计算机实践FunCode游戏设计+C++课程设计 - 海底世界 - 图文...