#!/usr/bin/env python
import re
from utility_modules.get_parameters import get_params
import numpy as np
import pandas as pd
### works both for federal tax and state tax
[docs]class Tax:
def __init__(self, tax_rates = None, tax_deductions = None):
self.tax_rates = get_params(tax_rates)
self.deductions = get_params(tax_deductions)
[docs] def set_year(self, year):
if year not in self.tax_rates.year.values:
raise AssertionError
if year not in self.deductions.year.values:
raise AssertionError
self.year = year
self.curr_tax_rates = self.tax_rates[self.tax_rates.year == year]
self.curr_deductions = self.deductions[self.deductions.year == year]
# Below added for performance improvements 4/7/2017
self.xp = {}
self.fp = {}
for txfstat in ['single_base', 'joint_base', 'hhead_base']:
self.xp[txfstat] = self.tax_rates.loc[self.tax_rates['year']==self.year][txfstat]
self.xp[txfstat] = pd.to_numeric(self.xp[txfstat])
self.fp[txfstat] = self.tax_rates.loc[self.tax_rates['year']==self.year][txfstat+'_tax']
self.fp[txfstat] = pd.to_numeric(self.fp[txfstat])
[docs] def determine_marginal_limit(self, agi, tax_limits):
limit_mtx = np.ones([agi.shape[0], tax_limits.shape[0]])
limit_mtx = np.multiply(limit_mtx, np.matrix(tax_limits))
marginal_limit = (np.matrix(agi).T >= limit_mtx).sum(axis = 1) - 1
marginal_limit[np.where(marginal_limit == -1)] = 0
marginal_limit = np.ravel(marginal_limit)
return marginal_limit
[docs] def determine_total_taxes(self, income, family):
if family.shape[0] != tax_bracket.shape[0]:
raise AssertionError('family and bracket are not the same shape')
fam_taxes = np.zeros([income.shape[0], ])
for bracket in family.tax_filing_status.unique():
loc = np.where(family.tax_filing_status == bracket)
tax_limits = self.curr_tax_rates[bracket + '_base']
base_taxes = self.curr_tax_rates[bracket + '_base_tax']
marginal_limit = self.determine_marginal_limit(income[loc[0]], tax_limits)
temp_tax = base_taxes.values[marginal_limit]
temp_tax += (income.values[loc[0]] - tax_limits.values[marginal_limit]) * self.curr_tax_rates.marginal_rates.values[marginal_limit]
fam_taxes[loc[0]] = temp_tax
fam_taxes[np.where(fam_taxes < 0)] = 0
return fam_taxes
[docs] def determine_tax_rate(self, income, family):
if family.shape[0] != income.shape[0]:
raise AssertionError('family and bracket are not the same shape')
tax_rates = np.zeros([income.shape[0], ])
for bracket in family.tax_filing_status.unique():
loc = np.where(family.tax_filing_status== bracket)
tax_limits = self.curr_tax_rates[bracket + '_base']
base_taxs = self.curr_tax_rates[bracket + '_base_tax']
marginal_limit = self.determine_marginal_limit(income[loc[0]], tax_limits)
tax_rates[loc[0]] = self.curr_tax_rates.marginal_rates.values[marginal_limit]
return tax_rates
[docs] def base_deduction(self, filing_status):
temp_df = pd.DataFrame(filing_status)
temp_df.columns = ['type']
filing_status_types = ["single", "joint", "hhead"]
temp_df = pd.merge(temp_df, self.curr_deductions[self.curr_deductions.type != 'dependent'], on = 'type', how = 'left')
return temp_df['value']
[docs] def dependent_deduction(self, dependents):
dependent_value = self.curr_deductions[self.curr_deductions.type == 'dependent'].value.values
if len(dependent_value) > 1:
raise AssertionError
return dependent_value[0] * dependents
[docs] def total_deductions(self, family):
base = self.base_deduction(family['tax_filing_status'])
dependent = self.dependent_deduction(family['num_dependents'])
return base + dependent
[docs] def calculate_tax_amount(self, family):
curr_agi = family['fam_income'].copy()
curr_agi -= self.total_deductions(family)
fam_taxes = self.determine_total_taxes(curr_agi, family)
return fam_taxes
[docs] def calculate_marginal_rate(self, family):
curr_agi = family['fam_income'].copy()
curr_agi -= self.total_deductions(family)
tax_rates = self.determine_tax_rate(curr_agi, family)
return tax_rates
### Qiyang's code
[docs] def calculate_tax_h(self, year, txfstat, incomeArr):
xp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat]
fp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat+'_tax']
taxArr = np.interp(incomeArr, xp, fp)
return taxArr
[docs] def planIsInFirm(self, plan_id, firm_id, df_esi):
# need to check curr_esi_table.csv!
#return plan_id == firm_id:
return not df_esi[ (df_esi["firm_id"]==firm_id) & (df_esi["plan_id"]==plan_id)].empty
# famMemberLst[0:'age', 1:'undoc', 2:'prem cap', 3:'plan_id', 4:'firm_id', 5:'Pr1')]
[docs] def prem_pretax_sum(self, famMemLst, esi_drop, df_esi):
ppsum = 0
for m in famMemLst :
if esi_drop and self.planIsInFirm(m[3], m[4], df_esi): continue
if m[5] is None: continue
ppsum = ppsum + m[5]
return ppsum
[docs] def calculate_tax_h_df_iter(self, year, incomeArr, df, esi_drop, df_esi):
x = df['income'].shape[0]
taxArr = np.empty((x, 1), dtype=float)
for ind, tr in df.iterrows():
income = incomeArr[int(ind)]
txfstat = tr['txfstat']
famMemLst = tr['fam_members']
taxable_income = income - self.prem_pretax_sum(famMemLst, esi_drop, df_esi)
xp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat]
xp = pd.to_numeric(xp)
fp = self.tax_rates.loc[self.tax_rates['year']==year][txfstat+'_tax']
fp = pd.to_numeric(fp)
taxArr[int(ind)] = np.interp(taxable_income, xp, fp)
return taxArr
[docs] def cal_tax_row(self, income, txfstat, famMemLst) :
taxable_income = income - self.prem_pretax_sum(famMemLst, self.esi_drop, self.df_esi)
return np.interp(taxable_income, self.xp[txfstat], self.fp[txfstat])
[docs] def calculate_tax_h_df(self, year, incomeArr, df, esi_drop, df_esi):
self.esi_drop = esi_drop
self.df_esi = df_esi
vcal_tax = np.vectorize(self.cal_tax_row)
taxArr = vcal_tax(incomeArr, df['txfstat'], df['fam_members'])
return taxArr
[docs]class FedTax(Tax):
"""
Borg singleton config object for FedTax
"""
__shared_state = {}
def __init__(self, tax_rates = None, tax_deductions = None):
# implement the borg pattern
self.__dict__ = self.__shared_state
if (tax_rates != None) and (tax_deductions != None):
Tax.__init__(self, tax_rates, tax_deductions)
[docs]class StateTax(Tax):
"""
Borg singleton config object for StateTax
"""
__shared_state = {}
def __init__(self, tax_rates = None, tax_deductions = None):
# implement the borg pattern
self.__dict__ = self.__shared_state
if (tax_rates != None) and (tax_deductions != None):
Tax.__init__(self, tax_rates, tax_deductions)