1. Actors

Ray中的远程函数被认为是功能性强和副作用低的框架。 仅限于远程函数的情况下,可以为我们提供分布式函数编程,这对于许多使用情况非常有用,但在实践中会受到一些限制。

Ray通过actor扩展了数据流模型。 一个actor本质上是一个有状态的worker(或service)。 当一个新的actor被实例化时,一个新的worker被创建,并且该actor的方法被安排在该特定的worker上,并且可以访问和改变该worker的状态。

假设我们已经开始了Ray。

import ray
ray.init()

2. 定义和创建一个actor

考虑下面的简单例子。装饰器 ray.remote表明类Counter实例化后为actor。

@ray.remote
class Counter(object):def __init__(self):self.value = 0def increment(self):self.value += 1return self.value

为了真正创建一个actor,我们通过调用Counter.remote()来实例化该类。

a1 = Counter.remote()
a2 = Counter.remote()

当一个actor被实例化,下面的情况发生

  1. 选择集群中的一个节点,并在该节点上(由该节点上的本地调度程序)创建一个worker,以便运行在该actor上调用的方法。
  2. 在该worker上创建一个Counter对象,并运行Counter构造函数。

3. 使用actor

我们可以通过调用actor的方法来调度它的任务。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1

当a1.increment.remote()被调用,下面的情况发生。

  1. 任务被创建。
  2. 该任务被直接分配给负责该actor的本地调度程序(由驱动程序的本地调度器控制)。 因此,此调度过程绕过全局调度程序。
  3. 一个对象ID被返回。

然后,我们可以调用ray.get根据对象ID来获取实际值。

同样,对a2.increment.remote()的调用将产生一个任务,该任务被调度到第二个Counter actor上。 由于这两个任务在不同的actor上运行,它们可以并行执行(请注意,只有该actor上的方法才被调度分配到actor worker上,常规的远程函数不会)。

另一方面,在同一个Counter actor上调用的方法按照它们被调用的顺序依次执行。 因此它们可以相互共享状态,如下所示。

# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]# Increment each Counter once and get the results. These tasks all happen in parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]# Increment the first Counter five times. These tasks are executed serially and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]

4. 一个更加有趣的actor示例

一个常见的模式是使用actor来封装由外部库或服务管理的可变状态。

Gym为许多模拟环境提供了一个接口,用于测试和训练强化学习的agent。 这些模拟器是有状态的,使用这些模拟器的任务肯定会改变它们的状态。 我们可以使用actor来封装这些模拟器的状态。

import gym@ray.remote
class GymEnvironment(object):def __init__(self, name):self.env = gym.make(name)self.env.reset()def step(self, action):return self.env.step(action)def reset(self):self.env.reset()

我们然后如下在该actor节点上示例化一个actor并调度分配一个任务。

pong = GymEnvironment.remote("Pong-v0")
pong.step.remote(0)  # Take action 0 in the simulator.

5. 在actor上使用GPU

一个常见的用例是actor包含一个神经网络。 例如,假设我们已经导入了Tensorflow并创建了一个构建神经网络的方法。

import tensorflow as tfdef construct_network():x = tf.placeholder(tf.float32, [None, 784])y_ = tf.placeholder(tf.float32, [None, 10])W = tf.Variable(tf.zeros([784, 10]))b = tf.Variable(tf.zeros([10]))y = tf.nn.softmax(tf.matmul(x, W) + b)cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))return x, y_, train_step, accuracy

我们然后可以如下为神经网络创建一个actor。

import os# Define an actor that runs on GPUs. If there are no GPUs, then simply use
# ray.remote without any arguments and no parentheses.
@ray.remote(num_gpus=1)
class NeuralNetOnGPU(object):def __init__(self):# Set an environment variable to tell TensorFlow which GPUs to use. Note# that this must be done before the call to tf.Session.os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()])with tf.Graph().as_default():with tf.device("/gpu:0"):self.x, self.y_, self.train_step, self.accuracy = construct_network()# Allow this to run on CPUs if there aren't any GPUs.config = tf.ConfigProto(allow_soft_placement=True)self.sess = tf.Session(config=config)# Initialize the network.init = tf.global_variables_initializer()self.sess.run(init)

为了表明一个actor需要用到GPU,我们将num_gpus = 1传递给ray.remote。 请注意,为了实现这一点,Ray必须初始化时指定使用GPU,例如,通过ray.init(num_gpus = 2)。 否则,当你尝试使用NeuralNetOnGPU.remote()实例化GPU版本时,会引发异常,说明系统中没有足够的GPU。

当actor创建时,它将有权通过ray.get_gpu_ids()得到可以使用的GPU的ID的列表。 这是一个整数列表,如[]或[1]或[2,5,6]。 由于我们传入了ray.remote(num_gpus = 1),因此此列表将具有一个长度。

我们可以将这一切放在一起,如下所示。

import os
import ray
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_dataray.init(num_gpus=8)def construct_network():x = tf.placeholder(tf.float32, [None, 784])y_ = tf.placeholder(tf.float32, [None, 10])W = tf.Variable(tf.zeros([784, 10]))b = tf.Variable(tf.zeros([10]))y = tf.nn.softmax(tf.matmul(x, W) + b)cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))return x, y_, train_step, accuracy@ray.remote(num_gpus=1)
class NeuralNetOnGPU(object):def __init__(self, mnist_data):self.mnist = mnist_data# Set an environment variable to tell TensorFlow which GPUs to use. Note# that this must be done before the call to tf.Session.os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()])with tf.Graph().as_default():with tf.device("/gpu:0"):self.x, self.y_, self.train_step, self.accuracy = construct_network()# Allow this to run on CPUs if there aren't any GPUs.config = tf.ConfigProto(allow_soft_placement=True)self.sess = tf.Session(config=config)# Initialize the network.init = tf.global_variables_initializer()self.sess.run(init)def train(self, num_steps):for _ in range(num_steps):batch_xs, batch_ys = self.mnist.train.next_batch(100)self.sess.run(self.train_step, feed_dict={self.x: batch_xs, self.y_: batch_ys})def get_accuracy(self):return self.sess.run(self.accuracy, feed_dict={self.x: self.mnist.test.images,self.y_: self.mnist.test.labels})# Load the MNIST dataset and tell Ray how to serialize the custom classes.
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)# Create the actor.
nn = NeuralNetOnGPU.remote(mnist)# Run a few steps of training and print the accuracy.
nn.train.remote(100)
accuracy = ray.get(nn.get_accuracy.remote())
print("Accuracy is {}.".format(accuracy))

6. 围绕actor句柄传递

Actor句柄可以传递到其他任务。 为了用一个简单的例子来说明这一点,考虑一个简单的actor定义。 此功能目前是实验性的,并受以下所述的限制。

@ray.remote
class Counter(object):def __init__(self):self.counter = 0def inc(self):self.counter += 1def get_counter(self):return self.counter

我们可以定义使用actor句柄的远程函数(或者actor方法)。f(counter)中的counter为一个actor的句柄。

@ray.remote
def f(counter):while True:counter.inc.remote()

如果我们实例化一个actor,我们可以将这个句柄传递给各种任务。

counter = Counter.remote()# Start some tasks that use the actor.
[f.remote(counter) for _ in range(4)]# Print the counter value.
for _ in range(10):time.sleep(0.01)print(ray.get(counter.get_counter.remote()))

7. 当前actor的限制

我们正在努力解决以下问题。

  1. Actor生命周期管理:当前,当原始actor的句柄超出范围时,会为该actor安排一个任务来杀死actor进程。这会产生一个问题,如果最初的actor句柄超出了作用域,但actor仍然被已经传递了actor句柄的任务使用。
  2. 返回actor句柄:Actor句柄当前不能从远程函数或actor方法返回。同样,ray.put不能在actor句柄上调用。如上,句柄counter由Counter.remote()在本地调用,而不是从远程函数返回。
  3. 重建被丢弃的actor对象:如果在由actor方法创建的已丢弃对象上调用ray.get,则Ray当前不会重建该对象。
  4. 确定性重建丢失的actor:如果actor由于节点故障而丢失,则根据初始执行的顺序在新节点上重构actor。然而,同时安排在actor身上的新任务可能会在重新执行的任务之间执行。如果你的应用程序对状态一致性有严格的要求,这可能会成为问题。

机器学习框架Ray——Actor模型相关推荐

  1. 有望取代Spark,Michael Jordan和Ion Stoica提出下一代分布式实时机器学习框架Ray牛在哪?...

    从MR到Spark再到Ray Michael I. Jordan力荐的Ray 尽在"Ray Summit Pre-Con" 2020年9月21日 09:00-12:10 Ray项目 ...

  2. 机器学习框架Ray -- 1.2 Ray Core简介

    关键概念 概述Ray的关键概念. 这些原语一起工作,使得Ray能够灵活地支持广泛的分布式应用程序. Tasks 任务 Ray允许在不同的Python工作进程上异步执行任意函数.这些异步的Ray函数被称 ...

  3. 2020中国Ray技术峰会丨取代Spark,Michael Jordan和Ion Stoica提出下一代分布式实时机器学习框架...

    从MR到Spark再到Ray Michael I. Jordan力荐的Ray 尽在"Ray Summit Pre-Con" 2020年9月21日 09:00-12:10 Ray项目 ...

  4. C++并发编程框架Theron(1)——Actor模型介绍

    1 说在前面的话 Theron是近些年发展起来的一个非常不错的C++并发编程框架,最近有详细阅读Theron的相关资料,发现它思想非常有条理,结构很明朗,非常适合项目开发.其实Theron国内研究还是 ...

  5. ML之ME/LF:基于不同机器学习框架(sklearn/TF)下算法的模型评估指标(损失函数)代码实现及其函数(Scoring/metrics)代码实现(仅代码)

    ML之ME/LF:基于不同机器学习框架(sklearn/TF)下算法的模型评估指标(损失函数)代码实现及其函数(Scoring/metrics)代码实现(仅代码) 目录 单个评价指标各种框架下实现 1 ...

  6. 下一代的 Actor 模型框架 Proto Actor

    ProtoAct 是下一代的 Actor 模型框架,提供了 .NET 和 Go 语言的实现,默认支持分布式,提供管理和监控功能.在过去几年,我们经常看到两种 Actor 模型方法相互竞争,首先是经典的 ...

  7. mysql逻辑架构连接池_GitHub - zzjzzb/ycsocket: 基于swoole的socket框架,支持协程版MySQL、Redis连接池、Actor模型...

    ycsocket 基于 swoole 和 swoole_orm 的 websocket 框架,各位可以自己扩展到 TCP/UDP,HTTP. 在ycsocket 中,采用的是全协程化,全池化的数据库. ...

  8. Orleans 分布式 计算框架-Actor模型的一种实现

    写在前面 Orleans是基于Actor模型思想的.NET领域的框架,它提供了一种直接而简单的方法来构建分布式大规模计算应用程序,而无需学习和应用复杂的并发或其他扩展模式.我在2015年下半年开始应用 ...

  9. 高性能最终一致性框架Ray之基本概念原理

    一.Actor介绍 Actor是一种并发模型,是共享内存并发模型的替代方案. 共享内存模型的缺点: 共享内存模型使用各种各样的锁来解决状态竞争问题,性能低下且让编码变得复杂和容易出错. 共享内存受限于 ...

最新文章

  1. 认认真真推荐几个机器学习类的公众号
  2. MVVM架构~knockoutjs系列之为validation.js扩展minLength和maxLength
  3. Debug enterprise search menu
  4. 《线程管理:传递参数、确定线程数量、线程标识》
  5. 2d游戏地图编辑器_C语言实现大型2D格斗游戏,1.8万行代码!
  6. linux常用压缩/解压命令
  7. 7. Simple Product
  8. 什么是网络Bypass交换机?
  9. 数据结构1800题-错题集-第一章
  10. java调用mac终端命令_JAVA之前 - mac终端命令行
  11. 小白从零开发鸿蒙小游戏(1)“数字华容道”—【深鸿会学习小组学习心得及笔记】
  12. PS怎么把人物扣的更干净_PS抠图技巧
  13. 在elementUI中sort-orders排序,默认为三种,怎么改成两种
  14. 如何下载720云上的全景图片?
  15. 洛谷-P1107 [BJWC2008]雷涛的小猫
  16. python中从键盘输入五个单词输出以元音字母开头的单词_matlab中 从一个文本读出所有英文单词,并且把所有以元音字母开头的字母首字母 的代码怎么写...
  17. gateway自定义负载均衡策略
  18. ES分组聚合Agg nested
  19. 如何理解向量组的秩和矩阵的秩
  20. wi8ndows无法加载,Win8.1系统更新Flash插件后无法自动加载插件怎么办

热门文章

  1. 解决Flex项目下bin-debug文件在其他电脑打开不显示的问题
  2. php下lua的运行,phpStudy中起用lua脚本
  3. 杯具”箴言成网络流行语
  4. JavaScript前端判断文件是否存在(案例详解)
  5. 集丰照明|一帖说清6种防眩射灯应用在家装修上
  6. HCL Domino/Notes专业课程和认证体系介绍
  7. 糖儿飞教你学C++ Socket网络编程——2.本书目录
  8. 获取选中状态复选框的值并添加id
  9. android 怎样设置铃声
  10. 微信公众号推文发布方法(内涵详细步骤)