往hive表中插入数据

法1:dataFrame数据写入hive表

def log2Hive():

log=hiveContext.createDataFrame([{"dt":dt,"types":types,"message":msg,"currtime":currTime}]).coalesce(1)#types: "INFO" ,"ERROR"

log.write.mode("append").insertInto("app.app_tion_log")

法2:dataFrame数据写入hive表

temp_vl_org=self.vl_data.join(self.order_slice,["po_no"]).coalesce(10)

os.system("hadoop fs -rm -r -skipTrash dev.db/" + str.lower(temp_vl))

hiveContext.sql("drop table if exists dev."+temp_vl)

temp_vlt_org.write.saveAsTable("dev."+temp_vl)

hiveContext写法

1.

order_slice= hiveContext.table("app.app_ rage").coalesce(10).where(col("dt")==self.order_new_dt).select("po_no","goods_no",col("distribution_no").alias("dc_id")).distinct().coalesce(10)

2.

log =hiveContext.createDataFrame([{"dt":dt,"types":types,"message":msg,"currtime":currTime}]).coalesce(1) #types:"INFO" ,"ERROR"

3.

hiveContext.sql("showpartitions app.app_ rage").select(func.max("partition").alias("partition")).rdd.map(lambda x:str(x[0])).take(1)[0]

4.dataFrame数据写入hive表

temp_vl_org=self.vl_data.join(self.order_slice,["po_no"]).coalesce(10)

os.system("hadoop fs -rm -r -skipTrash dev.db/" +str.lower(temp_vl))

hiveContext.sql("drop table if exists dev."+temp_vl)

temp_vlt_org.write.saveAsTable("dev."+temp_vl)

python 中执行hadoop命令

#删除表

os.system("hadoop fs -rm -r -skipTrashdev.db/" + str.lower(temp_vl))

#从集市1路径localFolde拷贝数据到集市2Hive表分区数据tbFolder

localFolder="app.db/app_filter /"

partitonName="dt="+self.table_new_dt

tbFolder= "hdfs://102.1.1.1:8080/user/cm_pc/app.db/app_filter/"

os.system("hadoop fs -rm -r -skipTrash"+tbFolder+partitonName)

os.system("hadoop distcp"+localFolder+partitonName+" "+tbFolder+partitonName)

# localFolder+partitonName为源路径,tbFolder+partitonName为目标路径

dataFrame和rdd互转,存入Hive表

#以下返回dataFrame类型值

df = hc.sql(sql).coalesce(5)

#dataFrame转rdd

r2 = df.rdd.map(lambda row :((row[0],row[1]),(row[2],row[3],row[4],row[5]))).groupByKey().map(lambda(k, v): sub_process(k, v))

cond=["rowkey","top_10","dt"]

#rdd转dataFrame

result=r2.toDF(cond)

#dataframe存入hive表

result.write.mode("append").insertInto(table_name)

创建dataFrame方法

法1:

#rdd转dataFrame

result=r2.toDF(cond)

法2:

order_slice= hiveContext.table("app.apprage").coalesce(10).where(col("dt")==self.ord_new_dt).select("po_no","goods_no",col("distribution_no").alias("dc_id")).distinct().coalesce(10)

法3:

log =hiveContext.createDataFrame([{"dt":dt,"types":types,"message":msg,"currtime":currTime}]).coalesce(1) #types:"INFO" ,"ERROR"

法4:

hiveContext.sql("showpartitions app.apprage")

法5:

从hdfs中读取数据变成dataFrame

input_path为hdfs路径

textFile=sc.textFile(input_path).map(lambdax: x.split("\x01"))

df = textFile.toDF(["seller_no","seller_name", "dept_no", "goods_no","shop_id", "sp_goods_no", "dc_id","erp_warehouse_no","real_outstore_qty", "sp_create_time"]).groupby(["seller_no","seller_name", "dept_no", "goods_no","dc_id", "erp_warehouse_no","sp_create_time"]).agg(func.sum("real_outstore_qty").alias("real_outstore_qty")).where((col("sp_create_time")< endDate) & (col("sp_create_time") >"2015-01-01"))

pyspark when

dataDf = hiveContext.table(self.table_name).where((col("dt")==self.table_new_dt)& (col("so_status")<> "10028") &(col("so_status")<> "10009"))\

.select("seller_no","seller_name","dept_no",

"goods_no",col("shop_id").cast("int"),F.when(func.isnull("sp_goods_no"),-1).otherwise(col("sp_goods_no").cast("int")).alias("sp_goods_no"),

col("distribution_no").cast("int").alias("dc_id"),col("erp_warehouse_no").cast("int"),

col("sp_create_time").cast("date"),F.when(func.isnull("apply_out_qty"),1).otherwise(col("apply_out_qty")).alias("apply_qty"),).coalesce(10)

dataFrame添加列

法1,2

self.groupedDf=dataDf.groupBy("seller_no","dept_no","goods_no","shop_id","sp_goods_no","dc_id","erp_warehouse_no","sp_create_time")\

#添加两列

.agg(func.max("seller_name").alias("seller_name"),func.sum("apply_outstore_qty").alias("apply_outstore_qty"))\

.withColumn("dt",lit(self.table_new_dt))

判断是否执行成功

if __name__ == "__main__":

main()

r = os.system("hadoop fs -test-e /tmp/for/result/e_cast/_SUCCESS")

if r != 0:

raise Exception("1")

else:

print ("执行spark获取eclp预测数据成功!!")

从hdfs导入数据到hive

def push2hive(self):

hiveContext.sql("LOAD DATA IN PATH '/tmp/for/result/ec_forecast' OVERWRITE INTO TABLE app.app_ ver1 PARTITION( dt  = '"+ _today+"')")

读取全路径下的Hive表

hdfs_path_ver1 ="/user/cmo_ipc/app.db/app_ ver1/dt=" + _today

df1=sc.textFile(hdfs_path_ver1).map(lambda x:x.split("\t")).map(lambdaline:((line[0],line[1],line[2],line[3]),line[8])).groupByKey().map(lambda(k,v): sales_nation_ver1(k,v))

pyspark写法总结相关推荐

  1. 【机器学习】3万字长文,PySpark入门级学习教程,框架思维

    为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python.Scala还是Java,都会或多或少接触到Spark,它可以让我们能 ...

  2. Pyspark学习笔记小总

    pyspark官方文档: https://spark.apache.org/docs/latest/api/python/index.html pyspark案例教程: https://sparkby ...

  3. PySpark:DataFrame及其常用列操作

    Spark版本:V3.2.1 1. DataFrame 虽然RDD是Spark最基本的抽象,但RDD的计算函数对Spark而言是不透明的.也就是说Spark并不知道你要在计算函数里干什么.无论你是要做 ...

  4. 【Pyspark教程】SQL、MLlib、Core等模块基础使用

    文章目录 零.Spark基本原理 0.1 pyspark.sql 核心类 0.2 spark的基本概念 0.3 spark部署方式 0.4 RDD数据结构 (1)创建RDD的2种方式 (2)RDD操作 ...

  5. pyspark的聚合函数agg使用

    pyspark中聚合函数agg的使用   作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作: 如果没有分组函数,默认是对整个dataframe进行聚合操作. ...

  6. Pyspark 读 DataFrame 的使用与基本操作

    一.安装 基于 mac 操作系统 安装 jdk jdk 下载地址 安装 pyspark pip install pyspark 二.读取 HDFS 文件 读 json 注意,如果是多行的 json,需 ...

  7. pyspark入门教程

    目录 一.windows下配置pyspark环境 1.1 jdk下载安装 1.2 Scala下载安装 1.3 spark下载安装 1.4 Hadoop下载安装 1.5 pyspark下载安装 1.6 ...

  8. 3万字长文,PySpark入门级学习教程,框架思维

    来源:SAMshare 为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python.Scala还是Java,都会或多或少接触到S ...

  9. PySpark基础 —— SparkSQL

    一.快速入门 import findspark from pyspark.sql import SparkSessionfindspark.init() spark = SparkSession.bu ...

  10. hadoop,pySpark环境安装与运行实战《一》

    一.环境准备 环境最好再mac或者liunx环境搭建最为友好,不建议在windows上折腾. 1)安装java jdk 下载java jdk 并在~/.bash_profile配置,jdk  mac路 ...

最新文章

  1. 数据结构实验之链表五:单链表的拆分-sdut
  2. WEB学习-CSS盒模型
  3. 计算机分组教学,中职计算机教学分组协作式学习论文
  4. android 加减乘除计算器,【03-21求助】写一个简易计算器的安卓app,一按加减乘除就退出...
  5. 每日一学:如何读取网络图片
  6. 解决Django静态文件配置pycharm高光问题
  7. jQuery——入门(四)JQuery 事件
  8. 在指定命令下打开命令提示符的几种方式
  9. 微信开发者工具下载及公众号使用介绍
  10. c语言指针详解参数,C语言之指针详解
  11. 使用计算机程序的设计语言是,计算机程序设计语言有哪些?
  12. 三体第一部(13到22节)
  13. Win10声卡驱动正常但没声音怎么办?驱动人生解决办法
  14. 一台计算机多个屏幕,一台主机两个显示器,详细教您电脑两个显示器怎么设置...
  15. 我对智能网卡offload的认识
  16. Github Actions实现自定义编译OpenWRT固件和第三方插件
  17. 三菱PLC FX-3u 台达伺服/步进 8工位转盘螺丝机程序
  18. snipaste截图软件编辑时修改方框粗细
  19. linux log拆分
  20. 腾讯信鸽推送平台集成中的问题

热门文章

  1. Uncaught initialization exception thrown by servlet
  2. PAT乙级(Basic Level)练习题 星际密码[斐波那契][打表]
  3. 74HC595级联电路编程篇(四)
  4. Facecat的iOS自学笔记
  5. TTL 传输中过期,内部网络环路
  6. Linux elf可执行文件加密
  7. 黑盒测试学习笔记-(深圳文鹏)
  8. Android Studio中默认Botton的颜色(学习笔记)
  9. worldpress php7.2,centos7.4下word press环境由php5.6.4升级到php7.2
  10. 学php应该怎么学习数学,数学难学,数学到底该怎么学?