1、df.na.fill({'字段名1':'default','字段名2':'default'})   对空值进行替换

2、df.dropDuplicaates()    去重根据字段名进行去重,空参为全部字段

3、df.subtract(df1)     返回在当前df中出现,并且不在df1中出现的元素,不去重。

4、print time.localtime([timestamp])    如下格式

time.struct_time(tm_year=2018, tm_mon=10, tm_mday=9, tm_hour=16, tm_min=52, tm_sec=10, tm_wday=1, tm_yday=282, tm_isdst=0)

5、print time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S')   如下格式

time.struct_time(tm_year=2018, tm_mon=12, tm_mday=14, tm_hour=15, tm_min=45, tm_sec=12, tm_wday=4, tm_yday=348, tm_isdst=-1)

6、时间戳转格式化时间

time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))# 格式化成2016-03-20 11:45:39形式
print time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())# 格式化成Sat Mar 28 22:24:24 2016形式
print time.strftime("%a %b %d %H:%M:%S %Y", time.localtime())

7、时间转时间戳

print time.mktime(time.strptime('2018-12-14 15:45:12', '%Y-%m-%d %H:%M:%S'))

8、取某一天的前后多少天

def getPreDate(cp, days):#转换为时间cp_from = (datetime.strptime(str(cp), '%Y%m%d%H') + timedelta(days)).strftime('%Y%m%d%H')return cp_from

9、os.path.join(path_prefix,'parquet','path')     的使用方法

print os.path.join('/user/bbd/dev/xxxx/xxxx/', 'parquet/', 'ccccc.txt')/user/bbd/dev/xxxx/xxxx/parquet/ccccc.txt

10、时间取差值的方法,计算两个时间相差多少天

def get_history_date():start_date = datetime(2018,10,10)end_date = datetime(2018,10,31)        print (end_date - start_date).days   #返回相差的天print end_date - start_date            #返回相差的for i in range(0,(end_date - start_date).days):cp_from = (start_date + timedelta(days=i)).strftime('%Y%m%d00')cp_end = (end_date + timedelta(days=i + 1)).strftime('%Y%m%d00')print str(cp_from) + '\t' + str(cp_end)return: 如下2121 days, 0:00:002018101000 20181101002018101100    20181102002018101200    2018110300。。。。

11、not  就是取反的意思   字符串为空,none,数字为0, 对象为空, not后返回的都是真

def not_ceshi():a = ''b='as'c= Noned=0e=-1if not a:print 'a为假'if b:print 'b为真'if not c:print 'c为假'if not d:print 'd为0'if e:print 'e为-1'结果:a为假b为真c为假d为0e为-1

12、dataframe的drop,可以先分区排序,再对排序进行drop,num为窗口后分区排序的row_number()字段

df.where(num=='1').drop(num)返回选择num为1的,并将num字段去除

13、sorted 高阶函数的使用    如下

def cmp_ignore_case(s1, s2):if s1.lower()[:1] > s2.lower()[:1]:return 1elif s1.lower()[:1] < s2.lower()[:1]:return -1else:return 0
#sorted 重新定义排序,可以只传入一个参数,给定 (key=比较的参数的自定义函数)
cmp_ignore_case2 = lambda s: s[:1].lower()print sorted(['bob', 'about', 'Zoo', 'Credit'], key=cmp_ignore_case2)
print sorted(['bob', 'about', 'Zoo', 'Credit'], cmp_ignore_case)结果返回相同:['about', 'bob', 'Credit', 'Zoo']['about', 'bob', 'Credit', 'Zoo']

14、在我们进行dataframe的互操作时,需要用到unionall的时候,如果字段数量不一致,可为其填充空白字段,都为空

sql = 'SELECT '' 相同的字段名, '' 相同的字段名 。。。'相当于补全字段

15、验证是否个人邮箱,

email_regx = '([a-zA-Z0-9]{1}[a-zA-Z0-9\.\+\-.]{0,63}@{1}[a-zA-Z0-9\-_]+(\.[a-zA-Z0-9\-_]+){1,4})'
def udf_format_email_string(data):try:data = data.strip()email = re.findall(email_regx, data)print emailif email:res = email[0][0]except_email = ['np-reply@','noreply@','service@','support@','info@','Postmaster@','custmer_service@','admin@','VOICE=','notification@','10086@','10000sx@','10086cq@']for item in except_email:if res.startswith(item):return ''return resexcept:passreturn ''是的话,返回邮箱,不是返回空

这里有一个知识点,就是re.findall(regx,data),就是这里的regx,有两种情况,带大括号和不带括号,还有小括号

import restring="abcdefg  acbdgef  abcdgfe  cadbgfe"#带括号与不带括号的区别
#带大括号和小括号
regex=re.compile("((\w+)\s+\w+)")
print(regex.findall(string))
#输出:[('abcdefg  acbdgef', 'abcdefg'), ('abcdgfe  cadbgfe', 'abcdgfe')]#只带小括号
regex1=re.compile("(\w+)\s+\w+")
print(regex1.findall(string))
#输出:['abcdefg', 'abcdgfe']#不带括号
regex2=re.compile("\w+\s+\w+")
print(regex2.findall(string))
#输出:['abcdefg  acbdgef', 'abcdgfe  cadbgfe']return :regex:返回的是tuple (大括号匹配的,小括号匹配的)regex1:返回的是小括号里匹配的regex2:返回的是全部匹配的

16、在pyspark中,对dataframe进行操作,对某个中文字段进行max(col)操作,分组后取出现的第一个,类似于根据姓名去重,一个邮箱只能对应一个姓名

select email format_data(max(name)) from email_detal group by email根据email进行分组,对姓名进行去重,但是有可能姓名有差异

17、一些dataframe的api示例

def get_df():conf = SparkConf()spark = SparkSession.builder \.master("local[*]") \.appName("get_df") \.config(conf=conf) \.enableHiveSupport() \.getOrCreate()rdd1 = spark.sparkContext.parallelize([('make',24,198),('make',23,198),('tubu',24,198),('tubu',23,198),('mark',24,198),('mark',23,198),('uzi',24,198),('uzi',23,197)])rdd2 = spark.sparkContext.parallelize([('make',24,198),('tubu',24,198)])schema = StructType([StructField('name',StringType(), True),StructField('age',LongType(), True),StructField('hight',LongType(), True)])df1 = spark.createDataFrame(rdd1,schema)df2 = spark.createDataFrame(rdd2,schema)df1.createTempView('tmp')sql = 'select *, row_number() over (partition by name,age order by hight) as num from tmp'sql2 = 'select `name`,age, case when age=24 then age else age+2 end  re_age, hight,"table_name" tablename from tmp'spark.sql(sql).where('num=1').drop("num").show()   #删除某列字段df1.subtract(df2).show()                #df1中去掉df2中的完全相同的数据df1.dropDuplicates(['name','age']).show()  #去掉指定两列的重复数据,不加参,为全部列df1.drop_duplicates(['name','age']).show()    #同上df1.join(df2,df1.name == df2.name,'left').show()   #各种join   spark.sql(sql2).drop('age').show()        #根据某些的字段来确定另一个字段的取值

18、注册 py文件  logging   如下

import logging,os
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)用来测试是否有相关信息

19、将多张表我们需要的字段清洗进parquet文件,后面的维度需要哪些自己读取进行相关过滤就可以了,首先对我们需要对基础表的信息了解,然后根据字段含义,对应清洗进对应字段,比如总共五张表的数据,要写进总表,可按如下方式

def creat_dataframe(source):'''获取每个源表的所有信息到源信息总表'''df = spark.sql("select * from {tablename}".format(tablename=source["table_name"]))if source.get("before_where"):df = df.where(source["before_where"])column_source = OrderedDict(zip(source["column"],source["source"]))columns = ["%s as %s"%(table_file,table_name) for table_name,table_file in column_source.items()]df = df.selectExpr(*columns)if source.get("after_where"):df = df.where(source["after_where"])return dfdef etl_school():columns = ["NAME", "SSQX", "XYMC", "TABLENAME", "table_order"]where = "verify_school(NAME) = 1"data_source = [{"table_name":"tb_gaw_dzxxyryxsjfmxx","column":columns,"source":["DWMC", "SSQX", "''", "'tb_gaw_dzxxyryxsjfmxx'", 5],"after_where":where},{"table_name":"tb_gaw_jw_xxjzygxx","column":columns,"source":["DWMC", "SSQX", "''", "'tb_gaw_jw_xxjzygxx'", 1],"after_where":where},{"table_name":"tb_gaw_jw_zxyjs_new","column":columns,"source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxyjs_new'", 2],"after_where":where},{"table_name":"tb_gaw_jw_zxdxs_new","column":columns,"source":["XXMC", "''", "FYMC", "'tb_gaw_jw_zxdxs_new'", 3],"after_where":where}]dfs = map(creat_dataframe, data_source)df_union = reduce(lambda x, y: x.union(y), dfs)df = df_union.selectExpr("*", "row_number() over (partition by NAME,XYMC order by table_order asc) as num")df_tmp = df.where("num=1").drop("num")df.tmp = df_tmp.where("NAME !=  XYMC")write_parquet(df_tmp, "tmp/etl_school")logger.info("save to /tmp/etl_school success!!")

20、jg的节点表,需要建索引的话,可按如下方式

index_list = ['jid']
def deal_vertex(tablename):logger.info('dealing %s'%tablename)source_table_info = vertex_table_info[tablename]zd = ','.join(source_table_info)table_index = get_table_index(tablename)sql = 'select %s from bbd.%s'%(zd,tablename)df = spark.sql(sql)def map_rdd(data):ret = []row,index = data#因为最后要入hbase库,这里前面加了随机数,预防热点jid = int(str(random.randint(1,9)) + str(index + table_index))ret.append(jid)row = list(row)for item in row:ret.append(item)return tuple(ret)rdd = df.rdd.zipWithIndex().map(map_rdd)new_schema = index_list + source_table_infores = spark.createDataFrame(rdd,new_schema)res.write.mode('overwrite').saveAsTable('bbd.%s_jg'%tablename)logger.info('bbd.%s_jg   down'%tablename)

工作中 pyspark的小知识点相关推荐

  1. Android开发中的一些小知识点记录(101-120)

    编写不易,如有转载,请声明出处:http://blog.csdn.net/zxc514257857/article/details/118554522 Android开发中的一些小知识点记录(1-20 ...

  2. 工作中涉及运维知识点的汇总

    对工作中常见运维知识点的一个简单汇总 0)设置阿里云pip源,加速pip更新速度 mkdir ~/.pip #创建文件夹 vi ~/.pip/pip.conf #添加如下内容 [global] ind ...

  3. 工作中遇到的小问题的记录

    工作中遇到的小问题的记录 ①Android:open failed: EEXIST  (file exists) 现象: ⑴在实现一个SD卡中的文件从一个路径拷贝到另一个路径的功能时,需要测试当SD卡 ...

  4. 工作中使用的sql知识点及查询语法

    文章目录 只修改字段属性时用modify 设置默认值 删除默认值 修改表名 修改字段位置 查看创建表的语句 复制其他表的数据并插入到该表中 完全复制表以及内容 判断内容并显示 :IF (xxx=xxx ...

  5. 21 Qt中ui设计中的一些小知识点

    记录一下小细节,方便随时查看 Qt小知识点 1 改变布局中组件大小 2 QPixMap 加载显示图片 3 QTableWidget 常见用法 4 frame中嵌套Layout 1 改变布局中组件大小 ...

  6. 测试工作中常用在线小工具-初级篇

    背景 测试过程中经常需要用到一些工具来校验数据的正确性,并且可以帮助测试人员更好的定位问题,所以我总结了我这个初级测试小白日常测试使用的小工具~ 1.在线json转换: http://www.bejs ...

  7. python中的一些小知识点

    本篇博客将会讲述python中存在的一些小知识点. 一.小知识点 1.for-else结构(即for和else不同级)如图: 知识:当迭代的对象迭代完并为空时,位于else的子句将执行,而如果在for ...

  8. 工作中遇到的小技巧 一(暂停更新)

    原来只是在网易博客里整理了一些学习中遇到的小技巧,现在开始在这里慢慢积累希望对大家有所帮助. 1.浏览器技巧: 如果浏览器的缓存已满,服务速度可能会减慢,您可能无法下载和查看附件.一下是在Intern ...

  9. Mybatis中重要的小知识点

    原文链接:http://www.jianshu.com/p/f3b4ff3314fb PrepareStatement和Statement的区别 1.PreparedStatement是预先编译的语句 ...

  10. Makefile中的一些小知识点,及常用的makefile举例

    一.gcc -s(注意是小写的s)命令是什么意思?为什么用此命令编译后的程序比用优化后的程序还小? 这个参数会把符号表从最终的可执行文件中删除.没有符号表,你就不能用gdb调试了,常见的用法是: 比方 ...

最新文章

  1. win8 explorer 进程频繁奔溃的原因及处理
  2. android学习笔记九——RatingBar
  3. oracle最佳环境,创建最适合的Oracle运行环境
  4. Java使用线程并发库模拟弹夹装弹以及发射子弹的过程
  5. python简单应用题_Python简单应用题
  6. Jmeter-基础篇
  7. Linux下添加字体(QT可用)
  8. 超300程序员受益入职的面试经验
  9. 21天让你成为Horizon View高手—Day11:手动池的创建
  10. FIFO、LRU、LFU的含义和原理(转)
  11. 我不要你死于一事无成
  12. grasshopper for rhino 6下载_Grasshopper做分形图案
  13. Windows虚拟机忘记操作系统密码
  14. acdsee怎么改图片大小|acdsee怎么用
  15. 分享33个超棒的海洋地貌风光图片
  16. windows防火墙自动开启的原因
  17. 这5个显示器选购技巧,把显示器讲明白了
  18. iMeta | 南科大夏雨组纳米孔测序揭示微生物可减轻高海拔冻土温室气体排放
  19. 真正的理解setup time/hold time
  20. python audioread 音频处理

热门文章

  1. 将整数翻译成英文(C++)
  2. 人体内部可视化系统市场深度研究分析报告
  3. 全球及中国3D打印人体器官行业发展态势及前景策略分析报告2022-2028年
  4. url采集工具_2022年1月6日更新:关键词URL采集工具最新版
  5. 对应的服务器证书无效。控制台输入 showRequestInfo() 可以获取更详细信息
  6. 欧拉函数与积性函数(互质数)
  7. Theos(二):NIC(New Instance Creator)
  8. 量化股票交易接口如何一键执行委托下单?
  9. 哔哩哔哩视频下载神器
  10. ERROR: child process failed, exited with error number 1