#!/usr/bin/env python
import pandas as pd
import numpy as np
from policy_modules.age_rating import Exchange as Exchange
import pdb
def min_value(x, y):
    """Combine two array-likes element by element.

    NOTE(review): despite the name, the selection below keeps ``x`` wherever
    ``x > y`` and ``y`` everywhere else, i.e. the result holds the
    element-wise *larger* entry.  That behaviour is left as found; confirm
    with callers whether a minimum was intended.

    Parameters
    ----------
    x, y : array-like
        Both are converted with ``np.matrix``.  When their row counts
        differ, ``y`` is transposed if that makes the row counts agree.

    Returns
    -------
    numpy.matrix
        Matrix shaped like the (possibly transposed) ``y``.

    Raises
    ------
    AssertionError
        If the row counts differ even after transposing ``y``.
    """
    x_mtx = np.matrix(x)
    y_mtx = np.matrix(y)
    if x_mtx.shape[0] != y_mtx.shape[0]:
        if x_mtx.shape[0] != y_mtx.T.shape[0]:
            raise AssertionError("x and y have incompatible row counts")
        y_mtx = y_mtx.T
    greater_idx = np.where(x_mtx > y_mtx)
    rtn_mtx = y_mtx
    # Index through a plain ndarray view so the picked values come back 1-D
    # and line up with the fancy-index assignment target.
    rtn_mtx[greater_idx] = np.asarray(x_mtx)[greater_idx]
    return rtn_mtx
def assert_type(type_):
    """Check that ``type_`` is one of ``'self'``, ``'sp'``, ``'ch'``, ``'fam'``.

    Returns ``None`` for an accepted label.

    Raises
    ------
    AssertionError
        If ``type_`` is not one of the four accepted labels.
    """
    accepted = ('self', 'sp', 'ch', 'fam')
    if type_ in accepted:
        return
    raise AssertionError
def firm_ids(choices, loc):
    """Return the firm id found at each ``loc`` position as a column matrix.

    Parameters
    ----------
    choices : pandas.DataFrame
        Must contain the six ``p<k>_firm_id`` columns for k in
        0, 1, 4, 5, 6, 7.
    loc : tuple of index arrays
        ``(rows, cols)`` pair used to index the six-column firm-id matrix;
        ``cols`` therefore refers to positions within the column list
        below, not to the ``p<k>`` numbers.

    Returns
    -------
    numpy.matrix
        One row per selected position (the picked row vector transposed).

    Raises
    ------
    AssertionError
        If any selected firm id equals 0.
    """
    firm_cols = ['p0_firm_id', 'p1_firm_id', 'p4_firm_id',
                 'p5_firm_id', 'p6_firm_id', 'p7_firm_id']
    firm_lst = np.matrix(choices[firm_cols])[loc].T
    # Direct comparison replaces np.in1d, which newer NumPy deprecates.
    if np.any(np.asarray(firm_lst) == 0):
        raise AssertionError("selected firm id is 0")
    return firm_lst
def plan_ids(choices, loc):
    """Return the plan id found at each ``loc`` position as a column matrix.

    ``choices`` must provide the six ``p<k>_plan_id`` columns for k in
    0, 1, 4, 5, 6, 7; ``loc`` is a ``(rows, cols)`` pair of index arrays
    into that six-column matrix.  No validation is performed on the
    selected ids, so zeros are returned as-is.
    """
    members = (0, 1, 4, 5, 6, 7)
    plan_cols = ['p%d_plan_id' % k for k in members]
    plan_mtx = np.matrix(choices[plan_cols])
    picked = plan_mtx[loc]
    return picked.T
def historical_prems(choices, loc, esi0, type_):
    """Look up ``premium_<type_>_actual`` in ``esi0`` for each selected firm.

    Parameters
    ----------
    choices, loc
        Forwarded to ``firm_ids`` to obtain one firm id per position.
    esi0 : pandas.DataFrame
        Must contain ``firm_id`` and ``premium_<type_>_actual`` columns.
    type_ : str
        Label validated by ``assert_type``.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(len(loc[0]), 1)``; the actual premium,
        not the incremental one.  Firm ids without a match in ``esi0``
        yield NaN because the merge is a left join.

    Raises
    ------
    AssertionError
        If ``type_`` is rejected, or the merge changes the row count
        (e.g. duplicate ``firm_id`` rows in ``esi0``).
    """
    assert_type(type_)
    n_rows = len(loc[0])
    temp_df = pd.DataFrame(
        {'firm_id': np.asarray(firm_ids(choices, loc)).ravel()})
    temp_df = pd.merge(temp_df, esi0, on='firm_id', how='left')
    if temp_df.shape[0] != n_rows:
        raise AssertionError("merge on firm_id changed the row count")
    # Series.reshape is gone from pandas; reshape the underlying ndarray.
    return temp_df['premium_' + type_ + '_actual'].values.reshape(n_rows, 1)
def curr_av(choices, loc, esi):
    """Look up ``av_value`` in ``esi`` for the plan selected at each position.

    Parameters
    ----------
    choices, loc
        Forwarded to ``plan_ids`` to obtain one plan id per position.
    esi : pandas.DataFrame
        Must contain ``plan_id`` and ``av_value`` columns.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(len(loc[0]), 1)``.  Plan ids without a
        match in ``esi`` yield NaN because the merge is a left join.

    Raises
    ------
    AssertionError
        If the merge changes the row count (e.g. duplicate ``plan_id``
        rows in ``esi``).
    """
    n_rows = len(loc[0])
    temp_df = pd.DataFrame(
        {'plan_id': np.asarray(plan_ids(choices, loc)).ravel()})
    temp_df = pd.merge(temp_df, esi, on='plan_id', how='left')
    if temp_df.shape[0] != n_rows:
        raise AssertionError("merge on plan_id changed the row count")
    # Series.reshape is gone from pandas; reshape the underlying ndarray.
    return temp_df['av_value'].values.reshape(n_rows, 1)
def curr_prems(choices, loc, esi, type_):
    """Look up ``premium_<type_>_actual`` in ``esi`` for each selected plan.

    Parameters
    ----------
    choices, loc
        Forwarded to ``plan_ids`` to obtain one plan id per position.
    esi : pandas.DataFrame
        Must contain ``plan_id`` and ``premium_<type_>_actual`` columns.
    type_ : str
        Label validated by ``assert_type``.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(len(loc[0]), 1)``.  Plan ids without a
        match in ``esi`` yield NaN because the merge is a left join.

    Raises
    ------
    AssertionError
        If ``type_`` is rejected, or the merge changes the row count
        (e.g. duplicate ``plan_id`` rows in ``esi``).
    """
    assert_type(type_)
    n_rows = len(loc[0])
    temp_df = pd.DataFrame(
        {'plan_id': np.asarray(plan_ids(choices, loc)).ravel()})
    temp_df = pd.merge(temp_df, esi, on='plan_id', how='left')
    if temp_df.shape[0] != n_rows:
        raise AssertionError("merge on plan_id changed the row count")
    # Uses the actual premium rather than the incremental premium.
    # TODO: original author note -- "Xiao to double check".
    # Series.reshape is gone from pandas; reshape the underlying ndarray.
    return temp_df['premium_' + type_ + '_actual'].values.reshape(n_rows, 1)
def curr_contr(choices, loc, esi, type_):
    """Look up ``contr_p_<type_>`` in ``esi`` for each selected plan.

    Parameters
    ----------
    choices, loc
        Forwarded to ``plan_ids`` to obtain one plan id per position.
    esi : pandas.DataFrame
        Must contain ``plan_id`` and ``contr_p_<type_>`` columns.
    type_ : str
        Label validated by ``assert_type``.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(len(loc[0]), 1)``.  Plan ids without a
        match in ``esi`` yield NaN because the merge is a left join.

    Raises
    ------
    AssertionError
        If ``type_`` is rejected, or the merge changes the row count
        (e.g. duplicate ``plan_id`` rows in ``esi``).
    """
    assert_type(type_)
    n_rows = len(loc[0])
    temp_df = pd.DataFrame(
        {'plan_id': np.asarray(plan_ids(choices, loc)).ravel()})
    temp_df = pd.merge(temp_df, esi, on='plan_id', how='left')
    if temp_df.shape[0] != n_rows:
        raise AssertionError("merge on plan_id changed the row count")
    # Series.reshape is gone from pandas; reshape the underlying ndarray.
    return temp_df['contr_p_' + type_].values.reshape(n_rows, 1)
def family_taxes(choices, loc, hieu, family, FedTax):
    """Return ``1 - marginal tax rate`` for the family behind each row.

    The rate comes from ``FedTax.calculate_marginal_rate(family)`` and is
    attached to ``family['family_id']``.  Each selected ``choices.hieu_id``
    is mapped to a ``family_id`` through ``hieu`` and then to its rate.

    Parameters
    ----------
    choices : pandas.DataFrame
        Must expose a ``hieu_id`` column.
    loc : tuple of index arrays
        Only ``loc[0]`` is used, to select entries of ``choices.hieu_id``.
    hieu : pandas.DataFrame
        Must contain ``hieu_id`` and ``family_id`` columns.
    family : pandas.DataFrame
        Must contain ``family_id``; passed to ``FedTax``.
    FedTax : object
        Must provide ``calculate_marginal_rate(family)``.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(len(loc[0]), 1)`` holding one minus the
        tax rate (note: not the rate itself).

    Raises
    ------
    AssertionError
        If the merges change the row count (e.g. a ``hieu_id`` mapped to
        several ``family_id`` values).
    """
    n_rows = len(loc[0])
    fam_df = pd.DataFrame(FedTax.calculate_marginal_rate(family))
    fam_df.columns = ['tax_rate']
    fam_df['family_id'] = family['family_id']
    temp_df = pd.DataFrame(choices.hieu_id[loc[0]])
    temp_df.columns = ['hieu_id']
    hieu_to_family = hieu[['hieu_id', 'family_id']].drop_duplicates()
    temp_df = pd.merge(temp_df, hieu_to_family, on='hieu_id', how='left')
    temp_df = pd.merge(temp_df, fam_df[['family_id', 'tax_rate']],
                       on='family_id', how='left')
    if temp_df.shape[0] != n_rows:
        raise AssertionError("merges changed the row count")
    # Series.reshape is gone from pandas; reshape the underlying ndarray.
    return 1 - temp_df['tax_rate'].values.reshape(n_rows, 1)
def firm_taxes(choices, loc, firm):
    """Look up ``marg_tax_rate`` in ``firm`` for each selected firm id.

    Parameters
    ----------
    choices, loc
        Forwarded to ``firm_ids`` to obtain one firm id per position.
    firm : pandas.DataFrame
        Must contain ``firm_id`` and ``marg_tax_rate`` columns.

    Returns
    -------
    numpy.ndarray
        Column vector of shape ``(len(loc[0]), 1)``.  Firm ids without a
        match in ``firm`` yield NaN because the merge is a left join.

    Raises
    ------
    AssertionError
        If the merge changes the row count (e.g. duplicate ``firm_id``
        rows in ``firm``).
    """
    n_rows = len(loc[0])
    temp_df = pd.DataFrame(
        {'firm_id': np.asarray(firm_ids(choices, loc)).ravel()})
    temp_df = pd.merge(temp_df, firm, on='firm_id', how='left')
    if temp_df.shape[0] != n_rows:
        raise AssertionError("merge on firm_id changed the row count")
    # The column read here was changed from pACA_tax_rate to marg_tax_rate.
    # Series.reshape is gone from pandas; reshape the underlying ndarray.
    return temp_df['marg_tax_rate'].values.reshape(n_rows, 1)
def rtn_ex_prems(choices, loc, type_, col='silver_premium_'):
    """Return exchange premiums for the selected rows as a column vector.

    Columns are named ``col + str(k)`` for person slots k.  Depending on
    ``type_``:

    * ``'self'`` -- slots 0, 1, 4, 5, 6, 7; one entry is picked per row
      using the full ``loc`` ``(rows, cols)`` pair.
    * ``'sp'``   -- slots 0, 1; one entry is picked per row using ``loc``.
    * ``'ch'``   -- slots 2, 3; row-wise sum, rows selected by ``loc[0]``.
    * ``'fam'``  -- slots 0..7; row-wise sum, rows selected by ``loc[0]``.

    NOTE(review): the original ``'sp'`` branch computed a flipped column
    index (0 <-> 1) into an unused local and then indexed with ``loc``
    unchanged, so for ``loc[1]`` in {0, 1} ``'sp'`` returns the same
    premium as ``'self'``.  That looks unintended (the other adult's
    column was presumably meant); the behaviour is preserved here -- confirm
    before changing model output.

    Parameters
    ----------
    choices : pandas.DataFrame
        Must contain the ``col``-prefixed columns used by the branch.
    loc : tuple of index arrays
        ``(rows, cols)`` positions.
    type_ : str
        One of ``'self'``, ``'sp'``, ``'ch'``, ``'fam'``.
    col : str
        Column-name prefix.

    Returns
    -------
    numpy.matrix
        Shape ``(len(loc[0]), 1)``.

    Raises
    ------
    AssertionError
        For an unknown ``type_``, a row-count mismatch, or when any
        selected premium is NaN.
    """
    picked_slots = {'self': [0, 1, 4, 5, 6, 7], 'sp': [0, 1]}
    summed_slots = {'ch': [2, 3], 'fam': [0, 1, 2, 3, 4, 5, 6, 7]}
    n_rows = len(loc[0])

    if type_ in picked_slots:
        cols = [col + str(i) for i in picked_slots[type_]]
        ex_prems = np.matrix(choices[cols])[loc]
    elif type_ in summed_slots:
        cols = [col + str(i) for i in summed_slots[type_]]
        ex_prems = np.matrix(choices[cols]).sum(axis=1)[loc[0]]
    else:
        raise AssertionError

    ex_prems = ex_prems.reshape(n_rows, 1)
    if ex_prems.shape[0] != n_rows:
        raise AssertionError("dimensions don't match")
    # Any NaN among the selected premiums is treated as an error.
    if np.isnan(ex_prems).sum():
        raise AssertionError("all missings")
    return ex_prems
def determine_n_kids(choices, loc):
    """Count, per selected row, the p2..p7 slots sharing the adult's plan.

    The adult plan id is read from ``p0_plan_id`` / ``p1_plan_id`` at the
    ``loc`` ``(rows, cols)`` positions.  For each row in ``loc[0]`` the
    ``p2_plan_id`` .. ``p7_plan_id`` entries equal to that adult plan id
    are counted; rows whose adult plan id is 0 always count as zero.

    Returns a matrix of shape ``(len(loc[0]), 1)``.

    Raises
    ------
    AssertionError
        If the comparison matrix does not have one row per ``loc[0]`` entry.
    """
    n_rows = len(loc[0])
    other_cols = ['p%d_plan_id' % slot for slot in range(2, 8)]

    adult_ids = np.matrix(choices[['p0_plan_id', 'p1_plan_id']])[loc]
    has_plan = adult_ids != 0

    others = np.matrix(choices[other_cols])[loc[0]]
    same_plan = others == adult_ids.T
    # Mask out matches for rows where the adult has no plan (id 0).
    kid_mtx = np.multiply(same_plan, has_plan.T)

    if kid_mtx.shape[0] != n_rows:
        raise AssertionError
    counts = kid_mtx.sum(axis=1)
    return counts.reshape(n_rows, 1)
def populate_fun_dict():
    """Build the table of premium calculators keyed by case number 1..14.

    Every entry has the signature ``f(choices, loc, esi=None, esi0=None,
    hieu=None, family=None, firm=None, FedTax=None)`` and returns a
    two-item list ``[esi_premium, second_part]``:

    * cases 1-5   -- ``esi_premium = premium * contr`` for the tier;
      ``second_part`` is an all-zero column.
    * cases 6-12  -- ``second_part = max(0, min(u, v) - esi_premium)``
      where ``u`` is built from exchange premiums and ``v`` from the
      plan premium, contribution and tax factors.
    * cases 13-14 -- (cobra) ``esi_premium = premium * 1.02``;
      ``second_part`` is an all-zero column.

    Case labels (prior coverage -> current coverage):
    1 family -> single, 2 family -> single+spouse, 3 family ->
    single+children, 4 family -> single+spouse+children, 5 single ->
    single, 6 single -> single+spouse, 7 single -> single+children,
    8 single -> family, 9 none -> single, 10 none -> single+spouse,
    11 none -> single+children, 12 none -> family, 13 cobra single,
    14 cobra family.

    ``esi0`` is accepted by every calculator but not used by any of them
    (premiums are taken from ``esi``, i.e. current rather than historical).
    """
    func_dict = {}

    def _premium_only(esi_premium):
        # Pair the premium with an all-zero column of matching row count.
        return [esi_premium, np.zeros((esi_premium.shape[0], 1))]

    def _premium_with_wage_part(esi_premium, u, v):
        # Wage part is the cheaper of u and v net of the premium, floored at 0.
        wage_part = np.minimum(u, v) - esi_premium
        return [esi_premium, np.maximum(0, wage_part)]

    def _contributed_premium(choices, loc, esi, type_):
        # Current premium times the contribution column for the tier.  The
        # family-tax factor is deliberately not applied here.
        return (curr_prems(choices, loc, esi, type_)
                * curr_contr(choices, loc, esi, type_))

    def _from_single(choices, loc, esi, hieu, family, firm, FedTax, type_):
        # Shared arithmetic for cases 6-8: prior single coverage, now `type_`.
        p0 = curr_prems(choices, loc, esi, 'self')
        c = curr_contr(choices, loc, esi, 'self')
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        p_tier = curr_prems(choices, loc, esi, type_)
        c_tier = curr_contr(choices, loc, esi, type_)
        alpha = Tfam / Tfirm
        u = rtn_ex_prems(choices, loc, type_) + p0 * c * Tfam
        v = (p_tier - p0 * (1 - c)) * alpha
        return _premium_with_wage_part(p_tier * c_tier, u, v)

    def _from_no_plan(choices, loc, esi, hieu, family, firm, FedTax,
                      type_, xc_types):
        # Shared arithmetic for cases 10-12: no prior plan, now `type_`.
        # `u` adds up the exchange premiums of every tier in `xc_types`.
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        p_tier = curr_prems(choices, loc, esi, type_)
        c_tier = curr_contr(choices, loc, esi, type_)
        alpha = Tfam / Tfirm
        u = rtn_ex_prems(choices, loc, xc_types[0])
        for xc_type in xc_types[1:]:
            u = u + rtn_ex_prems(choices, loc, xc_type)
        v = p_tier * c_tier * Tfam + p_tier * (1 - c_tier) * alpha
        return _premium_with_wage_part(p_tier * c_tier, u, v)

    def _cobra(choices, loc, esi, type_):
        # Cobra cases: the tier's current premium scaled by 1.02.
        return _premium_only(curr_prems(choices, loc, esi, type_) * 1.02)

    # on family plan (pre), on single
    def f_1(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _premium_only(_contributed_premium(choices, loc, esi, 'self'))
    func_dict[1] = f_1

    # on family plan (pre), on single + spouse
    def f_2(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _premium_only(_contributed_premium(choices, loc, esi, 'sp'))
    func_dict[2] = f_2

    # on family plan (pre), on single + children
    def f_3(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _premium_only(_contributed_premium(choices, loc, esi, 'ch'))
    func_dict[3] = f_3

    # on family plan (pre), on single + spouse + children
    def f_4(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _premium_only(_contributed_premium(choices, loc, esi, 'fam'))
    func_dict[4] = f_4

    # on single (pre), on single
    def f_5(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _premium_only(_contributed_premium(choices, loc, esi, 'self'))
    func_dict[5] = f_5

    # on single (pre), on single + spouse
    def f_6(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _from_single(choices, loc, esi, hieu, family, firm, FedTax,
                            'sp')
    func_dict[6] = f_6

    # on single (pre), on single + children
    def f_7(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _from_single(choices, loc, esi, hieu, family, firm, FedTax,
                            'ch')
    func_dict[7] = f_7

    # on single (pre), on family plan
    def f_8(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        return _from_single(choices, loc, esi, hieu, family, firm, FedTax,
                            'fam')
    func_dict[8] = f_8

    # no plan (pre), on single plan
    def f_9(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        c = curr_contr(choices, loc, esi, 'self')
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        pi = curr_prems(choices, loc, esi, 'self')
        psi = curr_prems(choices, loc, esi, 'sp')
        alpha = Tfam / Tfirm
        u = rtn_ex_prems(choices, loc, 'self')
        # NOTE(review): cases 10-12 use the tier's own premium in the second
        # term; here the 'sp' premium is used with the 'self' contribution.
        # Possibly a copy/paste slip (pi intended) -- left as found.
        v = pi * c * Tfam + psi * (1 - c) * alpha
        return _premium_with_wage_part(pi * c, u, v)
    func_dict[9] = f_9

    # no plan (pre), on single + spouse plan
    def f_10(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        return _from_no_plan(choices, loc, esi, hieu, family, firm, FedTax,
                             'sp', ('self', 'sp'))
    func_dict[10] = f_10

    # no plan (pre), on single + children plan
    def f_11(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        # alpha is Tfam / Tfirm here, like every other case (the original
        # divided Tfam by itself, which silently forced alpha to 1).
        return _from_no_plan(choices, loc, esi, hieu, family, firm, FedTax,
                             'ch', ('self', 'ch'))
    func_dict[11] = f_11

    # no plan (pre), on family plan
    def f_12(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        return _from_no_plan(choices, loc, esi, hieu, family, firm, FedTax,
                             'fam', ('fam',))
    func_dict[12] = f_12

    # on single plan (cobra), on single
    def f_13(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        return _cobra(choices, loc, esi, 'self')
    func_dict[13] = f_13

    # on family plan (cobra), on family
    def f_14(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        return _cobra(choices, loc, esi, 'fam')
    func_dict[14] = f_14

    return func_dict