Source code for snowdrop.src.numeric.filters.filters

# -*- coding: utf-8 -*-
"""
Created on Mon Feb 26 18:49:21 2018

@author: agoumilevski
"""

import numpy as np
import scipy.linalg as la
import pandas as pd
import scipy as sp
from scipy.sparse import spdiags,lil_matrix,csr_matrix
from scipy.signal import fftconvolve
from scipy.sparse import linalg as sla
from scipy.signal import butter,lfilter
from snowdrop.src.misc.termcolor import cprint

oldK = 0  # Kalman gain from the previous iteration; used by Kalman_Filter to detect the steady state

def LRXfilter(y, lmbd=1600, w1=1, w2=0, w3=0, dprior=None, prior=None):
    """
    Apply the enhanced HP filter.  This is the LRX filter, named after Laxton-Rose-Xie.

    Syntax: y_eq = LRXfilter(y,lmbd[,w1[,w2[,w3[,DPRIOR[,PRIOR]]]]])

    Args:
        y: array, dtype=float
            series to be detrended,
        lmbd: float
            tightness constraint for the HP filter,
        w1: float
            weight on (y - y_eq)^2,
        w2: float
            weight on [(y_eq - y_eq(-1)) - dprior]^2,
        w3: float
            weight on (y_eq - prior)^2,
        dprior: array, dtype=float
            growth rate of y in steady state,
        prior: array, dtype=float
            previously computed y_eq.

    Returns:
        y_eq: the trend component,
        gap: the cycle component.

    .. note::
        The default value of lmbd is 1600.  The default value of w1 is a
        vector of ones; the defaults of w2, w3, dprior and prior are vectors
        of zeros.  If any of w1, w2, w3, dprior, prior has length 1, it is
        extended to a constant vector of the appropriate length.
    """
    b = isinstance(y, pd.Series)
    if b:
        index = y.index
        y = y.values
    if isinstance(dprior, pd.Series):
        dprior = dprior.values
    if isinstance(prior, pd.Series):
        prior = prior.values

    n = len(y)
    w1 = w1 * np.ones(n)
    if w2 == 0:
        w2 = np.zeros(n-1)
    else:
        w2 = w2 * np.ones(n-1)
    if w3 == 0:
        w3 = np.zeros(n)
    else:
        w3 = w3 * np.ones(n)
    if dprior is None:
        dprior = np.zeros(n-1)
    elif np.isscalar(dprior):
        dprior = dprior * np.ones(n-1)
    if prior is None:
        prior = np.zeros(n)
    elif np.isscalar(prior):
        prior = prior * np.ones(n)

    # This is the main part of the function
    I = np.eye(n)
    B = np.diff(I, n=1, axis=0)
    A = np.diff(I, n=2, axis=0)

    # Use sparse matrices to save memory and avoid unnecessary computation
    MW1 = spdiags(w1, 0, n, n)
    MW2 = spdiags(w2, 0, n-1, n-1)
    MW3 = spdiags(w3, 0, n, n)

    Y = y @ MW1 + prior @ MW3 + dprior @ MW2 @ B
    if np.ndim(Y) > 1:
        Y = np.squeeze(Y)
    XX = lmbd * A.T @ A + B.T @ MW2 @ B + MW1 + MW3
    y_eq = la.solve(XX, Y)
    gap = y - y_eq

    if b:
        y = pd.Series(y_eq.T, index)
        gap = pd.Series(gap.T, index)
    else:
        y = y_eq.T
        gap = gap.T

    return y, gap
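
With the default weights the LRX objective reduces to the plain HP filter,
so the function can be exercised on any one-dimensional series.  A minimal
usage sketch (illustrative, not part of the module; the series ``y`` is
made up)::

    import numpy as np
    from snowdrop.src.numeric.filters.filters import LRXfilter

    y = np.log(np.linspace(100.0, 150.0, 80))  # any quarterly series
    trend, gap = LRXfilter(y, lmbd=1600)       # HP limit of the LRX filter
    assert np.allclose(trend + gap, y)         # trend and cycle add up to the data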

def HPF(data, lmbd=1600):
    """Apply the Hodrick-Prescott filter to a time series."""
    data = data.dropna()
    vals, _ = LRXfilter(data.values, lmbd=lmbd)
    filtered = pd.Series(vals, data.index)
    gap = data - filtered
    return filtered, gap

def HPfilter(data, lmbd=1600):
    r"""
    Apply a Hodrick-Prescott filter to a dataset.

    The return value is the filtered data-set found according to:

    .. math::
        \min_{T_{t}}[ \sum_{t=0}((X_{t} - T_{t})^2 + \lambda*((T_{t+1} - T_{t}) - (T_{t} - T_{t-1}))^2) ]

    Args:
        data: array, dtype=float
            The data set to which the HP filter is applied.  This must be a
            numpy array.
        lmbd: number, float
            The value of lambda used in the equation above.

    Returns:
        T: array, dtype=float
            The solution to the minimization problem above (the trend).
        Cycle: array, dtype=float
            The 'stationary data' found by X - T.

    .. note::
        This function uses sparse methods so that it is efficient enough to
        handle very large data sets.
    """
    Y = np.asarray(data)
    if Y.ndim == 2:
        resp = [HPfilter(e) for e in data]
        T = np.vstack([e[0] for e in resp])
        Cycle = np.vstack([e[1] for e in resp])
        return [T, Cycle]
    elif Y.ndim > 2:
        raise Exception('HP filter is not defined for dimension >= 3.')

    lil_t = len(Y)
    big_Lambda = sp.sparse.eye(lil_t, lil_t)
    big_Lambda = lil_matrix(big_Lambda)

    # Use the first-order conditions to build the rows by group.  The first
    # and last rows are similar, as are the second and second-to-last; all
    # the rows in the middle share the same stencil.
    first_last = np.array([1 + lmbd, -2 * lmbd, lmbd])
    second = np.array([-2 * lmbd, (1 + 5 * lmbd), -4 * lmbd, lmbd])
    middle_stuff = np.array([lmbd, -4. * lmbd, 1 + 6 * lmbd, -4 * lmbd, lmbd])

    #--------------------------- Putting it together --------------------------#
    # First two rows
    big_Lambda[0, 0:3] = first_last
    big_Lambda[1, 0:4] = second

    # Last two rows; the stencils have to be reversed
    big_Lambda[lil_t - 2, -4:] = second[::-1]
    big_Lambda[lil_t - 1, -3:] = first_last[::-1]

    # Middle rows
    for i in range(2, lil_t - 2):
        big_Lambda[i, i - 2:i + 3] = middle_stuff

    # sla.spsolve requires a csr or csc matrix; csr is used here.
    big_Lambda = csr_matrix(big_Lambda)
    T = sla.spsolve(big_Lambda, Y)
    Cycle = Y - T

    return T, Cycle
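
A minimal usage sketch of the sparse solver (illustrative, not part of the
module; the random-walk series is made up)::

    import numpy as np
    from snowdrop.src.numeric.filters.filters import HPfilter

    np.random.seed(0)
    y = np.cumsum(np.random.normal(size=200))  # a random-walk series
    trend, cycle = HPfilter(y, lmbd=1600)
    assert np.allclose(trend + cycle, y)       # the decomposition is exact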

def bandpass_filter(data, w1, w2, k=None):
    """
    Apply a band-pass filter to data.

    This band-pass filter is of kth order.  It selects the band between the
    periods w1 and w2.

    Args:
        data: array, dtype=float
            The data to filter.
        w1: number, float
            The lower bound of the periods that pass through.
        w2: number, float
            The upper bound of the periods that pass through.
        k: number, int
            The order of approximation of the filter.  The maximum value is
            data.size/2.

    Returns:
        y: array, dtype=float
            The filtered data.
    """
    if k is None:
        k = max(1, int(len(data)/2))
    data = np.asarray(data)
    low_w = np.pi * 2 / w2
    high_w = np.pi * 2 / w1
    bweights = np.zeros(2*k+1)
    bweights[k] = (high_w - low_w) / np.pi
    j = np.arange(1, k+1, 1)
    weights = 1.0 / (np.pi * j) * (np.sin(high_w * j) - np.sin(low_w * j))
    bweights[k + j] = weights
    bweights[:k] = weights[::-1]
    # Normalize the weights so that they sum to zero
    bweights -= bweights.mean()

    filtered = fftconvolve(data, bweights, mode='same')
    return filtered[:-1]
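
For quarterly data the conventional business-cycle band keeps periods
between 6 and 32 quarters, with a truncation of k=12.  A minimal usage
sketch (illustrative; the series is made up, and note that the returned
series is one point shorter than the input)::

    import numpy as np
    from snowdrop.src.numeric.filters.filters import bandpass_filter

    t = np.arange(200)
    y = np.sin(2*np.pi*t/20) + 0.05*t              # a cycle of period 20 plus a trend
    cycle = bandpass_filter(y, w1=6, w2=32, k=12)  # keep periods of 6-32 quarters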

def BPASS(data, W1, W2, drift=True, bChristianoFitzgerald=True, cutoff=False):
    """Apply a band-pass filter to a time series."""
    from scipy.fftpack import rfft, irfft, fftfreq
    from statsmodels.tsa.filters.cf_filter import cffilter

    data = data.dropna()
    time = data.index
    signal = data.values

    if bChristianoFitzgerald:
        filtered_signal = cffilter(signal, low=W1, high=W2, drift=drift)[0]
    else:
        if cutoff:
            # Hard frequency cutoff in the Fourier domain
            W = fftfreq(signal.size, 1./(time[1]-time[0]).days)
            f_signal = rfft(signal)
            filtered_signal = np.copy(f_signal)
            filtered_signal[(np.abs(W) > 1./W1)] = 0
            filtered_signal[(np.abs(W) < 1./W2)] = 0
            filtered_signal = irfft(filtered_signal)
        else:
            # Butterworth band-pass filter; the band edges are normalized
            # by the Nyquist frequency, as scipy.signal.butter expects
            nyq = 365.0/2.0
            low = 365.0/W2/nyq
            high = 365.0/W1/nyq
            b, a = butter(N=6, Wn=[low, high], btype='bandpass')
            filtered_signal = lfilter(b, a, data)

    ts = pd.Series(filtered_signal, time)
    return ts
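
The input must be a pandas Series on a DatetimeIndex (the non-default
branches use the spacing of the index in days).  A minimal usage sketch of
the default Christiano-Fitzgerald branch, which requires statsmodels
(illustrative; the monthly index and series are made up)::

    import numpy as np
    import pandas as pd
    from snowdrop.src.numeric.filters.filters import BPASS

    idx = pd.date_range("2000-01-01", periods=240, freq="M")
    data = pd.Series(np.sin(np.arange(240)/6.0), index=idx)
    cycle = BPASS(data, W1=18, W2=96)  # keep periods of 18-96 months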

def Standard_Kalman_Filter(x, y, T, Z, P, Q, H, C=0, ind_non_missing=None, t=-1):
    r"""
    Implement the Kalman filter algorithm for a state-space model.

    This implementation assumes that the measurement equation is specified at
    time t.

    .. math::
        x_{t+1} = T_{t} * x_{t} + C_{t} + w_{t},  w_{t} = Normal(0, Q),

        y_{t+1} = Z_{t} * x_{t+1} + v_{t},  v_{t} = Normal(0, H)

    The Kalman filter uses a two-phase algorithm.

    Update (measurement) step:

    .. math::
        x_{t|t} &= x_{t|t-1} + P_{t|t-1} * L_{t} * (y_{t} - Z_{t} * x_{t|t-1})

        P_{t|t} &= P_{t|t-1} - P_{t|t-1} * L_{t} * Z_{t} * P_{t|t-1}

        L_{t} &= Z'_{t} * [Z_{t}*P_{t|t-1}*Z'_{t} + H]^{-1}

    Prediction step:

    .. math::
        v_{t} &= y_{t} - Z_{t} * x_{t|t-1}

        x_{t+1|t} &= T_{t} * x_{t|t} + C_{t}

        P_{t+1|t} &= T_{t} * P_{t|t} * T'_{t} + Q_{t}

    Args:
        x: array, dtype=float
            is the state variables,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        P: 2D array, dtype=float
            is the predicted error covariance matrix,
        Q: 2D array, dtype=float
            is the covariance matrix of state variables (endogenous variables),
        H: 2D array, dtype=float
            is the covariance matrix of space variables (measurement variables),
        C: array, dtype=float
            is the constant term matrix,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations.

    For details, please see https://stanford.edu/class/ee363/lectures/kf.pdf
    """
    n_meas = len(y)
    n_states = len(x)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)

    iF = np.zeros((n_meas, n_meas))
    K = np.zeros((n_states, n_meas))
    ZZ = Z[ind_non_missing]

    # Measurement pre-fit residual
    v = np.zeros(y.shape)
    v[ind_non_missing] = y[ind_non_missing] - ZZ @ (T @ x + C)

    if len(ind_non_missing) == 0:
        log_likelihood = 0
        xn = x
    else:
        F = H + Z @ P @ Z.T  # pre-fit residual covariance
        iF = la.pinv(F)      # matrix (pseudo-)inverse
        K[:,ind_non_missing] = P @ ZZ.T @ iF[np.ix_(ind_non_missing,ind_non_missing)]  # Kalman gain

        # Update (measurement) step
        x += K @ v  # updated state estimate, eq (4.26) DK(2012)
        P -= K[:,ind_non_missing] @ ZZ @ P  # updated error covariance

        # Prediction step
        xn = T @ x + C       # predicted state estimate
        P = T @ P @ T.T + Q  # predicted error covariance
        P = 0.5*(P + P.T)    # force the covariance matrix to be symmetric

        # Log-likelihood of the observed data
        log_likelihood = -0.5 * (n_meas*np.log(2*np.pi) + np.log(la.det(F)) + v.T @ iF @ v)

    return xn, v, P, K, iF, log_likelihood
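
A minimal one-step sketch on a scalar local-level model (illustrative, not
part of the module; all numbers are made up)::

    import numpy as np
    from snowdrop.src.numeric.filters.filters import Standard_Kalman_Filter

    x = np.array([0.0])    # state mean
    y = np.array([1.0])    # one observation
    T = np.array([[1.0]])  # random-walk state
    Z = np.array([[1.0]])
    P = np.array([[1.0]])  # state covariance
    Q = np.array([[0.1]])  # state noise covariance
    H = np.array([[0.5]])  # measurement noise covariance
    xn, v, P, K, iF, ll = Standard_Kalman_Filter(x, y, T, Z, P, Q, H,
                                                 ind_non_missing=np.array([0]))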

def Kalman_Filter(x, y, T, Z, P, Q, H, C=0, ind_non_missing=None, bUnivariate=False, t=-1, tol=1.e-6):
    r"""
    Implement the Kalman filter algorithm for a state-space model.

    Args:
        x: array, dtype=float
            is the state variables,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        P: 2D array, dtype=float
            is the predicted error covariance matrix,
        Q: 2D array, dtype=float
            is the covariance matrix of state variables (endogenous variables),
        H: 2D array, dtype=float
            is the covariance matrix of space variables (measurement variables),
        C: array, dtype=float
            is the constant term matrix,
        bUnivariate: bool
            if True the univariate Kalman filter is used, and if False - the multivariate,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations,
        tol: number, dtype=float
            is the tolerance parameter (iteration over the Riccati equation).

    For details, see https://en.wikipedia.org/wiki/Kalman_filter
    """
    global oldK

    n_meas = len(y)
    n_states = len(x)
    I = np.eye(n_states)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)

    log_likelihood = 0
    F = np.zeros(n_meas)
    iF = np.zeros((n_meas, n_meas))
    K = np.zeros((n_states, n_meas))
    ZZ = Z[ind_non_missing]

    # Prediction step
    P = T @ P @ T.T + Q  # predicted error covariance

    # Measurement pre-fit residual
    v = np.zeros(y.shape)
    v[ind_non_missing] = y[ind_non_missing] - ZZ @ x

    if len(ind_non_missing) > 0:
        # Update step
        F = H[np.ix_(ind_non_missing,ind_non_missing)] + ZZ @ P @ ZZ.T  # pre-fit residual covariance
        iF[np.ix_(ind_non_missing,ind_non_missing)] = la.pinv(F)  # matrix inverse
        K[:,ind_non_missing] = P @ ZZ.T @ iF[np.ix_(ind_non_missing,ind_non_missing)]  # optimal Kalman gain

    # Update (a posteriori) state estimate
    xn = T @ (x + K @ v)
    # Update (a posteriori) estimate covariance.  This is the general
    # (Joseph) form, valid for any gain.
    P = (I - K@Z) @ P @ (I - K@Z).T + K @ H @ K.T
    # Force the covariance matrix to be symmetric
    P = 0.5*(P + P.T)

    steady = t > 0 and np.max(np.abs(K - oldK)) < tol
    oldK = np.copy(K)

    if len(ind_non_missing) > 0:
        # Log-likelihood of the observed data
        log_likelihood = -0.5 * (n_meas*np.log(2*np.pi) + np.log(la.det(F)) + v.T @ iF @ v)

    return xn, v, P, K, F, iF, log_likelihood, steady

def Kalman_Filter_SS(x, y, T, Z, K, F, iF, C=0, ind_non_missing=None):
    r"""
    Implement the Kalman filter algorithm for a stationary state-space model.

    Args:
        x: array, dtype=float
            is the state variables,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        K: 2D array, dtype=float
            is the Kalman gain matrix,
        F: 2D array, dtype=float
            is the pre-fit residual covariance matrix,
        iF: 2D array, dtype=float
            is the inverse of the pre-fit residual covariance matrix,
        C: array, dtype=float
            is the constant term matrix,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations.

    For details, see https://en.wikipedia.org/wiki/Kalman_filter
    """
    n_meas = len(y)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)

    log_likelihood = 0

    # Measurement pre-fit residual
    v = np.zeros(y.shape)
    v[ind_non_missing] = y[ind_non_missing] - Z[ind_non_missing] @ x

    # Update (a posteriori) state estimate; v is zero for missing observations
    xn = T @ (x + K @ v)

    if len(ind_non_missing) > 0:
        # Log-likelihood of the observed data
        log_likelihood = -0.5 * (n_meas*np.log(2*np.pi) + np.log(la.det(F)) + v.T @ iF @ v)

    return xn, v, log_likelihood
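
``Kalman_Filter`` reports a ``steady`` flag once the gain stops changing
(it is compared against the module-level ``oldK`` between calls), after
which the cheaper ``Kalman_Filter_SS`` can take over.  A minimal sketch
(illustrative; ``y_obs`` and the system matrices are hypothetical)::

    steady = False
    for t in range(len(y_obs)):
        if not steady:
            x, v, P, K, F, iF, ll, steady = Kalman_Filter(x, y_obs[t], T, Z, P, Q, H, t=t)
        else:
            x, v, ll = Kalman_Filter_SS(x, y_obs[t], T, Z, K, F, iF)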

def fast_kalman_filter(Y, N, a, P, T, H, Z, kalman_tol=1.e-10):
    """
    Compute the likelihood of a stationary state-space model, given initial
    conditions for the states (mean and variance).

    References:
        Edward P. Herbst (2012), "Using the 'Chandrasekhar Recursions' for
        Likelihood Evaluation of DSGE Models"

    Args:
        Y: array, dtype=float
            Measurement data,
        N: scalar, int
            Last period,
        a: array, dtype=float
            Initial mean of the state vector,
        P: 2D array, dtype=float
            Initial covariance matrix of the state vector,
        T: 2D array, dtype=float
            Transition matrix of the state equation,
        H: 2D array, dtype=float
            Covariance matrix of the measurement errors (if there are no
            measurement errors, set H to a zero scalar),
        Z: 2D array, dtype=float
            Matrix relating the states to the observed variables, or a vector
            of indices,
        kalman_tol: scalar, float
            Tolerance parameter (rcond, invertibility of the covariance matrix
            of the prediction errors).
    """
    # Number of measurements
    n_meas = len(Y)

    # Initialize some variables.
    likk = np.zeros(N)  # vector gathering the observation densities
    residuals = np.zeros(N)
    F_singular = True
    K = T @ P @ Z.T
    F = Z @ P @ Z.T + H
    W = np.copy(K)
    iF = la.inv(F)
    Kg = K @ iF
    M = -iF

    for t in range(N):
        v = Y[t] - Z @ a
        residuals[t] = v
        dF = la.det(F)
        if np.linalg.cond(F, -2) < kalman_tol:
            if abs(dF) < kalman_tol:
                raise Exception("The univariate diffuse Kalman filter should be used.")
            else:
                raise Exception("Pathological case, discard draw.")
        else:
            F_singular = False
            likk[t] = np.log(dF) + v.T @ iF @ v
            a = T @ a + Kg @ v
            ZWM = Z @ W @ M
            ZWMWp = ZWM @ W.T
            M = M + ZWM.T @ iF @ ZWM
            F = F + ZWMWp @ Z.T
            iF = la.inv(F)
            K = K + T @ ZWMWp.T
            Kg = K @ iF
            W = (T - Kg @ Z) @ W

    if F_singular:
        raise Exception("The variance of the forecast error remains singular until the end of the sample.")

    # Add the observation densities' constants and divide by two.
    likk = -0.5 * (likk + n_meas*np.log(2*np.pi))

    # Compute the log-likelihood.
    if np.any(np.isnan(likk)):
        LIK = -1.e20
    else:
        LIK = np.sum(likk)

    return LIK, likk, residuals, a, P
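
The Chandrasekhar recursions assume a stationary model initialized at the
unconditional state covariance.  A minimal sketch that obtains that initial
covariance from scipy's discrete Lyapunov solver (illustrative; the data
are made up)::

    import numpy as np
    import scipy.linalg as la
    from snowdrop.src.numeric.filters.filters import fast_kalman_filter

    T = np.array([[0.9]]); Z = np.array([[1.0]])
    Q = np.array([[0.2]]); H = np.array([[0.1]])
    P0 = la.solve_discrete_lyapunov(T, Q)  # stationary state covariance
    a0 = np.zeros(1)
    np.random.seed(0)
    Y = np.random.normal(size=(50, 1))     # stand-in measurement data
    LIK, likk, resid, a, P = fast_kalman_filter(Y, 50, a0, P0, T, H, Z)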

def Rauch_Tung_Striebel_Smoother(Ytp, Xt, Xtp, Ft, Pt, Ptp, Ttp):
    r"""
    Implement the Rauch-Tung-Striebel (RTS) smoother for a state-space model.

    The RTS algorithm is

    .. math::
        L_{t} &= P_{t|t} * F'_{t} * P_{t+1|t}^{-1}

        X_{t|T} &= X_{t|t} + L_{t} * (X_{t+1|T} - X_{t+1|t})

        P_{t|T} &= P_{t+1|t} + L_{t} * (P_{t+1|T} - P_{t+1|t}) * L'_{t}

    Args:
        Ytp: array, dtype=float
            is the smoothed state variables at time t+1,
        Xt: array, dtype=float
            is the state variables at time t,
        Xtp: array, dtype=float
            is the state variables at time t+1,
        Ft: 2D array, dtype=float
            is the state-transition matrix at time t,
        Pt: 2D array, dtype=float
            is the predicted error covariance matrix at time t,
        Ptp: 2D array, dtype=float
            is the predicted error covariance matrix at time t+1,
        Ttp: 2D array, dtype=float
            is the smoothed covariance matrix of state variables at time t+1.

    For details, see https://en.wikipedia.org/wiki/Kalman_filter
    """
    L = Pt @ Ft.T @ la.pinv(Ptp)
    y = Xt + L @ (Ytp - Xtp)
    P = Ptp + L @ (Ttp - Ptp) @ L.T
    return y, P
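
A sketch of the backward pass over stored forward-filter output
(illustrative; ``x_filt``/``P_filt`` are filtered moments,
``x_pred``/``P_pred`` one-step-ahead moments, and ``N`` the sample length,
all hypothetical)::

    xs, Ps = [None]*N, [None]*N
    xs[N-1], Ps[N-1] = x_filt[N-1], P_filt[N-1]
    for t in range(N-2, -1, -1):
        xs[t], Ps[t] = Rauch_Tung_Striebel_Smoother(xs[t+1], x_filt[t], x_pred[t+1],
                                                    T, P_filt[t], P_pred[t+1], Ps[t+1])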

def Bryson_Frazier_Smoother(x, u, F, H, K, Sinv, P, L, l):
    r"""
    Implement the modified Bryson-Frazier (BF) smoother for a state-space model.

    The BF algorithm is:

    .. math::
        \hat{L}_{t} &= H'_{t}*S_{t}^{-1}*H_{t} + (I-K_{t}*H_{t})'*L_{t}*(I-K_{t}*H_{t})

        L_{t-1} &= F'_{t}*\hat{L}_{t}*F_{t},  L_{T} = 0

        \hat{l}_{t} &= -H'_{t}*S_{t}^{-1}*u_{t} + (I-K_{t}*H_{t})'*l_{t}

        l_{t-1} &= F'_{t}*\hat{l}_{t},  l_{T} = 0

    Smoothed variables and covariances:

    .. math::
        xs_{t|T} &= x_{t} - P_{t}*l_{t}

        P_{t|T} &= P_{t} - P_{t}*L_{t}*P_{t}

    Args:
        x: array, dtype=float
            is the state variables,
        u: array, dtype=float
            is the vector of one-step-ahead prediction errors,
        F: 2D array, dtype=float
            is the state-transition matrix,
        H: 2D array, dtype=float
            is the observation matrix,
        K: 2D array, dtype=float
            is the Kalman gain,
        Sinv: 2D array, dtype=float
            is the inverse of matrix S,
        P: 2D array, dtype=float
            is the predicted error covariance matrix,
        L: 2D array, dtype=float
            is the covariance matrix,
        l: array, dtype=float
            is the temporary vector.

    For details, see https://en.wikipedia.org/wiki/Kalman_filter
    """
    n = len(x)
    C = np.eye(n) - K @ H

    Lt = H.T @ Sinv @ H + C.T @ L @ C
    L = F.T @ Lt @ F
    lt = -H.T @ Sinv @ u + C.T @ l
    l = F.T @ lt

    # Smoothed solution ...
    xs = x - P @ l
    # ... and covariance
    Ps = P - P.T @ L @ P

    return xs, Ps, L, l
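
Like the RTS smoother, the BF recursion runs backward, starting from the
zero terminal conditions L_T = 0 and l_T = 0.  A sketch (illustrative; the
stored forward-pass quantities ``x_pred``, ``u``, ``K``, ``Sinv``,
``P_pred`` and the dimensions ``n``, ``N`` are hypothetical)::

    L = np.zeros((n, n))  # terminal condition L_T = 0
    l = np.zeros(n)       # terminal condition l_T = 0
    for t in range(N-1, -1, -1):
        xs[t], Ps[t], L, l = Bryson_Frazier_Smoother(x_pred[t], u[t], T, Z,
                                                     K[t], Sinv[t], P_pred[t], L, l)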

def DK_Filter(x, xtilde, y, v, T, Z, P, H, Q, K, C=0, ind_non_missing=None, t=-1):
    r"""
    Implement the non-diffuse Durbin & Koopman version of the Kalman filter
    algorithm for a state-space model.  Multivariate approach to the Kalman
    filter.

    State-space model:

    .. math::
        x_{t} = T_{t} * x_{t-1} + C_{t} + w_{t},  w = Normal(0, Q)

        y_{t} = Z_{t} * x_{t} + v_{t},  v = Normal(0, H)

    Durbin-Koopman algorithm:

    .. math::
        v_{t} &= y_{t} - Z_{t} * x_{t}

        x_{t} &= T_{t} * (x_{t-1} + K_{t} * v_{t}) + C_{t}

        F_{t} &= H_{t} + Z_{t} * P_{t-1} * Z'_{t}

        K_{t} &= P_{t} * Z'_{t} * F_{t}^{-1}

        L_{t} &= T_{t} - K_{t} * Z_{t}

        P_{t+1} &= T_{t} * P_{t} * L'_{t} + Q_{t}

    Args:
        x: array, dtype=float
            is the variables at the t+1|t+1 step,
        xtilde: array, dtype=float
            is the variables at the t|t+1 forecast step,
        y: array, dtype=float
            is the observation variables,
        v: array, dtype=float
            is the residual,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        P: 2D array, dtype=float
            is the predicted error covariance matrix,
        H: 2D array, dtype=float
            is the covariance matrix of measurement errors,
        Q: 2D array, dtype=float
            is the covariance matrix of structural errors,
        K: 2D array, dtype=float
            is the Kalman gain,
        C: array, dtype=float
            is the constant term matrix,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations.

    For details, see Durbin & Koopman (2012), "Time Series Analysis by State
    Space Methods", and Koopman & Durbin (2000), "Fast Filtering and Smoothing
    for Multivariate State Space Models".
    """
    n_states = len(x)
    n_meas = len(y)
    iF = None
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)

    ZZ = Z[ind_non_missing]
    v = np.zeros(y.shape)
    F = np.zeros(n_meas)
    K = np.zeros(shape=(n_states, n_meas))

    ### ------------------------------------------------- Standard Kalman filter
    if t == -1 or len(ind_non_missing) == 0:
        xn = T @ x + C
        P = T @ P @ T.T + Q
        F = H + Z @ P @ Z.T  # pre-fit residual covariance
        iF = la.inv(F)
        K = T @ P @ Z.T @ iF
        v = y - Z @ xn
        v[np.isnan(v)] = 0
        log_likelihood = 0
    else:
        ### Multivariate approach to the Kalman filter
        iF = np.zeros(shape=(n_meas, n_meas))
        # Measurement residual
        v[ind_non_missing] = y[ind_non_missing] - ZZ @ x
        F = H[np.ix_(ind_non_missing,ind_non_missing)] + ZZ @ P @ ZZ.T  # pre-fit residual covariance
        iF[np.ix_(ind_non_missing,ind_non_missing)] = la.inv(F)  # matrix inverse
        PZI = P @ ZZ.T @ iF[np.ix_(ind_non_missing,ind_non_missing)]
        K[:,ind_non_missing] = T @ PZI  # Kalman gain
        xtilde = x + PZI @ v[ind_non_missing]  # updated (a posteriori) state estimate, eq (4.26) DK(2012)
        xn = T @ xtilde + C  # predicted state estimate
        L = T - K[:,ind_non_missing] @ ZZ
        P = T @ P @ L.T + Q

        # Log-likelihood of the observed data set
        log_likelihood = -0.5 * (n_meas*np.log(2*np.pi) + np.log(la.det(F)) + v @ iF @ v.T)

    return xn, xtilde, v, P, K, F, iF, log_likelihood

def DK_Smoother(x, y, T, Z, F, iF, P, QRt, K, H, r, v, ind_non_missing=None, t=-1):
    r"""
    Implement the non-diffuse Durbin-Koopman smoother for a state-space model.

    The Durbin-Koopman smoothing algorithm is

    .. math::
        r_{t-1} &= Z'_{t}*F_{t}^{-1}*v_{t} + L'_{t}*r_{t}

        r_{T} &= 0

        L_{t} &= I - K_{t}*Z_{t}

    Smoothed variables:

    .. math::
        s_{t} = x_{t} + P_{t}*r_{t-1}

    Args:
        x: array, dtype=float
            is the updated state variables vector,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        F: 2D array, dtype=float
            is the residual covariance matrix,
        iF: 2D array, dtype=float
            is the inverse of matrix F,
        P: 2D array, dtype=float
            is the updated error covariance matrix,
        QRt: 2D array, dtype=float
            is the product of the covariance matrix of structural errors and the shock matrix,
        K: 2D array, dtype=float
            is the Kalman gain matrix,
        H: 2D array, dtype=float
            is the covariance matrix of measurement errors,
        r: array, dtype=float
            is the vector of one-step-back errors,
        v: array, dtype=float
            is the vector of one-step-ahead prediction errors,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations.

    For details, see Durbin & Koopman (2012), "Time Series Analysis by State
    Space Methods"; Koopman & Durbin (2000), "Fast Filtering and Smoothing for
    Multivariate State Space Models"; and Koopman & Durbin (2003), Journal of
    Time Series Analysis, vol. 24(1), pp. 85-98.
    """
    n_meas = len(Z)
    res = np.zeros(n_meas)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(n_meas)

    etahat = [np.nan]*len(QRt)
    L = T - K @ Z

    if len(ind_non_missing) == 0:
        r = L.T @ r
        etahat = QRt @ r
        epsilonhat = H @ iF @ v
        # Smoothed estimate
        xs = x + P @ r
    else:
        r = Z.T @ iF @ v + L.T @ r  # eq (4.39) in DK(2012)
        # Smoothed estimate of the state variables disturbance
        etahat = QRt @ r  # DK (2012)
        # Smoothed estimate of the observation disturbance
        epsilonhat = H @ (iF @ v - K.T @ r)
        # Smoothed estimate
        xs = x + P @ r  # eq (4.39) in DK(2012)
        res = y - Z @ xs

    return xs, r, res, etahat, epsilonhat
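
A usage sketch of the backward pass paired with ``DK_Filter`` (illustrative;
the per-period containers ``xtilde_s``, ``v_s``, ``P_s``, ``F_s``, ``iF_s``,
``K_s`` are hypothetical stores filled during a forward pass, and ``QRt``
comes from the model setup)::

    r = np.zeros(n_states)  # terminal condition r_T = 0
    for t in range(N-1, -1, -1):
        xs, r, res, etahat, epshat = DK_Smoother(xtilde_s[t], y[t], T, Z,
                                                 F_s[t], iF_s[t], P_s[t],
                                                 QRt, K_s[t], H, r, v_s[t])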

def Durbin_Koopman_Non_Diffuse_Filter(x, xtilde, y, T, Z, P, H, Q, C=0,
                                      bUnivariate=False, ind_non_missing=None,
                                      tol1=1.e-6, tol2=1.e-6, t=-1):
    r"""
    Implement the non-diffuse Durbin & Koopman version of the Kalman filter
    algorithm for a state-space model.

    State-space model:

    .. math::
        x_{t} = T_{t} * x_{t-1} + C_{t} + w_{t},  w = Normal(0, Q)

        y_{t} = Z_{t} * x_{t} + v_{t},  v = Normal(0, H)

    Durbin-Koopman algorithm:

    .. math::
        v_{t} &= y_{t} - Z_{t} * x_{t}

        x_{t} &= T_{t} * (x_{t-1} + K_{t} * v_{t}) + C_{t}

        F_{t} &= H_{t} + Z_{t} * P_{t-1} * Z'_{t}

        K_{t} &= P_{t} * Z'_{t} * F_{t}^{-1}

        L_{t} &= T_{t} - K_{t} * Z_{t}

        P_{t+1} &= T_{t} * P_{t} * L'_{t} + Q_{t}

    Args:
        x: array, dtype=float
            is the variables at the t+1|t+1 step,
        xtilde: array, dtype=float
            is the variables at the t|t+1 forecast step,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        P: 2D array, dtype=float
            is the predicted error covariance matrix,
        H: 2D array, dtype=float
            is the covariance matrix of measurement errors,
        Q: 2D array, dtype=float
            is the covariance matrix of structural errors,
        C: array, dtype=float
            is the constant term matrix,
        bUnivariate: bool
            if True the univariate Kalman filter is used, and if False - the multivariate,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations,
        tol1: number, float
            Kalman tolerance for the reciprocal condition number,
        tol2: number, float
            diffuse Kalman tolerance for the reciprocal condition number (for Finf)
            and the rank of Pinf.

    For details, see Durbin & Koopman (2012), "Time Series Analysis by State
    Space Methods", and Koopman & Durbin (2000), "Fast Filtering and Smoothing
    for Multivariate State Space Models".
    """
    log_likelihood = 0
    n_meas = len(y)
    n_states = len(x)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)

    ZZ = Z[ind_non_missing]
    v = np.zeros(y.shape)
    F = np.zeros(n_meas)
    K = np.zeros(shape=(n_states, n_meas))
    iF, P1 = None, None
    #P = sp.linalg.tril(P) + np.transpose(sp.linalg.tril(P,-1))

    # Switch to the univariate filter if the F matrix is close to singular
    bUnivariate |= np.linalg.cond(H + Z@P@Z.T, -2) < tol2

    ### ------------------------------------------------- Standard Kalman filter
    if len(ind_non_missing) == 0:
        xn = T @ xtilde + C
        P = T @ P @ T.T + Q
    else:
        if not bUnivariate:
            ### Multivariate approach to the Kalman filter
            iF = np.zeros(shape=(n_meas, n_meas))
            # Measurement residual
            v[ind_non_missing] = y[ind_non_missing] - ZZ @ x
            F = H[np.ix_(ind_non_missing,ind_non_missing)] + ZZ @ P @ ZZ.T  # pre-fit residual covariance
            iF[np.ix_(ind_non_missing,ind_non_missing)] = la.inv(F)  # matrix inverse
            PZI = P @ ZZ.T @ iF[np.ix_(ind_non_missing,ind_non_missing)]
            K[:,ind_non_missing] = T @ PZI  # Kalman gain
            xtilde = x + PZI @ v[ind_non_missing]  # updated (a posteriori) state estimate, eq (4.26) DK(2012)
            xn = T @ xtilde + C  # predicted state estimate
            L = T - K[:,ind_non_missing] @ ZZ
            P = T @ P @ L.T + Q
            log_likelihood += np.log(la.det(F)) + v @ iF @ v.T
        else:
            ### Univariate approach to the Kalman filter
            xtilde = np.copy(x)
            P1 = np.copy(P)
            for i in ind_non_missing:
                Z_i = Z[i]
                v[i] = y[i] - Z_i @ xtilde  # measurement residual
                F_i = F[i] = Z_i @ P @ Z_i.T + H[i,i]
                K_i = K[:,i] = P @ Z_i.T
                if F_i > tol1:
                    xtilde += K_i * v[i] / F_i
                    P -= np.outer(K_i, K_i) / F_i
                    log_likelihood += np.log(F_i) + v[i]**2 / F_i
            # Transition equations
            xn = T @ xtilde + C  # predicted state estimate
            P = T @ P @ T.T + Q

    # Log-likelihood of the observed data set
    log_likelihood = -0.5 * (n_meas*np.log(2*np.pi) + log_likelihood)

    if iF is None:
        iF = la.pinv(H + Z @ P @ Z.T)

    return xn, xtilde, v, P, P1, K, F, iF, bUnivariate, log_likelihood

def Durbin_Koopman_Non_Diffuse_Smoother(x, y, T, Z, F, iF, P, P1, QRt, K, r, v,
                                        ind_non_missing=None, bUnivariate=False,
                                        tol=1.e-6, t=-1):
    r"""
    Implement the non-diffuse Durbin-Koopman smoother for a state-space model.

    The Durbin-Koopman algorithm is

    .. math::
        r_{t-1} &= Z'_{t}*F_{t}^{-1}*v_{t} + L'_{t}*r_{t}

        r_{T} &= 0

        N_{t-1} &= Z'_{t}*F_{t}^{-1}*Z_{t} + L'_{t}*N_{t}*L_{t}

        N_{T} &= 0

        L_{t} &= I - K_{t}*Z_{t}

    Smoothed variables:

    .. math::
        s_{t} = x_{t} + P_{t}*r_{t-1}

    Args:
        x: array, dtype=float
            is the predicted variables vector,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        F: 2D array, dtype=float
            is the residual covariance matrix,
        iF: 2D array, dtype=float
            is the inverse of matrix F,
        P: 2D array, dtype=float
            is the updated error covariance matrix,
        P1: 2D array, dtype=float
            is the updated error covariance matrix of the univariate filter,
        QRt: 2D array, dtype=float
            is the product of the covariance matrix of structural errors and the shock matrix,
        K: 2D array, dtype=float
            is the Kalman gain matrix,
        r: array, dtype=float
            is the vector of one-step-back errors,
        v: array, dtype=float
            is the vector of one-step-ahead prediction errors,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations,
        bUnivariate: bool
            True if the univariate Kalman filter is used and False otherwise,
        tol: number, float
            is the Kalman tolerance for the reciprocal condition number.

    For details, see Durbin & Koopman (2012), "Time Series Analysis by State
    Space Methods"; Koopman & Durbin (2000), "Fast Filtering and Smoothing for
    Multivariate State Space Models"; and Koopman & Durbin (2003), Journal of
    Time Series Analysis, vol. 24(1), pp. 85-98.
    """
    n_meas = len(Z)
    n = len(x)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)
    rev_ind_non_missing = reversed(ind_non_missing)

    ZZ = Z[ind_non_missing]
    etahat = [np.nan]*len(QRt)
    L = T - K[:,ind_non_missing] @ ZZ

    if not bUnivariate:
        #----------------------------- Multivariate smoothing of the KF estimates
        r = ZZ.T @ iF[ind_non_missing,:] @ v + L.T @ r  # eq (4.39) in DK(2012)
        # Smoothed estimate
        xhat = x + P @ r  # eq (4.39) in DK(2012)
        etahat = QRt @ r  # DK (2012)
    else:
        #----------------------------- Univariate approach to smoothing of the KF estimates
        for i in rev_ind_non_missing:
            if F[i] > tol:
                L_i = np.eye(n) - np.outer(K[:,i], Z[i]) / F[i]
                r = Z[i].T / F[i] * v[i] + L_i.T @ r  # DK (2012), eq (6.15), equation for r_{i-1}
        # DK (2012), below eq (6.15): r_{t-1} = r_{0}
        xhat = x + P1 @ r  # DK (2012), eq (6.15)
        etahat = QRt @ r  # DK (2012), eq (4.63)
        r = T.T @ r

    # Smoothed estimate of the observation disturbance
    res = y - Z @ xhat

    return xhat, r, res, etahat
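
A sketch of how the filter and smoother pair up: run the filter forward,
store the per-period outputs (including ``P1``, which the univariate branch
of the smoother needs), then recurse backward (illustrative; the containers,
``N`` and ``QRt`` are hypothetical)::

    out = []
    for t in range(N):
        x, xtilde, v, P, P1, K, F, iF, bUniv, ll = \
            Durbin_Koopman_Non_Diffuse_Filter(x, xtilde, y[t], T, Z, P, H, Q, t=t)
        out.append((xtilde, v, P, P1, K, F, iF, bUniv))

    r = np.zeros(len(x))  # terminal condition r_T = 0
    for t in range(N-1, -1, -1):
        xtilde, v, P, P1, K, F, iF, bUniv = out[t]
        xhat, r, res, etahat = Durbin_Koopman_Non_Diffuse_Smoother(
            xtilde, y[t], T, Z, F, iF, P, P1, QRt, K, r, v, bUnivariate=bUniv)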

def Durbin_Koopman_Diffuse_Filter(x, xtilde, y, T, Z, P, H, Q, C=0,
                                  bUnivariate=False, ind_non_missing=None,
                                  Pstar=None, Pinf=None,
                                  tol1=1.e-6, tol2=1.e-6, t=-1):
    r"""
    Implement the diffuse Durbin & Koopman version of the Kalman filter
    algorithm for a state-space model.

    State-space model:

    .. math::
        x_{t} = T_{t} * x_{t-1} + C_{t} + w_{t},  w = Normal(0, Q)

        y_{t} = Z_{t} * x_{t} + v_{t},  v = Normal(0, H)

    Durbin-Koopman algorithm:

    .. math::
        v_{t} &= y_{t} - Z_{t} * x_{t}

        x_{t} &= T_{t} * (x_{t-1} + K_{t} * v_{t}) + C_{t}

        F_{t} &= H_{t} + Z_{t} * P_{t-1} * Z'_{t}

        K_{t} &= P_{t} * Z'_{t} * F_{t}^{-1}

        L_{t} &= T_{t} - K_{t} * Z_{t}

        P_{t+1} &= T_{t} * P_{t} * L'_{t} + Q_{t}

    .. note:: Translated from the Dynare Matlab code.

    Args:
        x: array, dtype=float
            is the state variables,
        xtilde: array, dtype=float
            is the variables at the t|t+1 forecast step,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        P: 2D array, dtype=float
            is the predicted error covariance matrix,
        H: 2D array, dtype=float
            is the covariance matrix of measurement errors,
        Q: 2D array, dtype=float
            is the covariance matrix of structural errors,
        C: array, dtype=float
            is the constant term matrix,
        bUnivariate: bool
            if True the univariate Kalman filter is used, and if False - the multivariate,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations,
        Pstar: 2D array, dtype=float
            is the finite part of the predicted error covariance matrix,
        Pinf: 2D array, dtype=float
            is the infinite part of the predicted error covariance matrix,
        tol1: number, float
            Kalman tolerance for the reciprocal condition number,
        tol2: number, float
            diffuse Kalman tolerance for the reciprocal condition number (for Finf)
            and the rank of Pinf.

    For details, see Durbin & Koopman (2012), "Time Series Analysis by State
    Space Methods", and Koopman & Durbin (2000), "Fast Filtering and Smoothing
    for Multivariate State Space Models".
    """
    log_likelihood = 0
    n_meas = len(y)
    n_states = len(x)
    Finf_singular = False
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)

    ZZ = Z[ind_non_missing]
    v = np.zeros(y.shape)
    F = np.zeros(n_meas)
    Finf = np.zeros(shape=(n_meas, n_meas))
    Fstar = np.zeros(shape=(n_meas, n_meas))
    K = np.zeros(shape=(n_states, n_meas))
    Kstar = np.zeros(shape=(n_states, n_meas))
    Kinf = np.zeros(shape=(n_states, n_meas))
    iF, L, Linf, Lstar, P1, Pstar1, Pinf1 = None, None, None, None, None, None, None
    #P = sp.linalg.tril(P) + np.transpose(sp.linalg.tril(P,-1))
    P1 = np.copy(P)

    # Check whether the matrices Finf, Fstar are close to singular
    diffuse = False
    if not Pinf is None and not Pstar is None:
        bUnivariate |= np.linalg.cond(Z@Pinf@Z.T, -2) < tol2 or np.linalg.cond(H+Z@Pstar@Z.T, -2) < tol2
    if not Pinf is None:
        if bUnivariate:
            rank = np.linalg.matrix_rank(Z@Pinf@Z.T, tol1)
        else:
            rank = np.linalg.matrix_rank(Pinf, tol1)
        diffuse = not (rank == 0)

    ### ------------------------------------------------- Standard Kalman filter
    if len(ind_non_missing) == 0:
        xn = T @ xtilde + C
        L = T
        Pstar = T @ P @ L.T + Q
        P = np.copy(Pstar)
        Pinf = T @ Pinf @ L.T
    else:
        if not diffuse:
            Kinf = Kstar = Linf = Lstar = Finf = Fstar = None
            if not bUnivariate:
                ### Multivariate approach to the Kalman filter
                iF = np.zeros(shape=(n_meas, n_meas))
                # Measurement residual
                v[ind_non_missing] = y[ind_non_missing] - ZZ @ x
                F = H[np.ix_(ind_non_missing,ind_non_missing)] + ZZ @ P @ ZZ.T  # pre-fit residual covariance
                iF[np.ix_(ind_non_missing,ind_non_missing)] = la.inv(F)  # matrix inverse
                PZI = P @ ZZ.T @ iF[np.ix_(ind_non_missing,ind_non_missing)]
                K[:,ind_non_missing] = T @ PZI  # Kalman gain
                xtilde = x + PZI @ v[ind_non_missing]  # updated (a posteriori) state estimate, eq (4.26) DK(2012)
                xn = T @ xtilde + C  # predicted state estimate
                L = T - K[:,ind_non_missing] @ ZZ
                Pstar = T @ P @ L.T + Q
                P = np.copy(Pstar)
                Pinf = T @ Pinf @ L.T
                log_likelihood += np.log(la.det(F)) + v @ iF @ v.T
            else:
                ### Univariate approach to the Kalman filter
                xtilde = np.copy(x)
                for i in ind_non_missing:
                    Z_i = Z[i]
                    v[i] = y[i] - Z_i @ xtilde  # measurement residual
                    F_i = F[i] = Z_i @ P @ Z_i.T + H[i,i]
                    K_i = K[:,i] = P @ Z_i.T
                    if F_i > tol1:
                        xtilde += K_i * v[i] / F_i
                        P -= np.outer(K_i, K_i) / F_i
                        log_likelihood += np.log(F_i) + v[i]**2 / F_i
                # Transition equations
                xn = T @ xtilde + C  # predicted state estimate
                Pstar = T @ P @ T.T + Q
                P = np.copy(Pstar)
                Pinf = T @ Pinf @ T.T
        ### --------------------------------------------- Diffuse Kalman filter
        else:
            if not bUnivariate:
                ### Multivariate approach to the Kalman filter
                # Measurement residual
                v[ind_non_missing] = y[ind_non_missing] - ZZ @ x
                skip = False
                xtilde = np.copy(x)
                Finf[np.ix_(ind_non_missing,ind_non_missing)] = ZZ @ Pinf @ ZZ.T
                Fstar[np.ix_(ind_non_missing,ind_non_missing)] = ZZ @ Pstar @ ZZ.T + H[np.ix_(ind_non_missing,ind_non_missing)]
                # Check whether the matrix Finf is singular
                if la.det(Finf) < tol1:
                    # Matrix Finf is singular
                    if not np.all(np.abs(Finf) < tol1):
                        # The univariate diffuse Kalman filter should be used instead
                        bUnivariate = True
                        skip = True
                    else:
                        Finf_singular = True
                        if la.det(Fstar[np.ix_(ind_non_missing,ind_non_missing)]) < tol2:
                            if not np.all(np.abs(Fstar) < tol2):
                                # The univariate diffuse Kalman filter should be used
                                bUnivariate = True
                                skip = True
                            else:
                                # Rank 0
                                Pinf = T @ Pinf @ T.T  # eq (5.16)
                                Pstar = T @ Pstar @ T.T + Q
                                xn = T @ x + C
                                log_likelihood += np.log(la.det(Finf)) + v @ la.pinv(Finf) @ v.T
                        else:
                            # Matrix Fstar is positive definite
                            iFstar = la.inv(Fstar[np.ix_(ind_non_missing,ind_non_missing)])
                            Kstar[:,ind_non_missing] = Pstar @ ZZ.T @ iFstar  # K0, eq (5.15) in DK(2012)
                            Lstar = T - T @ Kstar[:,ind_non_missing] @ ZZ  # L0, eq (5.15)
                            Pinf = T @ Pinf @ T.T  # eq (5.16)
                            Pstar = T @ Pstar @ Lstar.T + Q
                            P = np.copy(Pstar)
                            x += Kstar @ v
                            xn = T @ x + C
                            log_likelihood += np.log(la.det(Fstar)) + v @ iFstar @ v.T
                elif not skip:
                    # Matrix Finf is positive definite
                    iFinf = la.inv(Finf[np.ix_(ind_non_missing,ind_non_missing)])
                    Kinf[:,ind_non_missing] = Pinf @ ZZ.T @ iFinf  # K0, eq (5.12)
                    Linf = T - T @ Kinf[:,ind_non_missing] @ ZZ  # L0, eq (5.12)
                    Kstar[:,ind_non_missing] = (Pstar @ ZZ.T - Kinf[:,ind_non_missing] \
                                                @ Fstar[np.ix_(ind_non_missing,ind_non_missing)]) @ iFinf  # K1, eq (5.12)
                    Pstar = T @ Pstar @ Linf.T \
                            - T @ Kinf[:,ind_non_missing] @ Finf @ Kstar[:,ind_non_missing].T @ T.T + Q  # eq (5.14)
                    Pinf = T @ Pinf @ Linf.T  # eq (5.14)
                    P = np.copy(Pstar)
                    xtilde = x + Kinf @ v
                    xn = T @ xtilde + C
                    log_likelihood += np.log(la.det(Finf)) + v @ iFinf @ v

            if bUnivariate:
                ### Univariate approach to the Kalman filter
                Fstar = np.zeros(n_meas)
                Finf = np.zeros(n_meas)
                Pstar1 = np.copy(Pstar)
                Pinf1 = np.copy(Pinf)
                xtilde = np.copy(x)
                for i in ind_non_missing:
                    Z_i = Z[i]
                    v[i] = y[i] - Z_i @ xtilde  # measurement residual
                    F[i] = Fstar[i] = Z_i @ Pstar @ Z_i.T + H[i,i]  # eq (15) in KD(2000)
                    Finf[i] = Z_i @ Pinf @ Z_i.T  # eq (15)
                    K[:,i] = Kstar[:,i] = Kstar_i = Pstar @ Z_i.T  # eq (15)
                    Kinf[:,i] = Kinf_i = Pinf @ Z_i.T  # eq (15)
                    if abs(Finf[i]) > tol1:
                        Kinf_Finf = Kinf_i / Finf[i]
                        xtilde += Kinf_Finf * v[i]  # eq (16)
                        Pstar += np.outer(Kinf_i, Kinf_Finf) * Fstar[i]/Finf[i] \
                                 - np.outer(Kstar_i, Kinf_Finf) \
                                 - np.outer(Kinf_Finf, Kstar_i)  # eq (16)
                        Pinf -= np.outer(Kinf_i, Kinf_i.T)/Finf[i]  # eq (16)
                        log_likelihood += np.log(Finf[i]) + v[i]**2 / Finf[i]
                    elif abs(Fstar[i]) > tol2:
                        xtilde += Kstar_i * v[i] / Fstar[i]  # eq (17)
                        Pstar -= np.outer(Kstar_i, Kstar_i) / Fstar[i]  # eq (17)
                        log_likelihood += np.log(Fstar[i]) + v[i]**2 / Fstar[i]
                # Transition equations
                xn = T @ xtilde + C
                Pstar = T @ Pstar @ T.T + Q  # eq (18)
                Pinf = T @ Pinf @ T.T  # eq (18)
                P = np.copy(Pstar)

    # Rank of Pinf (used to monitor the end of the diffuse phase)
    if bUnivariate:
        rank = np.linalg.matrix_rank(Z@Pinf@Z.T, tol1)
    else:
        rank = np.linalg.matrix_rank(Pinf, tol1)

    # Log-likelihood of the observed data set
    log_likelihood = -0.5 * (n_meas*np.log(2*np.pi) + log_likelihood)

    if iF is None:
        iF = la.pinv(H + Z @ P @ Z.T)

    return xn, xtilde, v, P, P1, K, Kinf, Kstar, L, Linf, Lstar, F, Finf, Fstar, \
           iF, log_likelihood, diffuse, Pstar, Pinf, Pstar1, Pinf1, Finf_singular, bUnivariate

def Durbin_Koopman_Diffuse_Smoother(x, y, T, Z, F, iF, QRt, K, Kstar, Kinf, H,
                                    P, P1, Pstar, Pinf, Pstar1, Pinf1, v, r, r0, r1,
                                    Linf=None, Lstar=None, ind_non_missing=None,
                                    diffuse=True, Finf_singular=False, bUnivariate=False,
                                    tol1=1.e-6, tol2=1.e-6, t=-1):
    r"""
    Implement the diffuse Durbin-Koopman smoother for a state-space model.

    The Durbin-Koopman algorithm is

    .. math::
        r_{t-1} &= Z'_{t}*F_{t}^{-1}*v_{t} + L'_{t}*r_{t}

        r_{T} &= 0

        L_{t} &= I - K_{t}*Z_{t}

    Smoothed variables:

    .. math::
        s_{t} = x_{t} + P_{t}*r_{t-1}

    .. note:: Translated from the Dynare Matlab code.

    Args:
        x: array, dtype=float
            is the predicted state variables vector,
        y: array, dtype=float
            is the observation variables,
        T: 2D array, dtype=float
            is the state-transition matrix,
        Z: 2D array, dtype=float
            is the observation matrix,
        F: 2D array, dtype=float
            is the residual covariance matrix,
        iF: 2D array, dtype=float
            is the inverse of matrix F,
        QRt: 2D array, dtype=float
            is the product of the covariance matrix of structural errors and the shock matrix,
        K: 2D array, dtype=float
            is the Kalman gain matrix,
        Kstar: 2D array, dtype=float
            is the finite part of the Kalman gain matrix,
        Kinf: 2D array, dtype=float
            is the infinite part of the Kalman gain matrix,
        H: 2D array, dtype=float
            is the observation errors covariance matrix,
        P: 2D array, dtype=float
            is the updated error covariance matrix P,
        P1: 2D array, dtype=float
            is the updated error covariance matrix P1,
        Pstar: 2D array, dtype=float
            is the finite part of the P matrix,
        Pinf: 2D array, dtype=float
            is the infinite part of the P matrix,
        Pstar1: 2D array, dtype=float
            is the finite part of the P1 matrix,
        Pinf1: 2D array, dtype=float
            is the infinite part of the P1 matrix,
        v: array, dtype=float
            is the vector of one-step-ahead prediction errors,
        r: array, dtype=float
            is a smoothed state vector,
        r0: array, dtype=float
            is the zero-order term in a Taylor expansion of the smoothed state vector,
        r1: array, dtype=float
            is the first-order term in a Taylor expansion of the smoothed state vector,
        Linf: 2D array, dtype=float
            is the infinite part of the L matrix,
        Lstar: 2D array, dtype=float
            is the finite part of the L matrix,
        ind_non_missing: array, dtype=int
            are the indices of non-missing observations,
        diffuse: bool
            True if time t is within the diffuse periods and False otherwise,
        Finf_singular: bool
            True if Finf is a singular matrix and False otherwise,
        bUnivariate: bool
            True if the univariate Kalman filter is used and False otherwise,
        tol1: number, float
            is the Kalman tolerance for the reciprocal condition number,
        tol2: number, float
            is the diffuse Kalman tolerance for the reciprocal condition number
            (for Finf) and the rank of Pinf.

    For details, see Durbin & Koopman (2012), "Time Series Analysis by State
    Space Methods"; Koopman & Durbin (2000), "Fast Filtering and Smoothing for
    Multivariate State Space Models"; and Koopman & Durbin (2003), Journal of
    Time Series Analysis, vol. 24(1), pp. 85-98.
    """
    n_meas = len(Z)
    n = len(x)
    if ind_non_missing is None:
        cprint("Indices of non-missing observations are not defined - resetting them.","red")
        ind_non_missing = np.arange(0, n_meas)
    rev_ind_non_missing = reversed(ind_non_missing)

    ZZ = Z[ind_non_missing]
    Finf = ZZ @ Pinf @ ZZ.T
    Fstar = ZZ @ Pstar @ ZZ.T
    L = T - T @ K[:,ind_non_missing] @ ZZ
    etahat = [np.nan]*len(QRt)

    if not diffuse:
        # Standard Kalman smoother
        if not bUnivariate:
            #----------------------------- Multivariate smoothing of the KF estimates
            r = ZZ.T @ iF[ind_non_missing,:] @ v + L.T @ r  # eq (4.39) in DK(2012)
            # Smoothed estimate
            xhat = x + P @ r  # eq (4.39) in DK(2012)
            etahat = QRt @ r  # DK (2012)
        else:
            #----------------------------- Univariate approach to smoothing of the KF estimates
            for i in rev_ind_non_missing:
                K_i = K[:,i]
                if F[i] > tol1:
                    L_i = np.eye(n) - np.outer(K_i, Z[i]) / F[i]
                    r = Z[i].T / F[i] * v[i] + L_i.T @ r  # DK (2012), eq (6.15), equation for r_{i-1}
            # DK (2012), below eq (6.15): r_{t-1} = r_{0}
            xhat = x + P1 @ r  # DK (2012), eq (6.15)
            etahat = QRt @ r  # smoothed estimate of the state variables disturbance
    else:
        #-----------------------------------> Diffuse smoother: time t is within the diffuse periods
        if not bUnivariate:
            #----------------------------- Multivariate smoothing of the KF estimates
            if len(ind_non_missing) == 0:
                r1 = Linf.T @ r1
            else:
                if not Finf_singular:
                    iFinf = la.inv(Finf[np.ix_(ind_non_missing,ind_non_missing)])
                    r0 = Linf.T @ r0  # DK (2012), eq (5.21), where L^(0) is named Linf
                    r1 = ZZ.T @ (iFinf @ v[ind_non_missing] \
                         - Kstar[:,ind_non_missing].T @ r0) \
                         + Linf.T @ r1
                else:
                    iFstar = la.inv(Fstar[np.ix_(ind_non_missing,ind_non_missing)])
                    r0 = ZZ.T @ iFstar @ v[ind_non_missing] - Lstar.T @ r0  # DK (2003), eq (14)
                    r1 = T.T @ r1  # DK (2003), eq (14)
            xhat = x + Pstar @ r0 + Pinf @ r1  # DK (2012), eq (5.23)
            # Smoothed estimate of the state variables disturbance
            etahat = QRt @ r0  # DK (2012), p. 135
        else:
            #----------------------------- Univariate approach to smoothing of the KF estimates
            for i in rev_ind_non_missing:
                Z_i = Z[i]
                Fstar_i = Z_i @ Pstar @ Z_i.T + H[i,i]  # eq (15) in KD(2000)
                Finf_i = Z_i @ Pinf @ Z_i.T  # eq (15)
                Kstar_i = Kstar[i]  # eq (15)
                Kinf_i = Kinf[i]
                if abs(Finf_i) > tol1:
                    Linf_i = np.eye(n) - np.outer(Kinf_i, Z_i)/Finf_i
                    Lstar_i = np.outer(Kinf_i*Fstar_i/Finf_i - Kstar_i, Z_i) / Finf_i
                    r1 = Z_i.T * v[i] / Finf_i + Lstar_i @ r0 \
                         + Linf_i.T @ r1  # KD (2000), eq (25), equation for r_1
                    r0 = Linf_i.T @ r0
                elif abs(Fstar_i) > tol2:
                    # This step is needed when Finf == 0
                    L_i = np.eye(n) - np.outer(Kstar_i, Z_i) / Fstar_i
                    r0 = Z_i.T / Fstar_i * v[i] + L_i.T @ r0  # propagate r0 and keep r1 fixed
            xhat = x + Pstar1 @ r0 + Pinf1 @ r1  # DK (2012), eq (5.23)
            etahat = QRt @ r0  # DK (2012)
            r0 = T @ r0
            r1 = T @ r1

    # Smoothed estimate of the observation disturbance
    res = y - Z @ xhat

    return xhat, r, r0, r1, res, etahat

if __name__ == '__main__':
    """
    Test of the implementation of the Kalman filter for the state-space model:

    .. math::
        a(t+1) = T * a(t) + eta(t),  eta(t) ~ N(0,Q)

        y(t) = Z * a(t) + eps(t),  eps(t) ~ N(0,H)

        a(0) ~ N(a0, P0)
    """
    import matplotlib.pyplot as plt

    # Parameters
    nobs = 1000
    sigma = 0.1

    # Example dataset
    u = np.geomspace(1, 10, nobs)
    phi = 1 + np.log(u[nobs-1]/u[0])/(nobs-1)
    np.random.seed(1234)
    eps = 0.5*np.random.normal(scale=sigma, size=nobs)
    x = np.zeros((nobs, 1))
    x[:, 0] = np.sin(10*u/np.pi)
    y = np.zeros((nobs, 1))
    y[:, 0] = np.sin(10*(u+eps)/np.pi)

    # State space
    Z = np.ones((1, 1))
    T = phi*np.ones((1, 1))
    Q = sigma**2*np.ones((1, 1))
    H = np.ones((1, 1))
    P = Q
    ind_non_missing = np.arange(0, 1)
    yk = []

    # Run the Kalman filter; the third return value is the updated
    # error covariance, which is fed back in as P
    for t in range(nobs):
        xk, res, P = Kalman_Filter(x=x[t], y=y[t], T=T, Z=Z, P=P, C=0, Q=Q, H=H,
                                   ind_non_missing=ind_non_missing)[:3]
        yk.append(xk)
    yk = np.array(yk)

    # Run the HP filter
    yf = LRXfilter(y[:, 0], 1600)[0]

    y = np.squeeze(y)
    x = np.squeeze(x)
    yk = np.squeeze(yk)

    plt.close("all")
    plt.figure(figsize=(10, 8))
    plt.plot(range(len(y)), y, 'o', lw=1, label='Data')
    plt.plot(range(len(x)), x, lw=3, label='True Solution')
    plt.plot(range(len(yk)), yk, lw=2, label='Kalman Filter')
    plt.plot(range(len(yf)), yf, lw=1, label='HP Filter')
    plt.title('Filters', fontsize=20)
    plt.legend(fontsize=15)
    plt.grid(True)
    plt.show()