diff --git a/programs/copier/Internals.cpp b/programs/copier/Internals.cpp index 518395e3b7d..12da07a772a 100644 --- a/programs/copier/Internals.cpp +++ b/programs/copier/Internals.cpp @@ -169,17 +169,6 @@ ASTPtr extractOrderBy(const ASTPtr & storage_ast) } -String createCommaSeparatedStringFrom(const Names & names) -{ - std::ostringstream ss; - if (!names.empty()) - { - std::copy(names.begin(), std::prev(names.end()), std::ostream_iterator(ss, ", ")); - ss << names.back(); - } - return ss.str(); -} - Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast) { const auto sorting_key_ast = extractOrderBy(storage_ast); diff --git a/programs/copier/Internals.h b/programs/copier/Internals.h index b1a94e1a5ca..b61b6d59629 100644 --- a/programs/copier/Internals.h +++ b/programs/copier/Internals.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -197,8 +198,6 @@ ASTPtr extractPrimaryKey(const ASTPtr & storage_ast); ASTPtr extractOrderBy(const ASTPtr & storage_ast); -String createCommaSeparatedStringFrom(const Names & names); - Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast); String extractReplicatedTableZookeeperPath(const ASTPtr & storage_ast); diff --git a/programs/copier/TaskTableAndShard.h b/programs/copier/TaskTableAndShard.h index 32841e93a14..0ac533d9209 100644 --- a/programs/copier/TaskTableAndShard.h +++ b/programs/copier/TaskTableAndShard.h @@ -268,7 +268,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf ParserStorage parser_storage; engine_push_ast = parseQuery(parser_storage, engine_push_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); engine_push_partition_key_ast = extractPartitionKey(engine_push_ast); - primary_key_comma_separated = createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast)); + primary_key_comma_separated = Nested::createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast)); engine_push_zk_path = 
extractReplicatedTableZookeeperPath(engine_push_ast); } diff --git a/src/DataTypes/NestedUtils.cpp b/src/DataTypes/NestedUtils.cpp index 5f8b9f877bf..e365c73a845 100644 --- a/src/DataTypes/NestedUtils.cpp +++ b/src/DataTypes/NestedUtils.cpp @@ -70,6 +70,17 @@ std::pair splitName(const std::string & name) return {{ begin, first_end }, { second_begin, end }}; } +std::string createCommaSeparatedStringFrom(const Names & names) +{ + std::ostringstream ss; + if (!names.empty()) + { + std::copy(names.begin(), std::prev(names.end()), std::ostream_iterator(ss, ", ")); + ss << names.back(); + } + return ss.str(); +} + std::string extractTableName(const std::string & nested_name) { diff --git a/src/DataTypes/NestedUtils.h b/src/DataTypes/NestedUtils.h index 3039fd7f118..4be3e69edfd 100644 --- a/src/DataTypes/NestedUtils.h +++ b/src/DataTypes/NestedUtils.h @@ -13,6 +13,8 @@ namespace Nested std::pair splitName(const std::string & name); + std::string createCommaSeparatedStringFrom(const Names & names); + /// Returns the prefix of the name to the first '.'. Or the name is unchanged if there is no dot. 
std::string extractTableName(const std::string & nested_name); diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp index 3833e3288fd..bc0c0fef6dc 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp @@ -94,6 +94,12 @@ static bool isInPrimaryKey(const SortDescription & description, const std::strin return false; } +static bool isInPartitionKey(const std::string & column_name, const Names & partition_key_columns) +{ + auto is_in_partition_key = std::find(partition_key_columns.begin(), partition_key_columns.end(), column_name); + return is_in_partition_key != partition_key_columns.end(); +} + /// Returns true if merge result is not empty static bool mergeMap(const SummingSortedAlgorithm::MapDescription & desc, Row & row, const ColumnRawPtrs & raw_columns, size_t row_number) @@ -181,7 +187,8 @@ static bool mergeMap(const SummingSortedAlgorithm::MapDescription & desc, static SummingSortedAlgorithm::ColumnsDefinition defineColumns( const Block & header, const SortDescription & description, - const Names & column_names_to_sum) + const Names & column_names_to_sum, + const Names & partition_key_columns) { size_t num_columns = header.columns(); SummingSortedAlgorithm::ColumnsDefinition def; @@ -223,8 +230,8 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns( continue; } - /// Are they inside the PK? - if (isInPrimaryKey(description, column.name, i)) + /// Are they inside the primary key or partition key? 
+ if (isInPrimaryKey(description, column.name, i) || isInPartitionKey(column.name, partition_key_columns)) { def.column_numbers_not_to_aggregate.push_back(i); continue; @@ -617,9 +624,10 @@ SummingSortedAlgorithm::SummingSortedAlgorithm( const Block & header, size_t num_inputs, SortDescription description_, const Names & column_names_to_sum, + const Names & partition_key_columns, size_t max_block_size) : IMergingAlgorithmWithDelayedChunk(num_inputs, std::move(description_)) - , columns_definition(defineColumns(header, description, column_names_to_sum)) + , columns_definition(defineColumns(header, description, column_names_to_sum, partition_key_columns)) , merged_data(getMergedDataColumns(header, columns_definition), max_block_size, columns_definition) { } diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h index a188a5fb538..2a455ad4cea 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h @@ -20,6 +20,8 @@ public: SortDescription description_, /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. const Names & column_names_to_sum, + /// List of partition key columns. They have to be excluded. + const Names & partition_key_columns, size_t max_block_size); void initialize(Inputs inputs) override; diff --git a/src/Processors/Merges/SummingSortedTransform.h b/src/Processors/Merges/SummingSortedTransform.h index 6fc22681132..22361bb1a44 100644 --- a/src/Processors/Merges/SummingSortedTransform.h +++ b/src/Processors/Merges/SummingSortedTransform.h @@ -16,6 +16,7 @@ public: SortDescription description_, /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. 
const Names & column_names_to_sum, + const Names & partition_key_columns, size_t max_block_size) : IMergingTransform( num_inputs, header, header, true, @@ -23,6 +24,7 @@ public: num_inputs, std::move(description_), column_names_to_sum, + partition_key_columns, max_block_size) { } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 1adb245d9e1..79b3a9aff35 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -169,7 +169,7 @@ MergeTreeData::MergeTreeData( const auto settings = getSettings(); /// NOTE: using the same columns list as is read when performing actual merges. - merging_params.check(metadata_.getColumns().getAllPhysical()); + merging_params.check(metadata_); if (metadata_.sampling_key.definition_ast != nullptr) { @@ -521,8 +521,10 @@ void MergeTreeData::checkStoragePolicy(const StoragePolicyPtr & new_storage_poli } -void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const +void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadata) const { + const auto columns = metadata.getColumns().getAllPhysical(); + if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing) throw Exception("Sign column for MergeTree cannot be specified in modes except Collapsing or VersionedCollapsing.", ErrorCodes::LOGICAL_ERROR); @@ -607,6 +609,21 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons throw Exception( "Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); } + + /// Check that summing columns are not in partition key. 
+ if (metadata.isPartitionKeyDefined()) + { + auto partition_key_columns = metadata.getPartitionKey().column_names; + /// Neither list is known to be sorted, so std::set_intersection cannot be used; search linearly. + Names names_intersection; + for (const auto & column_to_sum : columns_to_sum) + if (std::find(partition_key_columns.begin(), partition_key_columns.end(), column_to_sum) != partition_key_columns.end()) + names_intersection.push_back(column_to_sum); + + if (!names_intersection.empty()) + throw Exception("Columns: " + Nested::createCommaSeparatedStringFrom(names_intersection) + + " listed both in columns to sum and in partition key. That is not allowed.", ErrorCodes::BAD_ARGUMENTS); + } } if (mode == MergingParams::Replacing) @@ -638,7 +655,6 @@ String MergeTreeData::MergingParams::getModeName() const __builtin_unreachable(); } - Int64 MergeTreeData::getMaxBlockNumber() const { auto lock = lockParts(); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 8fcb879b3ff..0038d2306a4 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -301,7 +301,7 @@ public: Graphite::Params graphite_params; /// Check that needed columns are present and have correct types. 
- void check(const NamesAndTypesList & columns) const; + void check(const StorageInMemoryMetadata & metadata) const; String getModeName() const; }; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 00830dd78c2..52aa92d9f48 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -739,6 +739,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor size_t sort_columns_size = sort_columns.size(); sort_description.reserve(sort_columns_size); + Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; + Block header = pipes.at(0).getHeader(); for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); @@ -767,7 +769,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor case MergeTreeData::MergingParams::Summing: merged_transform = std::make_unique( - header, pipes.size(), sort_description, data.merging_params.columns_to_sum, merge_block_size); + header, pipes.size(), sort_description, data.merging_params.columns_to_sum, partition_key_columns, merge_block_size); break; case MergeTreeData::MergingParams::Aggregating: diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 69e819a3cf5..4eee1f1fccb 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1160,6 +1160,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( size_t sort_columns_size = sort_columns.size(); sort_description.reserve(sort_columns_size); + Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; + Block header = pipes.at(0).getHeader(); for (size_t i = 0; i < sort_columns_size; ++i) 
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); @@ -1180,7 +1182,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( case MergeTreeData::MergingParams::Summing: return std::make_shared(header, pipes.size(), - sort_description, data.merging_params.columns_to_sum, max_block_size); + sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size); case MergeTreeData::MergingParams::Aggregating: return std::make_shared(header, pipes.size(), diff --git a/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.reference b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.reference new file mode 100644 index 00000000000..3732e9403ce --- /dev/null +++ b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.reference @@ -0,0 +1,20 @@ +--- +1000000 +--- +17 +--- +0 17 +--- +0 17 +1 17 +2 17 +3 17 +4 17 +5 17 +6 17 +7 17 +8 17 +9 17 +10 17 +11 17 +12 17 diff --git a/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql new file mode 100644 index 00000000000..60c988a2e2f --- /dev/null +++ b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS tt_01373; + +CREATE TABLE tt_01373 +(a Int64, d Int64, val Int64) +ENGINE = SummingMergeTree PARTITION BY (a) ORDER BY (d); + +SYSTEM STOP MERGES; + +INSERT INTO tt_01373 SELECT number%13, number%17, 1 from numbers(1000000); + +SELECT '---'; +SELECT count(*) FROM tt_01373; + +SELECT '---'; +SELECT count(*) FROM tt_01373 FINAL; + +SELECT '---'; +SELECT a, count() FROM tt_01373 FINAL GROUP BY a ORDER BY a; + +SYSTEM START MERGES; + +OPTIMIZE TABLE tt_01373 FINAL; +SELECT '---'; +SELECT a, count() FROM tt_01373 GROUP BY a ORDER BY a; + +DROP TABLE IF EXISTS tt_01373; diff --git 
a/tests/queries/0_stateless/01373_summing_merge_tree_explicit_columns_definition.reference b/tests/queries/0_stateless/01373_summing_merge_tree_explicit_columns_definition.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01373_summing_merge_tree_explicit_columns_definition.sql b/tests/queries/0_stateless/01373_summing_merge_tree_explicit_columns_definition.sql new file mode 100644 index 00000000000..cc456b3a257 --- /dev/null +++ b/tests/queries/0_stateless/01373_summing_merge_tree_explicit_columns_definition.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS tt_error_1373; + +CREATE TABLE tt_error_1373 +( a Int64, d Int64, val Int64 ) +ENGINE = SummingMergeTree((a, val)) PARTITION BY (a) ORDER BY (d); -- { serverError 36 } + +DROP TABLE IF EXISTS tt_error_1373; \ No newline at end of file