并行是提高大数据处理和分析速度的关键,不管是CPU并行还是GPU并行,核心的观念就是将大数据变为小的分块,然后让计算机操作系统调配可用资源进行处理。使用C,C++等语言可以调用openmpi,CPP-taskflow以及Interl TBB等框架管理执行并行程序,但是,程序执行的效率取决于对底层数据分块策略以及需要仔细考虑网络传输成本,并行程序的效率完全取决于程序员的天赋和水平。在这里,暂且不讨论GPU这一块,因为在后续我会专门进行讨论。对于Java的忠实粉来讲,使用Spark框架无疑会极大提高速度,只可惜,如果手头上只有一台好点的机器,且老板又在追成果时,就会比较难过了。因为配置Spark集群也是需要超多耐心去配置参数的,而且虚拟机本身负载也是很大的。对于使用python进行时空数据分析和实验时,dask框架无疑带来了极大的方便,可以不考虑那么多底层并行细节。考虑到大家有很多都会使用python的Multithreading以及Multiprocessing模块,例如,使用以下python文件

from multiprocessingimport Pool

pool = Pool(N) #这里的N表示开启多少个进程

area_list= pool.map(func, parameters)#func表示每个进程都会执行的函数

pool.close() #上面的parameters表示向函数传递的参数

pool.join() #最后我们等待每一个线程执行完成并回收进程所用资源

但是,同样这种底层的操作,让我们无法完全利用可用的计算资源,例如,我手头上有一个 E5-2699 v4 CPU,22核心44线程,如果只开2个进程,那这台几乎等于在休眠了。

而python dask提供了动态资源调度能力以及“大”集合支持(使用分布式内存缓存,例如矩阵、列表以及数据帧dataframes)。和Spark弹性分布式数据集类似, python依靠这些对象,生成有向无环图描述复杂算法的并行逻辑。在这里,首先介绍Dask Array, Dask Bag, Dask Dataframe, Dask Dealyed以及Dask Futhure对象之后,用一个具体的例子说明,选择使用Dask Bag做影像矢量化操作的并行化方法。最后,总结下全文核心部分。

Dask Array对于执行幂等操作的大型矩阵数据特别适用,它首先会按照Chunk分块策略将矩阵分成小块,然后会将这些分块矩阵操作转换为互相关任务图模型,用数据之间的依赖关系,用多线程执行图模型中分块矩阵操作。说它对于幂等操作特别适用,是因为,各分块矩阵独立计算的结果,彼此不会根据另外分块计算的结果改变而改变。例如,对矩阵中所有元素进行加操作,分块矩阵的计算结果就与其它分块计算的结果没有关系。在Dask Array上执行并行操作,程序的运行效率与分块策略相关。官方给出的五点建议是:

1.分块的数据大小必须足够小,能够被装载到内存。

2.分块的数据大小必须足够大,对每一分块的计算操作要大于任务调度的时间开销(1ms)

3.分块的大小在10MB-1GB之间是合适和常见的,只要不超过内存大小限制并考虑计算时延

4.对多个矩阵进行操作时,矩阵分块的大小尽量一致

5 当存储和导入数据时,分块大小尽量和特定存储约束保持一致

Dask Dataframe对象则 在处理远大于当前主机内存的表格数据有用。与传统pandas Dataframe在加载完成所有数据后继续数据类型推断不同Dask Datadrame支持部分加载数据时,对表格数据类型进行推断。Dask Dataframe实现了分块并行Dataframe, 对Dask Dataframe的操作将被映射到按索引列划分的子Dataframe上,例如:可以使用DepDelay延迟执行操作

maxes = []

for fn in filenames:

df = pd.read_csv(fn)

maxes.append(df.DepDelay.max())

和pandas Datafeames不同的地方还在于延迟执行操作,常用的方法包括map_paritions,map_overlap以及reduction方法.map_paritions对每一分块施加并计算操作,map_overlap和Spark windows函数类似,对于处理不同分块相邻近的行数据进行操作比较有用。reduction则代表对分块进行聚合的操作函数。

Dask future和Dask bag以及Dask delayed密切相关,它代表在分布式环境下执行的某个操作将在未来某个时间返回,例如:

from dask.distributed import Client

client = Client() # start local workers as processes

def inc(x):

return x + 1

a = client.submit(inc, 10) # calls inc(10) in background thread or process

a.result() # blocks until task completes and data arrives

在调用result之前,这里的a实际上就是一个Future对象。

在实际工作中,用到最多的还是Dask Dealyed以及Dask bag两个对象,这也是本次要介绍的重点。为方便理解,我们将Delayed对象看成是python装饰器,它的作用是将普通的python函数封装成Dask延迟装载的函数,例如:

import dask

@dask.delayed

def inc(x):

return x + 1

@dask.delayed

def double(x):

return x * 2

@dask.delayed

def add(x, y):

return x + y

data = [1, 2, 3, 4, 5]

output = []

for x in data:

a = inc(x)

b = double(x)

c = add(a, b)

output.append(c)

但这种直接装饰器假比较麻烦,假设我们要为所有感新区的湖泊水库计算面积,我们可以直接使用dask.delayed对象初始化要执行的python函数。

client = Client(threads_per_worker=10, n_workers=2)

print(client)

lazy_results = [] #记录延迟结果

for specify_which_reservior in lakes:

lazy_result =dask.delayed(calculat_regions)(specify_which_reservior)

lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results) # trigger computation in the background

results = dask.compute(*futures)

Dask Dealyed在分布式环境中注册执行任务,对任务丢的开销通常在几百微秒左右,通常这对于要处理巨型集合的任务来讲是不使用的,例如,我们要处理的得到的是全球湖泊水库的面积,这种任务生成和调度的开销是比较大的。另外一个明智的选择是使用 Dask Bag,它更适合以批的方式执行复杂算法,是类似于Spark RDD分布式数据集的对象,当然,在Dask bag中是允许嵌套Dealyed对象的,本文在后面如何并行矢量化栅格数据集。

栅格数据例如遥感影像通常都比较巨大,生硬地调用gdal_polygonize.py会造成未知的处理延时,特别是当我们需要矢量化的影像数据特别多时,因此,比较合理的做法是先将这些影像转换为小的分块,然后在单独矢量化,最后通过ogr2ogr合并这些矢量:我们使用了清华大学宫鹏团队发布的全球水体数据集,期望通过这些数据集,以及全球水库湖泊本底矢量获取其中大型湖泊水库的逐日面积数据。

# -*- coding: GBK -*-

import os

import geopandas as gpd

import earthpy as et

import os

import numpy as np

import rasterio as rio

from rasterio.mask import mask

from rasterio.plot import show

import pandas as pd

# Package created for the earth analytics program

import earthpy as et

import earthpy.spatial as es

from rasterio.warp import calculate_default_transform, reproject, Resampling

from fiona.crs import from_epsg,from_string

from utils import *

from fnmatch import fnmatchcase as match

import pyproj

from functools import partial

from multiprocessing import Pool

from functools import partial

from osgeo import ogr

from osgeo import osr

from area import area

from shapely.geometry import box

from fiona.crs import from_epsg,from_string

from utils import *

from fnmatch import fnmatchcase as match

import pyproj

from functools import partial

import warnings

from gdalconst import *

import dask

from dask.distributed import Client, progress

import dask.bag as db

import glob

import shutil

from pathlib import Path

warnings.simplefilter('ignore')

#输入一个reservior,输出一个列表

def calculat_regions(specify_which_reservior):

utility = utils()

specify_year=2008

df = gpd.read_file(r"./shapefilegallery/Export_Output.shp")

df.crs = {'init': 'epsg:4326', 'no_defs': True}

#df = df.to_crs('PROJCS["unnamed",GEOGCS["Unknown datum based upon the custom spheroid",DATUM["Not_specified_based_on_custom_spheroid",SPHEROID["Custom spheroid",6371007.181,0]],PRIMEM["Greenwich",0],UNIT["degree",0.0174532925199433]],PROJECTION["Sinusoidal"],PARAMETER["longitude_of_center",0],PARAMETER["false_easting",0],PARAMETER["false_northing",0],UNIT["metre",1,AUTHORITY["EPSG","9001"]]]')

df = df.to_crs("EPSG:4326")

modis_bound_data = np.genfromtxt('sn_bound_10deg.txt',

skip_header=7,

skip_footer=3)

modis_tiles = []

for row in modis_bound_data:

bbox_= box(row[2],row[4], row[3], row[5])

bbox_.crs= {'init': 'epsg:4326', 'no_defs': True}

bbox_.crs = from_epsg(4326)

modis_tiles.append(bbox_)

hvmps=[]

for tile in modis_tiles:

intersection = df.loc[df['GLWD_ID']==specify_which_reservior,'geometry'].iloc[0].intersects(tile)

if(intersection !=False):

temppair=retrive_hv(modis_bound_data,tile.bounds)

hvmps.append(temppair)

allfiles=[]

for pair in hvmps:

convertedH = int(pair[0])

convertedV =int(pair[1])

temp_str="h"+ str(convertedH).rjust(2,'0')+"v"+ str(convertedV).rjust(2,'0')

image_path = os.path.join("D:\\globaldatasets"+"\\"+str(specify_year)+"_waterbody",str(specify_year),temp_str)

tmp = utility.printPath(1, image_path)

for i in tmp:

temp_str = os.path.join(image_path, str(i))

allfiles.append(temp_str)

#specify_pattern = str(specify_day).rjust(3, '0') + "_water"

#matched_files = [fpth for fpth in allfiles if match(fpth, "*" + specify_pattern + ".tiff")]

return (specify_which_reservior,allfiles)

#我们只需要水体部分的数据

def filter_water(allfiles):

#specify_pattern = "_water"

#specify_day = 1

#specify_pattern = str(specify_day).rjust(3, '0') + "_water"

specify_pattern = "_water"

allfile = allfiles[1]

specify_which_reservior = allfiles[0]

matched_files = [fpth for fpth in allfile if match(fpth, "*" + specify_pattern + ".tiff")]

return (specify_which_reservior, matched_files)

#将单个水体文件切分成64*64的小块,其实在这个地方就可以求它的额面积

def spilt_image_using_gdal(image_file,specify_which_reservior):

print("we going to process the resevior....." + str(specify_which_reservior))

subarrays_files = []

dset = gdal.Open(image_file)

if dset is None:

return subarrays_files

width = dset.RasterXSize

height = dset.RasterYSize

tilesize = 240

#判断存放shp文件夹是否存在,如果不存在则创建

final_output_shp_path = os.path.join(os.getcwd(), "outputshp"+str(specify_which_reservior))

if os.path.isdir(final_output_shp_path)==False:

os.mkdir(final_output_shp_path)

#判断存放tiff的文件夹是否存在,如果不存在则创建

final_output_tiff_path = os.path.join(os.getcwd(),"outputtiff"+str(specify_which_reservior))

if os.path.isdir(final_output_tiff_path)==False:

os.mkdir(final_output_tiff_path)

outputname_prefix = os.path.splitext(os.path.basename(image_file))[0]

filnal_output = os.path.join(final_output_shp_path, outputname_prefix + ".shp")

if(os.path.exists(filnal_output)==True): #有时候程序自然挂了,有点sb的样子

return filnal_output

for i in range(0, width, tilesize):

for j in range(0, height, tilesize):

w = min(i + tilesize, width) - i

h = min(j + tilesize, height) - j

the_file = os.path.join(final_output_tiff_path,outputname_prefix + "_" + str(i) + "_" + str(j) + ".tiff")

#the_file = outputname_prefix + "_" + str(i) + "_" + str(j) + ".tiff"

#如果这个tiff存在,我们就将其转换为shap

if os.path.exists(the_file)==True:

output_name = os.path.join(final_output_shp_path,

outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp")

if os.path.exists(output_name)==False:

command = " gdal_polygonize.py " + the_file + " "

command = command + output_name + " -f \"ESRI Shapefile\""

os.system(command)

subarrays_files.append(output_name)

else: # 如果存在则删除之后,在运行上面的

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp"))

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".shx"))

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".dbf"))

os.remove(os.path.join(final_output_shp_path, outputname_prefix + "_" + str(i) + "_" + str(j) + ".prj"))

output_name = os.path.join(final_output_shp_path,

outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp")

command = " gdal_polygonize.py " + the_file + " "

command = command + output_name + " -f \"ESRI Shapefile\""

os.system(command)

subarrays_files.append(output_name)

else: #不存在,则按原定程序运行

gdaltranString = "gdal_translate-of GTiff -srcwin " + str(i) + ", " + str(j) + ", " + str(w) + ", " \

+ str(h) + " " + image_file + " " + the_file

os.system(gdaltranString)

# 再转换之后,开始对其进行矢量化,并删除对应的tif

output_name = os.path.join(final_output_shp_path,

outputname_prefix + "_" + str(i) + "_" + str(j) + ".shp")

command = " gdal_polygonize.py " + the_file + " "

command = command + output_name + " -f \"ESRI Shapefile\""

os.system(command)

try:

os.remove(the_file)

except FileNotFoundError as e:

print("cannot remove unfound file ")

subarrays_files.append(output_name)

#filnal_output = os.path.join(final_output_shp_path, outputname_prefix+".shp")

filnal_output = os.path.join(final_output_shp_path, outputname_prefix + ".shp")

for the_subarray in subarrays_files: #the_subarray 表示的是经过转换为shp的子文件

command = "ogr2ogr" + " -f \"ESRI Shapefile\"" + " -update -append " + filnal_output + " "

command = command + the_subarray

os.system(command)

#最后我们删除所有不需要的文件

for the_subarray in subarrays_files:

try:

zz=os.path.splitext(os.path.basename(the_subarray))[0]

os.remove(os.path.join(final_output_shp_path, zz+".shp"))

os.remove(os.path.join(final_output_shp_path, zz + ".shx"))

os.remove(os.path.join(final_output_shp_path, zz + ".dbf"))

os.remove(os.path.join(final_output_shp_path, zz + ".prj"))

except FileNotFoundError as e:

print("cannot remove unfound file ")

print("meregy done ........................hwhhwwhhwhwhwhwhhhhhhhhhhhhhh...." + filnal_output)

return filnal_output

#将所有大的水体切分成小的水体

def convert_to_small_tif(waterfiles):

lazy_results = [] # 记录延迟结果,看下这里能否用delayed

for the_water_file in waterfiles[1]:

#这里的waterfiles其实就是那个waterreserviro编号

lazy_result = dask.delayed(spilt_image_using_gdal)(the_water_file, waterfiles[0])

lazy_results.append(lazy_result)

return lazy_results

if __name__ == "__main__":

#需要求最大湖泊和水库的GWID

utility = utils()

lake_reserviro = utility.getmaxreserviors_lakes()

print(lake_reserviro[1])

lakes = lake_reserviro[1][2:]

print(lakes)

bag_allfiles= db.from_sequence(lakes) #根据所有lake获得对应的所有文件

bbag_= bag_allfiles.map(calculat_regions) #在所有文件中筛选指定区域的文件

bbag_water= bbag_.map(lambda x: filter_water(x)) #在所有筛选过的文件中,只找出是水体的文件

bbag_small_water= bbag_water.map(lambda x: convert_to_small_tif(x))

#好像这个地方也是可以继续在dealyed结果上,并行的

result = bbag_small_water.compute()

futures = dask.persist(*result) # trigger computation in the background

results = dask.compute(*futures)

print(results)

最为重要的是标注黑色的部分代码,bag_allfiles, bbag_ bbag_water 这三个Dask bag对象分别是获取特定湖泊水库的影像文件对应的所有文件,获取覆盖指定水湖泊区域的文件,以及找出那写特定水体影像文件,为了便于理解,可以看下解压全球水体数据集之后的文件目录情况:

在最里层的文件夹里面,包含的文件信息如下:

通过使用Dask bag以及嵌套Dask Dealyed对象,可以对覆盖特定湖泊水库的影像文件进行矢量化。

最后,作为总结,使用python Dask的动态调度和Bag对象,可以并行化对海量空间数据进行处理和分析。尽量使用 Dask bag对象创建延迟加载的复杂算法,可以看到使用这些对象做并处理还是十分方便和具有资源利用优势的,例如下图1表示用python多进程并行矢量化影像的资源利用情况:Python Dask并行矢量化海量影像的资源负载情况

python dask_使用Python并行框架Dask处理和分析大规模时空数据相关推荐

  1. python dask_利用python的dask搭建分布式集群

    一.dask介绍 优势:dask内部自动实现了分布式调度.无需用户自行编写复杂的调度逻辑和程序:通过调用简单的方法就可以进行分布式计算.并支持部分模型的并行化处理:内部实现的分布式算法:xgboost ...

  2. Python中MNE库的事件相关特定频段分析(MEG数据)

    最近做运动想象分类的时候遇到一个问题就是分类结果始终不准,想从原始数据分析一下脑电数据,找了下MNE提供的examples.里面还真有一个按频带分析的例子,说实话打开这个例子最主要的原因是这个图看着比 ...

  3. python dask_使用 Dask 在 Python 中进行并行计算 | Linux 中国

    原标题:使用 Dask 在 Python 中进行并行计算 | Linux 中国 Dask 库可以将 Python 计算扩展到多个核心甚至是多台机器. -- Moshe Zadka 关于 Python ...

  4. Windows上python开发--2安装django框架

    Windows上python开发--2安装django框架 分类: 服务器后台开发2014-05-17 21:22 2310人阅读 评论(2) 收藏 举报 python django 上一篇文章中讲了 ...

  5. 从Theano到Lasagne:基于Python的深度学习的框架和库

    从Theano到Lasagne:基于Python的深度学习的框架和库 [日期:2015-08-03] 来源:http://creative-punch.net/  作者:Creative Punch ...

  6. python爬虫库的功能_Python学习爬虫掌握的库资料大全和框架的选择的分析

    学Python,想必大家都是从爬虫开始的吧.毕竟网上类似的资源很丰富,开源项目也非常多. Python学习网络爬虫主要分3个大的版块:抓取,分析,存储 当我们在浏览器中输入一个url后回车,后台会发生 ...

  7. Python之web开发(一):python常用搭建网站的框架简介

    谈及WEB开发,使用java来的确要比python多的多.但实际上还是有很多大型的网站都是使用python搭建起来的,如国外最大的视频分析网站YouTube.国内的豆瓣.搜狐以及知乎等都是使用pyth ...

  8. 【Python进阶】Python进阶专栏、编程与开源框架知识星球上线,等你来follow

    大家好,今天我将在有三AI开设新专栏<Python进阶>.在这个专栏中,我们会讲述Python的各种进阶操作,包括Python对文件.数据的处理,Python各种好用的库如NumPy.Sc ...

  9. flask post json_【python:flask-SocketIO】网络通信框架简单了解

    Flask是一个用python开发的网络应用微框架. http://docs.jinkan.org/docs/flask/​docs.jinkan.org 而flask-SocketIO 为flask ...

  10. python测试4_Python 各种测试框架简介(四):pytest

    pytest 有时也被称为 py.test,是因为它使用的执行命令是 $ py.test.本文中我们使用 pytest 指代这个测试框架,py.test 特指运行命令. ##较于 nose 这里没有使 ...

最新文章

  1. spark on yarn 完全分布式_Spark编程笔记(1)-架构基础与运行原理
  2. CSS盒子模型之CSS3可伸缩框属性(Flexible Box)
  3. 【转载】从康耐视(NASDAQ : CGNX)看国内视觉识别行业的机会
  4. PHP替换回车换行的三种方法
  5. git 发布android 系统版本 修改版本型号 查看指定文件的修改记录
  6. HDFS概述和设计目标
  7. 关于SQL Server自动备份无法删除过期的备份文件奇怪现象
  8. 从入门到入土:python爬虫|scrapy初体验|安装教程|爬取豆瓣电影短评相关信息(昵称,内容,时间和评分)
  9. cd oracle home/dbs,Oracle专家高级编程学习笔记
  10. 【接口时序】8、DDR3驱动原理与FPGA实现(一、DDR的基本原理)
  11. 浏览器指纹?(防关联浏览器/指纹浏览器/超级浏览器/候鸟浏览器)
  12. matlab解线性方程组后结果是小数,MATLAB线性方程组求解
  13. python获取B站单个视频的封面
  14. Laravel php 框架的使用写出第一个hello world,Laravel 入门配置
  15. 北京第9届.NET俱乐部参与有感
  16. 学校计算机学院教学管理ER图,教学管理系统数据库ER图及SQL语句.doc
  17. varchar长度需要是2的倍数吗?
  18. 汽车零部件行业应用APS的必要性
  19. 令你痛苦者 必令你成长
  20. 供应链管理系统--(7)调拨管理--调拨单创建

热门文章

  1. 电脑搜索文件的服务器,Archivarius注册版
  2. 网红茶饮难逃“短命”之殇,喜茶能否打破这个魔咒?
  3. iapp禁止抓包软件代码
  4. python编程游戏-9个Python编程小游戏,有趣又好玩,简直太棒了
  5. 别再傻傻分不清 AVSx H.26x MPEG-x 了
  6. 分布式事务之柔性事务
  7. 34604-52-9,Ms-PEG3-Ms甲磺酸基是良好的离去基,也可用作伯醇的保护基
  8. Unity3D U3D安装教程
  9. 电脑版微信提示音mp3_短的微信提示音什么好?40首好听的微信提示音试听下载...
  10. AndroidStudio报错Transform output file D:\android\RfidDemo\app\libs\RFID_lib.jar does not exist.