Source code for utility_modules.function_dict

#!/usr/bin/env python
import pandas as pd
import numpy as np
from policy_modules.age_rating import Exchange as Exchange
import pdb

def min_value(x, y):
    """Return the element-wise minimum of ``x`` and ``y`` as a ``np.matrix``.

    Both arguments are converted with ``np.matrix``.  When their row counts
    differ, ``y`` is transposed if that makes the row counts agree.

    Args:
        x: anything ``np.matrix`` accepts.
        y: anything ``np.matrix`` accepts.

    Returns:
        A ``np.matrix`` holding the smaller of the two values at each
        position.  The inputs are not modified.

    Raises:
        AssertionError: if the row counts differ and transposing ``y`` does
            not reconcile them.
    """
    x_mtx = np.matrix(x)
    y_mtx = np.matrix(y)
    if x_mtx.shape[0] != y_mtx.shape[0]:
        if x_mtx.shape[0] != y_mtx.T.shape[0]:
            raise AssertionError
        y_mtx = y_mtx.T
    # np.minimum keeps the smaller entry at every position, which is what
    # the function name promises.
    return np.minimum(x_mtx, y_mtx)
def assert_type(type_):
    """Validate a coverage-type code.

    Returns ``None`` when ``type_`` equals one of ``'self'``, ``'sp'``,
    ``'ch'`` or ``'fam'``.

    Raises:
        AssertionError: for any other value.
    """
    allowed = ('self', 'sp', 'ch', 'fam')
    if type_ in allowed:
        return
    raise AssertionError
def firm_ids(choices, loc):
    """Look up the firm id at each position in ``loc``.

    Args:
        choices: table exposing the columns ``p0_firm_id``, ``p1_firm_id``,
            ``p4_firm_id``, ``p5_firm_id``, ``p6_firm_id`` and
            ``p7_firm_id``.
        loc: index applied to the matrix built from those six columns;
            presumably a ``(row_indices, column_indices)`` pair such as
            ``np.where`` returns -- confirm against the caller.

    Returns:
        The selected firm ids, transposed (a column ``np.matrix`` when
        ``loc`` is a pair of index arrays).

    Raises:
        AssertionError: if any selected firm id equals 0.
    """
    firm_cols = ['p0_firm_id', 'p1_firm_id', 'p4_firm_id',
                 'p5_firm_id', 'p6_firm_id', 'p7_firm_id']
    firm_lst = np.matrix(choices[firm_cols])[loc].T
    # Plain element-wise comparison; avoids np.in1d, which NumPy deprecated.
    if (firm_lst == 0).any():
        raise AssertionError
    return firm_lst
def plan_ids(choices, loc):
    """Look up the plan id at each position in ``loc``.

    The columns read are ``p0_plan_id``, ``p1_plan_id`` and
    ``p4_plan_id`` .. ``p7_plan_id``.  Plan ids equal to 0 are returned
    as-is; no zero check is performed here.

    Returns:
        The selected plan ids, transposed.
    """
    members = (0, 1, 4, 5, 6, 7)
    plan_cols = ['p%d_plan_id' % m for m in members]
    plan_mtx = np.matrix(choices[plan_cols])
    selected = plan_mtx[loc]
    return selected.T
def historical_prems(choices, loc, esi0, type_):
    """Return the ``premium_<type_>_actual`` value of each selected firm.

    The firm ids picked by ``firm_ids(choices, loc)`` are left-merged with
    ``esi0`` on ``firm_id``.  This is the actual premium, not the
    incremental one.

    Args:
        choices: table passed through to ``firm_ids``.
        loc: position index; ``len(loc[0])`` is the expected row count.
        esi0: table with ``firm_id`` and ``premium_<type_>_actual`` columns.
        type_: one of ``'self'``, ``'sp'``, ``'ch'``, ``'fam'``.

    Returns:
        ndarray of shape ``(len(loc[0]), 1)``.

    Raises:
        AssertionError: if ``type_`` is invalid, a firm id is 0, or the
            merge changes the number of rows.
    """
    assert_type(type_)
    temp_df = pd.DataFrame(firm_ids(choices, loc))
    temp_df.columns = ['firm_id']
    temp_df = pd.merge(temp_df, esi0, on='firm_id', how='left')
    n_rows = len(loc[0])
    if temp_df.shape[0] != n_rows:
        raise AssertionError
    # Reshape the underlying ndarray: Series.reshape is not available in
    # newer pandas releases, while .values works on old and new alike.
    return temp_df['premium_' + type_ + '_actual'].values.reshape(n_rows, 1)
def curr_av(choices, loc, esi):
    """Return the ``av_value`` of each selected plan.

    The plan ids picked by ``plan_ids(choices, loc)`` are left-merged with
    ``esi`` on ``plan_id``.

    Args:
        choices: table passed through to ``plan_ids``.
        loc: position index; ``len(loc[0])`` is the expected row count.
        esi: table with ``plan_id`` and ``av_value`` columns.

    Returns:
        ndarray of shape ``(len(loc[0]), 1)``.

    Raises:
        AssertionError: if the merge changes the number of rows.
    """
    temp_df = pd.DataFrame(plan_ids(choices, loc))
    temp_df.columns = ['plan_id']
    temp_df = pd.merge(temp_df, esi, on='plan_id', how='left')
    n_rows = len(loc[0])
    if temp_df.shape[0] != n_rows:
        raise AssertionError
    # Reshape the underlying ndarray: Series.reshape is not available in
    # newer pandas releases, while .values works on old and new alike.
    return temp_df['av_value'].values.reshape(n_rows, 1)
def curr_prems(choices, loc, esi, type_):
    """Return the ``premium_<type_>_actual`` value of each selected plan.

    The plan ids picked by ``plan_ids(choices, loc)`` are left-merged with
    ``esi`` on ``plan_id``.

    Args:
        choices: table passed through to ``plan_ids``.
        loc: position index; ``len(loc[0])`` is the expected row count.
        esi: table with ``plan_id`` and ``premium_<type_>_actual`` columns.
        type_: one of ``'self'``, ``'sp'``, ``'ch'``, ``'fam'``.

    Returns:
        ndarray of shape ``(len(loc[0]), 1)``.

    Raises:
        AssertionError: if ``type_`` is invalid or the merge changes the
            number of rows.
    """
    assert_type(type_)
    temp_df = pd.DataFrame(plan_ids(choices, loc))
    temp_df.columns = ['plan_id']
    temp_df = pd.merge(temp_df, esi, on='plan_id', how='left')
    n_rows = len(loc[0])
    if temp_df.shape[0] != n_rows:
        raise AssertionError
    # Uses the actual premium rather than the incremental premium
    # (flagged in the original source for Xiao to double check).
    # Reshape the underlying ndarray: Series.reshape is not available in
    # newer pandas releases, while .values works on old and new alike.
    return temp_df['premium_' + type_ + '_actual'].values.reshape(n_rows, 1)
def curr_contr(choices, loc, esi, type_):
    """Return the ``contr_p_<type_>`` value of each selected plan.

    The plan ids picked by ``plan_ids(choices, loc)`` are left-merged with
    ``esi`` on ``plan_id``.

    Args:
        choices: table passed through to ``plan_ids``.
        loc: position index; ``len(loc[0])`` is the expected row count.
        esi: table with ``plan_id`` and ``contr_p_<type_>`` columns.
        type_: one of ``'self'``, ``'sp'``, ``'ch'``, ``'fam'``.

    Returns:
        ndarray of shape ``(len(loc[0]), 1)``.

    Raises:
        AssertionError: if ``type_`` is invalid or the merge changes the
            number of rows.
    """
    assert_type(type_)
    temp_df = pd.DataFrame(plan_ids(choices, loc))
    temp_df.columns = ['plan_id']
    temp_df = pd.merge(temp_df, esi, on='plan_id', how='left')
    n_rows = len(loc[0])
    if temp_df.shape[0] != n_rows:
        raise AssertionError
    # Reshape the underlying ndarray: Series.reshape is not available in
    # newer pandas releases, while .values works on old and new alike.
    return temp_df['contr_p_' + type_].values.reshape(n_rows, 1)
def family_taxes(choices, loc, hieu, family, FedTax):
    """Return ``1 - tax_rate`` for the family of each selected row.

    ``FedTax.calculate_marginal_rate(family)`` supplies one rate per row of
    ``family``.  The ``hieu_id`` values at ``choices.hieu_id[loc[0]]`` are
    mapped to a ``family_id`` through ``hieu`` and then to that rate.

    Args:
        choices: table with a ``hieu_id`` column.
        loc: position index; only ``loc[0]`` is used.
        hieu: table with ``hieu_id`` and ``family_id`` columns (duplicate
            pairs are dropped before merging).
        family: table with a ``family_id`` column, passed to ``FedTax``.
        FedTax: object exposing ``calculate_marginal_rate(family)``.

    Returns:
        ndarray of shape ``(len(loc[0]), 1)`` holding ``1 - tax_rate``.

    Raises:
        AssertionError: if the merges change the number of rows.
    """
    fam_df = pd.DataFrame(FedTax.calculate_marginal_rate(family))
    fam_df.columns = ['tax_rate']
    fam_df['family_id'] = family['family_id']

    temp_df = pd.DataFrame(choices.hieu_id[loc[0]])
    temp_df.columns = ['hieu_id']
    hieu_to_family = hieu[['hieu_id', 'family_id']].drop_duplicates()
    temp_df = pd.merge(temp_df, hieu_to_family, on='hieu_id', how='left')
    temp_df = pd.merge(temp_df, fam_df[['family_id', 'tax_rate']],
                       on='family_id', how='left')

    n_rows = len(loc[0])
    if temp_df.shape[0] != n_rows:
        raise AssertionError
    # Reshape the underlying ndarray: Series.reshape is not available in
    # newer pandas releases, while .values works on old and new alike.
    return 1 - temp_df['tax_rate'].values.reshape(n_rows, 1)
def firm_taxes(choices, loc, firm):
    """Return the ``marg_tax_rate`` of each selected firm.

    The firm ids picked by ``firm_ids(choices, loc)`` are left-merged with
    ``firm`` on ``firm_id``.

    Args:
        choices: table passed through to ``firm_ids``.
        loc: position index; ``len(loc[0])`` is the expected row count.
        firm: table with ``firm_id`` and ``marg_tax_rate`` columns.

    Returns:
        ndarray of shape ``(len(loc[0]), 1)``.

    Raises:
        AssertionError: if a firm id is 0 or the merge changes the number
            of rows.
    """
    temp_df = pd.DataFrame(firm_ids(choices, loc))
    temp_df.columns = ['firm_id']
    temp_df = pd.merge(temp_df, firm, on='firm_id', how='left')
    n_rows = len(loc[0])
    if temp_df.shape[0] != n_rows:
        raise AssertionError
    # The rate column was switched from pACA_tax_rate to marg_tax_rate.
    # Reshape the underlying ndarray: Series.reshape is not available in
    # newer pandas releases, while .values works on old and new alike.
    return temp_df['marg_tax_rate'].values.reshape(n_rows, 1)
def rtn_ex_prems(choices, loc, type_, col='silver_premium_'):
    """Return exchange premiums for each selected row as a column matrix.

    Columns are named ``col + str(i)`` for member index ``i``.  What is
    returned depends on ``type_``:

    * ``'self'`` -- the value at ``loc`` within members 0, 1, 4, 5, 6, 7.
    * ``'sp'``   -- the value of the *other* adult: within members 0 and 1,
      column 1 when ``loc[1]`` is 0, otherwise column 0.
    * ``'ch'``   -- row sum over members 2 and 3, for rows ``loc[0]``.
    * ``'fam'``  -- row sum over members 0..7, for rows ``loc[0]``.

    Args:
        choices: table holding the premium columns.
        loc: ``(row_indices, column_indices)`` pair of index arrays.
        type_: one of ``'self'``, ``'sp'``, ``'ch'``, ``'fam'``.
        col: prefix of the premium column names.

    Returns:
        ``np.matrix`` of shape ``(len(loc[0]), 1)``.

    Raises:
        AssertionError: if ``type_`` is not recognised, or if any selected
            premium is NaN.
    """
    rows = loc[0]
    n_rows = len(rows)

    if type_ == 'self':
        cols = [col + str(i) for i in (0, 1, 4, 5, 6, 7)]
        ex_prems = np.matrix(choices[cols])[loc]
    elif type_ == 'sp':
        cols = [col + str(i) for i in (0, 1)]
        # Flip the adult column (0 -> 1, anything else -> 0) so the lookup
        # lands on the spouse rather than on the policy holder.
        spouse_col = (~np.asarray(loc[1]).astype('bool')).astype('int')
        ex_prems = np.matrix(choices[cols])[(rows, spouse_col)]
    elif type_ == 'ch':
        cols = [col + str(i) for i in (2, 3)]
        ex_prems = np.matrix(choices[cols]).sum(axis=1)[rows]
    elif type_ == 'fam':
        cols = [col + str(i) for i in range(8)]
        ex_prems = np.matrix(choices[cols]).sum(axis=1)[rows]
    else:
        raise AssertionError

    ex_prems = ex_prems.reshape(n_rows, 1)
    if ex_prems.shape[0] != n_rows:
        raise AssertionError("dimensions don't match")
    # Triggered by any NaN entry, despite the wording of the message.
    if np.isnan(ex_prems).sum():
        raise AssertionError("all missings")
    return ex_prems
def determine_n_kids(choices, loc):
    """Count, per selected row, members 2..7 who share the adult's plan.

    The adult's plan id is read at ``loc`` from ``p0_plan_id`` /
    ``p1_plan_id``.  A member in ``p2_plan_id`` .. ``p7_plan_id`` counts
    when its plan id equals the adult's and the adult's plan id is not 0.

    Returns:
        Matrix of shape ``(len(loc[0]), 1)`` holding the counts.

    Raises:
        AssertionError: if the comparison matrix does not have
            ``len(loc[0])`` rows.
    """
    n_rows = len(loc[0])
    adult_cols = ['p0_plan_id', 'p1_plan_id']
    kid_cols = ['p%d_plan_id' % member for member in range(2, 8)]

    holder_plan = np.matrix(choices[adult_cols])[loc]
    holder_has_plan = holder_plan != 0

    kid_plans = np.matrix(choices[kid_cols])[loc[0]]
    same_plan = kid_plans == holder_plan.T
    # Zero out rows where the adult has no plan (plan id 0).
    kid_mtx = np.multiply(same_plan, holder_has_plan.T)

    if kid_mtx.shape[0] != n_rows:
        raise AssertionError
    counts = kid_mtx.sum(axis=1)
    return counts.reshape(n_rows, 1)
def _esi_only(esi_premium):
    """Pair ``esi_premium`` with an all-zero column of the same row count."""
    return [esi_premium, np.zeros((esi_premium.shape[0], 1))]


def _esi_with_offset(esi_premium, u, v):
    """Pair ``esi_premium`` with ``max(0, min(u, v) - esi_premium)``."""
    wage_part = np.maximum(0, np.minimum(u, v) - esi_premium)
    return [esi_premium, wage_part]


def populate_fun_dict():
    """Build the dictionary of per-scenario premium functions, keyed 1..14.

    Every function shares the signature
    ``(choices, loc, esi=None, esi0=None, hieu=None, family=None,
    firm=None, FedTax=None)`` and returns a two-element list
    ``[esi_premium, second_part]``:

    * keys 1-5   -- ``premium * contribution`` for one coverage type, with
      an all-zero second column;
    * keys 6-12  -- ``premium * contribution`` plus a second column equal
      to ``max(0, min(u, v) - esi_premium)``, where ``u`` and ``v`` are
      built from exchange premiums, plan premiums and the family/firm tax
      factors;
    * keys 13-14 -- the current premium times 1.02, with an all-zero
      second column.

    Keys 1-5 and 13-14 only read ``choices``, ``loc`` and ``esi``.
    ``esi0`` is accepted by every function but never read.

    Returns:
        dict mapping int keys 1..14 to the functions described above.
    """
    func_dict = {}

    # on family plan (pre), on single
    def f_1(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        # Uses the current premium (not the historical one).
        p0 = curr_prems(choices, loc, esi, 'self')
        c = curr_contr(choices, loc, esi, 'self')
        return _esi_only(p0 * c)
    func_dict[1] = f_1

    # on family plan (pre), on single + spouse
    def f_2(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        ps0 = curr_prems(choices, loc, esi, 'sp')
        c = curr_contr(choices, loc, esi, 'sp')
        return _esi_only(ps0 * c)
    func_dict[2] = f_2

    # on family plan (pre), on single + children
    def f_3(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        pc0 = curr_prems(choices, loc, esi, 'ch')
        c = curr_contr(choices, loc, esi, 'ch')
        return _esi_only(pc0 * c)
    func_dict[3] = f_3

    # on family plan (pre), on single + spouse + children
    def f_4(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        pf0 = curr_prems(choices, loc, esi, 'fam')
        c = curr_contr(choices, loc, esi, 'fam')
        return _esi_only(pf0 * c)
    func_dict[4] = f_4

    # on single (pre), on single
    def f_5(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        p0 = curr_prems(choices, loc, esi, 'self')
        c = curr_contr(choices, loc, esi, 'self')
        return _esi_only(p0 * c)
    func_dict[5] = f_5

    # on single (pre), on single + spouse
    def f_6(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        p0 = curr_prems(choices, loc, esi, 'self')
        c = curr_contr(choices, loc, esi, 'self')
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        psi = curr_prems(choices, loc, esi, 'sp')
        cs = curr_contr(choices, loc, esi, 'sp')
        alpha = Tfam / Tfirm
        psxc = rtn_ex_prems(choices, loc, 'sp')
        u = psxc + p0 * c * Tfam
        v = (psi - p0 * (1 - c)) * alpha
        return _esi_with_offset(psi * cs, u, v)
    func_dict[6] = f_6

    # on single (pre), on single + children
    def f_7(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        p0 = curr_prems(choices, loc, esi, 'self')
        c = curr_contr(choices, loc, esi, 'self')
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        pci = curr_prems(choices, loc, esi, 'ch')
        cc = curr_contr(choices, loc, esi, 'ch')
        alpha = Tfam / Tfirm
        pcxc = rtn_ex_prems(choices, loc, 'ch')
        u = pcxc + p0 * c * Tfam
        v = (pci - p0 * (1 - c)) * alpha
        return _esi_with_offset(pci * cc, u, v)
    func_dict[7] = f_7

    # on single (pre), on family plan
    def f_8(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        p0 = curr_prems(choices, loc, esi, 'self')
        c = curr_contr(choices, loc, esi, 'self')
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        pfi = curr_prems(choices, loc, esi, 'fam')
        cf = curr_contr(choices, loc, esi, 'fam')
        alpha = Tfam / Tfirm
        pfxc = rtn_ex_prems(choices, loc, 'fam')
        u = pfxc + p0 * c * Tfam
        v = (pfi - p0 * (1 - c)) * alpha
        return _esi_with_offset(pfi * cf, u, v)
    func_dict[8] = f_8

    # no plan (pre), on single plan
    def f_9(choices, loc, esi=None, esi0=None, hieu=None, family=None,
            firm=None, FedTax=None):
        c = curr_contr(choices, loc, esi, 'self')
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        p_self = curr_prems(choices, loc, esi, 'self')
        psi = curr_prems(choices, loc, esi, 'sp')
        alpha = Tfam / Tfirm
        pxc = rtn_ex_prems(choices, loc, 'self')
        u = pxc
        # NOTE(review): the second term uses the 'sp' premium, whereas
        # f_10-f_12 reuse the plan's own premium in that position --
        # confirm whether this should be p_self.
        v = p_self * c * Tfam + psi * (1 - c) * alpha
        return _esi_with_offset(p_self * c, u, v)
    func_dict[9] = f_9

    # no plan (pre), on single + spouse plan
    def f_10(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        psi = curr_prems(choices, loc, esi, 'sp')
        cs = curr_contr(choices, loc, esi, 'sp')
        alpha = Tfam / Tfirm
        pxc = rtn_ex_prems(choices, loc, 'self')
        psxc = rtn_ex_prems(choices, loc, 'sp')
        u = pxc + psxc
        v = psi * cs * Tfam + psi * (1 - cs) * alpha
        return _esi_with_offset(psi * cs, u, v)
    func_dict[10] = f_10

    # no plan (pre), on single + children plan
    def f_11(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        pci = curr_prems(choices, loc, esi, 'ch')
        cc = curr_contr(choices, loc, esi, 'ch')
        # Family-to-firm tax ratio, the same definition every other
        # scenario function uses.
        alpha = Tfam / Tfirm
        pxc = rtn_ex_prems(choices, loc, 'self')
        pcxc = rtn_ex_prems(choices, loc, 'ch')
        u = pxc + pcxc
        v = pci * cc * Tfam + pci * (1 - cc) * alpha
        return _esi_with_offset(pci * cc, u, v)
    func_dict[11] = f_11

    # no plan (pre), on family plan
    def f_12(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        Tfam = family_taxes(choices, loc, hieu, family, FedTax)
        Tfirm = firm_taxes(choices, loc, firm)
        pfi = curr_prems(choices, loc, esi, 'fam')
        cf = curr_contr(choices, loc, esi, 'fam')
        alpha = Tfam / Tfirm
        pfxc = rtn_ex_prems(choices, loc, 'fam')
        u = pfxc
        v = pfi * cf * Tfam + pfi * (1 - cf) * alpha
        return _esi_with_offset(pfi * cf, u, v)
    func_dict[12] = f_12

    # on single plan (cobra), on single
    def f_13(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        p0 = curr_prems(choices, loc, esi, 'self')
        return _esi_only(p0 * 1.02)
    func_dict[13] = f_13

    # on family plan (cobra), on family
    def f_14(choices, loc, esi=None, esi0=None, hieu=None, family=None,
             firm=None, FedTax=None):
        p0 = curr_prems(choices, loc, esi, 'fam')
        return _esi_only(p0 * 1.02)
    func_dict[14] = f_14

    return func_dict