Source code for gnes.indexer.doc.rocksdb

#  Tencent is pleased to support the open source community by making GNES available.
#
#  Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


import gc
import pickle
from typing import List

from ..base import BaseDocIndexer as BDI
from ...proto import gnes_pb2


class RocksDBIndexer(BDI):
    """Store and retrieve serialized gnes_pb2.Document objects in a RocksDB key-value store."""

    def __init__(self, data_path: str,
                 drop_raw_data: bool = False,
                 drop_chunk_blob: bool = False,
                 read_only: bool = False,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.data_path = data_path
        self.drop_raw_data = drop_raw_data
        self.drop_chunk_blob = drop_chunk_blob
        self.read_only = read_only
        # keep the remaining kwargs; post_init applies them as RocksDB options
        self.kwargs = kwargs

    def post_init(self):
        import rocksdb
        opts = rocksdb.Options()
        opts.create_if_missing = True
        opts.max_open_files = 300000
        opts.write_buffer_size = 67108864  # 64 MB memtable
        opts.max_write_buffer_number = 3
        opts.target_file_size_base = 67108864  # 64 MB SST files
        opts.table_factory = rocksdb.BlockBasedTableFactory(
            filter_policy=rocksdb.BloomFilterPolicy(10),
            block_cache=rocksdb.LRUCache(2 * (1024 ** 3)),
            block_cache_compressed=rocksdb.LRUCache(500 * (1024 ** 2)))
        # any leftover constructor kwargs override the defaults above
        for key, value in self.kwargs.items():
            setattr(opts, key, value)
        self._db = rocksdb.DB(self.data_path, opts, read_only=self.read_only)

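    # Note: because the leftover constructor kwargs are applied to the
    # rocksdb.Options object via setattr, any option attribute can be tuned
    # at construction time, e.g. (hypothetical path and value):
    #
    #   RocksDBIndexer('/tmp/idx',
    #                  compression=rocksdb.CompressionType.lz4_compression)
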
    @BDI.update_counter
    def add(self, keys: List[int], docs: List['gnes_pb2.Document'], *args, **kwargs):
        import rocksdb
        write_batch = rocksdb.WriteBatch()
        for k, d in zip(keys, docs):
            key_bytes = pickle.dumps(k)
            # optionally drop bulky payload fields before persisting
            if self.drop_raw_data:
                d.ClearField('raw_data')
            if self.drop_chunk_blob:
                for c in d.chunks:
                    c.ClearField('blob')
            value_bytes = d.SerializeToString()
            write_batch.put(key_bytes, value_bytes)
        # one atomic, synced write for the whole batch
        self._db.write(write_batch, sync=True)

    def query(self, keys: List[int], *args, **kwargs) -> List['gnes_pb2.Document']:
        query_keys = [pickle.dumps(k) for k in keys]
        values = self._db.multi_get(query_keys)
        docs = []
        for k in query_keys:
            v = values[k]
            if v is not None:
                _doc = gnes_pb2.Document()
                _doc.ParseFromString(v)
                docs.append(_doc)
            else:
                # keep the result aligned with the input keys
                docs.append(None)
        return docs

    def scan(self, reversed_scan: bool = False):
        iterator = self._db.iterkeys()
        if reversed_scan:
            iterator.seek_to_last()
            iterator = reversed(iterator)
        else:
            iterator.seek_to_first()
        for key_bytes in iterator:
            doc_id = pickle.loads(key_bytes)
            value_bytes = self._db.get(key_bytes)
            pb_doc = gnes_pb2.Document()
            pb_doc.ParseFromString(value_bytes)
            yield doc_id, pb_doc

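    # Usage note: `for doc_id, doc in indexer.scan()` streams every stored
    # (doc_id, Document) pair, e.g. for migrating or re-indexing an existing
    # database; pass reversed_scan=True to walk the keys in reverse order.
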
    def close(self):
        super().close()
        try:
            # python-rocksdb exposes no explicit close(); dropping the last
            # reference (plus a gc pass) releases the database handle
            del self._db
        except AttributeError:
            pass
        gc.collect()
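
# --------------------------------------------------------------------------
# A minimal usage sketch (not part of the module source), assuming the
# python-rocksdb package is installed and that, as with other GNES
# components, post_init opens the database when the indexer is constructed.
# The data path and doc_id below are hypothetical.

from gnes.indexer.doc.rocksdb import RocksDBIndexer
from gnes.proto import gnes_pb2

indexer = RocksDBIndexer(data_path='/tmp/demo-rocksdb')

doc = gnes_pb2.Document()
doc.doc_id = 1

indexer.add(keys=[1], docs=[doc])
results = indexer.query(keys=[1, 2])  # missing keys come back as None
assert results[1] is None
indexer.close()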