DGL dist sampling OP
- Standalone mode
    - sample_neighbors
- Distributed mode (multi-server, one-client)
    - one partition
    - multi-partition
- ‼️Important DGL properties
    - ndata and edata
    - dgl.NID and EID
    - srcdata and dstdata
    - ntypes and etypes
- Problems encountered
    - If `reshuffle` is False in `partition_graph`, starting the GraphServer raises an error:
Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amax/gnn-tutorial/tests/distributed/test_distributed_sampling_byme.py", line 105, in start_server
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 314, in __init__
    self.gpb, graph_name, ntypes, etypes = load_partition_book(part_config, self.part_id)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/partition.py", line 213, in load_partition_book
    return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph),
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_partition_book.py", line 457, in __init__
    global_id = part_graph.ndata[NID]
AttributeError: 'NoneType' object has no attribute 'ndata'
- This happens because when `reshuffle` is False, `partition_graph` not only skips adding the 'orig_id' field to each partition's graph, it also writes a slightly different partition meta file, and that file determines whether `load_partition_book` returns a `BasicPartitionBook` or a `RangePartitionBook` (True yields the latter, False the former). `BasicPartitionBook` is more demanding: it needs the whole partition graph, but no graph is passed when starting the GraphServer, hence the error. Also, `load_partition_book` is only used when running backup servers, so without backup servers this error does not show up.
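To make the `BasicPartitionBook` vs. `RangePartitionBook` distinction concrete: with `reshuffle=True` each partition owns a contiguous range of node IDs, so the partition book only needs the range boundaries, not the whole graph. A minimal pure-Python sketch of that range lookup (the boundary values below are hypothetical, not DGL's actual layout):

```python
import bisect

# Hypothetical cumulative range ends for a 3-partition split of Cora's
# 2708 nodes: partition i owns the global IDs below range_ends[i]
# and at or above the previous boundary.
range_ends = [1000, 1800, 2708]

def nid_to_partition(global_nid):
    # A range partition book can answer this with a binary search,
    # without ever touching the graph structure itself.
    return bisect.bisect_right(range_ends, global_nid)
```

With this layout, `nid_to_partition(0)` is 0, `nid_to_partition(1000)` is 1, and `nid_to_partition(2707)` is 2; a `BasicPartitionBook` instead needs per-node ID arrays taken from the partitioned graph itself, which is why it fails when no graph is available.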
- The client must be told the number of servers via `os.environ['DGL_NUM_SERVER']`; otherwise the client connects to only one server and then shuts that server down, while the other server never shuts down and keeps waiting for connections.
- When `num_parts` is set to a number greater than 1, both the main server and the backup servers can only load partition 0 — that is, the GraphServers on this machine can load only a single partition — and the following error is raised:
Traceback (most recent call last):
  File "test_distributed_sampling_byme.py", line 123, in <module>
    example2(file_dir, 2)
  File "test_distributed_sampling_byme.py", line 72, in example2
    sampled_graph = start_sample_client(0, num_servers, tmpdir, num_servers>1)
  File "test_distributed_sampling_byme.py", line 92, in start_sample_client
    dist_graph = DistGraph("test_sampling", gpb=gpb)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 483, in __init__
    self._ndata = NodeDataView(self)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 182, in __init__
    self._data[name.get_name()] = DistTensor(shape, dtype, name.get_name(),
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_tensor.py", line 133, in __init__
    assert part_policy.get_size() == shape[0],
AssertionError: The partition policy does not match the input shape.
Exception ignored in: <function DistTensor.__del__ at 0x7f085d667700>
Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_tensor.py", line 166, in __del__
    if not self._persistent and self._owner and initialized:
AttributeError: 'DistTensor' object has no attribute '_persistent'
When `num_parts` is set back to 1, the error disappears.
- Would setting two ports in "ip_config" solve this? → That would effectively amount to two machines, so the GraphServer on the other port could load the second graph partition? (Apparently not.)
- If shared memory is set to false, the following error is raised:
Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amax/gnn-tutorial/tests/distributed/test_distributed_sampling_byme.py", line 108, in start_server
    g.start()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 364, in start
    start_server(server_id=self.server_id,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc_server.py", line 92, in start_server
    res = req.process_request(server_state)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_services.py", line 133, in process_request
    global_src, global_dst, global_eids = _sample_neighbors(local_g, partition_book,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_services.py", line 58, in _sample_neighbors
    local_ids = F.astype(local_ids, local_g.idtype)
AttributeError: 'NoneType' object has no attribute 'idtype'
- This is probably much the same as the reshuffle problem above: without shared_mem there is no local_g.
Setting it to true lets it pass.
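Several of the problems above boil down to process configuration on the client side. A minimal sketch of the setup that avoids the hanging-server issue (the values are hypothetical; `DGL_NUM_SERVER` is the number of server processes per machine, as discussed above):

```python
import os

# Tell the client how many server processes to expect per machine;
# without this, the client connects to (and later shuts down) only one
# server, leaving the others waiting for connections forever.
num_servers = 2  # hypothetical: one main server + one backup server
os.environ['DGL_NUM_SERVER'] = str(num_servers)
os.environ['DGL_DIST_MODE'] = 'distributed'
# dgl.distributed.initialize("kv_ip_config.txt")  # then initialize as usual
```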
Standalone mode
sample_neighbors
`sample_neighbors` appears to be the most heavily used sampling function in DGL, and it is very simple to use. Its three most important parameters:
- graph: DistGraph
- nodes: the node IDs to sample from
    - Are these node IDs the IDs reshuffled by partition_graph (if reshuffle was enabled)?
- fanout: the maximum number of neighbors sampled per node; beyond that, neighbors are randomly down-sampled
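The fanout semantics can be sketched in plain Python — keep everything when a node has at most `fanout` neighbors, otherwise uniformly down-sample. This is only an illustration of the behavior, not DGL's implementation:

```python
import random

def sample_fanout(neighbors, fanout):
    # Keep all neighbors when there are no more than `fanout` of them...
    if len(neighbors) <= fanout:
        return list(neighbors)
    # ...otherwise uniformly down-sample without replacement.
    return random.sample(list(neighbors), fanout)
```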
Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges. It is on CPU.
- Are the returned graph's node IDs the same as the original graph's, with edge IDs renumbered from 0? And how do they relate to the IDs in the partition?
Example
# Check standalone sampling
# Assumes module-level imports from the original test file:
#   import os, dgl; F is the DGL backend (from dgl import backend as F)
def example1(tmpdir, reshuffle):
    import numpy as np
    from dgl.data import CitationGraphDataset
    from dgl.distributed import partition_graph, DistGraph, sample_neighbors, load_partition

    g = CitationGraphDataset("cora")[0]
    num_parts = 1
    num_hops = 1
    partition_graph(g, 'test_sampling', num_parts, tmpdir,
                    num_hops=num_hops, part_method='metis', reshuffle=reshuffle)
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize('kv_ip_config.txt')
    dist_graph = DistGraph("test_sampling", part_config=tmpdir + '/test_sampling.json')
    sampled_graph = sample_neighbors(dist_graph, [0, 10, 99, 66, 1024, 2048], 3)

    # Test
    # graph, _, _, _, _, _, _ = load_partition(tmpdir + '/test_sampling.json', 0)
    # print(graph.ndata[dgl.NID], graph.ndata['orig_id'])
    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    print(sampled_graph.srcdata)
    # assert np.array_equal(
    #     F.asnumpy(sampled_graph.ndata[dgl.NID]), F.asnumpy(g.ndata[dgl.NID]))
    eids = g.edge_ids(src, dst)
    print(src, dst)
    print(eids)
    print(sampled_graph.edge_ids(src, dst))
    print(sampled_graph.find_edges(sampled_graph.edge_ids(src, dst)))
    # print(sampled_graph.find_edges(sampled_graph.edata[dgl.EID]))
    print(sampled_graph.edata[dgl.EID])
    # print(sampled_graph.has_edges_between(src, dst))
    assert np.array_equal(F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))
    dgl.distributed.exit_client()
Results
Graph(num_nodes=2708, num_edges=15,
      ndata_schemes={},
      edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64)})
tensor([3, 5, 6, 12, 11, 9, 100, 69, 65, 67, 1307, 594, 1824, 2044, 2439]) tensor([0, 0, 0, 10, 10, 10, 99, 66, 66, 66, 1024, 1024, 1024, 2048, 2048])
tensor([0, 1, 2, 49, 50, 51, 220, 168, 169, 170, 2647, 2648, 2649, 7161, 7162])
tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
(tensor([3, 5, 6, 12, 11, 9, 100, 69, 65, 67, 1307, 594, 1824, 2044, 2439]), tensor([0, 0, 0, 10, 10, 10, 99, 66, 66, 66, 1024, 1024, 1024, 2048, 2048]))
tensor([0, 1, 2, 49, 50, 51, 220, 168, 169, 170, 2647, 2648, 2649, 7161, 7162])
- sampled_graph has no '_ID' field in ndata, but does have '_ID' in edata
- sampled_graph's EIDs keep the same order as in the original graph
- The edge IDs obtained via edge_ids are the renumbered, consecutive edge IDs
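In other words, the sampled graph renumbers its edges from 0 to E-1 and keeps the mapping back to the original graph in edata[dgl.EID] (the '_ID' field), with the original order preserved. A sketch of that relationship, using the first few IDs from the output above:

```python
# edata[dgl.EID] of the sampled graph: the position is the local
# (renumbered) edge ID, the value is the edge ID in the original graph.
orig_eids = [0, 1, 2, 49, 50, 51]

def local_to_global_eid(local_eid):
    # Look up a sampled-graph edge ID in the original graph.
    return orig_eids[local_eid]
```

So local edge 3 of the sampled graph is edge 49 of the original graph, which is why asserting `edata[dgl.EID] == g.edge_ids(src, dst)` passes in the example.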
Distributed mode (multi-server, one-client)
one partition
## test with 1 partition but multi-servers with back-up servers
# Assumes module-level imports from the original test file:
#   import os, dgl; F is the DGL backend (from dgl import backend as F)
def example2_v2(tmpdir, num_servers):
    import time
    import multiprocessing as mp
    import numpy as np
    from utils import generate_ip_config
    from dgl.data import CitationGraphDataset
    from dgl.distributed import partition_graph

    os.environ['DGL_DIST_MODE'] = 'distributed'
    # os.environ['DGL_NUM_SERVER'] = str(num_servers)
    ip_config = "kv_ip_config.txt"
    generate_ip_config(ip_config, 1, num_servers)

    g = CitationGraphDataset("cora")[0]
    print(g.num_nodes(), g.num_edges())
    # g.readonly()
    print(g.idtype)
    graph_name = "test_sampling"
    num_parts = 1
    num_hops = 1
    partition_graph(g, graph_name, num_parts, tmpdir, num_hops, reshuffle=True)

    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_servers):
        p = ctx.Process(target=start_server_v2,
                        args=(i, num_servers, 1, tmpdir, num_servers > 1, 'test_sampling'))
        p.start()
        # time.sleep(1)
        pserver_list.append(p)

    sampled_graph = start_sample_client_v2(0, num_servers, tmpdir, num_servers > 1)
    print("Done sampling")
    for p in pserver_list:
        p.join()

    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    print(src, dst)
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    eids = g.edge_ids(src, dst)
    print(src, dst)
    print(eids)
    print(sampled_graph.edge_ids(src, dst))
    print(sampled_graph.find_edges(sampled_graph.edge_ids(src, dst)))
    # print(sampled_graph.find_edges(sampled_graph.edata[dgl.EID]))
    print(sampled_graph.edata[dgl.EID])
    assert np.array_equal(F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))

def start_sample_client_v2(rank, num_servers, tmpdir, disable_shared_mem):
    from dgl.distributed import load_partition, DistGraph, sample_neighbors
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    gpb = None
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(os.path.join(tmpdir, 'test_sampling.json'), rank)
    dgl.distributed.initialize("kv_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    try:
        sampled_graph = sample_neighbors(dist_graph, [0, 10, 99, 66, 1024, 2048], 3)
    except Exception as e:
        print(e)
        sampled_graph = None
    dgl.distributed.exit_client()
    return sampled_graph

def start_server_v2(rank, num_servers, num_clients, tmpdir, disable_shared_mem,
                    graph_name, graph_format=['csc', 'coo']):
    from dgl.distributed import DistGraphServer
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                        os.path.join(tmpdir, graph_name) + '.json',
                        disable_shared_mem=not disable_shared_mem,
                        graph_format=graph_format)
    print("Server {} is backup server: {}".format(rank, g.is_backup_server()))
    g.start()
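As the "is backup server" print above suggests, DGL distinguishes one main server per machine from the backup servers that share its data through shared memory. My understanding (an assumption worth verifying against dgl/distributed) is that the first server ID on each machine is the main one:

```python
def is_backup_server(server_id, num_servers_per_machine):
    # Assumption: global server IDs are assigned machine by machine, and
    # the first ID on each machine (server_id % num_servers_per_machine == 0)
    # is the main server; the remaining IDs on that machine are backups.
    return server_id % num_servers_per_machine != 0
```

Under that assumption, with 2 servers per machine, servers 0 and 2 would be main servers (one per machine) and servers 1 and 3 would be backups.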
multi-partition
## Test with multi partition and multi-server with back-up servers
# Assumes module-level imports from the original test file:
#   import os, dgl; F is the DGL backend (from dgl import backend as F)
def example2_v3(tmpdir, num_servers):
    import time
    import multiprocessing as mp
    import numpy as np
    from utils import generate_ip_config
    from dgl.data import CitationGraphDataset
    from dgl.distributed import partition_graph

    os.environ['DGL_DIST_MODE'] = 'distributed'
    # os.environ['DGL_NUM_SERVER'] = str(num_servers)
    ip_config = "kv_ip_config.txt"
    num_machines = 3
    generate_ip_config(ip_config, num_machines, num_servers)
    num_servers_total = num_machines * num_servers

    g = CitationGraphDataset("cora")[0]
    print(g.num_nodes(), g.num_edges())
    # g.readonly()
    print(g.idtype)
    graph_name = "test_sampling"
    num_parts = num_machines
    num_hops = 1
    partition_graph(g, graph_name, num_parts, tmpdir, num_hops, reshuffle=True)

    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_servers_total):
        p = ctx.Process(target=start_server_v3,
                        args=(i, num_servers, 1, tmpdir, num_servers_total > 1, 'test_sampling'))
        p.start()
        # time.sleep(1)
        pserver_list.append(p)

    sampled_graph = start_sample_client_v3(0, num_servers, tmpdir, num_servers_total > 1)
    print("Done sampling")
    for p in pserver_list:
        p.join()

    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    print(src, dst)
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    eids = g.edge_ids(src, dst)
    print(src, dst)
    print(eids)
    print(sampled_graph.edge_ids(src, dst))
    print(sampled_graph.find_edges(sampled_graph.edge_ids(src, dst)))
    # print(sampled_graph.find_edges(sampled_graph.edata[dgl.EID]))
    print(sampled_graph.edata[dgl.EID])
    assert np.array_equal(F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))

def start_sample_client_v3(rank, num_servers, tmpdir, disable_shared_mem):
    from dgl.distributed import load_partition, DistGraph, sample_neighbors
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    gpb = None
    if disable_shared_mem:
        graph, _, _, gpb, _, _, _ = load_partition(os.path.join(tmpdir, 'test_sampling.json'), rank)
        # print(graph.edata[dgl.EID])
        print(graph)
    # print(os.environ.get('DGL_ROLE'))
    # print(os.environ.get('client'))
    dgl.distributed.initialize("kv_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    try:
        sampled_graph = sample_neighbors(dist_graph, [0, 10, 99, 66, 1024, 2048], 3)
    except Exception as e:
        print(e)
        sampled_graph = None
    dgl.distributed.exit_client()
    return sampled_graph

def start_server_v3(rank, num_servers, num_clients, tmpdir, disable_shared_mem,
                    graph_name, graph_format=['csc', 'coo']):
    # rank = rank * num_servers
    from dgl.distributed import DistGraphServer
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                        os.path.join(tmpdir, graph_name) + '.json',
                        disable_shared_mem=not disable_shared_mem,
                        graph_format=graph_format)
    print("Server {} is backup server: {}".format(rank, g.is_backup_server()))
    g.start()
    # print("Server {} start".format(rank))
‼️Important DGL properties
Important properties, as found in dgl's heterograph.py
ndata and edata
dgl.NID and EID
partition: if reshuffle is enabled, what is the difference from 'orig_id'?
sampling
srcdata and dstdata
ntypes and etypes
_N