Merge pull request #12173 from nikitamikhaylov/summing-bug-fix

Exclude partition key columns from SummingMergeTree
This commit is contained in:
alexey-milovidov 2020-07-08 01:46:12 +03:00 committed by GitHub
commit ffdd6e1af6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 110 additions and 24 deletions

View File

@ -169,17 +169,6 @@ ASTPtr extractOrderBy(const ASTPtr & storage_ast)
}
String createCommaSeparatedStringFrom(const Names & names)
{
std::ostringstream ss;
if (!names.empty())
{
std::copy(names.begin(), std::prev(names.end()), std::ostream_iterator<std::string>(ss, ", "));
ss << names.back();
}
return ss.str();
}
Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
{
const auto sorting_key_ast = extractOrderBy(storage_ast);

View File

@ -40,6 +40,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/NestedUtils.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
@ -197,8 +198,6 @@ ASTPtr extractPrimaryKey(const ASTPtr & storage_ast);
ASTPtr extractOrderBy(const ASTPtr & storage_ast);
String createCommaSeparatedStringFrom(const Names & names);
Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast);
String extractReplicatedTableZookeeperPath(const ASTPtr & storage_ast);

View File

@ -268,7 +268,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
primary_key_comma_separated = createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast));
primary_key_comma_separated = Nested::createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast));
engine_push_zk_path = extractReplicatedTableZookeeperPath(engine_push_ast);
}

View File

@ -70,6 +70,17 @@ std::pair<std::string, std::string> splitName(const std::string & name)
return {{ begin, first_end }, { second_begin, end }};
}
std::string createCommaSeparatedStringFrom(const Names & names)
{
std::ostringstream ss;
if (!names.empty())
{
std::copy(names.begin(), std::prev(names.end()), std::ostream_iterator<std::string>(ss, ", "));
ss << names.back();
}
return ss.str();
}
std::string extractTableName(const std::string & nested_name)
{

View File

@ -13,6 +13,8 @@ namespace Nested
std::pair<std::string, std::string> splitName(const std::string & name);
std::string createCommaSeparatedStringFrom(const Names & names);
/// Returns the prefix of the name to the first '.'. Or the name is unchanged if there is no dot.
std::string extractTableName(const std::string & nested_name);

View File

@ -94,6 +94,12 @@ static bool isInPrimaryKey(const SortDescription & description, const std::strin
return false;
}
static bool isInPartitionKey(const std::string & column_name, const Names & partition_key_columns)
{
auto is_in_partition_key = std::find(partition_key_columns.begin(), partition_key_columns.end(), column_name);
return is_in_partition_key != partition_key_columns.end();
}
/// Returns true if merge result is not empty
static bool mergeMap(const SummingSortedAlgorithm::MapDescription & desc,
Row & row, const ColumnRawPtrs & raw_columns, size_t row_number)
@ -181,7 +187,8 @@ static bool mergeMap(const SummingSortedAlgorithm::MapDescription & desc,
static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
const Block & header,
const SortDescription & description,
const Names & column_names_to_sum)
const Names & column_names_to_sum,
const Names & partition_key_columns)
{
size_t num_columns = header.columns();
SummingSortedAlgorithm::ColumnsDefinition def;
@ -223,8 +230,8 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
continue;
}
/// Are they inside the PK?
if (isInPrimaryKey(description, column.name, i))
/// Are they inside the primary key or partiton key?
if (isInPrimaryKey(description, column.name, i) || isInPartitionKey(column.name, partition_key_columns))
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
@ -617,9 +624,10 @@ SummingSortedAlgorithm::SummingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_,
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
: IMergingAlgorithmWithDelayedChunk(num_inputs, std::move(description_))
, columns_definition(defineColumns(header, description, column_names_to_sum))
, columns_definition(defineColumns(header, description, column_names_to_sum, partition_key_columns))
, merged_data(getMergedDataColumns(header, columns_definition), max_block_size, columns_definition)
{
}

View File

@ -20,6 +20,8 @@ public:
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
/// List of partition key columns. They have to be excluded.
const Names & partition_key_columns,
size_t max_block_size);
void initialize(Inputs inputs) override;

View File

@ -16,6 +16,7 @@ public:
SortDescription description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
@ -23,6 +24,7 @@ public:
num_inputs,
std::move(description_),
column_names_to_sum,
partition_key_columns,
max_block_size)
{
}

View File

@ -169,7 +169,7 @@ MergeTreeData::MergeTreeData(
const auto settings = getSettings();
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(metadata_.getColumns().getAllPhysical());
merging_params.check(metadata_);
if (metadata_.sampling_key.definition_ast != nullptr)
{
@ -521,8 +521,10 @@ void MergeTreeData::checkStoragePolicy(const StoragePolicyPtr & new_storage_poli
}
void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadata) const
{
const auto columns = metadata.getColumns().getAllPhysical();
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
throw Exception("Sign column for MergeTree cannot be specified in modes except Collapsing or VersionedCollapsing.",
ErrorCodes::LOGICAL_ERROR);
@ -607,6 +609,21 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
throw Exception(
"Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
/// Check that summing columns are not in partition key.
if (metadata.isPartitionKeyDefined())
{
auto partition_key_columns = metadata.getPartitionKey().column_names;
Names names_intersection;
std::set_intersection(columns_to_sum.begin(), columns_to_sum.end(),
partition_key_columns.begin(), partition_key_columns.end(),
std::back_inserter(names_intersection));
if (!names_intersection.empty())
throw Exception("Colums: " + Nested::createCommaSeparatedStringFrom(names_intersection) +
" listed both in colums to sum and in partition key. That is not allowed.", ErrorCodes::BAD_ARGUMENTS);
}
}
if (mode == MergingParams::Replacing)
@ -638,7 +655,6 @@ String MergeTreeData::MergingParams::getModeName() const
__builtin_unreachable();
}
Int64 MergeTreeData::getMaxBlockNumber() const
{
auto lock = lockParts();

View File

@ -301,7 +301,7 @@ public:
Graphite::Params graphite_params;
/// Check that needed columns are present and have correct types.
void check(const NamesAndTypesList & columns) const;
void check(const StorageInMemoryMetadata & metadata) const;
String getModeName() const;
};

View File

@ -739,6 +739,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
@ -767,7 +769,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_unique<SummingSortedTransform>(
header, pipes.size(), sort_description, data.merging_params.columns_to_sum, merge_block_size);
header, pipes.size(), sort_description, data.merging_params.columns_to_sum, partition_key_columns, merge_block_size);
break;
case MergeTreeData::MergingParams::Aggregating:

View File

@ -1160,6 +1160,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
@ -1180,7 +1182,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.columns_to_sum, max_block_size);
sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipes.size(),

View File

@ -0,0 +1,20 @@
---
1000000
---
17
---
0 17
---
0 17
1 17
2 17
3 17
4 17
5 17
6 17
7 17
8 17
9 17
10 17
11 17
12 17

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS tt_01373;
CREATE TABLE tt_01373
(a Int64, d Int64, val Int64)
ENGINE = SummingMergeTree PARTITION BY (a) ORDER BY (d);
SYSTEM STOP MERGES;
INSERT INTO tt_01373 SELECT number%13, number%17, 1 from numbers(1000000);
SELECT '---';
SELECT count(*) FROM tt_01373;
SELECT '---';
SELECT count(*) FROM tt_01373 FINAL;
SELECT '---';
SELECT a, count() FROM tt_01373 FINAL GROUP BY a ORDER BY a;
SYSTEM START MERGES;
OPTIMIZE TABLE tt_01373 FINAL;
SELECT '---';
SELECT a, count() FROM tt_01373 GROUP BY a ORDER BY a;
DROP TABLE IF EXISTS tt_01373;

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS tt_error_1373;
CREATE TABLE tt_error_1373
( a Int64, d Int64, val Int64 )
ENGINE = SummingMergeTree((a, val)) PARTITION BY (a) ORDER BY (d); -- { serverError 36 }
DROP TABLE IF EXISTS tt_error_1373;