pyspark写法总结
往hive表中插入数据
法1:dataFrame数据写入hive表
def log2Hive():
log=hiveContext.createDataFrame([{"dt":dt,"types":types,"message":msg,"currtime":currTime}]).coalesce(1)#types: "INFO" ,"ERROR"
log.write.mode("append").insertInto("app.app_tion_log")
法2:dataFrame数据写入hive表
temp_vl_org=self.vl_data.join(self.order_slice,["po_no"]).coalesce(10)
os.system("hadoop fs -rm -r -skipTrash dev.db/" + str.lower(temp_vl))
hiveContext.sql("drop table if exists dev."+temp_vl)
temp_vlt_org.write.saveAsTable("dev."+temp_vl)
hiveContext写法
1.
order_slice= hiveContext.table("app.app_ rage").coalesce(10).where(col("dt")==self.order_new_dt).select("po_no","goods_no",col("distribution_no").alias("dc_id")).distinct().coalesce(10)
2.
log =hiveContext.createDataFrame([{"dt":dt,"types":types,"message":msg,"currtime":currTime}]).coalesce(1) #types:"INFO" ,"ERROR"
3.
hiveContext.sql("showpartitions app.app_ rage").select(func.max("partition").alias("partition")).rdd.map(lambda x:str(x[0])).take(1)[0]
4.dataFrame数据写入hive表
temp_vl_org=self.vl_data.join(self.order_slice,["po_no"]).coalesce(10)
os.system("hadoop fs -rm -r -skipTrash dev.db/" +str.lower(temp_vl))
hiveContext.sql("drop table if exists dev."+temp_vl)
temp_vlt_org.write.saveAsTable("dev."+temp_vl)
python 中执行hadoop命令
#删除表
os.system("hadoop fs -rm -r -skipTrashdev.db/" + str.lower(temp_vl))
#从集市1路径localFolde拷贝数据到集市2Hive表分区数据tbFolder
localFolder="app.db/app_filter /"
partitonName="dt="+self.table_new_dt
tbFolder= "hdfs://102.1.1.1:8080/user/cm_pc/app.db/app_filter/"
os.system("hadoop fs -rm -r -skipTrash"+tbFolder+partitonName)
os.system("hadoop distcp"+localFolder+partitonName+" "+tbFolder+partitonName)
# localFolder+partitonName为源路径,tbFolder+partitonName为目标路径
dataFrame和rdd互转,存入Hive表
#以下返回dataFrame类型值
df = hc.sql(sql).coalesce(5)
#dataFrame转rdd
r2 = df.rdd.map(lambda row :((row[0],row[1]),(row[2],row[3],row[4],row[5]))).groupByKey().map(lambda(k, v): sub_process(k, v))
cond=["rowkey","top_10","dt"]
#rdd转dataFrame
result=r2.toDF(cond)
#dataframe存入hive表
result.write.mode("append").insertInto(table_name)
创建dataFrame方法
法1:
#rdd转dataFrame
result=r2.toDF(cond)
法2:
order_slice= hiveContext.table("app.apprage").coalesce(10).where(col("dt")==self.ord_new_dt).select("po_no","goods_no",col("distribution_no").alias("dc_id")).distinct().coalesce(10)
法3:
log =hiveContext.createDataFrame([{"dt":dt,"types":types,"message":msg,"currtime":currTime}]).coalesce(1) #types:"INFO" ,"ERROR"
法4:
hiveContext.sql("showpartitions app.apprage")
法5:
从hdfs中读取数据变成dataFrame
input_path为hdfs路径
textFile=sc.textFile(input_path).map(lambdax: x.split("\x01"))
df = textFile.toDF(["seller_no","seller_name", "dept_no", "goods_no","shop_id", "sp_goods_no", "dc_id","erp_warehouse_no","real_outstore_qty", "sp_create_time"]).groupby(["seller_no","seller_name", "dept_no", "goods_no","dc_id", "erp_warehouse_no","sp_create_time"]).agg(func.sum("real_outstore_qty").alias("real_outstore_qty")).where((col("sp_create_time")< endDate) & (col("sp_create_time") >"2015-01-01"))
pyspark when
dataDf = hiveContext.table(self.table_name).where((col("dt")==self.table_new_dt)& (col("so_status")<> "10028") &(col("so_status")<> "10009"))\
.select("seller_no","seller_name","dept_no",
"goods_no",col("shop_id").cast("int"),F.when(func.isnull("sp_goods_no"),-1).otherwise(col("sp_goods_no").cast("int")).alias("sp_goods_no"),
col("distribution_no").cast("int").alias("dc_id"),col("erp_warehouse_no").cast("int"),
col("sp_create_time").cast("date"),F.when(func.isnull("apply_out_qty"),1).otherwise(col("apply_out_qty")).alias("apply_qty"),).coalesce(10)
dataFrame添加列
法1,2
self.groupedDf=dataDf.groupBy("seller_no","dept_no","goods_no","shop_id","sp_goods_no","dc_id","erp_warehouse_no","sp_create_time")\
#添加两列
.agg(func.max("seller_name").alias("seller_name"),func.sum("apply_outstore_qty").alias("apply_outstore_qty"))\
.withColumn("dt",lit(self.table_new_dt))
判断是否执行成功
if __name__ == "__main__":
main()
r = os.system("hadoop fs -test-e /tmp/for/result/e_cast/_SUCCESS")
if r != 0:
raise Exception("1")
else:
print ("执行spark获取eclp预测数据成功!!")
从hdfs导入数据到hive
def push2hive(self):
hiveContext.sql("LOAD DATA IN PATH '/tmp/for/result/ec_forecast' OVERWRITE INTO TABLE app.app_ ver1 PARTITION( dt = '"+ _today+"')")
读取全路径下的Hive表
hdfs_path_ver1 ="/user/cmo_ipc/app.db/app_ ver1/dt=" + _today
df1=sc.textFile(hdfs_path_ver1).map(lambda x:x.split("\t")).map(lambdaline:((line[0],line[1],line[2],line[3]),line[8])).groupByKey().map(lambda(k,v): sales_nation_ver1(k,v))
pyspark写法总结相关推荐
- 【机器学习】3万字长文,PySpark入门级学习教程,框架思维
为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python.Scala还是Java,都会或多或少接触到Spark,它可以让我们能 ...
- Pyspark学习笔记小总
pyspark官方文档: https://spark.apache.org/docs/latest/api/python/index.html pyspark案例教程: https://sparkby ...
- PySpark:DataFrame及其常用列操作
Spark版本:V3.2.1 1. DataFrame 虽然RDD是Spark最基本的抽象,但RDD的计算函数对Spark而言是不透明的.也就是说Spark并不知道你要在计算函数里干什么.无论你是要做 ...
- 【Pyspark教程】SQL、MLlib、Core等模块基础使用
文章目录 零.Spark基本原理 0.1 pyspark.sql 核心类 0.2 spark的基本概念 0.3 spark部署方式 0.4 RDD数据结构 (1)创建RDD的2种方式 (2)RDD操作 ...
- pyspark的聚合函数agg使用
pyspark中聚合函数agg的使用 作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作: 如果没有分组函数,默认是对整个dataframe进行聚合操作. ...
- Pyspark 读 DataFrame 的使用与基本操作
一.安装 基于 mac 操作系统 安装 jdk jdk 下载地址 安装 pyspark pip install pyspark 二.读取 HDFS 文件 读 json 注意,如果是多行的 json,需 ...
- pyspark入门教程
目录 一.windows下配置pyspark环境 1.1 jdk下载安装 1.2 Scala下载安装 1.3 spark下载安装 1.4 Hadoop下载安装 1.5 pyspark下载安装 1.6 ...
- 3万字长文,PySpark入门级学习教程,框架思维
来源:SAMshare 为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python.Scala还是Java,都会或多或少接触到S ...
- PySpark基础 —— SparkSQL
一.快速入门 import findspark from pyspark.sql import SparkSessionfindspark.init() spark = SparkSession.bu ...
- hadoop,pySpark环境安装与运行实战《一》
一.环境准备 环境最好再mac或者liunx环境搭建最为友好,不建议在windows上折腾. 1)安装java jdk 下载java jdk 并在~/.bash_profile配置,jdk mac路 ...
最新文章
- 数据结构实验之链表五:单链表的拆分-sdut
- WEB学习-CSS盒模型
- 计算机分组教学,中职计算机教学分组协作式学习论文
- android 加减乘除计算器,【03-21求助】写一个简易计算器的安卓app,一按加减乘除就退出...
- 每日一学:如何读取网络图片
- 解决Django静态文件配置pycharm高光问题
- jQuery——入门(四)JQuery 事件
- 在指定命令下打开命令提示符的几种方式
- 微信开发者工具下载及公众号使用介绍
- c语言指针详解参数,C语言之指针详解
- 使用计算机程序的设计语言是,计算机程序设计语言有哪些?
- 三体第一部(13到22节)
- Win10声卡驱动正常但没声音怎么办?驱动人生解决办法
- 一台计算机多个屏幕,一台主机两个显示器,详细教您电脑两个显示器怎么设置...
- 我对智能网卡offload的认识
- Github Actions实现自定义编译OpenWRT固件和第三方插件
- 三菱PLC FX-3u 台达伺服/步进 8工位转盘螺丝机程序
- snipaste截图软件编辑时修改方框粗细
- linux log拆分
- 腾讯信鸽推送平台集成中的问题
热门文章
- Uncaught initialization exception thrown by servlet
- PAT乙级(Basic Level)练习题 星际密码[斐波那契][打表]
- 74HC595级联电路编程篇(四)
- Facecat的iOS自学笔记
- TTL 传输中过期,内部网络环路
- Linux elf可执行文件加密
- 黑盒测试学习笔记-(深圳文鹏)
- Android Studio中默认Botton的颜色(学习笔记)
- worldpress php7.2,centos7.4下word press环境由php5.6.4升级到php7.2
- 学php应该怎么学习数学,数学难学,数学到底该怎么学?