iceberg问题小结
20220402
pyspark读写iceberg# code:utf-8
import findspark
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
这里要指定pyspark的路径,如果是服务器的话最好用spark所在的pyspark路径
import os
java8_location = r'D:\Java\jdk1.8.0_301/' # 设置你自己的路径
os.environ['JAVA_HOME'] = java8_location
from pyspark.sql import SparkSessiondef get_spark():# pyspark 读iceberg表spark = SparkSession.builder.getOrCreate()spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")spark.conf.set("spark.sql.catalog.iceberg.type", "hive")spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")不同的目标地址,不同的服务器集群,要拷贝对应的两个hive文件到当地客户端的pyspar conf文件夹下return sparkif __name__ == '__main__':spark = get_spark()pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")spark.sql("insert into iceberg.test.end_spec values ('aa','bb')")pdf.show()print()
1. 在pyspark下新建conf文件夹,把iceberg下的两个hive配置文件
放在下面
hdfs-site.xml
hive-site.xm
2. iceberg-spark3-runtime-0.13.1.jar 把这个文件放在pyspark的jars文件夹
20220315
self.config_iceberg = {"host": "192.168.1.55","port": 8881,"user": "root","catalog": "iceberg","schema": "ice_ods",}import trinoif connected_type == "iceberg":self.conn = trino.dbapi.connect(**self.config_iceberg)iceberg和trino的关系连接
sink写
source读
分区数最好和kafkatopic的分区数一样,否则用默认的200个分区很慢
按天分区相当于一天只有一个目录
https://blog.csdn.net/xuronghao/article/details/106184831
spark写入iceberg
partition具体分区
hadoop_prod是具体的catalog,tb是数据库
通过catalog两种方式hive或者hadoop来创建数仓
def save_to_db(data,database_type):"""保存至数据库:param data: 要保存的数据:return: 无返回值"""if database_type == '生产':trino_engine = create_engine("trino://root@192.168.1.55:8881/iceberg/ice_dwt") # 生产库else:trino_engine = create_engine('trino://root@192.168.40.11:8882/iceberg/ice_dwt') # 测试库times = int(np.ceil(data.shape[0] / 1000))for i in tqdm(range(times)):data.iloc[i * 1000 : (i + 1) * 1000, :].to_sql(name="dwt_dm_bi_b2b_customer_churn_wide",con=trino_engine,index=False,if_exists="append",schema="ice_dwt",method="multi",)logger.debug("存入数据库成功")
可以用此方式插入iceberg虽然速度慢一点
20220314
spark要写入iceberg需要一个alluxio-2.7.3-client.jar这个jar包
在alluxio下载下来的zip包里面
spark读写iceberg没有测试成功
iceberg问题小结相关推荐
- Flink、Iceberg和Hive的Catalog比较研究
所谓Catalog即数据目录,简单讲,Catalog是企业用于管理数据资产的方式,Catalog借助元数据来管理数据,包括数据收集.组织.访问.发现和治理.可见,Catalog在数据资产管理中处于核心 ...
- 开源数据湖方案选型:Hudi、Delta、Iceberg深度对比
文章目录 前言: 共同点 一.Databricks 和 Delta 1.1.Delta的意图,解决的疼点 1.没有 Delta 数据湖之前存在的问题 : 二.Uber和Apache Hudi 三.Ne ...
- 【阶段小结】协同开发——这学期的Git使用小结
[阶段小结]协同开发--这学期的Git使用小结 一.Git简介 1. Git简单介绍 2. Git工作流程以及各个区域 3. Git文件状态变化 二.Git安装&Git基本配置 三.个人踩坑 ...
- pyspark汇总小结
20220402 Spark报Total size of serialized results of 12189 tasks is bigger than spark.driver.maxResult ...
- 正则表达式(括号)、[中括号]、{大括号}的区别小结
正则表达式(括号).[中括号].{大括号}的区别小结 </h1><div class="clear"></div><div class=& ...
- php中$_REQUEST、$_POST、$_GET的区别和联系小结
php中$_REQUEST.$_POST.$_GET的区别和联系小结 作者: 字体:[增加 减小] 类型:转载 php中有$_request与$_post.$_get用于接受表单数据,当时他们有何种区 ...
- c cin.get()的用法小结_c语言中static 用法
static在c里面可以用来修饰变量,也可以用来修饰函数. 先看用来修饰变量的时候.变量在c里面可分为存在全局数据区.栈和堆里.其实我们平时所说的堆栈是栈而不是堆,不要弄混. int a ; int ...
- linux 压缩文件夹格式,Linux下常见文件格式的压缩、解压小结
Linux下常见文件格式的压缩.解压小结 .tar 解包: tar xvf FileName.tar 打包:tar cvf FileName.tar DirName (注:tar是打包,不是压缩!) ...
- 设计模式:简单工厂、工厂方法、抽象工厂之小结与区别
简单工厂,工厂方法,抽象工厂都属于设计模式中的创建型模式.其主要功能都是帮助我们把对象的实例化部分抽取了出来,优化了系统的架构,并且增强了系统的扩展性. 本文是本人对这三种模式学习后的一个小结以及对他 ...
最新文章
- 修改jenkins启动的默认用户
- python pexpect包的一些用法
- python类_python类和对象
- 创建 OVS vlan101 并部署 instance - 每天5分钟玩转 OpenStack(139)
- 01背包 || BZOJ 1606: [Usaco2008 Dec]Hay For Sale 购买干草 || Luogu P2925 [USACO08DEC]干草出售Hay For Sale...
- obj: object是什么意思_面试官问你JavaScript基本类型时他想知道什么?
- java是解释型编程语言_程序设计语言可以分为两类:编译型语言和解释型语言...
- android绘制论文,基于Android平台的三维地形绘制研究与实现
- python all和any用法_python any()和all()用法
- 【CCCC】L2-026 小字辈 (25分),求多叉树的深度和底层叶节点
- Hosts文件与钓鱼网站
- 让 Edit 只接受数字《转》
- 几种常见的JavaScript特效
- 交通灯控制(软件延时法)C语言,智能交通灯控制系统软件部分(49页)-原创力文档...
- 微信服务号、订阅号和企业号的区别(运营和开发两个角度)
- gwas snp 和_GWAS笔记SNP过滤
- aruba交换机配置命令_Aruba 无线交换机基本操作命令
- openfiler修改ip的命令_openfiler模拟ISCSI设备的配置
- linux 监控报文命令 nc,linux监控命令nc用法
- 【Unity VR开发基础】Player视角设置调整与地面的相对高度