From 49148ac3e901eb4b3a7300440c65646a4b474d4f Mon Sep 17 00:00:00 2001
From: Nikita Vasilev
Date: Tue, 8 Jan 2019 20:27:44 +0300
Subject: [PATCH 1/7] minmax index

---
 .../MergeTree/MergeTreeDataSelectExecutor.cpp |   2 +-
 .../Storages/MergeTree/MergeTreeIndexes.cpp   |  11 +-
 .../src/Storages/MergeTree/MergeTreeIndexes.h |  15 +-
 .../MergeTree/MergeTreeMinMaxIndex.cpp        | 140 ++++++++++++++++++
 .../Storages/MergeTree/MergeTreeMinMaxIndex.h |  80 ++++++++++
 .../Storages/MergeTree/MergeTreeTestIndex.h   |  20 ++-
 .../MergeTree/registerStorageMergeTree.cpp    |   2 +
 7 files changed, 244 insertions(+), 26 deletions(-)
 create mode 100644 dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp
 create mode 100644 dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h

diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 87846e491bc..c8d92f32cac 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -997,7 +997,7 @@ namespace DB
                 std::max(range.begin, index_mark * index->granularity),
                 std::min(range.end, (index_mark + 1) * index->granularity));

-            if (!condition->mayBeTrueOnGranule(*granule))
+            if (!condition->mayBeTrueOnGranule(granule))
                 continue;

             if (res.empty() || res.back().end - data_range.begin >= min_marks_for_seek)
diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexes.cpp b/dbms/src/Storages/MergeTree/MergeTreeIndexes.cpp
index 2620dc6cc38..02f58fe6275 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeIndexes.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeIndexes.cpp
@@ -6,6 +6,8 @@
 #include

+#include <boost/algorithm/string.hpp>
+
 namespace DB
 {

@@ -17,7 +19,6 @@ namespace ErrorCodes
     extern const int UNKNOWN_EXCEPTION;
 }

-
 void MergeTreeIndexFactory::registerIndex(const std::string &name, Creator creator)
 {
     if (!indexes.emplace(name, std::move(creator)).second)
@@ -32,8 +33,12 @@ std::unique_ptr<MergeTreeIndex> MergeTreeIndexFactory::get(
 {
     if (!node->type)
         throw Exception(
-                "for INDEX TYPE is required",
-                ErrorCodes::INCORRECT_QUERY);
+                "TYPE is required for index", ErrorCodes::INCORRECT_QUERY);
+    if (node->type->parameters && !node->type->parameters->children.empty())
+        throw Exception(
+                "Index type cannot have parameters", ErrorCodes::INCORRECT_QUERY);
+
+    boost::algorithm::to_lower(node->type->name);
     auto it = indexes.find(node->type->name);
     if (it == indexes.end())
         throw Exception(
diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexes.h b/dbms/src/Storages/MergeTree/MergeTreeIndexes.h
index 239678e1d21..515d2843548 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeIndexes.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeIndexes.h
@@ -43,15 +43,11 @@ using MergeTreeIndexGranules = std::vector<MergeTreeIndexGranulePtr>;
 /// Condition on the index.
 class IndexCondition
 {
 public:
-    IndexCondition() = default;
     virtual ~IndexCondition() = default;

     /// Checks if this index is useful for query.
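+    /// (Returns true when the condition degenerates to "always unknown or true", i.e. nothing can be pruned.)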
     virtual bool alwaysUnknownOrTrue() const = 0;

-    virtual bool mayBeTrueOnGranule(const MergeTreeIndexGranule & granule) const = 0;
-
-    MergeTreeIndexPtr index;
+    virtual bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr granule) const = 0;
 };

 using IndexConditionPtr = std::shared_ptr<IndexCondition>;

@@ -61,12 +57,10 @@ using IndexConditionPtr = std::shared_ptr<IndexCondition>;
 class MergeTreeIndex
 {
 public:
-    MergeTreeIndex(String name, ExpressionActionsPtr expr, size_t granularity, Block key)
-        : name(name), expr(expr), granularity(granularity), sample(key) {}
+    MergeTreeIndex(String name, ExpressionActionsPtr expr, size_t granularity)
+        : name(name), expr(expr), granularity(granularity) {}

-    virtual ~MergeTreeIndex() {};
-
-    virtual String indexType() const { return "UNKNOWN"; };
+    virtual ~MergeTreeIndex() = default;

     /// gets filename without extension
     String getFileName() const { return INDEX_FILE_PREFIX + name; };
@@ -81,7 +75,6 @@ public:
     size_t granularity;
     Names columns;
     DataTypes data_types;
-    Block sample;
 };
diff --git a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp
new file mode 100644
index 00000000000..0cdea36621a
--- /dev/null
+++ b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp
@@ -0,0 +1,140 @@
+#include <Storages/MergeTree/MergeTreeMinMaxIndex.h>
+
+
+namespace DB
+{
+
+MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index)
+    : MergeTreeIndexGranule(), emp(true), index(index)
+{
+    parallelogram.reserve(index.columns.size());
+}
+
+void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const
+{
+    if (empty())
+        throw Exception(
+                "Attempt to write empty minmax index `" + index.name + "`", ErrorCodes::LOGICAL_ERROR);
+
+    for (size_t i = 0; i < index.columns.size(); ++i)
+    {
+        const DataTypePtr & type = index.data_types[i];
+
+        type->serializeBinary(parallelogram[i].left, ostr);
+        type->serializeBinary(parallelogram[i].right, ostr);
+    }
+}
+
+void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr)
+{
+    for (size_t i = 0; i < index.columns.size(); ++i)
+    {
+        const DataTypePtr & type = index.data_types[i];
+
+        Field min_val;
+        type->deserializeBinary(min_val, istr);
+        Field max_val;
+        type->deserializeBinary(max_val, istr);
+
+        /// Store the read bounds; without this the granule stayed empty after deserialization.
+        parallelogram.emplace_back(min_val, true, max_val, true);
+    }
+    emp = false;
+}
+
+void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t limit)
+{
+    size_t rows_read = 0;
+    for (size_t i = 0; i < index.columns.size(); ++i)
+    {
+        auto column = block.getByName(index.columns[i]).column;
+        size_t cur;
+        /// TODO: more efficient (use the primary index, or IColumn::getExtremes?)
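+        /// Scan rows [*pos, *pos + limit) of this column and widen the accumulated [min, max] range.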
+        for (cur = 0; cur < limit && cur + *pos < column->size(); ++cur)
+        {
+            Field field;
+            column->get(*pos + cur, field);
+            if (parallelogram.size() <= i)
+            {
+                parallelogram.emplace_back(field, true, field, true);
+            }
+            else
+            {
+                parallelogram[i].left = std::min(parallelogram[i].left, field);
+                parallelogram[i].right = std::max(parallelogram[i].right, field);
+            }
+        }
+        rows_read = cur;
+    }
+
+    *pos += rows_read;
+    if (rows_read > 0)
+        emp = false;
+}
+
+
+MinMaxCondition::MinMaxCondition(
+    const SelectQueryInfo & query,
+    const Context & context,
+    const MergeTreeMinMaxIndex & index)
+    : IndexCondition(), index(index), condition(query, context, index.columns, index.expr) {}
+
+bool MinMaxCondition::alwaysUnknownOrTrue() const
+{
+    return condition.alwaysUnknownOrTrue();
+}
+
+bool MinMaxCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const
+{
+    std::shared_ptr<MergeTreeMinMaxGranule> granule
+        = std::dynamic_pointer_cast<MergeTreeMinMaxGranule>(idx_granule);
+    if (!granule)
+        throw Exception(
+                "Minmax index condition got a granule of the wrong type", ErrorCodes::LOGICAL_ERROR);
+
+    return condition.mayBeTrueInParallelogram(granule->parallelogram, index.data_types);
+}
+
+
+MergeTreeIndexGranulePtr MergeTreeMinMaxIndex::createIndexGranule() const
+{
+    return std::make_shared<MergeTreeMinMaxGranule>(*this);
+}
+
+IndexConditionPtr MergeTreeMinMaxIndex::createIndexCondition(
+    const SelectQueryInfo & query, const Context & context) const
+{
+    return std::make_shared<MinMaxCondition>(query, context, *this);
+}
+
+
+std::unique_ptr<MergeTreeIndex> MergeTreeMinMaxIndexCreator(
+    const MergeTreeData & data,
+    std::shared_ptr<ASTIndexDeclaration> node,
+    const Context & context)
+{
+    if (node->name.empty())
+        throw Exception("Index must have a unique name", ErrorCodes::INCORRECT_QUERY);
+
+    if (node->type->arguments)
+        throw Exception("Minmax index must not have any arguments", ErrorCodes::INCORRECT_QUERY);
+
+    ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(node->expr->clone());
+    auto syntax = SyntaxAnalyzer(context, {}).analyze(
+            expr_list, data.getColumns().getAllPhysical());
+    auto minmax_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
+
+
+    auto minmax = std::make_unique<MergeTreeMinMaxIndex>(
+            node->name, std::move(minmax_expr), node->granularity.get<size_t>());
+
+    const auto & columns_with_types = minmax->expr->getRequiredColumnsWithTypes();
+
+    for (const auto & column : columns_with_types)
+    {
+        minmax->columns.emplace_back(column.name);
+        minmax->data_types.emplace_back(column.type);
+    }
+
+    return minmax;
+}
+
+}
\ No newline at end of file
diff --git a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h
new file mode 100644
index 00000000000..266cfbf04bc
--- /dev/null
+++ b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int INCORRECT_QUERY;
+}
+
+class MergeTreeMinMaxIndex;
+
+struct MergeTreeMinMaxGranule : public MergeTreeIndexGranule
+{
+    explicit MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index);
+
+    void serializeBinary(WriteBuffer & ostr) const override;
+    void deserializeBinary(ReadBuffer & istr) override;
+    bool empty() const override { return emp; }
+
+    void update(const Block & block, size_t * pos, size_t limit) override;
+
+    ~MergeTreeMinMaxGranule() override = default;
+
+    bool emp;
+    const MergeTreeMinMaxIndex & index;
+    std::vector<Range> parallelogram;
+};
+
+class MinMaxCondition : public IndexCondition
+{
+public:
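+    /// All filtering is delegated to a KeyCondition built over the index columns and expression.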
+    MinMaxCondition(
+        const SelectQueryInfo & query,
+        const Context & context,
+        const MergeTreeMinMaxIndex & index);
+
+    bool alwaysUnknownOrTrue() const override;
+
+    bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule) const override;
+
+    ~MinMaxCondition() override = default;
+private:
+    const MergeTreeMinMaxIndex & index;
+    KeyCondition condition;
+};
+
+
+class MergeTreeMinMaxIndex : public MergeTreeIndex
+{
+public:
+    MergeTreeMinMaxIndex(String name, ExpressionActionsPtr expr, size_t granularity)
+        : MergeTreeIndex(name, expr, granularity) {}
+
+    ~MergeTreeMinMaxIndex() override = default;
+
+    MergeTreeIndexGranulePtr createIndexGranule() const override;
+
+    IndexConditionPtr createIndexCondition(
+        const SelectQueryInfo & query, const Context & context) const override;
+
+};
+
+std::unique_ptr<MergeTreeIndex> MergeTreeMinMaxIndexCreator(
+    const MergeTreeData & data, std::shared_ptr<ASTIndexDeclaration> node, const Context & context);
+
+}
\ No newline at end of file
diff --git a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h b/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h
index 920c4bdb4b6..64d298661b9 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h
@@ -21,7 +21,7 @@ namespace ErrorCodes
 class MergeTreeTestIndex;

 struct MergeTreeTestGranule : public MergeTreeIndexGranule {
-    ~MergeTreeTestGranule() override {};
+    ~MergeTreeTestGranule() override = default;

     void serializeBinary(WriteBuffer &ostr) const override {
         //std::cerr << "TESTINDEX: written " << emp << "\n";
@@ -50,13 +50,13 @@ struct MergeTreeTestGranule : public MergeTreeIndexGranule {

 class IndexTestCondition : public IndexCondition{
 public:
-    IndexTestCondition() = default;
-    ~IndexTestCondition() override {};
+    IndexTestCondition(int) {};
+    ~IndexTestCondition() override = default;

     /// Checks if this index is useful for query.
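+    /// (The test implementation is hard-coded: it always claims to be useful and matches every granule.)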
     bool alwaysUnknownOrTrue() const override { return false; };

-    bool mayBeTrueOnGranule(const MergeTreeIndexGranule &) const override {
+    bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr) const override {
         return true;
     }

@@ -66,12 +66,10 @@ public:
 class MergeTreeTestIndex : public MergeTreeIndex
 {
 public:
-    MergeTreeTestIndex(String name, ExpressionActionsPtr expr, size_t granularity, Block key)
-            : MergeTreeIndex(name, expr, granularity, key) {}
+    MergeTreeTestIndex(String name, ExpressionActionsPtr expr, size_t granularity)
+            : MergeTreeIndex(name, expr, granularity) {}

-    ~MergeTreeTestIndex() override {}
-
-    String indexType() const override { return "TEST"; }
+    ~MergeTreeTestIndex() override = default;

     /// gets filename without extension

@@ -81,7 +79,7 @@ public:
     IndexConditionPtr createIndexCondition(
         const SelectQueryInfo & , const Context & ) const override {
-        return std::make_shared<IndexTestCondition>();
+        return std::make_shared<IndexTestCondition>(4);
     };

 };

 std::unique_ptr<MergeTreeIndex> MTItestCreator(
     const MergeTreeData & data, std::shared_ptr<ASTIndexDeclaration> node, const Context & ) {
     return std::make_unique<MergeTreeTestIndex>(
-            node->name, data.primary_key_expr, node->granularity.get<size_t>(), Block{});
+            node->name, data.primary_key_expr, node->granularity.get<size_t>());
 }
 }
\ No newline at end of file
diff --git a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp
index 095009db00d..8e964c80357 100644
--- a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp
+++ b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/MergeTreeMinMaxIndex.h>
 #include
 #include

@@ -638,6 +639,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
 static void registerMergeTreeSkipIndexes()
 {
     auto & factory = MergeTreeIndexFactory::instance();
     factory.registerIndex("test", MTItestCreator);
+    factory.registerIndex("minmax", MergeTreeMinMaxIndexCreator);
 }

From ad4df16899a66f055550318326c93f4eeeebce43 Mon Sep 17 00:00:00 2001
From: Nikita Vasilev
Date: Tue, 8 Jan 2019 22:41:36 +0300
Subject: [PATCH 2/7] fix

---
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 1726 ++++++++---------
 1 file changed, 863 insertions(+), 863 deletions(-)

diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index c8d92f32cac..efa18a7d5d5 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -62,953 +62,953 @@ namespace ProfileEvents

 namespace DB
 {
-    namespace ErrorCodes
-    {
-        extern const int INDEX_NOT_USED;
-        extern const int SAMPLING_NOT_SUPPORTED;
-        extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
-        extern const int ILLEGAL_COLUMN;
-        extern const int ARGUMENT_OUT_OF_BOUND;
-    }
+namespace ErrorCodes
+{
+    extern const int INDEX_NOT_USED;
+    extern const int SAMPLING_NOT_SUPPORTED;
+    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
+    extern const int ILLEGAL_COLUMN;
+    extern const int ARGUMENT_OUT_OF_BOUND;
+}

-    MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
-        : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
-    {
-    }
+MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
+    : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
+{
+}

 /// Construct a block consisting only of possible values of virtual columns
-    static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
+static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
{
    auto column = ColumnString::create();

    for (const auto & part : parts)
        column->insert(part->name);

    return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
}


size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
    const MergeTreeData::DataPartsVector & parts, const KeyCondition & key_condition, const Settings & settings) const
{
    size_t full_marks_count = 0;

    /// We will find out how many rows we would have read without sampling.
    LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());

    for (size_t i = 0; i < parts.size(); ++i)
    {
        const MergeTreeData::DataPartPtr & part = parts[i];
        MarkRanges ranges = markRangesFromPKRange(part->index, key_condition, settings);

        /** In order to get a lower bound on the number of rows that match the condition on PK,
          * consider only guaranteed full marks.
          * That is, do not take into account the first and last marks, which may be incomplete.
          */
        for (size_t j = 0; j < ranges.size(); ++j)
            if (ranges[j].end - ranges[j].begin > 2)
                full_marks_count += ranges[j].end - ranges[j].begin - 2;
    }

    return full_marks_count * data.index_granularity;
}


using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;

std::string toString(const RelativeSize & x)
{
    return ASTSampleRatio::toString(x.numerator()) + "/" + ASTSampleRatio::toString(x.denominator());
}

/// Converts an absolute sample size (ex. `SAMPLE 1000000` - a number of rows) to a relative value (ex. `SAMPLE 0.1` - a share of the data).
- static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows) +static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows) +{ + if (approx_total_rows == 0) + return 1; + + const ASTSampleRatio & node_sample = typeid_cast(*node); + + auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator; + return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows)); +} + + +BlockInputStreams MergeTreeDataSelectExecutor::read( + const Names & column_names_to_return, + const SelectQueryInfo & query_info, + const Context & context, + const size_t max_block_size, + const unsigned num_streams, + const PartitionIdToMaxBlock * max_block_numbers_to_read) const +{ + return readFromParts( + data.getDataPartsVector(), column_names_to_return, query_info, context, + max_block_size, num_streams, max_block_numbers_to_read); +} + +BlockInputStreams MergeTreeDataSelectExecutor::readFromParts( + MergeTreeData::DataPartsVector parts, + const Names & column_names_to_return, + const SelectQueryInfo & query_info, + const Context & context, + const size_t max_block_size, + const unsigned num_streams, + const PartitionIdToMaxBlock * max_block_numbers_to_read) const +{ + size_t part_index = 0; + + /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. + /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. + Names virt_column_names; + Names real_column_names; + + bool part_column_queried = false; + + bool sample_factor_column_queried = false; + Float64 used_sample_factor = 1; + + for (const String & name : column_names_to_return) { - if (approx_total_rows == 0) - return 1; - - const ASTSampleRatio & node_sample = typeid_cast(*node); - - auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator; - return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows)); - } - - - BlockInputStreams MergeTreeDataSelectExecutor::read( - const Names & column_names_to_return, - const SelectQueryInfo & query_info, - const Context & context, - const size_t max_block_size, - const unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read) const - { - return readFromParts( - data.getDataPartsVector(), column_names_to_return, query_info, context, - max_block_size, num_streams, max_block_numbers_to_read); - } - - BlockInputStreams MergeTreeDataSelectExecutor::readFromParts( - MergeTreeData::DataPartsVector parts, - const Names & column_names_to_return, - const SelectQueryInfo & query_info, - const Context & context, - const size_t max_block_size, - const unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read) const - { - size_t part_index = 0; - - /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. - /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. 
- Names virt_column_names; - Names real_column_names; - - bool part_column_queried = false; - - bool sample_factor_column_queried = false; - Float64 used_sample_factor = 1; - - for (const String & name : column_names_to_return) + if (name == "_part") { - if (name == "_part") - { - part_column_queried = true; - virt_column_names.push_back(name); - } - else if (name == "_part_index") - { - virt_column_names.push_back(name); - } - else if (name == "_partition_id") - { - virt_column_names.push_back(name); - } - else if (name == "_sample_factor") - { - sample_factor_column_queried = true; - virt_column_names.push_back(name); - } - else - { - real_column_names.push_back(name); - } + part_column_queried = true; + virt_column_names.push_back(name); } - - NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical(); - - /// If there are only virtual columns in the query, you must request at least one non-virtual one. - if (real_column_names.empty()) - real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns)); - - /// If `_part` virtual column is requested, we try to use it as an index. - Block virtual_columns_block = getBlockWithPartColumn(parts); - if (part_column_queried) - VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context); - - std::multiset part_values = VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_part"); - - data.check(real_column_names); - - const Settings & settings = context.getSettingsRef(); - Names primary_key_columns = data.primary_key_columns; - - KeyCondition key_condition(query_info, context, primary_key_columns, data.primary_key_expr); - - if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue()) + else if (name == "_part_index") { - std::stringstream exception_message; - exception_message << "Primary key ("; - for (size_t i = 0, size = primary_key_columns.size(); i < size; ++i) - exception_message << (i == 0 ? "" : ", ") << primary_key_columns[i]; - exception_message << ") is not used and setting 'force_primary_key' is set."; - - throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED); + virt_column_names.push_back(name); } - - std::optional minmax_idx_condition; - if (data.minmax_idx_expr) + else if (name == "_partition_id") { - minmax_idx_condition.emplace(query_info, context, data.minmax_idx_columns, data.minmax_idx_expr); - - if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue()) - { - String msg = "MinMax index by columns ("; - bool first = true; - for (const String & col : data.minmax_idx_columns) - { - if (first) - first = false; - else - msg += ", "; - msg += col; - } - msg += ") is not used and setting 'force_index_by_date' is set"; - - throw Exception(msg, ErrorCodes::INDEX_NOT_USED); - } + virt_column_names.push_back(name); } - - /// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`, - /// as well as `max_block_number_to_read`. 
+ else if (name == "_sample_factor") { - auto prev_parts = parts; - parts.clear(); - - for (const auto & part : prev_parts) - { - if (part_values.find(part->name) == part_values.end()) - continue; - - if (part->isEmpty()) - continue; - - if (minmax_idx_condition && !minmax_idx_condition->mayBeTrueInParallelogram( - part->minmax_idx.parallelogram, data.minmax_idx_column_types)) - continue; - - if (max_block_numbers_to_read) - { - auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id); - if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second) - continue; - } - - parts.push_back(part); - } - } - - /// Sampling. - Names column_names_to_read = real_column_names; - std::shared_ptr filter_function; - ExpressionActionsPtr filter_expression; - - RelativeSize relative_sample_size = 0; - RelativeSize relative_sample_offset = 0; - - ASTSelectQuery & select = typeid_cast(*query_info.query); - - auto select_sample_size = select.sample_size(); - auto select_sample_offset = select.sample_offset(); - - if (select_sample_size) - { - relative_sample_size.assign( - typeid_cast(*select_sample_size).ratio.numerator, - typeid_cast(*select_sample_size).ratio.denominator); - - if (relative_sample_size < 0) - throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - relative_sample_offset = 0; - if (select_sample_offset) - relative_sample_offset.assign( - typeid_cast(*select_sample_offset).ratio.numerator, - typeid_cast(*select_sample_offset).ratio.denominator); - - if (relative_sample_offset < 0) - throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - /// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to read) into the relative `SAMPLE 0.1` (how much data to read). - size_t approx_total_rows = 0; - if (relative_sample_size > 1 || relative_sample_offset > 1) - approx_total_rows = getApproximateTotalRowsToRead(parts, key_condition, settings); - - if (relative_sample_size > 1) - { - relative_sample_size = convertAbsoluteSampleSizeToRelative(select_sample_size, approx_total_rows); - LOG_DEBUG(log, "Selected relative sample size: " << toString(relative_sample_size)); - } - - /// SAMPLE 1 is the same as the absence of SAMPLE. - if (relative_sample_size == RelativeSize(1)) - relative_sample_size = 0; - - if (relative_sample_offset > 0 && RelativeSize(0) == relative_sample_size) - throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - if (relative_sample_offset > 1) - { - relative_sample_offset = convertAbsoluteSampleSizeToRelative(select_sample_offset, approx_total_rows); - LOG_DEBUG(log, "Selected relative sample offset: " << toString(relative_sample_offset)); - } - } - - /** Which range of sampling key values do I need to read? - * First, in the whole range ("universe") we select the interval - * of relative `relative_sample_size` size, offset from the beginning by `relative_sample_offset`. - * - * Example: SAMPLE 0.4 OFFSET 0.3 - * - * [------********------] - * ^ - offset - * <------> - size - * - * If the interval passes through the end of the universe, then cut its right side. 
- * - * Example: SAMPLE 0.4 OFFSET 0.8 - * - * [----------------****] - * ^ - offset - * <------> - size - * - * Next, if the `parallel_replicas_count`, `parallel_replica_offset` settings are set, - * then it is necessary to break the received interval into pieces of the number `parallel_replicas_count`, - * and select a piece with the number `parallel_replica_offset` (from zero). - * - * Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1 - * - * [----------****------] - * ^ - offset - * <------> - size - * <--><--> - pieces for different `parallel_replica_offset`, select the second one. - * - * It is very important that the intervals for different `parallel_replica_offset` cover the entire range without gaps and overlaps. - * It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals. - */ - - bool use_sampling = relative_sample_size > 0 || settings.parallel_replicas_count > 1; - bool no_data = false; /// There is nothing left after sampling. - - if (use_sampling) - { - if (!data.supportsSampling()) - throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); - - if (sample_factor_column_queried && relative_sample_size != RelativeSize(0)) - used_sample_factor = 1.0 / boost::rational_cast(relative_sample_size); - - RelativeSize size_of_universum = 0; - DataTypePtr type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type; - - if (typeid_cast(type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else - throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", - ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); - - if (settings.parallel_replicas_count > 1) - { - if (relative_sample_size == RelativeSize(0)) - relative_sample_size = 1; - - relative_sample_size /= settings.parallel_replicas_count.value; - relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value); - } - - if (relative_sample_offset >= RelativeSize(1)) - no_data = true; - - /// Calculate the half-interval of `[lower, upper)` column values. 
- bool has_lower_limit = false; - bool has_upper_limit = false; - - RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum; - RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum; - - UInt64 lower = boost::rational_cast(lower_limit_rational); - UInt64 upper = boost::rational_cast(upper_limit_rational); - - if (lower > 0) - has_lower_limit = true; - - if (upper_limit_rational < size_of_universum) - has_upper_limit = true; - - /*std::cerr << std::fixed << std::setprecision(100) - << "relative_sample_size: " << relative_sample_size << "\n" - << "relative_sample_offset: " << relative_sample_offset << "\n" - << "lower_limit_float: " << lower_limit_rational << "\n" - << "upper_limit_float: " << upper_limit_rational << "\n" - << "lower: " << lower << "\n" - << "upper: " << upper << "\n";*/ - - if ((has_upper_limit && upper == 0) - || (has_lower_limit && has_upper_limit && lower == upper)) - no_data = true; - - if (no_data || (!has_lower_limit && !has_upper_limit)) - { - use_sampling = false; - } - else - { - /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed. - - std::shared_ptr lower_function; - std::shared_ptr upper_function; - - if (has_lower_limit) - { - if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true))) - throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); - - ASTPtr args = std::make_shared(); - args->children.push_back(data.getSamplingExpression()); - args->children.push_back(std::make_shared(lower)); - - lower_function = std::make_shared(); - lower_function->name = "greaterOrEquals"; - lower_function->arguments = args; - lower_function->children.push_back(lower_function->arguments); - - filter_function = lower_function; - } - - if (has_upper_limit) - { - if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createRightBounded(upper, false))) - throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); - - ASTPtr args = std::make_shared(); - args->children.push_back(data.getSamplingExpression()); - args->children.push_back(std::make_shared(upper)); - - upper_function = std::make_shared(); - upper_function->name = "less"; - upper_function->arguments = args; - upper_function->children.push_back(upper_function->arguments); - - filter_function = upper_function; - } - - if (has_lower_limit && has_upper_limit) - { - ASTPtr args = std::make_shared(); - args->children.push_back(lower_function); - args->children.push_back(upper_function); - - filter_function = std::make_shared(); - filter_function->name = "and"; - filter_function->arguments = args; - filter_function->children.push_back(filter_function->arguments); - } - - ASTPtr query = filter_function; - auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, available_real_columns); - filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false); - - /// Add columns needed for `sample_by_ast` to `column_names_to_read`. 
- std::vector add_columns = filter_expression->getRequiredColumns(); - column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); - std::sort(column_names_to_read.begin(), column_names_to_read.end()); - column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); - } - } - - if (no_data) - { - LOG_DEBUG(log, "Sampling yields no data."); - return {}; - } - - LOG_DEBUG(log, "Key condition: " << key_condition.toString()); - if (minmax_idx_condition) - LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString()); - - /// PREWHERE - String prewhere_column; - if (select.prewhere_expression) - prewhere_column = select.prewhere_expression->getColumnName(); - - RangesInDataParts parts_with_ranges; - - /// Let's find what range to read from each part. - size_t sum_marks = 0; - size_t sum_ranges = 0; - for (auto & part : parts) - { - RangesInDataPart ranges(part, part_index++); - - if (data.hasPrimaryKey()) - ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings); - else - ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}}; - - /// It can be done in multiple threads (one thread for each part). - /// Maybe it should be moved to BlockInputStream, but it can cause some problems. - for (auto index : data.indexes) { - auto condition = index->createIndexCondition(query_info, context); - if (!condition->alwaysUnknownOrTrue()) { - ranges.ranges = filterMarksUsingIndex(index, condition, part, ranges.ranges, settings); - } - } - - if (!ranges.ranges.empty()) - { - parts_with_ranges.push_back(ranges); - - sum_ranges += ranges.ranges.size(); - for (const auto & range : ranges.ranges) - sum_marks += range.end - range.begin; - } - } - - LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, " - << sum_marks << " marks to read from " << sum_ranges << " ranges"); - - if (parts_with_ranges.empty()) - return {}; - - ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size()); - ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); - ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); - - BlockInputStreams res; - - if (select.final()) - { - /// Add columns needed to calculate the sorting expression and the sign. 
- std::vector add_columns = data.sorting_key_expr->getRequiredColumns(); - column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); - - if (!data.merging_params.sign_column.empty()) - column_names_to_read.push_back(data.merging_params.sign_column); - if (!data.merging_params.version_column.empty()) - column_names_to_read.push_back(data.merging_params.version_column); - - std::sort(column_names_to_read.begin(), column_names_to_read.end()); - column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); - - res = spreadMarkRangesAmongStreamsFinal( - std::move(parts_with_ranges), - column_names_to_read, - max_block_size, - settings.use_uncompressed_cache, - query_info.prewhere_info, - virt_column_names, - settings); + sample_factor_column_queried = true; + virt_column_names.push_back(name); } else { - res = spreadMarkRangesAmongStreams( - std::move(parts_with_ranges), - num_streams, - column_names_to_read, - max_block_size, - settings.use_uncompressed_cache, - query_info.prewhere_info, - virt_column_names, - settings); + real_column_names.push_back(name); } - - if (use_sampling) - for (auto & stream : res) - stream = std::make_shared(stream, filter_expression, filter_function->getColumnName()); - - /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values. - if (sample_factor_column_queried) - for (auto & stream : res) - stream = std::make_shared>( - stream, std::make_shared(), used_sample_factor, "_sample_factor"); - - if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions) - for (auto & stream : res) - stream = std::make_shared(stream, query_info.prewhere_info->remove_columns_actions); - - return res; } + NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical(); - BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( - RangesInDataParts && parts, - size_t num_streams, - const Names & column_names, - size_t max_block_size, - bool use_uncompressed_cache, - const PrewhereInfoPtr & prewhere_info, - const Names & virt_columns, - const Settings & settings) const + /// If there are only virtual columns in the query, you must request at least one non-virtual one. + if (real_column_names.empty()) + real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns)); + + /// If `_part` virtual column is requested, we try to use it as an index. 
+ Block virtual_columns_block = getBlockWithPartColumn(parts); + if (part_column_queried) + VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context); + + std::multiset part_values = VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_part"); + + data.check(real_column_names); + + const Settings & settings = context.getSettingsRef(); + Names primary_key_columns = data.primary_key_columns; + + KeyCondition key_condition(query_info, context, primary_key_columns, data.primary_key_expr); + + if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue()) { - const size_t min_marks_for_concurrent_read = - (settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity; - const size_t max_marks_to_use_cache = - (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; + std::stringstream exception_message; + exception_message << "Primary key ("; + for (size_t i = 0, size = primary_key_columns.size(); i < size; ++i) + exception_message << (i == 0 ? "" : ", ") << primary_key_columns[i]; + exception_message << ") is not used and setting 'force_primary_key' is set."; - /// Count marks for each part. - std::vector sum_marks_in_parts(parts.size()); - size_t sum_marks = 0; - for (size_t i = 0; i < parts.size(); ++i) + throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED); + } + + std::optional minmax_idx_condition; + if (data.minmax_idx_expr) + { + minmax_idx_condition.emplace(query_info, context, data.minmax_idx_columns, data.minmax_idx_expr); + + if (settings.force_index_by_date && minmax_idx_condition->alwaysUnknownOrTrue()) { - /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`. - std::reverse(parts[i].ranges.begin(), parts[i].ranges.end()); - - for (const auto & range : parts[i].ranges) - sum_marks_in_parts[i] += range.end - range.begin; - - sum_marks += sum_marks_in_parts[i]; - } - - if (sum_marks > max_marks_to_use_cache) - use_uncompressed_cache = false; - - BlockInputStreams res; - - if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1) - { - /// Reduce the number of num_streams if the data is small. - if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams) - num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size()); - - MergeTreeReadPoolPtr pool = std::make_shared( - num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_info, true, - column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false); - - /// Let's estimate total number of rows for progress bar. - const size_t total_rows = data.index_granularity * sum_marks; - LOG_TRACE(log, "Reading approx. 
" << total_rows << " rows with " << num_streams << " streams"); - - for (size_t i = 0; i < num_streams; ++i) + String msg = "MinMax index by columns ("; + bool first = true; + for (const String & col : data.minmax_idx_columns) { - res.emplace_back(std::make_shared( - i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes, - settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache, - prewhere_info, settings, virt_columns)); - - if (i == 0) - { - /// Set the approximate number of rows for the first source only - static_cast(*res.front()).addTotalRowsApprox(total_rows); - } + if (first) + first = false; + else + msg += ", "; + msg += col; } + msg += ") is not used and setting 'force_index_by_date' is set"; + + throw Exception(msg, ErrorCodes::INDEX_NOT_USED); } - else if (sum_marks > 0) + } + + /// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`, + /// as well as `max_block_number_to_read`. + { + auto prev_parts = parts; + parts.clear(); + + for (const auto & part : prev_parts) { - const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1; + if (part_values.find(part->name) == part_values.end()) + continue; - for (size_t i = 0; i < num_streams && !parts.empty(); ++i) + if (part->isEmpty()) + continue; + + if (minmax_idx_condition && !minmax_idx_condition->mayBeTrueInParallelogram( + part->minmax_idx.parallelogram, data.minmax_idx_column_types)) + continue; + + if (max_block_numbers_to_read) { - size_t need_marks = min_marks_per_stream; - - /// Loop over parts. - /// We will iteratively take part or some subrange of a part from the back - /// and assign a stream to read from it. - while (need_marks > 0 && !parts.empty()) - { - RangesInDataPart part = parts.back(); - parts.pop_back(); - - size_t & marks_in_part = sum_marks_in_parts.back(); - - /// We will not take too few rows from a part. - if (marks_in_part >= min_marks_for_concurrent_read && - need_marks < min_marks_for_concurrent_read) - need_marks = min_marks_for_concurrent_read; - - /// Do not leave too few rows in the part. - if (marks_in_part > need_marks && - marks_in_part - need_marks < min_marks_for_concurrent_read) - need_marks = marks_in_part; - - MarkRanges ranges_to_get_from_part; - - /// We take the whole part if it is small enough. - if (marks_in_part <= need_marks) - { - /// Restore the order of segments. - std::reverse(part.ranges.begin(), part.ranges.end()); - - ranges_to_get_from_part = part.ranges; - - need_marks -= marks_in_part; - sum_marks_in_parts.pop_back(); - } - else - { - /// Loop through ranges in part. Take enough ranges to cover "need_marks". 
- while (need_marks > 0) - { - if (part.ranges.empty()) - throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR); - - MarkRange & range = part.ranges.back(); - - const size_t marks_in_range = range.end - range.begin; - const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); - - ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range); - range.begin += marks_to_get_from_range; - marks_in_part -= marks_to_get_from_range; - need_marks -= marks_to_get_from_range; - if (range.begin == range.end) - part.ranges.pop_back(); - } - parts.emplace_back(part); - } - - BlockInputStreamPtr source_stream = std::make_shared( - data, part.data_part, max_block_size, settings.preferred_block_size_bytes, - settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part, - use_uncompressed_cache, prewhere_info, true, settings.min_bytes_to_use_direct_io, - settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query); - - res.push_back(source_stream); - } + auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id); + if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second) + continue; } - if (!parts.empty()) - throw Exception("Couldn't spread marks among streams", ErrorCodes::LOGICAL_ERROR); + parts.push_back(part); } - - return res; } - BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( - RangesInDataParts && parts, - const Names & column_names, - size_t max_block_size, - bool use_uncompressed_cache, - const PrewhereInfoPtr & prewhere_info, - const Names & virt_columns, - const Settings & settings) const + /// Sampling. + Names column_names_to_read = real_column_names; + std::shared_ptr filter_function; + ExpressionActionsPtr filter_expression; + + RelativeSize relative_sample_size = 0; + RelativeSize relative_sample_offset = 0; + + ASTSelectQuery & select = typeid_cast(*query_info.query); + + auto select_sample_size = select.sample_size(); + auto select_sample_offset = select.sample_offset(); + + if (select_sample_size) { - const size_t max_marks_to_use_cache = - (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; + relative_sample_size.assign( + typeid_cast(*select_sample_size).ratio.numerator, + typeid_cast(*select_sample_size).ratio.denominator); - size_t sum_marks = 0; - for (size_t i = 0; i < parts.size(); ++i) - for (size_t j = 0; j < parts[i].ranges.size(); ++j) - sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin; + if (relative_sample_size < 0) + throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - if (sum_marks > max_marks_to_use_cache) - use_uncompressed_cache = false; + relative_sample_offset = 0; + if (select_sample_offset) + relative_sample_offset.assign( + typeid_cast(*select_sample_offset).ratio.numerator, + typeid_cast(*select_sample_offset).ratio.denominator); - BlockInputStreams to_merge; + if (relative_sample_offset < 0) + throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - /// NOTE `merge_tree_uniform_read_distribution` is not used for FINAL + /// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to read) into the relative `SAMPLE 0.1` (how much data to read). 
+ size_t approx_total_rows = 0; + if (relative_sample_size > 1 || relative_sample_offset > 1) + approx_total_rows = getApproximateTotalRowsToRead(parts, key_condition, settings); - for (size_t part_index = 0; part_index < parts.size(); ++part_index) + if (relative_sample_size > 1) { - RangesInDataPart & part = parts[part_index]; - - BlockInputStreamPtr source_stream = std::make_shared( - data, part.data_part, max_block_size, settings.preferred_block_size_bytes, - settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache, - prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true, - virt_columns, part.part_index_in_query); - - to_merge.emplace_back(std::make_shared(source_stream, data.sorting_key_expr)); + relative_sample_size = convertAbsoluteSampleSizeToRelative(select_sample_size, approx_total_rows); + LOG_DEBUG(log, "Selected relative sample size: " << toString(relative_sample_size)); } - Names sort_columns = data.sorting_key_columns; - SortDescription sort_description; - size_t sort_columns_size = sort_columns.size(); - sort_description.reserve(sort_columns_size); + /// SAMPLE 1 is the same as the absence of SAMPLE. + if (relative_sample_size == RelativeSize(1)) + relative_sample_size = 0; - Block header = to_merge.at(0)->getHeader(); - for (size_t i = 0; i < sort_columns_size; ++i) - sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); + if (relative_sample_offset > 0 && RelativeSize(0) == relative_sample_size) + throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - BlockInputStreamPtr merged; - switch (data.merging_params.mode) + if (relative_sample_offset > 1) { - case MergeTreeData::MergingParams::Ordinary: - merged = std::make_shared(to_merge, sort_description, max_block_size); - break; - - case MergeTreeData::MergingParams::Collapsing: - merged = std::make_shared( - to_merge, sort_description, data.merging_params.sign_column); - break; - - case MergeTreeData::MergingParams::Summing: - merged = std::make_shared(to_merge, - sort_description, data.merging_params.columns_to_sum, max_block_size); - break; - - case MergeTreeData::MergingParams::Aggregating: - merged = std::make_shared(to_merge, sort_description, max_block_size); - break; - - case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream - merged = std::make_shared(to_merge, - sort_description, data.merging_params.version_column, max_block_size); - break; - - case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream - merged = std::make_shared( - to_merge, sort_description, data.merging_params.sign_column, max_block_size); - break; - - case MergeTreeData::MergingParams::Graphite: - throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + relative_sample_offset = convertAbsoluteSampleSizeToRelative(select_sample_offset, approx_total_rows); + LOG_DEBUG(log, "Selected relative sample offset: " << toString(relative_sample_offset)); } - - return {merged}; } + /** Which range of sampling key values do I need to read? + * First, in the whole range ("universe") we select the interval + * of relative `relative_sample_size` size, offset from the beginning by `relative_sample_offset`. 
+ * + * Example: SAMPLE 0.4 OFFSET 0.3 + * + * [------********------] + * ^ - offset + * <------> - size + * + * If the interval passes through the end of the universe, then cut its right side. + * + * Example: SAMPLE 0.4 OFFSET 0.8 + * + * [----------------****] + * ^ - offset + * <------> - size + * + * Next, if the `parallel_replicas_count`, `parallel_replica_offset` settings are set, + * then it is necessary to break the received interval into pieces of the number `parallel_replicas_count`, + * and select a piece with the number `parallel_replica_offset` (from zero). + * + * Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1 + * + * [----------****------] + * ^ - offset + * <------> - size + * <--><--> - pieces for different `parallel_replica_offset`, select the second one. + * + * It is very important that the intervals for different `parallel_replica_offset` cover the entire range without gaps and overlaps. + * It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals. + */ - void MergeTreeDataSelectExecutor::createPositiveSignCondition( - ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const + bool use_sampling = relative_sample_size > 0 || settings.parallel_replicas_count > 1; + bool no_data = false; /// There is nothing left after sampling. + + if (use_sampling) { - auto function = std::make_shared(); - auto arguments = std::make_shared(); - auto sign = std::make_shared(data.merging_params.sign_column); - auto one = std::make_shared(1); + if (!data.supportsSampling()) + throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); - function->name = "equals"; - function->arguments = arguments; - function->children.push_back(arguments); + if (sample_factor_column_queried && relative_sample_size != RelativeSize(0)) + used_sample_factor = 1.0 / boost::rational_cast(relative_sample_size); - arguments->children.push_back(sign); - arguments->children.push_back(one); + RelativeSize size_of_universum = 0; + DataTypePtr type = data.primary_key_sample.getByName(data.sampling_expr_column_name).type; - ASTPtr query = function; - auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, data.getColumns().getAllPhysical()); - out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false); - out_column = function->getColumnName(); + if (typeid_cast(type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else if (typeid_cast(type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else if (typeid_cast(type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else if (typeid_cast(type.get())) + size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); + else + throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". 
Must be unsigned integer type.", + ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); + + if (settings.parallel_replicas_count > 1) + { + if (relative_sample_size == RelativeSize(0)) + relative_sample_size = 1; + + relative_sample_size /= settings.parallel_replicas_count.value; + relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value); + } + + if (relative_sample_offset >= RelativeSize(1)) + no_data = true; + + /// Calculate the half-interval of `[lower, upper)` column values. + bool has_lower_limit = false; + bool has_upper_limit = false; + + RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum; + RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum; + + UInt64 lower = boost::rational_cast(lower_limit_rational); + UInt64 upper = boost::rational_cast(upper_limit_rational); + + if (lower > 0) + has_lower_limit = true; + + if (upper_limit_rational < size_of_universum) + has_upper_limit = true; + + /*std::cerr << std::fixed << std::setprecision(100) + << "relative_sample_size: " << relative_sample_size << "\n" + << "relative_sample_offset: " << relative_sample_offset << "\n" + << "lower_limit_float: " << lower_limit_rational << "\n" + << "upper_limit_float: " << upper_limit_rational << "\n" + << "lower: " << lower << "\n" + << "upper: " << upper << "\n";*/ + + if ((has_upper_limit && upper == 0) + || (has_lower_limit && has_upper_limit && lower == upper)) + no_data = true; + + if (no_data || (!has_lower_limit && !has_upper_limit)) + { + use_sampling = false; + } + else + { + /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed. + + std::shared_ptr lower_function; + std::shared_ptr upper_function; + + if (has_lower_limit) + { + if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createLeftBounded(lower, true))) + throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); + + ASTPtr args = std::make_shared(); + args->children.push_back(data.getSamplingExpression()); + args->children.push_back(std::make_shared(lower)); + + lower_function = std::make_shared(); + lower_function->name = "greaterOrEquals"; + lower_function->arguments = args; + lower_function->children.push_back(lower_function->arguments); + + filter_function = lower_function; + } + + if (has_upper_limit) + { + if (!key_condition.addCondition(data.sampling_expr_column_name, Range::createRightBounded(upper, false))) + throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); + + ASTPtr args = std::make_shared(); + args->children.push_back(data.getSamplingExpression()); + args->children.push_back(std::make_shared(upper)); + + upper_function = std::make_shared(); + upper_function->name = "less"; + upper_function->arguments = args; + upper_function->children.push_back(upper_function->arguments); + + filter_function = upper_function; + } + + if (has_lower_limit && has_upper_limit) + { + ASTPtr args = std::make_shared(); + args->children.push_back(lower_function); + args->children.push_back(upper_function); + + filter_function = std::make_shared(); + filter_function->name = "and"; + filter_function->arguments = args; + filter_function->children.push_back(filter_function->arguments); + } + + ASTPtr query = filter_function; + auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, available_real_columns); + filter_expression = ExpressionAnalyzer(filter_function, syntax_result, 
context).getActions(false); + + /// Add columns needed for `sample_by_ast` to `column_names_to_read`. + std::vector add_columns = filter_expression->getRequiredColumns(); + column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); + std::sort(column_names_to_read.begin(), column_names_to_read.end()); + column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); + } } + if (no_data) + { + LOG_DEBUG(log, "Sampling yields no data."); + return {}; + } + + LOG_DEBUG(log, "Key condition: " << key_condition.toString()); + if (minmax_idx_condition) + LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition->toString()); + + /// PREWHERE + String prewhere_column; + if (select.prewhere_expression) + prewhere_column = select.prewhere_expression->getColumnName(); + + RangesInDataParts parts_with_ranges; + + /// Let's find what range to read from each part. + size_t sum_marks = 0; + size_t sum_ranges = 0; + for (auto & part : parts) + { + RangesInDataPart ranges(part, part_index++); + + if (data.hasPrimaryKey()) + ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings); + else + ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}}; + + /// It can be done in multiple threads (one thread for each part). + /// Maybe it should be moved to BlockInputStream, but it can cause some problems. + for (auto index : data.indexes) { + auto condition = index->createIndexCondition(query_info, context); + if (!condition->alwaysUnknownOrTrue()) { + ranges.ranges = filterMarksUsingIndex(index, condition, part, ranges.ranges, settings); + } + } + + if (!ranges.ranges.empty()) + { + parts_with_ranges.push_back(ranges); + + sum_ranges += ranges.ranges.size(); + for (const auto & range : ranges.ranges) + sum_marks += range.end - range.begin; + } + } + + LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, " + << sum_marks << " marks to read from " << sum_ranges << " ranges"); + + if (parts_with_ranges.empty()) + return {}; + + ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size()); + ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); + ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); + + BlockInputStreams res; + + if (select.final()) + { + /// Add columns needed to calculate the sorting expression and the sign. 
+    BlockInputStreams res;
+
+    if (select.final())
+    {
+        /// Add columns needed to calculate the sorting expression and the sign.
+        std::vector<String> add_columns = data.sorting_key_expr->getRequiredColumns();
+        column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
+
+        if (!data.merging_params.sign_column.empty())
+            column_names_to_read.push_back(data.merging_params.sign_column);
+        if (!data.merging_params.version_column.empty())
+            column_names_to_read.push_back(data.merging_params.version_column);
+
+        std::sort(column_names_to_read.begin(), column_names_to_read.end());
+        column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
+
+        res = spreadMarkRangesAmongStreamsFinal(
+            std::move(parts_with_ranges),
+            column_names_to_read,
+            max_block_size,
+            settings.use_uncompressed_cache,
+            query_info.prewhere_info,
+            virt_column_names,
+            settings);
+    }
+    else
+    {
+        res = spreadMarkRangesAmongStreams(
+            std::move(parts_with_ranges),
+            num_streams,
+            column_names_to_read,
+            max_block_size,
+            settings.use_uncompressed_cache,
+            query_info.prewhere_info,
+            virt_column_names,
+            settings);
+    }
+
+    if (use_sampling)
+        for (auto & stream : res)
+            stream = std::make_shared<FilterBlockInputStream>(stream, filter_expression, filter_function->getColumnName());
+
+    /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
+    if (sample_factor_column_queried)
+        for (auto & stream : res)
+            stream = std::make_shared<AddingConstColumnBlockInputStream<Float64>>(
+                stream, std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor");
+
+    if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
+        for (auto & stream : res)
+            stream = std::make_shared<ExpressionBlockInputStream>(stream, query_info.prewhere_info->remove_columns_actions);
+
+    return res;
+}
+
+
+BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
+    RangesInDataParts && parts,
+    size_t num_streams,
+    const Names & column_names,
+    size_t max_block_size,
+    bool use_uncompressed_cache,
+    const PrewhereInfoPtr & prewhere_info,
+    const Names & virt_columns,
+    const Settings & settings) const
+{
+    const size_t min_marks_for_concurrent_read =
+        (settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
+    const size_t max_marks_to_use_cache =
+        (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
+
+    /// Count marks for each part.
+    std::vector<size_t> sum_marks_in_parts(parts.size());
+    size_t sum_marks = 0;
+    for (size_t i = 0; i < parts.size(); ++i)
+    {
+        /// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
+        std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
+
+        for (const auto & range : parts[i].ranges)
+            sum_marks_in_parts[i] += range.end - range.begin;
+
+        sum_marks += sum_marks_in_parts[i];
+    }
+
+    if (sum_marks > max_marks_to_use_cache)
+        use_uncompressed_cache = false;
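Every `*_rows_*` setting above goes through the same rows-to-marks conversion, a ceiling division by the index granularity. A small self-checking sketch of that arithmetic (the 8192 granularity is only an example value):

    #include <cstddef>

    /// Number of granules (marks) needed to cover `rows` rows when each granule
    /// holds `granularity` rows; a partial granule still costs a whole mark.
    constexpr size_t rowsToMarks(size_t rows, size_t granularity)
    {
        return (rows + granularity - 1) / granularity;
    }

    static_assert(rowsToMarks(0, 8192) == 0, "no rows, no marks");
    static_assert(rowsToMarks(1, 8192) == 1, "a partial granule costs a full mark");
    static_assert(rowsToMarks(8192, 8192) == 1, "exact fit");
    static_assert(rowsToMarks(8193, 8192) == 2, "one extra row spills into a second mark");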
+
+    BlockInputStreams res;
+
+    if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1)
+    {
+        /// Reduce the number of num_streams if the data is small.
+        if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
+            num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
+
+        MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
+            num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_info, true,
+            column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
+
+        /// Let's estimate total number of rows for progress bar.
+        const size_t total_rows = data.index_granularity * sum_marks;
+        LOG_TRACE(log, "Reading approx. " << total_rows << " rows with " << num_streams << " streams");
+
+        for (size_t i = 0; i < num_streams; ++i)
+        {
+            res.emplace_back(std::make_shared<MergeTreeThreadBlockInputStream>(
+                i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
+                settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
+                prewhere_info, settings, virt_columns));
+
+            if (i == 0)
+            {
+                /// Set the approximate number of rows for the first source only
+                static_cast<IProfilingBlockInputStream &>(*res.front()).addTotalRowsApprox(total_rows);
+            }
+        }
+    }
+    else if (sum_marks > 0)
+    {
+        const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
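`min_marks_per_stream` above is the same ceiling division in another spelling, `(sum_marks - 1) / num_streams + 1`, safe here because this branch only runs when `sum_marks > 0`. The loop that follows peels roughly that many marks per stream off the back of the part list. A simplified sketch of the split, ignoring part boundaries and the `min_marks_for_concurrent_read` floor (`num_streams` must be non-zero):

    #include <cstddef>
    #include <vector>

    std::vector<size_t> splitMarks(size_t sum_marks, size_t num_streams)
    {
        /// ceil(sum_marks / num_streams); requires sum_marks > 0.
        const size_t per_stream = (sum_marks - 1) / num_streams + 1;
        std::vector<size_t> chunks;
        while (sum_marks > 0)
        {
            const size_t take = per_stream < sum_marks ? per_stream : sum_marks;
            chunks.push_back(take);   /// at most num_streams chunks in total
            sum_marks -= take;
        }
        return chunks;
    }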
+
+        for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
+        {
+            size_t need_marks = min_marks_per_stream;
+
+            /// Loop over parts.
+            /// We will iteratively take part or some subrange of a part from the back
+            /// and assign a stream to read from it.
+            while (need_marks > 0 && !parts.empty())
+            {
+                RangesInDataPart part = parts.back();
+                parts.pop_back();
+
+                size_t & marks_in_part = sum_marks_in_parts.back();
+
+                /// We will not take too few rows from a part.
+                if (marks_in_part >= min_marks_for_concurrent_read &&
+                    need_marks < min_marks_for_concurrent_read)
+                    need_marks = min_marks_for_concurrent_read;
+
+                /// Do not leave too few rows in the part.
+                if (marks_in_part > need_marks &&
+                    marks_in_part - need_marks < min_marks_for_concurrent_read)
+                    need_marks = marks_in_part;
+
+                MarkRanges ranges_to_get_from_part;
+
+                /// We take the whole part if it is small enough.
+                if (marks_in_part <= need_marks)
+                {
+                    /// Restore the order of segments.
+                    std::reverse(part.ranges.begin(), part.ranges.end());
+
+                    ranges_to_get_from_part = part.ranges;
+
+                    need_marks -= marks_in_part;
+                    sum_marks_in_parts.pop_back();
+                }
+                else
+                {
+                    /// Loop through ranges in part. Take enough ranges to cover "need_marks".
+                    while (need_marks > 0)
+                    {
+                        if (part.ranges.empty())
+                            throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);
+
+                        MarkRange & range = part.ranges.back();
+
+                        const size_t marks_in_range = range.end - range.begin;
+                        const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
+
+                        ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
+                        range.begin += marks_to_get_from_range;
+                        marks_in_part -= marks_to_get_from_range;
+                        need_marks -= marks_to_get_from_range;
+                        if (range.begin == range.end)
+                            part.ranges.pop_back();
+                    }
+                    parts.emplace_back(part);
+                }
+
+                BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
+                    data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
+                    settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
+                    use_uncompressed_cache, prewhere_info, true, settings.min_bytes_to_use_direct_io,
+                    settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
+
+                res.push_back(source_stream);
+            }
+        }
+
+        if (!parts.empty())
+            throw Exception("Couldn't spread marks among streams", ErrorCodes::LOGICAL_ERROR);
+    }
+
+    return res;
+}
+
+BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
+    RangesInDataParts && parts,
+    const Names & column_names,
+    size_t max_block_size,
+    bool use_uncompressed_cache,
+    const PrewhereInfoPtr & prewhere_info,
+    const Names & virt_columns,
+    const Settings & settings) const
+{
+    const size_t max_marks_to_use_cache =
+        (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
+
+    size_t sum_marks = 0;
+    for (size_t i = 0; i < parts.size(); ++i)
+        for (size_t j = 0; j < parts[i].ranges.size(); ++j)
+            sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
+
+    if (sum_marks > max_marks_to_use_cache)
+        use_uncompressed_cache = false;
+
+    BlockInputStreams to_merge;
+
+    /// NOTE `merge_tree_uniform_read_distribution` is not used for FINAL
+
+    for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+    {
+        RangesInDataPart & part = parts[part_index];
+
+        BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
+            data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
+            settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
+            prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
+            virt_columns, part.part_index_in_query);
+
+        to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.sorting_key_expr));
+    }
+
+    Names sort_columns = data.sorting_key_columns;
+    SortDescription sort_description;
+    size_t sort_columns_size = sort_columns.size();
+    sort_description.reserve(sort_columns_size);
+
+    Block header = to_merge.at(0)->getHeader();
+    for (size_t i = 0; i < sort_columns_size; ++i)
+        sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
+
+    BlockInputStreamPtr merged;
+    switch (data.merging_params.mode)
+    {
+        case MergeTreeData::MergingParams::Ordinary:
+            merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, sort_description, max_block_size);
+            break;
+
+        case MergeTreeData::MergingParams::Collapsing:
+            merged = std::make_shared<CollapsingFinalBlockInputStream>(
+                to_merge, sort_description, data.merging_params.sign_column);
+            break;
+
+        case MergeTreeData::MergingParams::Summing:
+            merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
+                sort_description, data.merging_params.columns_to_sum, max_block_size);
+            break;
+
+        case
MergeTreeData::MergingParams::Aggregating: + merged = std::make_shared(to_merge, sort_description, max_block_size); + break; + + case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream + merged = std::make_shared(to_merge, + sort_description, data.merging_params.version_column, max_block_size); + break; + + case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream + merged = std::make_shared( + to_merge, sort_description, data.merging_params.sign_column, max_block_size); + break; + + case MergeTreeData::MergingParams::Graphite: + throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + } + + return {merged}; +} + + +void MergeTreeDataSelectExecutor::createPositiveSignCondition( + ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const +{ + auto function = std::make_shared(); + auto arguments = std::make_shared(); + auto sign = std::make_shared(data.merging_params.sign_column); + auto one = std::make_shared(1); + + function->name = "equals"; + function->arguments = arguments; + function->children.push_back(arguments); + + arguments->children.push_back(sign); + arguments->children.push_back(one); + + ASTPtr query = function; + auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, data.getColumns().getAllPhysical()); + out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false); + out_column = function->getColumnName(); +} + /// Calculates a set of mark ranges, that could possibly contain keys, required by condition. /// In other words, it removes subranges from whole range, that definitely could not contain required keys. - MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( - const MergeTreeData::DataPart::Index & index, const KeyCondition & key_condition, const Settings & settings) const +MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange( + const MergeTreeData::DataPart::Index & index, const KeyCondition & key_condition, const Settings & settings) const +{ + MarkRanges res; + + size_t marks_count = index.at(0)->size(); + if (marks_count == 0) + return res; + + /// If index is not used. + if (key_condition.alwaysUnknownOrTrue()) { - MarkRanges res; + res.push_back(MarkRange(0, marks_count)); + } + else + { + size_t used_key_size = key_condition.getMaxKeyColumn() + 1; + size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; - size_t marks_count = index.at(0)->size(); - if (marks_count == 0) - return res; + /** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back). + * At each step, take the left segment and check if it fits. + * If fits, split it into smaller ones and put them on the stack. If not, discard it. + * If the segment is already of one mark length, add it to response and discard it. + */ + std::vector ranges_stack{ {0, marks_count} }; + + /// NOTE Creating temporary Field objects to pass to KeyCondition. + Row index_left(used_key_size); + Row index_right(used_key_size); + + while (!ranges_stack.empty())/// In other words, it removes subranges from whole range, that definitely could not contain required keys. - /// If index is not used. 
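The rest of this hunk only re-indents `markRangesFromPKRange`; the algorithm itself is unchanged. For intuition, it reduces to the following standalone sketch, where a callback stands in for KeyCondition and the `min_marks_for_seek` gap merging is simplified to plain adjacency:

    #include <cstddef>
    #include <functional>
    #include <vector>

    struct MarkRange { size_t begin = 0, end = 0; };

    /// Depth-first split of [0, marks_count) on an explicit stack, leftmost range on top.
    /// Ranges the condition rules out are dropped wholesale; surviving ranges are split
    /// `fanout` ways until they shrink to a single mark.
    std::vector<MarkRange> selectMarks(size_t marks_count, size_t fanout,
                                       const std::function<bool(MarkRange)> & may_be_true)
    {
        std::vector<MarkRange> res;
        std::vector<MarkRange> stack{{0, marks_count}};
        while (!stack.empty())
        {
            MarkRange range = stack.back();
            stack.pop_back();

            if (!may_be_true(range))
                continue;

            if (range.end == range.begin + 1)
            {
                if (!res.empty() && res.back().end == range.begin)
                    res.back().end = range.end;   /// extend the previous range
                else
                    res.push_back(range);
            }
            else
            {
                /// Split into ~fanout pieces, pushed right to left so the leftmost ends up on top.
                const size_t step = (range.end - range.begin - 1) / fanout + 1;
                size_t end;
                for (end = range.end; end > range.begin + step; end -= step)
                    stack.push_back({end - step, end});
                stack.push_back({range.begin, end});
            }
        }
        return res;
    }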
- if (key_condition.alwaysUnknownOrTrue()) { - res.push_back(MarkRange(0, marks_count)); - } - else - { - size_t used_key_size = key_condition.getMaxKeyColumn() + 1; - size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; - - /** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back). - * At each step, take the left segment and check if it fits. - * If fits, split it into smaller ones and put them on the stack. If not, discard it. - * If the segment is already of one mark length, add it to response and discard it. - */ - std::vector ranges_stack{ {0, marks_count} }; - - /// NOTE Creating temporary Field objects to pass to KeyCondition. - Row index_left(used_key_size); - Row index_right(used_key_size); - - while (!ranges_stack.empty())/// In other words, it removes subranges from whole range, that definitely could not contain required keys. + MarkRange range = ranges_stack.back(); + ranges_stack.pop_back(); + bool may_be_true; + if (range.end == marks_count) { - MarkRange range = ranges_stack.back(); - ranges_stack.pop_back(); - - bool may_be_true; - if (range.end == marks_count) + for (size_t i = 0; i < used_key_size; ++i) { - for (size_t i = 0; i < used_key_size; ++i) - { - index[i]->get(range.begin, index_left[i]); - } - - may_be_true = key_condition.mayBeTrueAfter( - used_key_size, index_left.data(), data.primary_key_data_types); + index[i]->get(range.begin, index_left[i]); } + + may_be_true = key_condition.mayBeTrueAfter( + used_key_size, index_left.data(), data.primary_key_data_types); + } + else + { + for (size_t i = 0; i < used_key_size; ++i) + { + index[i]->get(range.begin, index_left[i]); + index[i]->get(range.end, index_right[i]); + } + + may_be_true = key_condition.mayBeTrueInRange( + used_key_size, index_left.data(), index_right.data(), data.primary_key_data_types); + } + + if (!may_be_true) + continue; + + if (range.end == range.begin + 1) + { + /// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range. + if (res.empty() || range.begin - res.back().end > min_marks_for_seek) // is it a bug?? + res.push_back(range); else - { - for (size_t i = 0; i < used_key_size; ++i) - { - index[i]->get(range.begin, index_left[i]); - index[i]->get(range.end, index_right[i]); - } + res.back().end = range.end; + } + else + { + /// Break the segment and put the result on the stack from right to left. + size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1; + size_t end; - may_be_true = key_condition.mayBeTrueInRange( - used_key_size, index_left.data(), index_right.data(), data.primary_key_data_types); - } + for (end = range.end; end > range.begin + step; end -= step) + ranges_stack.push_back(MarkRange(end - step, end)); - if (!may_be_true) - continue; - - if (range.end == range.begin + 1) - { - /// We saw a useful gap between neighboring marks. Either add it to the last range, or start a new range. - if (res.empty() || range.begin - res.back().end > min_marks_for_seek) // is it a bug?? - res.push_back(range); - else - res.back().end = range.end; - } - else - { - /// Break the segment and put the result on the stack from right to left. 
- size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1; - size_t end; - - for (end = range.end; end > range.begin + step; end -= step) - ranges_stack.push_back(MarkRange(end - step, end)); - - ranges_stack.push_back(MarkRange(range.begin, end)); - } + ranges_stack.push_back(MarkRange(range.begin, end)); } } - - return res; } - MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( - MergeTreeIndexPtr index, - IndexConditionPtr condition, - MergeTreeData::DataPartPtr part, - const MarkRanges & ranges, - const Settings & settings) const - { - if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists()) { - return ranges; - } + return res; +} - const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; - - MergeTreeIndexReader reader( - index, part, - ((part->marks_count + index->granularity - 1) / index->granularity), - ranges); - - MarkRanges res; - - MergeTreeIndexGranulePtr granule = nullptr; - size_t last_index_mark = 0; - for (const auto & range : ranges) - { - MarkRange index_range( - range.begin / index->granularity, - (range.end + index->granularity - 1) / index->granularity); - - if (last_index_mark != index_range.begin || !granule) { - reader.seek(index_range.begin); - } - - for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark) - { - if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) - granule = reader.read(); - - MarkRange data_range( - std::max(range.begin, index_mark * index->granularity), - std::min(range.end, (index_mark + 1) * index->granularity)); - - if (!condition->mayBeTrueOnGranule(granule)) - continue; - - if (res.empty() || res.back().end - data_range.begin >= min_marks_for_seek) - res.push_back(data_range); - else - res.back().end = data_range.end; - } - - last_index_mark = index_range.end - 1; - } - return res; +MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( + MergeTreeIndexPtr index, + IndexConditionPtr condition, + MergeTreeData::DataPartPtr part, + const MarkRanges & ranges, + const Settings & settings) const +{ + if (!Poco::File(part->getFullPath() + index->getFileName() + ".idx").exists()) { + return ranges; } + const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; + + MergeTreeIndexReader reader( + index, part, + ((part->marks_count + index->granularity - 1) / index->granularity), + ranges); + + MarkRanges res; + + MergeTreeIndexGranulePtr granule = nullptr; + size_t last_index_mark = 0; + for (const auto & range : ranges) + { + MarkRange index_range( + range.begin / index->granularity, + (range.end + index->granularity - 1) / index->granularity); + + if (last_index_mark != index_range.begin || !granule) { + reader.seek(index_range.begin); + } + + for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark) + { + if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) + granule = reader.read(); + + MarkRange data_range( + std::max(range.begin, index_mark * index->granularity), + std::min(range.end, (index_mark + 1) * index->granularity)); + + if (!condition->mayBeTrueOnGranule(granule)) + continue; + + if (res.empty() || res.back().end - data_range.begin >= min_marks_for_seek) + res.push_back(data_range); + else + res.back().end = data_range.end; + } + + last_index_mark = index_range.end - 1; + } + return 
res; +} + } \ No newline at end of file From 6eeed48e862ec132fa413726f9e4ec3289447ea3 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 8 Jan 2019 23:17:45 +0300 Subject: [PATCH 3/7] fixed select --- dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp index 0cdea36621a..0b9b990ba98 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp @@ -35,6 +35,8 @@ void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr) type->deserializeBinary(min_val, istr); Field max_val; type->deserializeBinary(max_val, istr); + + parallelogram.emplace_back(min_val, true, max_val, true); } emp = true; } From e580180efcb999385ee035202db0a95edf6a0bcf Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 9 Jan 2019 12:54:18 +0300 Subject: [PATCH 4/7] fixed merging --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 232146ea417..f2b2d9dbdaa 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -637,19 +637,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor BlockInputStreamPtr stream = std::move(input); for (const auto & index : data.indexes) { - stream = std::make_shared(stream, index->expr); + stream = std::make_shared( + std::make_shared(stream, index->expr)); } if (data.hasPrimaryKey()) { - stream = std::make_shared( - BlockInputStreamPtr(std::move(stream)), data.sorting_key_expr); + stream = std::make_shared( + std::make_shared(stream, data.sorting_key_expr)); } - if (!data.indexes.empty() || data.hasPrimaryKey()) { - src_streams.emplace_back(std::make_shared(stream)); - } else { - src_streams.emplace_back(stream); - } + src_streams.emplace_back(stream); } Names sort_columns = data.sorting_key_columns; @@ -658,6 +655,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor sort_description.reserve(sort_columns_size); Block header = src_streams.at(0)->getHeader(); + + for (size_t i = 0; i < src_streams.size(); ++i) { + LOG_DEBUG(log, "merging header " << i << "\n"); + auto tmp_h = src_streams.at(i)->getHeader(); + for (auto column : tmp_h.getNames()) { + LOG_DEBUG(log, "column: " << column); + } + } + for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); @@ -720,6 +726,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor Block block; while (!actions_blocker.isCancelled() && (block = merged_stream->read())) { + LOG_DEBUG(log, "merging\n"); + for (auto column : block.getNames()) + { + LOG_DEBUG(log, "column: " << column); + } + LOG_DEBUG(log, ">>>>>> rows read:: " << block.rows()); + rows_written += block.rows(); to.write(block); From 91fb17f27f99e0bbd5d37f0aa3c30e50a9ceeed2 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 9 Jan 2019 12:55:28 +0300 Subject: [PATCH 5/7] fixed mutation --- dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp 
b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index f2b2d9dbdaa..4e51c744a1c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -930,14 +930,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor /// All columns are modified, proceed to write a new part from scratch. for (const auto & index : data.indexes) { - in = std::make_shared(in, index->expr); + in = std::make_shared( + std::make_shared(in, index->expr)); } if (data.hasPrimaryKey()) in = std::make_shared( std::make_shared(in, data.primary_key_expr)); - else if (!data.indexes.empty()) { - in = std::make_shared(in); } MergeTreeDataPart::MinMaxIndex minmax_idx; From 1e8fa5d9ea774358d916d479a174296a22d10166 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 9 Jan 2019 17:15:23 +0300 Subject: [PATCH 6/7] working minmax --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 19 +----- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 8 +++ .../MergeTree/MergeTreeDataWriter.cpp | 6 -- .../src/Storages/MergeTree/MergeTreeIndexes.h | 1 + .../MergeTree/MergeTreeMinMaxIndex.cpp | 64 ++++++++++++++++--- .../Storages/MergeTree/MergeTreeMinMaxIndex.h | 5 +- .../Storages/MergeTree/MergeTreeTestIndex.h | 4 ++ 7 files changed, 71 insertions(+), 36 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 4e51c744a1c..302ed852926 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -656,14 +656,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor Block header = src_streams.at(0)->getHeader(); - for (size_t i = 0; i < src_streams.size(); ++i) { - LOG_DEBUG(log, "merging header " << i << "\n"); - auto tmp_h = src_streams.at(i)->getHeader(); - for (auto column : tmp_h.getNames()) { - LOG_DEBUG(log, "column: " << column); - } - } - for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); @@ -726,13 +718,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor Block block; while (!actions_blocker.isCancelled() && (block = merged_stream->read())) { - LOG_DEBUG(log, "merging\n"); - for (auto column : block.getNames()) - { - LOG_DEBUG(log, "column: " << column); - } - LOG_DEBUG(log, ">>>>>> rows read:: " << block.rows()); - rows_written += block.rows(); to.write(block); @@ -929,15 +914,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor { /// All columns are modified, proceed to write a new part from scratch. 
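The fix below mirrors the one just made on the merge path: every per-index expression stream is immediately wrapped in a materializing stream, presumably so that columns an index expression produces in const form become full columns before they are written. The chaining idiom, sketched with stand-in classes for ExpressionBlockInputStream and MaterializingBlockInputStream (not the real interfaces):

    #include <cstddef>
    #include <memory>
    #include <utility>

    struct IBlockInputStream { virtual ~IBlockInputStream() = default; };
    using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;

    struct ExpressionStream : IBlockInputStream
    {
        explicit ExpressionStream(BlockInputStreamPtr in) : in(std::move(in)) {}
        BlockInputStreamPtr in;
    };

    struct MaterializingStream : IBlockInputStream
    {
        explicit MaterializingStream(BlockInputStreamPtr in) : in(std::move(in)) {}
        BlockInputStreamPtr in;
    };

    /// Layer one materialized expression stage per index onto the pipeline,
    /// mirroring the shape (not the exact classes) of the change in this patch.
    BlockInputStreamPtr wrapWithIndexes(BlockInputStreamPtr in, size_t index_count)
    {
        for (size_t i = 0; i < index_count; ++i)
            in = std::make_shared<MaterializingStream>(std::make_shared<ExpressionStream>(in));
        return in;
    }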
- for (const auto & index : data.indexes) { + for (const auto & index : data.indexes) in = std::make_shared( std::make_shared(in, index->expr)); - } if (data.hasPrimaryKey()) in = std::make_shared( std::make_shared(in, data.primary_key_expr)); - } MergeTreeDataPart::MinMaxIndex minmax_idx; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index efa18a7d5d5..5d398bcbacd 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -996,9 +996,17 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( MarkRange data_range( std::max(range.begin, index_mark * index->granularity), std::min(range.end, (index_mark + 1) * index->granularity)); + LOG_DEBUG(log, "drop out:: " << " data_range [" << + data_range.begin << ", " << data_range.end << ") index_mark = " << index_mark << + " granule data: "); + + LOG_DEBUG(log, granule->toString()); if (!condition->mayBeTrueOnGranule(granule)) + { + LOG_DEBUG(log, "DROP"); continue; + } if (res.empty() || res.back().end - data_range.begin >= min_marks_for_seek) res.push_back(data_range); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 5e5a7fecd21..a0fca13f34a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -216,12 +216,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa for (auto index : data.indexes) { - auto index_columns = index->expr->getRequiredColumnsWithTypes(); - for (const auto & column : index_columns) - { - if (!block.has(column.name)) - block.insert(ColumnWithTypeAndName(column.type, column.name)); - } index->expr->execute(block); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexes.h b/dbms/src/Storages/MergeTree/MergeTreeIndexes.h index 515d2843548..62049bed322 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeIndexes.h +++ b/dbms/src/Storages/MergeTree/MergeTreeIndexes.h @@ -31,6 +31,7 @@ struct MergeTreeIndexGranule virtual void serializeBinary(WriteBuffer & ostr) const = 0; virtual void deserializeBinary(ReadBuffer & istr) = 0; + virtual String toString() const = 0; virtual bool empty() const = 0; virtual void update(const Block & block, size_t * pos, size_t limit) = 0; diff --git a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp index 0b9b990ba98..c4b49e51b3b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.cpp @@ -1,13 +1,12 @@ #include - +#include namespace DB { MergeTreeMinMaxGranule::MergeTreeMinMaxGranule(const MergeTreeMinMaxIndex & index) - : MergeTreeIndexGranule(), emp(true), index(index) + : MergeTreeIndexGranule(), index(index), parallelogram() { - parallelogram.reserve(index.columns.size()); } void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const @@ -15,11 +14,18 @@ void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const if (empty()) throw Exception( "Attempt to write empty minmax index `" + index.name + "`", ErrorCodes::LOGICAL_ERROR); + Poco::Logger * log = &Poco::Logger::get("minmax_idx"); + + LOG_DEBUG(log, "serializeBinary Granule"); for (size_t i = 0; i < index.columns.size(); ++i) { const DataTypePtr & type = index.data_types[i]; + LOG_DEBUG(log, "parallel " << i << " :: " + << 
applyVisitor(FieldVisitorToString(), parallelogram[i].left) << " " + << applyVisitor(FieldVisitorToString(), parallelogram[i].right)); + type->serializeBinary(parallelogram[i].left, ostr); type->serializeBinary(parallelogram[i].right, ostr); } @@ -27,6 +33,10 @@ void MergeTreeMinMaxGranule::serializeBinary(WriteBuffer & ostr) const void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr) { + Poco::Logger * log = &Poco::Logger::get("minmax_idx"); + + LOG_DEBUG(log, "deserializeBinary Granule"); + parallelogram.clear(); for (size_t i = 0; i < index.columns.size(); ++i) { const DataTypePtr & type = index.data_types[i]; @@ -36,25 +46,51 @@ void MergeTreeMinMaxGranule::deserializeBinary(ReadBuffer & istr) Field max_val; type->deserializeBinary(max_val, istr); + LOG_DEBUG(log, "parallel " << i << " :: " + << applyVisitor(FieldVisitorToString(), min_val) << " " + << applyVisitor(FieldVisitorToString(), max_val)); + parallelogram.emplace_back(min_val, true, max_val, true); } - emp = true; +} + +String MergeTreeMinMaxGranule::toString() const +{ + String res = "minmax granule: "; + + for (size_t i = 0; i < parallelogram.size(); ++i) + { + res += "[" + + applyVisitor(FieldVisitorToString(), parallelogram[i].left) + ", " + + applyVisitor(FieldVisitorToString(), parallelogram[i].right) + "]"; + } + + return res; } void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t limit) { + Poco::Logger * log = &Poco::Logger::get("minmax_idx"); + + LOG_DEBUG(log, "update Granule " << parallelogram.size() + << " pos: "<< *pos << " limit: " << limit << " rows: " << block.rows()); + size_t rows_read = 0; for (size_t i = 0; i < index.columns.size(); ++i) { + LOG_DEBUG(log, "granule column: " << index.columns[i]); + auto column = block.getByName(index.columns[i]).column; size_t cur; /// TODO: more effective (index + getExtremes??) 
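The TODO above survives from the first patch in this series; one way it could be addressed later (an assumption, not something these patches implement, and the cut/getExtremes signatures are taken on faith from the codebase of this era) is to let the column compute extremes for the processed window in one pass instead of extracting a Field per row:

    #include <algorithm>
    #include <Columns/IColumn.h>
    #include <Core/Field.h>

    /// Hypothetical helper for the TODO: cut the [pos, pos + limit) window once and
    /// let the column scan itself in native code via getExtremes().
    void updateExtremes(const DB::IColumn & column, size_t pos, size_t limit,
                        DB::Field & min_val, DB::Field & max_val)
    {
        const size_t rows = std::min(limit, column.size() - pos);
        column.cut(pos, rows)->getExtremes(min_val, max_val);
    }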
for (cur = 0; cur < limit && cur + *pos < column->size(); ++cur) { Field field; - column->get(i, field); - if (parallelogram.size() < i) + column->get(cur + *pos, field); + LOG_DEBUG(log, "upd:: " << applyVisitor(FieldVisitorToString(), field)); + if (parallelogram.size() <= i) { + LOG_DEBUG(log, "emplaced"); parallelogram.emplace_back(field, true, field, true); } else @@ -63,12 +99,14 @@ void MergeTreeMinMaxGranule::update(const Block & block, size_t * pos, size_t li parallelogram[i].right = std::max(parallelogram[i].right, field); } } + LOG_DEBUG(log, "res:: [" + << applyVisitor(FieldVisitorToString(), parallelogram[i].left) << ", " + << applyVisitor(FieldVisitorToString(), parallelogram[i].right) << "]"); rows_read = cur; } + LOG_DEBUG(log, "updated rows_read: " << rows_read); *pos += rows_read; - if (rows_read > 0) - emp = false; }; @@ -128,12 +166,18 @@ std::unique_ptr MergeTreeMinMaxIndexCreator( auto minmax = std::make_unique( node->name, std::move(minmax_expr), node->granularity.get()); - const auto & columns_with_types = minmax->expr->getRequiredColumnsWithTypes(); + auto sample = ExpressionAnalyzer(expr_list, syntax, context) + .getActions(true)->getSampleBlock(); - for (const auto & column : columns_with_types) + Poco::Logger * log = &Poco::Logger::get("minmax_idx"); + LOG_DEBUG(log, "new minmax index"); + for (size_t i = 0; i < expr_list->children.size(); ++i) { + const auto & column = sample.getByPosition(i); + minmax->columns.emplace_back(column.name); minmax->data_types.emplace_back(column.type); + LOG_DEBUG(log, ">" << column.name << " " << column.type->getName()); } return minmax; diff --git a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h index 266cfbf04bc..3311d8815e9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h +++ b/dbms/src/Storages/MergeTree/MergeTreeMinMaxIndex.h @@ -29,13 +29,14 @@ struct MergeTreeMinMaxGranule : public MergeTreeIndexGranule void serializeBinary(WriteBuffer & ostr) const override; void deserializeBinary(ReadBuffer & istr) override; - bool empty() const override { return emp; } + + String toString() const override; + bool empty() const override { return parallelogram.empty(); } void update(const Block & block, size_t * pos, size_t limit) override; ~MergeTreeMinMaxGranule() override = default; - bool emp; const MergeTreeMinMaxIndex & index; std::vector parallelogram; }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h b/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h index 64d298661b9..93c002f8295 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h +++ b/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h @@ -36,6 +36,10 @@ struct MergeTreeTestGranule : public MergeTreeIndexGranule { //std::cerr << "TESTINDEX: read " << emp << "\n"; } + String toString() const override { + return "test_index"; + } + bool empty() const override { return emp == 0; } From d8f8b6352b236cce0690f4b5a80f8b9563d8f817 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 9 Jan 2019 17:30:25 +0300 Subject: [PATCH 7/7] removed test index --- .../Storages/MergeTree/MergeTreeTestIndex.cpp | 2 - .../Storages/MergeTree/MergeTreeTestIndex.h | 97 ------------------- .../MergeTree/registerStorageMergeTree.cpp | 2 - 3 files changed, 101 deletions(-) delete mode 100644 dbms/src/Storages/MergeTree/MergeTreeTestIndex.cpp delete mode 100644 dbms/src/Storages/MergeTree/MergeTreeTestIndex.h diff --git a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.cpp 
b/dbms/src/Storages/MergeTree/MergeTreeTestIndex.cpp deleted file mode 100644 index 29e15b66503..00000000000 --- a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.cpp +++ /dev/null @@ -1,2 +0,0 @@ -#include - diff --git a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h b/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h deleted file mode 100644 index 93c002f8295..00000000000 --- a/dbms/src/Storages/MergeTree/MergeTreeTestIndex.h +++ /dev/null @@ -1,97 +0,0 @@ -#pragma once - -#include -#include -#include - -#include -#include - -#include -#include - -namespace DB { - -namespace ErrorCodes -{ - extern const int FILE_DOESNT_EXIST; -} - - -class MergeTreeTestIndex; - -struct MergeTreeTestGranule : public MergeTreeIndexGranule { - ~MergeTreeTestGranule() override = default;; - - void serializeBinary(WriteBuffer &ostr) const override { - //std::cerr << "TESTINDEX: written " << emp << "\n"; - writeIntBinary(emp, ostr); - } - - void deserializeBinary(ReadBuffer &istr) override { - readIntBinary(emp, istr); - if (emp != 10) { - throw Exception("kek bad read", ErrorCodes::FILE_DOESNT_EXIST); - } - //std::cerr << "TESTINDEX: read " << emp << "\n"; - } - - String toString() const override { - return "test_index"; - } - - bool empty() const override { - return emp == 0; - } - - void update(const Block &block, size_t *pos, size_t limit) override { - *pos += std::min(limit, block.rows() - *pos); - emp = 10; - }; - - Int32 emp = 0; -}; - -class IndexTestCondition : public IndexCondition{ -public: - IndexTestCondition(int) {}; - ~IndexTestCondition() override = default; - - /// Checks if this index is useful for query. - bool alwaysUnknownOrTrue() const override { return false; }; - - bool mayBeTrueOnGranule(MergeTreeIndexGranulePtr) const override { - return true; - } - -}; - - -class MergeTreeTestIndex : public MergeTreeIndex -{ -public: - MergeTreeTestIndex(String name, ExpressionActionsPtr expr, size_t granularity) - : MergeTreeIndex(name, expr, granularity) {} - - ~MergeTreeTestIndex() override = default; - - /// gets filename without extension - - MergeTreeIndexGranulePtr createIndexGranule() const override { - return std::make_shared(); - } - - IndexConditionPtr createIndexCondition( - const SelectQueryInfo & , const Context & ) const override { - return std::make_shared(4); - }; - -}; - -std::unique_ptr MTItestCreator( - const MergeTreeData & data, std::shared_ptr node, const Context & ) { - return std::make_unique( - node->name, data.primary_key_expr, node->granularity.get()); -} - -} \ No newline at end of file diff --git a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp index 8e964c80357..1f32a7443a9 100644 --- a/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include @@ -638,7 +637,6 @@ static StoragePtr create(const StorageFactory::Arguments & args) static void registerMergeTreeSkipIndexes() { auto & factory = MergeTreeIndexFactory::instance(); - factory.registerIndex("test", MTItestCreator); factory.registerIndex("minmax", MergeTreeMinMaxIndexCreator); }
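With the test index gone, "minmax" is the only skip-index type left in MergeTreeIndexFactory, so end to end the series enables declarations along the lines of `INDEX ix expr TYPE minmax GRANULARITY n` inside CREATE TABLE (illustrative syntax; the exact grammar lives in the CREATE parser, outside these patches), resolved through the MergeTreeMinMaxIndexCreator registered above.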