Source code for cornac.metrics.ranking

# Copyright 2018 The Cornac Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

import numpy as np
from scipy.stats import rankdata


class RankingMetric:
    """Ranking Metric.

    Attributes
    ----------
    type: string, value: 'ranking'
        Type of the metric, e.g., "ranking", "rating".

    name: string, default: None
        Name of the measure.

    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    higher_better: bool, optional, default: True
        Whether a higher metric value indicates better performance.

    """

    def __init__(self, name=None, k=-1, higher_better=True):
        assert hasattr(k, "__len__") or k == -1 or k > 0

        self.type = "ranking"
        self.name = name
        self.k = k
        self.higher_better = higher_better

    def compute(self, **kwargs):
        raise NotImplementedError()
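
# A concrete ranking metric subclasses RankingMetric and overrides `compute`.
# Hypothetical minimal sketch (FirstHit is not an existing Cornac metric): it
# scores 1.0 when the top-ranked item is relevant and 0.0 otherwise.
#
#   class FirstHit(RankingMetric):
#       def __init__(self):
#           RankingMetric.__init__(self, name="FirstHit")
#
#       def compute(self, gt_pos, pd_rank, **kwargs):
#           return 1.0 if pd_rank[0] in gt_pos else 0.0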


class NDCG(RankingMetric):
    """Normalized Discounted Cumulative Gain.

    Parameters
    ----------
    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    References
    ----------
    https://en.wikipedia.org/wiki/Discounted_cumulative_gain

    """

    def __init__(self, k=-1):
        RankingMetric.__init__(self, name="NDCG@{}".format(k), k=k)

    @staticmethod
    def dcg_score(gt_pos, pd_rank, k=-1):
        """Compute Discounted Cumulative Gain score.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        k: int, optional, default: -1 (all)
            The number of items in the top@k list.
            If -1, all items will be considered.

        Returns
        -------
        dcg: A scalar
            Discounted Cumulative Gain score.

        """
        if k > 0:
            truncated_pd_rank = pd_rank[:k]
        else:
            truncated_pd_rank = pd_rank

        ranked_scores = np.in1d(truncated_pd_rank, gt_pos).astype(int)
        gain = 2**ranked_scores - 1
        discounts = np.log2(np.arange(len(ranked_scores)) + 2)

        return np.sum(gain / discounts)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute Normalized Discounted Cumulative Gain score.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        ndcg: A scalar
            Normalized Discounted Cumulative Gain score.

        """
        dcg = self.dcg_score(gt_pos, pd_rank, self.k)
        idcg = self.dcg_score(gt_pos, gt_pos, self.k)
        ndcg = dcg / idcg

        return ndcg
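
# Illustrative usage with toy arrays (as the np.in1d calls above expect,
# gt_pos holds positive item indices and pd_rank lists item indices in
# decreasing order of predicted score; the value in the trailing comment is
# approximate):
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> NDCG(k=3).compute(gt_pos, pd_rank)  # DCG/IDCG = 1.5 / 1.6309 ~= 0.9197
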
class NCRR(RankingMetric):
    """Normalized Cumulative Reciprocal Rank.

    Parameters
    ----------
    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    """

    def __init__(self, k=-1):
        RankingMetric.__init__(self, name="NCRR@{}".format(k), k=k)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute Normalized Cumulative Reciprocal Rank score.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        ncrr: A scalar
            Normalized Cumulative Reciprocal Rank score.

        """
        if self.k > 0:
            truncated_pd_rank = pd_rank[: self.k]
        else:
            truncated_pd_rank = pd_rank

        # Compute CRR
        rec_rank = np.where(np.in1d(truncated_pd_rank, gt_pos))[0]
        if len(rec_rank) == 0:
            return 0.0
        rec_rank = rec_rank + 1  # +1 because indices start from 0 in Python
        crr = np.sum(1.0 / rec_rank)

        # Compute ideal CRR
        max_nb_pos = min(len(gt_pos), len(truncated_pd_rank))
        ideal_rank = np.arange(max_nb_pos)
        ideal_rank = ideal_rank + 1  # +1 because indices start from 0 in Python
        icrr = np.sum(1.0 / ideal_rank)

        # Compute NCRR
        ncrr_i = crr / icrr
        return ncrr_i
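
# Illustrative usage with the same toy arrays: hits at ranks 1 and 3 give
# CRR = 1 + 1/3, while an ideal list would place both positives at ranks 1
# and 2 (ICRR = 1 + 1/2).
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> NCRR(k=3).compute(gt_pos, pd_rank)   # (1 + 1/3) / (1 + 1/2) ~= 0.8889
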
class MRR(RankingMetric):
    """Mean Reciprocal Rank.

    References
    ----------
    https://en.wikipedia.org/wiki/Mean_reciprocal_rank

    """

    def __init__(self):
        RankingMetric.__init__(self, name="MRR")

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute Mean Reciprocal Rank score.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        mrr: A scalar
            Mean Reciprocal Rank score.

        """
        matched_items = np.nonzero(np.in1d(pd_rank, gt_pos))[0]
        if len(matched_items) == 0:
            raise ValueError("No match between ground-truth items and recommendations")

        # +1 because indices start from 0 in Python
        mrr = np.divide(1, (matched_items[0] + 1))
        return mrr
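
# Illustrative usage, toy arrays: the first relevant item already sits at
# rank 1 of the recommendation list.
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> MRR().compute(gt_pos, pd_rank)   # -> 1.0 (1 / rank of the first hit)
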
class MeasureAtK(RankingMetric):
    """Measure at K.

    Attributes
    ----------
    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    """

    def __init__(self, name=None, k=-1):
        RankingMetric.__init__(self, name, k)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute TP, TP+FN, and TP+FP.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        tp: A scalar
            True positive.

        tp_fn: A scalar
            True positive + false negative.

        tp_fp: A scalar
            True positive + false positive.

        """
        if self.k > 0:
            truncated_pd_rank = pd_rank[: self.k]
        else:
            truncated_pd_rank = pd_rank

        tp = np.sum(np.in1d(truncated_pd_rank, gt_pos))
        tp_fn = len(gt_pos)
        tp_fp = self.k if self.k > 0 else len(truncated_pd_rank)

        return tp, tp_fn, tp_fp
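
# Internal helper used by the precision/recall-style metrics below; with toy
# arrays and k=3 it returns the raw counts rather than a final score.
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> MeasureAtK(name="m@3", k=3).compute(gt_pos, pd_rank)   # -> (2, 2, 3)
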
class HitRatio(MeasureAtK):
    """Hit Ratio.

    Parameters
    ----------
    k: int, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    """

    def __init__(self, k=-1):
        super().__init__(name="HitRatio@{}".format(k), k=k)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute Hit Ratio.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        res: A scalar
            Hit Ratio score (1.0 if at least one ground-truth item appears in
            the top-k list, 0.0 otherwise).

        """
        tp, *_ = MeasureAtK.compute(self, gt_pos, pd_rank, **kwargs)
        return 1.0 if tp > 0 else 0.0
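
# Illustrative usage, toy arrays: two positives fall inside the top-3 list,
# so the hit ratio is 1.0.
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> HitRatio(k=3).compute(gt_pos, pd_rank)   # -> 1.0
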
class Precision(MeasureAtK):
    """Precision@K.

    Parameters
    ----------
    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    """

    def __init__(self, k=-1):
        super().__init__(name="Precision@{}".format(k), k=k)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute Precision score.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        res: A scalar
            Precision score.

        """
        tp, _, tp_fp = MeasureAtK.compute(self, gt_pos, pd_rank, **kwargs)
        return tp / tp_fp
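
# Illustrative usage, toy arrays: 2 of the 3 recommended items are relevant.
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> Precision(k=3).compute(gt_pos, pd_rank)   # -> 2/3 ~= 0.6667
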
class Recall(MeasureAtK):
    """Recall@K.

    Parameters
    ----------
    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    """

    def __init__(self, k=-1):
        super().__init__(name="Recall@{}".format(k), k=k)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute Recall score.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        res: A scalar
            Recall score.

        """
        tp, tp_fn, _ = MeasureAtK.compute(self, gt_pos, pd_rank, **kwargs)
        return tp / tp_fn
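
# Illustrative usage, toy arrays: both relevant items appear in the top-3 list.
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> Recall(k=3).compute(gt_pos, pd_rank)   # -> 2/2 = 1.0
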
class FMeasure(MeasureAtK):
    """F-measure@K.

    Parameters
    ----------
    k: int or list, optional, default: -1 (all)
        The number of items in the top@k list.
        If -1, all items will be considered.

    """

    def __init__(self, k=-1):
        super().__init__(name="F1@{}".format(k), k=k)

    def compute(self, gt_pos, pd_rank, **kwargs):
        """Compute F-measure.

        Parameters
        ----------
        gt_pos: Numpy array
            Vector of positive items.

        pd_rank: Numpy array
            Item ranking prediction.

        **kwargs: For compatibility

        Returns
        -------
        res: A scalar
            F-measure score.

        """
        tp, tp_fn, tp_fp = MeasureAtK.compute(self, gt_pos, pd_rank, **kwargs)
        prec = tp / tp_fp
        rec = tp / tp_fn
        if (prec + rec) > 0:
            f1 = 2 * (prec * rec) / (prec + rec)
        else:
            f1 = 0.0

        return f1
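
# Illustrative usage, toy arrays: harmonic mean of Precision@3 = 2/3 and
# Recall@3 = 1.0.
#
#   >>> gt_pos, pd_rank = np.array([1, 3]), np.array([3, 0, 1, 2])
#   >>> FMeasure(k=3).compute(gt_pos, pd_rank)   # -> 0.8
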
class AUC(RankingMetric):
    """Area Under the ROC Curve (AUC).

    References
    ----------
    https://arxiv.org/ftp/arxiv/papers/1205/1205.2618.pdf

    """

    def __init__(self):
        RankingMetric.__init__(self, name="AUC")

    def compute(self, item_indices, pd_scores, gt_pos, gt_neg=None, **kwargs):
        """Compute Area Under the ROC Curve (AUC).

        Parameters
        ----------
        item_indices: Numpy array
            Items being considered for evaluation.

        pd_scores: Numpy array
            Prediction scores for items in item_indices.

        gt_pos: Numpy array
            Vector of positive items.

        gt_neg: Numpy array, optional
            Vector of negative items.
            If None, negation of gt_pos will be used.

        **kwargs: For compatibility

        Returns
        -------
        res: A scalar
            AUC score.

        """
        gt_pos_mask = np.in1d(item_indices, gt_pos)
        gt_neg_mask = (
            np.logical_not(gt_pos_mask)
            if gt_neg is None
            else np.in1d(item_indices, gt_neg)
        )

        pos_scores = pd_scores[gt_pos_mask]
        neg_scores = pd_scores[gt_neg_mask]
        ui_scores = np.repeat(pos_scores, len(neg_scores))
        uj_scores = np.tile(neg_scores, len(pos_scores))

        return (ui_scores > uj_scores).sum() / len(uj_scores)
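
# Illustrative usage, toy arrays: unlike the top-k metrics above, AUC works on
# raw prediction scores. Items 1 and 3 are positive; of the four
# positive/negative score pairs, three are ordered correctly.
#
#   >>> item_indices = np.arange(4)
#   >>> pd_scores = np.array([0.8, 0.9, 0.1, 0.3])
#   >>> AUC().compute(item_indices, pd_scores, gt_pos=np.array([1, 3]))   # -> 3/4 = 0.75
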
class MAP(RankingMetric):
    """Mean Average Precision (MAP).

    References
    ----------
    https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)#Mean_average_precision

    """

    def __init__(self):
        RankingMetric.__init__(self, name="MAP")

    def compute(self, item_indices, pd_scores, gt_pos, **kwargs):
        """Compute Average Precision.

        Parameters
        ----------
        item_indices: Numpy array
            Items being considered for evaluation.

        pd_scores: Numpy array
            Prediction scores for items in item_indices.

        gt_pos: Numpy array
            Vector of positive items.

        **kwargs: For compatibility

        Returns
        -------
        res: A scalar
            AP score.

        """
        relevant = np.in1d(item_indices, gt_pos)
        # Rank of each relevant item among all candidate items (by decreasing score).
        rank = rankdata(-pd_scores, "max")[relevant]
        # Rank of each relevant item among the relevant items only.
        L = rankdata(-pd_scores[relevant], "max")
        # Average precision: mean of precision values at the relevant positions.
        ans = (L / rank).mean()

        return ans
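
# Illustrative usage, toy arrays: the positives sit at overall ranks 1 and 3,
# so the per-user average precision is (1/1 + 2/3) / 2.
#
#   >>> item_indices = np.arange(4)
#   >>> pd_scores = np.array([0.8, 0.9, 0.1, 0.3])
#   >>> MAP().compute(item_indices, pd_scores, gt_pos=np.array([1, 3]))   # -> ~0.8333
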