Source code for pyDEA.core.models.envelopment_model_base

''' This module contains a base class that implements the envelopment model.
'''
import pulp

from pyDEA.core.models.model_base import ModelBase
from pyDEA.core.utils.dea_utils import ZERO_TOLERANCE


class EnvelopmentModelBase(ModelBase):
    ''' Common skeleton shared by all envelopment models.

        The class provides the steps for building the linear program
        (_create_lp), re-targeting it at another DMU (_update_lp) and
        copying solver results into a solution object (_fill_solution).
        Everything that differs between envelopment variants is delegated
        to the concrete model and to the constraint creator given to the
        constructor.

        Attributes:
            _concrete_model: concrete implementation of the envelopment
                model (orientation, objective type, bounds and
                coefficients of the objective variable, post-processing
                of objective and dual values).
            _constraint_creator: object that creates a proper constraint
                depending on presence of disposable variables.
            _variables (dict of str to pulp.LpVariable): maps variable
                names to pulp variables. The objective variable is stored
                under the key 'obj_var', lambda variables are stored
                under their DMU codes.
            _constraints (dict of str to str): maps a category name to
                the name of the LP constraint created for that category.
            _should_add_efficiency (bool): when False, _fill_solution does
                not write the efficiency score, so a previously stored
                score is kept. This flag is changed in two phase model.

        Args:
            input_data (InputData): object that stores all data of a DEA
                instance.
            concrete_model: concrete implementation of the envelopment
                model.
            constraint_creator: object that creates a proper constraint
                depending on presence of disposable variables.
    '''

    def __init__(self, input_data, concrete_model, constraint_creator):
        super(EnvelopmentModelBase, self).__init__(input_data)
        self._concrete_model = concrete_model
        self._constraint_creator = constraint_creator
        self._variables = {}
        self._constraints = {}
        self._should_add_efficiency = True

    def _create_lp(self):
        ''' Builds the initial linear program and stores it in lp_model.

            Previously registered variables and constraint names are
            discarded before the new program is assembled.
        '''
        dmu_codes = self.input_data.DMU_codes
        assert len(dmu_codes) != 0
        self._variables.clear()
        self._constraints.clear()

        # The program is set up with the coefficients of the first DMU;
        # _update_lp later swaps in the coefficients of other DMUs.
        first_dmu = next(iter(dmu_codes))

        model = self._concrete_model
        orientation = model.get_orientation()
        objective_type = model.get_objective_type()
        problem_name = 'Envelopment model: {0}-oriented'.format(orientation)
        self.lp_model = pulp.LpProblem(problem_name, objective_type)

        upper_bound = model.get_upper_bound_for_objective_variable()
        lower_bound = model.get_lower_bound_for_objective_variable()
        objective_var = pulp.LpVariable(
            'Variable in objective function',
            lowBound=lower_bound,
            upBound=upper_bound,
            cat=pulp.LpContinuous)
        self._variables['obj_var'] = objective_var

        # A (expression, name) tuple sets the objective of the program.
        self.lp_model += (
            objective_var,
            'Efficiency score or inverse of efficiency score')

        # One non-negative lambda variable per DMU.
        lambdas = pulp.LpVariable.dicts(
            'lambda', dmu_codes,
            lowBound=0, upBound=None, cat=pulp.LpContinuous)
        self._variables.update(lambdas)

        self._add_constraints_for_outputs(lambdas, first_dmu, objective_var)
        self._add_constraints_for_inputs(lambdas, first_dmu, objective_var)

    def _update_lp(self, dmu_code):
        ''' Re-targets the existing linear program at the given DMU.

            For every output and input category the stored constraint is
            looked up by name and handed to the concrete model together
            with the DMU's own value for that category.

            Args:
                dmu_code (str): DMU code.
        '''
        for category in self.input_data.output_categories:
            name = self._constraints[category]
            dmu_value = self.input_data.coefficients[(dmu_code, category)]
            lp_constraint = self.lp_model.constraints[name]
            self._concrete_model.update_output_category_coefficient(
                dmu_value, lp_constraint, self._variables['obj_var'],
                category)

        for category in self.input_data.input_categories:
            dmu_value = self.input_data.coefficients[(dmu_code, category)]
            name = self._constraints[category]
            lp_constraint = self.lp_model.constraints[name]
            self._concrete_model.update_input_category_coefficient(
                dmu_value, lp_constraint, self._variables['obj_var'],
                category)

    def _add_constraints_for_outputs(self, variables, dmu_code, obj_variable):
        ''' Adds one constraint per output category to the linear program.

            The name of every created constraint is remembered in
            _constraints under its category.

            Args:
                variables (dict of str to pulp.LpVariable): a dictionary
                    that maps DMU codes to pulp.LpVariable, created with
                    pulp.LpVariable.dicts.
                dmu_code (str): DMU code for which LP is being created.
                obj_variable (pulp.LpVariable): LP variable that is
                    optimised (either efficiency score or inverse of
                    efficiency score).
        '''
        data = self.input_data
        for index, category in enumerate(data.output_categories):
            own_output = data.coefficients[(dmu_code, category)]
            coeff = self._concrete_model.get_output_variable_coefficient(
                obj_variable, category)
            weighted_terms = [
                variables[dmu] * data.coefficients[(dmu, category)]
                for dmu in data.DMU_codes]
            reference_output = pulp.lpSum(weighted_terms)
            constraint_name = 'constraint_output_{count}'.format(count=index)
            constraint = self._constraint_creator.create(
                -coeff * own_output + reference_output, 0, category)
            self.lp_model += (constraint, constraint_name)
            self._constraints[category] = constraint_name

    def _add_constraints_for_inputs(self, variables, dmu_code, obj_variable):
        ''' Adds one constraint per input category to the linear program.

            The name of every created constraint is remembered in
            _constraints under its category.

            Args:
                variables (dict of {str: pulp.LpVariable}): a dictionary
                    that maps DMU codes to pulp.LpVariable, created with
                    pulp.LpVariable.dicts.
                dmu_code (str): DMU code for which LP is being created.
                obj_variable (pulp.LpVariable): LP variable that is
                    optimised (either efficiency score or inverse of
                    efficiency score).
        '''
        data = self.input_data
        for index, category in enumerate(data.input_categories):
            own_input = data.coefficients[(dmu_code, category)]
            coeff = self._concrete_model.get_input_variable_coefficient(
                obj_variable, category)
            weighted_terms = [
                variables[dmu] * data.coefficients[(dmu, category)]
                for dmu in data.DMU_codes]
            reference_input = pulp.lpSum(weighted_terms)
            constraint_name = 'constraint_input_{count}'.format(count=index)
            constraint = self._constraint_creator.create(
                coeff * own_input - reference_input, 0, category)
            self.lp_model += (constraint, constraint_name)
            self._constraints[category] = constraint_name

    def _fill_solution(self, dmu_code, model_solution):
        ''' Writes the results obtained for one DMU into the solution.

            Orientation and LP status are always recorded. The efficiency
            score, lambda variables and duals are recorded only when the
            LP status is optimal.

            Args:
                dmu_code (str): DMU code for which the LP was solved.
                model_solution (Solution): object where solution for one
                    DMU will be written.
        '''
        model_solution.orientation = self._concrete_model.get_orientation()
        model_solution.add_lp_status(dmu_code, self.lp_model.status)

        if self.lp_model.status != pulp.LpStatusOptimal:
            return

        # Keep only lambda variables whose value is known and whose
        # magnitude exceeds ZERO_TOLERANCE.
        nonzero_lambdas = {}
        for dmu in self.input_data.DMU_codes:
            lambda_var = self._variables.get(dmu)
            if lambda_var is None:
                continue
            value = lambda_var.varValue
            if value is not None and abs(value) > ZERO_TOLERANCE:
                nonzero_lambdas[dmu] = value

        if self._should_add_efficiency:
            objective_value = pulp.value(self.lp_model.objective)
            model_solution.add_efficiency_score(
                dmu_code,
                self._concrete_model.process_obj_var(objective_value))

        model_solution.add_lambda_variables(dmu_code, nonzero_lambdas)

        self._process_duals(dmu_code, self.input_data.input_categories,
                            model_solution.add_input_dual)
        self._process_duals(dmu_code, self.input_data.output_categories,
                            model_solution.add_output_dual)

    def _process_duals(self, dmu_code, categories, func):
        ''' Passes the dual value of every given category to func.

            Shared by the input and output branches of _fill_solution to
            avoid code duplication.

            Args:
                dmu_code (str): DMU code under consideration.
                categories (list of str): list of either input or output
                    categories.
                func (function): a function that accepts DMU code,
                    category and dual variable value and adds it to
                    solution.
        '''
        for category in categories:
            constraint_name = self._constraints[category]
            raw_dual = self.lp_model.constraints[constraint_name].pi
            dual = self._concrete_model.process_dual_value(raw_dual)
            func(dmu_code, category, dual)