Source code for policy_modules.uninsured_penalty

 #!/usr/bin/env python

import numpy as np
import pandas as pd
from utility_modules.get_parameters import get_params
import pdb

[docs]class UninsuredPenalty: # Borg simpleton config object __shared_state = {} def __init__(self, policy_dict = None): # implement borg pattern self.__dict__ = self.__shared_state if policy_dict != None: self.pnlty_unin_fam = get_params(policy_dict['pnlty_unin_f']) self.pnlty_unin_ind = get_params(policy_dict['pnlty_unin_ind']) self.tax_thres = get_params(policy_dict['tax_thres'])
[docs] def set_year(self, curr_year): """ return slice of pertinent year allows this to be reset at begining of each simulation year """ self.year = curr_year if int(curr_year) in self.pnlty_unin_fam.year.values: self.pnlty_unin_fam_use = self.pnlty_unin_fam.loc[self.pnlty_unin_fam.year == curr_year] self.pnlty_unin_ind_use = self.pnlty_unin_ind.loc[self.pnlty_unin_ind.year == curr_year] else: raise ValueError("pnlty_unin_f sheet does not contain information on year under simulation") self.tax_yr_thres = self.tax_thres.loc[self.tax_thres.year == curr_year]
[docs] def cheapest_coverage(self, exp_choices, pop_df): """ input: choice table with esi premium and exchange premium calculated pop_df with fam_income return: hieu_level information on if a family has affordable plan the function determines based on list of criteria those exempt from paying penalty self.pnlt_unin_fam_use.max_pp is the lowest cost bronze plan """ people = ['adult_1', 'adult_2', 'child_1', 'child_2', 'adult_child_1', 'adult_child_2', \ 'adult_child_3','adult_child_4'] people_indices = [0, 1, 2, 3, 4, 5, 6, 7] # step 1: subset to rows where nobody is uninsured and find the least expensive cost for person in people: exp_choices = exp_choices[exp_choices[person] != 1] exp_choices.reset_index(inplace=True) #substract premium from tiac for index, person in enumerate(people): if index > 3: loc_tiac = np.where(exp_choices['tax_dependent_' + str(index)] == 0)[0] exp_choices.loc[loc_tiac, 'total_premiums']=exp_choices.loc[loc_tiac,'total_premiums']-exp_choices.loc[loc_tiac,'xc_prem_'+str(index)]-exp_choices.loc[loc_tiac,'esi_premium_'+str(index)] #create a long table for TIAC student to_melt=['p_id','xc_prem','esi_premium_','tax_dependent_'] wide_data=self._wide_to_long(exp_choices,to_melt) wide_data['total_premiums_individual']=wide_data['xc_prem']+wide_data['esi_premium'] loc_ind_adult_child = np.where((wide_data['person_type'] > 3) & (wide_data['tax_dependent'] == 0))[0] # step 2: calculate the minimum of total premiums to cover everyone,first family exclude TIAC, Then calculate individually for TIAC family_df =exp_choices.groupby('hieu_id')['total_premiums'].min().reset_index() tiac_df=wide_data.loc[loc_ind_adult_child].groupby('person_id')['total_premiums_individual'].min().reset_index() pop_df = pd.merge(pop_df, family_df, on='hieu_id', how='left') pop_df = pd.merge(pop_df, tiac_df, on='person_id', how='left') loc_ind_adult_child = np.where((pop_df['person_type'] > 3) & (pop_df['tax_dependent'] == 0))[0] pop_df.loc[loc_ind_adult_child,'total_premiums']= pop_df.loc[loc_ind_adult_child,'total_premiums_individual'] # step 3 comparing with family income pop_df['exemption'] = False pop_df['ind_exemption']=False tax_threshold = (pop_df['tax_filing_status']=='single') & (pop_df['fam_income'] < self.tax_yr_thres['single'].iloc[0]) \ + (pop_df['tax_filing_status']=='joint') & (pop_df['fam_income'] < self.tax_yr_thres['joint'].iloc[0]) \ + (pop_df['tax_filing_status']=='hhead') & (pop_df['fam_income'] < self.tax_yr_thres['hhldhead'].iloc[0]) #for tax_independent children ind_tax_threshold = (pop_df['tax_filing_status'] == 'single') & (pop_df['ind_income'] < self.tax_yr_thres['single'].iloc[0]) \ + (pop_df['tax_filing_status'] == 'joint') & (pop_df['ind_income'] <self.tax_yr_thres['joint'].iloc[0]) \ + (pop_df['tax_filing_status'] == 'hhead') & (pop_df['ind_income'] < self.tax_yr_thres['hhldhead'].iloc[0]) # for esi policy holders and their families premium_threshold = pop_df['total_premiums'] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['fam_income']) # for tax_independent children ind_premium_threshold = pop_df['total_premiums'] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['ind_income']) # non esi offer individuals # TODO: need to subset to non-esi offer individuals max_pp_threshold = self.pnlty_unin_fam_use.max_pp.iloc[0] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['fam_income']) ind_max_pp_threshold = self.pnlty_unin_fam_use.max_pp.iloc[0] > (self.pnlty_unin_fam_use.exemption.iloc[0] * pop_df['ind_income']) pop_df['exemption'] = pop_df['exemption'] | premium_threshold | tax_threshold | max_pp_threshold | (pop_df['doc_status']==3) # for tax_independent children pop_df['ind_exemption']=pop_df['ind_exemption'] | ind_premium_threshold | ind_tax_threshold | ind_max_pp_threshold | (pop_df['doc_status']==3) loc_ind_adult_child = np.where((pop_df['person_type'] > 3) & (pop_df['tax_dependent'] == 0))[0] pop_df.loc[loc_ind_adult_child,'exemption']=pop_df.loc[loc_ind_adult_child,'ind_exemption'] pop_df.drop(['ind_exemption','total_premiums_individual'],inplace=True,axis=1) mcaid_threshold = (pop_df['fpl'] < 100) & pop_df['mcaid_elig'] # unused return pop_df
# this penalty function has to be called after cheapest_coverage is called, so the relevant variables will be # available
[docs] def penalty(self, pop_df, exp_choices, individual_mandate, flat_penalties, multiplier): """ Assuming that pop_df has the column for exemption, once the cheapest_coverage is called exemption is an indiviudal level variable Also assume that exp_choices has fam_income at this point as well """ my_choices = exp_choices # preparing for merging family income, exemption and number of children columns exempt_cols = pop_df.pivot('hieu_id', 'person_type', 'exemption') exempt_cols.columns = ['exemption_' + str(i) for i in range(8)] exempt_cols = exempt_cols.reset_index() exempt_cols.fillna(value=False,inplace=True) my_choices = pd.merge(my_choices, exempt_cols, on='hieu_id', how='left') doc_col_names = ['doc' + str(i) for i in range(8)] doc_cols = pop_df.pivot('hieu_id', 'person_type', 'doc_status') doc_cols.columns = doc_col_names doc_cols.reset_index(inplace=True) my_choices = pd.merge(my_choices, doc_cols, on='hieu_id', how='left') # initialize penalty variables penalty_cols = ['penalty_' + str(i) for i in range(8)] for i in range(8): my_choices['penalty_' + str(i)] = 0 if individual_mandate == 1: adults = ['adult_1', 'adult_2', 'adult_child_1', 'adult_child_2', 'adult_child_3','adult_child_4'] adult_indices = [0,1,4,5,6,7] children = ['child_1', 'child_2'] children_indices = [2,3] # counting number of uninsured, non-exempted adults and children my_choices['unins_adults'] = 0 my_choices['unins_children'] = 0 for index, person in enumerate(adults): my_choices['unins_adults'] = my_choices['unins_adults'] + ((my_choices[person]==1).astype(int)) * \ ((~(my_choices['exemption_'+ str(adult_indices[index])])).astype(int))* \ (((my_choices['tax_dependent_'+str(adult_indices[index])]==1)|(adult_indices[index]<4)).astype(int)) for index, person in enumerate(children): my_choices['unins_children'] = my_choices['unins_children'] + \ my_choices['n_person_'+str(children_indices[index])]*((my_choices[person]==1).astype(int)) * \ ((~(my_choices['exemption_'+ str(children_indices[index])])).astype(int)) my_choices['method1_part1'] = (my_choices['unins_adults'] + my_choices['unins_children']/2)*float(self.pnlty_unin_ind_use.adult) my_choices['method2_part1'] = np.minimum(my_choices['unins_adults'] + my_choices['unins_children'], 5)*float(self.pnlty_unin_fam_use.max_pp) my_choices['method1'] = np.minimum(my_choices['method1_part1'], float(self.pnlty_unin_ind_use.max_fam)) my_choices['method2'] = np.minimum(my_choices['method2_part1'], my_choices['fam_income']*float(self.pnlty_unin_fam_use.fam_percent)) my_choices['ind_penalty'] = np.maximum(my_choices['method1'], my_choices['method2']) # redistributing the penalty to eligible individuals my_choices['share_ind_penalty'] = my_choices['ind_penalty']/(my_choices['unins_adults']+my_choices['unins_children']/2) for i, person in enumerate(adults): my_choices['penalty_' + str(adult_indices[i])] = (my_choices[person]==1)*(1-my_choices['exemption_'+ str(adult_indices[i])])*my_choices['share_ind_penalty'] if (adult_indices[i]>3): loc_tax_indpendent_i=np.where(my_choices['tax_dependent_'+str(adult_indices[i])]==0)[0] my_choices['method1_part1_i'] = (my_choices[person]==1) * float(self.pnlty_unin_ind_use.adult) my_choices['method1_i'] = np.minimum(my_choices['method1_part1_i'], float(self.pnlty_unin_ind_use.max_fam)) my_choices['method2_part1_i'] =(my_choices[person]==1) * float(self.pnlty_unin_fam_use.max_pp) my_choices['method2_i']=np.minimum(my_choices['method2_part1_i'], my_choices['ind_income_'+str(adult_indices[i])]*float(self.pnlty_unin_fam_use.fam_percent)) my_choices['ind_penalty_i'] = np.maximum(my_choices['method1_i'], my_choices['method2_i']) my_choices.loc[loc_tax_indpendent_i,'penalty_' + str(adult_indices[i])]=my_choices.loc[loc_tax_indpendent_i,'ind_penalty_i'] my_choices.drop(['method2_i','ind_penalty_i','method1_part1_i','method1_i','method2_part1_i'],inplace=True,axis=1) for i, person in enumerate(children): my_choices['penalty_' + str(children_indices[i])] = (my_choices[person]==1)*(1-my_choices['exemption_'+ str(children_indices[i])])* \ my_choices['share_ind_penalty'] / 2*my_choices['n_person_'+ str(children_indices[i])] exemption_cols = ['exemption_' + str(i) for i in range(8)] my_choices[penalty_cols] = my_choices[penalty_cols].fillna(0) # applying multiplier for i in range(8): my_choices['penalty_orig_' + str(i)] = my_choices['penalty_' + str(i)] my_choices[penalty_cols] = my_choices[penalty_cols]*multiplier # applying flat amount for i in range(8): # categories: undoc, doc+exempted, doc+unexempted col = (my_choices[doc_col_names[i]] == 3) * flat_penalties[0] \ + ((my_choices[exemption_cols[i]] == True) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[1] \ + ((my_choices[exemption_cols[i]] == False) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[2] my_choices['penalty_' + str(i)] += col list=['method1', 'method2', 'method1_part1', 'method2_part1','unins_adults', 'unins_children'] my_choices.drop(list, inplace=True, axis=1) else: doc_col_names = ['doc' + str(i) for i in range(8)] exemption_cols = ['exemption_' + str(i) for i in range(8)] for i in range(8): my_choices['penalty_orig_' + str(i)] = my_choices['penalty_' + str(i)] # applying flat amount for i in range(8): # categories: doc+exempted, doc+unexempted col = ((my_choices[exemption_cols[i]] == True) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[1] \ + ((my_choices[exemption_cols[i]] == False) & (my_choices[doc_col_names[i]].isin([0,1,2]))) * flat_penalties[2] my_choices['penalty_' + str(i)] += col my_choices.drop(doc_col_names, inplace=True, axis=1) return my_choices
def _wide_to_long(self, wide_data, to_melt=[], hieu_specific_cols=[]): """ input: wide_data: wide dataframe to_melt: list of column names designating those that need to be combined * for all column in to_melt, wide_data must contain column0-column7 * OR 'column' : [list of 8 columns to combine, in CORRECT order] must * be in special_col_cases hieu_specific_cols: list of columns that have values that are the same across all individuals in a family; they will appear in long_data with column names of the form hieu_[column_name] output: long_data: long dataframe after the conversion finishes in this function """ wide_data['ins_p2'] = 0 # harmonizing the ins_* columns wide_data['ins_p3'] = 0 # subset to real people/choices ### SPECIAL CASES people = ['adult_1', 'adult_2', 'child_1', 'child_2', 'adult_child_1', 'adult_child_2','adult_child_3', 'adult_child_4'] special_col_cases = { 'people' : people, 'person_type' : [i for i in range(8)], 'p_id' : ['p' + str(i) + '_id' for i in range(8)], 'xc_prem' : ['xc_prem_' + str(i) for i in range(8)], } ### initialize base structure with hieu_id, person_type, and choice long_data = pd.melt(wide_data, ['hieu_id'], special_col_cases['people'], 'person_type', 'choice') long_data.replace(people, [i for i in range(8)], inplace=True) long_data.sort_values(['hieu_id', 'person_type'], inplace=True) long_data.reset_index(inplace=True) long_data.drop('index', 1, inplace=True) ### go through to_melt list and convert them to long data for column in to_melt: # generate columns to combine and melt try: column_list = special_col_cases[column] except: if column + '0' in list(wide_data.columns.values): column_list = [column + str(i) for i in range(8)] elif column + '1' in list(wide_data.columns.values): column_list = [column + str(i + 1) for i in range(8)] else: raise ValueError(column + " does not exist in data") # melt columns and fix ordering of rows so they match that of long_data melted_df = pd.melt(wide_data, ['hieu_id'], column_list) if column in special_col_cases: melted_df['variable'] = pd.Categorical(melted_df['variable'], special_col_cases[column]) melted_df.sort_values(['hieu_id', 'variable'], inplace=True) melted_df.reset_index(inplace=True) # add column to long_data if column.endswith('_'): column = column[:-1] long_data[column] = melted_df['value'] ### go through hieu_specific_cols list and convert them to long data if len(hieu_specific_cols) > 0: wide_data.index = wide_data['hieu_id'] long_data.index = long_data['hieu_id'] for column in hieu_specific_cols: long_data["hieu_" + column] = wide_data[column] wide_data.reset_index(inplace=True, drop=True) long_data.reset_index(inplace=True, drop=True) ### merge with hieu_table to get the sampling weights long_data = long_data[np.isfinite(long_data['p_id'])] long_data.rename(columns={'p_id':'person_id'},inplace=True) return long_data