我和这位哥简直一摸一样,来自https://zhuanlan.zhihu.com/p/142186760

在默认设置下,Pandas只使用单个CPU内核,对于稍大一些的数据,用Pandas来处理,通常会显得比较慢。

学习目标:

Dask、Vaex、Modin、Cupy、Ray、Mars、Cpython、swifter 、pandarallel 、Polars

额,笔记写得很杂,主要是给自己看

pandas 读取csv文件

import time
import pandas as  pd
s = time.time()
df  = pd.read_csv('train.csv')
e = time.time()
print("Pandas Loading Time = {}".format(e-s))

在读数据时候,可以指定列类型,减小内存占用

df = pd.read_csv('train.csv', nrows=1000,dtype={'x1': 'int32','x2': 'int16','x3': 'int16','x4': 'int16','x5': 'int16','x6': 'int8'})

只读需要的列

df = pd.read_csv('train.csv', usecols=['x1', 'x3', 'x6'])

面对大量数据,也可以使用 read_csv 中的 chunksize 参数,分块读取来提高速度

利用chunksize参数,可以为指定的数据集创建分块读取IO流,每次最多读取设定的chunksize行数据,这样就可以把针对整个数据集的任务拆分为一个一个小任务最后再汇总结果:

def read_single_csv(input_path):'''读入数据'''import timeprint("开始处理...")start = time.time()df_chunk=pd.read_csv(input_path,chunksize=1000000,encoding='utf-8')res_chunk=[]for chunk in df_chunk:res_chunk.append(chunk)res_df=pd.concat(res_chunk)end = time.time()shi = end - startprint("已完成!总耗时%s秒!" % shi)print("*"*50)print(res_df.shape)return res_df

或者

读一百万行写入新的文件,可以用readline,一次读取一行,边读边写

with open('/path/to/input') as fi, open('/path/to/output/' as fo: for i in xrange(1000000): chunk_data = fi.readline() if not chunk_data: break fo.write(content)链接:https://www.zhihu.com/question/56153676/answer/147882741

查看内存函数

def memory():import psutilmem = psutil.virtual_memory()zj = float(mem.total) / 1024 / 1024 / 1024ysy = float(mem.used) / 1024 / 1024 / 1024kx = float(mem.free) / 1024 / 1024 / 1024print('Total system memory:%d.3GB' % zj)print('The system has used memory:%d.3GB' % ysy)print('System free memory:%d.3GB' % kx)
memory()
from tqdm.notebook import tqdm
# 在降低数据精度及筛选指定列的情况下,以1千万行为块大小
df = pd.read_csv('train.csv', dtype={'x1': 'int32','x3': 'int16','x6': 'int16'},usecols=['x1', 'x3', 'x6'],chunksize=10000000)
# 从df中循环提取每个块并进行分组聚合,最后再汇总结果
result = pd.concat([chunk for chunk in tqdm(df)])

批量读取

边读边存

import csv
import pandas as pd
import numpy as np
data1 = pd.DataFrame()
for i in range(6):print(f'The {i+1} file is executing')try:path = '/dev/shm/data_2021_{}.dat'.format(i)da_li = []with open(path,mode='rt',encoding='utf8' ) as f:reader = csv.reader(f)head_row = next(reader)for item in reader:da_li.append(item[0].split('€€'))dat_1 = pd.DataFrame(np.array(da_li))data1 = pd.concat([data1,dat_1],axis=0)print('ok',data1.shape)print(f'The {i+1} file save success')print()except Exception:print(f'{i+1} file execution error')

dask

官网

https://docs.dask.org/en/latest/

Dask是一个并行计算库,能在集群中进行分布式计算,能以一种更方便简洁的方式处理大数据量,与Spark这些大数据处理框架相比较,Dask更轻。

调用时,dask具有延时加载技术,最后加上.compute(),dask才会基于前面搭建好的计算图进行正式的结果运算

.compute() 相当于激活计算图,加上 .compute() 才能达到真正的结果。

import dask.dataframe as dd
df = dd.read_csv('csv_files/*.csv')
df.head()
df.info(memory_usage='deep')
quantile = df.col1.quantile(0.1).compute() # Dask具有分位数功能,可以计算实际分位数,而不是近似值。
df['col1_binary'] = df.col1 > df.col1.quantile(0.1)
df = df[(df.col2 > 10)]
roup_res = df.groupby('col1_binary').col3.mean().compute()
monthly_total = df.groupby(df[‘Date’].dt.month).sum().compute()
plot = df.col3.compute().plot.hist(bins=64, ylim=(13900, 14400))
suma = df.sum().sum().compute()
df[df.col1.between(2, 4)]
df[df['col4'].str.contains('small|medium')]
import numpy
import dask
from dask import array as darray
arr = dask.from_array(numpy.array(my_data), chunks=(1000,))
mean = darray.mean()
stddev = darray.std(arr)
unnormalized_moment = darry.mean(arr * arr * arr)

dask 读取庞大的数据

import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from numba import jit
import pandas as pd
import numpy as np
import sys
# ----------------------------------------------------------------------------
switchDict = {0 : 'TEST',1 : 'ALL'
}# 编译数据量状态开关 0为测试(读部分数据),1为全量
status = switchDict[1]
@jit
def importData(fileName):if status == 'TEST':df = dd.read_csv(fileName, header=None, blocksize="100MB").head(17000)else:df = dd.read_csv(fileName,  blocksize="64MB").compute()df.index = pd.RangeIndex(start=0, stop=len(df))return df# 读正样本
with ProgressBar():data = importData('train.csv')print(f"当前数据框占用内存大小:{sys.getsizeof(data)/1024/1024:.2f}M")
data.shape
data.memory_usage(deep=True)

把数据读取出来以后,对内存进行优化,可以大幅提高数据处理效率

def reduce_mem_usage(df):'''内存优化   数据精度量化压缩'''# 处理前 数据集总内存计算start_mem = df.memory_usage().sum() / 1024**2 print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))# 遍历特征列for col in df.columns:# 当前特征类型col_type = df[col].dtype# 处理 numeric 型数据if col_type != object:c_min = df[col].min()  # 最小值c_max = df[col].max()  # 最大值# int 型数据 精度转换if str(col_type)[:3] == 'int':if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:df[col] = df[col].astype(np.int8)elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:df[col] = df[col].astype(np.int16)elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:df[col] = df[col].astype(np.int32)elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:df[col] = df[col].astype(np.int64)  # float 型数据 精度转换else:if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:df[col] = df[col].astype(np.float16)elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:df[col] = df[col].astype(np.float32)else:df[col] = df[col].astype(np.float64)# 处理 object 型数据else:df[col] = df[col].astype('category')  # object 转 category# 处理后 数据集总内存计算end_mem = df.memory_usage().sum() / 1024**2 print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))print('=========================================================')print(df.info(verbose=True))return df

参考:https://zhuanlan.zhihu.com/p/137292923

当读取批量数据时,可能会使用glob包,这个包将一次处理多个csv文件。可以使用data/*. CSV模式来获取data文件夹中的所有csv文件。

Pandas没有本地的glob支持,因此我们需要循环读取文件。

import glob
all_files = glob.glob('data/*.csv')
dfs = []
for fname in all_files:dfs.append(pd.read_csv(fname, parse_dates=['Date']))
df = pd.concat(dfs, axis=0)
dfsum = df.groupby(df['Date'].dt.year).sum()

dask 可以通过将数据分成块并指定任务链来处理不适合内存的数据,并且 dask 接受read_csv()函数的glob模式,这意味着不必使用循环。在调用compute()函数之前,不会执行任何操作

import dask.dataframe as dd
df = dd.read_csv(‘data/*.csv’, parse_dates=[‘Date’])
dfsum = df.groupby(df[‘Date’].dt.year).sum().compute()

建议只对不适合主内存的数据集使用Dask。

modin

modin 的原理:将 DataFrame分割成不同的部分,而每个部分由发送给不同的CPU处理。modin 可以切割DataFrame的横列和纵列,任何形状的DataFrames都能平行处理。

modin 依赖 ray
modin 还是相对比较新的库,还在开发扩展中。所以并不是所有Pandas函数都能在modin 中得以实现。如果想用 modin 来运行一个尚未加速的函数,它还是会默认在Pandas中运行,来保证没有任何代码错误。

import ray
ray.init(num_cpus=4, ignore_reinit_error=True)
# 第一个参数充分利用4核CPU。
# 第二个参数 ignore_reinit_error=True, 忽略重复初始化的 而产生的报错。
import modin
import modin.pandas as  mpd
s = time.time()
df  = mpd.read_csv('train.csv')
e = time.time()
print("Modin Loading Time = {}".format(e-s))

Vaex

Vaex是一个开源的DataFrame库(类似于Pandas),对和你硬盘空间一样大小的表格数据集,它可以有效进行可视化、探索、分析甚至进行实践机器学习。

Vaex 采用内存映射、高效的核外算法和延迟计算等概念

Vaex要求将CSV转换为HDF5格式,才能看到Vaex的优点。

HDF5是一种全新的分层数据格式产品,由数据格式规范和支持库实现组成。
HDF5旨在解决较旧的HDF产品的一些限制,满足现代系统和应用需求。
HDF5文件以分层结构组织,其中包含两个主要结构:组和数据集。
HDF5 group:分组结构包含零个或多个组或数据集的实例,以及支持元数据(metadata)。
HDF5 dataset:数据元素的多维数组,以及支持元数据。

import glob
import vaex# csv_files = glob.glob('csv_files/*.csv')
csv_files = glob.glob('train.csv')
for i, csv_file in enumerate(csv_files, 1):for j, dv in enumerate(vaex.from_csv(csv_file, convert=True, chunk_size=5_000_000), 1):print('Exporting %d %s to hdf5 part %d' % (i, csv_file, j))dv.export_hdf5(f'hdf5_files/analysis_{i:02}_{j:02}.hdf5')
dv = vaex.open('hdf5_files/*.hdf5')

Vaex实际上并没有读取文件,因为延迟加载。

quantile = dv.percentile_approx('col1', 10)

Vaex具有虚拟列的概念,在添加新列时创建一个虚拟列,虚拟列的处理方式与普通列相同,但是它们不占用内存。Vaex只记得定义它们的表达式,而不预先计算值。这些列仅在必要时才被延迟计算,从而保持较低的内存使用率。

dv['col1_plus_col2'] = dv.col1 + dv.col2
dv['col1_binary'] = dv.col1> dv.percentile_approx('col1',10)

CuPy

CuPy 是一个借助 CUDA GPU 库在英伟达 GPU 上实现 Numpy 数组的库。

只要用兼容的 CuPy 代码替换 Numpy 代码,用户就可以实现 GPU 加速。

Swifter

import pandas as pd
import swifterdf.swifter.apply(lambda x: x.sum() - x.min())

Mars

基于张量的大规模数据计算的统一框架,即使在单块CPU的情况下,它的矩阵运算速度也比NumPy(MKL)快

pandarallel

Pandarallel 的想法是将pandas计算分布在计算机上所有可用的CPU上,以显着提高速度。

拐求

暂时不支持windows

https://zhuanlan.zhihu.com/p/65647604

Polars

Polars使用语法和Pandas差不多,处理数据的速度却比Pandas快了不少

安装

pip  install  -i  https://pypi.doubanio.com/simple/  --trusted-host pypi.doubanio.com  polars

读取数据

import time
import polars as pl
s = time.time()
df = pl.read_csv('train.csv')
e = time.time()
print("polars Loading Time = {}".format(e-s))

Cpython

Python 中处理大型数据工具(dask)相关推荐

  1. Python中常用的数据分析工具(模块)有哪些?

    本期Python培训分享:Python中常用的数据分析工具(模块)有哪些?Python本身的数据分析功能并不强,需要安装一些第三方的扩展库来增强它的能力.我们课程用到的库包括NumPy.Pandas. ...

  2. 什么是数据标准化?在Python中如何进行数据标准化?「必学」

    转载:https://www.toutiao.com/i6644145067256709645/?tt_from=weixin&utm_campaign=client_share&wx ...

  3. python csv库,Python 中导入csv数据的三种方法

    Python 中导入csv数据的三种方法,具体内容如下所示: 1.通过标准的Python库导入CSV文件: Python提供了一个标准的类库CSV文件.这个类库中的reader()函数用来导入CSV文 ...

  4. python类型转换-Python中如何进行数据类型转换?

    原标题:Python中如何进行数据类型转换? 这一次要讲的是Python中的数据类型转换,Python中的数据类型转换是什么?就是将数据由当前类型变化为其他类型的操作就是数据类型转换.数据类型转换分为 ...

  5. 【机器学习基础】如何在Python中处理不平衡数据

    特征锦囊:如何在Python中处理不平衡数据 ???? Index 1.到底什么是不平衡数据 2.处理不平衡数据的理论方法 3.Python里有什么包可以处理不平衡样本 4.Python中具体如何处理 ...

  6. linux中python如何调用matlab的数据_特征锦囊:如何在Python中处理不平衡数据

    今日锦囊 特征锦囊:如何在Python中处理不平衡数据 ? Index 1.到底什么是不平衡数据 2.处理不平衡数据的理论方法 3.Python里有什么包可以处理不平衡样本 4.Python中具体如何 ...

  7. python金融数据怎么获取_class类怎样在python中获取金融数据?

    我们搜集金融数据,通常想要的是利用爬虫的方法.其实我们最近所学的class不仅可以进行类调用,在获取数据方面同样是可行的,很多小伙伴都比较关注理财方面的情况,对金融数据的需要也是比较多的.下面就cla ...

  8. python中pandas的数据输出显示设置

    python中pandas的数据输出显示设置1 pandas数据分析时经常需要打印输出数据,当数据量大时,输出的展示设置非常重要,好的展示可以帮助更好地理解数据. pandas相关的显示设置函数主要有 ...

  9. python如何收集数据的方法有哪些_class类在python中获取金融数据的实例方法

    我们搜集金融数据,通常想要的是利用爬虫的方法.其实我们最近所学的class不仅可以进行类调用,在获取数据方面同样是可行的,很多小伙伴都比较关注理财方面的情况,对金融数据的需要也是比较多的.下面就cla ...

最新文章

  1. C语言标量变向量的函数,GLSL 详解(基础篇)
  2. pat 乙级 1047 编程团体赛(C++)
  3. 令人眼睛一亮的履历表
  4. Python高级知识点学习(九)
  5. 安卓“新皇”来了!华为Mate 40确定10月22日发布
  6. python 给定n,返回n以内的斐波那契数列
  7. Windows下GDAL3.1.2编译 (VS2015)
  8. 抖音数据统计_抖音大数据,抖音最全数据分析工具,全知道算你厉害!
  9. 地学计算方法/地统计学(5第五章 空间插值与克里格法)
  10. opencv读取16位色深图片
  11. java swing漂亮界面框架_开源软件分享-漂亮的JavaFx GUI界面框架
  12. 简单解决Edge浏览器被sb360篡改的方法
  13. csirs参考信号_一种信道状态信息参考信号CSI-RS的发送方法、装置及基站_2015109520063_说明书_专利查询_专利网_钻瓜专利网...
  14. Java:Excel写入“合并单元格“
  15. 前端学习——这十本书一定要看
  16. CF 1562 C. Rings (思维+模拟)
  17. 洛谷P1605 迷宫 纯C语言题解
  18. 《Python3》读书笔记(上)
  19. 2022年G2电站锅炉司炉操作证考试题库及模拟考试
  20. 热血江湖服务器位置,热血江湖服务器地理位置

热门文章

  1. 2021.11.22-11.28 AI行业周刊(第73期):工作的需求
  2. CVPR2018资源汇总-
  3. 挖矿病毒之CoinMiner入侵SQLServer
  4. GitKraken 7.5.1 无法连接GitHub和GitLab
  5. 十种技术思维(好文转发分享)
  6. Edge浏览器主页锁定360解决方法
  7. Linux服务器分区的方案
  8. Microsoft Visio 2013 设计E-R图和数据库模型图
  9. PLSQL显示中文乱码
  10. 【NIO】Selector(选择器)