Source code for snowdrop.src.utils.d2s

### Class D2S retrieves variables position

[docs] class D2S: """Declare member variables.""" N_backward = -1 N_forward = -1 N_static = -1 systemid = None variables = None meas_variables = None shock_variables = None def __init__(self): """Pre-allocate member lists.""" # Pre-allocate vectors of positions in derivative matrices. self.y_ = list() self.xu1_ = list() self.xp1_ = list() self.xp_ = list() self.e_ = list() # Pre-allocate vectors of positions in unsolved system matrices #--------------------------------------------------------------- self.y = list() self.xu1 = list() self.xp1 = list() self.xp = list() self.e = list() # Pre-allocate identity and duplicate vectors self.ident_ = list() self.ident1_ = list() self.ident = list() self.ident1 = list() # Pre-allocate identity and duplicate vectors self.c = list()
[docs] def order(self): """ Re-order variables.""" un_predetermined = self.xu1_ self.xu1 = reorder(un_predetermined,self.xu1) self.xp = reorder(un_predetermined,self.xp) self.xp1 = reorder(un_predetermined,self.xp1) self.ident = reorder(un_predetermined,self.ident) self.ident1 = reorder(un_predetermined,self.ident1)
def __str__(self): """ Representation of D2S class object as a string. :param self: D2S object. :type self: D2S. """ s = u''' D2S: ------ N_forward = {0} N_backward = {1} N_static = {2} systemid = {3}\n transition equations: A * [xb+;xf+] + B * [xb;xf] + E * e + K = 0 A[:,(xp+,xu+)] = Jacob[:,(xp+_,xu+_)]\n'''.format(self.N_forward,self.N_backward,self.N_static,self.systemid,self.variables) s += 'xu+ = {0}\n'.format(self.xu1) s += 'xu+_ = {0}\n'.format(self.xu1_) s += 'xp+ = {0}\n'''.format(self.xp1) s += 'xp+_ = {0}\n\n'''.format(self.xp1_) s += 'B[:,xp] = Jacob[:,xp_]\n' s += 'xp = {0}\n'''.format(self.xp) s += 'xp_ = {0}\n\n'''.format(self.xp_) s += 'E[:,e] = Jacob[:,e_]\n' s += 'e = {0}\n'.format(self.e) s += 'e_ = {0}\n\n'.format(self.e_) s += 'Measurement equations: A * y + B * xb+ + E * e + K1 = 0\n' s += 'y = {0}\n'.format(self.y) s += 'y_ = {0}\n\n'.format(self.y_) s += 'Identity equations:\n' s += 'ident1 = {0}\n'.format(self.ident1) s += 'ident = {0}\n\n'.format(self.ident) s += 'Constants:\n' s += 'c = {0}\n'.format(self.c) return s def __repr__(self): """ Official representation of D2S class object. Parameters: :param self: D2S object. :type self: D2S. """ return self.__str__()
[docs] def reorder(un_predetermined,lst): """Reorder list.""" un = [] for x in un_predetermined: if x in lst: un.append(x) order = un + [i for i in lst if not i in un] return order
[docs] def myd2s(model,debug=False) -> D2S: """ Create derivative-to-system convertor. Args: model : Model Instance of Model class. Returns: None. """ import numpy as np #from snowdrop.src.utils.equations import getTopology from snowdrop.src.utils.equations import getLeadLagIncidence # Create 'structure' object d2s = D2S() # Get lead Lag incidence matrix lli = getLeadLagIncidence(model) var = model.symbols['variables'] if 'measurement_variables' in model.symbols: meas_var = model.symbols['measurement_variables'] else: meas_var = [] measVars = [x.lower() for x in meas_var] shock_var = model.symbols['shocks'] d2s.variables = var d2s.meas_variables = meas_var d2s.shock_variables = shock_var # Find indices of variables in measurement equations meas_ind = list() for mv,x in zip(meas_var,measVars): if "_meas" in x: v = mv[:-5] elif "obs_" in x: v = mv[4:] lst = [i for i,x in enumerate(var) if x==v] if bool(lst): meas_ind.append(lst[0]) predetermined = [int(b) for b in lli[0] if not np.isnan(b)] un_predetermined = [int(f) for f in lli[2] if not np.isnan(f)] static = [int(c) for b,c,f in zip(lli[0],lli[1],lli[2]) if not np.isnan(c) and np.isnan(b) and np.isnan(f)] nb = len(predetermined) nu = len(un_predetermined) ny = len(meas_var) nxx = len(var) ne = len(shock_var) nx = nxx + nu # Find min lag `minSh`, and max lead, `maxSh`, for each tratransition equations: A [xf+;xb+] + B [xf;xb] + E e + K = 0nsition variable. minSh = np.array([0 if np.isnan(b) else -1 for b in lli[0]]) # If `x(t-k)` occurs in measurement equations then add k-1 lag minSh = np.array([-1 if i in meas_ind else minSh[i] for i in range(len(minSh))]) maxSh = np.array([0 if np.isnan(f) else 1 for f in lli[2]]) # If `minSh(i)` == `maxSh(i)` == 0, add an artificial lead to treat the variable as forward-looking (to reduce state space), # and to guarantee that all variables will have `maxShift > minShift`. maxSh = np.array([1 if (b==0 and f==0) else f for b,f in zip(minSh,maxSh)]) # Minimum lag and maximum lead min_lag = min(minSh) max_lead = max(maxSh) systemid = list() for k in range(max_lead,min_lag-1,-1): # Add transition variables with this shift. b = (k >= minSh) * (k < maxSh) ind = [(k,i,var[i]) for i in range(nxx) if b[i]] systemid += list(ind) d2s.systemid = systemid ### Measurement equations: A1*y + B1*xb+ + E1*e + K1 = 0 #-------------------------------------------------------- # Transition equations: A [xb+;xf+] + B [xb;xf] + E e + K = 0 #------------------------------------------------------------ d2s.xu1 = np.arange(nxx,nx) d2s.xu1_ = np.array(un_predetermined,dtype=int) d2s.xp1 = np.arange(nxx) d2s.xp1_ = np.arange(nxx,2*nxx) d2s.xp = np.arange(nxx) d2s.xp_ = np.arange(2*nxx,3*nxx) lst = []; lst1 = [] # Identity equations for i in range(nu): v1 = np.zeros(nx,dtype=int) col = un_predetermined[i] v1[col] = 1 lst1.append(v1) v = np.zeros(nx,dtype=int) v[nxx+i] = -1 lst.append(v) d2s.ident = np.arange(nx) d2s.ident1 = np.arange(nx) d2s.ident_ = np.array(lst) d2s.ident1_ = np.array(lst1) # Shocks #-------- d2s.e_ = np.arange(3*nxx,3*nxx+ne) d2s.e = np.arange(ne) # Constants # -------- d2s.c = np.arange(nxx+nu) d2s.N_backward = nb d2s.N_forward = nu d2s.N_static = len(static) # Reoder variables so that un-predetermined variables come first. #TODO: need additional work. #d2s.order() if debug: print(d2s) return d2s