# Source code for cornac.models.tifuknn.recom_tifuknn

# Copyright 2023 The Cornac Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

import warnings
from time import time

import numpy as np
from tqdm import tqdm

from ..recommender import NextBasketRecommender

class TIFUKNN(NextBasketRecommender):
    """Temporal-Item-Frequency-based User-KNN (TIFUKNN)

    Parameters
    ----------
    name: string, default: 'TIFUKNN'
        The name of the recommender model.

    n_neighbors: int, optional, default: 300
        The number of neighbors for KNN

    within_decay_rate: float, optional, default: 0.9
        Within-basket time-decayed ratio in range [0, 1]

    group_decay_rate: float, optional, default: 0.7
        Group time-decayed ratio in range [0, 1]

    alpha: float, optional, default: 0.7
        The trade-off between current user vector and neighbors vectors
        to compute final item scores

    n_groups: int, optional, default: 7
        The historical baskets will be partitioned into `n_groups` equally.
        Must be at least 1.

    verbose: boolean, optional, default: False
        When True, running logs are displayed.

    References
    ----------
    Haoji Hu, Xiangnan He, Jinyang Gao, and Zhi-Li Zhang. 2020.
    Modeling Personalized Item Frequency Information for Next-basket Recommendation.
    In Proceedings of the 43rd International ACM SIGIR Conference on Research and Development in Information Retrieval (SIGIR '20).
    Association for Computing Machinery, New York, NY, USA, 1071–1080.

    """

    def __init__(
        self,
        name="TIFUKNN",
        n_neighbors=300,
        within_decay_rate=0.9,
        group_decay_rate=0.7,
        alpha=0.7,
        n_groups=7,
        verbose=False,
    ):
        super().__init__(name=name, trainable=False, verbose=verbose)
        assert 0 <= within_decay_rate <= 1, "within_decay_rate must be in [0, 1]"
        assert 0 <= group_decay_rate <= 1, "group_decay_rate must be in [0, 1]"
        # n_groups is used as a divisor and as a loop bound when grouping
        # baskets; zero or negative values cannot produce a valid grouping.
        assert n_groups >= 1, "n_groups must be at least 1"
        self.n_neighbors = n_neighbors
        self.within_decay_rate = within_decay_rate
        self.group_decay_rate = group_decay_rate
        self.alpha = alpha
        self.n_groups = n_groups

    def fit(self, train_set, val_set=None):
        """Compute one vector per training user and index them in a kd-tree.

        Parameters
        ----------
        train_set: object
            Training data, forwarded to the base-class ``fit``. It is read
            back through ``self.train_set`` and must provide ``ubi_iter`` and
            ``num_users`` (see ``_get_user_vectors``).

        val_set: object, optional, default: None
            Forwarded to the base-class ``fit``; not used here otherwise.

        Returns
        -------
        self : object
        """
        # Imported lazily so that scipy is only required once fit() is called.
        from scipy.spatial import KDTree

        super().fit(train_set=train_set, val_set=val_set)

        self.user_vectors = self._get_user_vectors(self.train_set)
        n_users = len(self.user_vectors)
        if self.n_neighbors > n_users:
            warnings.warn("Number of users is %d, smaller than number of neighbors %d" % (n_users, self.n_neighbors))
            # Cap k so the neighbor query never asks for more points than the
            # tree holds.
            self.n_neighbors = n_users

        start_time = time()
        if self.verbose:
            print("Constructing kd-tree for quick nearest-neighbor lookup")
        self.tree = KDTree(self.user_vectors)
        if self.verbose:
            print("Constructing kd-tree for quick nearest-neighbor lookup takes %.0f" % (time() - start_time))
        return self

    def _get_user_vectors(self, data_set):
        """Return a float32 array holding one user vector per yielded user.

        For each user yielded by ``data_set.ubi_iter`` the last basket is
        dropped before the vector is computed.
        NOTE(review): presumably the last basket is the held-out target of the
        next-basket task -- confirm against the data set definition.
        """
        user_vectors = []
        for _, _, [basket_items] in tqdm(
            data_set.ubi_iter(batch_size=1, shuffle=False),
            desc="Getting user vectors",
            total=data_set.num_users,
        ):
            user_vectors.append(self._compute_user_vector(basket_items[:-1]))
        return np.asarray(user_vectors, dtype="float32")

    def _compute_user_vector(self, history_baskets):
        """Turn a sequence of baskets into a single time-decayed item vector.

        Parameters
        ----------
        history_baskets: sequence of sequences of int
            Baskets in the order they are iterated; every basket is a
            collection of item indices smaller than ``self.total_items``.

        Returns
        -------
        numpy.ndarray
            float32 vector of length ``self.total_items``; all zeros when
            there is no basket.
        """
        n_baskets = len(history_baskets)
        basket_vectors = []
        for position, item_ids in enumerate(history_baskets):
            basket_vec = np.zeros(self.total_items, dtype="float32")
            # The last basket gets weight 1; each step towards the front
            # multiplies the weight by within_decay_rate once more.
            weight = np.power(self.within_decay_rate, n_baskets - position - 1)
            for item_id in item_ids:
                basket_vec[item_id] = weight
            basket_vectors.append(basket_vec)

        grouped_vectors, real_n_groups = self._group_history_list(basket_vectors, self.n_groups)
        user_vec = np.zeros(self.total_items, dtype="float32")
        if real_n_groups == 0:
            return user_vec

        for group_idx in range(real_n_groups):
            # The exponent is based on the configured n_groups, not on
            # real_n_groups, so histories with fewer baskets than n_groups
            # start from a higher power of group_decay_rate.
            group_weight = np.power(self.group_decay_rate, self.n_groups - group_idx - 1)
            user_vec += grouped_vectors[group_idx] * group_weight
        return user_vec / real_n_groups

    def _group_history_list(self, his_list, n_groups):
        """Split basket vectors into consecutive groups and average each group.

        Parameters
        ----------
        his_list: list of numpy.ndarray
            Per-basket vectors, all of the same length.

        n_groups: int
            Requested number of groups.

        Returns
        -------
        (list of numpy.ndarray, int)
            The group vectors and the number of groups actually produced.
            With fewer vectors than ``n_groups`` every vector is its own
            group. Otherwise exactly ``n_groups`` groups are produced; when
            the split is uneven, the leading groups hold
            ``len(his_list) // n_groups`` vectors and the trailing
            ``len(his_list) % n_groups`` groups hold one more.
        """
        n_vectors = len(his_list)
        if n_vectors < n_groups:
            return list(his_list), n_vectors

        base_size = n_vectors // n_groups
        n_larger_groups = n_vectors % n_groups
        first_larger_group = n_groups - n_larger_groups

        grouped_vec_list = []
        start = 0
        for group_idx in range(n_groups):
            size = base_size + 1 if group_idx >= first_larger_group else base_size
            # Accumulate in a float64 buffer, one vector at a time.
            total = np.zeros(len(his_list[0]))
            for vec in his_list[start:start + size]:
                total += vec
            grouped_vec_list.append(total / size)
            start += size
        return grouped_vec_list, n_groups

    def score(self, user_idx, history_baskets, **kwargs):
        """Score all items for a user given the user's basket history.

        The result is ``alpha * user_vector + (1 - alpha) * neighbor_vector``
        where ``neighbor_vector`` is the element-wise mean of the vectors of
        the ``n_neighbors`` nearest training users.

        Parameters
        ----------
        user_idx: int
            Not used by this method; scoring depends on ``history_baskets``
            only.

        history_baskets: sequence of sequences of int
            Baskets of item indices for the user.

        Returns
        -------
        numpy.ndarray
            Vector of length ``self.total_items``; all zeros when
            ``history_baskets`` is empty.
        """
        if len(history_baskets) == 0:
            return np.zeros(self.total_items, dtype="float32")

        user_vector = self._compute_user_vector(history_baskets)
        _, indices = self.tree.query([user_vector], k=self.n_neighbors)
        # Flatten so that the selection below is always 2-D
        # (n_neighbors x total_items), including the k == 1 case.
        neighbor_ids = np.reshape(indices, -1)
        # Average per item across neighbors. Without axis=0 the mean collapses
        # to one scalar, which shifts every item score equally and therefore
        # never influences the ranking.
        neighbor_vector = np.mean(self.user_vectors[neighbor_ids], axis=0)
        return self.alpha * user_vector + (1 - self.alpha) * neighbor_vector