diff --git a/src/Storages/RocksDB/EmbeddedRocksDBBlockInputStream.cpp b/src/Storages/RocksDB/EmbeddedRocksDBBlockInputStream.cpp deleted file mode 100644 index 4900e17ad91..00000000000 --- a/src/Storages/RocksDB/EmbeddedRocksDBBlockInputStream.cpp +++ /dev/null @@ -1,66 +0,0 @@ -#include -#include -#include -#include -#include - -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int ROCKSDB_ERROR; -} - -EmbeddedRocksDBBlockInputStream::EmbeddedRocksDBBlockInputStream( - StorageEmbeddedRocksDB & storage_, - const StorageMetadataPtr & metadata_snapshot_, - size_t max_block_size_) - : storage(storage_) - , metadata_snapshot(metadata_snapshot_) - , max_block_size(max_block_size_) -{ - sample_block = metadata_snapshot->getSampleBlock(); - primary_key_pos = sample_block.getPositionByName(storage.primary_key); -} - -Block EmbeddedRocksDBBlockInputStream::readImpl() -{ - if (finished) - return {}; - - if (!iterator) - { - iterator = std::unique_ptr(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions())); - iterator->SeekToFirst(); - } - - MutableColumns columns = sample_block.cloneEmptyColumns(); - - for (size_t rows = 0; iterator->Valid() && rows < max_block_size; ++rows, iterator->Next()) - { - ReadBufferFromString key_buffer(iterator->key()); - ReadBufferFromString value_buffer(iterator->value()); - - size_t idx = 0; - for (const auto & elem : sample_block) - { - auto serialization = elem.type->getDefaultSerialization(); - serialization->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer); - ++idx; - } - } - - finished = !iterator->Valid(); - if (!iterator->status().ok()) - { - throw Exception("Engine " + getName() + " got error while seeking key value data: " + iterator->status().ToString(), - ErrorCodes::ROCKSDB_ERROR); - } - return sample_block.cloneWithColumns(std::move(columns)); -} - -} diff --git a/src/Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h b/src/Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h deleted file mode 100644 index ddff1fc2e84..00000000000 --- a/src/Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include - - -namespace rocksdb -{ - class Iterator; -} - -namespace DB -{ - -class StorageEmbeddedRocksDB; -struct StorageInMemoryMetadata; -using StorageMetadataPtr = std::shared_ptr; - -class EmbeddedRocksDBBlockInputStream : public IBlockInputStream -{ - -public: - EmbeddedRocksDBBlockInputStream( - StorageEmbeddedRocksDB & storage_, const StorageMetadataPtr & metadata_snapshot_, size_t max_block_size_); - - String getName() const override { return "EmbeddedRocksDB"; } - Block getHeader() const override { return sample_block; } - Block readImpl() override; - -private: - StorageEmbeddedRocksDB & storage; - StorageMetadataPtr metadata_snapshot; - const size_t max_block_size; - - Block sample_block; - std::unique_ptr iterator; - size_t primary_key_pos; - bool finished = false; -}; -} diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index 0b09a1f94d5..820b7c94ebd 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -1,6 +1,5 @@ #include #include -#include #include @@ -182,15 +181,15 @@ class EmbeddedRocksDBSource : public SourceWithProgress public: EmbeddedRocksDBSource( const StorageEmbeddedRocksDB & storage_, - const StorageMetadataPtr & metadata_snapshot_, + const Block & header, FieldVectorPtr keys_, FieldVector::const_iterator begin_, FieldVector::const_iterator end_, const size_t max_block_size_) - : SourceWithProgress(metadata_snapshot_->getSampleBlock()) + : SourceWithProgress(header) , storage(storage_) - , metadata_snapshot(metadata_snapshot_) - , keys(std::move(keys_)) + , primary_key_pos(header.getPositionByName(storage.getPrimaryKey())) + , keys(keys_) , begin(begin_) , end(end_) , it(begin) @@ -198,12 +197,29 @@ public: { } - String getName() const override + EmbeddedRocksDBSource( + const StorageEmbeddedRocksDB & storage_, + const Block & header, + std::unique_ptr iterator_, + const size_t max_block_size_) + : SourceWithProgress(header) + , storage(storage_) + , primary_key_pos(header.getPositionByName(storage.getPrimaryKey())) + , iterator(std::move(iterator_)) + , max_block_size(max_block_size_) { - return storage.getName(); } + String getName() const override { return storage.getName(); } + Chunk generate() override + { + if (keys) + return generateWithKeys(); + return generateFullScan(); + } + + Chunk generateWithKeys() { if (it >= end) return {}; @@ -213,16 +229,15 @@ public: std::vector serialized_keys(num_keys); std::vector slices_keys(num_keys); - const auto & sample_block = metadata_snapshot->getSampleBlock(); - const auto & key_column = sample_block.getByName(storage.getPrimaryKey()); - auto columns = sample_block.cloneEmptyColumns(); - size_t primary_key_pos = sample_block.getPositionByName(storage.getPrimaryKey()); + const auto & sample_block = getPort().getHeader(); + + const auto & key_column_type = sample_block.getByName(storage.getPrimaryKey()).type; size_t rows_processed = 0; while (it < end && rows_processed < max_block_size) { WriteBufferFromString wb(serialized_keys[rows_processed]); - key_column.type->getDefaultSerialization()->serializeBinary(*it, wb); + key_column_type->getDefaultSerialization()->serializeBinary(*it, wb); wb.finalize(); slices_keys[rows_processed] = std::move(serialized_keys[rows_processed]); @@ -230,6 +245,7 @@ public: ++rows_processed; } + MutableColumns columns = sample_block.cloneEmptyColumns(); std::vector values; auto statuses = storage.multiGet(slices_keys, values); for (size_t i = 0; i < statuses.size(); ++i) @@ -238,13 +254,7 @@ public: { ReadBufferFromString key_buffer(slices_keys[i]); ReadBufferFromString value_buffer(values[i]); - - size_t idx = 0; - for (const auto & elem : sample_block) - { - elem.type->getDefaultSerialization()->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer); - ++idx; - } + fillColumns(key_buffer, value_buffer, columns); } } @@ -252,14 +262,54 @@ public: return Chunk(std::move(columns), num_rows); } + Chunk generateFullScan() + { + if (!iterator->Valid()) + return {}; + + const auto & sample_block = getPort().getHeader(); + MutableColumns columns = sample_block.cloneEmptyColumns(); + + for (size_t rows = 0; iterator->Valid() && rows < max_block_size; ++rows, iterator->Next()) + { + ReadBufferFromString key_buffer(iterator->key()); + ReadBufferFromString value_buffer(iterator->value()); + fillColumns(key_buffer, value_buffer, columns); + } + + if (!iterator->status().ok()) + { + throw Exception("Engine " + getName() + " got error while seeking key value data: " + iterator->status().ToString(), + ErrorCodes::ROCKSDB_ERROR); + } + Block block = sample_block.cloneWithColumns(std::move(columns)); + return Chunk(block.getColumns(), block.rows()); + } + + void fillColumns(ReadBufferFromString & key_buffer, ReadBufferFromString & value_buffer, MutableColumns & columns) + { + size_t idx = 0; + for (const auto & elem : getPort().getHeader()) + { + elem.type->getDefaultSerialization()->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer); + ++idx; + } + } + private: const StorageEmbeddedRocksDB & storage; - const StorageMetadataPtr metadata_snapshot; - FieldVectorPtr keys; + size_t primary_key_pos; + + /// For key scan + FieldVectorPtr keys = nullptr; FieldVector::const_iterator begin; FieldVector::const_iterator end; FieldVector::const_iterator it; + + /// For full scan + std::unique_ptr iterator = nullptr; + const size_t max_block_size; }; @@ -379,7 +429,6 @@ void StorageEmbeddedRocksDB::initDB() rocksdb_ptr = std::unique_ptr(db); } - Pipe StorageEmbeddedRocksDB::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -394,13 +443,14 @@ Pipe StorageEmbeddedRocksDB::read( FieldVectorPtr keys; bool all_scan = false; - auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type; + Block sample_block = metadata_snapshot->getSampleBlock(); + auto primary_key_data_type = sample_block.getByName(primary_key).type; std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info); if (all_scan) { - auto reader = std::make_shared( - *this, metadata_snapshot, max_block_size); - return Pipe(std::make_shared(reader)); + auto iterator = std::unique_ptr(rocksdb_ptr->NewIterator(rocksdb::ReadOptions())); + iterator->SeekToFirst(); + return Pipe(std::make_shared(*this, sample_block, std::move(iterator), max_block_size)); } else { @@ -424,7 +474,7 @@ Pipe StorageEmbeddedRocksDB::read( size_t end = num_keys * (thread_idx + 1) / num_threads; pipes.emplace_back(std::make_shared( - *this, metadata_snapshot, keys, keys->begin() + begin, keys->begin() + end, max_block_size)); + *this, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size)); } return Pipe::unitePipes(std::move(pipes)); } @@ -436,7 +486,6 @@ SinkToStoragePtr StorageEmbeddedRocksDB::write( return std::make_shared(*this, metadata_snapshot); } - static StoragePtr create(const StorageFactory::Arguments & args) { // TODO custom RocksDBSettings, table function diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h index b095673a6f5..ade4bd58bf4 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h @@ -23,7 +23,6 @@ class StorageEmbeddedRocksDB final : public shared_ptr_helper; friend class EmbeddedRocksDBSink; - friend class EmbeddedRocksDBBlockInputStream; public: std::string getName() const override { return "EmbeddedRocksDB"; }