思路如下:

  1. 定义一个POJO类,注意flink里使用的类必须有一个无参的构造方法
  2. 自定义DataSource实现SourceFunction接口
  3. 使用ctx.collect()传入想要发送的数据就可以了

首先定义一个POJO类:

class MyData {public int keyId;public long timestamp;public double value;public MyData() {}public MyData(int accountId, long timestamp, double value) {this.keyId = accountId;this.timestamp = timestamp;this.value = value;}public long getKeyId() {return keyId;}public void setKeyId(int keyId) {this.keyId = keyId;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public double getValue() {return value;}public void setValue(double value) {this.value = value;}@Overridepublic String toString() {return "MyData{" +"keyId=" + keyId +", timestamp=" + timestamp +", value=" + value +'}';}
}

生成自己的数据:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class CreateMyData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<MyData> sourceStream = env.addSource(new MyDataSource());env.setParallelism(3);sourceStream.print();env.execute();}private static class MyDataSource implements SourceFunction<MyData> {// 定义标志位,用来控制数据的产生private boolean isRunning = true;private final Random random = new Random(0);@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {ctx.collect(new MyData(random.nextInt(5), System.currentTimeMillis(), random.nextFloat()));Thread.sleep(1000L); // 1s生成1个数据}}@Overridepublic void cancel() {isRunning = false;}}
}

Flink java模拟生成自定义流式数据相关推荐

  1. 小白学习Flink系列--第二篇-01(流式数据概念)

    导读 要想彻底理解Flink,就要了解流数据的前世今生,流数据的语义.特点,以及如何处理,以下文章就能很好的解释流数据的概念和模型,对了解Flink有很大的帮助 前言 今天流式数据处理在大数据领域是一 ...

  2. flink源码分析_Flink源码分析之深度解读流式数据写入hive

    前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...

  3. Apache Griffin+Flink+Kafka实现流式数据质量监控实战

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...

  4. Iceberg 在基于 Flink 的流式数据入库场景中的应用

    本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...

  5. 使用 Flink Hudi 构建流式数据湖

    简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进. 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增 ...

  6. 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi

    文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...

  7. DataSketch求流式数据分位数

    类似的功能Spark.Flink等都实现了,但是使用起来比较繁琐.Apache下的DataSketch用起来稍微简单一些.先导包: <dependency><groupId>o ...

  8. kafka处理流式数据_通过Apache Kafka集成流式传输大数据

    kafka处理流式数据 从实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka越来越多地集成到各种系统和解决方案中. 使用CData Sync ,可以 ...

  9. python 可视化监控平台_python可视化篇之流式数据监控的实现

    preface 流式数据的监控,以下主要是从算法的呈现出发,提供一种python的实现思路 其中: 1.python是2.X版本 2.提供两种实现思路,一是基于matplotlib的animation ...

最新文章

  1. python实现图结构github_Github项目+代码:新型深度网络体系结构去除图像中的雨水痕迹...
  2. python面试-python简单面试题
  3. command line
  4. 移动硬盘无法弹出的问题
  5. (231)DPU数据处理单元现有产品介绍
  6. Python 3.65 安装geopandas
  7. 两球完全弹性碰撞反弹方向问题
  8. 使用PS2019制作明信片
  9. 黑白双色背景图java_黄色背景黑白双色简洁ppt图表
  10. Python实现isPrime函数----新手
  11. 众测、专属、渗透测试捡破烂小tips
  12. delphi xe7 EMS是什么 什么作用,怎么使用?
  13. xcode 软件˙∆集~
  14. Cursor攻略,吃个螃蟹
  15. 2022年新一代最强开源UI自动化测试神器 ——Playwright(三)
  16. 关于内外网数据同步解决方案
  17. intellij idea报错:类文件具有错误的版本 61.0, 应为 52.0
  18. 计算机公式SUBSTITUTE,全了,SUBSTITUTE函数常用套路集合!
  19. Storm_Storm主要特点
  20. internal/modules/cjs/loader.js:584

热门文章

  1. C++:数字字符的出现次数
  2. redis 分页_Redis排行榜的设计与实现
  3. 串口读写flash_老司机带路:LPC82x 存储器及读写保护 手到擒来!
  4. UE4 AnimMontage
  5. Python3 离线安装第三方包
  6. 29.C++- 异常处理
  7. NGUI的slider的滑动条制作(SliderScript)
  8. IDEA 工具使用报错总结
  9. ansible安装和基本使用
  10. 小鱼易连全系新品正式发布 引爆音视频会议行业核聚变