"""
We use the RuleFit implementation as provided at the following URL: https://raw.githubusercontent.com/christophM/rulefit/master/rulefit/rulefit.py
Linear model of tree-based decision rules
This module implements the RuleFit algorithm
The module structure is the following:
- ``RuleCondition`` implements a binary feature transformation
- ``Rule`` implements a Rule composed of ``RuleConditions``
- ``RuleEnsemble`` implements an ensemble of ``Rules``
- ``RuleFit`` implements the RuleFit algorithm
"""
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.ensemble import GradientBoostingRegressor, GradientBoostingClassifier, RandomForestRegressor, \
RandomForestClassifier
from sklearn.linear_model import LassoCV, LogisticRegressionCV
from functools import reduce
class RuleCondition:
"""Class for binary rule condition
Warning: this class should not be used directly.
"""
def __init__(self,
feature_index,
threshold,
operator,
support,
feature_name=None):
self.feature_index = feature_index
self.threshold = threshold
self.operator = operator
self.support = support
self.feature_name = feature_name
def __repr__(self):
return self.__str__()
def __str__(self):
if self.feature_name:
feature = self.feature_name
else:
feature = self.feature_index
return "%s %s %s" % (feature, self.operator, self.threshold)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
def __hash__(self):
return hash((self.feature_index, self.threshold, self.operator, self.feature_name))
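    # The upstream module (see URL in the module docstring) also defines a
    # transform() here, which this excerpt omits even though Rule and
    # RuleEnsemble depend on it. The sketch below reconstructs it from the
    # class docstring ("binary feature transformation"): 1 where the
    # condition holds, 0 otherwise.
    def transform(self, X):
        """Binary-encode one condition against a dataset.

        Parameters
        ----------
        X: array-like matrix, shape=(n_samples, n_features)

        Returns
        -------
        X_transformed: array, shape=(n_samples,)
        """
        if self.operator == "<=":
            res = 1 * (X[:, self.feature_index] <= self.threshold)
        else:  # operator == ">"
            res = 1 * (X[:, self.feature_index] > self.threshold)
        return res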
class Winsorizer:
    """Performs Winsorization l->l*
Warning: this class should not be used directly.
"""
def __init__(self, trim_quantile=0.0):
self.trim_quantile = trim_quantile
self.winsor_lims = None
    def train(self, X):
# get winsor limits
self.winsor_lims = np.ones([2, X.shape[1]]) * np.inf
self.winsor_lims[0, :] = -np.inf
if self.trim_quantile > 0:
for i_col in np.arange(X.shape[1]):
lower = np.percentile(X[:, i_col], self.trim_quantile * 100)
upper = np.percentile(X[:, i_col], 100 - self.trim_quantile * 100)
self.winsor_lims[:, i_col] = [lower, upper]
    def trim(self, X):
X_ = np.where(X > self.winsor_lims[1, :], np.tile(self.winsor_lims[1, :], [X.shape[0], 1]),
np.where(X < self.winsor_lims[0, :], np.tile(self.winsor_lims[0, :], [X.shape[0], 1]), X))
return X_
class FriedScale:
"""Performs scaling of linear variables according to Friedman et al. 2005 Sec 5
Each variable is first Winsorized l->l*, then standardised as 0.4 x l* / std(l*)
Warning: this class should not be used directly.
"""
def __init__(self, winsorizer=None):
self.scale_multipliers = None
self.winsorizer = winsorizer
    def train(self, X):
# get multipliers
if self.winsorizer is not None:
X_trimmed = self.winsorizer.trim(X)
else:
X_trimmed = X
scale_multipliers = np.ones(X.shape[1])
for i_col in np.arange(X.shape[1]):
num_uniq_vals = len(np.unique(X[:, i_col]))
if num_uniq_vals > 2: # don't scale binary variables which are effectively already rules
scale_multipliers[i_col] = 0.4 / (1.0e-12 + np.std(X_trimmed[:, i_col]))
self.scale_multipliers = scale_multipliers
    def scale(self, X):
if self.winsorizer is not None:
return self.winsorizer.trim(X) * self.scale_multipliers
else:
return X * self.scale_multipliers
class Rule:
"""Class for binary Rules from list of conditions
Warning: this class should not be used directly.
"""
def __init__(self,
rule_conditions, prediction_value):
self.conditions = set(rule_conditions)
self.support = min([x.support for x in rule_conditions])
self.prediction_value = prediction_value
self.rule_direction = None
def __str__(self):
return " & ".join([x.__str__() for x in self.conditions])
def __repr__(self):
return self.__str__()
def __hash__(self):
return sum([condition.__hash__() for condition in self.conditions])
def __eq__(self, other):
return self.__hash__() == other.__hash__()
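    # The upstream module also gives Rule a transform(), which this excerpt
    # omits even though get_rules() below calls rule.transform(subregion).
    # A minimal reconstruction: a rule fires only where all of its
    # conditions hold, i.e. the elementwise AND (product) of the condition
    # encodings.
    def transform(self, X):
        """Binary-encode the rule: 1 where every condition holds, else 0."""
        rule_applies = [condition.transform(X) for condition in self.conditions]
        return reduce(lambda x, y: x * y, rule_applies)


# RuleEnsemble._extract_rules() below calls extract_rules_from_tree(), which
# is also missing from this excerpt. The sketch below reconstructs it against
# the sklearn ``tree_`` API: every root-to-leaf path in a fitted tree becomes
# one Rule whose conditions are the split tests along that path.
def extract_rules_from_tree(tree, feature_names=None):
    """Turn a fitted sklearn tree (an estimator's ``tree_`` attribute) into a set of Rules."""
    rules = set()

    def traverse_nodes(node_id=0, operator=None, threshold=None,
                       feature=None, conditions=()):
        if node_id != 0:
            if feature_names is not None:
                feature_name = feature_names[feature]
            else:
                feature_name = feature
            rule_condition = RuleCondition(
                feature_index=feature,
                threshold=threshold,
                operator=operator,
                support=tree.n_node_samples[node_id] / float(tree.n_node_samples[0]),
                feature_name=feature_name)
            new_conditions = conditions + (rule_condition,)
        else:
            new_conditions = ()
        if tree.children_left[node_id] != tree.children_right[node_id]:
            # internal node: recurse into both children with the split test appended
            feature = tree.feature[node_id]
            threshold = tree.threshold[node_id]
            traverse_nodes(tree.children_left[node_id], "<=", threshold, feature, new_conditions)
            traverse_nodes(tree.children_right[node_id], ">", threshold, feature, new_conditions)
        elif len(new_conditions) > 0:
            # leaf node: the accumulated path is one rule (skip a root-only tree)
            rules.add(Rule(list(new_conditions), tree.value[node_id][0][0]))

    traverse_nodes()
    return rules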
class RuleEnsemble:
"""Ensemble of binary decision rules
This class implements an ensemble of decision rules that extracts rules from
an ensemble of decision trees.
Parameters
----------
tree_list: List or array of DecisionTreeClassifier or DecisionTreeRegressor
Trees from which the rules are created
feature_names: List of strings, optional (default=None)
Names of the features
Attributes
----------
rules: List of Rule
The ensemble of rules extracted from the trees
"""
def __init__(self,
tree_list,
feature_names=None):
self.tree_list = tree_list
self.feature_names = feature_names
self.rules = set()
# TODO: Move this out of __init__
self._extract_rules()
self.rules = list(self.rules)
def _extract_rules(self):
"""Recursively extract rules from each tree in the ensemble
"""
for tree in self.tree_list:
rules = extract_rules_from_tree(tree[0].tree_, feature_names=self.feature_names)
self.rules.update(rules)
    def filter_rules(self, func):
        # materialise as a list: filter() returns an iterator in Python 3,
        # which would break len() and repeated iteration over self.rules
        self.rules = [rule for rule in self.rules if func(rule)]
    def filter_short_rules(self, k):
self.filter_rules(lambda x: len(x.conditions) > k)
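    # fit()/predict() below call self.rule_ensemble.transform(X), which this
    # excerpt omits. This reconstruction mirrors the behaviour implied by the
    # call sites: one binary column per rule; when ``coefs`` is given, rules
    # with a zero Lasso coefficient are skipped while column positions stay
    # aligned with ``coefs``.
    def transform(self, X, coefs=None):
        """Binary-encode X against every rule in the ensemble.

        Parameters
        ----------
        X:     array-like matrix, shape=(n_samples, n_features)
        coefs: optional array of per-rule coefficients; if given, rules with
               a zero coefficient are not evaluated and their columns remain zero

        Returns
        -------
        X_transformed: array, shape=(n_samples, n_rules)
        """
        rule_list = list(self.rules)
        if coefs is None:
            return np.array([rule.transform(X) for rule in rule_list]).T
        coefs = np.asarray(coefs)
        res = np.array([rule_list[i].transform(X)
                        for i in np.arange(len(rule_list)) if coefs[i] != 0]).T
        res_ = np.zeros([X.shape[0], len(rule_list)])
        res_[:, coefs != 0] = res
        return res_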
    def __str__(self):
        # str() of a map object is meaningless in Python 3; render the rules themselves
        return str([str(rule) for rule in self.rules])
class RuleFit(BaseEstimator, TransformerMixin):
"""Rulefit class
Parameters
----------
tree_size: Number of terminal nodes in generated trees. If exp_rand_tree_size=True,
this will be the mean number of terminal nodes.
sample_fract: fraction of randomly chosen training observations used to produce each tree.
FP 2004 (Sec. 2)
max_rules: approximate total number of rules generated for fitting. Note that actual
number of rules will usually be lower than this due to duplicates.
memory_par: scale multiplier (shrinkage factor) applied to each new tree when
sequentially induced. FP 2004 (Sec. 2)
rfmode: 'regress' for regression or 'classify' for binary classification.
lin_standardise: If True, the linear terms will be standardised as per Friedman Sec 3.2
by multiplying the winsorised variable by 0.4/stdev.
lin_trim_quantile: If lin_standardise is True, this quantile will be used to trim linear
terms before standardisation.
exp_rand_tree_size: If True, each boosted tree will have a different maximum number of
terminal nodes based on an exponential distribution about tree_size.
(Friedman Sec 3.3)
model_type: 'r': rules only; 'l': linear terms only; 'rl': both rules and linear terms
random_state: Integer to initialise random objects and provide repeatability.
tree_generator: Optional: this object will be used as provided to generate the rules.
This will override almost all the other properties above.
Must be GradientBoostingRegressor or GradientBoostingClassifier, optional (default=None)
tol: The tolerance for the optimization for LassoCV or LogisticRegressionCV:
if the updates are smaller than `tol`, the optimization code checks the dual
gap for optimality and continues until it is smaller than `tol`.
max_iter: The maximum number of iterations for LassoCV or LogisticRegressionCV.
n_jobs: Number of CPUs to use during the cross validation in LassoCV or
LogisticRegressionCV. None means 1 unless in a joblib.parallel_backend
context. -1 means using all processors.
Attributes
----------
rule_ensemble: RuleEnsemble
The rule ensemble
feature_names: list of strings, optional (default=None)
The names of the features (columns)
"""
def __init__(
self,
tree_size=4,
sample_fract='default',
max_rules=2000,
memory_par=0.01,
tree_generator=None,
rfmode='regress',
lin_trim_quantile=0.025,
lin_standardise=True,
exp_rand_tree_size=True,
model_type='rl',
Cs=None,
cv=3,
tol=0.0001,
max_iter=None,
n_jobs=None,
random_state=None):
self.rfmode = rfmode
self.lin_trim_quantile = lin_trim_quantile
self.lin_standardise = lin_standardise
self.winsorizer = Winsorizer(trim_quantile=lin_trim_quantile)
self.friedscale = FriedScale(self.winsorizer)
self.stddev = None
self.mean = None
self.exp_rand_tree_size = exp_rand_tree_size
self.max_rules = max_rules
self.sample_fract = sample_fract
self.memory_par = memory_par
        if tree_generator is None \
                or type(tree_generator) in [GradientBoostingRegressor, RandomForestRegressor,
                                            GradientBoostingClassifier, RandomForestClassifier]:
            self.tree_generator = tree_generator
        else:
            raise TypeError("tree_generator must be one of GradientBoostingRegressor, "
                            "RandomForestRegressor, GradientBoostingClassifier or "
                            "RandomForestClassifier")
self.tree_size = tree_size
self.random_state = random_state
self.model_type = model_type
self.cv = cv
self.tol = tol
        # Default to each solver's own default: LassoCV uses max_iter=1000,
        # LogisticRegressionCV uses max_iter=100.
        if max_iter is not None:
            self.max_iter = max_iter
        elif rfmode == 'regress':
            self.max_iter = 1000
        else:
            self.max_iter = 100
self.n_jobs = n_jobs
self.Cs = Cs
    def fit(self, X, y=None, feature_names=None):
"""Fit and estimate linear combination of rule ensemble
"""
## Enumerate features if feature names not provided
N = X.shape[0]
if feature_names is None:
self.feature_names = ['feature_' + str(x) for x in range(0, X.shape[1])]
else:
self.feature_names = feature_names
if 'r' in self.model_type:
## initialise tree generator
if self.tree_generator is None:
n_estimators_default = int(np.ceil(self.max_rules / self.tree_size))
                if self.sample_fract == 'default':
                    self.sample_fract_ = min(0.5, (100 + 6 * np.sqrt(N)) / N)
                else:
                    self.sample_fract_ = self.sample_fract
if self.rfmode == 'regress':
self.tree_generator = GradientBoostingRegressor(n_estimators=n_estimators_default,
max_leaf_nodes=self.tree_size,
learning_rate=self.memory_par,
subsample=self.sample_fract_,
random_state=self.random_state, max_depth=100)
else:
self.tree_generator = GradientBoostingClassifier(n_estimators=n_estimators_default,
max_leaf_nodes=self.tree_size,
learning_rate=self.memory_par,
subsample=self.sample_fract_,
random_state=self.random_state, max_depth=100)
# fit tree generator
if not self.exp_rand_tree_size: # simply fit with constant tree size
self.tree_generator.fit(X, y)
else: # randomise tree size as per Friedman 2005 Sec 3.3
np.random.seed(self.random_state)
tree_sizes = np.random.exponential(scale=self.tree_size - 2,
size=int(np.ceil(self.max_rules * 2 / self.tree_size)))
tree_sizes = np.asarray([2 + np.floor(tree_sizes[i_]) for i_ in np.arange(len(tree_sizes))], dtype=int)
i = int(len(tree_sizes) / 4)
while np.sum(tree_sizes[0:i]) < self.max_rules:
i = i + 1
tree_sizes = tree_sizes[0:i]
self.tree_generator.set_params(warm_start=True)
curr_est_ = 0
for i_size in np.arange(len(tree_sizes)):
size = tree_sizes[i_size]
self.tree_generator.set_params(n_estimators=curr_est_ + 1)
self.tree_generator.set_params(max_leaf_nodes=size)
                    random_state_add = self.random_state if self.random_state else 0
                    # warm_start=True seems to reset random_state, such that the trees are
                    # highly correlated, unless we manually change the random_state here.
                    self.tree_generator.set_params(
                        random_state=i_size + random_state_add)
self.tree_generator.fit(np.copy(X, order='C'), np.copy(y, order='C'))
curr_est_ = curr_est_ + 1
self.tree_generator.set_params(warm_start=False)
tree_list = self.tree_generator.estimators_
            if isinstance(self.tree_generator, (RandomForestRegressor, RandomForestClassifier)):
                tree_list = [[x] for x in self.tree_generator.estimators_]
# extract rules
self.rule_ensemble = RuleEnsemble(tree_list=tree_list,
feature_names=self.feature_names)
            # binary-encode the data against the extracted rules
X_rules = self.rule_ensemble.transform(X)
        # standardise linear variables if requested
if 'l' in self.model_type:
# standard deviation and mean of winsorized features
self.winsorizer.train(X)
winsorized_X = self.winsorizer.trim(X)
self.stddev = np.std(winsorized_X, axis=0)
self.mean = np.mean(winsorized_X, axis=0)
if self.lin_standardise:
self.friedscale.train(X)
X_regn = self.friedscale.scale(X)
else:
X_regn = X.copy()
# Compile Training data
X_concat = np.zeros([X.shape[0], 0])
if 'l' in self.model_type:
X_concat = np.concatenate((X_concat, X_regn), axis=1)
if 'r' in self.model_type:
if X_rules.shape[0] > 0:
X_concat = np.concatenate((X_concat, X_rules), axis=1)
# fit Lasso
if self.rfmode == 'regress':
            if self.Cs is None:  # use the default alpha grid
n_alphas = 100
alphas = None
elif hasattr(self.Cs, "__len__"):
n_alphas = None
alphas = 1. / self.Cs
else:
n_alphas = self.Cs
alphas = None
self.lscv = LassoCV(
n_alphas=n_alphas, alphas=alphas, cv=self.cv,
max_iter=self.max_iter, tol=self.tol,
n_jobs=self.n_jobs,
random_state=self.random_state)
self.lscv.fit(X_concat, y)
self.coef_ = self.lscv.coef_
self.intercept_ = self.lscv.intercept_
else:
Cs = 10 if self.Cs is None else self.Cs
self.lscv = LogisticRegressionCV(
Cs=Cs, cv=self.cv, penalty='l2', max_iter=self.max_iter,
tol=self.tol, n_jobs=self.n_jobs,
random_state=self.random_state, solver='lbfgs')
self.lscv.fit(X_concat, y)
self.coef_ = self.lscv.coef_[0]
self.intercept_ = self.lscv.intercept_[0]
return self
    def predict(self, X):
"""Predict outcome for X
"""
X_concat = np.zeros([X.shape[0], 0])
if 'l' in self.model_type:
if self.lin_standardise:
X_concat = np.concatenate((X_concat, self.friedscale.scale(X)), axis=1)
else:
X_concat = np.concatenate((X_concat, X), axis=1)
if 'r' in self.model_type:
rule_coefs = self.coef_[-len(self.rule_ensemble.rules):]
if len(rule_coefs) > 0:
X_rules = self.rule_ensemble.transform(X, coefs=rule_coefs)
if X_rules.shape[0] > 0:
X_concat = np.concatenate((X_concat, X_rules), axis=1)
return self.lscv.predict(X_concat)
    def predict_proba(self, X):
        """Predict outcome probability for X, if the underlying model supports probability prediction.
        """
        if not hasattr(self.lscv, 'predict_proba'):
error_message = '''
Probability prediction using predict_proba not available for
model type {lscv}
'''.format(lscv=self.lscv)
raise ValueError(error_message)
X_concat = np.zeros([X.shape[0], 0])
if 'l' in self.model_type:
if self.lin_standardise:
X_concat = np.concatenate((X_concat, self.friedscale.scale(X)), axis=1)
else:
X_concat = np.concatenate((X_concat, X), axis=1)
if 'r' in self.model_type:
rule_coefs = self.coef_[-len(self.rule_ensemble.rules):]
if len(rule_coefs) > 0:
X_rules = self.rule_ensemble.transform(X, coefs=rule_coefs)
if X_rules.shape[0] > 0:
X_concat = np.concatenate((X_concat, X_rules), axis=1)
return self.lscv.predict_proba(X_concat)
    def get_rules(self, exclude_zero_coef=False, subregion=None):
"""Return the estimated rules
Parameters
----------
        exclude_zero_coef: If True, returns only the rules with an estimated
            coefficient not equal to zero.
subregion: If None (default) returns global importances (FP 2004 eq. 28/29), else returns importance over
subregion of inputs (FP 2004 eq. 30/31/32).
Returns
-------
rules: pandas.DataFrame with the rules. Column 'rule' describes the rule, 'coef' holds
the coefficients and 'support' the support of the rule in the training
data set (X)
"""
n_features = len(self.coef_) - len(self.rule_ensemble.rules)
rule_ensemble = list(self.rule_ensemble.rules)
output_rules = []
## Add coefficients for linear effects
for i in range(0, n_features):
if self.lin_standardise:
coef = self.coef_[i] * self.friedscale.scale_multipliers[i]
else:
coef = self.coef_[i]
if subregion is None:
importance = abs(coef) * self.stddev[i]
else:
subregion = np.array(subregion)
                importance = sum(abs(coef) * abs(
                    np.array([x[i] for x in self.winsorizer.trim(subregion)]) - self.mean[i])) / len(subregion)
output_rules += [(self.feature_names[i], 'linear', coef, 1, importance)]
## Add rules
for i in range(0, len(self.rule_ensemble.rules)):
rule = rule_ensemble[i]
coef = self.coef_[i + n_features]
if subregion is None:
importance = abs(coef) * (rule.support * (1 - rule.support)) ** (1 / 2)
else:
rkx = rule.transform(subregion)
importance = sum(abs(coef) * abs(rkx - rule.support)) / len(subregion)
output_rules += [(rule.__str__(), 'rule', coef, rule.support, importance)]
rules = pd.DataFrame(output_rules, columns=["rule", "type", "coef", "support", "importance"])
if exclude_zero_coef:
rules = rules.loc[rules.coef != 0]
return rules
    def get_feature_importance(self, exclude_zero_coef=False, subregion=None, scaled=False):
"""Returns feature importance for input features to RuleFit model.
Parameters:
-----------
        exclude_zero_coef: If True, returns only the rules with an estimated
            coefficient not equal to zero.
subregion: If None (default) returns global importances (FP 2004 eq. 28/29), else returns importance over
subregion of inputs (FP 2004 eq. 30/31/32).
scaled: If True, will scale the importances to have a max of 100.
Returns:
--------
return_df (pandas DataFrame): DataFrame for feature names and feature importances (FP 2004 eq. 35)
"""
        def find_mk(rule: str):
            """Count the distinct input features referenced in a rule string
            produced by the get_rules method.

            Parameters:
            -----------
            rule (str): the rule description, i.e. a conjunction of conditions

            Returns:
            --------
            feature_count (int): number of distinct features named in the rule
            """
            # NOTE: substring matching can overcount when one feature name is a
            # prefix of another (e.g. 'feature_1' also matches 'feature_10').
            feature_count = 0
            for feature in self.feature_names:
                if feature in rule:
                    feature_count += 1
            return feature_count
feature_set = self.feature_names
rules = self.get_rules(exclude_zero_coef, subregion)
# Return an array of counts for features found in rules
features_in_rule = rules.rule.apply(lambda x: find_mk(x))
feature_imp = list()
for feature in feature_set:
# Rules where feature is found
feature_rk = rules.rule.apply(lambda x: feature in x)
# Linear importance array for feature
linear_imp = rules[(rules.type == 'linear') & (rules.rule == feature)].importance.values
# Rule importance array
rule_imp = rules[rules.type != 'linear'].importance[feature_rk]
# Total count of features in each rule feature is found
mk_array = features_in_rule[feature_rk]
            # .sum() keeps this well-defined when the model has no linear term for the feature
            feature_imp.append(float(linear_imp.sum() + (rule_imp / mk_array).sum()))  # (FP 2004 eq. 35)
# Scaled output
if scaled:
            feature_imp = 100 * (np.array(feature_imp) / np.array(feature_imp).max())
return_df = pd.DataFrame({'feature': self.feature_names, 'importance': feature_imp})
        return return_df
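

if __name__ == "__main__":
    # Illustrative smoke test on synthetic data. The data, feature names and
    # hyperparameters below are made up for demonstration and are not part of
    # the upstream module.
    rng = np.random.RandomState(0)
    X = rng.normal(size=(500, 5))
    # target depends on a rule-like effect of f0 and a linear effect of f1
    y = 2.0 * (X[:, 0] > 0.5) + X[:, 1] + rng.normal(scale=0.1, size=500)
    rf = RuleFit(rfmode='regress', max_rules=200, random_state=0)
    rf.fit(X, y, feature_names=['f0', 'f1', 'f2', 'f3', 'f4'])
    rules = rf.get_rules(exclude_zero_coef=True)
    print(rules.sort_values('importance', ascending=False).head(10))
    print(rf.get_feature_importance())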