Source code for gnes.encoder.numeric.pca

#  Tencent is pleased to support the open source community by making GNES available.
#
#  Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


import numpy as np

from ..base import BaseNumericEncoder
from ...helper import get_perm, batching, get_optimal_sample_size, train_required
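
# Note on the imported helpers (behaviour inferred from their use below, not
# documented here): ``batching`` splits large inputs into batches before
# calling the wrapped method; ``train_required`` guards a method until the
# encoder is trained; ``get_optimal_sample_size`` picks a sample size for
# one-shot training; ``get_perm`` returns a permutation of eigenvector
# indices grouped by variance.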


class PCAEncoder(BaseNumericEncoder):
    batch_size = 2048

    def __init__(self, output_dim: int, whiten: bool = False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.output_dim = output_dim
        self.whiten = whiten
        self.pca_components = None
        self.mean = None
    def post_init(self):
        from sklearn.decomposition import IncrementalPCA
        self.pca = IncrementalPCA(n_components=self.output_dim)
    @batching
    def train(self, vecs: np.ndarray, *args, **kwargs) -> None:
        num_samples, num_dim = vecs.shape
        if self.output_dim > num_samples:
            # a batch smaller than output_dim cannot be fitted; skip it if
            # earlier batches already fitted the model, otherwise fail
            if self.mean is not None and self.mean.size:
                return
            else:
                raise ValueError(
                    'training PCA requires at least %d points, but %d were given' % (
                        self.output_dim, num_samples))
        assert self.output_dim < num_dim, \
            'PCA output dimension should be < data dimension, received (%d, %d)' % (
                self.output_dim, num_dim)
        self.pca.partial_fit(vecs)
        self.pca_components = np.transpose(self.pca.components_)
        self.mean = self.pca.mean_.astype('float32')
        self.explained_variance = self.pca.explained_variance_.astype('float32')
    @train_required
    @batching
    def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray:
        # project the centered vectors onto the principal axes; whitening
        # rescales each component to unit variance
        X_transformed = np.matmul(vecs - self.mean, self.pca_components)
        if self.whiten:
            X_transformed /= np.sqrt(self.explained_variance)
        return X_transformed
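
# A minimal usage sketch (illustration only, not part of the original
# module): train a PCAEncoder on synthetic vectors and project a query
# batch. The shapes below and the explicit post_init() call are assumptions
# for this example; in a GNES pipeline the framework normally constructs,
# trains and invokes encoders itself. With whiten=True each projected
# component is divided by the square root of its explained variance, so the
# outputs have roughly unit variance.
def _demo_pca_encoder():
    enc = PCAEncoder(output_dim=64, whiten=True)
    enc.post_init()  # builds the IncrementalPCA model (assumed safe to call directly)
    train_vecs = np.random.random([4096, 512]).astype('float32')
    enc.train(train_vecs)  # @batching fits in two partial_fit batches of 2048
    out = enc.encode(np.random.random([16, 512]).astype('float32'))
    assert out.shape == (16, 64)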
class PCALocalEncoder(BaseNumericEncoder):
    batch_size = 2048

    def __init__(self, output_dim: int, num_locals: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        assert output_dim >= num_locals and output_dim % num_locals == 0, \
            'output_dim should be >= num_locals and divisible by num_locals!'
        self.output_dim = output_dim
        self.num_locals = num_locals
        self.pca_components = None
        self.mean = None
    @batching(batch_size=get_optimal_sample_size, num_batch=1)
    def train(self, vecs: np.ndarray, *args, **kwargs) -> None:
        import faiss
        num_samples, num_dim = vecs.shape
        assert self.output_dim <= num_samples, \
            'training PCA requires at least %d points, but %d were given' % (
                self.output_dim, num_samples)
        assert self.output_dim < num_dim, \
            'PCA output dimension should be < data dimension, received (%d, %d)' % (
                self.output_dim, num_dim)

        pca = faiss.PCAMatrix(num_dim, self.output_dim)
        self.mean = np.mean(vecs, axis=0)  # shape: 1 x num_dim
        pca.train(vecs)
        explained_variance_ratio = faiss.vector_to_array(pca.eigenvalues)[:self.output_dim]
        components = faiss.vector_to_array(pca.PCAMat).reshape([-1, num_dim])[:self.output_dim]

        # permute the eigenvectors according to their explained variance so
        # that variance is balanced across the local groups
        opt_order = get_perm(explained_variance_ratio, self.num_locals)
        comp_tmp = np.reshape(components[opt_order], [self.output_dim, num_dim])

        self.pca_components = np.transpose(comp_tmp)  # shape: num_dim x output_dim
    @train_required
    @batching
    def encode(self, vecs: np.ndarray, *args, **kwargs) -> np.ndarray:
        return np.matmul(vecs - self.mean, self.pca_components)
    def _copy_from(self, x: 'PCALocalEncoder') -> None:
        self.output_dim = x.output_dim
        self.pca_components = x.pca_components
        self.mean = x.mean
        self.num_locals = x.num_locals
        self.is_trained = x.is_trained
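
# A second usage sketch (again an assumption, not part of the original
# module): PCALocalEncoder requires the faiss package and an output_dim that
# is a multiple of num_locals; here 200 = 10 local groups x 20 components.
# The get_perm reordering aims to balance explained variance across the
# num_locals contiguous sub-vectors of the output, which is presumably
# useful when each sub-vector is quantized separately downstream.
def _demo_pca_local_encoder():
    enc = PCALocalEncoder(output_dim=200, num_locals=10)
    train_vecs = np.random.random([5000, 768]).astype('float32')
    enc.train(train_vecs)  # samples one batch sized by get_optimal_sample_size
    out = enc.encode(np.random.random([8, 768]).astype('float32'))
    assert out.shape == (8, 200)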