python barrier option pricing_Python王牌加速库深度学习下的障碍期权定价
Python王牌加速库深度学习下的障碍期权定价
Python王牌加速库:深度学习下的障碍期权定价!
蒙特卡罗模拟需要数以百万计的路径来得到精确的答案,这需要大量的计算。Ryan等人得研究表明,可以训练深度学习模型对衍生品进行估值。深度学习模型是准确和快速的,能够产生比传统模型快一百万倍的估值。在今天的推文中,我们将使用一个 全连接网络来学习亚式障碍期权的定价模式 。采用蒙特卡罗模拟作为训练的定价依据。我们使用与上一篇文章相同的亚式障碍期权模型,参数如下:
T:到期如(年)
S:现货(美元)
K:Strike(美元)
sigma:波动率(per)
r:无风险利率(per)
mu:Drift Rate(per)
B:Barrier(美元)
下面的内容主要包括两个主题:
使用蒙特卡罗定价动态数据集训练期权定价神的经网络模型。
使用蒙特卡罗定价静态数据集训练期权定价神经网络模型并进行推断。
2
批处理数据生成
数据集是深度学习训练的重要组成部分。我们将修改之前的单一亚式障碍期权定价代码来处理一批障碍期权定价。
加载库:
import cupy
import numpy as np
import math
import time
import torch
cupy.cuda.set_allocator(None)
from torch.utils.dlpack import from_dlpack
批量障碍期权定价模拟的CuPy版本如下:
cupy_batched_barrier_option = cupy.RawKernel(r'''
extern "C" __global__ void batched_barrier_option(
float *d_s,
const float T,
const float * K,
const float * B,
const float * S0,
const float * sigma,
const float * mu,
const float * r,
const float * d_normals,
const long N_STEPS,
const long N_PATHS,
const long N_BATCH)
{
unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;
unsigned stride = blockDim.x * gridDim.x;
unsigned tid = threadIdx.x;
const float tmp3 = sqrt(T/N_STEPS);
for (unsigned i = idx; i
{
int batch_id = i / N_PATHS;
int path_id = i % N_PATHS;
float s_curr = S0[batch_id];
float tmp1 = mu[batch_id]*T/N_STEPS;
float tmp2 = exp(-r[batch_id]*T);
unsigned n=0;
double running_average = 0.0;
for(unsigned n = 0; n < N_STEPS; n++){
s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];
running_average += (s_curr - running_average) / (n + 1.0);
if (running_average <= B[batch_id]){
break;
}
}
float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);
d_s[i] = tmp2 * payoff;
}
}
''', 'batched_barrier_option')
注意,参数(K, B, S0, sigma, mu, r)以批处理长度的数组形式传入。输出数组是一个1-D 的二维数组。第一个维度用于 Batch,第二个维度用于 Path。。
通过输入两组选项参数进行测试:
N_PATHS = 2048000
N_STEPS = 365
N_BATCH = 2
T = 1.0
K = cupy.array([110.0, 120.0], dtype=cupy.float32)
B = cupy.array([100.0, 90.0], dtype=cupy.float32)
S0 = cupy.array([120.0, 100.0], dtype=cupy.float32)
sigma = cupy.array([0.35, 0.2], dtype=cupy.float32)
mu = cupy.array([0.15, 0.1], dtype=cupy.float32)
r =cupy.array([0.05, 0.05], dtype=cupy.float32)
把这一切放进一个简单的函数来启动1GPU内核。每个Path的期权价格是相应路径terminal值的平均值。这可以很容易地通过Cupy函数平均值(axis=1)计算出来
def batch_run():
number_of_threads = 256
number_of_blocks = (N_PATHS * N_BATCH - 1) // number_of_threads + 1
randoms_gpu = cupy.random.normal(0, 1, N_BATCH*N_PATHS * N_STEPS, dtype=cupy.float32)
output = cupy.zeros(N_BATCH*N_PATHS, dtype=cupy.float32)
cupy.cuda.stream.get_current_stream().synchronize()
s = time.time()
cupy_batched_barrier_option((number_of_blocks,), (number_of_threads,),
(output, np.float32(T), K, B, S0, sigma, mu, r,
randoms_gpu, N_STEPS, N_PATHS, N_BATCH))
v = output.reshape(N_BATCH, N_PATHS).mean(axis=1)
cupy.cuda.stream.get_current_stream().synchronize()
e = time.time()
print('time', e-s, 'v',v)
batch_run()
time 0.013919591903686523 v [21.22405 0.8480416]
这将为66ms中的这两组期权参数生成21.22和0.848的期权价格。
它的工作效率很高,因此我们将构造一个OptionDataSet类来包装上面的代码,以便我们可以在Pytorch中使用它。对于每个下一个元素,生成指定范围内的均匀分布随机期权参数,启动GPU内核计算期权价格,通过DLPack将CuPy数组转换为带有zero-copy的Pytorch张量。请注意我们是如何实现iterable Dataset接口的:
class OptionDataSet(torch.utils.data.IterableDataset):
def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):
self.num = 0
self.max_length = max_len
self.N_PATHS = number_path
self.N_STEPS = 365
self.N_BATCH = batch
self.T = np.float32(1.0)
self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)
self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
self.number_of_threads = threads
cupy.random.seed(seed)
def __len__(self):
return self.max_length
def __iter__(self):
self.num = 0
return self
def __next__(self):
if self.num > self.max_length:
raise StopIteration
X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
X[:, 1] = X[:, 0] * X[:, 1]
randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),
cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))
Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)
self.num += 1
return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
将所有与Pytorch数据集相关的内容都放到一个名为cupy_dataset.py的文件中:
%%writefile cupy_dataset.py
import cupy
import numpy as np
import torch
from torch.utils.dlpack import from_dlpack
cupy.cuda.set_allocator(None)
cupy_batched_barrier_option = cupy.RawKernel(r'''
extern "C" __global__ void batched_barrier_option(
float *d_s,
const float T,
const float * K,
const float * B,
const float * S0,
const float * sigma,
const float * mu,
const float * r,
const float * d_normals,
const long N_STEPS,
const long N_PATHS,
const long N_BATCH)
{
unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;
unsigned stride = blockDim.x * gridDim.x;
unsigned tid = threadIdx.x;
const float tmp3 = sqrt(T/N_STEPS);
for (unsigned i = idx; i
{
int batch_id = i / N_PATHS;
int path_id = i % N_PATHS;
float s_curr = S0[batch_id];
float tmp1 = mu[batch_id]*T/N_STEPS;
float tmp2 = exp(-r[batch_id]*T);
unsigned n=0;
double running_average = 0.0;
for(unsigned n = 0; n < N_STEPS; n++){
s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];
running_average += (s_curr - running_average) / (n + 1.0);
if (running_average <= B[batch_id]){
break;
}
}
float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);
d_s[i] = tmp2 * payoff;
}
}
''', 'batched_barrier_option')
class OptionDataSet(torch.utils.data.IterableDataset):
def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):
self.num = 0
self.max_length = max_len
self.N_PATHS = number_path
self.N_STEPS = 365
self.N_BATCH = batch
self.T = np.float32(1.0)
self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)
self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
self.number_of_threads = threads
cupy.random.seed(seed)
def __len__(self):
return self.max_length
def __iter__(self):
self.num = 0
return self
def __next__(self):
if self.num > self.max_length:
raise StopIteration
X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
X[:, 1] = X[:, 0] * X[:, 1]
randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),
cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))
Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)
self.num += 1
return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
覆盖cupy_dataset.py
这里是一个测试代码样本,有10个数据点、 batch为 16:
ds = OptionDataSet(10, number_path=100000, batch=16, seed=15)
for i in ds:
print(i[1])
我们可以实现相同的代码使用Numba加速计算在GPU:
import numba
from numba import cuda
@cuda.jit
def batch_barrier_option(d_s, T, K, B, S0, sigma, mu, r, d_normals, N_STEPS, N_PATHS, N_BATCH):
ii = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x
stride = cuda.gridDim.x * cuda.blockDim.x
tmp3 = math.sqrt(T/N_STEPS)
for i in range(ii, N_PATHS * N_BATCH, stride):
batch_id = i // N_PATHS
path_id = i % N_PATHS
tmp1 = mu[batch_id]*T/N_STEPS
tmp2 = math.exp(-r[batch_id]*T)
running_average = 0.0
s_curr = S0[batch_id]
for n in range(N_STEPS):
s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH]
running_average = running_average + 1.0/(n + 1.0) * (s_curr - running_average)
if i==0 and batch_id == 2:
print(s_curr)
if running_average <= B[batch_id]:
break
payoff = running_average - K[batch_id] if running_average > K[batch_id] else 0
d_s[i] = tmp2 * payoff
class NumbaOptionDataSet(object):
def __init__(self, max_len=10, number_path = 1000, batch=2, threads=512, seed=15):
self.num = 0
self.max_length = max_len
self.N_PATHS = number_path
self.N_STEPS = 365
self.N_BATCH = batch
self.T = np.float32(1.0)
self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)
self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
self.number_of_threads = threads
cupy.random.seed(seed)
def __len__(self):
return self.max_length
def __iter__(self):
self.num = 0
return self
def __next__(self):
if self.num > self.max_length:
raise StopIteration
X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
X[:, 1] = X[:, 0] * X[:, 1]
randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
batch_barrier_option[(self.number_of_blocks,), (self.number_of_threads,)](self.output, self.T, X[:, 0],
X[:, 1], X[:, 2], X[:, 3], X[:, 4], X[:, 5], randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH)
o = self.output.reshape(self.N_BATCH, self.N_PATHS)
Y = o.mean(axis = 1)
self.num += 1
return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
ds = NumbaOptionDataSet(10, number_path=100000, batch=16, seed=15)
for i in ds:
print(i[1])
3
模型
为了将期权参数映射到价格,我们使用了6层全连接神经网络,其隐含维度为512。将此DL价格模型写入model.py:
%%writefile model.py
import torch.nn as nn
import torch.nn.functional as F
import torch
class Net(nn.Module):
def __init__(self, hidden=1024):
super(Net, self).__init__()
self.fc1 = nn.Linear(6, hidden)
self.fc2 = nn.Linear(hidden, hidden)
self.fc3 = nn.Linear(hidden, hidden)
self.fc4 = nn.Linear(hidden, hidden)
self.fc5 = nn.Linear(hidden, hidden)
self.fc6 = nn.Linear(hidden, 1)
self.register_buffer('norm',
torch.tensor([200.0,
198.0,
200.0,
0.4,
0.2,
0.2]))
def forward(self, x):
x = x / self.norm
x = F.elu(self.fc1(x))
x = F.elu(self.fc2(x))
x = F.elu(self.fc3(x))
x = F.elu(self.fc4(x))
x = F.elu(self.fc5(x))
return self.fc6(x)
覆盖model.py
输入参数首先通过除以(200.0,198.0,200.0,0.4,0.2,0.2)缩小到0-1范围。然后在ELu激活函数后,将其5次隐射到隐藏维度512。选择ELu是因为我们需要计算参数的二阶微分。如果使用ReLu,二阶微分总是0。最后一层是线性层,它将隐藏维度映射到预测的期权价格。
在训练方面,我们使用了一个高级库Ignite来训练PyTorch中的神经网络:
https://github.com/pytorch/ignite
我们使用MSELoss作为损失函数,Adam作为优化器,CosineAnnealingScheduler作为学习率调度器。下面的代码将随机期权数据提供给定价模型进行训练。
from ignite.engine import Engine, Events
from ignite.handlers import Timer
from torch.nn import MSELoss
from torch.optim import Adam
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.handlers import ModelCheckpoint
from model import Net
from cupy_dataset import OptionDataSet
timer = Timer(average=True)
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y_pred[:,0], y)
loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 100
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
timer.attach(trainer,
start=Events.EPOCH_STARTED,
resume=Events.ITERATION_STARTED,
pause=Events.ITERATION_COMPLETED,
step=Events.ITERATION_COMPLETED)
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output, 'average time', timer.value())
trainer.run(dataset, max_epochs=100)
损失在不断减少,这意味着定价模型可以更好地预测期权价格。平均计算一个批大小(mini-batch)量需要花费大约12ms,在接下的文章中,我们将尝试挖掘GPU的全部潜力来加速训练。
4
TensorCore混合精度训练
V100 GPU有640个张量核,可以加速半精度矩阵乘法运算,这是DL模型的核心运算。由NVIDIA开发的Apex库( https://github.com/NVIDIA/apex )使Pytorch中的混合精度和分布式训练变得容易。通过改变3行代码,可以利用张量核加速训练。
from apex import amp
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from ignite.handlers import Timer
from torch.optim import Adam
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.handlers import ModelCheckpoint
from model import Net
from cupy_dataset import OptionDataSet
timer = Timer(average=True)
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
opt_level = 'O1'
model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y_pred[:,0], y)
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 100
timer.attach(trainer,
start=Events.EPOCH_STARTED,
resume=Events.ITERATION_STARTED,
pause=Events.ITERATION_COMPLETED,
step=Events.ITERATION_COMPLETED)
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output, 'average time', timer.value())
trainer.run(dataset, max_epochs=100)
它改进了以8ms计算每个 mini-batch 。为了获得更好的性能,我们将模型权值降低到半精度,因此需要调整损失以确保半精度动态范围与计算结果一致。它是猜测什么是正确的损失比例因子,并自动调整,如果梯度溢出。最后,在保持模型预测精度的前提下,获得最佳的硬件加速性能。
5
多个GPU训练
Apex让多GPU训练变得容易。在同一个训练脚本中,我们需要注意一些额外的步骤:
1、添加参数——local_rank,该参数将由分布式启动程序自动设置。
2、初始化进程组。
3、根据数据集中的进程id生成独立的批处理数据。
4、包装模型和优化器来处理分布式计算。
5、衡量损失和优化。
为了启动分布式训练,我们需要将所有内容都放到一个Python文件中。以下是一个例子:
%%writefile distributed_train.py
import cupy
import numpy as np
import math
import time
import os
import torch
from torch.utils.dlpack import from_dlpack
import torch.nn as nn
import torch.nn.functional as F
import torch
from apex import amp
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from torch.optim import Adam
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.handlers import ModelCheckpoint
from apex.parallel import DistributedDataParallel
import argparse
from model import Net
from cupy_dataset import OptionDataSet
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
args = parser.parse_args()
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
if args.distributed:
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
torch.backends.cudnn.benchmark = True
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
opt_level = 'O1'
model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)
if args.distributed:
model = DistributedDataParallel(model)
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=10240, seed=args.local_rank)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y_pred[:,0], y)
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 100
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output)
trainer.run(dataset, max_epochs=100)
覆盖distributed_train.py
要启动多进程训练,我们需要运行以下命令:
%reset -f
!python -m torch.distributed.launch --nproc_per_node=4 distributed_train.py
所有的GPU都在训练这个网络。然而,它有几个问题:
1、由于没有模型序列化,因此不会保存经过训练的模型;
2、没有验证数据集来检查训练进度;
3、大部分时间都花在蒙特卡罗模拟上,因此训练速度较慢;
4、 我们使用几个路径(1024)作为每个期权参数集,这些参数集是噪声的,并且模型不能收敛到一个低成本值。
解决方案是将蒙特卡罗仿真数据保存在磁盘上。这允许我们:
1、为不同的模型使用相同的数据集,节省蒙特卡罗仿真时间
2、通过增加路径数量来生成更精确的定价数据
我们将使用CuPy来运行蒙特卡罗仿真,因为它是最有效的方法。使用前面文章中定义的OptionDataSet:
from cupy_dataset import OptionDataSet
为保存的数据文件和模型检查点创建目录:
!mkdir -p datafiles
!mkdir -p check_points
定义一个函数来生成数据集文件:
def gen_data(n_files = 630, options_per_file = 10000, seed=3):
counter = 0
ds = OptionDataSet(max_len=n_files * options_per_file, number_path=8192000, batch=1,
seed=seed)
x = []
y = []
for i in ds:
if counter!=0 and counter % options_per_file == 0:
filename = 'datafiles/'+str(seed) + '_' + str(counter//options_per_file) + '.pth'
state = (torch.cat(x, 0), torch.cat(y, 0))
torch.save(state, filename)
x = []
y = []
x.append(i[0].cpu())
y.append(i[1].cpu())
counter += 1
return seed
它将为每个文件生成包含 x 和 y 大小矩阵选项的文件,文件名采用seed_group.pth,我们可以测试运行n_files = 5 和options_per_file = 16。
gen_data(n_files=5, options_per_file = 16, seed=3)
X, Y = torch.load('datafiles/3_1.pth')
print(X)
print(Y)
在本文中,我们将使用DASK在多核 GPU上生成数据集:
import dask
import dask_cudf
from dask.delayed import delayed
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
from dask.distributed import Client
client = Client(cluster)
client
下面的代码是一个在4个GPU上生成100x5x16个数据点示例。对于真正的深度学习模型训练,我们需要数以百万计的数据点。大家可以尝试将n_files和options_per_file更改为较大的数字。
futures = []
for i in range(0, 100):
future = client.submit(gen_data, 5, 16, i)
futures.append(future)
results = client.gather(futures)
一旦生成了数百万个数据点,我们就可以将这些数据点组合在一起,并将它们拆分为训练和验证数据集。
import pathlib
files = list(pathlib.Path('datafiles/').glob('*.pth'))
trn_size = int(len(files)*0.7)
trn_files = files[:trn_size]
val_files = files[trn_size:]
trn_x = []
trn_y = []
count = 0
for i in trn_files:
tensor = torch.load(i)
if count % 10 == 0:
print(count,'/',len(trn_files))
trn_x.append(tensor[0])
trn_y.append(tensor[1])
count += 1
X = torch.cat(trn_x)
Y = torch.cat(trn_y)
torch.save((X,Y), 'trn.pth')
val_x = []
val_y = []
count = 0
for i in val_files:
tensor = torch.load(i)
if count % 10 == 0:
print(count,'/',len(val_files))
val_x.append(tensor[0])
val_y.append(tensor[1])
count += 1
X = torch.cat(val_x)
Y = torch.cat(val_y)
torch.save((X,Y), 'val.pth')
我们创建了两个用于训练和验证的数据文件trn.pth和valn .pth。我们可以定义一个新的PyTorch数据集来从文件加载数据并将其写入文件。该数据集采用rank和world_size参数进行分布式训练。它将整个数据集加载到GPU内存中,并根据rank id对数据点进行采样,使得不同rank_id的数据集给出不同的数据。
%%writefile filedataset.py
import torch
class OptionDataSet(torch.utils.data.Dataset):
def __init__(self, filename, rank=0, world_size=5):
tensor = torch.load(filename)
self.tensor = (tensor[0].cuda(), tensor[1].cuda())
self.length = len(self.tensor[0]) // world_size
self.world_size = world_size
self.rank = rank
def __getitem__(self, index):
index = index * self.world_size + self.rank
return self.tensor[0][index], self.tensor[1][index]
def __len__(self):
return self.length
写入filedataset.py
在训练深度学习模型时,防止过拟合的一个有效方法是使用单独的验证数据集来监控样本外的性能 。当验证数据集的性能下降时,这意味着发生了过拟合,因此我们可以停止训练。我们把所有的东西放在一个脚本,可以在多个GPU有效地训练模型:
%%writefile distributed_training.py
import torch
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from apex import amp
import argparse
import os
from apex.parallel import DistributedDataParallel
import apex
from apex.optimizers import FusedLAMB
from model import Net
from filedataset import OptionDataSet
from ignite.metrics import MeanAbsoluteError
import ignite
import shutil
import torch.distributed as dist
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
parser.add_argument("--path", default=None)
parser.add_argument("--mae_improv_tol", default=0.002, type=float)
args = parser.parse_args()
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
if args.distributed:
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
torch.backends.cudnn.benchmark = True
trn_dataset = OptionDataSet(filename='./trn.pth',
rank=dist.get_rank(),
world_size=int(os.environ['WORLD_SIZE']))
trn_dataset = torch.utils.data.DataLoader(trn_dataset,
batch_size=1024,
shuffle=True,
num_workers=0)
val_dataset = OptionDataSet(filename='./val.pth',
rank=dist.get_rank(),
world_size=int(os.environ['WORLD_SIZE']))
val_dataset = torch.utils.data.DataLoader(val_dataset,
batch_size=1024,
shuffle=False,
num_workers=0)
model = Net().cuda()
optimizer = FusedLAMB(model.parameters(), lr=1e-3)
loss_fn = MSELoss()
model = apex.parallel.convert_syncbn_model(model, channel_last=True)
model, optimizer = amp.initialize(model, optimizer, opt_level='O1')
best_mae = 100000
if args.path is not None:
def resume():
global best_mae
checkpoint = torch.load(args.path)
best_mae = checkpoint['best_mae']
model.load_state_dict(checkpoint['state_dict'])
amp.load_state_dict(checkpoint['amp'])
optimizer.load_state_dict(checkpoint['optimizer'])
resume()
if args.distributed:
model = DistributedDataParallel(model)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y, y_pred[:, 0])
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 500
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-5, 5e-6,
len(trn_dataset),
start_value_mult=0.999, end_value_mult=0.999,
save_history=False
)
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'check_points/model_best.pth.tar')
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(trn_dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output, 'iter', engine.state.iteration,
'lr', scheduler.get_param())
metric = MeanAbsoluteError()
loss_m = ignite.metrics.Loss(loss_fn)
def eval_update(engine, batch):
model.eval()
x = batch[0]
y = batch[1]
y_pred = model(x)
return y, y_pred[:, 0]
evaluator = Engine(eval_update)
metric.attach(evaluator, "MAE")
loss_m.attach(evaluator, "loss")
@trainer.on(Events.EPOCH_COMPLETED)
def log_evalnumber(engine):
global best_mae
mae_improv_tol = args.mae_improv_tol
evaluator.run(val_dataset, max_epochs=1)
metrics = evaluator.state.metrics
average_tensor = torch.tensor([metrics['MAE'], metrics['loss']]).cuda()
torch.distributed.reduce(average_tensor, 0, op=torch.distributed.ReduceOp.SUM)
torch.distributed.broadcast(average_tensor, 0)
average_tensor = average_tensor/int(os.environ['WORLD_SIZE'])
mae = average_tensor[0].item()
is_best = False
if (1 - mae / best_mae) >= mae_improv_tol or \
(engine.state.epoch == engine.state.max_epochs and
mae < best_mae):
best_mae = mae
is_best = True
# print("RANK {} Val Results - Epoch: {} Avg MAE: {:.5f} loss: {:.5f} BEST MAE: {:.5f}"
# .format(dist.get_rank(), trainer.state.epoch, metrics['MAE'], metrics['loss'], best_mae))
if dist.get_rank() == 0:
print('Epoch {}/{}'.format(engine.state.epoch, engine.state.max_epochs))
print('Best MAE Improvement Tolerance for checkpointing: {}%'.format(100 * mae_improv_tol))
print("RANK {} AVG {} NGPUs, best-mae: {:.5f} mae: {:.5f} loss: {:.5f}".format(
dist.get_rank(),
int(os.environ['WORLD_SIZE']),
best_mae,
average_tensor[0].item(),
average_tensor[1].item()))
fname = 'check_points/current_pth.tar'
if is_best:
save_checkpoint({'epoch': trainer.state.epoch,
'state_dict': model.module.state_dict(),
'best_mae': best_mae,
'optimizer': optimizer.state_dict(),
'amp': amp.state_dict()
}, is_best,
filename=fname)
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
res = model(inputs)
print('test one example:', res.item())
trainer.run(trn_dataset, max_epochs=2000)
覆盖distributed_training.py
与前面的代码相比,它有点复杂,因为:
它处理验证数据集的评估
它将模型序列化到一个文件中,并根据MAE跟踪执行得最好的模型
它从文件中恢复训练
我们可以通过以下命令来启动分布式训练:
ngpus=!echo $(nvidia-smi -L | wc -l)
!python -m torch.distributed.launch --nproc_per_node={ngpus[0]} distributed_training.py
我们需要一些耐心来训练定价模型,直到它收敛。
6
推断和Greeks
一旦训练被聚合,执行得最好的模型就被保存到check_points/目录中。
为了得到一个好的模型,我们需要数百万个数据点来训练模型,直到它收敛。通常在一台8个GPU的DGX-1机器上需要10-20个小时。我们使用1000万个训练数据点和500万个验证数据点对模型进行训练。我们没有研究训练样本的最小数量是多少,只是简单地使用了大量的数据样本。大家可以通过使用更少的数据点来进行训练。
为了节省时间,可以运行以下命令下载权重并使用它们进行推断:
! ((test ! -f './check_points/model_best.pth.tar' || test ! -f './check_points/512/model_best.pth.tar') && \
bash ./download_data.sh) || echo "Dataset is already present. No need to re-download it."
数据集已经存在。不需要重新下载。
我们可以加载模型参数并使用它进行推断。
from model import Net
import torch
checkpoint = torch.load('check_points/model_best.pth.tar')
model = Net().cuda()
model.load_state_dict(checkpoint['state_dict'])
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
model(inputs)
tensor([[18.7140]], device='cuda:0', grad_fn=)
建立深度学习模型的好处之一是可以很容易地计算出Greeks。我们只需要利用Pytorch中的auto-grad特征。下面是一个计算多元多项式函数一阶微分的例子。
import torch
from torch.autograd import grad
'''
z = (xy)^2
x = 3, y =2
first order deriv [24 36]
'''
inputs = torch.tensor([3.0,2.0], requires_grad=True)
z = (inputs[0]*inputs[1])**2
first_order_grad = grad(z, inputs, create_graph=True)
print(first_order_grad)
(tensor([24., 36.], grad_fn=),)
我们可以使用grad函数来计算参数:K, B, S0, sigma, mu, r的一阶差分:
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
x.backward()
first_order_gradient = inputs.grad
first_order_gradient
tensor([[-6.7092e-01, -2.1257e-02, 7.8896e-01, 1.9219e+01, 4.8331e+01,
-1.8419e+01]], device='cuda:0')
画出函数曲线:
%matplotlib inline
import pylab
import numpy as np
def compute_delta(S):
inputs = torch.tensor([[110.0, 100.0, S, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
x.backward()
first_order_gradient = inputs.grad
return first_order_gradient[0][2]
prices = np.arange(10, 200, 0.1)
deltas = []
for p in prices:
deltas.append(compute_delta(p).item())
fig = pylab.plot(prices, deltas)
pylab.xlabel('prices')
pylab.ylabel('Delta')
fig
在PyTorch中计算二阶导数也很简单。我们只需要应用两次grad函数。下面是计算同一多项式函数的二阶导数的例子:
import torch
from torch.autograd import grad
'''
z = (xy)^2
x = 3, y =2
first order deriv [24 36]
d2z/dx2 = 8
d2z/dxdy = 24
d2z/dy2 = 18
'''
inputs = torch.tensor([3.0,2.0], requires_grad=True)
z = (inputs[0]*inputs[1])**2
first_order_grad = grad(z, inputs, create_graph=True)
second_order_grad_x, = grad(first_order_grad[0][0], inputs, retain_graph=True) #
second_order_grad_y, = grad(first_order_grad[0][1], inputs)
print(second_order_grad_x)
print(second_order_grad_y)
tensor([ 8., 24.])
tensor([24., 18.])
利用这个机制,我们可以在下面的例子中计算二阶导数:
,,,,,
import torch
from torch import Tensor
from torch.autograd import Variable
from torch.autograd import grad
from torch import nn
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
loss_grads = grad(x, inputs, create_graph=True)
drv = grad(loss_grads[0][0][2], inputs)
drv
Out[9]:
(tensor([[-0.0143, 0.0039, 0.0098, -0.3183, 1.1455, -0.7876]],
device='cuda:0'),)
Gamma是S的二阶差分。 我们可以把Gamma曲线画成股票价格的函数:
import pylab
import numpy as np
def compute_gamma(S):
inputs = torch.tensor([[110.0, 100.0, S, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
loss_grads = grad(x, inputs, create_graph=True)
drv = grad(loss_grads[0][0][2], inputs)
return drv[0][0][2]
prices = np.arange(10, 200, 0.1)
deltas = []
for p in prices:
deltas.append(compute_gamma(p).item())
fig2 = pylab.plot(prices, deltas)
pylab.xlabel('prices')
pylab.ylabel('Gamma')
fig2
隐含波动率是基于期权报价对标的资产的预测波动率。给出的模型是价格与期权参数的反向映射,用蒙特卡罗模拟法很难做到这一点。但如果我们有深度学习定价模型,这是一个简单的任务。我们可以先画出波动率和期权价格之间的关系:
import pylab
import numpy as np
def compute_price(sigma):
inputs = torch.tensor([[110.0, 100.0, 120.0, sigma, 0.1, 0.05]]).cuda()
x = model(inputs)
return x.item()
sigmas = np.arange(0, 0.5, 0.1)
prices = []
for s in sigmas:
prices.append(compute_price(s))
fig3 = pylab.plot(sigmas, prices)
pylab.xlabel('Sigma')
pylab.ylabel('Price')
fig3
给定价格P,隐含波动率是compute_price函数的根。我们可以用二分法求根。
def bisection_root(small, large, fun, target, EPS=1e-6):
if fun(large) - target < 0:
print('upper bound is too small')
return None
if fun(small) - target > 0:
print('lower bound is too large')
return None
while large - small > EPS:
mid = (large + small) / 2.0
if fun(mid) - target >= 0:
large = mid
else:
small = mid
mid = (large + small) / 2.0
return mid, abs(fun(mid) - target)
quoted_price = 16.0
sigma, err = bisection_root(0, 0.5, compute_price, quoted_price)
print('implied volativity', sigma, 'error', err)
implied volativity 0.18517351150512695 error 4.76837158203125e-06
蒙特卡罗模拟需要数以百万计的路径来得到精确的答案,这需要大量的计算。Ryan等人得研究表明,可以训练深度学习模型对衍生品进行估值。深度学习模型是准确和快速的,能够产生比传统模型快一百万倍的估值。在今天的推文中,我们将使用一个 全连接网络来学习亚式障碍期权的定价模式 。采用蒙特卡罗模拟作为训练的定价依据。我们使用与上一篇文章相同的亚式障碍期权模型,参数如下:geng
T:到期如(年)
S:现货(美元)
K:Strike(美元)
sigma:波动率(per)
r:无风险利率(per)
mu:Drift Rate(per)
B:Barrier(美元)
下面的内容主要包括两个主题:
使用蒙特卡罗定价动态数据集训练期权定价神的经网络模型。
使用蒙特卡罗定价静态数据集训练期权定价神经网络模型并进行推断。
2
批处理数据生成
数据集是深度学习训练的重要组成部分。我们将修改之前的单一亚式障碍期权定价代码来处理一批障碍期权定价。
加载库:
import cupy
import numpy as np
import math
import time
import torch
cupy.cuda.set_allocator(None)
from torch.utils.dlpack import from_dlpack
批量障碍期权定价模拟的CuPy版本如下:
cupy_batched_barrier_option = cupy.RawKernel(r'''
extern "C" __global__ void batched_barrier_option(
float *d_s,
const float T,
const float * K,
const float * B,
const float * S0,
const float * sigma,
const float * mu,
const float * r,
const float * d_normals,
const long N_STEPS,
const long N_PATHS,
const long N_BATCH)
{
unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;
unsigned stride = blockDim.x * gridDim.x;
unsigned tid = threadIdx.x;
const float tmp3 = sqrt(T/N_STEPS);
for (unsigned i = idx; i
{
int batch_id = i / N_PATHS;
int path_id = i % N_PATHS;
float s_curr = S0[batch_id];
float tmp1 = mu[batch_id]*T/N_STEPS;
float tmp2 = exp(-r[batch_id]*T);
unsigned n=0;
double running_average = 0.0;
for(unsigned n = 0; n < N_STEPS; n++){
s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];
running_average += (s_curr - running_average) / (n + 1.0);
if (running_average <= B[batch_id]){
break;
}
}
float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);
d_s[i] = tmp2 * payoff;
}
}
''', 'batched_barrier_option')
注意,参数(K, B, S0, sigma, mu, r)以批处理长度的数组形式传入。输出数组是一个1-D 的二维数组。第一个维度用于 Batch,第二个维度用于 Path。。
通过输入两组选项参数进行测试:
N_PATHS = 2048000
N_STEPS = 365
N_BATCH = 2
T = 1.0
K = cupy.array([110.0, 120.0], dtype=cupy.float32)
B = cupy.array([100.0, 90.0], dtype=cupy.float32)
S0 = cupy.array([120.0, 100.0], dtype=cupy.float32)
sigma = cupy.array([0.35, 0.2], dtype=cupy.float32)
mu = cupy.array([0.15, 0.1], dtype=cupy.float32)
r =cupy.array([0.05, 0.05], dtype=cupy.float32)
把这一切放进一个简单的函数来启动1GPU内核。每个Path的期权价格是相应路径terminal值的平均值。这可以很容易地通过Cupy函数平均值(axis=1)计算出来
def batch_run():
number_of_threads = 256
number_of_blocks = (N_PATHS * N_BATCH - 1) // number_of_threads + 1
randoms_gpu = cupy.random.normal(0, 1, N_BATCH*N_PATHS * N_STEPS, dtype=cupy.float32)
output = cupy.zeros(N_BATCH*N_PATHS, dtype=cupy.float32)
cupy.cuda.stream.get_current_stream().synchronize()
s = time.time()
cupy_batched_barrier_option((number_of_blocks,), (number_of_threads,),
(output, np.float32(T), K, B, S0, sigma, mu, r,
randoms_gpu, N_STEPS, N_PATHS, N_BATCH))
v = output.reshape(N_BATCH, N_PATHS).mean(axis=1)
cupy.cuda.stream.get_current_stream().synchronize()
e = time.time()
print('time', e-s, 'v',v)
batch_run()
time 0.013919591903686523 v [21.22405 0.8480416]
这将为66ms中的这两组期权参数生成21.22和0.848的期权价格。
它的工作效率很高,因此我们将构造一个OptionDataSet类来包装上面的代码,以便我们可以在Pytorch中使用它。对于每个下一个元素,生成指定范围内的均匀分布随机期权参数,启动GPU内核计算期权价格,通过DLPack将CuPy数组转换为带有zero-copy的Pytorch张量。请注意我们是如何实现iterable Dataset接口的:
class OptionDataSet(torch.utils.data.IterableDataset):
def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):
self.num = 0
self.max_length = max_len
self.N_PATHS = number_path
self.N_STEPS = 365
self.N_BATCH = batch
self.T = np.float32(1.0)
self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)
self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
self.number_of_threads = threads
cupy.random.seed(seed)
def __len__(self):
return self.max_length
def __iter__(self):
self.num = 0
return self
def __next__(self):
if self.num > self.max_length:
raise StopIteration
X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
X[:, 1] = X[:, 0] * X[:, 1]
randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),
cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))
Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)
self.num += 1
return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
将所有与Pytorch数据集相关的内容都放到一个名为cupy_dataset.py的文件中:
%%writefile cupy_dataset.py
import cupy
import numpy as np
import torch
from torch.utils.dlpack import from_dlpack
cupy.cuda.set_allocator(None)
cupy_batched_barrier_option = cupy.RawKernel(r'''
extern "C" __global__ void batched_barrier_option(
float *d_s,
const float T,
const float * K,
const float * B,
const float * S0,
const float * sigma,
const float * mu,
const float * r,
const float * d_normals,
const long N_STEPS,
const long N_PATHS,
const long N_BATCH)
{
unsigned idx = threadIdx.x + blockIdx.x * blockDim.x;
unsigned stride = blockDim.x * gridDim.x;
unsigned tid = threadIdx.x;
const float tmp3 = sqrt(T/N_STEPS);
for (unsigned i = idx; i
{
int batch_id = i / N_PATHS;
int path_id = i % N_PATHS;
float s_curr = S0[batch_id];
float tmp1 = mu[batch_id]*T/N_STEPS;
float tmp2 = exp(-r[batch_id]*T);
unsigned n=0;
double running_average = 0.0;
for(unsigned n = 0; n < N_STEPS; n++){
s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH];
running_average += (s_curr - running_average) / (n + 1.0);
if (running_average <= B[batch_id]){
break;
}
}
float payoff = (running_average>K[batch_id] ? running_average-K[batch_id] : 0.f);
d_s[i] = tmp2 * payoff;
}
}
''', 'batched_barrier_option')
class OptionDataSet(torch.utils.data.IterableDataset):
def __init__(self, max_len=10, number_path = 1000, batch=2, threads=256,seed=15):
self.num = 0
self.max_length = max_len
self.N_PATHS = number_path
self.N_STEPS = 365
self.N_BATCH = batch
self.T = np.float32(1.0)
self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)
self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
self.number_of_threads = threads
cupy.random.seed(seed)
def __len__(self):
return self.max_length
def __iter__(self):
self.num = 0
return self
def __next__(self):
if self.num > self.max_length:
raise StopIteration
X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
X[:, 1] = X[:, 0] * X[:, 1]
randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
cupy_batched_barrier_option((self.number_of_blocks,), (self.number_of_threads,), (self.output, self.T, cupy.ascontiguousarray(X[:, 0]),
cupy.ascontiguousarray(X[:, 1]), cupy.ascontiguousarray(X[:, 2]), cupy.ascontiguousarray(X[:, 3]), cupy.ascontiguousarray(X[:, 4]), cupy.ascontiguousarray(X[:, 5]), randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH))
Y = self.output.reshape(self.N_BATCH, self.N_PATHS).mean(axis=1)
self.num += 1
return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
覆盖cupy_dataset.py
这里是一个测试代码样本,有10个数据点、 batch为 16:
ds = OptionDataSet(10, number_path=100000, batch=16, seed=15)
for i in ds:
print(i[1])
我们可以实现相同的代码使用Numba加速计算在GPU:
import numba
from numba import cuda
@cuda.jit
def batch_barrier_option(d_s, T, K, B, S0, sigma, mu, r, d_normals, N_STEPS, N_PATHS, N_BATCH):
ii = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x
stride = cuda.gridDim.x * cuda.blockDim.x
tmp3 = math.sqrt(T/N_STEPS)
for i in range(ii, N_PATHS * N_BATCH, stride):
batch_id = i // N_PATHS
path_id = i % N_PATHS
tmp1 = mu[batch_id]*T/N_STEPS
tmp2 = math.exp(-r[batch_id]*T)
running_average = 0.0
s_curr = S0[batch_id]
for n in range(N_STEPS):
s_curr += tmp1 * s_curr + sigma[batch_id]*s_curr*tmp3*d_normals[path_id + batch_id * N_PATHS + n * N_PATHS * N_BATCH]
running_average = running_average + 1.0/(n + 1.0) * (s_curr - running_average)
if i==0 and batch_id == 2:
print(s_curr)
if running_average <= B[batch_id]:
break
payoff = running_average - K[batch_id] if running_average > K[batch_id] else 0
d_s[i] = tmp2 * payoff
class NumbaOptionDataSet(object):
def __init__(self, max_len=10, number_path = 1000, batch=2, threads=512, seed=15):
self.num = 0
self.max_length = max_len
self.N_PATHS = number_path
self.N_STEPS = 365
self.N_BATCH = batch
self.T = np.float32(1.0)
self.output = cupy.zeros(self.N_BATCH*self.N_PATHS, dtype=cupy.float32)
self.number_of_blocks = (self.N_PATHS * self.N_BATCH - 1) // threads + 1
self.number_of_threads = threads
cupy.random.seed(seed)
def __len__(self):
return self.max_length
def __iter__(self):
self.num = 0
return self
def __next__(self):
if self.num > self.max_length:
raise StopIteration
X = cupy.random.rand(self.N_BATCH, 6, dtype=cupy.float32)
X = X * cupy.array([200.0, 0.99, 200.0, 0.4, 0.2, 0.2], dtype=cupy.float32)
X[:, 1] = X[:, 0] * X[:, 1]
randoms = cupy.random.normal(0, 1, self.N_BATCH * self.N_PATHS * self.N_STEPS, dtype=cupy.float32)
batch_barrier_option[(self.number_of_blocks,), (self.number_of_threads,)](self.output, self.T, X[:, 0],
X[:, 1], X[:, 2], X[:, 3], X[:, 4], X[:, 5], randoms, self.N_STEPS, self.N_PATHS, self.N_BATCH)
o = self.output.reshape(self.N_BATCH, self.N_PATHS)
Y = o.mean(axis = 1)
self.num += 1
return (from_dlpack(X.toDlpack()), from_dlpack(Y.toDlpack()))
ds = NumbaOptionDataSet(10, number_path=100000, batch=16, seed=15)
for i in ds:
print(i[1])
3
模型
为了将期权参数映射到价格,我们使用了6层全连接神经网络,其隐含维度为512。将此DL价格模型写入model.py:
%%writefile model.py
import torch.nn as nn
import torch.nn.functional as F
import torch
class Net(nn.Module):
def __init__(self, hidden=1024):
super(Net, self).__init__()
self.fc1 = nn.Linear(6, hidden)
self.fc2 = nn.Linear(hidden, hidden)
self.fc3 = nn.Linear(hidden, hidden)
self.fc4 = nn.Linear(hidden, hidden)
self.fc5 = nn.Linear(hidden, hidden)
self.fc6 = nn.Linear(hidden, 1)
self.register_buffer('norm',
torch.tensor([200.0,
198.0,
200.0,
0.4,
0.2,
0.2]))
def forward(self, x):
x = x / self.norm
x = F.elu(self.fc1(x))
x = F.elu(self.fc2(x))
x = F.elu(self.fc3(x))
x = F.elu(self.fc4(x))
x = F.elu(self.fc5(x))
return self.fc6(x)
覆盖model.py
输入参数首先通过除以(200.0,198.0,200.0,0.4,0.2,0.2)缩小到0-1范围。然后在ELu激活函数后,将其5次隐射到隐藏维度512。选择ELu是因为我们需要计算参数的二阶微分。如果使用ReLu,二阶微分总是0。最后一层是线性层,它将隐藏维度映射到预测的期权价格。
在训练方面,我们使用了一个高级库Ignite来训练PyTorch中的神经网络:
https://github.com/pytorch/ignite
我们使用MSELoss作为损失函数,Adam作为优化器,CosineAnnealingScheduler作为学习率调度器。下面的代码将随机期权数据提供给定价模型进行训练。
from ignite.engine import Engine, Events
from ignite.handlers import Timer
from torch.nn import MSELoss
from torch.optim import Adam
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.handlers import ModelCheckpoint
from model import Net
from cupy_dataset import OptionDataSet
timer = Timer(average=True)
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y_pred[:,0], y)
loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 100
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
timer.attach(trainer,
start=Events.EPOCH_STARTED,
resume=Events.ITERATION_STARTED,
pause=Events.ITERATION_COMPLETED,
step=Events.ITERATION_COMPLETED)
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output, 'average time', timer.value())
trainer.run(dataset, max_epochs=100)
损失在不断减少,这意味着定价模型可以更好地预测期权价格。平均计算一个批大小(mini-batch)量需要花费大约12ms,在接下的文章中,我们将尝试挖掘GPU的全部潜力来加速训练。
4
TensorCore混合精度训练
V100 GPU有640个张量核,可以加速半精度矩阵乘法运算,这是DL模型的核心运算。由NVIDIA开发的Apex库( https://github.com/NVIDIA/apex )使Pytorch中的混合精度和分布式训练变得容易。通过改变3行代码,可以利用张量核加速训练。
from apex import amp
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from ignite.handlers import Timer
from torch.optim import Adam
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.handlers import ModelCheckpoint
from model import Net
from cupy_dataset import OptionDataSet
timer = Timer(average=True)
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
opt_level = 'O1'
model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=4800)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y_pred[:,0], y)
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 100
timer.attach(trainer,
start=Events.EPOCH_STARTED,
resume=Events.ITERATION_STARTED,
pause=Events.ITERATION_COMPLETED,
step=Events.ITERATION_COMPLETED)
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output, 'average time', timer.value())
trainer.run(dataset, max_epochs=100)
它改进了以8ms计算每个 mini-batch 。为了获得更好的性能,我们将模型权值降低到半精度,因此需要调整损失以确保半精度动态范围与计算结果一致。它是猜测什么是正确的损失比例因子,并自动调整,如果梯度溢出。最后,在保持模型预测精度的前提下,获得最佳的硬件加速性能。
5
多个GPU训练
Apex让多GPU训练变得容易。在同一个训练脚本中,我们需要注意一些额外的步骤:
1、添加参数——local_rank,该参数将由分布式启动程序自动设置。
2、初始化进程组。
3、根据数据集中的进程id生成独立的批处理数据。
4、包装模型和优化器来处理分布式计算。
5、衡量损失和优化。
为了启动分布式训练,我们需要将所有内容都放到一个Python文件中。以下是一个例子:
%%writefile distributed_train.py
import cupy
import numpy as np
import math
import time
import os
import torch
from torch.utils.dlpack import from_dlpack
import torch.nn as nn
import torch.nn.functional as F
import torch
from apex import amp
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from torch.optim import Adam
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.handlers import ModelCheckpoint
from apex.parallel import DistributedDataParallel
import argparse
from model import Net
from cupy_dataset import OptionDataSet
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
args = parser.parse_args()
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
if args.distributed:
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
torch.backends.cudnn.benchmark = True
model = Net().cuda()
loss_fn = MSELoss()
optimizer = Adam(model.parameters(), lr=1e-3)
opt_level = 'O1'
model, optimizer = amp.initialize(model, optimizer, opt_level=opt_level)
if args.distributed:
model = DistributedDataParallel(model)
dataset = OptionDataSet(max_len=10000, number_path = 1024, batch=10240, seed=args.local_rank)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y_pred[:,0], y)
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 100
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-4, 1e-6, len(dataset))
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output)
trainer.run(dataset, max_epochs=100)
覆盖distributed_train.py
要启动多进程训练,我们需要运行以下命令:
%reset -f
!python -m torch.distributed.launch --nproc_per_node=4 distributed_train.py
所有的GPU都在训练这个网络。然而,它有几个问题:
1、由于没有模型序列化,因此不会保存经过训练的模型;
2、没有验证数据集来检查训练进度;
3、大部分时间都花在蒙特卡罗模拟上,因此训练速度较慢;
4、 我们使用几个路径(1024)作为每个期权参数集,这些参数集是噪声的,并且模型不能收敛到一个低成本值。
解决方案是将蒙特卡罗仿真数据保存在磁盘上。这允许我们:
1、为不同的模型使用相同的数据集,节省蒙特卡罗仿真时间
2、通过增加路径数量来生成更精确的定价数据
我们将使用CuPy来运行蒙特卡罗仿真,因为它是最有效的方法。使用前面文章中定义的OptionDataSet:
from cupy_dataset import OptionDataSet
为保存的数据文件和模型检查点创建目录:
!mkdir -p datafiles
!mkdir -p check_points
定义一个函数来生成数据集文件:
def gen_data(n_files = 630, options_per_file = 10000, seed=3):
counter = 0
ds = OptionDataSet(max_len=n_files * options_per_file, number_path=8192000, batch=1,
seed=seed)
x = []
y = []
for i in ds:
if counter!=0 and counter % options_per_file == 0:
filename = 'datafiles/'+str(seed) + '_' + str(counter//options_per_file) + '.pth'
state = (torch.cat(x, 0), torch.cat(y, 0))
torch.save(state, filename)
x = []
y = []
x.append(i[0].cpu())
y.append(i[1].cpu())
counter += 1
return seed
它将为每个文件生成包含 x 和 y 大小矩阵选项的文件,文件名采用seed_group.pth,我们可以测试运行n_files = 5 和options_per_file = 16。
gen_data(n_files=5, options_per_file = 16, seed=3)
X, Y = torch.load('datafiles/3_1.pth')
print(X)
print(Y)
在本文中,我们将使用DASK在多核 GPU上生成数据集:
import dask
import dask_cudf
from dask.delayed import delayed
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
from dask.distributed import Client
client = Client(cluster)
client
下面的代码是一个在4个GPU上生成100x5x16个数据点示例。对于真正的深度学习模型训练,我们需要数以百万计的数据点。大家可以尝试将n_files和options_per_file更改为较大的数字。
futures = []
for i in range(0, 100):
future = client.submit(gen_data, 5, 16, i)
futures.append(future)
results = client.gather(futures)
一旦生成了数百万个数据点,我们就可以将这些数据点组合在一起,并将它们拆分为训练和验证数据集。
import pathlib
files = list(pathlib.Path('datafiles/').glob('*.pth'))
trn_size = int(len(files)*0.7)
trn_files = files[:trn_size]
val_files = files[trn_size:]
trn_x = []
trn_y = []
count = 0
for i in trn_files:
tensor = torch.load(i)
if count % 10 == 0:
print(count,'/',len(trn_files))
trn_x.append(tensor[0])
trn_y.append(tensor[1])
count += 1
X = torch.cat(trn_x)
Y = torch.cat(trn_y)
torch.save((X,Y), 'trn.pth')
val_x = []
val_y = []
count = 0
for i in val_files:
tensor = torch.load(i)
if count % 10 == 0:
print(count,'/',len(val_files))
val_x.append(tensor[0])
val_y.append(tensor[1])
count += 1
X = torch.cat(val_x)
Y = torch.cat(val_y)
torch.save((X,Y), 'val.pth')
我们创建了两个用于训练和验证的数据文件trn.pth和valn .pth。我们可以定义一个新的PyTorch数据集来从文件加载数据并将其写入文件。该数据集采用rank和world_size参数进行分布式训练。它将整个数据集加载到GPU内存中,并根据rank id对数据点进行采样,使得不同rank_id的数据集给出不同的数据。
%%writefile filedataset.py
import torch
class OptionDataSet(torch.utils.data.Dataset):
def __init__(self, filename, rank=0, world_size=5):
tensor = torch.load(filename)
self.tensor = (tensor[0].cuda(), tensor[1].cuda())
self.length = len(self.tensor[0]) // world_size
self.world_size = world_size
self.rank = rank
def __getitem__(self, index):
index = index * self.world_size + self.rank
return self.tensor[0][index], self.tensor[1][index]
def __len__(self):
return self.length
写入filedataset.py
在训练深度学习模型时,防止过拟合的一个有效方法是使用单独的验证数据集来监控样本外的性能 。当验证数据集的性能下降时,这意味着发生了过拟合,因此我们可以停止训练。我们把所有的东西放在一个脚本,可以在多个GPU有效地训练模型:
%%writefile distributed_training.py
import torch
from ignite.engine import Engine, Events
from torch.nn import MSELoss
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from apex import amp
import argparse
import os
from apex.parallel import DistributedDataParallel
import apex
from apex.optimizers import FusedLAMB
from model import Net
from filedataset import OptionDataSet
from ignite.metrics import MeanAbsoluteError
import ignite
import shutil
import torch.distributed as dist
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
parser.add_argument("--path", default=None)
parser.add_argument("--mae_improv_tol", default=0.002, type=float)
args = parser.parse_args()
args.distributed = False
if 'WORLD_SIZE' in os.environ:
args.distributed = int(os.environ['WORLD_SIZE']) > 1
if args.distributed:
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl',
init_method='env://')
torch.backends.cudnn.benchmark = True
trn_dataset = OptionDataSet(filename='./trn.pth',
rank=dist.get_rank(),
world_size=int(os.environ['WORLD_SIZE']))
trn_dataset = torch.utils.data.DataLoader(trn_dataset,
batch_size=1024,
shuffle=True,
num_workers=0)
val_dataset = OptionDataSet(filename='./val.pth',
rank=dist.get_rank(),
world_size=int(os.environ['WORLD_SIZE']))
val_dataset = torch.utils.data.DataLoader(val_dataset,
batch_size=1024,
shuffle=False,
num_workers=0)
model = Net().cuda()
optimizer = FusedLAMB(model.parameters(), lr=1e-3)
loss_fn = MSELoss()
model = apex.parallel.convert_syncbn_model(model, channel_last=True)
model, optimizer = amp.initialize(model, optimizer, opt_level='O1')
best_mae = 100000
if args.path is not None:
def resume():
global best_mae
checkpoint = torch.load(args.path)
best_mae = checkpoint['best_mae']
model.load_state_dict(checkpoint['state_dict'])
amp.load_state_dict(checkpoint['amp'])
optimizer.load_state_dict(checkpoint['optimizer'])
resume()
if args.distributed:
model = DistributedDataParallel(model)
def train_update(engine, batch):
model.train()
optimizer.zero_grad()
x = batch[0]
y = batch[1]
y_pred = model(x)
loss = loss_fn(y, y_pred[:, 0])
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
optimizer.step()
return loss.item()
trainer = Engine(train_update)
log_interval = 500
scheduler = CosineAnnealingScheduler(optimizer, 'lr', 1e-5, 5e-6,
len(trn_dataset),
start_value_mult=0.999, end_value_mult=0.999,
save_history=False
)
trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'check_points/model_best.pth.tar')
@trainer.on(Events.ITERATION_COMPLETED)
def log_training_loss(engine):
iter = (engine.state.iteration - 1) % len(trn_dataset) + 1
if iter % log_interval == 0:
print('loss', engine.state.output, 'iter', engine.state.iteration,
'lr', scheduler.get_param())
metric = MeanAbsoluteError()
loss_m = ignite.metrics.Loss(loss_fn)
def eval_update(engine, batch):
model.eval()
x = batch[0]
y = batch[1]
y_pred = model(x)
return y, y_pred[:, 0]
evaluator = Engine(eval_update)
metric.attach(evaluator, "MAE")
loss_m.attach(evaluator, "loss")
@trainer.on(Events.EPOCH_COMPLETED)
def log_evalnumber(engine):
global best_mae
mae_improv_tol = args.mae_improv_tol
evaluator.run(val_dataset, max_epochs=1)
metrics = evaluator.state.metrics
average_tensor = torch.tensor([metrics['MAE'], metrics['loss']]).cuda()
torch.distributed.reduce(average_tensor, 0, op=torch.distributed.ReduceOp.SUM)
torch.distributed.broadcast(average_tensor, 0)
average_tensor = average_tensor/int(os.environ['WORLD_SIZE'])
mae = average_tensor[0].item()
is_best = False
if (1 - mae / best_mae) >= mae_improv_tol or \
(engine.state.epoch == engine.state.max_epochs and
mae < best_mae):
best_mae = mae
is_best = True
# print("RANK {} Val Results - Epoch: {} Avg MAE: {:.5f} loss: {:.5f} BEST MAE: {:.5f}"
# .format(dist.get_rank(), trainer.state.epoch, metrics['MAE'], metrics['loss'], best_mae))
if dist.get_rank() == 0:
print('Epoch {}/{}'.format(engine.state.epoch, engine.state.max_epochs))
print('Best MAE Improvement Tolerance for checkpointing: {}%'.format(100 * mae_improv_tol))
print("RANK {} AVG {} NGPUs, best-mae: {:.5f} mae: {:.5f} loss: {:.5f}".format(
dist.get_rank(),
int(os.environ['WORLD_SIZE']),
best_mae,
average_tensor[0].item(),
average_tensor[1].item()))
fname = 'check_points/current_pth.tar'
if is_best:
save_checkpoint({'epoch': trainer.state.epoch,
'state_dict': model.module.state_dict(),
'best_mae': best_mae,
'optimizer': optimizer.state_dict(),
'amp': amp.state_dict()
}, is_best,
filename=fname)
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
res = model(inputs)
print('test one example:', res.item())
trainer.run(trn_dataset, max_epochs=2000)
覆盖distributed_training.py
与前面的代码相比,它有点复杂,因为:
它处理验证数据集的评估
它将模型序列化到一个文件中,并根据MAE跟踪执行得最好的模型
它从文件中恢复训练
我们可以通过以下命令来启动分布式训练:
ngpus=!echo $(nvidia-smi -L | wc -l)
!python -m torch.distributed.launch --nproc_per_node={ngpus[0]} distributed_training.py
我们需要一些耐心来训练定价模型,直到它收敛。
6
推断和Greeks
一旦训练被聚合,执行得最好的模型就被保存到check_points/目录中。
为了得到一个好的模型,我们需要数百万个数据点来训练模型,直到它收敛。通常在一台8个GPU的DGX-1机器上需要10-20个小时。我们使用1000万个训练数据点和500万个验证数据点对模型进行训练。我们没有研究训练样本的最小数量是多少,只是简单地使用了大量的数据样本。大家可以通过使用更少的数据点来进行训练。
为了节省时间,可以运行以下命令下载权重并使用它们进行推断:
! ((test ! -f './check_points/model_best.pth.tar' || test ! -f './check_points/512/model_best.pth.tar') && \
bash ./download_data.sh) || echo "Dataset is already present. No need to re-download it."
数据集已经存在。不需要重新下载。
我们可以加载模型参数并使用它进行推断。
from model import Net
import torch
checkpoint = torch.load('check_points/model_best.pth.tar')
model = Net().cuda()
model.load_state_dict(checkpoint['state_dict'])
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
model(inputs)
tensor([[18.7140]], device='cuda:0', grad_fn=)
建立深度学习模型的好处之一是可以很容易地计算出Greeks。我们只需要利用Pytorch中的auto-grad特征。下面是一个计算多元多项式函数一阶微分的例子。
import torch
from torch.autograd import grad
'''
z = (xy)^2
x = 3, y =2
first order deriv [24 36]
'''
inputs = torch.tensor([3.0,2.0], requires_grad=True)
z = (inputs[0]*inputs[1])**2
first_order_grad = grad(z, inputs, create_graph=True)
print(first_order_grad)
(tensor([24., 36.], grad_fn=),)
我们可以使用grad函数来计算参数:K, B, S0, sigma, mu, r的一阶差分:
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
x.backward()
first_order_gradient = inputs.grad
first_order_gradient
tensor([[-6.7092e-01, -2.1257e-02, 7.8896e-01, 1.9219e+01, 4.8331e+01,
-1.8419e+01]], device='cuda:0')
画出函数曲线:
%matplotlib inline
import pylab
import numpy as np
def compute_delta(S):
inputs = torch.tensor([[110.0, 100.0, S, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
x.backward()
first_order_gradient = inputs.grad
return first_order_gradient[0][2]
prices = np.arange(10, 200, 0.1)
deltas = []
for p in prices:
deltas.append(compute_delta(p).item())
fig = pylab.plot(prices, deltas)
pylab.xlabel('prices')
pylab.ylabel('Delta')
fig
在PyTorch中计算二阶导数也很简单。我们只需要应用两次grad函数。下面是计算同一多项式函数的二阶导数的例子:
import torch
from torch.autograd import grad
'''
z = (xy)^2
x = 3, y =2
first order deriv [24 36]
d2z/dx2 = 8
d2z/dxdy = 24
d2z/dy2 = 18
'''
inputs = torch.tensor([3.0,2.0], requires_grad=True)
z = (inputs[0]*inputs[1])**2
first_order_grad = grad(z, inputs, create_graph=True)
second_order_grad_x, = grad(first_order_grad[0][0], inputs, retain_graph=True) #
second_order_grad_y, = grad(first_order_grad[0][1], inputs)
print(second_order_grad_x)
print(second_order_grad_y)
tensor([ 8., 24.])
tensor([24., 18.])
利用这个机制,我们可以在下面的例子中计算二阶导数:
,,,,,
import torch
from torch import Tensor
from torch.autograd import Variable
from torch.autograd import grad
from torch import nn
inputs = torch.tensor([[110.0, 100.0, 120.0, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
loss_grads = grad(x, inputs, create_graph=True)
drv = grad(loss_grads[0][0][2], inputs)
drv
Out[9]:
(tensor([[-0.0143, 0.0039, 0.0098, -0.3183, 1.1455, -0.7876]],
device='cuda:0'),)
Gamma是S的二阶差分。 我们可以把Gamma曲线画成股票价格的函数:
import pylab
import numpy as np
def compute_gamma(S):
inputs = torch.tensor([[110.0, 100.0, S, 0.35, 0.1, 0.05]]).cuda()
inputs.requires_grad = True
x = model(inputs)
loss_grads = grad(x, inputs, create_graph=True)
drv = grad(loss_grads[0][0][2], inputs)
return drv[0][0][2]
prices = np.arange(10, 200, 0.1)
deltas = []
for p in prices:
deltas.append(compute_gamma(p).item())
fig2 = pylab.plot(prices, deltas)
pylab.xlabel('prices')
pylab.ylabel('Gamma')
fig2
隐含波动率是基于期权报价对标的资产的预测波动率。给出的模型是价格与期权参数的反向映射,用蒙特卡罗模拟法很难做到这一点。但如果我们有深度学习定价模型,这是一个简单的任务。我们可以先画出波动率和期权价格之间的关系:
import pylab
import numpy as np
def compute_price(sigma):
inputs = torch.tensor([[110.0, 100.0, 120.0, sigma, 0.1, 0.05]]).cuda()
x = model(inputs)
return x.item()
sigmas = np.arange(0, 0.5, 0.1)
prices = []
for s in sigmas:
prices.append(compute_price(s))
fig3 = pylab.plot(sigmas, prices)
pylab.xlabel('Sigma')
pylab.ylabel('Price')
fig3
给定价格P,隐含波动率是compute_price函数的根。我们可以用二分法求根。
def bisection_root(small, large, fun, target, EPS=1e-6):
if fun(large) - target < 0:
print('upper bound is too small')
return None
if fun(small) - target > 0:
print('lower bound is too large')
return None
while large - small > EPS:
mid = (large + small) / 2.0
if fun(mid) - target >= 0:
large = mid
else:
small = mid
mid = (large + small) / 2.0
return mid, abs(fun(mid) - target)
quoted_price = 16.0
sigma, err = bisection_root(0, 0.5, compute_price, quoted_price)
print('implied volativity', sigma, 'error', err)
implied volativity 0.18517351150512695 error 4.76837158203125e-06
更多案例视频完整源代码加群:850591259
Python王牌加速库深度学习下的障碍期权定价相关教程
python barrier option pricing_Python王牌加速库深度学习下的障碍期权定价相关推荐
- python barrier option pricing_Python王牌加速库:深度学习下的障碍期权定价!
蒙特卡罗模拟需要数以百万计的路径来得到精确的答案,这需要大量的计算.Ryan等人得研究表明,可以训练深度学习模型对衍生品进行估值.深度学习模型是准确和快速的,能够产生比传统模型快一百万倍的估值.在今天 ...
- 障碍期权定价 python_Python王牌加速库2:深度学习下的障碍期权定价
作者:Yi Dong 编译:1+1=6 1.前言 上一期推文中,我们使用了Numba和CuPy来运行蒙特卡罗模拟来确定亚式障碍期权的价格.Python王牌加速库:奇异期权定价的利器mp.weixin ...
- python barrier option pricing_《Python金融数据分析》书内代码实战与讲解(二)金融衍生物定价...
4.2.1 欧式期权定价 先编写StockOption类,存放需要的参数. """ Store common attributes of a stock option & ...
- 模型加速:深度学习模型的硬件加速:NVIDIAT240
作者:禅与计算机程序设计艺术 模型加速:深度学习模型的硬件加速:NVIDIA T240 在当前深度学习模型的规模和复杂度不断增加的情况下,硬件加速已经成为一个重要的技术手段.本文将介绍NVIDIA T ...
- Alluxio 助力 Kubernetes,加速云端深度学习
作者 | 车漾 阿里云高级技术专家 范斌 Alluxio 创始成员,开源社区副总裁 来源 | 阿里巴巴云原生公众号 为什么要加速云端深度学习 人工智能是近几年非常火热的技术领域,而推动这个领域快 ...
- 北京 | 免费高效训练及OpenVINO™加速推理深度学习实战,送Intel神经计算棒二代...
当今人工智能时代,深度学习极大得促进了计算机视觉技术的快速应用和成熟,也是算法工程师们必须掌握的一项技能,然而,不同环境的依赖部署,高算力的需求,海量数据量需求及算法应用高硬件成本也让深度学习陷入了规 ...
- 回归素材(part9)--PYTHON机器学习手册-从数据预处理到深度学习
学习笔记,仅供参考,有错必纠 PYTHON机器学习手册-从数据预处理到深度学习 通过正则化减少方差 我们可以使用岭回归或者Lasso回归,介绍回归模型的方差.
- python人工智能课程实例_python人工智能AI深度学习/机器学习全套课程 视频教程+ppt+代码...
这是一套Python/人工智能/AI/机器学习/深度学习 全套实战课程,包含视频教程以及文档.源码等,欢迎下载 01. python数据分析与机器学习实战 02.深度学习入门视频课程(上篇) 03.深 ...
- 阿里云异构计算平台——加速AI深度学习创新
云栖TechDay第36期,阿里云高级产品专家霁荣带来"阿里云异构计算平台--加速AI深度学习创新"的演讲.本文主要从深度学习催生强大计算力需求开始谈起,包括GPU的适用场景,进而 ...
最新文章
- The Swift Code之UITextField的使用,及事件委托
- Linux 入门记录:六、Linux 硬件相关概念(硬盘、磁盘、磁道、柱面、磁头、扇区、分区、MBR、GPT)...
- 美国波特兰市禁用人脸识别,被违规监控可获赔1000美元
- express细节点注意
- WebLogic Classloader分析工具
- Python Web框架Tornado的异步处理代码演示样例
- OpenCV中绘制外围矩形框和圆框
- SYNCHRONIZE_DRAIN的用处
- java 进阶 知乎_(二)零基础写Java知乎爬虫之进阶篇
- More Effective C++ 笔记
- 微信的 Bug 差点让我被老板炒鱿鱼!
- (转载)python日期函数
- ktv服务器管理系统,KTV收银管理系统.doc
- Java去除富文本编辑器中的格式标签
- OSI网络七层协议与TCPIP协议
- 2018年大数据的发展趋势,小白学前必备
- java面试常见知识点整理
- 52单片机定时器0-2实现1ms定时
- 二叉平衡树的基本操作(完整代码)
- 抖音的服务器到底啥配置?
热门文章
- html自动生成盒子,新建一个宽度450px 高度为100px的DIV盒子
- python中执行shell脚本之subprocess模块
- 怎样创建一个简单的mysql数据库
- movie计算机英语作文,求一篇以“Talkaboutyourfavoritemovie”为题的英语作文
- centos 卸载ffmpeg_CentOS7安装ffmpeg
- OpenCVSharpSample04WinForms
- 操作符基础----加减乘除赋值
- java雪花数据库长度_雪花算法(SnowFlake)Java实现
- java utf8乱码_java utf-8 乱码怎么办
- iPhone4 iOS 4.3.3 更换“新增”短信铃声