Source code for src.WorkerTable
#!/usr/bin/env python
import pandas as pd
import numpy as np
import random
class Worker(pd.DataFrame):
    """Worker-level table with helpers that move weekly hours between workers.

    The methods read these columns of the frame: ``firm_id``, ``hrs_week``,
    ``hourly_wage`` and ``p_weight``.  When ``p_weight`` is missing,
    ``adjust_hours`` tries to build it from ``person_id`` and ``self.Hieu``.
    """

    def __init__(self, *args, **kwargs):
        """Forward every argument unchanged to ``pandas.DataFrame``."""
        super(Worker, self).__init__(*args, **kwargs)

    def match_wage(self, a, b, thresh=0.10):
        """Flag which wages in ``b`` lie close to which wages in ``a``.

        Parameters
        ----------
        a, b : one-dimensional wage collections (e.g. pandas Series)
        thresh : float
            Relative tolerance, measured against the wage taken from ``a``.

        Returns
        -------
        pandas.DataFrame
            Boolean table with one row per element of ``b`` and one column
            per element of ``a``; cell ``[j, i]`` is True when
            ``abs(b[j] - a[i]) < a[i] * thresh``.  Rows and columns carry a
            default positional index, not the labels of ``a`` / ``b``.
        """
        wages_a = np.asarray(a).reshape(1, -1)   # row vector: one column per a
        wages_b = np.asarray(b).reshape(-1, 1)   # column vector: one row per b
        gap = np.abs(wages_b - wages_a)          # broadcasts to (len(b), len(a))
        return pd.DataFrame(gap < wages_a * thresh)

    def adjust_hours(self, x, y, match):
        """Move weighted hours between matched workers, in place.

        Parameters
        ----------
        x : boolean mask over the rows
            Workers in the middle band; they must all be at or above 32.5
            hours, or all at or below 32.5 hours.
        y : boolean mask over the rows
            Workers outside the band who are paired with ``x``.
        match : pandas.DataFrame
            Boolean table with one row per ``y`` worker and one column per
            ``x`` worker, as returned by ``match_wage(x wages, y wages)``.

        Raises
        ------
        ValueError
            If ``x`` mixes workers above and below 32.5 hours, so that no
            single direction applies.

        Notes
        -----
        With every ``x`` at or above 32.5 the reference level is 40 hours and
        ``x`` gains what ``y`` loses; otherwise the reference level is 30
        hours and ``y`` gains what ``x`` loses.  The hours moved along each
        matched pair are the smaller of the pair's share of the ``y`` budget
        and its share of the ``x`` budget, both expressed in weighted hours.
        """
        x_hours = self.hrs_week[x]
        # NOTE(review): a group made up solely of workers at exactly 32.5
        # always takes the first branch, even when the caller meant to move
        # them down -- confirm whether an explicit direction is needed.
        if (x_hours >= 32.5).all():
            hrs, z = 40, -1
        elif (x_hours <= 32.5).all():
            hrs, z = 30, 1
        else:
            raise ValueError(
                "adjust_hours: workers selected by x must all be at or above "
                "32.5 hours, or all at or below 32.5 hours"
            )

        # set p_weights
        if not hasattr(self, 'p_weight'):
            # NOTE(review): ``self.Hieu`` is not defined anywhere in this
            # class; presumably it is attached by the caller -- confirm.
            self.p_weight = pd.merge(self[['person_id']], self.Hieu[['person_id','p_weight']]).p_weight

        matched = np.asarray(match)              # (n_y, n_x)
        w_y = self.p_weight[y].values            # (n_y,)
        w_x = self.p_weight[x].values            # (n_x,)

        # A column without any match sums to zero; the resulting 0/0 is
        # expected and is replaced by a zero share just below.
        with np.errstate(divide='ignore', invalid='ignore'):
            # share of each y among the matches of one x, weighted by y
            y_weighted = w_y[:, np.newaxis] * matched
            wgts_y = y_weighted / y_weighted.sum(axis=0)
            # share of each matched pair in the budget of one x
            x_weighted = matched * w_x[np.newaxis, :]
            wgts_x = x_weighted / x_weighted.sum(axis=0)
        wgts_y[np.isnan(wgts_y)] = 0
        wgts_x[np.isnan(wgts_x)] = 0

        # weighted-hour budgets relative to the reference level
        y_tot = ((hrs - self.hrs_week[y]) * w_y).values
        x_tot = (abs((hrs - self.hrs_week[x]) * self.p_weight[x]).values)

        max_y_mtx = wgts_y * y_tot[:, np.newaxis]
        max_x_mtx = wgts_x * x_tot[np.newaxis, :]
        # element-wise smaller of the two limits; written with an explicit
        # comparison so a NaN on the x side still falls back to the y limit
        moved = np.where(max_y_mtx > max_x_mtx, max_x_mtx, max_y_mtx)

        # adjust hours on self (weighted hours are converted back to hours)
        self.loc[y, 'hrs_week'] = self.hrs_week[y] + z * (moved.sum(axis=1) / self.p_weight[y])
        self.loc[x, 'hrs_week'] = self.hrs_week[x] - z * (moved.sum(axis=0) / self.p_weight[x])

    def match_adjust(self, x, y):
        """Match ``x`` and ``y`` workers on hourly wage, then adjust hours.

        ``x`` is the mask for the workers working from either 30 to 32.5 or
        32.5 to 35 hours; ``y`` is the mask for the workers working under 30
        hours or above 35.  Nothing happens when either mask selects no
        worker or when the match table comes back empty.
        """
        # nothing to match when one side is empty
        if (sum(x) == 0) or (sum(y) == 0):
            return
        matches = self.match_wage(self[x].hourly_wage, self[y].hourly_wage)
        # an empty table cannot drive an adjustment
        if len(matches.columns) < 1 or len(matches.index) < 1:
            return
        self.adjust_hours(x, y, matches)

    def reduce_wkr_hours(self, firm):
        """Reallocate hours inside one firm, in place.

        The aim is to minimise the number of workers of ``firm`` that work
        between 30 and 35 hours per week.  Workers in (30, 32.5) are paired
        with workers at or below 30 hours, workers in (32.5, 35) with workers
        at or above 35 hours, and workers at exactly 32.5 are assigned to one
        of the two groups at random.

        Parameters
        ----------
        firm : value compared against the ``firm_id`` column.

        Returns
        -------
        None.  Returns early when no row belongs to ``firm``.
        """
        # Compare against the column VALUES: ``firm in self.firm_id`` would
        # test the index labels instead.
        firm_index = self.firm_id == firm
        if not firm_index.any():
            return

        hours = self.hrs_week
        # <=30 hrs and >=35 hrs are the counterparts; the bands in between
        # are the workers to move
        wrk_30 = firm_index & (hours <= 30)
        wrk_30_325 = firm_index & (hours > 30) & (hours < 32.5)
        wrk_325 = firm_index & (hours == 32.5)
        wrk_325_35 = firm_index & (hours > 32.5) & (hours < 35)
        wrk_35 = firm_index & (hours >= 35)

        # randomly choose a direction for the workers at exactly 32.5 hours;
        # a draw of 1 sends the worker down, a draw of 0 sends the worker up
        at_325 = np.asarray(wrk_325, dtype=bool)
        goes_down = np.zeros(len(self), dtype=bool)
        goes_down[at_325] = np.random.randint(2, size=int(at_325.sum())).astype(bool)
        wrk_325_down = wrk_325 & goes_down
        wrk_325_up = wrk_325 & ~goes_down

        # select workers who move down and who move up
        move_down = wrk_30_325 | wrk_325_down
        move_up = wrk_325_35 | wrk_325_up
        self.match_adjust(move_down, wrk_30)
        self.match_adjust(move_up, wrk_35)