Wrapping Multi-Modal Multiple Kernel Learning in a Python Pipeline

Want to do Multiple Kernel Learning (MKL) in Python but not entirely sure how to integrate it with sklearn functionality, such as hyperparameter search? Sound familiar?

All the hard work is already done and provided in MKLpy, so we only need to create a few wrappers to bridge to sklearn functionality.

Baseline

Let’s create a simple baseline to outline the functionality I’ll address and to verify the results. It runs on a small subset of an sklearn dataset and builds a Pipeline around our transformer and estimator, then combines grid search with cross-validation to score the considered hyperparameters.

import numpy as np

from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from imblearn.pipeline import Pipeline

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_breast_cancer
from sklearn.metrics import balanced_accuracy_score, roc_auc_score

X, Y = load_breast_cancer(return_X_y=True)
X = X[:100]
Y = Y[:100]
Xtr, Xte, Ytr, Yte = train_test_split(X, Y, test_size=0.3)

pipe = Pipeline([
    ("scale", StandardScaler()),
    ("model_LR(c)", LogisticRegression(max_iter=100))
])
grid = {"model_LR(c)__C": [100, 1.0, 0.01]}

cv = StratifiedKFold(3, shuffle=True, random_state=1)
cv_splits = cv.split(Xtr, Ytr)  # note: split() returns a one-shot generator

# grid search for hyperparameter tuning with the inner cv         
gs = GridSearchCV(estimator=pipe, param_grid=grid, n_jobs=1,
                  cv=cv_splits,
                  return_train_score=True, refit=True, verbose=True)
gs = gs.fit(Xtr, Ytr)

estimator = gs.best_estimator_

train_score = np.mean(gs.cv_results_["mean_train_score"])  # averaged over all candidates
inner_cv_score = gs.best_score_  # mean cross-validated score of the best_estimator
val_score = gs.score(Xte, Yte)

print("Best parameters found:", gs.best_params_)
print(train_score, inner_cv_score, val_score)

# OUTPUT:
# Best parameters found: {'model_LR(c)__C': 1.0}
# 0.9809846849624834 0.9571256038647343 0.9333333333333333

The issue

When doing MKL we’d like to compute our kernels on the training data and subsequently apply them to the test data, to avoid information leakage. This is the same concept preprocessing steps such as StandardScaler follow. sklearn classes like GridSearchCV rely on a few methods to make this work flexibly: all we have to do is expose the kernel computation through the .fit and .transform methods that scalers implement.
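
For reference, this is the familiar scaler pattern we want to mimic for kernels: everything is estimated on the training data in .fit, then reused in .transform:

scaler = StandardScaler()
Xtr_s = scaler.fit_transform(Xtr)  # mean/std estimated from the training data only
Xte_s = scaler.transform(Xte)      # the same statistics applied to held-out data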

A solution

Here I provide a starter wrapper that can use MKLpy’s linear_kernel, homogeneous_polynomial_kernel, and rbf_kernel. It supports multi-modal modeling: N is a list holding the number of features per modality, so len(N) > 1 means multiple modalities.

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics.pairwise import rbf_kernel

from MKLpy.preprocessing import normalization
from MKLpy.metrics.pairwise.vector import linear_kernel
from MKLpy.metrics.pairwise import homogeneous_polynomial_kernel as hpk
from MKLpy.generators import HPK_generator, RBF_generator

class KernelTransformer(BaseEstimator, TransformerMixin):
    """
    Multi-modal sklearn-transformer-style wrapper to handle MKLpy-based kernel computation.
    Can be used as a transformer (such as StandardScaler) inside a pipeline.
    input:
        N: list: contains number of features per data modality. len=1 for unimodal modeling.
        kernel_types: list: kernels to be used for each modality. len=1 for unimodal modeling.
        norm_data: bool: whether to normalize the data using MKLpy's normalization.
        degrees: list: hyperparameters for the hpk kernel
        gamma: list: hyperparameters for the rbf kernel
    """
        
    def __init__(self, N: list, kernel_types: list, norm_data: bool=True,
                 degrees: list=[1, 2, 3], gamma: list=[.001, .01, .1]) -> None:
        self.reference_data_ = None
        self.X_fit_ = None
        self.N = N
        self.kernel_types = kernel_types
        self.norm_data = norm_data
        # accept scalars as well as lists for the kernel hyperparameters
        self.degrees = degrees if isinstance(degrees, list) else [degrees]
        self.gamma = gamma if isinstance(gamma, list) else [gamma]
        assert len(self.N) == len(self.kernel_types)
        
    def fit(self, X, y=None):
        # Compute the kernel matrices for the training data, per modality
        assert np.sum(self.N) == X.shape[1]
        self.X_fit_ = X  # keep a reference to the raw training data
        self.K_train_ = []
        self.reference_data_ = self.normalize_data_(X) if self.norm_data else X

        offset = 0
        for i, n in enumerate(self.N):
            self.K_train_.extend(self.compute_kernel_(
                self.reference_data_[:, offset:offset+n],
                k_type=self.kernel_types[i]))
            offset += n

        return self

    def transform(self, X):
        # Apply the kernel transformation to input data
        assert np.sum(self.N) == X.shape[1]
        if self.reference_data_ is None:
            raise ValueError("Transformer has not been fitted")

        # If X is the raw training data, return the precomputed kernel matrices
        if X is self.X_fit_:
            return self.K_train_

        normalized_data = self.normalize_data_(X) if self.norm_data else X
        K = []
        offset = 0
        for i, n in enumerate(self.N):
            K.extend(self.compute_kernel_(
                normalized_data[:, offset:offset+n],
                self.reference_data_[:, offset:offset+n],
                k_type=self.kernel_types[i]))
            offset += n

        return K
            
    def compute_kernel_(self, X, y=None, k_type="linear"):
        # A scalar hyperparameter (as set by GridSearchCV via set_params) yields a
        # single kernel; a list yields one kernel per value via MKLpy's generators.
        if k_type == "hpk":
            if isinstance(self.degrees, (int, float)):
                return [hpk(X, y, degree=self.degrees)]
            return HPK_generator(X, y, degrees=self.degrees)
        elif k_type == "rbf":
            if isinstance(self.gamma, (int, float)):
                return [rbf_kernel(X, y, gamma=self.gamma)]
            return RBF_generator(X, y, gamma=self.gamma)
        elif k_type == "linear":
            return [linear_kernel(X, y)]
        else:
            raise NotImplementedError(f"Kernel type '{k_type}' not implemented.")

    def normalize_data_(self, X):
        # Normalize each modality separately using MKLpy's normalization
        normalized_data = np.empty_like(X)
        offset = 0
        for n in self.N:
            normalized_data[:, offset:offset+n] = normalization(X[:, offset:offset+n])
            offset += n
        return normalized_data
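
Before wiring this into a pipeline, here is a quick standalone sketch of what the transformer returns, assuming the breast cancer split from the baseline (its 30 features treated as a 20-feature and a 10-feature modality):

kt = KernelTransformer(N=[20, 10], kernel_types=["rbf", "linear"])
KL_train = kt.fit(Xtr).transform(Xtr)  # list of train-vs-train kernel matrices
KL_test = kt.transform(Xte)            # list of test-vs-train kernel matrices
print(len(KL_train))  # 4: three RBF kernels (default gamma list) plus one linear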

Usage

This class can be used inside a Pipeline. Let’s pretend we’re working with multi-modal data for a moment, hence N=[20, 10]: we treat the first 20 of the 30 features as modality 1 and the remaining 10 as modality 2. We’ll fit separate kernels for each modality.

kernel_types=["rbf", "linear"]
pipeline = Pipeline([
    ('kernel_transform', KernelTransformer(
       N=[20, 10], norm_data=False, kernel_types=kernel_types)),
    ('AvgMKL', AverageMKL(learner=SVC(C=1.))) 
])

However, if we also want to grid-search over the classifier (SVC), we need to extend the fitting algorithms: out of the box they do not support setting nested hyperparameters, which makes any parameter associated with the ‘learner’ (such as the regularization parameter C) inaccessible to GridSearchCV.
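
To see why, here is roughly how sklearn routes a nested grid key: set_params splits the key on the first ‘__’, looks the first part up in get_params(deep=True), and forwards the remainder to that sub-estimator.

# How "AvgMKL__learner__C" travels (conceptually):
#   pipeline.set_params(AvgMKL__learner__C=1.0)
#     -> AverageMKL.set_params(learner__C=1.0)
#       -> only valid if get_params(deep=True) exposes "learner__C"
pipeline.set_params(**{"AvgMKL__learner__C": 1.0})  # expected to fail with plain AverageMKL

Here is the implementation for AverageMKL and EasyMKL, of which the latter learns (non-uniform) weights for the kernels: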

from sklearn.base import ClassifierMixin
from MKLpy.algorithms import EasyMKL

class AverageMKLWrapper(AverageMKL):
    def set_params(self, **parameters):
        # Resolve nested 'a__b' keys manually by walking down the attributes
        for parameter, value in parameters.items():
            if '__' in parameter:
                # Nested parameter
                param_list = parameter.split('__')
                target = self
                for p in param_list[:-1]:
                    target = getattr(target, p)
                setattr(target, param_list[-1], value)
            else:
                # Top-level parameter
                setattr(self, parameter, value)
        return self

    def get_params(self, deep=True):
        params = super().get_params(deep=deep)
        if deep and hasattr(self, 'learner'):
            for key, value in self.learner.get_params(deep=deep).items():
                params[f'learner__{key}'] = value
        return params

class EasyMKLWrapper(BaseEstimator, ClassifierMixin):
    def __init__(self, *args, **kwargs):
        self.model = EasyMKL(*args, **kwargs)

    def fit(self, X, y):
        self.model.fit(X, y)
        return self  # sklearn expects fit to return the estimator itself

    def predict(self, X):
        return self.model.predict(X)

    def score(self, X, y):
        from sklearn.metrics import balanced_accuracy_score
        return balanced_accuracy_score(y, self.predict(X))

    def set_params(self, **params):
        for param, value in params.items():
            if '__' in param:
                # Nested parameter
                param_list = param.split('__')
                target = self.model
                for p in param_list[:-1]:
                    target = getattr(target, p)
                setattr(target, param_list[-1], value)
            else:
                # Top-level parameter
                setattr(self.model, param, value)
        return self

    def get_params(self, deep=True):
        params = {}
        if deep:
            for key, value in self.model.get_params(deep=True).items():
                params[key] = value
                if hasattr(value, 'get_params'):
                    for sub_key, sub_value in value.get_params(deep=True).items():
                        params[f'{key}__{sub_key}'] = sub_value
        else:
            params = self.model.get_params(deep=False)
        return params
    
    def get_weights(self):
        return self.model.solution.weights.numpy()
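
A quick sanity check (a sketch using the wrappers above) that the nested key is now visible to a grid search:

wrapped = AverageMKLWrapper(learner=SVC())
print("learner__C" in wrapped.get_params(deep=True))  # True: GridSearchCV can now set it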

As you can see, EasyMKL requires a bit of extra functionality, and I’ve set it up for binary classification (i.e. my current use case). Now we can use these wrappers to grid-search properly:

cv = StratifiedKFold(3, shuffle=True, random_state=1)      
cv_splits = cv.split(Xtr, Ytr)

kernel_types=["rbf", "linear"]
pipeline = Pipeline([
    ("scale", StandardScaler()),
    ('kernel_transform', KernelTransformer(
       N=[20, 10], norm_data=False, kernel_types=kernel_types)),
    ('AvgMKL', AverageMKLWrapper(learner=SVC())) 
])
grid = {"AvgMKL__learner__C": [0.01, 1., 100.]}

gs = GridSearchCV(estimator=pipeline, param_grid=grid, n_jobs=1,
                  cv=cv_splits, return_train_score=True, refit=True, verbose=True)

gs = gs.fit(Xtr, Ytr)
estimator = gs.best_estimator_
train_score = np.mean(gs.cv_results_["mean_train_score"])
inner_cv_score = gs.best_score_ 
val_score = gs.score(Xte, Yte)

print("Best parameters found:", gs.best_params_)
print("scores using kernels:", kernel_types, train_score, inner_cv_score, val_score)

# OUTPUT
# Best parameters found: {'AvgMKL__learner__C': 1.0}
# scores using kernels: ['rbf', 'linear'] 0.9904923424812416 0.9426328502415459 0.9
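
We can confirm how many kernels were fused by inspecting the fitted transformer, a short sketch using the best estimator from above:

kt = estimator.named_steps["kernel_transform"]
print(len(kt.K_train_))  # the number of kernel matrices passed to AverageMKL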

Note that we use AverageMKL from MKLpy here (the ‘AvgMKL’ step) to fuse our kernels. To be precise, the first modality has three kernels associated with it, with $\gamma=[0.001, 0.01, 0.1]$. We can also grid-search over the per-modality kernel hyperparameters and fuse only a single kernel per modality:

# search over the gamma-parameter
grid = { 
    "kernel_transform__gamma": [0.001, 0.01, 0.1],
    "AvgMKL__learner__C": [0.01, 1., 100.]}
# ...
print("Best parameters found:", gs.best_params_)
# OUTPUT: 
# Best parameters found: {'AvgMKL__learner__C': 1.0, 'kernel_transform__gamma': 0.001}
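
Why does a single float work? compute_kernel_ checks for scalar hyperparameters: when GridSearchCV sets gamma via set_params, the transformer builds exactly one RBF kernel for that modality instead of one per value. A small sketch (scaling Xtr by hand, as the pipeline would):

kt = KernelTransformer(N=[20, 10], kernel_types=["rbf", "linear"], norm_data=False)
kt.set_params(gamma=0.001)  # what GridSearchCV does under the hood
kt.fit(StandardScaler().fit_transform(Xtr))
print(len(kt.K_train_))  # 2: one RBF kernel + one linear kernel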

EasyMKL and Kernel weights

Besides AverageMKL I’ve only used EasyMKL, which, as stated, learns to weight the kernels. Let’s grid-search for a good kernel per modality and fuse a single kernel per modality. Note that EasyMKL’s default learner relies on the regularization hyperparameter $\lambda$ (lam):

kernel_types=["rbf", "hpk"]
pipeline = Pipeline([
     ("scale", StandardScaler()),
    ('kernel_transform', KernelTransformer(
       N=[20, 10], norm_data=False, kernel_types=kernel_types)),
    ('EasyMKL', EasyMKLWrapper()) 
])
grid = {
    "kernel_transform__degrees": [1, 2, 3],
    "kernel_transform__gamma": [0.001, 0.01, 0.1],
    "EasyMKL__learner__lam": [0.01, 0.1, 0.5]}

gs = GridSearchCV(estimator=pipeline, param_grid=grid, n_jobs=1,
                  cv=cv_splits, return_train_score=True, refit=True, verbose=True)
gs = gs.fit(Xtr, Ytr)

estimator = gs.best_estimator_

train_score = np.mean(gs.cv_results_["mean_train_score"])
inner_cv_score = gs.best_score_ 
val_score = gs.score(Xte, Yte)

print("Best parameters found:", gs.best_params_)
print("scores using kernels:", kernel_types, train_score, inner_cv_score, val_score)
# OUTPUT
# Best parameters found: {'EasyMKL__learner__lam': 0.5, 'kernel_transform__degrees': 1, 'kernel_transform__gamma': 0.1}
# scores using kernels: ['rbf', 'hpk'] 0.988011615885179 0.9584656084656085 0.8920454545454546

The provided wrapper makes the kernel weights (which can be interpreted as modality weights in our toy example, since a single kernel was fused per modality) available as follows:

estimator[-1].get_weights()
# OUTPUT
# [0.223, 0.777]
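
Since a single kernel per modality was fused here, the two weights line up with the modalities directly. A small sketch pairing them (the labels are just for illustration):

for name, w in zip(["modality 1 (rbf)", "modality 2 (hpk)"], estimator[-1].get_weights()):
    print(f"{name}: {w:.3f}")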
