#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 8 17:50:30 2021
Originally developed by Fabrice Collard.
Please see Dynamic Programming Notes #7 at:
http://fabcol.free.fr/pdf/lectnotes7.pdf
Translated from Matlab code to Python by A.Goumilevski
"""
import os, math
from time import time
import numpy as np
from numpy.matlib import repmat
import scipy.linalg as la
from scipy.optimize import fmin
from scipy.interpolate import CubicSpline
import scipy.sparse as sparse
from scipy.linalg import pinv
from scipy.sparse.linalg import spsolve
from snowdrop.src.numeric.dp.util import Chebychev_Polinomial, markov, tv, scalar_tv
sigma = 1.50 # utility parameter
delta = 0.10 # depreciation rate
beta = 0.95 # discount factor
alpha = 0.30 # capital elasticity of output
nbk = 1000 # number of data points in the grid
nbc = 100
nba = 2
crit = 1 # convergence criterion
itr = 0 # iteration
epsi = 1e-6 # convergence tolerance
ks = ((1-beta*(1-delta))/(alpha*beta))**(1/(alpha-1))
dev = 0.9 # maximal deviation from steady state
kmin = (1-dev)*ks # lower bound on the grid
kmax = (1+dev)*ks # upper bound on the grid
devk = (kmax-kmin)/(nbk-1) # implied increment
kgrid = np.linspace(kmin,kmax,nbk) # build the grid
v = np.zeros(nbk) # value function
kp0 = 0 # initial guess on k(t+1)
dr = np.zeros(nbk,dtype=int) # decision rule (will contain indices)
cmin = 0.01 # lower bound on the grid
cmax = kmax**alpha # upper bound on the grid
c = np.linspace(cmin,cmax,nbc) # build the grid
Tv = np.zeros(nbk) # value function
u = (c**(1-sigma)-1)/(1-sigma)
k = np.linspace(kmin,kmax,nbk)
[docs]
def value_iteration(interpolate=False):
"""
Solve the OGM by value function iteration.
Parameters:
interpolate : bool, optional
If True performs interpolation of value function to grid nodes.
The default is False.
"""
global itr, crit, nbk, kp0, kgrid, v, dr
t0 = time()
if interpolate:
nbc = 1000; nbk = 100
c = np.linspace(cmin,cmax,nbc) # builds the grid
kgrid = np.linspace(kmin,kmax,nbk) # builds the grid
csy = 1-alpha*beta*delta/(1-beta*(1-delta))
Tv = np.zeros(nbk) # value function
v = ((csy*kgrid**alpha)**(1-sigma)-1)/(1-sigma) # value function
u = (c**(1-sigma)-1)/(1-sigma)
dr = np.zeros(nbk,dtype=int)
### Main loop
while crit>epsi:
for i in range(nbk):
kp = kgrid[i]**alpha+(1-delta)*kgrid[i]-c
cs = CubicSpline(kgrid,v)
vi = cs(kp)
x = u + beta*vi
Tv[i] = np.max(x)
dr[i] = np.argmax(x)
crit = np.max(abs(Tv-v))/la.norm(Tv) # Compute convergence criterion
v = Tv.copy() # Update the value function
itr += 1
print('Iteration # {} \tCriterion: {:.2e}'.format(itr,crit))
c = c[dr]
kp = kgrid**alpha+(1-delta)*kgrid-c
u = (c**(1-sigma)-1)/(1-sigma)
v = u/(1-beta)
else:
Tv = np.zeros(nbk) # value function
c = np.linspace(cmax,cmax,nbk) # ag
### Main loop
while crit>epsi:
for i in range(nbk):
# compute indexes for which consumption is positive
#imax = min(math.floor((kgrid[i]**alpha+(1-delta)*kgrid[i]-kmin)/devk)+1,nbk)
# consumption and utility
c = kgrid[i]**alpha+(1-delta)*kgrid[i] - kgrid #[:imax]
c = np.maximum(0,c)
u = (c**(1-sigma)-1)/(1-sigma)
# find new policy rule
x = u + beta*v #[:imax]
Tv[i] = np.max(x)
dr[i] = np.argmax(x)
# decision rules
kp = kgrid[dr]
c = kgrid**alpha+(1-delta)*kgrid-kp
# update the value
#u = (c**(1-sigma)-1)/(1-sigma)
crit= np.max(abs(Tv-v))/la.norm(Tv)
v = Tv.copy()
itr += 1
print('Iteration # {} \tCriterion: {:.2e}'.format(itr,crit))
elapsed = time() - t0
print("\nElapsed time: %.2f (seconds)" % elapsed)
return (c,kp,u,v),['Consumption','Capital','Utility','Value Function']
[docs]
def policy_iteration():
"""
Solve the OGM by policy function iteration (Howard method).
Returns:
c : numpy array
Consumption.
kp : numpy array
Capital.
u : numpy array
Utility.
v : numpy array
Value function.
"""
global itr, crit, c, u, v, Tv, kp0
t0 = time()
### Main loop
while crit>epsi:
for i in range(nbk):
# compute indexes for which consumption is positive
imin = max(math.ceil(((1-delta)*kgrid[i]-kmin)/devk)+1,0)
imax = min(math.floor((kgrid[i]**alpha+(1-delta)*kgrid[i]-kmin)/devk)+1,nbk)
# consumption and utility
c = kgrid[i]**alpha+(1-delta)*kgrid[i]-kgrid[imin:imax]
u = (c**(1-sigma)-1)/(1-sigma)
# find new policy rule
x = u+beta*v[imin:imax]
#vi = np.max(x)
idr = np.argmax(x)
dr[i] = imin+idr
### Decision rules
kp = kgrid[dr]
c = kgrid**alpha+(1-delta)*kgrid-kp
#Update the value
u = (c**(1-sigma)-1)/(1-sigma)
Q = sparse.lil_matrix((nbk,nbk))
for i in range(nbk):
Q[i,dr[i]] = 1
q = Q.todense()
Tv = spsolve(sparse.eye(nbk)-beta*Q, u)
crit = max(abs(kp-kp0))/la.norm(kp)
crit1 = max(abs(Tv-v))/la.norm(Tv)
v = Tv.copy()
kp0 = kp
itr += 1
print('Iteration # {} \tPolicy Criterion: {:.2e} \tValue Criterion: {:.2e}'.format(itr,crit,crit1))
elapsed = time() - t0
print("\nElapsed time: %.2f (seconds)" % elapsed)
return (c,kp,u,v),['Consumption','Capital','Utility','Value Function']
[docs]
def parametric_value_iteration(n=10, nbk = 20):
"""
Solve the OGM by parametric value iteration.
Parameters:
n : int, optional
Order of polynomials. The default is 10.
nbk : int, optional
Number of data points in the grid. The default is 20.
Returns:
c : numpy array
Consumption.
v : numpy array
Value function.
"""
global itr, crit, c, u, v, Tv, kp0 # Number of data points in the grid
t0 = time()
rk = -np.cos((2*np.arange(1,nbk+1)-1)*np.pi/(2.*nbk)) # Interpolating nodes
kgrid = kmin+(rk+1)*(kmax-kmin)/2 # Mapping
v = (((kgrid**alpha)**(1-sigma)-1)/((1-sigma)*(1-beta))) # Initial guess for the approximation
X = Chebychev_Polinomial(rk,nbk) # Chebyshev polynomials
iX = pinv(X)
theta0 = pinv(X) @ v # Polynomial decomposition
Tv = np.zeros(nbk)
kp = np.zeros(nbk)
### Main loop
while crit>epsi:
k0 = kgrid.copy()
param = (alpha,beta,delta,sigma,kmin,kmax,nbk,kgrid,theta0)
k0 = fmin(scalar_tv,k0,args=param,xtol=1.e-3,ftol=1.e-3,maxiter=20,disp=False)
Tv = -tv(k0,*param)
theta0 = iX @ Tv
crit = np.max(abs(Tv-v))/la.norm(Tv)
print('Iteration # {} \tCriterion: {:.2e}'.format(itr,crit))
v = Tv.copy()
itr += 1
kp = k0
c = kgrid**alpha+(1-delta)*kgrid-kp
elapsed = time() - t0
print("\nElapsed time: %.2f (seconds)" % elapsed)
return (c,kp,v),['Consumption','Capital','Value Function']
[docs]
def stochastic_value_iteration(p=0.9,interpolate=False):
"""
Solve the stochastic OGM by value iteration.
Parameters:
p : float, optional
Probability value. The default is 0.9.
It is used for discrete approximation of VAR(1) stochastic process.
interpolate : bool, optional
If True interpolate grid. The default is False.
Returns:
c : numpy array
Consumption.
u : numpy array
Utility.
v : numpy array
Value function.
"""
global crit, itr
t0 = time()
PI = np.array([[p,1-p],[1-p,p]])
kmin = 0.2 #(1-dev)*ks;
kmax = 6
se = 0.2
ab = 0
A = [np.exp(ab-se), np.exp(ab+se)]
k = np.linspace(kmin,kmax,nbk)
u = np.zeros((nbk,nba))
v = repmat(k**(alpha*(1-sigma))/(1-sigma),nba,1).T
Tv = np.zeros((nbk,nba))
dr = np.zeros((nbk,nba),dtype=int)
if interpolate:
c = np.linspace(0.01,A[1]*kmax**alpha,nbk)
u = (c**(1-sigma)-1)/(1-sigma)
while crit>epsi:
for i in range(nbk):
for j in range(nba):
kp = A[j]*k[i]**alpha + (1-delta)*k[i] - c
cs = CubicSpline(k,v)
vi = cs(kp)
x = u + beta*(vi @ PI[j])
Tv[i] = np.max(x)
dr[i] = np.argmax(x)
crit = np.max(np.max(abs(Tv-v)))/la.norm(Tv)
print('Iteration # {} \tCriterion: {:.2e}'.format(itr,crit))
v = Tv.copy()
itr += 1
kp = np.zeros((nbk,nba))
ct = np.zeros((nbk,nba))
for j in range(nba):
ct[:,j] = c[dr[:,j]]
kp[:,j] = A[j]*k**alpha + (1-delta)*k - ct[:,j]
c = ct
else:
c = np.zeros((nbk,nba))
while crit>epsi:
for i in range(nbk):
for j in range(nba):
x = A[j]*k[i]**alpha + (1-delta)*k[i] - k
neg = (x<=0)
x[neg] = np.nan
u[:,j] = (x**(1-sigma)-1)/(1-sigma)
u[neg,j] = -1.e12
x = u + beta*(v @ PI)
Tv[i] = np.max(x,axis=0)
dr[i] = np.argmax(x,axis=0)
crit = np.max(np.max(abs(Tv-v)))/la.norm(Tv)
print('Iteration # {} \tCriterion: {:.2e}'.format(itr,crit))
v = Tv.copy()
itr += 1
kp = k[dr]
for j in range(nba):
c[:,j] = A[j]*k**alpha + (1-delta)*k - kp[:,j]
u = (c**(1-sigma)-1)/(1-sigma)
v = u/(1-beta)
elapsed = time() - t0
print("\nElapsed time: %.2f (seconds)" % elapsed)
return (c,kp,v),['Consumption','Capital','Value Function']
[docs]
def stochastic_policy_iteration(p=0.9,kmin=0.2,kmax=6):
"""
Solve the stochastic OGM by policy iteration.
Parameters:
p : float, optional
Probability value. The default is 0.9.
It is used for discrete approximation of VAR(1) stochastic process.
kmin : float, optional
Lower bound on the grid. The default is 0.2
kmax : float, optional
Upper bound on the grid. The default is 6.
Returns:
c : numpy array
Consumption.
kp : numpy array
Capital.
v : numpy array
Value function.
"""
global crit, itr, kgrid
t0 = time()
PI = np.array([[p,1-p],[1-p,p]])
se = 0.2
ab = 0
A = np.array([np.exp(ab-se), np.exp(ab+se)])
v = np.zeros((nbk,nba)) # value function
u = np.zeros((nbk,nba)) # utility function
c = np.zeros((nbk,nba)) # consumption
kgrid= np.linspace(kmin,kmax,nbk) # builds the grid
dr = np.zeros((nbk,nba),dtype=int) # decision rule (will contain indices)
kp0 = 0
Tv0 = 0
# Main loop
while crit > epsi:
for i in range(nbk):
for j in range(nba):
x = A[j]*kgrid[i]**alpha + (1-delta)*kgrid[i] - kgrid
neg = (x<=0 )
x[neg] = np.nan
u[:,j] = (x**(1-sigma)-1)/(1-sigma)
u[neg,j] = -np.inf
x = u + beta * v @ PI
# vi = np.max(x,axis=0)
dr[i] = np.argmax(x,axis=0)
# decision rules
kp = kgrid[dr]
# update the value
Q = sparse.lil_matrix((nbk*nba,nbk*nba))
for j in range(nba):
z = A[j]*kgrid**alpha + (1-delta)*kgrid - kp[:,j]
# Update the value
u[:,j] = (z**(1-sigma)-1)/(1-sigma)
Q0 = sparse.lil_matrix((nbk,nbk))
for i in range(nbk):
Q0[i,dr[i,j]] = 1
x = sparse.kron(PI[j],Q0)
Q[j*nbk:(j+1)*nbk,:] = x
q = Q.todense()
M = sparse.eye(nbk*nba) - beta*Q
Tv = spsolve(M,np.ravel(u.T))
v = np.reshape(Tv,(nba,nbk)).T
crit1= np.max(abs(Tv-Tv0))/la.norm(Tv)
crit = np.max(np.max(abs(kp-kp0)))/la.norm(kp)
print('Iteration # {} \tCriterion1: {:.2e} \tCriterion1: {:.2e}'.format(itr,crit,crit1))
Tv0 = Tv.copy()
kp0 = kp.copy()
itr += 1
for j in range(nba):
c[:,j] = A[j]*kgrid**alpha + (1-delta)*kgrid - kp[:,j]
elapsed = time() - t0
print("\nElapsed time: %.2f (seconds)" % elapsed)
return (c,kp,u,v),['Consumption','Capital','Utility','Value Function']
[docs]
def stochastic_constarined_policy_iteration(p=0.8,r=0.02,beta=0.95,gam=0.5,amin=0,amax=10):
"""
Solve and simulate the borrowing constraint problem (policy iteration).
Parameters:
p : float, optional
Probability value. The default is 0.8.
It is used for discrete approximation of VAR(1) stochastic process.
r : float, optional
Interest rate. The default is 0.02.
beta : float, optional
Discount factor. The default is 0.95.
gam : float, optional
Replacement ratio. The default is 0.5.
amin : float, optional
Lower bound on the grid. The default is 0.
amax : float, optional
Upper bound on the grid. The default is 10.
Returns:
c : numpy array
Consumption.
k : numpy array
Capital.
ast : numpy array
Asset values.
v : numpy array
Value function.
"""
global crit, itr
t0 = time()
w = 1 # employment benefits
PI = np.array([[p,1-p],[1-p,p]]) # matrix of probablities
Om = np.array([gam*w, w])
agrid = np.linspace(amin,amax,nbk) # builds the grid
v = np.zeros((nbk,nba)) # value function
u = np.zeros((nbk,nba)) # value function
ap0 = 1 # initial guess on k(t+1)
c = np.zeros((nbk,nba)) # consumption
util = np.zeros((nbk,nba)) # value function
u = []
dr = np.zeros((nbk,nba),dtype=int) # decision rule (will contain indices)
for j in range(nba):
x1 = repmat((1+r)*agrid + Om[j], nbk,1)
x2 = repmat(agrid, nbk,1)
x = x1 - x2.T
neg = (x<=0)
x[neg] = np.nan
uj = (x**(1-sigma)-1)/(1-sigma)
uj[neg] = -np.inf
u.append(uj.copy())
# Main loop
while crit > epsi:
for j in range(nba):
x = u[j] + beta*repmat(v@PI[j],nbk,1).T
dr[:,j] = np.argmax(x,axis=0)
# Decision rules
ap = agrid[dr]
# Update the value
Q = sparse.lil_matrix((nbk*nba,nbk*nba))
for j in range(nba):
for i in range(nbk):
util[i,j] = u[j][dr[i,j],i]
Q0 = sparse.lil_matrix((nbk,nbk))
for i in range(nbk):
Q0[i,dr[i,j]] = 1
Q[j*nbk:(j+1)*nbk] = sparse.kron(PI[j],Q0)
Tv = spsolve(sparse.eye(nbk*nba)-beta*Q, np.ravel(util.T))
crit= np.max(abs(ap-ap0))/la.norm(ap)
print('Iteration # {} \tCriterion: {:.2e}'.format(itr,crit))
v = np.reshape(Tv,(nba,nbk)).T
ap0 = ap.copy()
itr += 1
for j in range(nba):
c[:,j] = (1+r)*agrid + Om[j] - ap[:,j]
n = 7
x = 2*(agrid-amin)/(amax-amin)-1
Ta = Chebychev_Polinomial(x,n)
ba = pinv(Ta) @ ap
ws,ids = markov(PI,Om,nbk,1)
ast = np.ones(nbk)
cs = np.zeros(nbk)
x = 2*(ast-amin)/(amax-amin)-1
Ta = Chebychev_Polinomial(x,n)
for t in range(nbk-1):
ast[t+1] = Ta[t] @ ba[:,ids[t]]
cs[t] = (1+r)*ast[t] + ws[t] - ast[t+1]
elapsed = time() - t0
print("\nElapsed time: %.2f (seconds)" % elapsed)
return [[cs],[k],[ast],v.T.tolist()],['Consumption','Capital','Assets','Value Function']
[docs]
def Plot(path_to_dir,y,variable_names,var_labels={}):
from snowdrop.src.graphs.util import plotTimeSeries
import pandas as pd
header = 'Solution of Bellman Equation'
series = []; labels = []; titles = []
data=np.array(y,dtype=object)
if np.ndim(data) == 3:
nvar,n,ns = data.shape
for i in range(nvar):
ser = []; lbls = []
for j in range(ns):
ser.append(pd.Series(data[i,:,j]))
lbls.append("Markov State " + str(1+j))
series.append(ser)
labels.append(lbls)
var = variable_names[i]
name = var_labels[var] if var in var_labels else var
titles.append(name)
elif np.ndim(data) == 2:
nvar,n = data.shape
for i in range(nvar):
ser = pd.Series(data[i])
series.append([ser])
labels.append([])
var = variable_names[i]
name = var_labels[var] if var in var_labels else var
titles.append(name)
else:
nvar = len(y)
for i in range(nvar):
ser = []; lbls = []
data = y[i]
ns = len(data)
for j in range(ns):
ser.append(pd.Series(data[j]))
if ns>1:
lbls.append("Markov State " + str(1+j))
else:
lbls.append(" ")
series.append(ser)
labels.append(lbls)
var = variable_names[i]
name = var_labels[var] if var in var_labels else var
titles.append(name)
if nvar <= 4:
sizes = [2,2]
else:
sizes = [np.ceil(nvar/3),3]
plotTimeSeries(path_to_dir=path_to_dir,header=header,titles=titles,labels=labels,series=series,sizes=sizes)
if __name__ == '__main__':
"""The main entry point."""
#data,var_names = value_iteration(interpolate=True)
data,var_names = policy_iteration()
#data,var_names = parametric_value_iteration()
#data,var_names = stochastic_value_iteration(interpolate=True)
# data,var_names = stochastic_policy_iteration()
#data,var_names = stochastic_constarined_policy_iteration()
# Plot graphs
path = os.path.dirname(os.path.abspath(__file__))
path_to_dir = os.path.abspath(os.path.join(path,"../../../graphs"))
Plot(path_to_dir=path_to_dir,y=data,variable_names=var_names)
print("Done!")