Source code for policy_modules.firm_tax
#!/usr/bin/env python
from utility_modules.get_parameters import get_params
import numpy as np
import pandas as pd
[docs]class FirmTax:
# Borg singleton config object
__shared_state = {}
def __init__(self, policy_dict = None):
# implement the borg pattern
self.__dict__ = self.__shared_state
if policy_dict != None:
self.biz_tax = get_params(policy_dict['biz_tax'])
self.w_comp = get_params(policy_dict['w_comp'])
[docs] def total_tax(self, firm_id, Firm, Worker):
"""
input:
firm_id, Firm table (assumes Worker linkage)
effect:
none
output:
total tax burden not including excise taxes for ESI
"""
curr_workers = Worker[Worker.firm_id == firm_id]
#curr_workers = pd.merge(curr_workers, Worker.Hieu[['person_id', 'p_weight']],
# on = 'person_id', how = 'left')
n_rows = curr_workers.shape[0]
p_wghts = np.matrix(curr_workers.in_firm_weight.tolist()).T
if np.isnan(p_wghts).sum():
raise AssertionError("in_firm_weight contains NaN values")
wages = np.matrix(curr_workers.wages.tolist()).T
tax = 0
# calculate general taxes
for row in self.biz_tax.iterrows():
tax =+ self._tax_calculate(row, curr_workers, n_rows, p_wghts, wages)
# calculate industry specific tax
tax =+ self._industry_taxes(Firm, firm_id, curr_workers, n_rows, p_wghts, wages)
return int(tax)
def _tax_calculate(self, row, curr_workers, n_rows, p_wghts, wages):
"""
input:
row (of the tax table);
assumes:
self.curr_workers is created
effect:
none
output:
total tax for the tax type
"""
if row[1]['income_limit_has']:
rate_limit = np.repeat(row[1]['income_limit_amount'], n_rows)
rate_limit *= row[1]['rate']
rate_limit = np.matrix(rate_limit).T
wage_mtx = np.append(row[1]['rate']* wages, rate_limit, axis = 1)
col_idx = (wage_mtx[:,0]>=wage_mtx[:,1]).astype('int')
idx = [np.array(range(0, n_rows)), np.array(col_idx)[:,0]]
return int((np.multiply(p_wghts, wage_mtx[idx].T)).sum(axis=0))
else:
return int((row[1]['rate'] * np.multiply(p_wghts, wages)).sum(axis=0))
def _industry_taxes(self, Firm, firm_id, curr_workers, n_rows, p_wghts, wages):
"""
inputs:
firm_id; n_rows (number of rows); p_wghts; wages
effect:
none
output:
industry specific taxes
"""
mrg_df = pd.merge(Firm[Firm.firm_id == firm_id][['firm_id', 'industry']],
self.w_comp,
on = 'industry', how = 'inner')
if mrg_df.shape[0] == 1:
i_rate = mrg_df['rate'].values[0]
return int((i_rate * np.multiply(p_wghts, wages)).sum(axis=0))
else:
# TODO make own excpetion class
raise TypeError