Source code for policy_modules.firm_tax

#!/usr/bin/env python
from utility_modules.get_parameters import get_params
import numpy as np
import pandas as pd

[docs]class FirmTax: # Borg singleton config object __shared_state = {} def __init__(self, policy_dict = None): # implement the borg pattern self.__dict__ = self.__shared_state if policy_dict != None: self.biz_tax = get_params(policy_dict['biz_tax']) self.w_comp = get_params(policy_dict['w_comp'])
[docs] def total_tax(self, firm_id, Firm, Worker): """ input: firm_id, Firm table (assumes Worker linkage) effect: none output: total tax burden not including excise taxes for ESI """ curr_workers = Worker[Worker.firm_id == firm_id] #curr_workers = pd.merge(curr_workers, Worker.Hieu[['person_id', 'p_weight']], # on = 'person_id', how = 'left') n_rows = curr_workers.shape[0] p_wghts = np.matrix(curr_workers.in_firm_weight.tolist()).T if np.isnan(p_wghts).sum(): raise AssertionError("in_firm_weight contains NaN values") wages = np.matrix(curr_workers.wages.tolist()).T tax = 0 # calculate general taxes for row in self.biz_tax.iterrows(): tax =+ self._tax_calculate(row, curr_workers, n_rows, p_wghts, wages) # calculate industry specific tax tax =+ self._industry_taxes(Firm, firm_id, curr_workers, n_rows, p_wghts, wages) return int(tax)
def _tax_calculate(self, row, curr_workers, n_rows, p_wghts, wages): """ input: row (of the tax table); assumes: self.curr_workers is created effect: none output: total tax for the tax type """ if row[1]['income_limit_has']: rate_limit = np.repeat(row[1]['income_limit_amount'], n_rows) rate_limit *= row[1]['rate'] rate_limit = np.matrix(rate_limit).T wage_mtx = np.append(row[1]['rate']* wages, rate_limit, axis = 1) col_idx = (wage_mtx[:,0]>=wage_mtx[:,1]).astype('int') idx = [np.array(range(0, n_rows)), np.array(col_idx)[:,0]] return int((np.multiply(p_wghts, wage_mtx[idx].T)).sum(axis=0)) else: return int((row[1]['rate'] * np.multiply(p_wghts, wages)).sum(axis=0)) def _industry_taxes(self, Firm, firm_id, curr_workers, n_rows, p_wghts, wages): """ inputs: firm_id; n_rows (number of rows); p_wghts; wages effect: none output: industry specific taxes """ mrg_df = pd.merge(Firm[Firm.firm_id == firm_id][['firm_id', 'industry']], self.w_comp, on = 'industry', how = 'inner') if mrg_df.shape[0] == 1: i_rate = mrg_df['rate'].values[0] return int((i_rate * np.multiply(p_wghts, wages)).sum(axis=0)) else: # TODO make own excpetion class raise TypeError