正如注释中所讨论的,基本思想是对数据进行适当的分区,以便具有相同LNAME+Address的记录保持在同一个分区中,运行Python代码在每个分区上生成单独的idx,然后将它们合并到最终的id。在

注意:我在示例记录中添加了一些新行,请参见下面显示的df_new.show()的结果。在from pyspark.sql import Window, Row

from pyspark.sql.functions import coalesce, sum as fsum, col, max as fmax, lit, broadcast

# ...skip code to initialize the dataframe

# tweak the number of repartitioning N based on actual data size

N = 5

# Python function to iterate through the sorted list of elements in the same

# partition and assign an in-partition idx based on Address and LNAME.

def func(partition_id, it):

idx, lname, address = (1, None, None)

for row in sorted(it, key=lambda x: (x.LNAME, x.Address)):

if lname and (row.LNAME != lname or row.Address != address): idx += 1

yield Row(partition_id=partition_id, idx=idx, **row.asDict())

lname = row.LNAME

address = row.Address

# Repartition based on 'LNAME' and 'Address' and then run mapPartitionsWithIndex()

# function to create in-partition idx. Adjust N so that records in each partition

# should be small enough to be loaded into the executor memory:

df1 = df.repartition(N, 'LNAME', 'Address') \

.rdd.mapPartitionsWithIndex(func) \

.toDF()

获取唯一行数cnt(基于Address+LNAME),即max_idx,然后获取该rcnt的运行和。在

^{pr2}$

将df1与df2连接并创建最终的id idx + rcntdf_new = df1.join(broadcast(df2), on=['partition_id']).withColumn('id', col('idx')+col('rcnt'))

df_new.show()

#+ + -+ -+ + -+ + + -+ -+ + -+ + -+

#|partition_id|Address| D| DOB|FNAME|GENDER| LNAME|MNAME|idx|snapshot|cnt|rcnt| id|

#+ + -+ -+ + -+ + + -+ -+ + -+ + -+

#| 0| B| 0|1990|David| M| Lee| H M| 1|201211.0| 3| 0| 1|

#| 0| J| 3|1991|David| M| Lee| HM| 2|201211.0| 3| 0| 2|

#| 0| D| 6|2000| Marc| M|Robert| MS| 3|201211.0| 3| 0| 3|

#| 1| C| 3|2000| Marc| M|Robert| H| 1|201211.0| 1| 3| 4|

#| 1| C| 6|1988| Marc| M|Robert| M| 1|201211.0| 1| 3| 4|

#| 2| J| 6|1991| 66M| F| Rek| null| 1|201211.0| 1| 4| 5|

#| 2| J| 6|1992| 66M| F| Rek| null| 1|201211.0| 1| 4| 5|

#| 4| J| 2|1995| 66M| F| Rock| J| 1|201211.0| 1| 5| 6|

#| 4| J| 6|1990| 66M| F| Rock| null| 1|201211.0| 1| 5| 6|

#| 4| J| 6|1990| 66M| F| Rock| null| 1|201211.0| 1| 5| 6|

#+ + -+ -+ + -+ + + -+ -+ + -+ + -+

df_new = df_new.drop('partition_id', 'idx', 'rcnt', 'cnt')

注意事项:实际上,在将列LNAME和Address用作唯一性检查之前,需要清除/规范化列LNAME。例如,使用一个单独的列uniq_key,它组合了LNAME和Address作为数据帧的唯一键。下面是一些基本数据清理过程的示例:from pyspark.sql.functions import coalesce, lit, concat_ws, upper, regexp_replace, trim

#(1) convert NULL to '': coalesce(col, '')

#(2) concatenate LNAME and Address using NULL char '\x00' or '\0'

#(3) convert to uppercase: upper(text)

#(4) remove all non-[word/whitespace/NULL_char]: regexp_replace(text, r'[^\x00\w\s]', '')

#(5) convert consecutive whitespaces to a SPACE: regexp_replace(text, r'\s+', ' ')

#(6) trim leading/trailing spaces: trim(text)

df = (df.withColumn('uniq_key',

trim(

regexp_replace(

regexp_replace(

upper(

concat_ws('\0', coalesce('LNAME', lit('')), coalesce('Address', lit('')))

),

r'[^\x00\s\w]+',

''

),

r'\s+',

' '

)

)

))

然后在代码中,将'LNAME'和{}替换为uniq_key,以找到idx

正如cronoik在注释中提到的,您也可以尝试使用一个窗口等级函数来计算分区内的idx。例如:from pyspark.sql.functions import spark_partition_id, dense_rank

# use dense_rank to calculate the in-partition idx

w2 = Window.partitionBy('partition_id').orderBy('LNAME', 'Address')

df1 = df.repartition(N, 'LNAME', 'Address') \

.withColumn('partition_id', spark_partition_id()) \

.withColumn('idx', dense_rank().over(w2))

当你有了df1之后,用同样的方法来计算df2和df_new。这应该比使用mapPartitionsWithIndex()更快,后者基本上是一种基于RDD的方法。

对于实际数据,请调整N以适合您的实际数据大小。这个N只影响初始分区,在dataframe连接之后,分区将被重置为默认值(200)。您可以使用spark.sql.shuffle.partitions来调整此值,例如在初始化spark会话时:spark = SparkSession.builder \

....

.config("spark.sql.shuffle.partitions", 500) \

.getOrCreate()

python怎样编写姓名、职业、地址_如何根据姓名、地址识别人际关系,然后通过linux comman或Pysp分配相同的ID...相关推荐

  1. win10无法修改mac地址_路由器无线MAC地址过滤如何设置

    使用MAC地址过滤功能,您可以添加MAC地址过滤规则,设置指定MAC地址访问互联网的权限,包括"允许访问互联网"(白名单)."禁止访问互联网"(黑名单),想要禁 ...

  2. win10无法修改mac地址_一款MAC地址修改软件WiFiSpoof for Mac激活版

    哪里有苹果WiFi地址修改工具?WiFiSpoof mac是一款Mac系统网络地址修改软件,日常网络应用中,由于各种原因计算机的Mac地址有时需要修改,如网卡地址被屏蔽等,需要更换新的Mac地址,Wi ...

  3. 计算机loopback地址怎么查,loopback地址_什么是loopback地址_loopback命令

    loopback地址 原帖由 hyatom 于 2005-11-26 21:48 发表 如果在二层交换上配置了一个10.250.186.87 255.255.255.0的loopback地址,而这个地 ...

  4. python可以用来编写计算机网络程序吗_计算机网络(基于python做的笔记 )

    计算机网络(UDP 和 TCP) 概述 为了让在不同的电脑上运行的软件,之间能够互相传递数据,就需要借助网络的功能 使用网络能够把多方链接在一起,然后可以进行数据传递 所谓的网络编程就是,让在不同的电 ...

  5. python 虚拟mac地址_随机生成MAC地址的N种方法

    进期准备在virt cli管理下的KVM增加一个模板脚本,用于主机的快速安装与部署.这里就涉及到mac地址的自动生成.uuid/Guid值的生成.主机名的自动更改等问题.本文着重介绍下N种自动生成MA ...

  6. python语言编写的modbus协议_基于Python的ModbusTCP客户端实现

    Modbus协议是由Modicon公司(现在的施耐德电气Schneider Electric)推出,主要建立在物理串口.以太网TCP/IP层之上,目前已经成为工业领域通信协议的业界标准,广泛应用在工业 ...

  7. python 设置ip地址_无法设置IP地址

    我不熟悉Python和Mininet.我一直在尝试用mininet模拟网络拓扑.我试图为网络中的每台主机分配IP地址,但是我得到一个属性错误. 下面是我的代码import sys from minin ...

  8. python可以用来编写计算机网络程序吗_不必熟悉python或R编程语言,6步执行计算机视觉应用程序...

    全文共1260字,预计学习时长7分钟 图源:morish 很多人都能运行操作计算机视觉应用程序.是的,学习并执行它并不难,现在有很多库可以用来执行如此强大的计算机视觉应用程序. 你有没有关注最近有没有 ...

  9. python怎么编写口算题_来出口算题—— Python编程

    原标题:来出口算题-- Python编程 今天,小编带大家做一个实用的小程序,也就是计算题生成器. 低年级的小朋友,最需要提高的就是计算能力,而计算能力的提升离不开有效的练习,而手动出题总会有局限性, ...

最新文章

  1. 6421B Lab3 DNS的配置与故障排除
  2. Partition分区及实例
  3. CV报错:CAP_IMAGES: can‘t find starting number (in the name of file): x in function ‘icvExtractPattern‘
  4. K8S集群搭建:虚拟机克隆
  5. js oop写法小例子
  6. 如何转obj_Java 开发中如何正确的踩坑,看完这个你可以避免50%的错误
  7. 推断(inference)、贝叶斯规则(Bayes's rule)与导出分布(derived distribution)
  8. Discuz X1.5 X2.5 X3 UC_KEY Getshell Write PHPCODE into config/config_ucenter.php Via /api/uc.php Vul
  9. [转载] python中sort,sorted,reverse,reversed的区别
  10. 3.9 Spark 键值对RDD编程
  11. 计算机网络分为点到点网络和,计算机网络-判断题
  12. 分布式事务之两阶段提交
  13. PYTHON利用REMOVEBG库实现抠图
  14. U盘量产后USB鼠标和键盘都无法使用,如何解决?
  15. Numpy读取csv文件
  16. Docker提交天池比赛流程
  17. Python异常处理:ImportError: cannot import name 'XXX' from 'XXXX'
  18. LWIP协议与TCP/IP
  19. 某意大利小哥,竟靠一个缓存中间件直接封神?
  20. 在一段英文字母中找出每个字母重复数量的方法(Java)

热门文章

  1. 想要应急稳妥过稿电商海报,这组素材少不了!
  2. 设计素材模板丨极简风简历模板
  3. c++网络编程连接成功后回调onconnected_谈谈网络编程(基于C++)
  4. python设置文件权限_Python os.chmod() 方法
  5. Linux mcheck机制检测内存溢出、内存越界
  6. 计算机主要是以划分发展阶段的,计算机以什么划分发展阶段
  7. java se环境变量_Windows 7中配置JDK(Java SE)环境变量
  8. python socket 主动断开_python之使用ctrl+c断开多线程(TcpSocketServer连接)出现端口占用的情况...
  9. python vs golang_Ruby vs Golang:四个维度对比,谁更胜一筹?
  10. python中的is_python中的is