#!/usr/bin/env python
import numpy as np
import pandas as pd
from utility_modules.get_parameters import get_params
class UninsuredPenalty:
    # Borg singleton config object
__shared_state = {}
    def __init__(self, policy_dict=None):
        # implement the Borg pattern: every instance shares the same attribute
        # dictionary, so parameters loaded once are visible to all instances
        self.__dict__ = self.__shared_state
        if policy_dict is not None:
self.pnlty_unin_fam = get_params(policy_dict['pnlty_unin_f'])
self.pnlty_unin_ind = get_params(policy_dict['pnlty_unin_ind'])
self.tax_thres = get_params(policy_dict['tax_thres'])
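
    # Illustrative construction (a sketch, not part of the model): the dictionary keys
    # below are the ones read in __init__; the values are placeholders for whatever
    # get_params expects for each parameter sheet.
    #
    #   penalty_params = UninsuredPenalty({'pnlty_unin_f': <family penalty sheet>,
    #                                      'pnlty_unin_ind': <individual penalty sheet>,
    #                                      'tax_thres': <tax threshold sheet>})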
    def set_year(self, curr_year):
        """
        Store the current simulation year and slice the parameter tables down to that year.
        This allows the parameters to be reset at the beginning of each simulation year.
        """
self.year = curr_year
if int(curr_year) in self.pnlty_unin_fam.year.values:
self.pnlty_unin_fam_use = self.pnlty_unin_fam.loc[self.pnlty_unin_fam.year == curr_year]
self.pnlty_unin_ind_use = self.pnlty_unin_ind.loc[self.pnlty_unin_ind.year == curr_year]
else:
            raise ValueError("pnlty_unin_f sheet does not contain information on the year under simulation")
self.tax_yr_thres = self.tax_thres.loc[self.tax_thres.year == curr_year]
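
    # Sketch of the intended call pattern (hypothetical driver loop; names are
    # illustrative): set_year() should run once per simulated year, before any
    # penalty calculations for that year.
    #
    #   for yr in simulation_years:
    #       penalty_params.set_year(yr)
    #       ...compute cheapest_coverage()/penalty() for yr...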
    def cheapest_coverage(self, exp_choices, pop_df):
        """
        input: exp_choices, the choice table with ESI and exchange premiums already calculated;
               pop_df, which contains fam_income
        return: pop_df with HIEU-level information on whether a family has an affordable plan
        Based on a list of criteria, the function determines who is exempt from paying the penalty.
        self.pnlty_unin_fam_use.max_pp is the cost of the lowest-cost bronze plan.
        """
people = ['adult_1', 'adult_2', 'child_1', 'child_2', 'adult_child_1', 'adult_child_2', \
'adult_child_3','adult_child_4']
people_indices = [0, 1, 2, 3, 4, 5, 6, 7]
# step 1: subset to rows where nobody is uninsured and find the least expensive cost
for person in people:
exp_choices = exp_choices[exp_choices[person] != 1]
exp_choices.reset_index(inplace=True)
        # subtract a TIAC's own premiums from the family total
        for index, person in enumerate(people):
            if index > 3:
                loc_tiac = np.where(exp_choices['tax_dependent_' + str(index)] == 0)[0]
                exp_choices.loc[loc_tiac, 'total_premiums'] = exp_choices.loc[loc_tiac, 'total_premiums'] \
                    - exp_choices.loc[loc_tiac, 'xc_prem_' + str(index)] \
                    - exp_choices.loc[loc_tiac, 'esi_premium_' + str(index)]
        # create a long table for the TIAC calculation
        to_melt = ['p_id', 'xc_prem', 'esi_premium_', 'tax_dependent_']
        long_choices = self._wide_to_long(exp_choices, to_melt)
        long_choices['total_premiums_individual'] = long_choices['xc_prem'] + long_choices['esi_premium']
        loc_ind_adult_child = np.where((long_choices['person_type'] > 3) & (long_choices['tax_dependent'] == 0))[0]
        # step 2: find the minimum total premium needed to cover everyone, first at the
        # family level excluding TIAC, then individually for each TIAC
        family_df = exp_choices.groupby('hieu_id')['total_premiums'].min().reset_index()
        tiac_df = long_choices.loc[loc_ind_adult_child].groupby('person_id')['total_premiums_individual'].min().reset_index()
pop_df = pd.merge(pop_df, family_df, on='hieu_id', how='left')
pop_df = pd.merge(pop_df, tiac_df, on='person_id', how='left')
        loc_ind_adult_child = np.where((pop_df['person_type'] > 3) & (pop_df['tax_dependent'] == 0))[0]
        pop_df.loc[loc_ind_adult_child, 'total_premiums'] = pop_df.loc[loc_ind_adult_child, 'total_premiums_individual']
        # step 3: compare with family income
        pop_df['exemption'] = False
        pop_df['ind_exemption'] = False
        tax_threshold = ((pop_df['tax_filing_status'] == 'single') & (pop_df['fam_income'] < self.tax_yr_thres['single'].iloc[0])) \
            | ((pop_df['tax_filing_status'] == 'joint') & (pop_df['fam_income'] < self.tax_yr_thres['joint'].iloc[0])) \
            | ((pop_df['tax_filing_status'] == 'hhead') & (pop_df['fam_income'] < self.tax_yr_thres['hhldhead'].iloc[0]))
        # for tax-independent children
        ind_tax_threshold = ((pop_df['tax_filing_status'] == 'single') & (pop_df['ind_income'] < self.tax_yr_thres['single'].iloc[0])) \
            | ((pop_df['tax_filing_status'] == 'joint') & (pop_df['ind_income'] < self.tax_yr_thres['joint'].iloc[0])) \
            | ((pop_df['tax_filing_status'] == 'hhead') & (pop_df['ind_income'] < self.tax_yr_thres['hhldhead'].iloc[0]))
# for esi policy holders and their families
premium_threshold = pop_df['total_premiums'] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['fam_income'])
# for tax_independent children
ind_premium_threshold = pop_df['total_premiums'] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['ind_income'])
# non esi offer individuals # TODO: need to subset to non-esi offer individuals
max_pp_threshold = self.pnlty_unin_fam_use.max_pp.iloc[0] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['fam_income'])
ind_max_pp_threshold = self.pnlty_unin_fam_use.max_pp.iloc[0] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['ind_income'])
pop_df['exemption'] = pop_df['exemption'] | premium_threshold | tax_threshold | max_pp_threshold | (pop_df['doc_status']==3)
        # for tax-independent children
        pop_df['ind_exemption'] = pop_df['ind_exemption'] | ind_premium_threshold | ind_tax_threshold | ind_max_pp_threshold | (pop_df['doc_status'] == 3)
        loc_ind_adult_child = np.where((pop_df['person_type'] > 3) & (pop_df['tax_dependent'] == 0))[0]
        pop_df.loc[loc_ind_adult_child, 'exemption'] = pop_df.loc[loc_ind_adult_child, 'ind_exemption']
        pop_df.drop(['ind_exemption', 'total_premiums_individual'], inplace=True, axis=1)
mcaid_threshold = (pop_df['fpl'] < 100) & pop_df['mcaid_elig'] # unused
return pop_df
    # penalty() must be called after cheapest_coverage(), so that the exemption
    # variables it relies on are available
    def penalty(self, pop_df, exp_choices, individual_mandate, flat_penalties, multiplier):
        """
        Assumes that pop_df already has the exemption column, i.e. that cheapest_coverage
        has been called; exemption is an individual-level variable.
        Also assumes that exp_choices contains fam_income at this point.
        """
my_choices = exp_choices
# preparing for merging family income, exemption and number of children columns
        exempt_cols = pop_df.pivot(index='hieu_id', columns='person_type', values='exemption')
exempt_cols.columns = ['exemption_' + str(i) for i in range(8)]
exempt_cols = exempt_cols.reset_index()
exempt_cols.fillna(value=False,inplace=True)
my_choices = pd.merge(my_choices, exempt_cols, on='hieu_id', how='left')
doc_col_names = ['doc' + str(i) for i in range(8)]
        doc_cols = pop_df.pivot(index='hieu_id', columns='person_type', values='doc_status')
doc_cols.columns = doc_col_names
doc_cols.reset_index(inplace=True)
my_choices = pd.merge(my_choices, doc_cols, on='hieu_id', how='left')
# initialize penalty variables
penalty_cols = ['penalty_' + str(i) for i in range(8)]
for i in range(8):
my_choices['penalty_' + str(i)] = 0
if individual_mandate == 1:
adults = ['adult_1', 'adult_2', 'adult_child_1', 'adult_child_2', 'adult_child_3','adult_child_4']
adult_indices = [0,1,4,5,6,7]
children = ['child_1', 'child_2']
children_indices = [2,3]
# counting number of uninsured, non-exempted adults and children
my_choices['unins_adults'] = 0
my_choices['unins_children'] = 0
for index, person in enumerate(adults):
my_choices['unins_adults'] = my_choices['unins_adults'] + ((my_choices[person]==1).astype(int)) * \
((~(my_choices['exemption_'+ str(adult_indices[index])])).astype(int))* \
(((my_choices['tax_dependent_'+str(adult_indices[index])]==1)|(adult_indices[index]<4)).astype(int))
for index, person in enumerate(children):
my_choices['unins_children'] = my_choices['unins_children'] + \
my_choices['n_person_'+str(children_indices[index])]*((my_choices[person]==1).astype(int)) * \
((~(my_choices['exemption_'+ str(children_indices[index])])).astype(int))
            my_choices['method1_part1'] = (my_choices['unins_adults'] + my_choices['unins_children']/2)*float(self.pnlty_unin_ind_use.adult.iloc[0])
            my_choices['method2_part1'] = np.minimum(my_choices['unins_adults'] + my_choices['unins_children'], 5)*float(self.pnlty_unin_fam_use.max_pp.iloc[0])
            my_choices['method1'] = np.minimum(my_choices['method1_part1'], float(self.pnlty_unin_ind_use.max_fam.iloc[0]))
            my_choices['method2'] = np.minimum(my_choices['method2_part1'], my_choices['fam_income']*float(self.pnlty_unin_fam_use.fam_percent.iloc[0]))
my_choices['ind_penalty'] = np.maximum(my_choices['method1'], my_choices['method2'])
# redistributing the penalty to eligible individuals
my_choices['share_ind_penalty'] = my_choices['ind_penalty']/(my_choices['unins_adults']+my_choices['unins_children']/2)
            for i, person in enumerate(adults):
                my_choices['penalty_' + str(adult_indices[i])] = (my_choices[person] == 1)*(1 - my_choices['exemption_' + str(adult_indices[i])])*my_choices['share_ind_penalty']
                if adult_indices[i] > 3:
                    loc_tax_independent_i = np.where(my_choices['tax_dependent_' + str(adult_indices[i])] == 0)[0]
                    my_choices['method1_part1_i'] = (my_choices[person] == 1) * float(self.pnlty_unin_ind_use.adult.iloc[0])
                    my_choices['method1_i'] = np.minimum(my_choices['method1_part1_i'], float(self.pnlty_unin_ind_use.max_fam.iloc[0]))
                    my_choices['method2_part1_i'] = (my_choices[person] == 1) * float(self.pnlty_unin_fam_use.max_pp.iloc[0])
                    my_choices['method2_i'] = np.minimum(my_choices['method2_part1_i'], my_choices['ind_income_' + str(adult_indices[i])]*float(self.pnlty_unin_fam_use.fam_percent.iloc[0]))
                    my_choices['ind_penalty_i'] = np.maximum(my_choices['method1_i'], my_choices['method2_i'])
                    my_choices.loc[loc_tax_independent_i, 'penalty_' + str(adult_indices[i])] = my_choices.loc[loc_tax_independent_i, 'ind_penalty_i']
                    my_choices.drop(['method1_part1_i', 'method1_i', 'method2_part1_i', 'method2_i', 'ind_penalty_i'], inplace=True, axis=1)
for i, person in enumerate(children):
my_choices['penalty_' + str(children_indices[i])] = (my_choices[person]==1)*(1-my_choices['exemption_'+ str(children_indices[i])])* \
my_choices['share_ind_penalty'] / 2*my_choices['n_person_'+ str(children_indices[i])]
exemption_cols = ['exemption_' + str(i) for i in range(8)]
my_choices[penalty_cols] = my_choices[penalty_cols].fillna(0)
# applying multiplier
for i in range(8):
my_choices['penalty_orig_' + str(i)] = my_choices['penalty_' + str(i)]
my_choices[penalty_cols] = my_choices[penalty_cols]*multiplier
# applying flat amount
for i in range(8):
# categories: undoc, doc+exempted, doc+unexempted
col = (my_choices[doc_col_names[i]] == 3) * flat_penalties[0] \
+ ((my_choices[exemption_cols[i]] == True) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[1] \
+ ((my_choices[exemption_cols[i]] == False) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[2]
my_choices['penalty_' + str(i)] += col
            cols_to_drop = ['method1', 'method2', 'method1_part1', 'method2_part1', 'unins_adults', 'unins_children']
            my_choices.drop(cols_to_drop, inplace=True, axis=1)
else:
doc_col_names = ['doc' + str(i) for i in range(8)]
exemption_cols = ['exemption_' + str(i) for i in range(8)]
for i in range(8):
my_choices['penalty_orig_' + str(i)] = my_choices['penalty_' + str(i)]
# applying flat amount
for i in range(8):
# categories: doc+exempted, doc+unexempted
col = ((my_choices[exemption_cols[i]] == True) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[1] \
+ ((my_choices[exemption_cols[i]] == False) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[2]
my_choices['penalty_' + str(i)] += col
my_choices.drop(doc_col_names, inplace=True, axis=1)
return my_choices
    def _wide_to_long(self, wide_data, to_melt=(), hieu_specific_cols=()):
"""
input:
wide_data: wide dataframe
to_melt: list of column names designating those that need to be combined
* for all column in to_melt, wide_data must contain column0-column7
* OR 'column' : [list of 8 columns to combine, in CORRECT order] must
* be in special_col_cases
hieu_specific_cols: list of columns that have values that are the same across
all individuals in a family; they will appear in long_data with column names
of the form hieu_[column_name]
output:
long_data: long dataframe after the conversion finishes in this function
"""
wide_data['ins_p2'] = 0 # harmonizing the ins_* columns
wide_data['ins_p3'] = 0
# subset to real people/choices
### SPECIAL CASES
people = ['adult_1', 'adult_2', 'child_1', 'child_2', 'adult_child_1', 'adult_child_2','adult_child_3', 'adult_child_4']
        special_col_cases = {
            'people': people,
            'person_type': list(range(8)),
            'p_id': ['p' + str(i) + '_id' for i in range(8)],
            'xc_prem': ['xc_prem_' + str(i) for i in range(8)],
        }
### initialize base structure with hieu_id, person_type, and choice
        long_data = pd.melt(wide_data, id_vars=['hieu_id'], value_vars=special_col_cases['people'],
                            var_name='person_type', value_name='choice')
        long_data.replace(people, list(range(8)), inplace=True)
        long_data.sort_values(['hieu_id', 'person_type'], inplace=True)
        long_data.reset_index(drop=True, inplace=True)
### go through to_melt list and convert them to long data
for column in to_melt:
# generate columns to combine and melt
try:
column_list = special_col_cases[column]
            except KeyError:
if column + '0' in list(wide_data.columns.values):
column_list = [column + str(i) for i in range(8)]
elif column + '1' in list(wide_data.columns.values):
column_list = [column + str(i + 1) for i in range(8)]
else:
raise ValueError(column + " does not exist in data")
# melt columns and fix ordering of rows so they match that of long_data
melted_df = pd.melt(wide_data, ['hieu_id'], column_list)
if column in special_col_cases:
melted_df['variable'] = pd.Categorical(melted_df['variable'], special_col_cases[column])
melted_df.sort_values(['hieu_id', 'variable'], inplace=True)
melted_df.reset_index(inplace=True)
# add column to long_data
if column.endswith('_'):
column = column[:-1]
long_data[column] = melted_df['value']
### go through hieu_specific_cols list and convert them to long data
if len(hieu_specific_cols) > 0:
wide_data.index = wide_data['hieu_id']
long_data.index = long_data['hieu_id']
for column in hieu_specific_cols:
long_data["hieu_" + column] = wide_data[column]
wide_data.reset_index(inplace=True, drop=True)
long_data.reset_index(inplace=True, drop=True)
### merge with hieu_table to get the sampling weights
        # drop placeholder rows with no real person and re-index so positions match labels
        long_data = long_data[np.isfinite(long_data['p_id'])].reset_index(drop=True)
long_data.rename(columns={'p_id':'person_id'},inplace=True)
return long_data
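
# Illustrative end-to-end usage (sketch only; dataframe names are hypothetical and must
# already carry the columns referenced above, e.g. hieu_id, person_type, fam_income,
# total_premiums, doc_status; the flat_penalties values are placeholders):
#
#   penalty_params = UninsuredPenalty(policy_dict)
#   penalty_params.set_year(2019)
#   pop_df = penalty_params.cheapest_coverage(exp_choices, pop_df)
#   exp_choices = penalty_params.penalty(pop_df, exp_choices, individual_mandate=1,
#                                        flat_penalties=[0.0, 0.0, 695.0], multiplier=1.0)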