微信公众号: 大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈


kafka中的数据通常是键值对的,所以我们这里自定义反序列化类从kafka中消费键值对的消息,为方便大家学习,这里我实现了Java/Scala两个版本,由于比较简单这里直接上代码:

一、Scala代码:

1.自定义反序列化类:

package comhadoop.ljs.flink010.kafkaimport org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchemaimport org.apache.kafka.clients.consumer.ConsumerRecord/**  * @author: Created By lujisen  * @company ChinaUnicom Software JiNan  * @date: 2020-04-25 18:31  * @version: v1.0  * @description: comhadoop.ljs.flink010.kafka  */class MyKafkaDeserializationSchema  extends KafkaDeserializationSchema[ConsumerRecord[String, String]]{  /*是否流结束,比如读到一个key为end的字符串结束,这里不再判断,直接返回false 不结束*/  override def isEndOfStream(t: ConsumerRecord[String, String]): Boolean ={    false  }  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {    new ConsumerRecord(record.topic(),record.partition(),record.offset(),new String(record.key(),"UTF-8"),new String(record.value(),"UTF-8"))  }  /*用于获取反序列化对象的类型*/  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {    TypeInformation.of(new TypeHint[ConsumerRecord[String, String]] {})  }}

2.主函数类:

package comhadoop.ljs.flink010.kafkaimport java.util.Propertiesimport org.apache.flink.api.common.functions.MapFunctionimport org.apache.flink.streaming.api.datastream.DataStreamimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionimport org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializer/**  * @author: Created By lujisen  * @company ChinaUnicom Software JiNan  * @date: 2020-04-25 16:32  * @version: v1.0  * @description: comhadoop.ljs.flink010.kafka  */object KafkaDeserializerSchemaTest {  def main(args: Array[String]): Unit = {     /*环境初始化*/    val senv:StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment()    /*启用checkpoint,这里我没有对消息体的key value进行判断,即使为空启动了checkpoint,遇到错误也会无限次重启*/    senv.enableCheckpointing(2000)    /*topic2不存在话会自动在kafka创建,一个分区 分区名称0*/    val  myConsumer=new FlinkKafkaConsumer[ConsumerRecord[String, String]]("topic3",new MyKafkaDeserializationSchema(),getKafkaConfig())    /*指定消费位点*/    val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()    /*这里从topic3 的0分区的第一条开始消费*/    specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L)    myConsumer.setStartFromSpecificOffsets(specificStartOffsets)    /*指定source数据源*/    val source:DataStream[ConsumerRecord[String, String]]=senv.addSource(myConsumer)     val keyValue=source.map(new MapFunction[ConsumerRecord[String, String],String] {      override def map(message: ConsumerRecord[String, String]): String = {        "key" + message.key + "  value:" + message.value      }    })    /*打印接收的数据*/    keyValue.print()    /*启动执行*/    senv.execute()  }   def getKafkaConfig():Properties={    val props:Properties=new Properties()    props.setProperty("bootstrap.servers","worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667")    props.setProperty("group.id","topic_1")    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)    props.setProperty("auto.offset.reset","latest")    props  }}

二、Java代码:
1.自定义反序列化类:

package com.hadoop.ljs.flink110.kafka;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;import org.apache.kafka.clients.consumer.ConsumerRecord;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-04-25 18:45 * @version: v1.0 * @description: com.hadoop.ljs.flink110.kafka */public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema> {     private static  String encoding = "UTF8";    @Override    public boolean isEndOfStream(ConsumerRecord nextElement) {        return false;    }    @Override    public ConsumerRecord deserialize(ConsumerRecord record) throws Exception {       /* System.out.println("Record--partition::"+record.partition());        System.out.println("Record--offset::"+record.offset());        System.out.println("Record--timestamp::"+record.timestamp());        System.out.println("Record--timestampType::"+record.timestampType());        System.out.println("Record--checksum::"+record.checksum());        System.out.println("Record--key::"+record.key());        System.out.println("Record--value::"+record.value());*/        return new ConsumerRecord(record.topic(),                record.partition(),                record.offset(),                record.timestamp(),                record.timestampType(),                record.checksum(),                record.serializedKeySize(),                record.serializedValueSize(),                /*这里我没有进行空值判断,生产一定记得处理*/                new  String(record.key(), encoding),                new  String(record.value(), encoding));    }    @Override    public TypeInformation> getProducedType() {        return TypeInformation.of(new TypeHint>(){});    }}

2.主函数类:

package com.hadoop.ljs.flink110.kafka;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.HashMap;import java.util.Map;import java.util.Properties;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-04-25 18:41 * @version: v1.0 * @description: com.hadoop.ljs.flink110.kafka */public class KafkaDeserializerSchemaTest {    public static void main(String[] args) throws Exception {        /*环境初始化*/        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        /*启用checkpoint,这里我没有对消息体的key value进行判断,即使为空启动了checkpoint,遇到错误也会无限次重启*/        senv.enableCheckpointing(2000);        /*topic2不存在话会自动在kafka创建,一个分区 分区名称0*/        FlinkKafkaConsumer> myConsumer=new FlinkKafkaConsumer>("topic3",new MyKafkaDeserializationSchema(),getKafkaConfig());         /*指定消费位点*/        Map specificStartOffsets = new HashMap<>();        /*这里从topic3 的0分区的第一条开始消费*/        specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L);        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);         DataStream> source = senv.addSource(myConsumer);        DataStream keyValue = source.map(new MapFunction, String>() {            @Override            public String map(ConsumerRecord message) throws Exception {                return "key"+message.key()+"  value:"+message.value();            }        });        /*打印结果*/        keyValue.print();        /*启动执行*/        senv.execute();    }    public static Properties getKafkaConfig(){        Properties  props=new Properties();        props.setProperty("bootstrap.servers","worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667");        props.setProperty("group.id","topic_group2");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.setProperty("auto.offset.reset","latest");        return props;    }}

三、函数测试

1.KafkaProducer发送测试数据类:

package com.hadoop.ljs.kafka220;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Date;import java.util.Properties;public class KafkaPartitionProducer extends Thread{  private static long count =10;  private static String topic="topic3";  private static String brokerList="worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667";  public static void main(String[] args) {        KafkaPartitionProducer jproducer = new KafkaPartitionProducer();        jproducer.start();    }    @Override    public void run() {        producer();    }    private void producer() {        Properties props = config();        KafkaProducer producer = new KafkaProducer<>(props);        ProducerRecord record=null;        System.out.println("kafka生产数据条数:"+count);        for (int i = 1; i <= count; i++) {            String json = "{"id":" + i + ","ip":"192.168.0." + i + "","date":" + new Date().toString() + "}";            String key ="key"+i;            record = new ProducerRecord(topic, key, json);            producer.send(record, (metadata, e) -> {                // 使用回调函数                if (null != e) {                    e.printStackTrace();                }                if (null != metadata) {                    System.out.println(String.format("offset: %s, partition:%s, topic:%s  timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));                }            });        }        producer.close();    }    private Properties config() {        Properties props = new Properties();        props.put("bootstrap.servers",brokerList);        props.put("acks", "1");        props.put("retries", 3);        props.put("batch.size", 16384);        props.put("linger.ms", 1);        props.put("buffer.memory", 33554432);        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        /*自定义分区,两种形式*/        /*props.put("partitioner.class", PartitionUtil.class.getName());*/        return props;    }}

2.测试结果

如果觉得我的文章能帮到您,请关注微信公众号“大数据开发运维架构”,并转发朋友圈,谢谢支持!!!

java判断读到末尾_Flink实战:自定义KafkaDeserializationSchema(Java/Scala)相关推荐

  1. java判断读到末尾_Java Web入门之java--第一节 java 简介及开发环境安装

    本篇博客是Java web入门的第一篇博客,这篇博客主要讲述java语言的一些简介. 一)先从Java语言的诞生说起. 1991年,Sun公司在一个叫做James Gosling的人的带领下,成立了一 ...

  2. java判断读到末尾_IO流如何判断读取到了流的结尾,程序中以-1来判断,是流中写入一个EOF表示流结束吗,底层实现呢?...

    -1不是流中写入的数据.read()方法返回的数据都是unsigned byte,即是[0,255].底层实现有很多,比如socket IO和文件IO,甚至你自己也可以实现. ------------ ...

  3. java判断那个时间更晚_如何用Java判断日期是早于还是晚于另一个日期

    如何用Java判断日期是早于还是晚于另一个日期 另一个工作中常见的操作就是如何判断给定的一个日期是大于某天还是小于某天?在Java 8中,LocalDate类有两类方法isBefore()和isAft ...

  4. java 判断是否夏令时_确定指定日期的Java夏令时(DST)是否处于活动状态

    我有一个Java类,它占用一个位置的纬度/经度,并在夏时制开启和关闭时返回GMT偏移量.我正在寻找一个简单的方法来确定Java如果当前日期是在夏令时间,所以我可以应用正确的偏移量.目前,我只对美国时区 ...

  5. java 判断文件类型是否是音频_用java流方式判断文件类型

    这个方法只能在有限的范围内有效.并不是万金油 比如 图片类型判断,音频文件格式判断,视频文件格式判断等这种肯定是2进制且专业性很强的文件类型判断. 下面给出完整版代码 首先是文件类型枚取 packag ...

  6. java 判断数组已经存满_详解Java中数组判断元素存在几种方式比较

    1. 通过将数组转换成List,然后使用List中的contains进行判断其是否存在 public static boolean useList(String[] arr,String contai ...

  7. java 去除干扰_【Selenium-WebDriver实战篇】Java丨验证码图片去除干扰像素,方便验证码的识别(转)...

    1.先来看看效果: 原图 除去干扰像素后 2.解析代码: 1).读取文件夹里面的图片 1 String fileName = "picture"; 2 BufferedImage ...

  8. java 判断 中文字符_java中判断字符串中是否有中文字符

    package com.meritit.test; public class TestChart { public static void main(String[] args) throws Exc ...

  9. java阿姆斯特朗数,Java判断阿姆斯特朗数

    Java判断阿姆斯特朗数 1 什么是阿姆斯特朗数 Java中的阿姆斯壮数字:如果正数等于其数字的立方之和,例如0.1.153.370.371.407等,则称为阿姆斯特朗数. 让我们尝试了解为什么153 ...

最新文章

  1. linux如何卸载挂载文件
  2. CentOS7.5下yum安装MySQL8图文教程
  3. squid启动失败的解决办法
  4. 单独的plsql链接数据库
  5. 重装系统后软件安装 ----一直更新
  6. 全国计算机等级考试题库二级C操作题100套(第38套)
  7. 黑马程序员--C语言基础之--sizeof()运算符的使用以及注意
  8. 支付巨头Visa宣布计划在巴西将加密货币服务引入传统银行
  9. C++ 为什么要引入异常处理机制
  10. teamviewer13试用期已到期,错装商业版怎么还原成个人版?
  11. 基带丢失、IMEI丢失、手机无信号--高通通用解决办法
  12. Distantly Supervised Named Entity Recognition using Positive-Unlabeled Learning(DS——NER识别(减少人工参与))
  13. python文档学习
  14. jrebel 反代理服务搭建
  15. 中国高校计算机大赛——微信小程序应用开发赛
  16. 大整数加法——求两个不超过200位的非负整数的和
  17. 如何编译Android的kernel,编译Android的kernel
  18. 美国标准信息交换标准码(ASCII)
  19. Android简易录音器——实现录音和播放功能
  20. src refspec test does not match any.

热门文章

  1. 中职高级计算机操作员,计算机操作员专业排行榜
  2. mysql+keepalived必须要lvs吗_Mysql双主热备+LVS+Keepalived高可用操作记录
  3. python开发小型数据库_Python开发【第十七篇】:MySQL(一)
  4. $.ajax防止多次点击重复提交的方法
  5. [BZOJ2017][Usaco2009 Nov]硬币游戏
  6. 【转】void及void指针的深刻解析
  7. 【bzoj1597】 土地购买
  8. 文档生成器 Xcode与Appledoc
  9. Ubuntu下MySQL忘记root密码重置
  10. c#中获取控件窗体句柄,获取窗体等的一些操作