#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Mar 24 17:01:29 2021
@author: alexei
"""
import os, sys
import pandas
import numpy as np
import datetime as dt
import matplotlib.pyplot as plt
from scipy.optimize import root
from scipy.optimize import minimize,Bounds
working_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),"../../.."))
os.chdir(working_dir)
from snowdrop.src.misc.termcolor import cprint
I=None; R=None; S=None; DD=None
v=None;iw=None;iv=None;ivr=None;rw=None;rv=None
# Initial conditions
Standard = False
N=800; i0=1.e-5; s0=1; r0=0; d0=0; pop0=1
#pi=0.05; pr=0.04; pd=0.0003
pi=0.39/7; pr=0.3/7; pd=0.0017/7 # first strain
pi2=2.6*pi; pd2=pd/3; sigma=0.e-2; theta=0.9 # second strain
pv = 0.0014/7 # Vaccination rate per day.
pmu = 0. # Probability of getting virus strain once vaccinated.
start = 400 # Time when delta variant virus kicks in.
[docs]
def Virus(x):
if Standard:
return SIR(x) # SIR model
else:
#return SIRS(x) # SIR model with wildtype and resistant virus strain
return SIRV(x) # SIR model with vaccination and wildtype and resistant virus strain
# return SIRVS(x) # SIR model with vaccination and wildtype and resistant virus strain
[docs]
def SIR(x):
"""SIR model."""
pi,pr,pd = x
tau = np.zeros(N)
s = np.zeros(N); s[0] = s0
i = np.zeros(N); i[0] = i0
r = np.zeros(N); r[0] = r0
d = np.zeros(N); d[0] = d0
pop = np.zeros(N); pop[0] = pop0
for t in range(N-1):
# New infections
tau[t+1] = pi*s[t]*i[t]
# Total susceptibles
s[t+1] = max(0,s[t]-tau[t+1]-pv*s[t])
# Total infected
i[t+1] = max(0,i[t] + tau[t+1] - (pr+pd)*i[t])
# Total recovered
r[t+1] = r[t] + pr*i[t] + pv*s[t]
# Total deaths
d[t+1] = d[t] + pd*i[t]
# Total population
pop[t+1] = max(0,pop[t] - pd*i[t])
return tau,s,i,r,d,pop
[docs]
def SIRS(x):
"""SIR model with wildtype and resistant virus strain."""
pi,pr,pd = x
tau_v = np.zeros(N); tau_s = np.zeros(N)
s = np.zeros(N); s[0] = s0
i = np.zeros(N)
i_v = np.zeros(N); i_v[0] = i0
i_s = np.zeros(N); i_s[start] = 0.1*i0
r_v = np.zeros(N); r_v[0] = r0
r_s = np.zeros(N); r_s[0] = r0
d = np.zeros(N); d[0] = d0
pop = np.zeros(N); pop[0] = pop0
for t in range(N-1):
# New wildtype infections
tau_v[t+1] = pi*s[t]*i_v[t]
i_v[t+1] = max(0,i_v[t] + tau_v[t+1] - (pr+pd)*i_v[t])
r_v[t+1] = r_v[t] + pr*i_v[t]
# Virus strain infected
if t >= start:
tau_s[t+1] = pi2*s[t]*i_s[t]
i_s[t+1] = max(0,i_s[t] + tau_s[t+1] - (pr+pd2)*i_s[t])
r_s[t+1] = r_s[t] + pr*i_s[t]
# Total susceptibles
s[t+1] = max(0,s[t]-tau_v[t+1]-tau_s[t+1])
# Total infected
i[t+1] = i_v[t+1] + i_s[t+1]
# Total deaths
d[t+1] = d[t] + pd*i_v[t] + pd2*i_s[t]
# Total population
pop[t+1] = max(0,pop[t] - pd*i_v[t] - pd2*i_s[t])
return tau_v+tau_s,s,i,r_v+r_s,d,pop
[docs]
def SIRV(x):
"""SIR model with two virus strains and vaccination."""
pi,pr,pd = x
v = np.zeros(N)
tau_v = np.zeros(N); tau_s = np.zeros(N)
s = np.zeros(N); s[0] = s0
i = np.zeros(N)
i_v = np.zeros(N); i_v[0] = i0
i_s = np.zeros(N); i_s[start] = 0.1*i0
r = np.zeros(N); r[0] = r0
d = np.zeros(N); d[0] = d0
pop = np.zeros(N); pop[0] = pop0
for t in range(N-1):
# Newly vaccinated
v[t] = pv*s[t]
# Wildtype virus infected population
tau_v[t+1] = pi*s[t]*i_v[t]
i_v[t+1] = max(0,i_v[t] + tau_v[t+1] - (pr+pd)*i_v[t])
# Virus variant infected population
if t >= start:
tau_s[t+1] = pi2*s[t]*i_s[t]
i_s[t+1] = max(0,i_s[t] + tau_s[t+1] - (pr+pd2)*i_s[t])
# Susceptible
s[t+1] = max(0,s[t] + sigma*r[t] - tau_v[t+1] - tau_s[t+1] - v[t])
# Total infected
i[t+1] = i_v[t+1] + i_s[t+1]
# Recovered
r[t+1] = r[t] + pr*i[t] - sigma*r[t] + v[t]
# Deceased
d[t+1] = d[t] + pd*i_v[t] + pd2*i_s[t]
# Total population
pop[t+1] = max(0,pop[t] - pd*i_v[t] - pd2*i_s[t])
#total = s+i+v+r+d
return tau_v+tau_s,s,i,r,d,pop
[docs]
def SIRVS(x):
"""SIR model with wildtype and resistant virus strain."""
global v,iw,iv,ivr,rw,rv
pi,pr,pd = x
pmu = 0
tau = np.zeros(N)
s = s0 + np.zeros(N)
iw = i0 + np.zeros(N) # wildtype virus infected
iv = np.zeros(N) # resitant virus strain infected
ivr = np.zeros(N) # vaccinated that become infected again by resitant virus strain
i = iw + iv + ivr
rw = r0 + np.zeros(N)
rv = np.zeros(N)
r = rw + rv
v = np.zeros(N)
d = d0 + np.zeros(N)
pop = pop0 + np.zeros(N)
for t in range(N-1):
# New infections
tau[t+1] = pi*s[t]*iw[t] + pi2*s[t]*(iv[t]+ivr[t])
# Total susceptibles
s[t+1] = s[t] + pmu*rw[t] - pv*s[t] - (pi*iw[t]+pi2*(iv[t]+ivr[t]))*s[t]
# Wildtype infected
iw[t+1] = max(0,iw[t] - (pr+pd)*iw[t] + pi*s[t]*iw[t])
# Virus strain infected
if t == start:
iv[t] = 0.1*i0
ivr[t] = 0.1*i0
iv[t+1] = max(0,iv[t] - (pr+pd)*iv[t] + pi2*s[t]*(iv[t]+ivr[t]))
ivr[t+1] = max(0,ivr[t] - (pr+pd)*ivr[t] + pi2*v[t]*(iv[t]+ivr[t]))
elif t > start:
iv[t+1] = max(0,iv[t] - (pr+pd)*iv[t] + pi2*s[t]*(iv[t]+ivr[t]))
ivr[t+1] = max(0,ivr[t] - (pr+pd)*ivr[t] + pi2*v[t]*(iv[t]+ivr[t]))
else:
iv[t+1] = 0
ivr[t+1] = 0
# Total infected
i[t+1] = iw[t+1] + iv[t+1] + ivr[t+1]
# Wildtype recovered
rw[t+1] = max(0,rw[t] - pmu*rw[t] + pr*(iw[t]+iv[t]))
# Virus strain recovered
rv[t+1] = max(0,rv[t] - pmu*rv[t] + pr*ivr[t])
# Total recovered
r[t+1] = rw[t] + rv[t]
# Total deaths
d[t+1] = d[t] + pd*i[t]
# Vaccinated
v[t+1] = max(0,v[t] + pmu*rv[t] + pv*s[t] - pi2*v[t]*(iv[t]+ivr[t]))
# Total population
pop[t+1] = max(0,pop[t] - pd*i[t])
return tau,s,i,r,d,pop
[docs]
def func(x):
"""Calibration function."""
tau,s,i,r,d,pop = Virus(x)
err1 = np.sum(100*i[:T]-I.values)
err2 = 0
err3 = np.sum(100*d[:T]-DD.values)
# Calculate residuals
err = np.array([err1,err2,err3])
return err
[docs]
def func_squared(x):
"""Error function for constrained calibration."""
y = func(x)
return np.sum(y*y)
[docs]
def getData(start,end,country=None,save=True):
"""Read data files."""
db = {}
pop={"Germany":83.8*10**6,"US":331*10**6,"France":67*10**6,"Italy":60.5*10**6, "Spain":46.8}
population = pop[country]
file_path = os.path.abspath(os.path.join(working_dir,'supplements/data/COVID19/US_data.xlsx'))
df = pandas.read_excel(file_path,index_col=0,parse_dates=True)
index = pandas.to_datetime(df.columns)
data = df.iloc[1].values.astype('float64') / population
Itotal = pandas.Series(data,index)
data = df.iloc[0].values.astype('float64') / population
I = pandas.Series(data,index)
data = df.iloc[3].values.astype('float64') / population
D = pandas.Series(data,index)
data = df.iloc[4].values.astype('float64') / population
R = pandas.Series(data,index)
#R = Itotal - I - D
ind1 = I.index >= dt.datetime.strptime(start,"%Y-%m-%d")
ind2 = I.index < dt.datetime.strptime(end,"%Y-%m-%d")
I = I[ind1*ind2]
Itotal = Itotal[ind1*ind2]
ind1 = D.index >= dt.datetime.strptime(start,"%Y-%m-%d")
ind2 = D.index < dt.datetime.strptime(end,"%Y-%m-%d")
D = D[ind1*ind2]
ind1 = R.index >= dt.datetime.strptime(start,"%Y-%m-%d")
ind2 = R.index <= dt.datetime.strptime(end,"%Y-%m-%d")
R = R[ind1*ind2]
S = 1 - Itotal - D
#I -= R + D
# Get daily infection and death rates.
# beta = np.diff(I)/I.iloc[1:]/S.iloc[1:]
# gamma = np.diff(D)/I.iloc[1:]
# delta = np.diff(R)/I.iloc[1:]
beta = -(Itotal-Itotal.shift(-7))/Itotal.shift(-3)/S.shift(-3)
beta[beta<0] = 0
gamma = (D.shift(-7)-D)/I.shift(-3)
delta = (R.shift(-7)-R)/I.shift(-3)
delta[delta<0] = 0
delta2 = (R.shift(-7)-R)/(I.shift(-7)-I)
# delta2 = np.diff(R)/np.diff(I);
# delta2 = pandas.Series(delta2,I.index[1:])
Ro = beta/(gamma+delta)
Ro[Ro<0] = np.nan
Ro[Ro>3] = np.nan
beta = beta.dropna()
gamma = gamma.dropna()
delta = delta.dropna()
delta2 = delta2.dropna()
Ro = Ro.dropna()
cycle_Ro, trend_Ro = filter_solution(Ro.values)
series_Ro_trend = pandas.Series(trend_Ro,Ro.index)
#series_Ro_cycle = pandas.Series(cycle_Ro,Ro.index)
cycle_infected_total, trend_infected_total = filter_solution(I.values)
series_infected_total_trend = pandas.Series(trend_infected_total,I.index)
#series_infected_total_cycle = pandas.Series(cycle_infected_total,I.index)
cycle_infected, trend_infected = filter_solution(I.values)
series_infected_trend = pandas.Series(trend_infected,I.index)
#series_infected_cycle = pandas.Series(cycle_infected,I.index)
cycle_recovered, trend_recovered = filter_solution(R.values)
series_recovered_trend = pandas.Series(trend_recovered,R.index)
#series_recovered_cycle = pandas.Series(cycle_recovered,R.index)
cycle_suspected, trend_suspected = filter_solution(S.values)
series_suspected_trend = pandas.Series(trend_suspected,S.index)
#series_suspected_cycle = pandas.Series(cycle_suspected,S.index)
cycle_deceased, trend_deceased = filter_solution(D.values)
series_deceased_trend = pandas.Series(trend_deceased,D.index)
#series_deceased_cycle = pandas.Series(cycle_deceased,D.index)
cycle_delta, trend_delta = filter_solution(delta.values)
series_delta_trend = pandas.Series(trend_delta,delta.index)
#series_delta_cycle = pandas.Series(cycle_delta,delta.index)
#cycle_delta2, trend_delta2 = filter_solution(delta2.values)
#series_delta2_trend = pandas.Series(trend_delta2,delta2.index)
#series_delta2_cycle = pandas.Series(cycle_delta2,delta2.index)
cycle_beta, trend_beta = filter_solution(beta.values)
series_beta_trend = pandas.Series(trend_beta,beta.index)
#series_beta_cycle = pandas.Series(cycle_beta,beta.index)
cycle_gamma, trend_gamma = filter_solution(gamma.values)
series_gamma_trend = pandas.Series(trend_gamma,gamma.index)
#series_gamma_cycle = pandas.Series(cycle_gamma,gamma.index)
db[r"$\beta$"] = [100*beta,100*series_beta_trend]
db[r"$\gamma$"] = [100*gamma,100*series_gamma_trend]
db[r"$\mu$"] = [100*delta,100*series_delta_trend]
#db[r"$\mu_2$"] = [100*delta2,100*series_delta2_trend]
db["Basic Reproduction Number"] = [Ro,series_Ro_trend]
db["Infected"] = [100*I,100*series_infected_trend]
db["Total Infected"] = [100*Itotal,100*series_infected_total_trend]
db["Recovered"] = [100*R,100*series_recovered_trend]
db["Susceptibles"] = [100*S,100*series_suspected_trend]
db["Deaths"] = [100*D,100*series_deceased_trend]
m = {"Beta":series_beta_trend,"Gamma":series_gamma_trend,"Delta":series_delta_trend,"OBS_i":I,"OBS_itotal":Itotal,"OBS_d":D,"OBS_r":R}
df = pandas.DataFrame(m)
# Convert daily frequency to weekly
df = df.resample("W").mean()
df.to_csv(os.path.join(working_dir,"supplements/data/COVID19/epidemic.csv"))
#print(df)
return db
[docs]
def plot_data(db,country,fname=None,save=True,show=True,legend=None,sizes=[1,1]):
"""Plots data."""
m = 0
r,c = sizes
fig, axes = plt.subplots(r,c,figsize=(8,8))
fig.tight_layout(rect=[0, 0.03, 1, 0.95])
for k in db:
m += 1
if m <= r*c:
ax = plt.subplot(r,c,m)
series = db[k]
for i,ser in enumerate(series):
lw = 2 if i < 2 else 1
ser.plot(ax=ax,lw=lw)
plt.box(True)
plt.grid(True)
plt.title(k,fontsize=20)
plt.rc('xtick', labelsize=15)
plt.rc('ytick', labelsize=15)
plt.ylabel("Percent",fontsize=15)
if not legend is None:
plt.legend(legend,fontsize=15)
plt.tight_layout()
plt.subplots_adjust(top=0.85)
#plt.suptitle(country,fontsize=30)
if save:
path_to_dir = os.path.join(working_dir,"graphs")
plt.savefig(os.path.join(path_to_dir,fname))
if show:
plt.show(block=False)
[docs]
def filter_solution(data,lmbd=1600):
"""Filter data."""
from statsmodels.tsa.filters.hp_filter import hpfilter
y = hpfilter(data,lmbd)
return y
[docs]
def calibrate(x0):
"""Calibrate pi1, pi2 and pi3 using SIR model."""
#Unconstrained optimization
sol = root(func,x0,method='lm',tol=1e-7,options={"maxiter":1000})
x0 = sol.x
# Constrained optimization.
# We assume that weekly infaction rate can not be smaller than the sum of
# weekly recovery and death rates.
#bounds = Bounds([0,0,0],[1,1,1]) if len(x0)==3 else Bounds([0,0,0,-1],[1,1,1,1])
#sol = minimize(func_squared,x0,method='trust-constr',bounds=bounds)
err = abs(sol.fun)
status = sol.status
if not sol.success:
cprint(f"\nCould not calibrate SIR model: error={err}, status={status}\n","red")
else:
cprint(f"\nCalibrated SIR model: error={np.max(err):.2e}, solution={sol.x}","green")
return sol.x
if __name__ == '__main__':
"""The main entry point."""
country = "US"
db = getData("2020-8-1","2023-1-1",country)
plot_data(db,country,fname=country+"_data1.png",legend=["Data","HP Filter"],sizes=[2,1])
beta = db[r"$\beta$"][0]
gamma = db[r"$\gamma$"][0]
delta = db[r"$\mu$"][0]
#delta2 = db[r"$\mu_2$"][0]
I = db["Infected"][0]
R = db["Recovered"][0]
S = db["Susceptibles"][0]
DD = db["Deaths"][0]
i0 = 0.01*I.values[0]
s0 = 0.01*S.values[0]
r0 = 0.01*R.values[0]
d0 = 0.01*DD.values[0]
T = len(I)
# Calibrate model
x = x0 = [pi,pr,pd]
#x = calibrate(x0)
tau,s,i,r,dd,pop = Virus(x)
index = pandas.date_range(start=beta.index[0],periods=N,freq="D")
tau = 100*pandas.Series(tau,index,dtype=float)
s = 100*pandas.Series(s,index,dtype=float)
i = 100*pandas.Series(i,index,dtype=float)
r = 100*pandas.Series(r,index,dtype=float)
dd = 100*pandas.Series(dd,index,dtype=float)
pop = 100*pandas.Series(pop,index,dtype=float)
if Standard:
m = {"Infected":[i,I],"Deaths":[dd,DD]}
legend=["SIR Model","Data"]
else:
v = 100*pandas.Series(v,index,dtype=float)
iw = 100*pandas.Series(iw,index,dtype=float)
iv = 100*pandas.Series(iv,index,dtype=float)
ivr = 100*pandas.Series(ivr,index,dtype=float)
m = {"Infected":[i,I,iw,iv],"Deaths":[dd,DD]}
legend=["Forecast","Data"]
plot_data(m,country,fname=country+".png",legend=legend,sizes=[2,1])
print(f"Strain #1, Ro = {pi/(pr+pd):.2f}")
print(f"Strain #2, Ro = {pi2/(pr+pd2):.2f}")