# Copyright 2018 The Cornac Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
from typing import List, Tuple
import numpy as np
from cornac.data.dataset import Dataset
from cornac.data import FeatureModality, TextModality, ImageModality
from cornac.metrics.ranking import Precision, Recall
from cornac.models.recommender import Recommender
[docs]
class DMRL(Recommender):
"""
Disentangled multimodal representation learning
Parameters
----------
name: string, default: 'DMRL'
The name of the recommender model.
batch_size: int, optional, default: 32
The number of samples per batch to load.
learning_rate: float, optional, default: 1e-4
The learning rate for the optimizer.
decay_c: float, optional, default: 1
The decay for the disentangled loss term in the loss function.
decay_r: float, optional, default: 0.01
The decay for the regularization term in the loss function.
epochs: int, optional, default: 10
The number of epochs to train the model.
embedding_dim: int, optional, default: 100
The dimension of the embeddings.
bert_text_dim: int, optional, default: 384
The dimension of the bert text embeddings coming from the huggingface transformer model
image_dim: int, optional, default: None
The dimension of the image embeddings.
dropout: float, optional, default: 0
The dropout value passed to the underlying DMRL model.
num_neg: int, optional, default: 4
The number of negative samples to use in the training per user per batch (1 positive and num_neg negatives are used)
num_factors: int, optional, default: 4
The number of factors to use in the model.
trainable: bool, optional, default: True
When False, the model is not trained and Cornac assumes that the model is already trained.
verbose: bool, optional, default: False
When True, the model prints out more information during training.
modalities_pre_built: bool, optional, default: True
When True, the model assumes that the modalities are already built and does not build them.
log_metrics: bool, optional, default: False
When True, the model logs metrics to tensorboard.
References
----------
* Fan Liu, Huilin Chen, Zhiyong Cheng, Anan Liu, Liqiang Nie, Mohan Kankanhalli. DMRL: Disentangled Multimodal Representation Learning for
Recommendation. https://arxiv.org/pdf/2203.05406.pdf.
"""
def __init__(
    self,
    name: str = "DMRL",
    batch_size: int = 32,
    learning_rate: float = 1e-4,
    decay_c: float = 1,
    decay_r: float = 0.01,
    epochs: int = 10,
    embedding_dim: int = 100,
    bert_text_dim: int = 384,
    image_dim: int = None,
    dropout: float = 0,
    num_neg: int = 4,
    num_factors: int = 4,
    trainable: bool = True,
    verbose: bool = False,
    log_metrics: bool = False,
):
    """Initialize the DMRL recommender.

    See the class docstring for the meaning of each hyperparameter.
    """
    super().__init__(name=name, trainable=trainable, verbose=verbose)
    self.learning_rate = learning_rate
    self.decay_c = decay_c
    self.decay_r = decay_r
    self.batch_size = batch_size
    self.epochs = epochs
    self.verbose = verbose
    self.embedding_dim = embedding_dim
    self.text_dim = bert_text_dim
    self.image_dim = image_dim
    self.dropout = dropout
    self.num_neg = num_neg
    self.num_factors = num_factors
    self.log_metrics = log_metrics
    if log_metrics:
        # Lazy import: tensorboard is only required when metric logging is on.
        from torch.utils.tensorboard import SummaryWriter

        self.tb_writer = SummaryWriter("temp/tb_data/run_1")
    if self.num_factors == 1:
        # Deactivate the disentangled portion of the loss if there is only
        # one factor. BUG FIX: the original wrote `self.decay_c == 0`, a
        # no-op comparison, so the decay was never actually zeroed.
        self.decay_c = 0
[docs]
def fit(self, train_set: Dataset, val_set=None):
    """Fit the model to observations.

    Parameters
    ----------
    train_set: :obj:`cornac.data.Dataset`, required
        User-Item preference data as well as additional modalities.
    val_set: :obj:`cornac.data.Dataset`, optional, default: None
        User-Item preference data for model selection purposes (e.g., early stopping).

    Returns
    -------
    self
    """
    Recommender.fit(self, train_set, val_set)
    # Pre-trained models skip the training phase entirely.
    if not self.trainable:
        return self
    self._fit_dmrl(train_set, val_set)
    return self
[docs]
def get_item_image_embedding(self, batch):
    """
    Look up the pre-encoded image features for all items referenced in a
    batch. Returns None when no image modality is attached to the model.

    Parameters
    ----------
    param batch: torch.Tensor, user indices in first column, pos item indices in second
        and all other columns are negative item indices
    """
    import torch

    if not hasattr(self, "item_image"):
        return None

    item_cols = batch[:, 1:]
    flat_ids = item_cols.flatten()
    # Gather features for every referenced item and restore the
    # (batch, 1 + num_neg, feature_dim) layout the model expects.
    embeddings = self.item_image.features[flat_ids, :].reshape(
        (*item_cols.shape, self.item_image.feature_dim)
    )
    if isinstance(embeddings, torch.Tensor):
        return embeddings
    return torch.tensor(embeddings, dtype=torch.float32)
[docs]
def get_item_text_embeddings(self, batch):
    """
    Get the item text embeddings from the BERT model. Either by encoding the
    text on the fly or by using the preencoded text. Returns None when no
    text modality is attached to the model.

    Parameters
    ----------
    param batch: torch.Tensor, user indices in first column, pos item indices in second
        and all other columns are negative item indices
    """
    import torch

    # Guard first (consistent with get_item_image_embedding): skip the
    # index bookkeeping entirely when there is no text modality.
    if not hasattr(self, "item_text"):
        return None

    shape = batch[:, 1:].shape
    all_items = batch[:, 1:].flatten()

    # Fetch the raw embeddings either from the pre-encoded feature matrix
    # or by encoding on the fly; the reshape is common to both paths
    # (the original duplicated it in each branch).
    if self.item_text.preencoded:
        item_text_embeddings = self.item_text.features[all_items]
    else:
        item_text_embeddings = self.item_text.batch_encode(all_items)
    item_text_embeddings = item_text_embeddings.reshape(
        (*shape, self.item_text.output_dim)
    )

    if not isinstance(item_text_embeddings, torch.Tensor):
        item_text_embeddings = torch.tensor(
            item_text_embeddings, dtype=torch.float32
        )
    return item_text_embeddings
[docs]
def get_modality_embeddings(self, batch):
    """
    Collect the item embeddings for both text and image from the respective
    modality instances.

    Parameters
    ----------
    param batch: torch.Tensor, user indices in first column, pos item
        indices in second and all other columns are negative item indices

    Returns
    -------
    Tuple of (text embeddings, image embeddings); an entry is None when the
    corresponding modality is absent.
    """
    text_embeddings = self.get_item_text_embeddings(batch)
    image_embeddings = self.get_item_image_embedding(batch)
    return text_embeddings, image_embeddings
def _fit_dmrl(self, train_set: Dataset, val_set: Dataset = None):
    """
    Fit the model to observations.

    Parameters
    ----------
    train_set: User-Item preference data as well as additional modalities.
    val_set: optional validation data, used only to monitor validation loss.
    """
    import torch
    from torch.utils.data import DataLoader
    from cornac.models.dmrl.dmrl import DMRLLoss, DMRLModel
    from cornac.models.dmrl.pwlearning_sampler import PWLearningSampler

    self.initialize_and_build_modalities(train_set)

    self.device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device {self.device} for training")

    self.sampler = PWLearningSampler(train_set, num_neg=self.num_neg)

    self.model = DMRLModel(
        self.num_users,
        self.num_items,
        self.embedding_dim,
        self.text_dim,
        self.image_dim,
        self.dropout,
        self.num_neg,
        self.num_factors,
    ).to(self.device)

    # BUG FIX: decay_c was hard-coded to 1e-3 here, so the user-supplied
    # `decay_c` hyperparameter (and the num_factors == 1 deactivation in
    # __init__) was silently ignored.
    loss_function = DMRLLoss(
        decay_c=self.decay_c, num_factors=self.num_factors, num_neg=self.num_neg
    )

    # add hyperparams to tensorboard
    if self.log_metrics:
        self.tb_writer.add_hparams(
            {
                "learning_rate": self.learning_rate,
                "decay_c": self.decay_c,
                "decay_r": self.decay_r,
                "batch_size": self.batch_size,
                "epochs": self.epochs,
                "embedding_dim": self.embedding_dim,
                "bert_text_dim": self.text_dim,
                "num_neg": self.num_neg,
                "num_factors": self.num_factors,
                "dropout": self.dropout,
            },
            {},
        )

    # decay_r acts as L2 regularization via AdamW's weight decay.
    optimizer = torch.optim.AdamW(
        self.model.parameters(),
        lr=self.learning_rate,
        weight_decay=self.decay_r,
        betas=(0.9, 0.999),
    )

    dataloader = DataLoader(
        self.sampler,
        batch_size=self.batch_size,
        num_workers=0,
        shuffle=True,
        prefetch_factor=None,
    )
    if val_set is not None:
        self.val_sampler = PWLearningSampler(val_set, num_neg=self.num_neg)
        val_dataloader = DataLoader(
            self.val_sampler,
            batch_size=self.batch_size,
            num_workers=0,
            shuffle=True,
            prefetch_factor=None,
        )

    j = 1  # global logging step, increments once per training batch

    # Training loop
    for epoch in range(self.epochs):
        running_loss = 0
        running_loss_val = 0
        last_loss = 0
        batch: torch.Tensor
        for i, batch in enumerate(dataloader):
            optimizer.zero_grad()

            item_text_embeddings, item_image_embeddings = (
                self.get_modality_embeddings(batch)
            )

            # move the data to the device
            batch = batch.to(self.device)
            if item_text_embeddings is not None:
                item_text_embeddings = item_text_embeddings.to(self.device)
            if item_image_embeddings is not None:
                item_image_embeddings = item_image_embeddings.to(self.device)

            # Forward pass
            embedding_factor_lists, rating_scores = self.model(
                batch, item_text_embeddings, item_image_embeddings
            )
            loss = loss_function(embedding_factor_lists, rating_scores)

            # Backward pass and optimize
            loss.backward()
            # NOTE: torch.nn.utils.clip_grad_value_(self.model.parameters(), 5)
            # can be added here if exploding gradients become an issue.
            if self.log_metrics:
                self.model.log_gradients_and_weights()
            optimizer.step()

            if val_set is not None:
                # Draw a fresh shuffled validation batch to track the
                # validation loss alongside training.
                val_batch = next(iter(val_dataloader))
                item_text_embeddings_val, item_image_embeddings_val = (
                    self.get_modality_embeddings(val_batch)
                )
                # BUG FIX: validation data was never moved to the model's
                # device, which crashes when training on CUDA.
                val_batch = val_batch.to(self.device)
                if item_text_embeddings_val is not None:
                    item_text_embeddings_val = item_text_embeddings_val.to(
                        self.device
                    )
                if item_image_embeddings_val is not None:
                    item_image_embeddings_val = item_image_embeddings_val.to(
                        self.device
                    )

                # Forward pass only; no gradients needed for validation.
                with torch.no_grad():
                    embedding_factor_lists_val, rating_scores_val = self.model(
                        val_batch,
                        item_text_embeddings_val,
                        item_image_embeddings_val,
                    )
                    loss_val = loss_function(
                        embedding_factor_lists_val, rating_scores_val
                    )
                running_loss_val += loss_val.item()

            # Gather data and report every `log_every` batches.
            running_loss += loss.item()
            log_every = 5
            if i % log_every == log_every - 1:
                last_loss = running_loss / log_every  # loss per batch
                print(" batch {} loss: {}".format(i + 1, last_loss))
                if self.log_metrics:
                    self.tb_writer.add_scalar("Loss/train", last_loss, j)
                    self.tb_writer.add_scalar(
                        "Loss/val", running_loss_val / log_every, j
                    )
                    self.tb_writer.add_scalar(
                        "Gradient Norm/train", np.mean(self.model.grad_norms), j
                    )
                    self.tb_writer.add_scalar(
                        "Param Norm/train", np.mean(self.model.param_norms), j
                    )
                    self.tb_writer.add_scalar(
                        "User-Item based rating", np.mean(self.model.ui_ratings), j
                    )
                    self.tb_writer.add_scalar(
                        "User-Text based rating", np.mean(self.model.ut_ratings), j
                    )
                    self.tb_writer.add_scalar(
                        "User-Itm Attention", np.mean(self.model.ui_attention), j
                    )
                    self.tb_writer.add_scalar(
                        "User-Text Attention", np.mean(self.model.ut_attention), j
                    )
                    for name, param in self.model.named_parameters():
                        self.tb_writer.add_scalar(
                            name + "/grad_norm",
                            np.mean(self.model.grad_dict[name]),
                            j,
                        )
                        self.tb_writer.add_histogram(
                            name + "/grad", param.grad, global_step=epoch
                        )
                    self.tb_writer.add_scalar(
                        "Learning rate", optimizer.param_groups[0]["lr"], j
                    )
                    self.model.reset_grad_metrics()
                running_loss = 0
                running_loss_val = 0
            j += 1
        print(f"Epoch: {epoch} is done")
    print("Finished training!")
[docs]
def score(self, user_index: int, item_indices=None):
    """
    Scores a user-item pair. If item_indices is None, scores for all known
    items are returned.

    Parameters
    ----------
    user_index: int, required
        The index of the user for whom to perform score prediction.
    item_indices: torch.Tensor, optional, default: None
        The indices of the items for which to perform score prediction.
        If None, scores for all known items will be returned.
    """
    import torch

    # Inference mode: no negative samples, eval() disables dropout etc.
    self.model.num_neg = 0
    self.model.eval()

    encoded_image = None
    encoded_text = None

    if item_indices is None:
        item_indices = torch.tensor(list(self.iid_map.values()), dtype=torch.long)
    else:
        # Robustness: accept lists/arrays as well as tensors — torch.stack
        # below requires a tensor.
        item_indices = torch.as_tensor(item_indices, dtype=torch.long)
    # Broadcast the single user index so each (user, item) row can be scored.
    user_index = user_index * torch.ones(len(item_indices), dtype=torch.long)

    # since the model expects (batch size, 1 + num_neg, encoding dim) we just
    # add one singleton dimension to each modality's features
    if hasattr(self, "item_text"):
        # BUG FIX: this pre-encoding check used to run unguarded before the
        # hasattr check, crashing when no text modality was attached.
        if self.item_text.features is None:
            self.item_text.preencode_entire_corpus()
        encoded_text: torch.Tensor = self.item_text.features[item_indices, :]
        encoded_text = encoded_text[:, None, :]
        encoded_text = encoded_text.to(self.device)
    if hasattr(self, "item_image"):
        encoded_image = torch.tensor(
            self.item_image.features[item_indices, :], dtype=torch.float32
        )
        encoded_image = encoded_image[:, None, :]
        encoded_image = encoded_image.to(self.device)

    input_tensor = torch.stack((user_index, item_indices), axis=1)
    input_tensor = input_tensor.to(self.device)

    with torch.no_grad():
        _, ratings_sum_over_mods = self.model(
            input_tensor, encoded_text, encoded_image
        )

    return np.array(ratings_sum_over_mods[:, 0].detach().cpu())
[docs]
def initialize_and_build_modalities(self, trainset: Dataset):
    """
    Initializes text and image modalities for the model. Either takes in raw
    text or image and performs pre-encoding given the transformer models in
    TransformersTextModality and TransformersVisionModality. If preencoded
    features are given, it uses those instead and simply wraps them into a
    general FeatureModality instance, as no further encoding model is
    required.

    Raises
    ------
    ValueError
        If a modality has an unsupported type, or a FeatureModality is
        supplied without pre-encoded features.
    """
    from cornac.models.dmrl.transformer_text import TransformersTextModality
    from cornac.models.dmrl.transformer_vision import TransformersVisionModality

    if trainset.item_text is not None:
        if (
            isinstance(trainset.item_text, TextModality)
            and trainset.item_text.corpus is not None
        ):
            # Raw text corpus: pre-encode it with the transformer model.
            self.item_text = TransformersTextModality(
                corpus=trainset.item_text.corpus,
                ids=trainset.item_text.ids,
                preencode=True,
            )
        elif isinstance(
            trainset.item_text, FeatureModality
        ):  # already have preencoded text features from outside
            # Explicit raise instead of `assert` (stripped under -O),
            # consistent with the ValueError below.
            if trainset.item_text.features is None:
                raise ValueError(
                    "No pre-encoded features found, please use TextModality"
                )
            self.item_text = trainset.item_text
        else:
            raise ValueError("Not supported type of modality for item text")

    if trainset.item_image is not None:
        if (
            isinstance(trainset.item_image, ImageModality)
            and trainset.item_image.images is not None
        ):
            # Raw images: pre-encode them with the vision transformer.
            self.item_image = TransformersVisionModality(
                images=trainset.item_image.images,
                ids=trainset.item_image.ids,
                preencode=True,
            )
        elif isinstance(
            trainset.item_image, FeatureModality
        ):  # already have preencoded image features from outside
            if trainset.item_image.features is None:
                raise ValueError(
                    "No pre-encoded features found, please use ImageModality"
                )
            self.item_image = trainset.item_image
        else:
            raise ValueError("Not supported type of modality for item image")