Source code for argamma.arg

#!/usr/bin/env python
# -*- coding: utf-8 -*-
r"""
ARG model
=========

The code is an implementation of ARG model given in [1]_.
Its major features include:

    * simulation of stochastic volatility and returns
    * estimation using both MLE and GMM
    * option pricing

References
----------

.. [1] Stanislav Khrapov and Eric Renault (2014)
    "Affine Option Pricing Model in Discrete Time",
    working paper, New Economic School.
    <http://goo.gl/yRVsZp>

.. [2] Christian Gourieroux and Joann Jasiak (2006)
    "Autoregressive Gamma Processes",
    2006, *Journal of Forecasting*, 25(2), 129–152. doi:10.1002/for.978

.. [3] Serge Darolles, Christian Gourieroux, and Joann Jasiak (2006)
    "Structural Laplace Transform and Compound Autoregressive Models"
    *Journal of Time Series Analysis*, 27(4), 477–503.
    doi:10.1111/j.1467-9892.2006.00479.x

"""
from __future__ import print_function, division

import warnings

import numpy as np
import numexpr as ne
import matplotlib.pylab as plt
import seaborn as sns
import scipy.stats as scs
from scipy.optimize import minimize
import numdifftools as nd

from statsmodels.tsa.tsatools import lagmat

from .argparams import ARGparams
from .helper_functions import days_from_maturity
from mygmm import GMM
from fangoosterlee import cosmethod

__all__ = ['ARG']


[docs]class ARG(object): r"""Class for ARG model. .. math:: E\left[\left.\exp\left\{ -uY_{t}\right\} \right|Y_{t-1}\right] =\exp\left\{ -a(u)Y_{t-1}-b(u)\right\} Attributes ---------- vol Volatility series ret Asset return series param Parameters of the model maturity Maturity of the option or simply time horizon. Fraction of a year, i.e. 30/365 riskfree Risk-free rate of return per day Methods ------- afun a(u) function bfun b(u) function cfun c(u) function plot_abc Vizualize functions a, b, and c vsim Simulate ARG(1) volatility process vsim2 Simulate ARG(1) volatility process rsim Simulate returns given volatility load_data Load data to the class estimate_mle Estimate model parameters via Maximum Likelihood estimate_gmm Estimate model parameters using GMM cos_restriction Restrictions used in COS method of option pricing charfun Risk-neutral conditional characteristic function (one argument only) option_premium Model implied option premium via COS method """ def __init__(self, param=None): """Initialize class instance. Parameters ---------- param : ARGparams instance, optional Parameters of the model """ self.vol = None self.ret = None self.param = param self.maturity = None self.riskfree = None def convert_to_q(self, param): """Convert physical (P) parameters to risk-neutral (Q) parameters. Parameters ---------- param : ARGparams instance Physical (P) parameters Returns ------- ARGparams instance Risk-neutral parameters """ paramq = ARGparams() paramq.update(theta=param.get_theta(), price_vol=param.price_vol) factor = 1/(1 + param.get_scale() \ * (param.price_vol + self.alpha(param.price_ret, param))) if factor <= 0 or factor > 1/param.rho**.5: print('Lower bound for theta1 is %.2f' % self.vol_price_lower_bound(param)) raise ValueError('Invalid parameters in Q conversion!') delta = param.delta scale = param.get_scale() * factor beta = param.get_beta() * factor rho = scale * beta mean = scale * delta / (1 - rho) paramq.update(theta_vol=[mean, rho, delta]) return paramq
[docs] def load_data(self, vol=None, ret=None): """Load data into the model object. Parameters ---------- vol : (nobs, ) array Volatility time series ret : (nobs, ) array Return time series """ if vol is not None: self.vol = vol if ret is not None: self.ret = ret
[docs] def afun(self, uarg, param): r"""Function a(). .. math:: a\left(u\right)=\frac{\rho u}{1+cu} Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ # return param.rho * uarg / (1 + param.scale * uarg) rho = param.rho scale = param.get_scale() return ne.evaluate("rho * uarg / (1 + scale * uarg)")
[docs] def bfun(self, uarg, param): r"""Function b(). .. math:: b\left(u\right)=\delta\log\left(1+cu\right) Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ # return param.delta * np.log(1 + param.scale * uarg) scale = param.get_scale() delta = param.delta return ne.evaluate("delta * log(1 + scale * uarg)")
def afun_q(self, uarg, param): r"""Risk-neutral function a(). Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ return self.afun(uarg, self.convert_to_q(param)) def bfun_q(self, uarg, param): r"""Risk-neutral function b(). Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ return self.bfun(uarg, self.convert_to_q(param)) def dafun(self, uarg, param): r"""Derivative of function a() with respect to scale, rho, and delta. .. math:: \frac{\partial a}{\partial c}\left(u\right) &=-\frac{\rho u^2}{\left(1+cu\right)^2} \\ \frac{\partial a}{\partial \rho}a\left(u\right) &=\frac{u}{1+cu} \\ \frac{\partial a}{\partial \delta}a\left(u\right) &=0 Parameters ---------- uarg : (nu, ) array Grid param : ARGparams instance Model parameters Returns ------- (3, nu) array """ da_scale = -param.rho * uarg**2 / (param.get_scale()*uarg + 1)**2 da_rho = uarg / (param.get_scale()*uarg + 1) da_delta = np.zeros_like(uarg) return np.vstack((da_scale, da_rho, da_delta)) def dbfun(self, uarg, param): r"""Derivative of function b() with respect to scale, rho, and delta. .. math:: \frac{\partial b}{\partial c}\left(u\right) &=\frac{\delta u}{1+cu} \\ \frac{\partial b}{\partial \rho}\left(u\right) &=0 \\ \frac{\partial b}{\partial \delta}\left(u\right) &=\log\left(1+cu\right) Parameters ---------- uarg : (nu, ) array Grid param : ARGparams instance Model parameters Returns ------- (3, nu) array """ db_scale = param.delta * uarg / (1 + param.get_scale() * uarg) db_rho = np.zeros_like(uarg) db_delta = np.log(1 + param.get_scale() * uarg) return np.vstack((db_scale, db_rho, db_delta))
[docs] def cfun(self, uarg, param): r"""Function c(). .. math:: c\left(u\right)=\delta\log\left\{1+\frac{cu}{1-\rho}\right\} Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ return param.delta * np.log(1 + param.get_scale()*uarg / (1-param.rho))
def center(self, param): """No-arb restriction parameter. Parameters ---------- param : ARGparams instance Model parameters Returns ------- float Same dimension as uarg """ return param.phi / (param.get_scale() * (1 + param.rho))**.5 def psi(self, param): """Function psi. Parameters ---------- param : ARGparams instance Model parameters Returns ------- float """ return (param.price_ret-.5) * (1-param.phi**2) + self.center(param) def alpha(self, uarg, param): """Function alpha(). Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ return self.psi(param) * uarg - .5 * uarg**2 * (1 - param.phi**2) def beta(self, uarg, param): """Function beta(). Same for risk-neutral. Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ return uarg * self.afun_q(- self.center(param), param) def gamma(self, uarg, param): """Function gamma(). Same for risk-neutral. Parameters ---------- uarg : array Grid param : ARGparams instance Model parameters Returns ------- array Same dimension as uarg """ return uarg * self.bfun_q(- self.center(param), param) def lgfun(self, uarg, varg, param): """Function l(u, v) and g(u, v) in joint characteristic function. Parameters ---------- uarg : array Grid for volatility varg : array Grid for returns param : ARGparams instance Model parameters Returns ------- lfun : array gfun : array """ alpha = self.alpha(varg, param) lfun = self.afun(uarg + alpha, param) + self.beta(varg, param) gfun = self.bfun(uarg + alpha, param) + self.gamma(varg, param) return lfun, gfun def lgfun_q(self, uarg, varg, param): """Function l(u, v) and g(u, v) in joint risk-neutral characteristic function. Parameters ---------- uarg : array Grid for volatility varg : array Grid for returns param : ARGparams instance Model parameters Returns ------- lfun : array gfun : array """ lfun1, gfun1 = self.lgfun(uarg + param.price_vol, varg + param.price_ret, param) lfun2, gfun2 = self.lgfun(param.price_vol, param.price_ret, param) lfun = lfun1 - lfun2 gfun = gfun1 - gfun2 return lfun, gfun def ch_fun_elements(self, varg, param): """Functions psi(v, n) and ups(v, n) in risk-neutral characteristic function of returns for n periods. Parameters ---------- varg : array Grid for returns param : ARGparams instance Model parameters Returns ------- psi : array ups : array """ periods = days_from_maturity(self.maturity) ones = np.ones_like(periods) lfun, gfun = self.lgfun_q(0., varg, param) varg, psi, ups = varg * ones, lfun * ones, gfun * ones while True: if np.array_equal(periods, ones): return psi, ups cond = periods > 1 periods[cond] -= 1 lfun, gfun = self.lgfun_q(psi[:, cond], varg[:, cond], param) ups[:, cond] += gfun psi[:, cond] = lfun def store_ch_fun_elements(self, varg, param): """Functions psi(v, n) and ups(v, n) in risk-neutral characteristic function of returns for n periods. Parameters ---------- varg : array Grid for returns param : ARGparams instance Model parameters Returns ------- psi : array ups : array """ periods = np.max(days_from_maturity(self.maturity)) psi, ups = self.lgfun_q(0., varg, param) psidict, upsdict = {1: psi}, {1: ups} while True: if periods == 1: return psidict, upsdict periods -= 1 lfun, gfun = self.lgfun_q(psi, varg, param) ups += gfun psi = lfun psidict[periods], upsdict[periods] = psi, ups def char_fun_ret_q(self, varg, param): r"""Conditional risk-neutral Characteristic function (return). Parameters ---------- varg : array_like Grid for returns. Real values only. param : ARGparams instance Model parameters Returns ------- array_like Characteristic function for each observation and each grid point Notes ----- Conditional on :math:`\sigma_t` only All market data (vol, maturity, riskfree) can be vectors of the same size, and varg can be a vector of another size, but of transposed shape, i.e. vol = np.ones(5), and varg = np.ones((10, 1)) """ if self.vol is None: raise ValueError('Volatility is not loaded!') if self.maturity is None: raise ValueError('Maturity is not loaded!') if self.riskfree is None: raise ValueError('Risk-free rate is not loaded!') if np.iscomplex(varg).any(): raise ValueError('Argument must be real!') psi, ups = self.ch_fun_elements(-1j * varg, param) # discount = np.exp(- 1j * varg * self.riskfree * self.maturity) return np.exp(- self.vol * psi - ups) def char_fun_vol(self, uarg, param): """Conditional Characteristic function (volatility). Parameters ---------- uarg : array Grid. If real, then returns Laplace transform. If complex, then returns characteristi function. param : ARGparams instance Model parameters Returns ------- (nobs-1, nu) array Characteristic function for each observation and each grid point """ return np.exp(- self.vol[:-1, np.newaxis] * self.afun(uarg, param) - np.ones((self.vol[1:].shape[0], 1)) * self.bfun(uarg, param)) def char_fun_ret(self, uarg, param): r"""Conditional Characteristic function (return). Parameters ---------- uarg : array Grid. If real, then returns Laplace transform. If complex, then returns characteristi function. param : ARGparams instance Model parameters Returns ------- (nobs-1, nu) array Characteristic function for each observation and each grid point Notes ----- Conditional on current :math:`\sigma_{t+1}` and past :math:`\sigma_t` """ return np.exp(-self.vol[1:, np.newaxis] * self.alpha(uarg, param) - self.vol[:-1, np.newaxis] * self.beta(uarg, param) - np.ones((self.vol[1:].shape[0], 1)) * self.gamma(uarg, param)) def umean(self, param): r"""Unconditional mean of the volatility process. .. math:: E\left[Y_{t}\right]=\frac{c\delta}{1-\rho} Parameters ---------- param : ARGparams instance Model parameters Returns ------- float """ return param.get_scale() * param.delta / (1 - param.rho) def uvar(self, param): r"""Unconditional variance of the volatility process. .. math:: V\left[Y_{t}\right]=\frac{c^{2}\delta}{\left(1-\rho\right)^{2}} Parameters ---------- param : ARGparams instance Model parameters Returns ------- float """ return self.umean(param) / param.delta def ustd(self, param): r"""Unconditional standard deviation of the volatility process. .. math:: \sqrt{V\left[Y_{t}\right]} Parameters ---------- param : ARGparams instance Model parameters Returns ------- float """ return self.uvar(param) ** .5 def plot_abc(self, uarg, param): """Plot a() and b() functions on the same plot. """ plt.figure(figsize=(8, 4)) plt.subplot(1, 3, 1) plt.plot(uarg, self.afun(uarg, param)) plt.axhline(0) plt.axvline(0) plt.ylabel('$a(u)$') plt.xlabel('$u$') plt.subplot(1, 3, 2) plt.plot(uarg, self.bfun(uarg, param)) plt.axhline(0) plt.axvline(0) plt.ylabel('$b(u)$') plt.xlabel('$u$') plt.subplot(1, 3, 3) plt.plot(uarg, self.cfun(uarg, param)) plt.axhline(0) plt.axvline(0) plt.ylabel('$c(u)$') plt.xlabel('$u$') plt.tight_layout() plt.show()
[docs] def vsim(self, nsim=1, nobs=int(1e2), param=None): r"""Simulate ARG(1) process for volatility. .. math:: Z_{t}|Y_{t-1}&\sim\mathcal{P}\left(\beta Y_{t-1}\right)\\ Y_{t}|Z_{t}&\sim\gamma\left(\delta+Z_{t},c\right) Parameters ---------- nsim : int Number of series to simulate nobs : int Number of observations to simulate param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Simulated data """ vol = np.empty((nobs, nsim)) vol[0] = param.get_mean() for i in range(1, nobs): temp = np.random.poisson(param.get_beta() * vol[i-1]) vol[i] = param.get_scale() * np.random.gamma(param.delta + temp) return vol
[docs] def vsim2(self, nsim=1, nobs=int(1e2), param=None): """Simulate ARG(1) process for volatility. Uses non-central Chi-square distribution to simulate in one step. Parameters ---------- nsim : int Number of series to simulate nobs : int Number of observations to simulate param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Simulated data """ vol = np.empty((nobs, nsim)) vol[0] = param.get_mean() for i in range(1, nobs): df = param.delta * 2 nc = param.rho * vol[i-1] vol[i] = scs.ncx2.rvs(df, nc, size=nsim) return vol * param.get_scale() / 2
def vol_cmean(self, param): """Conditional mean of volatility. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional mean """ return param.rho * self.vol[:-1] + param.delta * param.get_scale() def vol_cvar(self, param): """Conditional variance of volatility. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional variance """ return (2 * param.rho * self.vol[:-1] + param.delta * param.get_scale()) * param.get_scale() def vol_kfun(self, param): """Conditional variance of volatility. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional mean """ return self.umean(param) / ((2 * param.rho * self.umean(param) + param.delta * param.get_scale()) * param.get_scale()) def ret_cmean(self, param): """Conditional mean of return. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional mean """ A1 = self.psi(param) B1 = self.afun_q(- self.center(param), param) C1 = self.bfun_q(- self.center(param), param) return A1 * self.vol[1:] + B1 * self.vol[:-1] + C1 def ret_cvar(self, param): """Conditional variance of return. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional mean """ # u = sp.Symbol('u') # A2 = float(-self.alpha(u, param).diff(u, 2).subs(u, 0)) # B2 = float(-self.beta(u, param).diff(u, 2).subs(u, 0)) # C2 = float(-self.gamma(u, param).diff(u, 2).subs(u, 0)) # return A2 * self.vol[1:] + B2 * self.vol[:-1] + C2 return (1 - param.phi**2) * self.vol[1:] def erp(self, param): """Conditional equity risk premium. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array ERP """ vrp = self.vol_cmean(param) - self.vol_cmean(self.convert_to_q(param)) return (param.price_ret * (1-param.phi**2) * self.vol_cmean(param) + (self.center(param) - (1-param.phi**2)/2) * vrp) def vrp(self, param): """Conditional volatility risk premium. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array VRP """ volp = self.vol_cmean(param)**.5 volq = self.vol_cmean(self.convert_to_q(param))**.5 return volp - volq def overdispersion(self, param): """Conditional overdispersion. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional overdispersion """ return self.vol_cmean(param) / self.vol_cvar(param) def corr_series(self, param): """Conditional correlation time series. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Conditional correlation """ return self.psi(param) * (self.psi(param)**2 + (1-param.phi**2) * self.overdispersion(param)) ** (-.5) def approx_ratio(self, param): """Approximation ratio. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Approximation ratio """ return 1 + param.phi**2 * (self.vol_kfun(param) / self.overdispersion(param) - 1)
[docs] def rsim(self, param=None): """Simulate returns given ARG(1) process for volatility. Parameters ---------- param : ARGparams instance Model parameters Returns ------- (nobs, nsim) array Simulated data """ # simulate returns ret = np.zeros_like(self.vol) ret[1:] = self.ret_cmean(param) + self.ret_cvar(param)**.5 \ * np.random.normal(size=self.vol[1:].shape) return ret
def vsim_last(self, **args): """The last observation in the series of simulations. Parameters ---------- args : dict Same parameters as in vsim function Returns ------- (nsim, ) array Last observations TODO : This function could be less time consuming if intermediate values were not created. """ return self.vsim(**args)[-1] def plot_vsim(self, param=None): """Plot simulated ARG process.""" np.random.seed(seed=1) vol = self.vsim2(nsim=2, param=param).T plt.figure(figsize=(8, 4)) for voli in vol: plt.plot(voli) plt.show() def plot_vlast_density(self, nsim=100, nobs=100, param=None): """Plot the marginal density of ARG process.""" plt.figure(figsize=(8, 4)) vol = self.vsim_last(nsim=int(nsim), nobs=int(nobs), param=param) sns.distplot(vol, rug=True, hist=False) plt.show()
[docs] def estimate_mle(self, param_start=None, model=None, bounds=None): """Estimate model parameters via Maximum Likelihood. Parameters ---------- param_start : ARGparams instance, optional Starting value for optimization model : str Type of model to estimate. Must be in: - 'vol' - 'ret' - 'joint' bounds : list of tuples Bounds on parameters, i.e. [(min, max)] Returns ------- param_final : ARGparams instance Estimated parameters results : OptimizeResult instance Optimization output """ if param_start is None: param_start = ARGparams() # Optimization options options = {'disp': False, 'maxiter': int(1e6)} if model == 'vol': likelihood = self.likelihood_vol theta_start = param_start.get_theta_vol() elif model == 'ret': likelihood = lambda x: \ self.likelihood_ret(x, param_start.get_theta_vol(), param_start.price_vol) theta_start = param_start.get_theta_ret() elif model == 'joint': likelihood = lambda x: \ self.likelihood_joint(x, param_start.price_vol) theta_start = param_start.get_theta() else: raise ValueError('Model type not supported') results = minimize(likelihood, theta_start, method='L-BFGS-B', options=options, bounds=bounds) with warnings.catch_warnings(): warnings.simplefilter("ignore") try: hess_mat = nd.Hessian(likelihood)(results.x) except ValueError: hess_mat = nd.Jacobian(nd.Gradient(likelihood))(results.x) results.std_theta = np.diag(np.linalg.inv(hess_mat) \ / len(self.vol))**.5 results.tstat = results.x / results.std_theta param_final = param_start if model == 'vol': param_final.update(theta_vol=results.x) elif model == 'ret': param_final.update(theta_ret=results.x) elif model == 'joint': param_final.update(theta=results.x) else: raise ValueError('Model type not supported') return param_final, results
def likelihood_vol(self, theta_vol): """Log-likelihood for ARG(1) volatility model. Parameters ---------- theta : array_like Model parameters. [mean, rho, delta] Returns ------- float Value of the log-likelihood function """ param = ARGparams() try: param.update(theta_vol=theta_vol) except ValueError: return 1e10 degf = param.delta * 2 nonc = param.rho * self.vol[:-1] / param.get_scale() * 2 scale = param.get_scale() / 2 logf = scs.ncx2.logpdf(self.vol[1:], degf, nonc, scale=scale) return -logf[~np.isnan(logf)].mean() def likelihood_ret(self, theta_ret, theta_vol, price_vol): """Log-likelihood for return model. Parameters ---------- theta_ret : array_like Model parameters. [phi, price_ret] theta_vol : array_like Volatility model parameters. [phi, price_ret] price_vol : float Price of volatility risk Returns ------- float Value of the log-likelihood function """ param = ARGparams() try: param.update(theta_ret=theta_ret, theta_vol=theta_vol, price_vol=price_vol) if price_vol < self.vol_price_lower_bound(param): raise ValueError r_mean = self.ret_cmean(param) r_var = self.ret_cvar(param) except ValueError: return 1e10 return - scs.norm.logpdf(self.ret[1:], r_mean, np.sqrt(r_var)).mean() def likelihood_joint(self, theta, price_vol): """Log-likelihood for joint model. Parameters ---------- theta : array_like Model parameters price_vol : float Price of volatility risk Returns ------- float Value of the log-likelihood function """ theta_vol, theta_ret = theta[:3], theta[3:] return self.likelihood_vol(theta_vol) \ + self.likelihood_ret(theta_ret, theta_vol, price_vol) def vol_price_lower_bound(self, param): """Get lower bound for volatility risk price. Parameters ---------- param : ARGparams instance Returns ------- float """ return (param.rho**.5 - 1) / param.get_scale() \ -self.alpha(param.price_ret, param) def momcond_vol(self, theta_vol, uarg=None, zlag=1): """Moment conditions (volatility) for spectral GMM estimator. Parameters ---------- theta_vol : (3, ) array Vector of model parameters. [scale, rho, delta] uarg : (nu, ) array Grid to evaluate a and b functions zlag : int Number of lags to use for the instrument Returns ------- moment : (nobs, nmoms) array Matrix of momcond restrictions dmoment : (nmoms, nparams) array Gradient of momcond restrictions. Mean over observations Raises ------ ValueError """ if uarg is None: raise ValueError("uarg is missing!") vollag, vol = lagmat(self.vol, maxlag=zlag, original='sep', trim='both') prevvol = vollag[:, 0][:, np.newaxis] # Number of observations after truncation nobs = vol.shape[0] # Number of moments nmoms = 2 * uarg.shape[0] * (zlag+1) # Number of parameters nparams = theta_vol.shape[0] # Change class attribute with the current theta param = ARGparams() try: param.update(theta_vol=theta_vol) except ValueError: return np.ones((nobs, nmoms))*1e10, np.ones((nmoms, nparams))*1e10 # Must be (nobs, nu) array error = np.exp(-vol * uarg) - self.char_fun_vol(uarg, param)[zlag-1:] # Instruments, (nobs, ninstr) array instr = np.hstack([np.exp(-1j * vollag), np.ones((nobs, 1))]) # Must be (nobs, nmoms) array moment = error[:, np.newaxis, :] * instr[:, :, np.newaxis] moment = moment.reshape((nobs, nmoms//2)) # (nobs, 2 * ninstr) moment = np.hstack([np.real(moment), np.imag(moment)]) # Initialize derivative matrix dmoment = np.empty((nmoms, nparams)) for i in range(nparams): dexparg = - prevvol * self.dafun(uarg, param)[i] \ - np.ones((nobs, 1)) * self.dbfun(uarg, param)[i] derror = - self.char_fun_vol(uarg, param)[zlag-1:] * dexparg derrorinstr = derror[:, np.newaxis, :] * instr[:, :, np.newaxis] derrorinstr = derrorinstr.reshape((nobs, nmoms//2)) derrorinstr = np.hstack([np.real(derrorinstr), np.imag(derrorinstr)]) dmoment[:, i] = derrorinstr.mean(0) return moment, dmoment def moment_ret(self, theta_ret, theta_vol=None, uarg=None, zlag=1, **kwargs): """Moment conditions (returns) for spectral GMM estimator. Parameters ---------- theta_ret : (2, ) array Vector of model parameters. [phi, price_ret] theta_vol : (3, ) array Vector of model parameters. [mean, rho, delta] uarg : (nu, ) array Grid to evaluate a and b functions zlag : int Number of lags to use for the instrument Returns ------- moment : (nobs, nmoms) array Matrix of momcond restrictions Raises ------ ValueError """ if uarg is None: raise ValueError("uarg is missing!") vollag, vol = lagmat(self.vol, maxlag=zlag, original='sep', trim='both') # Number of observations after truncation nobs = vol.shape[0] # Number of moments nmoms = 2 * uarg.shape[0] * (zlag+1) # Change class attribute with the current theta param = ARGparams() try: param.update(theta_ret=theta_ret, theta_vol=theta_vol) except ValueError: return np.ones((nobs, nmoms))*1e10 # Must be (nobs, nu) array try: cfun = self.char_fun_ret(uarg, param)[zlag-1:] except ValueError: return np.ones((nobs, nmoms))*1e10 # Must be (nobs, nu) array error = np.exp(-self.ret[zlag:, np.newaxis] * uarg) - cfun # Instruments, (nobs, ninstr) array instr = np.hstack([np.exp(-1j * vollag), np.ones((nobs, 1))]) # Must be (nobs, nmoms) array moment = error[:, np.newaxis, :] * instr[:, :, np.newaxis] moment = moment.reshape((nobs, nmoms//2)) # (nobs, 2 * ninstr) moment = np.hstack([np.real(moment), np.imag(moment)]) return moment def momcond_ret(self, theta_ret, **kwargs): """Moment conditions (returns) for spectral GMM estimator. Parameters ---------- theta_ret : (2, ) array Vector of model parameters. [phi, price_ret] Returns ------- moment : (nobs, nmoms) array Matrix of momcond restrictions dmoment : (nmoms, nparams) array Gradient of momcond restrictions. Mean over observations """ mom = self.moment_ret(theta_ret, **kwargs) dmom = self.dmoment_ret(theta_ret, **kwargs) return mom, dmom def dmoment_ret(self, theta_ret, **kwargs): """Derivative of moments (returns) for spectral GMM estimator. Parameters ---------- theta_ret : (2, ) array Vector of model parameters. [phi, price_ret] Returns ------- (nmoms, nparams) array Gradient of moment restrictions. Mean over observations """ mom = lambda theta: self.moment_ret(theta, **kwargs).mean(0) with np.errstate(divide='ignore'): return nd.Jacobian(mom)(theta_ret) def moment_joint(self, theta, **kwargs): """Moment conditions (joint) for spectral GMM estimator. Parameters ---------- theta : (5, ) array Vector of model parameters. [phi, price_ret] Returns ------- moment : (nobs, nmoms) array Matrix of momcond restrictions """ theta_vol, theta_ret = theta[:3], theta[3:] mom_vol = self.momcond_vol(theta_vol, **kwargs)[0] mom_ret = self.moment_ret(theta_ret, **kwargs) return np.hstack([mom_vol, mom_ret]) def dmoment_joint(self, theta, **kwargs): """Derivative of moment conditions (joint) for spectral GMM estimator. Parameters ---------- theta : (5, ) array Vector of model parameters. [phi, price_ret] Returns ------- (nmoms, nparams) array Gradient of moment restrictions. Mean over observations """ mom = lambda theta: self.moment_joint(theta, **kwargs).mean(0) with np.errstate(divide='ignore'): return nd.Jacobian(mom)(theta) def momcond_joint(self, theta, **kwargs): """Moment conditions (joint) for spectral GMM estimator. Parameters ---------- theta : (5, ) array Vector of model parameters. [phi, price_ret] Returns ------- moment : (nobs, nmoms) array Matrix of momcond restrictions dmoment : (nmoms, nparams) array Gradient of momcond restrictions. Mean over observations """ mom = self.moment_joint(theta, **kwargs) dmom = self.dmoment_joint(theta, **kwargs) return mom, dmom
[docs] def estimate_gmm(self, param_start=None, model='vol', **kwargs): """Estimate model parameters using GMM. Parameters ---------- param_start : ARGparams instance Starting value for optimization model : str Type of the model to estimate. Must be in: - 'vol' - 'ret' - 'joint' uarg : array Grid to evaluate a and b functions zlag : int, optional Number of lags in the instrument. Default is 1 Returns ------- param_final : ARGparams instance Estimated model parameters mygmm.Results instance GMM estimation results """ if model == 'vol': estimator = GMM(self.momcond_vol) results = estimator.gmmest(param_start.get_theta_vol(), **kwargs) elif model == 'ret': estimator = GMM(self.momcond_ret) results = estimator.gmmest(param_start.get_theta_ret(), theta_vol=param_start.get_theta_vol(), **kwargs) elif model == 'joint': estimator = GMM(self.momcond_joint) results = estimator.gmmest(param_start.get_theta(), **kwargs) else: raise ValueError('Model type not supported') param_final = param_start if model == 'vol': param_final.update(theta_vol=results.theta) elif model == 'ret': param_final.update(theta_vol=param_start.get_theta_vol(), theta_ret=results.theta) elif model == 'joint': param_final.update(theta=results.theta) else: raise ValueError('Model type not supported') return param_final, results
def cos_restriction(self): """Restrictions used in COS method of option pricing. Parameters ---------- riskfree : array_like Risk-free rate of returns, per day maturity : array_like Maturity, fraction of the year, i.e. 30/365 Returns ------- alim : array_like blim : array_like Notes ----- This method is used by COS method of option pricing """ L = 5 periods = days_from_maturity(self.maturity) c1 = self.riskfree * periods c2 = self.vol * periods alim = c1 - L * c2**.5 blim = c1 + L * c2**.5 return alim, blim
[docs] def charfun(self, varg): """Risk-neutral conditional characteristic function. Parameters ---------- varg : array Grid for evaluation of CF. Real values only. Returns ------- array Same dimension as varg Notes ----- This method is used by COS method of option pricing """ if self.param is None: raise ValueError('Parameters are not set!') return self.char_fun_ret_q(varg, self.param)
[docs] def option_premium(self, vol=None, moneyness=None, maturity=None, riskfree=None, call=None, data=None, npoints=2**10): """Model implied option premium via COS method. Parameters ---------- vol : array_like Current variance per day moneyness : array_like Log-forward moneyness, np.log(strike/price) - riskfree * maturity maturity : float, optional Maturity of the option or simply time horizon. Fraction of a year, i.e. 30/365 riskfree : float, optional Risk-free rate of return per day call : bool array_like Call/Put flag data : pandas DataFrame, record array, or dictionary of arrays Structured data. Mandatory labels: vol, moneyness, maturity, riskfree, call npoints : int Number of points on the grid. The more the better, but slower. Returns ------- array_like Model implied option premium via COS method """ if data is not None: try: data = data.to_records() except: pass maturity = data['maturity'] riskfree = data['riskfree'] vol = data['vol'] moneyness = data['moneyness'] call = data['call'] self.maturity = maturity self.riskfree = riskfree self.vol = vol return cosmethod(self, moneyness=moneyness, call=call, npoints=npoints)
if __name__ == '__main__': pass