DGL dist sampling OP

  • Standalone mode
    • sample_neighbors
  • Distributed mode (multi-server, one-client)
    • one partition
    • multi-partition
  • ‼️Important DGL properties
    • ndata and edata
    • dgl.NID and EID
    • srcdata and dstdata
    • ntypes and etypes
  • Issues encountered
  • If partition_graph is run with reshuffle=False, starting the GraphServer raises an error

Traceback (most recent call last):
File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/amax/gnn-tutorial/tests/distributed/test_distributed_sampling_byme.py", line 105, in start_server
g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 314, in __init__
self.gpb, graph_name, ntypes, etypes = load_partition_book(part_config, self.part_id)
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/partition.py", line 213, in load_partition_book
return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph),
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_partition_book.py", line 457, in __init__
global_id = part_graph.ndata[NID]
AttributeError: 'NoneType' object has no attribute 'ndata'

  • This is because when partition_graph is called with reshuffle=False, besides not adding the 'orig_id' field to the graph partitions, it also writes slightly different partition metadata, and that metadata decides whether load_partition_book returns a BasicPartitionBook or a RangePartitionBook: True yields the latter, False the former. BasicPartitionBook needs the whole graph, but no graph is passed when the GraphServer starts, hence the error. Note that load_partition_book is the code path used for backup servers, yet the error occurs even when there is no backup server. A working setup is sketched below.
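A minimal sketch of the setup that avoids this error, assuming the 0.5/0.6-era DGL API used throughout this post (where partition_graph still accepts reshuffle; the output directory '/tmp/parts' is just illustrative):

from dgl.data import CitationGraphDataset
from dgl.distributed import partition_graph

g = CitationGraphDataset("cora")[0]
# reshuffle=True writes range-based partition metadata, so load_partition_book
# returns a RangePartitionBook, which does not need the full graph object that
# DistGraphServer never passes in.
partition_graph(g, 'test_sampling', num_parts=1, out_path='/tmp/parts',
                num_hops=1, part_method='metis', reshuffle=True)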
  • You must tell the client the number of servers via os.environ['DGL_NUM_SERVER']; otherwise the client connects to only one server and then shuts that server down, while the other server is never shut down and keeps waiting for connections forever. See the snippet below.
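For example, on the client side (a sketch; the value 2 is illustrative):

import os
import dgl

# Number of server processes (per machine, including backups); must be set
# BEFORE dgl.distributed.initialize() so the client connects to all of them.
os.environ['DGL_NUM_SERVER'] = str(2)
dgl.distributed.initialize("kv_ip_config.txt")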
  • When num_parts is set to a value greater than 1, both the main server and the backup servers can only load partition 0; in other words, the GraphServers on this single machine can only load one partition, and the run also errors out:

Traceback (most recent call last):
File "test_distributed_sampling_byme.py", line 123, in <module>
example2(file_dir, 2)
File "test_distributed_sampling_byme.py", line 72, in example2
sampled_graph = start_sample_client(0, num_servers, tmpdir, num_servers>1)
File "test_distributed_sampling_byme.py", line 92, in start_sample_client
dist_graph = DistGraph("test_sampling", gpb=gpb)
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 483, in __init__
self._ndata = NodeDataView(self)
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 182, in __init__
self._data[name.get_name()] = DistTensor(shape, dtype, name.get_name(),
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_tensor.py", line 133, in __init__
assert part_policy.get_size() == shape[0],
AssertionError: The partition policy does not match the input shape.
Exception ignored in: <function DistTensor.__del__ at 0x7f085d667700>
Traceback (most recent call last):
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_tensor.py", line 166, in __del__
if not self._persistent and self._owner and initialized:
AttributeError: 'DistTensor' object has no attribute '_persistent'

The error disappears once num_parts is set back to 1.

  • Would configuring two ports in ip_config fix this? That would effectively declare two machines, so the GraphServer on the second port could load the second graph partition? (Apparently not.) For what the ip_config file looks like, see the hypothetical sketch below.
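For reference, a hypothetical stand-in for the generate_ip_config helper imported from utils in the examples below (the real one lives in DGL's test suite; the per-line format shown here, 'ip port num_servers', is my assumption for this 0.5-era API):

import random

def generate_ip_config(file_name, num_machines, num_servers):
    # Hypothetical: one line per "machine"; distinct localhost ports let a
    # single box impersonate several machines.
    ports = random.sample(range(10000, 30000), num_machines)
    with open(file_name, 'w') as f:
        for port in ports:
            f.write('127.0.0.1 {} {}\n'.format(port, num_servers))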
  • Setting shared memory to False raises an error:

Traceback (most recent call last):
File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/amax/gnn-tutorial/tests/distributed/test_distributed_sampling_byme.py", line 108, in start_server
g.start()
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 364, in start
start_server(server_id=self.server_id,
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc_server.py", line 92, in start_server
res = req.process_request(server_state)
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_services.py", line 133, in process_request
global_src, global_dst, global_eids = _sample_neighbors(local_g, partition_book,
File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_services.py", line 58, in _sample_neighbors
local_ids = F.astype(local_ids, local_g.idtype)
AttributeError: 'NoneType' object has no attribute 'idtype'

  • This is probably much the same as the reshuffle problem above: without shared_mem there is no local_g.
    Setting it to True lets sampling go through; a sketch of the working call follows.
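In other words, the working server-side call keeps shared memory on (a sketch; rank, num_servers, num_clients, and part_config are as in the v2/v3 examples below):

from dgl.distributed import DistGraphServer

# disable_shared_mem=False keeps the local partition in shared memory, so the
# sampling handler has a local_g to run _sample_neighbors against.
g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                    part_config, disable_shared_mem=False)
g.start()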

Standalone mode

sample_neighbors

Browsing around, sample_neighbors seems to be the most heavily used sampling function in DGL, and it is very easy to use. Its three most important parameters:

  • graph: a DistGraph
  • nodes: the IDs of the nodes to sample neighbors for
    • is this node ID the reshuffled ID produced by partition_graph (when reshuffle is enabled)?
  • fanout: the maximum number of neighbors to sample per node; beyond that, neighbors are randomly downsampled

Returns
-------
DGLGraph
A sampled subgraph containing only the sampled neighboring edges. It is on CPU.

  • Are the returned graph's node IDs the same as the original graph's, with edge IDs renumbered from 0? And how do they relate to the IDs inside a partition?
    Example
# Check standalone sampling
import os
import dgl
from dgl import backend as F  # backend shim: asnumpy() used by the asserts

def example1(tmpdir, reshuffle):
    import numpy as np
    from dgl.data import CitationGraphDataset
    from dgl.distributed import partition_graph, DistGraph, sample_neighbors, load_partition
    g = CitationGraphDataset("cora")[0]
    num_parts = 1
    num_hops = 1
    partition_graph(g, 'test_sampling', num_parts, tmpdir, num_hops=num_hops,
                    part_method='metis', reshuffle=reshuffle)
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize('kv_ip_config.txt')
    dist_graph = DistGraph("test_sampling", part_config=tmpdir + '/test_sampling.json')
    sampled_graph = sample_neighbors(dist_graph, [0, 10, 99, 66, 1024, 2048], 3)
    # Test
    # graph, _, _, _, _, _, _ = load_partition(tmpdir + '/test_sampling.json', 0)
    # print(graph.ndata[dgl.NID], graph.ndata['orig_id'])
    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    print(sampled_graph.srcdata)
    # assert np.array_equal(
    #     F.asnumpy(sampled_graph.ndata[dgl.NID]), F.asnumpy(g.ndata[dgl.NID])
    # )
    eids = g.edge_ids(src, dst)
    print(src, dst)
    print(eids)
    print(sampled_graph.edge_ids(src, dst))
    print(sampled_graph.find_edges(sampled_graph.edge_ids(src, dst)))
    # print(sampled_graph.find_edges(sampled_graph.edata[dgl.EID]))
    print(sampled_graph.edata[dgl.EID])
    # print(sampled_graph.has_edges_between(src, dst))
    assert np.array_equal(F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))
    dgl.distributed.exit_client()

Results

Graph(num_nodes=2708, num_edges=15,
      ndata_schemes={}
      edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64)})
tensor([   3,    5,    6,   12,   11,    9,  100,   69,   65,   67, 1307,  594, 1824, 2044, 2439]) tensor([   0,    0,    0,   10,   10,   10,   99,   66,   66,   66, 1024, 1024, 1024, 2048, 2048])
tensor([   0,    1,    2,   49,   50,   51,  220,  168,  169,  170, 2647, 2648, 2649, 7161, 7162])
tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14])
(tensor([   3,    5,    6,   12,   11,    9,  100,   69,   65,   67, 1307,  594, 1824, 2044, 2439]), tensor([   0,    0,    0,   10,   10,   10,   99,   66,   66,   66, 1024, 1024, 1024, 2048, 2048]))
tensor([   0,    1,    2,   49,   50,   51,  220,  168,  169,  170, 2647, 2648, 2649, 7161, 7162])
  • sampled_graph has no '_ID' field in ndata, but edata does contain '_ID'
  • the order of sampled_graph's EIDs is consistent with the edge order of the original graph
  • edge_ids on the sampled graph returns the renumbered, consecutive edge IDs (see the sketch below)
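Taken together, the usual way to map sampled edges back to the parent graph is through dgl.EID (a short sketch using the variables from example1):

# Each sampled edge's edata[dgl.EID] entry is its ID in the parent graph,
# so find_edges on the parent recovers the original endpoints.
parent_eids = sampled_graph.edata[dgl.EID]
src, dst = g.find_edges(parent_eids)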

Distributed mode (multi-server, one-client)

one partition

## test with 1 partition but multi-servers with back-up servers
import os
import dgl
from dgl import backend as F  # backend shim: asnumpy() used by the asserts

def example2_v2(tmpdir, num_servers):
    import time
    import multiprocessing as mp
    import numpy as np
    from utils import generate_ip_config
    from dgl.data import CitationGraphDataset
    from dgl.distributed import partition_graph
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # os.environ['DGL_NUM_SERVER'] = str(num_servers)
    ip_config = "kv_ip_config.txt"
    generate_ip_config(ip_config, 1, num_servers)
    g = CitationGraphDataset("cora")[0]
    print(g.num_nodes(), g.num_edges())
    # g.readonly()
    print(g.idtype)
    graph_name = "test_sampling"
    num_parts = 1
    num_hops = 1
    partition_graph(g, graph_name, num_parts, tmpdir, num_hops, reshuffle=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_servers):
        p = ctx.Process(target=start_server_v2,
                        args=(i, num_servers, 1, tmpdir, num_servers > 1, 'test_sampling'))
        p.start()
        # time.sleep(1)
        pserver_list.append(p)
    sampled_graph = start_sample_client_v2(0, num_servers, tmpdir, num_servers > 1)
    print("Done sampling")
    for p in pserver_list:
        p.join()
    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    print(src, dst)
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    eids = g.edge_ids(src, dst)
    print(src, dst)
    print(eids)
    print(sampled_graph.edge_ids(src, dst))
    print(sampled_graph.find_edges(sampled_graph.edge_ids(src, dst)))
    # print(sampled_graph.find_edges(sampled_graph.edata[dgl.EID]))
    print(sampled_graph.edata[dgl.EID])
    assert np.array_equal(F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))

def start_sample_client_v2(rank, num_servers, tmpdir, disable_shared_mem):
    from dgl.distributed import load_partition, DistGraph, sample_neighbors
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    gpb = None
    if disable_shared_mem:
        # Without shared memory, the client must load the partition book itself.
        _, _, _, gpb, _, _, _ = load_partition(os.path.join(tmpdir, 'test_sampling.json'), rank)
    dgl.distributed.initialize("kv_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    try:
        sampled_graph = sample_neighbors(dist_graph, [0, 10, 99, 66, 1024, 2048], 3)
    except Exception as e:
        print(e)
        sampled_graph = None
    dgl.distributed.exit_client()
    return sampled_graph

def start_server_v2(rank, num_servers, num_clients, tmpdir, disable_shared_mem, graph_name,
                    graph_format=['csc', 'coo']):
    from dgl.distributed import DistGraphServer
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                        os.path.join(tmpdir, graph_name) + '.json',
                        disable_shared_mem=not disable_shared_mem,
                        graph_format=graph_format)
    print("Server {} is back_up server: {}".format(rank, g.is_backup_server()))
    g.start()

multi-partition

## Test with multi partition and multi-server with back-up servers
import os
import dgl
from dgl import backend as F  # backend shim: asnumpy() used by the asserts

def example2_v3(tmpdir, num_servers):
    import time
    import multiprocessing as mp
    import numpy as np
    from utils import generate_ip_config
    from dgl.data import CitationGraphDataset
    from dgl.distributed import partition_graph
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # os.environ['DGL_NUM_SERVER'] = str(num_servers)
    ip_config = "kv_ip_config.txt"
    num_machines = 3
    generate_ip_config(ip_config, num_machines, num_servers)
    num_servers_total = num_machines * num_servers
    g = CitationGraphDataset("cora")[0]
    print(g.num_nodes(), g.num_edges())
    # g.readonly()
    print(g.idtype)
    graph_name = "test_sampling"
    num_parts = num_machines
    num_hops = 1
    partition_graph(g, graph_name, num_parts, tmpdir, num_hops, reshuffle=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_servers_total):
        p = ctx.Process(target=start_server_v3,
                        args=(i, num_servers, 1, tmpdir, num_servers_total > 1, 'test_sampling'))
        p.start()
        # time.sleep(1)
        pserver_list.append(p)
    sampled_graph = start_sample_client_v3(0, num_servers, tmpdir, num_servers_total > 1)
    print("Done sampling")
    for p in pserver_list:
        p.join()
    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == g.number_of_nodes()
    print(src, dst)
    assert np.all(F.asnumpy(g.has_edges_between(src, dst)))
    eids = g.edge_ids(src, dst)
    print(src, dst)
    print(eids)
    print(sampled_graph.edge_ids(src, dst))
    print(sampled_graph.find_edges(sampled_graph.edge_ids(src, dst)))
    # print(sampled_graph.find_edges(sampled_graph.edata[dgl.EID]))
    print(sampled_graph.edata[dgl.EID])
    assert np.array_equal(F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))

def start_sample_client_v3(rank, num_servers, tmpdir, disable_shared_mem):
    from dgl.distributed import load_partition, DistGraph, sample_neighbors
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    gpb = None
    if disable_shared_mem:
        graph, _, _, gpb, _, _, _ = load_partition(os.path.join(tmpdir, 'test_sampling.json'), rank)
        # print(graph.edata[dgl.EID])
        print(graph)
    # print(os.environ.get('DGL_ROLE'))
    # print(os.environ.get('client'))
    dgl.distributed.initialize("kv_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    try:
        sampled_graph = sample_neighbors(dist_graph, [0, 10, 99, 66, 1024, 2048], 3)
    except Exception as e:
        print(e)
        sampled_graph = None
    dgl.distributed.exit_client()
    return sampled_graph

def start_server_v3(rank, num_servers, num_clients, tmpdir, disable_shared_mem, graph_name,
                    graph_format=['csc', 'coo']):
    # rank = rank * num_servers
    from dgl.distributed import DistGraphServer
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                        os.path.join(tmpdir, graph_name) + '.json',
                        disable_shared_mem=not disable_shared_mem,
                        graph_format=graph_format)
    print("Server {} is back_up server: {}".format(rank, g.is_backup_server()))
    g.start()
    # print("Server {} start".format(rank))

‼️Important DGL properties

Key properties, collected from the dgl/heterograph.py source file.

ndata and edata
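A minimal sketch of the two views (plain DGLGraph API; the field names 'feat' and 'w' are only illustrative):

import dgl
import torch

g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
g.ndata['feat'] = torch.randn(3, 4)  # first dim must equal num_nodes (3)
g.edata['w'] = torch.ones(2)         # first dim must equal num_edges (2)
print(g.ndata['feat'].shape, g.edata['w'])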

dgl.NID and EID

partition: when reshuffle is enabled, how does dgl.NID differ from 'orig_id'? (See the sketch below.)
sampling
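A quick way to inspect the difference, based on the commented-out check in example1 (tmpdir is the partition output directory from the examples above; with reshuffle=True, ndata[dgl.NID] holds the reshuffled global IDs while ndata['orig_id'] holds the IDs from the original graph):

import dgl
from dgl.distributed import load_partition

graph, node_feats, edge_feats, gpb, _, _, _ = load_partition(
    tmpdir + '/test_sampling.json', 0)
print(graph.ndata[dgl.NID])    # global (reshuffled) IDs of this partition's nodes
print(graph.ndata['orig_id'])  # the same nodes' IDs in the original graph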

srcdata and dstdata
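On a plain homogeneous graph these are simply aliases of ndata; they only diverge on bipartite block graphs such as those produced by sampling pipelines. A small sketch:

import dgl
import torch

g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
block = dgl.to_block(g, dst_nodes=torch.tensor([1, 2]))
print(block.srcdata[dgl.NID])  # original IDs of the source (input) nodes
print(block.dstdata[dgl.NID])  # original IDs of the destination (output) nodes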

ntypes and etypes

_N: the default node-type name for a homogeneous graph (the default edge type is '_E'); see the snippet below.
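A one-liner to confirm the defaults:

import dgl
import torch

g = dgl.graph((torch.tensor([0]), torch.tensor([1])))
print(g.ntypes, g.etypes)  # ['_N'] ['_E'] for a homogeneous graph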
