使用Flink时从Kafka中读取Array[Byte]类型的Schema
使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema:
val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new SimpleStringSchema(), properties);
如果存入Kafka中的数据不是JSON,而是Protobuf类型的数据,需要用二进制的Schema进行接收,可以自己实现一个类,很简单,只有一行代码:
class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{@throws[IOException]override def deserialize(message: Array[Byte]): Array[Byte] = message }
然后使用时,如下所示:
val myConsumer = new FlinkKafkaConsumer08[String]("Topic名称", new ByteArrayDeserializationSchema[Array[Byte]](), properties);
转载于:https://www.cnblogs.com/liugh/p/7448554.html
使用Flink时从Kafka中读取Array[Byte]类型的Schema相关推荐
- 【Kafka】从kafka中读取最新数据
[Kafka]从kafka中读取最新数据 一.死循环无限拉取kafka数据 1.1 整体框架剖析 1.2 测试 二.@KafkaListener注解 实现监听kafka数据 三.参考资料 前情提要:我 ...
- 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例
文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...
- 在启动时从配置文件中读取对象
目录 介绍 背景 使用代码 如何从Web应用程序的配置文件反序列化对象 介绍 本技巧揭示了一种非常简单的方法,可以将Web应用程序的配置文件的各个部分作为对象读取,而无需选择模式或依赖注入. 背景 在 ...
- flink实时消费kafka中oracle的DML数据写入mysql
1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...
- 深度拷贝时,类中的非简单类型字段
2019独角兽企业重金招聘Python工程师标准>>> public class Test implements Cloneable {private String id;priva ...
- Flink Caused by:org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
Flink程序从kafka中读取数据进行计算,FLink程序一启动就报以下错误,看到错误很懵逼.加班到9点没解决,第二天提前来半小时,把如下错误信息又看了一遍.具体错误如下: 错误信息1. 20/12 ...
- Flink从Kafka 0.8中读取多个Topic时的问题
Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,从Kafka中读取指定Topic的数据,如果要从多个Topic读取数据,可以如下操作: 1.appl ...
- 中读取数据_Flink入门实战 (中)
一.Flink 流处理 API 1.Environment getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文. 如果程序是独立调用的,则 此方法返回本地执行环 ...
- ifstream 和 ofstream 文件中读取和写入操作
导读 ofstream是从内存到硬盘,ifstream是从硬盘到内存,其实所谓的流缓冲就是内存空间. 在C++中,有一个stream这个类,所有的I/O都以这个"流"类为基础的,包 ...
最新文章
- Gartner2018新兴技术成熟度曲线:人机界线日益模糊!
- Java中 Iterable 和 Iterator 的区别
- Linux redhat下安装jdk-6u45-linux-x64.bin
- 深度学习:优化器工厂,各种优化器介绍,numpy实现深度学习(一)
- google的api key调用次数是多少_Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?...
- 怎么样能找到国外的群?
- 鹿邑2021高考成绩查询,鹿邑中考成绩查询2021
- 线程打印_经典面试题——两个线程交替打印奇数和偶数
- 计算机设备问题代码43,win10系统提示由于该设备有问题windows已将其停止(代码43)的修复方案...
- html打印预览空白,打印预览空白,网页打印空白原因及解决办法汇总
- 跨境电商必看:amazon账号关联因素
- Latex公式编号: 多行公式多编号,多行公式单编号
- 肝移植笔记1:论文阅读-对率回归预测移植物失功概率
- linux 1000 ask(转)
- 如何使用 transform 来跟踪你最近的客户订单
- android 揭示动画_揭示自动驾驶汽车第4级和第5级的真实含义
- 003day (css文本、列表、背景相关属性,精灵图的制作)
- 生活中的定律之马太效应
- 145只基金隐性重仓股现身中报
- Airtest+Poco+Pytest框架搭建1
热门文章
- 用css动画写一个下红包雨的效果
- 撩课-Web大前端每天5道面试题-Day7
- 浅析开源数据库MySQL架构
- C#温故而知新学习系列之.NET框架高级特性—概述.NET框架中的反射(一)
- JSP获得客服端MAC地址
- Spring.NET学习笔记9——打造简易的依赖注入框架(练习篇) Level 100
- ATLAS入门篇之HoverMenuExtender控件编程(2)
- linux7虚拟机修改主机名,centos 7 更改网卡名,主机名,虚拟机添加网卡
- C++前插法实现链表
- c语言funcode空格消失的函数,01北科大暑期计算机实践FunCode游戏设计+C++课程设计 - 海底世界 - 图文...