Source code for policy_modules.fam_tax

#!/usr/bin/env python
import re

from utility_modules.get_parameters import get_params
import numpy as np
import pandas as pd
### works both for federal tax and state tax
[docs]class Tax: def __init__(self, tax_rates = None, tax_deductions = None): self.tax_rates = get_params(tax_rates) self.deductions = get_params(tax_deductions)
[docs] def set_year(self, year): if year not in self.tax_rates.year.values: raise AssertionError if year not in self.deductions.year.values: raise AssertionError self.year = year self.curr_tax_rates = self.tax_rates[self.tax_rates.year == year] self.curr_deductions = self.deductions[self.deductions.year == year] # Below added for performance improvements 4/7/2017 self.xp = {} self.fp = {} for txfstat in ['single_base', 'joint_base', 'hhead_base']: self.xp[txfstat] = self.tax_rates.loc[self.tax_rates['year']==self.year][txfstat] self.xp[txfstat] = pd.to_numeric(self.xp[txfstat]) self.fp[txfstat] = self.tax_rates.loc[self.tax_rates['year']==self.year][txfstat+'_tax'] self.fp[txfstat] = pd.to_numeric(self.fp[txfstat])
[docs] def determine_marginal_limit(self, agi, tax_limits): limit_mtx = np.ones([agi.shape[0], tax_limits.shape[0]]) limit_mtx = np.multiply(limit_mtx, np.matrix(tax_limits)) marginal_limit = (np.matrix(agi).T >= limit_mtx).sum(axis = 1) - 1 marginal_limit[np.where(marginal_limit == -1)] = 0 marginal_limit = np.ravel(marginal_limit) return marginal_limit
[docs] def determine_total_taxes(self, income, family): if family.shape[0] != tax_bracket.shape[0]: raise AssertionError('family and bracket are not the same shape') fam_taxes = np.zeros([income.shape[0], ]) for bracket in family.tax_filing_status.unique(): loc = np.where(family.tax_filing_status == bracket) tax_limits = self.curr_tax_rates[bracket + '_base'] base_taxes = self.curr_tax_rates[bracket + '_base_tax'] marginal_limit = self.determine_marginal_limit(income[loc[0]], tax_limits) temp_tax = base_taxes.values[marginal_limit] temp_tax += (income.values[loc[0]] - tax_limits.values[marginal_limit]) * self.curr_tax_rates.marginal_rates.values[marginal_limit] fam_taxes[loc[0]] = temp_tax fam_taxes[np.where(fam_taxes < 0)] = 0 return fam_taxes
[docs] def determine_tax_rate(self, income, family): if family.shape[0] != income.shape[0]: raise AssertionError('family and bracket are not the same shape') tax_rates = np.zeros([income.shape[0], ]) for bracket in family.tax_filing_status.unique(): loc = np.where(family.tax_filing_status== bracket) tax_limits = self.curr_tax_rates[bracket + '_base'] base_taxs = self.curr_tax_rates[bracket + '_base_tax'] marginal_limit = self.determine_marginal_limit(income[loc[0]], tax_limits) tax_rates[loc[0]] = self.curr_tax_rates.marginal_rates.values[marginal_limit] return tax_rates
[docs] def base_deduction(self, filing_status): temp_df = pd.DataFrame(filing_status) temp_df.columns = ['type'] filing_status_types = ["single", "joint", "hhead"] temp_df = pd.merge(temp_df, self.curr_deductions[self.curr_deductions.type != 'dependent'], on = 'type', how = 'left') return temp_df['value']
[docs] def dependent_deduction(self, dependents): dependent_value = self.curr_deductions[self.curr_deductions.type == 'dependent'].value.values if len(dependent_value) > 1: raise AssertionError return dependent_value[0] * dependents
[docs] def total_deductions(self, family): base = self.base_deduction(family['tax_filing_status']) dependent = self.dependent_deduction(family['num_dependents']) return base + dependent
[docs] def calculate_tax_amount(self, family): curr_agi = family['fam_income'].copy() curr_agi -= self.total_deductions(family) fam_taxes = self.determine_total_taxes(curr_agi, family) return fam_taxes
[docs] def calculate_marginal_rate(self, family): curr_agi = family['fam_income'].copy() curr_agi -= self.total_deductions(family) tax_rates = self.determine_tax_rate(curr_agi, family) return tax_rates
### Qiyang's code
[docs] def calculate_tax_h(self, year, txfstat, incomeArr): xp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat] fp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat+'_tax'] taxArr = np.interp(incomeArr, xp, fp) return taxArr
[docs] def planIsInFirm(self, plan_id, firm_id, df_esi): # need to check curr_esi_table.csv! #return plan_id == firm_id: return not df_esi[ (df_esi["firm_id"]==firm_id) & (df_esi["plan_id"]==plan_id)].empty
# famMemberLst[0:'age', 1:'undoc', 2:'prem cap', 3:'plan_id', 4:'firm_id', 5:'Pr1')]
[docs] def prem_pretax_sum(self, famMemLst, esi_drop, df_esi): ppsum = 0 for m in famMemLst : if esi_drop and self.planIsInFirm(m[3], m[4], df_esi): continue if m[5] is None: continue ppsum = ppsum + m[5] return ppsum
[docs] def calculate_tax_h_df_iter(self, year, incomeArr, df, esi_drop, df_esi): x = df['income'].shape[0] taxArr = np.empty((x, 1), dtype=float) for ind, tr in df.iterrows(): income = incomeArr[int(ind)] txfstat = tr['txfstat'] famMemLst = tr['fam_members'] taxable_income = income - self.prem_pretax_sum(famMemLst, esi_drop, df_esi) xp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat] xp = pd.to_numeric(xp) fp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat+'_tax'] fp = pd.to_numeric(fp) taxArr[int(ind)] = np.interp(taxable_income, xp, fp) return taxArr
[docs] def cal_tax_row(self, income, txfstat, famMemLst) : taxable_income = income - self.prem_pretax_sum(famMemLst, self.esi_drop, self.df_esi) return np.interp(taxable_income, self.xp[txfstat], self.fp[txfstat])
[docs] def calculate_tax_h_df(self, year, incomeArr, df, esi_drop, df_esi): self.esi_drop = esi_drop self.df_esi = df_esi vcal_tax = np.vectorize(self.cal_tax_row) taxArr = vcal_tax(incomeArr, df['txfstat'], df['fam_members']) return taxArr
[docs]class FedTax(Tax): """ Borg singleton config object for FedTax """ __shared_state = {} def __init__(self, tax_rates = None, tax_deductions = None): # implement the borg pattern self.__dict__ = self.__shared_state if (tax_rates != None) and (tax_deductions != None): Tax.__init__(self, tax_rates, tax_deductions)
[docs]class StateTax(Tax): """ Borg singleton config object for StateTax """ __shared_state = {} def __init__(self, tax_rates = None, tax_deductions = None): # implement the borg pattern self.__dict__ = self.__shared_state if (tax_rates != None) and (tax_deductions != None): Tax.__init__(self, tax_rates, tax_deductions)