数据湖之iceberg系列(五)-Spark实时处理数据
1 接收网络数据 将数据实时写入到iceberg表中
开启nc 服务用于模拟数据输出
nc -lk 9999
2 spark实时读取数据将数据写入到iceberg表中
// 获取spark对象
val spark = SparkSession.builder()
.config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 设置数据源类别为hadoop
.config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
// 指定Hadoop数据源的根目录
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 设置数据源位置
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
// 接收数据
val lines = spark.readStream.format("socket").option("host", "linux01").option("port", 9999).load()
// 处理数据成DF
import spark.implicits._
val data: DataFrame = lines.map(row => row.getAs[String]("value")).map(s => {
val split: Array[String] = s.split(",")
(split(0).toInt, split(1),split(2).toInt)
}).toDF("id", "name","age")
// 指定hadoop表位置
val tableIdentifier: String = "hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user"
// 将数据写入到hadoop类型的表中
val query = data.writeStream.outputMode("append").format("iceberg").option("path", tableIdentifier).option("checkpointLocation", "/").start()
query.awaitTermination()
spark.close()
3 spark读取iceberg表中的数据
Logger.getLogger("org").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 设置数据源类别为hadoop
.config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
// 指定Hadoop数据源的根目录
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 设置数据源位置
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
val lines = spark.read.format("iceberg").load("hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user")
lines.printSchema()
lines.createTempView("tb_user")
// 展示表所有的文件和所有的快照信息
spark.sql("select * from hadoop_prod.default.tb_user.files").show()
spark.sql("select * from hadoop_prod.default.tb_user.snapshots").show()
// 查询指定快照的数据
val lines2= spark.read.format("iceberg").option("snapshot-id", 9146975902480919479L).load("hdfs://linux01:8020/doit/iceberg/warehouse/default/tb_user")
lines2.show()
// lines.show()
spark.close()
}
结果如下
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
|content| file_path|file_format|record_count|file_size_in_bytes| column_sizes| value_counts| null_value_counts| lower_bounds| upper_bounds|key_metadata|split_offsets|equality_ids|
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
| 0|hdfs://linux01:80...| PARQUET| 1| 833|[1 -> 46, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> ! , 2 -> ...|[1 -> ! , 2 -> ...| null| [4]| null|
| 0|hdfs://linux01:80...| PARQUET| 1| 835|[1 -> 47, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> , 2 -> ...|[1 -> , 2 -> ...| null| [4]| null|
| 0|hdfs://linux01:80...| PARQUET| 1| 840|[1 -> 47, 2 -> 53...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> , 2 -> ...|[1 -> , 2 -> ...| null| [4]| null|
| 0|hdfs://linux01:80...| PARQUET| 1| 842|[1 -> 47, 2 -> 54...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> , 2 -> ...|[1 -> , 2 -> ...| null| [4]| null|
| 0|hdfs://linux01:80...| PARQUET| 1| 842|[1 -> 47, 2 -> 54...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> , 2 -> ...|[1 -> , 2 -> ...| null| [4]| null|
| 0|hdfs://linux01:80...| PARQUET| 1| 849|[1 -> 47, 2 -> 55...|[1 -> 1, 2 -> 1, ...|[1 -> 0, 2 -> 0, ...|[1 -> , 2 -> ...|[1 -> , 2 -> ...| null| [4]| null|
+-------+--------------------+-----------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+------------+
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
| committed_at| snapshot_id| parent_id|operation| manifest_list| summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2020-12-05 15:13:...|4974727741303617264| null| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:13:...|6649969826606152854|4974727741303617264| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:14:...|9146975902480919479|6649969826606152854| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:26:...|3789248833638708269|9146975902480919479| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:27:...| 145534978715502615|3789248833638708269| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:43:...| 677713801965958716| 145534978715502615| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|3022463020588869964| 677713801965958716| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|4764864293483030282|3022463020588869964| append|hdfs://linux01:80...|[spark.app.id -> ...|
|2020-12-05 15:44:...|8363256205651138549|4764864293483030282| append|hdfs://linux01:80...|[spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
————————————————
版权声明:本文为CSDN博主「白眼黑刺猬」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_37933018/article/details/110690749
数据湖之iceberg系列(五)-Spark实时处理数据相关推荐
- 数据湖之iceberg系列(一)iceberg能做什么
1 前言 HIVE的缺陷 Hive的元数据依赖一个外部的MySQL和HDFS文件系统,通过MySQL找到相关的parition之后,需要为每个partition去HDFS文件系统上按照分区做目录的li ...
- 数据湖之iceberg系列(三)iceberg快速入门
1 环境准备 准备大数据集群 .安装HDFS ,HIVE,SAPRK ,FLINK 下载运行集群环境运行是需要的jar包 下载地址:http://iceberg.apache.org/releases ...
- 数据湖之iceberg系列(四)iceberg-spark编程
1 创建maven项目 添加依赖 <properties> <maven.compiler.source>1.8</maven.compiler.sour ...
- 数据湖技术之Hudi 集成 Spark
数据湖技术之Hudi 集成 Spark 数据湖框架Hudi,从诞生之初支持Spark进行操作,后期支持Flink,接下来先看看与Spark整合使用,并且在0.9.0版本中,提供SparkSQL支持,编 ...
- 数据湖技术 Iceberg 的探索与实践
随着大数据存储和处理需求的多样化,如何构建一个统一的数据湖存储,并在其上进行多种形式的数据分析成了企业构建大数据生态的一个重要方向.Netflix 发起的 Apache Iceberg 项目具备 AC ...
- 云原生数据湖为什么要选择腾讯云大数据DLC,一份性能分析报告告诉你!
摘要 日前,腾讯云大数据数据湖计算 DLC 与国内两家知名云厂商的数据湖产品进行了性能对比,其中腾讯云 DLC 在三款产品中SQL平均执行查询时间短,性能表现优.腾讯云大数据 DLC 在存算分离和大数 ...
- 【Python数据科学快速入门系列 | 06】Matplotlib数据可视化基础入门(一)
这是机器未来的第52篇文章 原文首发地址:https://robotsfutures.blog.csdn.net/article/details/126899226 <Python数据科学快速入 ...
- rdd数据存内存 数据量_spark系列:spark core 数据交互技术点(数据模型)
spark core实现了spark的基本功能:存储交互.任务调度.内存管理.错误恢复等:本片文章主要介绍与数据交互相关的核心技术点. 本文目录: RDD特性及交互 shuffle操作及调优 RDD持 ...
- 数据湖架构Hudi(五)Hudi集成Flink案例详解
五.Hudi集成Flink案例详解 5.1 hudi集成flink flink的下载地址: https://archive.apache.org/dist/flink/ Hudi Supported ...
最新文章
- oracle添加语句 commit,Oracle COMMIT语句
- Kaggle金牌得主的Python数据挖掘框架,机器学习基本流程都讲清楚了
- 是什么造成了网管员的低工资?
- python中处理日期和时间的标准模块是-Python time模块参考手册
- 重拾简单的linux指令之info 【转】
- python abs()函数是什么意思?
- 线程池拒绝策略 开发中常用什么策略_面试官:说说你知道多少种线程池拒绝策略...
- html5制作人物动作,骨骼动画制作新利器:快速制作动作人物动画,省时简单!...
- 加权平均np.average()
- Excel表格复制到Foxmail不显示边框
- 详解互联网平台的资金系统方案 自建支付清结算系统优势明显
- 《C++ Concurrency in Action》笔记28 无锁并行数据结构
- 五、椒盐排骨(Pepper Salt Spareribs)
- 哪些5G芯片和5G模组已经问世?| 截止至2020年Q1
- 考研专业课,到底要不要报辅导班?
- 免费报名 | 腾讯云自研数据库CynosDB交流会
- java官网教程(基础篇)—— 基础的Java类 —— 基础 I / O
- 简易天体运动—— sun earth moon(计算机图形学)
- sublime粘贴图片问题
- 专利与论文-7:专利在哪儿?如何发现专利?思维分析方法
热门文章
- 9行代码AC_HDU-6374 Decimal(余数,因子)
- 原理详解与标准解法——蓝桥杯_2016年省赛B组 第七题 剪邮票(暴力+迷宫变形)
- 电信服务器维修人员职责,维修人员岗位职责
- 小学计算机考查方案,宋家塘街道中心学校2020年理化生实验操作和信息技术考试方案...
- OpenStack(三)——Glance组件
- spring cloud 熔断_Spring Cloud 熔断器/断路器 Hystrix
- python笔记之序列(str的基本使用和常用操作)
- STM32 ADC采样使用内部参考电压
- Select的OnChange()事件中获取选中的值
- linux环境没有bzip2,Linux系统中安装使用Bzip2来压缩文件的方法讲解