diff --git a/cmake/find/rocksdb.cmake b/cmake/find/rocksdb.cmake index d53febee762..24d959eb074 100644 --- a/cmake/find/rocksdb.cmake +++ b/cmake/find/rocksdb.cmake @@ -13,9 +13,8 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/rocksdb/CMakeLists.txt") if (USE_INTERNAL_ROCKSDB_LIBRARY) message (WARNING "submodule contrib is missing. to fix try run: \n git submodule update --init --recursive") message(${RECONFIGURE_MESSAGE_LEVEL} "cannot find internal rocksdb") - set (USE_INTERNAL_ROCKSDB_LIBRARY 0) endif() - set (MISSING_ROCKSDB 1) + set (MISSING_INTERNAL_ROCKSDB 1) endif () if (NOT USE_INTERNAL_ROCKSDB_LIBRARY) @@ -26,7 +25,9 @@ if (NOT USE_INTERNAL_ROCKSDB_LIBRARY) endif() endif () -if (NOT ROCKSDB_LIBRARY AND NOT MISSING_ROCKSDB) +if(ROCKSDB_LIBRARY AND ROCKSDB_INCLUDE_DIR) + set(USE_ROCKSDB 1) +elseif (NOT MISSING_INTERNAL_ROCKSDB) set (USE_INTERNAL_ROCKSDB_LIBRARY 1) set (ROCKSDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb/include") diff --git a/docker/test/fasttest/run.sh b/docker/test/fasttest/run.sh index c1c76b93829..9d84274c297 100755 --- a/docker/test/fasttest/run.sh +++ b/docker/test/fasttest/run.sh @@ -270,7 +270,7 @@ TESTS_TO_SKIP=( 00646_url_engine 00974_query_profiler - # Rocksdb is not enabled by default + # In fasttest, ENABLE_LIBRARIES=0, so rocksdb engine is not enabled by default 01504_rocksdb # Look at DistributedFilesToInsert, so cannot run in parallel. diff --git a/src/Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.cpp b/src/Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.cpp new file mode 100644 index 00000000000..0f75f41c7dd --- /dev/null +++ b/src/Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.cpp @@ -0,0 +1,65 @@ +#include +#include +#include +#include + +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +EmbeddedRocksdbBlockInputStream::EmbeddedRocksdbBlockInputStream( + StorageEmbeddedRocksdb & storage_, + const StorageMetadataPtr & metadata_snapshot_, + size_t max_block_size_) + : storage(storage_) + , metadata_snapshot(metadata_snapshot_) + , max_block_size(max_block_size_) +{ + sample_block = metadata_snapshot->getSampleBlock(); + primary_key_pos = sample_block.getPositionByName(storage.primary_key); +} + +Block EmbeddedRocksdbBlockInputStream::readImpl() +{ + if (finished) + return {}; + + if (!iterator) + { + iterator = std::unique_ptr(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions())); + iterator->SeekToFirst(); + } + + MutableColumns columns = sample_block.cloneEmptyColumns(); + size_t rows = 0; + for (; iterator->Valid(); iterator->Next()) + { + ReadBufferFromString key_buffer(iterator->key()); + ReadBufferFromString value_buffer(iterator->value()); + + for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName())) + { + column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos? key_buffer: value_buffer); + } + ++rows; + if (rows >= max_block_size) + break; + } + + finished = !iterator->Valid(); + if (!iterator->status().ok()) + { + throw Exception("Engine " + getName() + " got error while seeking key value datas: " + iterator->status().ToString(), + ErrorCodes::LOGICAL_ERROR); + } + return sample_block.cloneWithColumns(std::move(columns)); +} + +} diff --git a/src/Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h b/src/Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h new file mode 100644 index 00000000000..817aace58d0 --- /dev/null +++ b/src/Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +#include + +namespace DB +{ + +class EmbeddedRocksdbBlockInputStream : public IBlockInputStream +{ + +public: + EmbeddedRocksdbBlockInputStream( + StorageEmbeddedRocksdb & storage_, const StorageMetadataPtr & metadata_snapshot_, size_t max_block_size_); + + String getName() const override { return storage.getName(); } + Block getHeader() const override { return sample_block; } + Block readImpl() override; + +private: + StorageEmbeddedRocksdb & storage; + StorageMetadataPtr metadata_snapshot; + const size_t max_block_size; + + Block sample_block; + std::unique_ptr iterator; + size_t primary_key_pos; + bool finished = false; +}; +} diff --git a/src/Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.cpp b/src/Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.cpp new file mode 100644 index 00000000000..aebc41addda --- /dev/null +++ b/src/Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.cpp @@ -0,0 +1,50 @@ + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int SYSTEM_ERROR; +} + +Block EmbeddedRocksdbBlockOutputStream::getHeader() const +{ + return metadata_snapshot->getSampleBlock(); +} + +void EmbeddedRocksdbBlockOutputStream::write(const Block & block) +{ + metadata_snapshot->check(block, true); + auto rows = block.rows(); + + WriteBufferFromOwnString wb_key; + WriteBufferFromOwnString wb_value; + + rocksdb::WriteBatch batch; + auto columns = metadata_snapshot->getColumns(); + + for (size_t i = 0; i < rows; i++) + { + wb_key.restart(); + wb_value.restart(); + + for (const auto & col : columns) + { + const auto & type = block.getByName(col.name).type; + const auto & column = block.getByName(col.name).column; + if (col.name == storage.primary_key) + type->serializeBinary(*column, i, wb_key); + else + type->serializeBinary(*column, i, wb_value); + } + batch.Put(wb_key.str(), wb_value.str()); + } + auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch); + if (!status.ok()) + throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR); +} + +} diff --git a/src/Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h b/src/Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h new file mode 100644 index 00000000000..d92e15d553a --- /dev/null +++ b/src/Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream +{ +public: + explicit EmbeddedRocksdbBlockOutputStream( + StorageEmbeddedRocksdb & storage_, + const StorageMetadataPtr & metadata_snapshot_) + : storage(storage_) + , metadata_snapshot(metadata_snapshot_) + {} + + Block getHeader() const override; + void write(const Block & block) override; + +private: + StorageEmbeddedRocksdb & storage; + StorageMetadataPtr metadata_snapshot; +}; + +} diff --git a/src/Storages/Rocksdb/StorageEmbeddedRocksdb.cpp b/src/Storages/Rocksdb/StorageEmbeddedRocksdb.cpp index 127d1349a2c..86d2f6ae406 100644 --- a/src/Storages/Rocksdb/StorageEmbeddedRocksdb.cpp +++ b/src/Storages/Rocksdb/StorageEmbeddedRocksdb.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include @@ -17,7 +19,7 @@ #include #include -#include +#include #include #include #include @@ -39,13 +41,12 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int SYSTEM_ERROR; - extern const int LOGICAL_ERROR; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } // returns keys may be filter by condition -static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res) +static bool traverseASTFilter(const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res) { const auto * function = elem->as(); if (!function) @@ -53,34 +54,50 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c if (function->name == "and") { + // one child has the key filter condition is ok for (const auto & child : function->arguments->children) - if (traverseASTFilter(primary_key, child, sets, res)) + if (traverseASTFilter(primary_key, primary_key_type, child, sets, res)) return true; return false; } + else if (function->name == "or") + { + // make sure every child has the key filter condition + FieldVector child_res; + for (const auto & child : function->arguments->children) + { + if (!traverseASTFilter(primary_key, primary_key_type, child, sets, child_res)) + return false; + } + res.insert(res.end(), child_res.begin(), child_res.end()); + return true; + } else if (function->name == "equals" || function->name == "in") { const auto & args = function->arguments->as(); + const ASTIdentifier * ident; const IAST * value; if (args.children.size() != 2) - return false; + return false; - const ASTIdentifier * ident; - if ((ident = args.children.at(0)->as())) - value = args.children.at(1).get(); - else if ((ident = args.children.at(1)->as())) - value = args.children.at(0).get(); - else - return false; - - if (ident->name != primary_key) - return false; - - - if (function->name == "in" && ((value->as() || value->as()))) + if (function->name == "in") { - auto set_it = sets.find(PreparedSetKey::forSubquery(*value)); + ident = args.children.at(0)->as(); + if (!ident) + return false; + + if (ident->name != primary_key) + return false; + value = args.children.at(1).get(); + + PreparedSetKey set_key; + if ((value->as() || value->as())) + set_key = PreparedSetKey::forSubquery(*value); + else + set_key = PreparedSetKey::forLiteral(*value, {primary_key_type}); + + auto set_it = sets.find(set_key); if (set_it == sets.end()) return false; SetPtr prepared_set = set_it->second; @@ -89,7 +106,6 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c return false; prepared_set->checkColumnsNumber(1); - const auto & set_column = *prepared_set->getSetElements()[0]; for (size_t row = 0; row < set_column.size(); ++row) { @@ -97,27 +113,24 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c } return true; } - - if (const auto * literal = value->as()) + else { - if (function->name == "equals") + if ((ident = args.children.at(0)->as())) + value = args.children.at(1).get(); + else if ((ident = args.children.at(1)->as())) + value = args.children.at(0).get(); + else + return false; + + if (ident->name != primary_key) + return false; + + //function->name == "equals" + if (const auto * literal = value->as()) { res.push_back(literal->value); return true; } - else if (function->name == "in") - { - if (literal->value.getType() == Field::Types::Tuple) - { - auto tuple = literal->value.safeGet(); - for (const auto & f : tuple) - { - res.push_back(f); - } - return true; - } - } - else return false; } } return false; @@ -127,7 +140,7 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c /** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_'), from conjunctions in the WHERE clause. * TODO support key like search */ -static std::pair getFilterKeys(const String & primary_key, const SelectQueryInfo & query_info) +static std::pair getFilterKeys(const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info) { const auto & select = query_info.query->as(); if (!select.where()) @@ -135,7 +148,7 @@ static std::pair getFilterKeys(const String & primary_key, co return std::make_pair(FieldVector{}, true); } FieldVector res; - auto matched_keys = traverseASTFilter(primary_key, select.where(), query_info.sets, res); + auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.sets, res); return std::make_pair(res, !matched_keys); } @@ -149,13 +162,13 @@ public: const FieldVector & keys_, const size_t start_, const size_t end_, - const size_t rocksdb_batch_read_size_) + const size_t max_block_size_) : SourceWithProgress(metadata_snapshot_->getSampleBlock()) , storage(storage_) , metadata_snapshot(metadata_snapshot_) , start(start_) , end(end_) - , rocksdb_batch_read_size(rocksdb_batch_read_size_) + , max_block_size(max_block_size_) { // slice the keys if (end > start) @@ -185,7 +198,7 @@ public: auto columns = sample_block.cloneEmptyColumns(); size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key); - for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + size_t(rocksdb_batch_read_size)); ++i) + for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + max_block_size); ++i) { key_column.type->serializeBinary(keys[i], wbs[i]); auto str_ref = wbs[i].stringRef(); @@ -206,7 +219,7 @@ public: } } } - processed_keys += rocksdb_batch_read_size; + processed_keys += max_block_size; UInt64 num_rows = columns.at(0)->size(); return Chunk(std::move(columns), num_rows); @@ -218,62 +231,13 @@ private: const StorageMetadataPtr metadata_snapshot; const size_t start; const size_t end; - const size_t rocksdb_batch_read_size; + const size_t max_block_size; FieldVector keys; size_t processed_keys = 0; }; -class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream -{ -public: - explicit EmbeddedRocksdbBlockOutputStream( - StorageEmbeddedRocksdb & storage_, - const StorageMetadataPtr & metadata_snapshot_) - : storage(storage_) - , metadata_snapshot(metadata_snapshot_) - {} - - Block getHeader() const override { return metadata_snapshot->getSampleBlock(); } - - void write(const Block & block) override - { - metadata_snapshot->check(block, true); - auto rows = block.rows(); - - WriteBufferFromOwnString wb_key; - WriteBufferFromOwnString wb_value; - - rocksdb::WriteBatch batch; - auto columns = metadata_snapshot->getColumns(); - - for (size_t i = 0; i < rows; i++) - { - wb_key.restart(); - wb_value.restart(); - - for (const auto & col : columns) - { - const auto & type = block.getByName(col.name).type; - const auto & column = block.getByName(col.name).column; - if (col.name == storage.primary_key) - type->serializeBinary(*column, i, wb_key); - else - type->serializeBinary(*column, i, wb_value); - } - batch.Put(wb_key.str(), wb_value.str()); - } - auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch); - if (!status.ok()) - throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR); - } - -private: - StorageEmbeddedRocksdb & storage; - StorageMetadataPtr metadata_snapshot; -}; - StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageID & table_id_, const String & relative_data_path_, const StorageInMemoryMetadata & metadata_, @@ -318,46 +282,31 @@ Pipe StorageEmbeddedRocksdb::read( const SelectQueryInfo & query_info, const Context & /*context*/, QueryProcessingStage::Enum /*processed_stage*/, - size_t /*max_block_size*/, + size_t max_block_size, unsigned num_streams) { metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); - Block sample_block = metadata_snapshot->getSampleBlock(); - size_t primary_key_pos = sample_block.getPositionByName(primary_key); - FieldVector keys; bool all_scan = false; - std::tie(keys, all_scan) = getFilterKeys(primary_key, query_info); + auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type; + std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info); if (all_scan) { - MutableColumns columns = sample_block.cloneEmptyColumns(); - auto it = std::unique_ptr(rocksdb_ptr->NewIterator(rocksdb::ReadOptions())); - for (it->SeekToFirst(); it->Valid(); it->Next()) - { - ReadBufferFromString key_buffer(it->key()); - ReadBufferFromString value_buffer(it->value()); - - for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName())) - { - column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos? key_buffer: value_buffer); - } - } - - if (!it->status().ok()) - { - throw Exception("Engine " + getName() + " got error while seeking key value datas: " + it->status().ToString(), - ErrorCodes::LOGICAL_ERROR); - } - UInt64 num_rows = columns.at(0)->size(); - Chunk chunk(std::move(columns), num_rows); - return Pipe(std::make_shared(sample_block, std::move(chunk))); + auto reader = std::make_shared( + *this, metadata_snapshot, max_block_size); + return Pipe(std::make_shared(reader)); } else { if (keys.empty()) return {}; + std::sort(keys.begin(), keys.end()); + auto unique_iter = std::unique(keys.begin(), keys.end()); + if (unique_iter != keys.end()) + keys.erase(unique_iter, keys.end()); + Pipes pipes; size_t start = 0; size_t end; @@ -365,9 +314,6 @@ Pipe StorageEmbeddedRocksdb::read( const size_t num_threads = std::min(size_t(num_streams), keys.size()); const size_t batch_per_size = ceil(keys.size() * 1.0 / num_threads); - // TODO settings - static constexpr size_t rocksdb_batch_read_size = 81920; - for (size_t t = 0; t < num_threads; ++t) { if (start >= keys.size()) @@ -376,7 +322,7 @@ Pipe StorageEmbeddedRocksdb::read( end = start + batch_per_size > keys.size() ? keys.size() : start + batch_per_size; pipes.emplace_back( - std::make_shared(*this, metadata_snapshot, keys, start, end, rocksdb_batch_read_size)); + std::make_shared(*this, metadata_snapshot, keys, start, end, max_block_size)); start += batch_per_size; } return Pipe::unitePipes(std::move(pipes)); @@ -391,7 +337,7 @@ BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, con static StoragePtr create(const StorageFactory::Arguments & args) { - // TODO custom RocksdbSettings + // TODO custom RocksdbSettings, table function if (!args.engine_args.empty()) throw Exception( "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)", diff --git a/src/Storages/Rocksdb/StorageEmbeddedRocksdb.h b/src/Storages/Rocksdb/StorageEmbeddedRocksdb.h index bc70e795cba..c803055223b 100644 --- a/src/Storages/Rocksdb/StorageEmbeddedRocksdb.h +++ b/src/Storages/Rocksdb/StorageEmbeddedRocksdb.h @@ -20,6 +20,7 @@ class StorageEmbeddedRocksdb final : public ext::shared_ptr_helper; friend class EmbeddedRocksdbSource; friend class EmbeddedRocksdbBlockOutputStream; + friend class EmbeddedRocksdbBlockInputStream; public: std::string getName() const override { return "EmbeddedRocksdb"; } diff --git a/src/Storages/ya.make b/src/Storages/ya.make index f45767d42f2..d9fb44e69f1 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -105,6 +105,12 @@ SRCS( MutationCommands.cpp PartitionCommands.cpp ReadInOrderOptimizer.cpp +<<<<<<< HEAD +======= + registerStorages.cpp + Rocksdb/EmbeddedRocksdbBlockInputStream.cpp + Rocksdb/EmbeddedRocksdbBlockOutputStream.cpp +>>>>>>> * fix fasttest and cmake && pipline for all_scan Rocksdb/StorageEmbeddedRocksdb.cpp SelectQueryDescription.cpp SetSettings.cpp diff --git a/tests/queries/0_stateless/01504_rocksdb.sql b/tests/queries/0_stateless/01504_rocksdb.sql index 8b01aafa38a..2ad00f18cd8 100644 --- a/tests/queries/0_stateless/01504_rocksdb.sql +++ b/tests/queries/0_stateless/01504_rocksdb.sql @@ -24,8 +24,8 @@ SELECT A.a - B.a, A.b - B.b, A.c - B.c, A.d - B.d, A.e - B.e FROM ( SELECT 0 AS CREATE TEMPORARY TABLE keys AS SELECT * FROM numbers(1000); SET max_rows_to_read = 2; -SELECT dummy == (1,1.2) FROM test WHERE k IN (1, 3); -SELECT k == 4 FROM test WHERE k = 4; +SELECT dummy == (1,1.2) FROM test WHERE k IN (1, 3) OR k IN (1) OR k IN (3, 1) OR k IN [1] OR k IN [1, 3] ; +SELECT k == 4 FROM test WHERE k = 4 OR k IN [4]; SELECT k == 4 FROM test WHERE k IN (SELECT toUInt32(number) FROM keys WHERE number = 4); SELECT k, value FROM test WHERE k = 0 OR value > 0; -- { serverError 158 }