蒙特卡罗模拟需要数以百万计的路径来得到精确的答案,这需要大量的计算。Ryan等人得研究表明,可以训练深度学习模型对衍生品进行估值。深度学习模型是准确和快速的,能够产生比传统模型快一百万倍的估值。在今天的推文中,我们将使用一个 全连接网络来学习亚式障碍期权的定价模式 。采用蒙特卡罗模拟作为训练的定价依据。我们使用与上一篇文章相同的亚式障碍期权模型,参数如下:T:到期如(年)

S:现货(美元)

K:Strike(美元)

sigma:波动率(per)

r:无风险利率(per)

mu:Drift Rate(per)

B:Barrier(美元)

下面的内容主要包括两个主题:使用蒙特卡罗定价动态数据集训练期权定价神的经网络模型。

使用蒙特卡罗定价静态数据集训练期权定价神经网络模型并进行推断。

2

批处理数据生成

数据集是深度学习训练的重要组成部分。我们将修改之前的单一亚式障碍期权定价代码来处理一批障碍期权定价。

加载库:

import cupy

import numpy as np

import math

import time

import torch

cupy.cuda.set_allocator(None)

from torch.utils.dlpack import from_dlpack

批量障碍期权定价模拟的CuPy版本如下:

cupy_batched_barrier_option = cupy.RawKernel(r'''

extern "C" __global__ void batched_barrier_option(

float *d_s,

const float T,

const float * K,

const float * B,

const float * S0,

const float * sigma,

const float * mu,

const float * r,

const float * d_normals,

const long N_STEPS,

const long N_PATHS,

const long N_BATCH)

{

unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;

unsigned stride = blockDim.x * gridDim.x;

unsigned tid = threadIdx.x;

const float tmp3 = sqrt(T/N_STEPS);

for (unsigned i = idx; i

{

int batch_id = i / N_PATHS;

int path_id = i % N_PATHS;

float s_curr = S0[batch_id];

float tmp1 = mu[batch_id]*T/N_STEPS;

float tmp2 = exp(-r[batch_id]*T);

unsigned n=0;

double running_average = 0.0;

for(unsigned n = 0; n < N_STEPS; n++){

s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];

running_average += (s_curr - running_average) / (n + 1.0);

if (running_average <= B[batch_id]){

break;

}

}

float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);

d_s[i] = tmp2 * payoff;

}

}

''', 'batched_barrier_option')

注意,参数(K, B, S0, sigma, mu, r)以批处理长度的数组形式传入。输出数组是一个1-D 的二维数组。第一个维度用于 Batch,第二个维度用于 Path。。

通过输入两组选项参数进行测试:

N_PATHS = 2048000

N_STEPS = 365

N_BATCH = 2

T = 1.0

K = cupy.array([110.0, 120.0], dtype=cupy.float32)

B = cupy.array([100.0, 90.0], dtype=cupy.float32)

S0 = cupy.array([120.0, 100.0], dtype=cupy.float32)

sigma = cupy.array([0.35, 0.2], dtype=cupy.float32)

mu = cupy.array([0.15, 0.1], dtype=cupy.float32)

r =cupy.array([0.05, 0.05], dtype=cupy.float32)

把这一切放进一个简单的函数来启动1GPU内核。每个Path的期权价格是相应路径terminal值的平均值。这可以很容易地通过Cupy函数平均值(axis=1)计算出来

def batch_run():

number_of_threads = 256

number_of_blocks = (N_PATHS * N_BATCH - 1) // number_of_threads + 1

randoms_gpu = cupy.random.normal(0, 1, N_BATCH*N_PATHS * N_STEPS, dtype=cupy.float32)

output = cupy.zeros(N_BATCH*N_PATHS, dtype=cupy.float32)

cupy.cuda.stream.get_current_stream().synchronize()

s = time.time()

cupy_batched_barrier_option((number_of_blocks,), (number_of_threads,),

(output, np.float32(T), K, B, S0, sigma, mu, r,

randoms_gpu, N_STEPS, N_PATHS, N_BATCH))

v = output.reshape(N_BATCH, N_PATHS).mean(axis=1)

cupy.cuda.stream.get_current_stream().synchronize()

e = time.time()

print('time', e-s, 'v',v)

batch_run()

time 0.013919591903686523 v [21.22405 0.8480416]

这将为66ms中的这两组期权参数生成21.22和0.848的期权价格。

它的工作效率很高,因此我们将构造一个OptionDataSet类来包装上面的代码,以便我们可以在Pytorch中使用它。对于每个下一个元素,生成指定范围内的均匀分布随机期权参数,启动GPU内核计算期权价格,通过DLPack将CuPy数组转换为带有zero-copy的Pytorch张量。请注意我们是如何实现iterable Dataset接口的:

class OptionDataSet(torch.utils.data.IterableDataset):

def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):

self.num = 0

self.max_length = max_len

self.N_PATHS = number_path

self.N_STEPS = 365

self.N_BATCH = batch

self.T = np.float32(1.0)

self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)

self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1

self.number_of_threads = threads

cupy.random.seed(seed)

def __len__(self):

return self.max_length

def __iter__(self):

self.num = 0

return self

def __next__(self):

if self.num > self.max_length:

raise StopIteration

X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)

X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)

X[:, 1] = X[:, 0] * X[:, 1]

randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)

cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),

cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))

Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)

self.num += 1

return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))

将所有与Pytorch数据集相关的内容都放到一个名为cupy_dataset.py的文件中:

%%writefile cupy_dataset.py

import cupy

import numpy as np

import torch

from torch.utils.dlpack import from_dlpack

cupy.cuda.set_allocator(None)

cupy_batched_barrier_option = cupy.RawKernel(r'''

extern "C" __global__ void batched_barrier_option(

float *d_s,

const float T,

const float * K,

const float * B,

const float * S0,

const float * sigma,

const float * mu,

const float * r,

const float * d_normals,

const long N_STEPS,

const long N_PATHS,

const long N_BATCH)

{

unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;

unsigned stride = blockDim.x * gridDim.x;

unsigned tid = threadIdx.x;

const float tmp3 = sqrt(T/N_STEPS);

for (unsigned i = idx; i

{

int batch_id = i / N_PATHS;

int path_id = i % N_PATHS;

float s_curr = S0[batch_id];

float tmp1 = mu[batch_id]*T/N_STEPS;

float tmp2 = exp(-r[batch_id]*T);

unsigned n=0;

double running_average = 0.0;

for(unsigned n = 0; n < N_STEPS; n++){

s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];

running_average += (s_curr - running_average) / (n + 1.0);

if (running_average <= B[batch_id]){

break;

}

}

float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);

d_s[i] = tmp2 * payoff;

}

}

''', 'batched_barrier_option')

class OptionDataSet(torch.utils.data.IterableDataset):

def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):

self.num = 0

self.max_length = max_len

self.N_PATHS = number_path

self.N_STEPS = 365

self.N_BATCH = batch

self.T = np.float32(1.0)

self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)

self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1

self.number_of_threads = threads

cupy.random.seed(seed)

def __len__(self):

return self.max_length

def __iter__(self):

self.num = 0

return self

def __next__(self):

if self.num > self.max_length:

raise StopIteration

X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)

X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)

X[:, 1] = X[:, 0] * X[:, 1]

randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)

cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),

cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))

Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)

self.num += 1

return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))

覆盖cupy_dataset.py

这里是一个测试代码样本,有10个数据点、 batch为 16:

ds = OptionDataSet(10, number_path=100000, batch=16, seed=15)

for i in ds:

print(i[1])

我们可以实现相同的代码使用Numba加速计算在GPU:

import numba

from numba import cuda

@cuda.jit

def batch_barrier_option(d_s, T, K, B, S0, sigma, mu, r, d_normals, N_STEPS, N_PATHS, N_BATCH):

ii = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x

stride = cuda.gridDim.x * cuda.blockDim.x

tmp3 = math.sqrt(T/N_STEPS)

for i in range(ii, N_PATHS * N_BATCH, stride):

batch_id = i // N_PATHS

path_id = i % N_PATHS

tmp1 = mu[batch_id]*T/N_STEPS

tmp2 = math.exp(-r[batch_id]*T)

running_average = 0.0

s_curr = S0[batch_id]

for n in range(N_STEPS):

s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH]

running_average = running_average + 1.0/(n + 1.0) * (s_curr - running_average)

if i==0 and batch_id == 2:

print(s_curr)

if running_average <= B[batch_id]:

break

payoff = running_average - K[batch_id] if running_average > K[batch_id] else 0

d_s[i] = tmp2 * payoff

class NumbaOptionDataSet(object):

def __init__(self, max_len=10, number_path = 1000, batch=2, threads=512, seed=15):

self.num = 0

self.max_length = max_len

self.N_PATHS = number_path

self.N_STEPS = 365

self.N_BATCH = batch

self.T = np.float32(1.0)

self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)

self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1

self.number_of_threads = threads

cupy.random.seed(seed)

def __len__(self):

return self.max_length

def __iter__(self):

self.num = 0

return self

def __next__(self):

if self.num > self.max_length:

raise StopIteration

X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)

X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)

X[:, 1] = X[:, 0] * X[:, 1]

randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)

batch_barrier_option[(self.number_of_blocks,), (self.number_of_threads,)](self.output, self.T, X[:, 0],

X[:, 1], X[:, 2], X[:, 3], X[:, 4], X[:, 5], randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH)

o = self.output.reshape(self.N_BATCH, self.N_PATHS)

Y = o.mean(axis = 1)

self.num += 1

return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))

ds = NumbaOptionDataSet(10, number_path=100000, batch=16, seed=15)

for i in ds:

print(i[1])

3

模型

为了将期权参数映射到价格,我们使用了6层全连接神经网络,其隐含维度为512。将此DL价格模型写入model.py:

%%writefile model.py

import torch.nn as nn

import torch.nn.functional as F

import torch

class Net(nn.Module):

def __init__(self, hidden=1024):

super(Net, self).__init__()

self.fc1 = nn.Linear(6, hidden)

self.fc2 = nn.Linear(hidden, hidden)

self.fc3 = nn.Linear(hidden, hidden)

self.fc4 = nn.Linear(hidden, hidden)

self.fc5 = nn.Linear(hidden, hidden)

self.fc6 = nn.Linear(hidden, 1)

self.register_buffer('norm',

torch.tensor([200.0,

198.0,

200.0,

0.4,

0.2,

0.2]))

def forward(self, x):

x = x / self.norm

x = F.elu(self.fc1(x))

x = F.elu(self.fc2(x))

x = F.elu(self.fc3(x))

x = F.elu(self.fc4(x))

x = F.elu(self.fc5(x))

return self.fc6(x)

覆盖model.py

输入参数首先通过除以(200.0,198.0,200.0,0.4,0.2,0.2)缩小到0-1范围。然后在ELu激活函数后,将其5次隐射到隐藏维度512。选择ELu是因为我们需要计算参数的二阶微分。如果使用ReLu,二阶微分总是0。最后一层是线性层,它将隐藏维度映射到预测的期权价格。

在训练方面,我们使用了一个高级库Ignite来训练PyTorch中的神经网络:

我们使用MSELoss作为损失函数,Adam作为优化器,CosineAnnealingScheduler作为学习率调度器。下面的代码将随机期权数据提供给定价模型进行训练。

from ignite.engine import Engine, Events

from ignite.handlers import Timer

from torch.nn import MSELoss

from torch.optim import Adam

from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler

from ignite.handlers import ModelCheckpoint

from model import Net

from cupy_dataset import OptionDataSet

timer = Timer(average=True)

model = Net().cuda()

loss_fn = MSELoss()

optimizer = Adam(model.parameters(), lr=1e-3)

dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)

def train_update(engine, batch):

model.train()

optimizer.zero_grad()

x = batch[0]

y = batch[1]

y_pred = model(x)

loss = loss_fn(y_pred[:,0], y)

loss.backward()

optimizer.step()

return loss.item()

trainer = Engine(train_update)

log_interval = 100

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))

trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

timer.attach(trainer,

start=Events.EPOCH_STARTED,

resume=Events.ITERATION_STARTED,

pause=Events.ITERATION_COMPLETED,

step=Events.ITERATION_COMPLETED)

@trainer.on(Events.ITERATION_COMPLETED)

def log_training_loss(engine):

iter = (engine.state.iteration - 1) % len(dataset) + 1

if iter % log_interval == 0:

print('loss', engine.state.output, 'average time', timer.value())

trainer.run(dataset, max_epochs=100)

损失在不断减少,这意味着定价模型可以更好地预测期权价格。平均计算一个批大小(mini-batch)量需要花费大约12ms,在接下的文章中,我们将尝试挖掘GPU的全部潜力来加速训练。

4

TensorCore混合精度训练

V100 GPU有640个张量核,可以加速半精度矩阵乘法运算,这是DL模型的核心运算。由NVIDIA开发的Apex库( https://github.com/NVIDIA/apex )使Pytorch中的混合精度和分布式训练变得容易。通过改变3行代码,可以利用张量核加速训练。

from apex import amp

from ignite.engine import Engine, Events

from torch.nn import MSELoss

from ignite.handlers import Timer

from torch.optim import Adam

from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler

from ignite.handlers import ModelCheckpoint

from model import Net

from cupy_dataset import OptionDataSet

timer = Timer(average=True)

model = Net().cuda()

loss_fn = MSELoss()

optimizer = Adam(model.parameters(), lr=1e-3)

opt_level = 'O1'

model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)

dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)

def train_update(engine, batch):

model.train()

optimizer.zero_grad()

x = batch[0]

y = batch[1]

y_pred = model(x)

loss = loss_fn(y_pred[:,0], y)

with amp.scale_loss(loss, optimizer) as scaled_loss:

scaled_loss.backward()

optimizer.step()

return loss.item()

trainer = Engine(train_update)

log_interval = 100

timer.attach(trainer,

start=Events.EPOCH_STARTED,

resume=Events.ITERATION_STARTED,

pause=Events.ITERATION_COMPLETED,

step=Events.ITERATION_COMPLETED)

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))

trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

@trainer.on(Events.ITERATION_COMPLETED)

def log_training_loss(engine):

iter = (engine.state.iteration - 1) % len(dataset) + 1

if iter % log_interval == 0:

print('loss', engine.state.output, 'average time', timer.value())

trainer.run(dataset, max_epochs=100)

它改进了以8ms计算每个 mini-batch 。为了获得更好的性能,我们将模型权值降低到半精度,因此需要调整损失以确保半精度动态范围与计算结果一致。它是猜测什么是正确的损失比例因子,并自动调整,如果梯度溢出。最后,在保持模型预测精度的前提下,获得最佳的硬件加速性能。

5

多个GPU训练

Apex让多GPU训练变得容易。在同一个训练脚本中,我们需要注意一些额外的步骤:

1、添加参数——local_rank,该参数将由分布式启动程序自动设置。

2、初始化进程组。

3、根据数据集中的进程id生成独立的批处理数据。

4、包装模型和优化器来处理分布式计算。

5、衡量损失和优化。

为了启动分布式训练,我们需要将所有内容都放到一个Python文件中。以下是一个例子:

%%writefile distributed_train.py

import cupy

import numpy as np

import math

import time

import os

import torch

from torch.utils.dlpack import from_dlpack

import torch.nn as nn

import torch.nn.functional as F

import torch

from apex import amp

from ignite.engine import Engine, Events

from torch.nn import MSELoss

from torch.optim import Adam

from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler

from ignite.handlers import ModelCheckpoint

from apex.parallel import DistributedDataParallel

import argparse

from model import Net

from cupy_dataset import OptionDataSet

parser = argparse.ArgumentParser()

parser = argparse.ArgumentParser()

parser.add_argument("--local_rank", default=0, type=int)

args = parser.parse_args()

args.distributed = False

if 'WORLD_SIZE' in os.environ:

args.distributed = int(os.environ['WORLD_SIZE']) > 1

if args.distributed:

torch.cuda.set_device(args.local_rank)

torch.distributed.init_process_group(backend='nccl',

init_method='env://')

torch.backends.cudnn.benchmark = True

model = Net().cuda()

loss_fn = MSELoss()

optimizer = Adam(model.parameters(), lr=1e-3)

opt_level = 'O1'

model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)

if args.distributed:

model = DistributedDataParallel(model)

dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=10240, seed=args.local_rank)

def train_update(engine, batch):

model.train()

optimizer.zero_grad()

x = batch[0]

y = batch[1]

y_pred = model(x)

loss = loss_fn(y_pred[:,0], y)

with amp.scale_loss(loss, optimizer) as scaled_loss:

scaled_loss.backward()

optimizer.step()

return loss.item()

trainer = Engine(train_update)

log_interval = 100

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))

trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

@trainer.on(Events.ITERATION_COMPLETED)

def log_training_loss(engine):

iter = (engine.state.iteration - 1) % len(dataset) + 1

if iter % log_interval == 0:

print('loss', engine.state.output)

trainer.run(dataset, max_epochs=100)

覆盖distributed_train.py

要启动多进程训练,我们需要运行以下命令:

%reset -f

!python -m torch.distributed.launch --nproc_per_node=4 distributed_train.py

所有的GPU都在训练这个网络。然而,它有几个问题:

1、由于没有模型序列化,因此不会保存经过训练的模型;

2、没有验证数据集来检查训练进度;

3、大部分时间都花在蒙特卡罗模拟上,因此训练速度较慢;

4、 我们使用几个路径(1024)作为每个期权参数集,这些参数集是噪声的,并且模型不能收敛到一个低成本值。

解决方案是将蒙特卡罗仿真数据保存在磁盘上。这允许我们:

1、为不同的模型使用相同的数据集,节省蒙特卡罗仿真时间

2、通过增加路径数量来生成更精确的定价数据

我们将使用CuPy来运行蒙特卡罗仿真,因为它是最有效的方法。使用前面文章中定义的OptionDataSet:

from cupy_dataset import OptionDataSet

为保存的数据文件和模型检查点创建目录:

!mkdir -p datafiles

!mkdir -p check_points

定义一个函数来生成数据集文件:

def gen_data(n_files = 630, options_per_file = 10000, seed=3):

counter = 0

ds = OptionDataSet(max_len=n_files * options_per_file, number_path=8192000, batch=1,

seed=seed)

x = []

y = []

for i in ds:

if counter!=0 and counter % options_per_file == 0:

filename = 'datafiles/'+str(seed) + '_' + str(counter//options_per_file) + '.pth'

state = (torch.cat(x, 0), torch.cat(y, 0))

torch.save(state, filename)

x = []

y = []

x.append(i[0].cpu())

y.append(i[1].cpu())

counter += 1

return seed

它将为每个文件生成包含 x 和 y 大小矩阵选项的文件,文件名采用seed_group.pth,我们可以测试运行n_files = 5 和options_per_file = 16。

gen_data(n_files=5, options_per_file = 16, seed=3)

X, Y = torch.load('datafiles/3_1.pth')

print(X)

print(Y)

在本文中,我们将使用DASK在多核 GPU上生成数据集:

import dask

import dask_cudf

from dask.delayed import delayed

from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()

from dask.distributed import Client

client = Client(cluster)

client

下面的代码是一个在4个GPU上生成100x5x16个数据点示例。对于真正的深度学习模型训练,我们需要数以百万计的数据点。大家可以尝试将n_files和options_per_file更改为较大的数字。

futures = []

for i in range(0, 100):

future = client.submit(gen_data, 5, 16, i)

futures.append(future)

results = client.gather(futures)

一旦生成了数百万个数据点,我们就可以将这些数据点组合在一起,并将它们拆分为训练和验证数据集。

import pathlib

files = list(pathlib.Path('datafiles/').glob('*.pth'))

trn_size = int(len(files)*0.7)

trn_files = files[:trn_size]

val_files = files[trn_size:]

trn_x = []

trn_y = []

count = 0

for i in trn_files:

tensor = torch.load(i)

if count % 10 == 0:

print(count,'/',len(trn_files))

trn_x.append(tensor[0])

trn_y.append(tensor[1])

count += 1

X = torch.cat(trn_x)

Y = torch.cat(trn_y)

torch.save((X,Y), 'trn.pth')

val_x = []

val_y = []

count = 0

for i in val_files:

tensor = torch.load(i)

if count % 10 == 0:

print(count,'/',len(val_files))

val_x.append(tensor[0])

val_y.append(tensor[1])

count += 1

X = torch.cat(val_x)

Y = torch.cat(val_y)

torch.save((X,Y), 'val.pth')

我们创建了两个用于训练和验证的数据文件trn.pth和valn .pth。我们可以定义一个新的PyTorch数据集来从文件加载数据并将其写入文件。该数据集采用rank和world_size参数进行分布式训练。它将整个数据集加载到GPU内存中,并根据rank id对数据点进行采样,使得不同rank_id的数据集给出不同的数据。

%%writefile filedataset.py

import torch

class OptionDataSet(torch.utils.data.Dataset):

def __init__(self, filename, rank=0, world_size=5):

tensor = torch.load(filename)

self.tensor = (tensor[0].cuda(), tensor[1].cuda())

self.length = len(self.tensor[0]) // world_size

self.world_size = world_size

self.rank = rank

def __getitem__(self, index):

index = index * self.world_size + self.rank

return self.tensor[0][index], self.tensor[1][index]

def __len__(self):

return self.length

写入filedataset.py

在训练深度学习模型时,防止过拟合的一个有效方法是使用单独的验证数据集来监控样本外的性能 。当验证数据集的性能下降时,这意味着发生了过拟合,因此我们可以停止训练。我们把所有的东西放在一个脚本,可以在多个GPU有效地训练模型:

%%writefile distributed_training.py

import torch

from ignite.engine import Engine, Events

from torch.nn import MSELoss

from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler

from apex import amp

import argparse

import os

from apex.parallel import DistributedDataParallel

import apex

from apex.optimizers import FusedLAMB

from model import Net

from filedataset import OptionDataSet

from ignite.metrics import MeanAbsoluteError

import ignite

import shutil

import torch.distributed as dist

parser = argparse.ArgumentParser()

parser.add_argument("--local_rank", default=0, type=int)

parser.add_argument("--path", default=None)

parser.add_argument("--mae_improv_tol", default=0.002, type=float)

args = parser.parse_args()

args.distributed = False

if 'WORLD_SIZE' in os.environ:

args.distributed = int(os.environ['WORLD_SIZE']) > 1

if args.distributed:

torch.cuda.set_device(args.local_rank)

torch.distributed.init_process_group(backend='nccl',

init_method='env://')

torch.backends.cudnn.benchmark = True

trn_dataset = OptionDataSet(filename='./trn.pth',

rank=dist.get_rank(),

world_size=int(os.environ['WORLD_SIZE']))

trn_dataset = torch.utils.data.DataLoader(trn_dataset,

batch_size=1024,

shuffle=True,

num_workers=0)

val_dataset = OptionDataSet(filename='./val.pth',

rank=dist.get_rank(),

world_size=int(os.environ['WORLD_SIZE']))

val_dataset = torch.utils.data.DataLoader(val_dataset,

batch_size=1024,

shuffle=False,

num_workers=0)

model = Net().cuda()

optimizer = FusedLAMB(model.parameters(), lr=1e-3)

loss_fn = MSELoss()

model = apex.parallel.convert_syncbn_model(model, channel_last=True)

model, optimizer = amp.initialize(model, optimizer, opt_level='O1')

best_mae = 100000

if args.path is not None:

def resume():

global best_mae

checkpoint = torch.load(args.path)

best_mae = checkpoint['best_mae']

model.load_state_dict(checkpoint['state_dict'])

amp.load_state_dict(checkpoint['amp'])

optimizer.load_state_dict(checkpoint['optimizer'])

resume()

if args.distributed:

model = DistributedDataParallel(model)

def train_update(engine, batch):

model.train()

optimizer.zero_grad()

x = batch[0]

y = batch[1]

y_pred = model(x)

loss = loss_fn(y, y_pred[:, 0])

with amp.scale_loss(loss, optimizer) as scaled_loss:

scaled_loss.backward()

optimizer.step()

return loss.item()

trainer = Engine(train_update)

log_interval = 500

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-5, 5e-6,

len(trn_dataset),

start_value_mult=0.999, end_value_mult=0.999,

save_history=False

)

trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):

torch.save(state, filename)

if is_best:

shutil.copyfile(filename, 'check_points/model_best.pth.tar')

@trainer.on(Events.ITERATION_COMPLETED)

def log_training_loss(engine):

iter = (engine.state.iteration - 1) % len(trn_dataset) + 1

if iter % log_interval == 0:

print('loss', engine.state.output, 'iter', engine.state.iteration,

'lr', scheduler.get_param())

metric = MeanAbsoluteError()

loss_m = ignite.metrics.Loss(loss_fn)

def eval_update(engine, batch):

model.eval()

x = batch[0]

y = batch[1]

y_pred = model(x)

return y, y_pred[:, 0]

evaluator = Engine(eval_update)

metric.attach(evaluator, "MAE")

loss_m.attach(evaluator, "loss")

@trainer.on(Events.EPOCH_COMPLETED)

def log_evalnumber(engine):

global best_mae

mae_improv_tol = args.mae_improv_tol

evaluator.run(val_dataset, max_epochs=1)

metrics = evaluator.state.metrics

average_tensor = torch.tensor([metrics['MAE'], metrics['loss']]).cuda()

torch.distributed.reduce(average_tensor, 0, op=torch.distributed.ReduceOp.SUM)

torch.distributed.broadcast(average_tensor, 0)

average_tensor = average_tensor/int(os.environ['WORLD_SIZE'])

mae = average_tensor[0].item()

is_best = False

if (1 - mae / best_mae) >= mae_improv_tol or \

(engine.state.epoch == engine.state.max_epochs and

mae < best_mae):

best_mae = mae

is_best = True

# print("RANK {} Val Results - Epoch: {} Avg MAE: {:.5f} loss: {:.5f} BEST MAE: {:.5f}"

# .format(dist.get_rank(), trainer.state.epoch, metrics['MAE'], metrics['loss'], best_mae))

if dist.get_rank() == 0:

print('Epoch {}/{}'.format(engine.state.epoch, engine.state.max_epochs))

print('Best MAE Improvement Tolerance for checkpointing: {}%'.format(100 * mae_improv_tol))

print("RANK {} AVG {} NGPUs, best-mae: {:.5f} mae: {:.5f} loss: {:.5f}".format(

dist.get_rank(),

int(os.environ['WORLD_SIZE']),

best_mae,

average_tensor[0].item(),

average_tensor[1].item()))

fname = 'check_points/current_pth.tar'

if is_best:

save_checkpoint({'epoch': trainer.state.epoch,

'state_dict': model.module.state_dict(),

'best_mae': best_mae,

'optimizer': optimizer.state_dict(),

'amp': amp.state_dict()

}, is_best,

filename=fname)

inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()

res = model(inputs)

print('test one example:', res.item())

trainer.run(trn_dataset, max_epochs=2000)

覆盖distributed_training.py

与前面的代码相比,它有点复杂,因为:它处理验证数据集的评估

它将模型序列化到一个文件中,并根据MAE跟踪执行得最好的模型

它从文件中恢复训练

我们可以通过以下命令来启动分布式训练:

ngpus=!echo $(nvidia-smi -L | wc -l)

!python -m torch.distributed.launch --nproc_per_node={ngpus[0]} distributed_training.py

我们需要一些耐心来训练定价模型,直到它收敛。

6

推断和Greeks

一旦训练被聚合,执行得最好的模型就被保存到check_points/目录中。

为了得到一个好的模型,我们需要数百万个数据点来训练模型,直到它收敛。通常在一台8个GPU的DGX-1机器上需要10-20个小时。我们使用1000万个训练数据点和500万个验证数据点对模型进行训练。我们没有研究训练样本的最小数量是多少,只是简单地使用了大量的数据样本。大家可以通过使用更少的数据点来进行训练。

为了节省时间,可以运行以下命令下载权重并使用它们进行推断:

! ((test ! -f './check_points/model_best.pth.tar' || test ! -f './check_points/512/model_best.pth.tar') && \

bash ./download_data.sh) || echo "Dataset is already present. No need to re-download it."

数据集已经存在。不需要重新下载。

我们可以加载模型参数并使用它进行推断。

from model import Net

import torch

checkpoint = torch.load('check_points/model_best.pth.tar')

model = Net().cuda()

model.load_state_dict(checkpoint['state_dict'])

inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()

model(inputs)

tensor([[18.7140]], device='cuda:0', grad_fn=)

建立深度学习模型的好处之一是可以很容易地计算出Greeks。我们只需要利用Pytorch中的auto-grad特征。下面是一个计算多元多项式函数一阶微分的例子。

import torch

from torch.autograd import grad

'''

z = (xy)^2

x = 3, y =2

first order deriv [24 36]

'''

inputs = torch.tensor([3.0,2.0], requires_grad=True)

z = (inputs[0]*inputs[1])**2

first_order_grad = grad(z, inputs, create_graph=True)

print(first_order_grad)

(tensor([24., 36.], grad_fn=),)

我们可以使用grad函数来计算参数:K, B, S0, sigma, mu, r的一阶差分:

inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()

inputs.requires_grad = True

x = model(inputs)

x.backward()

first_order_gradient = inputs.grad

first_order_gradient

tensor([[-6.7092e-01, -2.1257e-02, 7.8896e-01, 1.9219e+01, 4.8331e+01,

-1.8419e+01]], device='cuda:0')

画出函数曲线:

%matplotlib inline

import pylab

import numpy as np

def compute_delta(S):

inputs = torch.tensor([[110.0, 100.0, S, 0.35, 0.1, 0.05]]).cuda()

inputs.requires_grad = True

x = model(inputs)

x.backward()

first_order_gradient = inputs.grad

return first_order_gradient[0][2]

prices = np.arange(10, 200, 0.1)

deltas = []

for p in prices:

deltas.append(compute_delta(p).item())

fig = pylab.plot(prices, deltas)

pylab.xlabel('prices')

pylab.ylabel('Delta')

fig

在PyTorch中计算二阶导数也很简单。我们只需要应用两次grad函数。下面是计算同一多项式函数的二阶导数的例子:

import torch

from torch.autograd import grad

'''

z = (xy)^2

x = 3, y =2

first order deriv [24 36]

d2z/dx2 = 8

d2z/dxdy = 24

d2z/dy2 = 18

'''

inputs = torch.tensor([3.0,2.0], requires_grad=True)

z = (inputs[0]*inputs[1])**2

first_order_grad = grad(z, inputs, create_graph=True)

second_order_grad_x, = grad(first_order_grad[0][0], inputs, retain_graph=True) #

second_order_grad_y, = grad(first_order_grad[0][1], inputs)

print(second_order_grad_x)

print(second_order_grad_y)

tensor([ 8., 24.])

tensor([24., 18.])

利用这个机制,我们可以在下面的例子中计算二阶导数:

,,,,,

import torch

from torch import Tensor

from torch.autograd import Variable

from torch.autograd import grad

from torch import nn

inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()

inputs.requires_grad = True

x = model(inputs)

loss_grads = grad(x, inputs, create_graph=True)

drv = grad(loss_grads[0][0][2], inputs)

drv

Out[9]:

(tensor([[-0.0143, 0.0039, 0.0098, -0.3183, 1.1455, -0.7876]],

device='cuda:0'),)

Gamma是S的二阶差分。 我们可以把Gamma曲线画成股票价格的函数:

import pylab

import numpy as np

def compute_gamma(S):

inputs = torch.tensor([[110.0, 100.0, S, 0.35, 0.1, 0.05]]).cuda()

inputs.requires_grad = True

x = model(inputs)

loss_grads = grad(x, inputs, create_graph=True)

drv = grad(loss_grads[0][0][2], inputs)

return drv[0][0][2]

prices = np.arange(10, 200, 0.1)

deltas = []

for p in prices:

deltas.append(compute_gamma(p).item())

fig2 = pylab.plot(prices, deltas)

pylab.xlabel('prices')

pylab.ylabel('Gamma')

fig2

隐含波动率是基于期权报价对标的资产的预测波动率。给出的模型是价格与期权参数的反向映射,用蒙特卡罗模拟法很难做到这一点。但如果我们有深度学习定价模型,这是一个简单的任务。我们可以先画出波动率和期权价格之间的关系:

import pylab

import numpy as np

def compute_price(sigma):

inputs = torch.tensor([[110.0, 100.0, 120.0, sigma, 0.1, 0.05]]).cuda()

x = model(inputs)

return x.item()

sigmas = np.arange(0, 0.5, 0.1)

prices = []

for s in sigmas:

prices.append(compute_price(s))

fig3 = pylab.plot(sigmas, prices)

pylab.xlabel('Sigma')

pylab.ylabel('Price')

fig3

给定价格P,隐含波动率是compute_price函数的根。我们可以用二分法求根。

def bisection_root(small, large, fun, target, EPS=1e-6):

if fun(large) - target < 0:

print('upper bound is too small')

return None

if fun(small) - target > 0:

print('lower bound is too large')

return None

while large - small > EPS:

mid = (large + small) / 2.0

if fun(mid) - target >= 0:

large = mid

else:

small = mid

mid = (large + small) / 2.0

return mid, abs(fun(mid) - target)

quoted_price = 16.0

sigma, err = bisection_root(0, 0.5, compute_price, quoted_price)

print('implied volativity', sigma, 'error', err)

implied volativity 0.18517351150512695 error 4.76837158203125e-06

蒙特卡罗模拟需要数以百万计的路径来得到精确的答案,这需要大量的计算。Ryan等人得研究表明,可以训练深度学习模型对衍生品进行估值。深度学习模型是准确和快速的,能够产生比传统模型快一百万倍的估值。在今天的推文中,我们将使用一个 全连接网络来学习亚式障碍期权的定价模式 。采用蒙特卡罗模拟作为训练的定价依据。我们使用与上一篇文章相同的亚式障碍期权模型,参数如下:gengT:到期如(年)

S:现货(美元)

K:Strike(美元)

sigma:波动率(per)

r:无风险利率(per)

mu:Drift Rate(per)

B:Barrier(美元)

下面的内容主要包括两个主题:使用蒙特卡罗定价动态数据集训练期权定价神的经网络模型。

使用蒙特卡罗定价静态数据集训练期权定价神经网络模型并进行推断。

2

批处理数据生成

数据集是深度学习训练的重要组成部分。我们将修改之前的单一亚式障碍期权定价代码来处理一批障碍期权定价。

加载库:

import cupy

import numpy as np

import math

import time

import torch

cupy.cuda.set_allocator(None)

from torch.utils.dlpack import from_dlpack

批量障碍期权定价模拟的CuPy版本如下:

cupy_batched_barrier_option = cupy.RawKernel(r'''

extern "C" __global__ void batched_barrier_option(

float *d_s,

const float T,

const float * K,

const float * B,

const float * S0,

const float * sigma,

const float * mu,

const float * r,

const float * d_normals,

const long N_STEPS,

const long N_PATHS,

const long N_BATCH)

{

unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;

unsigned stride = blockDim.x * gridDim.x;

unsigned tid = threadIdx.x;

const float tmp3 = sqrt(T/N_STEPS);

for (unsigned i = idx; i

{

int batch_id = i / N_PATHS;

int path_id = i % N_PATHS;

float s_curr = S0[batch_id];

float tmp1 = mu[batch_id]*T/N_STEPS;

float tmp2 = exp(-r[batch_id]*T);

unsigned n=0;

double running_average = 0.0;

for(unsigned n = 0; n < N_STEPS; n++){

s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];

running_average += (s_curr - running_average) / (n + 1.0);

if (running_average <= B[batch_id]){

break;

}

}

float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);

d_s[i] = tmp2 * payoff;

}

}

''', 'batched_barrier_option')

注意,参数(K, B, S0, sigma, mu, r)以批处理长度的数组形式传入。输出数组是一个1-D 的二维数组。第一个维度用于 Batch,第二个维度用于 Path。。

通过输入两组选项参数进行测试:

N_PATHS = 2048000

N_STEPS = 365

N_BATCH = 2

T = 1.0

K = cupy.array([110.0, 120.0], dtype=cupy.float32)

B = cupy.array([100.0, 90.0], dtype=cupy.float32)

S0 = cupy.array([120.0, 100.0], dtype=cupy.float32)

sigma = cupy.array([0.35, 0.2], dtype=cupy.float32)

mu = cupy.array([0.15, 0.1], dtype=cupy.float32)

r =cupy.array([0.05, 0.05], dtype=cupy.float32)

把这一切放进一个简单的函数来启动1GPU内核。每个Path的期权价格是相应路径terminal值的平均值。这可以很容易地通过Cupy函数平均值(axis=1)计算出来

def batch_run():

number_of_threads = 256

number_of_blocks = (N_PATHS * N_BATCH - 1) // number_of_threads + 1

randoms_gpu = cupy.random.normal(0, 1, N_BATCH*N_PATHS * N_STEPS, dtype=cupy.float32)

output = cupy.zeros(N_BATCH*N_PATHS, dtype=cupy.float32)

cupy.cuda.stream.get_current_stream().synchronize()

s = time.time()

cupy_batched_barrier_option((number_of_blocks,), (number_of_threads,),

(output, np.float32(T), K, B, S0, sigma, mu, r,

randoms_gpu, N_STEPS, N_PATHS, N_BATCH))

v = output.reshape(N_BATCH, N_PATHS).mean(axis=1)

cupy.cuda.stream.get_current_stream().synchronize()

e = time.time()

print('time', e-s, 'v',v)

batch_run()

time 0.013919591903686523 v [21.22405 0.8480416]

这将为66ms中的这两组期权参数生成21.22和0.848的期权价格。

它的工作效率很高,因此我们将构造一个OptionDataSet类来包装上面的代码,以便我们可以在Pytorch中使用它。对于每个下一个元素,生成指定范围内的均匀分布随机期权参数,启动GPU内核计算期权价格,通过DLPack将CuPy数组转换为带有zero-copy的Pytorch张量。请注意我们是如何实现iterable Dataset接口的:

class OptionDataSet(torch.utils.data.IterableDataset):

def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):

self.num = 0

self.max_length = max_len

self.N_PATHS = number_path

self.N_STEPS = 365

self.N_BATCH = batch

self.T = np.float32(1.0)

self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)

self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1

self.number_of_threads = threads

cupy.random.seed(seed)

def __len__(self):

return self.max_length

def __iter__(self):

self.num = 0

return self

def __next__(self):

if self.num > self.max_length:

raise StopIteration

X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)

X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)

X[:, 1] = X[:, 0] * X[:, 1]

randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)

cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),

cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))

Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)

self.num += 1

return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))

将所有与Pytorch数据集相关的内容都放到一个名为cupy_dataset.py的文件中:

%%writefile cupy_dataset.py

import cupy

import numpy as np

import torch

from torch.utils.dlpack import from_dlpack

cupy.cuda.set_allocator(None)

cupy_batched_barrier_option = cupy.RawKernel(r'''

extern "C" __global__ void batched_barrier_option(

float *d_s,

const float T,

const float * K,

const float * B,

const float * S0,

const float * sigma,

const float * mu,

const float * r,

const float * d_normals,

const long N_STEPS,

const long N_PATHS,

const long N_BATCH)

{

unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;

unsigned stride = blockDim.x * gridDim.x;

unsigned tid = threadIdx.x;

const float tmp3 = sqrt(T/N_STEPS);

for (unsigned i = idx; i

{

int batch_id = i / N_PATHS;

int path_id = i % N_PATHS;

float s_curr = S0[batch_id];

float tmp1 = mu[batch_id]*T/N_STEPS;

float tmp2 = exp(-r[batch_id]*T);

unsigned n=0;

double running_average = 0.0;

for(unsigned n = 0; n < N_STEPS; n++){

s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];

running_average += (s_curr - running_average) / (n + 1.0);

if (running_average <= B[batch_id]){

break;

}

}

float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);

d_s[i] = tmp2 * payoff;

}

}

''', 'batched_barrier_option')

class OptionDataSet(torch.utils.data.IterableDataset):

def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):

self.num = 0

self.max_length = max_len

self.N_PATHS = number_path

self.N_STEPS = 365

self.N_BATCH = batch

self.T = np.float32(1.0)

self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)

self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1

self.number_of_threads = threads

cupy.random.seed(seed)

def __len__(self):

return self.max_length

def __iter__(self):

self.num = 0

return self

def __next__(self):

if self.num > self.max_length:

raise StopIteration

X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)

X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)

X[:, 1] = X[:, 0] * X[:, 1]

randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)

cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),

cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))

Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)

self.num += 1

return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))

覆盖cupy_dataset.py

这里是一个测试代码样本,有10个数据点、 batch为 16:

ds = OptionDataSet(10, number_path=100000, batch=16, seed=15)

for i in ds:

print(i[1])

我们可以实现相同的代码使用Numba加速计算在GPU:

import numba

from numba import cuda

@cuda.jit

def batch_barrier_option(d_s, T, K, B, S0, sigma, mu, r, d_normals, N_STEPS, N_PATHS, N_BATCH):

ii = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x

stride = cuda.gridDim.x * cuda.blockDim.x

tmp3 = math.sqrt(T/N_STEPS)

for i in range(ii, N_PATHS * N_BATCH, stride):

batch_id = i // N_PATHS

path_id = i % N_PATHS

tmp1 = mu[batch_id]*T/N_STEPS

tmp2 = math.exp(-r[batch_id]*T)

running_average = 0.0

s_curr = S0[batch_id]

for n in range(N_STEPS):

s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH]

running_average = running_average + 1.0/(n + 1.0) * (s_curr - running_average)

if i==0 and batch_id == 2:

print(s_curr)

if running_average <= B[batch_id]:

break

payoff = running_average - K[batch_id] if running_average > K[batch_id] else 0

d_s[i] = tmp2 * payoff

class NumbaOptionDataSet(object):

def __init__(self, max_len=10, number_path = 1000, batch=2, threads=512, seed=15):

self.num = 0

self.max_length = max_len

self.N_PATHS = number_path

self.N_STEPS = 365

self.N_BATCH = batch

self.T = np.float32(1.0)

self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)

self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1

self.number_of_threads = threads

cupy.random.seed(seed)

def __len__(self):

return self.max_length

def __iter__(self):

self.num = 0

return self

def __next__(self):

if self.num > self.max_length:

raise StopIteration

X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)

X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)

X[:, 1] = X[:, 0] * X[:, 1]

randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)

batch_barrier_option[(self.number_of_blocks,), (self.number_of_threads,)](self.output, self.T, X[:, 0],

X[:, 1], X[:, 2], X[:, 3], X[:, 4], X[:, 5], randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH)

o = self.output.reshape(self.N_BATCH, self.N_PATHS)

Y = o.mean(axis = 1)

self.num += 1

return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))

ds = NumbaOptionDataSet(10, number_path=100000, batch=16, seed=15)

for i in ds:

print(i[1])

3

模型

为了将期权参数映射到价格,我们使用了6层全连接神经网络,其隐含维度为512。将此DL价格模型写入model.py:

%%writefile model.py

import torch.nn as nn

import torch.nn.functional as F

import torch

class Net(nn.Module):

def __init__(self, hidden=1024):

super(Net, self).__init__()

self.fc1 = nn.Linear(6, hidden)

self.fc2 = nn.Linear(hidden, hidden)

self.fc3 = nn.Linear(hidden, hidden)

self.fc4 = nn.Linear(hidden, hidden)

self.fc5 = nn.Linear(hidden, hidden)

self.fc6 = nn.Linear(hidden, 1)

self.register_buffer('norm',

torch.tensor([200.0,

198.0,

200.0,

0.4,

0.2,

0.2]))

def forward(self, x):

x = x / self.norm

x = F.elu(self.fc1(x))

x = F.elu(self.fc2(x))

x = F.elu(self.fc3(x))

x = F.elu(self.fc4(x))

x = F.elu(self.fc5(x))

return self.fc6(x)

覆盖model.py

输入参数首先通过除以(200.0,198.0,200.0,0.4,0.2,0.2)缩小到0-1范围。然后在ELu激活函数后,将其5次隐射到隐藏维度512。选择ELu是因为我们需要计算参数的二阶微分。如果使用ReLu,二阶微分总是0。最后一层是线性层,它将隐藏维度映射到预测的期权价格。

在训练方面,我们使用了一个高级库Ignite来训练PyTorch中的神经网络:

我们使用MSELoss作为损失函数,Adam作为优化器,CosineAnnealingScheduler作为学习率调度器。下面的代码将随机期权数据提供给定价模型进行训练。

from ignite.engine import Engine, Events

from ignite.handlers import Timer

from torch.nn import MSELoss

from torch.optim import Adam

from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler

from ignite.handlers import ModelCheckpoint

from model import Net

from cupy_dataset import OptionDataSet

timer = Timer(average=True)

model = Net().cuda()

loss_fn = MSELoss()

optimizer = Adam(model.parameters(), lr=1e-3)

dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)

def train_update(engine, batch):

model.train()

optimizer.zero_grad()

x = batch[0]

y = batch[1]

y_pred = model(x)

loss = loss_fn(y_pred[:,0], y)

loss.backward()

optimizer.step()

return loss.item()

trainer = Engine(train_update)

log_interval = 100

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))

trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

timer.attach(trainer,

start=Events.EPOCH_STARTED,

resume=Events.ITERATION_STARTED,

pause=Events.ITERATION_COMPLETED,

step=Events.ITERATION_COMPLETED)

@trainer.on(Events.ITERATION_COMPLETED)

def log_training_loss(engine):

iter = (engine.state.iteration - 1) % len(dataset) + 1

if iter % log_interval == 0:

print('loss', engine.state.output, 'average time', timer.value())

trainer.run(dataset, max_epochs=100)

损失在不断减少,这意味着定价模型可以更好地预测期权价格。平均计算一个批大小(mini-batch)量需要花费大约12ms,在接下的文章中,我们将尝试挖掘GPU的全部潜力来加速训练。

4

TensorCore混合精度训练

V100 GPU有640个张量核,可以加速半精度矩阵乘法运算,这是DL模型的核心运算。由NVIDIA开发的Apex库( https://github.com/NVIDIA/apex )使Pytorch中的混合精度和分布式训练变得容易。通过改变3行代码,可以利用张量核加速训练。

from apex import amp

from ignite.engine import Engine, Events

from torch.nn import MSELoss

from ignite.handlers import Timer

from torch.optim import Adam

from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler

from ignite.handlers import ModelCheckpoint

from model import Net

from cupy_dataset import OptionDataSet

timer = Timer(average=True)

model = Net().cuda()

loss_fn = MSELoss()

optimizer = Adam(model.parameters(), lr=1e-3)

opt_level = 'O1'

model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)

dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)

def train_update(engine, batch):

model.train()

optimizer.zero_grad()

x = batch[0]

y = batch[1]

y_pred = model(x)

loss = loss_fn(y_pred[:,0], y)

with amp.scale_loss(loss, optimizer) as scaled_loss:

scaled_loss.backward()

optimizer.step()

return loss.item()

trainer = Engine(train_update)

log_interval = 100

timer.attach(trainer,

start=Events.EPOCH_STARTED,

resume=Events.ITERATION_STARTED,

pause=Events.ITERATION_COMPLETED,

step=Events.ITERATION_COMPLETED)

scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))

trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

@trainer.on(Events.ITERATION_COMPLETED)

def log_training_loss(engine):

iter = (engine.state.iteration - 1) % len(dataset) + 1

if iter % log_interval == 0:

print('loss', engine.state.output, 'average time', timer.value())

trainer.run(dataset, max_epochs=100)

它改进了以8ms计算每个 mini-batch 。为了获得更好的性能,我们将模型权值降低到半精度,因此需要调整损失以确保半精度动态范围与计算结果一致。它是猜测什么是正确的损失比例因子,并自动调整,如果梯度溢出。最后,在保持模型预测精度的前提下,获得最佳的硬件加速性能。

5

多个GPU训练

Apex让多GPU训练变得容易。在同一个训练脚本中,我们需要注意一些额外的步骤:

1、添加参数——local_rank,该参数将由分布式启动程序自动设置。

2、初始化进程组。

3、根据数据集中的进程id生成独立的批处理数据。

4、包装模型和优化器来处理分布式计算。

下面的代码是一个在4个GPU上生成100x5x16个数据点示例。对于真正的深度学习模型训练,我们需要数以百万计的数据点。大家可以尝试将n_files和options_per_file更改为较大的数字。

futures = []

for i in range(0, 100):

future = client.submit(gen_data, 5, 16, i)

futures.append(future)

results = client.gather(futures)

一旦生成了数百万个数据点,我们就可以将这些数据点组合在一起,并将它们拆分为训练和验证数据集。

写入filedataset.py

在训练深度学习模型时,防止过拟合的一个有效方法是使用单独的验证数据集来监控样本外的性能 。当验证数据集的性能下降时,这意味着发生了过拟合,因此我们可以停止训练。我们把所有的东西放在一个脚本,可以在多个GPU有效地训练模型:

python barrier option pricing_Python王牌加速库:深度学习下的障碍期权定价!相关推荐

  1. python barrier option pricing_Python王牌加速库深度学习下的障碍期权定价

    Python王牌加速库深度学习下的障碍期权定价 Python王牌加速库:深度学习下的障碍期权定价! 蒙特卡罗模拟需要数以百万计的路径来得到精确的答案,这需要大量的计算.Ryan等人得研究表明,可以训练 ...

  2. 障碍期权定价 python_Python王牌加速库2:深度学习下的障碍期权定价

    作者:Yi Dong 编译:1+1=6 1.前言 上一期推文中,我们使用了Numba和CuPy来运行蒙特卡罗模拟来确定亚式障碍期权的价格.Python王牌加速库:奇异期权定价的利器​mp.weixin ...

  3. python barrier option pricing_《Python金融数据分析》书内代码实战与讲解(二)金融衍生物定价...

    4.2.1 欧式期权定价 先编写StockOption类,存放需要的参数. """ Store common attributes of a stock option & ...

  4. 模型加速:深度学习模型的硬件加速:NVIDIAT240

    作者:禅与计算机程序设计艺术 模型加速:深度学习模型的硬件加速:NVIDIA T240 在当前深度学习模型的规模和复杂度不断增加的情况下,硬件加速已经成为一个重要的技术手段.本文将介绍NVIDIA T ...

  5. Alluxio 助力 Kubernetes,加速云端深度学习

    作者 |  车漾  阿里云高级技术专家 范斌  Alluxio 创始成员,开源社区副总裁 来源 | 阿里巴巴云原生公众号 为什么要加速云端深度学习 人工智能是近几年非常火热的技术领域,而推动这个领域快 ...

  6. 北京 | 免费高效训练及OpenVINO™加速推理深度学习实战,送Intel神经计算棒二代...

    当今人工智能时代,深度学习极大得促进了计算机视觉技术的快速应用和成熟,也是算法工程师们必须掌握的一项技能,然而,不同环境的依赖部署,高算力的需求,海量数据量需求及算法应用高硬件成本也让深度学习陷入了规 ...

  7. 回归素材(part9)--PYTHON机器学习手册-从数据预处理到深度学习

    学习笔记,仅供参考,有错必纠 PYTHON机器学习手册-从数据预处理到深度学习 通过正则化减少方差 我们可以使用岭回归或者Lasso回归,介绍回归模型的方差.

  8. python人工智能课程实例_python人工智能AI深度学习/机器学习全套课程 视频教程+ppt+代码...

    这是一套Python/人工智能/AI/机器学习/深度学习 全套实战课程,包含视频教程以及文档.源码等,欢迎下载 01. python数据分析与机器学习实战 02.深度学习入门视频课程(上篇) 03.深 ...

  9. 阿里云异构计算平台——加速AI深度学习创新

    云栖TechDay第36期,阿里云高级产品专家霁荣带来"阿里云异构计算平台--加速AI深度学习创新"的演讲.本文主要从深度学习催生强大计算力需求开始谈起,包括GPU的适用场景,进而 ...

最新文章

  1. “汇新杯”新兴科技+互联网创新大赛青年创客专项赛即将截止报名
  2. 一次给女朋友转账引发我对分布式事务的思考
  3. awesome字体图标库
  4. C# 实现将网络资源保存到本地
  5. 【Linux】awk指令
  6. 各种池化操作(包括组合池化)
  7. php如何上传txt文件,并且读取txt文件
  8. android手机上传不了图片,【报Bug】nvue页面使用web-view组件,安卓手机无法调用页面的input标签上传图片...
  9. mall商城 -小程序,h5和pc vue前后端分离
  10. 服务器怎么与plc通讯协议,PLC如何与云服务器通讯
  11. 贪心法 第3关:将真分数用埃及分数之和表示
  12. 机智云IOT软件平台受邀参展STM32全国巡回研讨会/中国电信天翼智能生态博览会/签署“5G+天翼云+AI”战略合作
  13. edge浏览器如何把网页放到桌面_win10怎么把网页放在桌面上
  14. MPLAB-IDE-C语言编程代码实例-分析
  15. Docker swarm Docker stack
  16. 应对不确定性的一个有效手段是重塑企业的使命、愿景和价值观。
  17. vue使用element-ui的el-date-picker设置样式无效
  18. linux下的3A游戏,游知有味:任天堂的游戏都不算?什么才叫3A级大作
  19. 拜托,学妹,别再问我怎么自学 Java 了!和盘托出
  20. JQuery--事件总结

热门文章

  1. 导出GMS计算结果,并进行分类汇总
  2. 固件远程更新之STARTUPE2原语(fpga控制flash)
  3. 查看计算机桌面的命令行,Win7命令在哪 win7命令提示符怎么打开
  4. ArcGIS Pro使用路径分析图层计算OD成本矩阵
  5. 如何用公式编辑器打大大于符号?
  6. 现货伦敦金分析中如何应用支撑与压力位
  7. tar -xzvf  *.tar.gz  简单说明
  8. android遍历的方法,android中遍历arrayList的四种方法
  9. 用Qt将一组静态连续图片制作成动图(定时器和QPixmap实现)
  10. 数学建模的论文写作问题