diff --git a/src/Core/MultiEnum.h b/src/Core/MultiEnum.h index a7e3393c9c3..40cf166ccf7 100644 --- a/src/Core/MultiEnum.h +++ b/src/Core/MultiEnum.h @@ -13,12 +13,12 @@ struct MultiEnum MultiEnum() = default; template ...>>> - explicit MultiEnum(EnumValues ... v) + constexpr explicit MultiEnum(EnumValues ... v) : MultiEnum((toBitFlag(v) | ... | 0u)) {} template >> - explicit MultiEnum(ValueType v) + constexpr explicit MultiEnum(ValueType v) : bitset(v) { static_assert(std::is_unsigned_v); @@ -95,5 +95,5 @@ struct MultiEnum private: StorageType bitset = 0; - static StorageType toBitFlag(EnumType v) { return StorageType{1} << static_cast(v); } + static constexpr StorageType toBitFlag(EnumType v) { return StorageType{1} << static_cast(v); } }; diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 39381bf0241..2db950d6f90 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -32,6 +32,7 @@ #include #include #include +#include namespace { @@ -50,7 +51,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } - InterpreterInsertQuery::InterpreterInsertQuery( const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_, bool no_squash_, bool no_destination_) : query_ptr(query_ptr_) @@ -93,27 +93,7 @@ Block InterpreterInsertQuery::getSampleBlock( Block table_sample = metadata_snapshot->getSampleBlock(); - /// Process column transformers (e.g. * EXCEPT(a)), asterisks and qualified columns. - const auto & columns = metadata_snapshot->getColumns(); - auto names_and_types = columns.getOrdinary(); - removeDuplicateColumns(names_and_types); - auto table_expr = std::make_shared(); - table_expr->database_and_table_name = createTableIdentifier(table->getStorageID()); - table_expr->children.push_back(table_expr->database_and_table_name); - TablesWithColumns tables_with_columns; - tables_with_columns.emplace_back(DatabaseAndTableWithAlias(*table_expr, context.getCurrentDatabase()), names_and_types); - - tables_with_columns[0].addHiddenColumns(columns.getMaterialized()); - tables_with_columns[0].addHiddenColumns(columns.getAliases()); - tables_with_columns[0].addHiddenColumns(table->getVirtuals()); - - NameSet source_columns_set; - for (const auto & identifier : query.columns->children) - source_columns_set.insert(identifier->getColumnName()); - TranslateQualifiedNamesVisitor::Data visitor_data(source_columns_set, tables_with_columns); - TranslateQualifiedNamesVisitor visitor(visitor_data); - auto columns_ast = query.columns->clone(); - visitor.visit(columns_ast); + const auto columns_ast = processColumnTransformers(context.getCurrentDatabase(), table, metadata_snapshot, query.columns); /// Form the block based on the column names from the query Block res; diff --git a/src/Interpreters/InterpreterOptimizeQuery.cpp b/src/Interpreters/InterpreterOptimizeQuery.cpp index 680dd9b803b..714c2afa4ff 100644 --- a/src/Interpreters/InterpreterOptimizeQuery.cpp +++ b/src/Interpreters/InterpreterOptimizeQuery.cpp @@ -5,13 +5,18 @@ #include #include #include +#include +#include + +#include namespace DB { namespace ErrorCodes { + extern const int THERE_IS_NO_COLUMN; } @@ -27,7 +32,30 @@ BlockIO InterpreterOptimizeQuery::execute() auto table_id = context.resolveStorageID(ast, Context::ResolveOrdinary); StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context); auto metadata_snapshot = table->getInMemoryMetadataPtr(); - table->optimize(query_ptr, metadata_snapshot, ast.partition, 
ast.final, ast.deduplicate, context);
+
+    // An empty list of names means we deduplicate by all columns, but the user can explicitly state which columns to use.
+    Names column_names;
+    if (ast.deduplicate_by_columns)
+    {
+        // The user requested a custom set of columns for deduplication, possibly with a column transformer expression.
+        {
+            const auto cols = processColumnTransformers(context.getCurrentDatabase(), table, metadata_snapshot, ast.deduplicate_by_columns);
+            for (const auto & col : cols->children)
+                column_names.emplace_back(col->getColumnName());
+        }
+
+        metadata_snapshot->check(column_names, NamesAndTypesList{}, table_id);
+        // Check that the DEDUPLICATE BY list contains all primary key columns.
+        for (const auto & primary_key : metadata_snapshot->getPrimaryKeyColumns())
+        {
+            if (std::find(column_names.begin(), column_names.end(), primary_key) == column_names.end())
+                throw Exception("Deduplicate expression doesn't contain primary key column: " + primary_key,
+                    ErrorCodes::THERE_IS_NO_COLUMN);
+        }
+    }
+
+    table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, context);
+
     return {};
 }
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 43c5102fa32..ce2917b96f0 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -80,6 +80,8 @@
 #include
 #include
+#include
+#include

 namespace DB
 {
diff --git a/src/Interpreters/processColumnTransformers.cpp b/src/Interpreters/processColumnTransformers.cpp
new file mode 100644
index 00000000000..afd99cb6f07
--- /dev/null
+++ b/src/Interpreters/processColumnTransformers.cpp
@@ -0,0 +1,49 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+ASTPtr processColumnTransformers(
+    const String & current_database,
+    const StoragePtr & table,
+    const StorageMetadataPtr & metadata_snapshot,
+    ASTPtr query_columns)
+{
+    const auto & columns = metadata_snapshot->getColumns();
+    auto names_and_types = columns.getOrdinary();
+    removeDuplicateColumns(names_and_types);
+
+    TablesWithColumns tables_with_columns;
+    {
+        auto table_expr = std::make_shared();
+        table_expr->database_and_table_name = createTableIdentifier(table->getStorageID());
+        table_expr->children.push_back(table_expr->database_and_table_name);
+        tables_with_columns.emplace_back(DatabaseAndTableWithAlias(*table_expr, current_database), names_and_types);
+    }
+
+    tables_with_columns[0].addHiddenColumns(columns.getMaterialized());
+    tables_with_columns[0].addHiddenColumns(columns.getAliases());
+    tables_with_columns[0].addHiddenColumns(table->getVirtuals());
+
+    NameSet source_columns_set;
+    for (const auto & identifier : query_columns->children)
+        source_columns_set.insert(identifier->getColumnName());
+
+    TranslateQualifiedNamesVisitor::Data visitor_data(source_columns_set, tables_with_columns);
+    TranslateQualifiedNamesVisitor visitor(visitor_data);
+    auto columns_ast = query_columns->clone();
+    visitor.visit(columns_ast);
+
+    return columns_ast;
+}
+
+}
diff --git a/src/Interpreters/processColumnTransformers.h b/src/Interpreters/processColumnTransformers.h
new file mode 100644
index 00000000000..8c15cf60a02
--- /dev/null
+++ b/src/Interpreters/processColumnTransformers.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include
+#include
+
+namespace DB
+{
+
+struct StorageInMemoryMetadata;
+using StorageMetadataPtr = std::shared_ptr;
+
+/// Process column transformers (e.g.
* EXCEPT(a)), asterisks and qualified columns. +ASTPtr processColumnTransformers( + const String & current_database, + const StoragePtr & table, + const StorageMetadataPtr & metadata_snapshot, + ASTPtr query_columns); + +} diff --git a/src/Interpreters/ya.make b/src/Interpreters/ya.make index 7d72e1949b7..84bfbcecf9f 100644 --- a/src/Interpreters/ya.make +++ b/src/Interpreters/ya.make @@ -155,6 +155,7 @@ SRCS( interpretSubquery.cpp join_common.cpp loadMetadata.cpp + processColumnTransformers.cpp sortBlock.cpp ) diff --git a/src/Parsers/ASTOptimizeQuery.cpp b/src/Parsers/ASTOptimizeQuery.cpp index ae83952899d..6423e247ecc 100644 --- a/src/Parsers/ASTOptimizeQuery.cpp +++ b/src/Parsers/ASTOptimizeQuery.cpp @@ -23,6 +23,12 @@ void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatSt if (deduplicate) settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : ""); + + if (deduplicate_by_columns) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " BY " << (settings.hilite ? hilite_none : ""); + deduplicate_by_columns->formatImpl(settings, state, frame); + } } } diff --git a/src/Parsers/ASTOptimizeQuery.h b/src/Parsers/ASTOptimizeQuery.h index c93fea2b6d3..5372b37fd90 100644 --- a/src/Parsers/ASTOptimizeQuery.h +++ b/src/Parsers/ASTOptimizeQuery.h @@ -16,9 +16,11 @@ public: /// The partition to optimize can be specified. ASTPtr partition; /// A flag can be specified - perform optimization "to the end" instead of one step. - bool final; + bool final = false; /// Do deduplicate (default: false) bool deduplicate; + /// Deduplicate by columns. + ASTPtr deduplicate_by_columns; /** Get the text that identifies this element. */ String getID(char delim) const override @@ -37,6 +39,12 @@ public: res->children.push_back(res->partition); } + if (deduplicate_by_columns) + { + res->deduplicate_by_columns = deduplicate_by_columns->clone(); + res->children.push_back(res->deduplicate_by_columns); + } + return res; } diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 6fbe9abc96e..6bb131cca9c 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1273,7 +1273,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e ParserKeyword as("AS"); ParserKeyword strict("STRICT"); - if (apply.ignore(pos, expected)) + if (allowed_transformers.isSet(ColumnTransformer::APPLY) && apply.ignore(pos, expected)) { bool with_open_round_bracket = false; @@ -1326,7 +1326,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e node = std::move(res); return true; } - else if (except.ignore(pos, expected)) + else if (allowed_transformers.isSet(ColumnTransformer::EXCEPT) && except.ignore(pos, expected)) { if (strict.ignore(pos, expected)) is_strict = true; @@ -1366,7 +1366,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e node = std::move(res); return true; } - else if (replace.ignore(pos, expected)) + else if (allowed_transformers.isSet(ColumnTransformer::REPLACE) && replace.ignore(pos, expected)) { if (strict.ignore(pos, expected)) is_strict = true; diff --git a/src/Parsers/ExpressionElementParsers.h b/src/Parsers/ExpressionElementParsers.h index 4713a7ea8a6..0bc2fbe745f 100644 --- a/src/Parsers/ExpressionElementParsers.h +++ b/src/Parsers/ExpressionElementParsers.h @@ -70,42 +70,81 @@ protected: bool allow_query_parameter; }; +/** *, t.*, db.table.*, 
COLUMNS('') APPLY(...) or EXCEPT(...) or REPLACE(...) + */ +class ParserColumnsTransformers : public IParserBase +{ +public: + enum class ColumnTransformer : UInt8 + { + APPLY, + EXCEPT, + REPLACE, + }; + using ColumnTransformers = MultiEnum; + static constexpr auto AllTransformers = ColumnTransformers{ColumnTransformer::APPLY, ColumnTransformer::EXCEPT, ColumnTransformer::REPLACE}; + + ParserColumnsTransformers(ColumnTransformers allowed_transformers_ = AllTransformers, bool is_strict_ = false) + : allowed_transformers(allowed_transformers_) + , is_strict(is_strict_) + {} + +protected: + const char * getName() const override { return "COLUMNS transformers"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; + ColumnTransformers allowed_transformers; + bool is_strict; +}; + + /// Just * class ParserAsterisk : public IParserBase { +public: + using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers; + ParserAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) + : allowed_transformers(allowed_transformers_) + {} + protected: const char * getName() const override { return "asterisk"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; + + ColumnTransformers allowed_transformers; }; /** Something like t.* or db.table.* */ class ParserQualifiedAsterisk : public IParserBase { +public: + using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers; + ParserQualifiedAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) + : allowed_transformers(allowed_transformers_) + {} + protected: const char * getName() const override { return "qualified asterisk"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; + + ColumnTransformers allowed_transformers; }; /** COLUMNS('') */ class ParserColumnsMatcher : public IParserBase { +public: + using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers; + ParserColumnsMatcher(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) + : allowed_transformers(allowed_transformers_) + {} + protected: const char * getName() const override { return "COLUMNS matcher"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; -}; -/** *, t.*, db.table.*, COLUMNS('') APPLY(...) or EXCEPT(...) or REPLACE(...) - */ -class ParserColumnsTransformers : public IParserBase -{ -public: - ParserColumnsTransformers(bool is_strict_ = false): is_strict(is_strict_) {} -protected: - const char * getName() const override { return "COLUMNS transformers"; } - bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; - bool is_strict; + ColumnTransformers allowed_transformers; }; /** A function, for example, f(x, y + 1, g(z)). diff --git a/src/Parsers/IParserBase.cpp b/src/Parsers/IParserBase.cpp index 0241250926d..7e5a0edbdcf 100644 --- a/src/Parsers/IParserBase.cpp +++ b/src/Parsers/IParserBase.cpp @@ -1,5 +1,8 @@ #include +#include +#include +#include namespace DB { @@ -10,9 +13,20 @@ bool IParserBase::parse(Pos & pos, ASTPtr & node, Expected & expected) return wrapParseImpl(pos, IncreaseDepthTag{}, [&] { +// std::cerr << pos.depth << " 0x" << static_cast(this) << " " << getName() << " parsing \"" << pos.get().begin << "\" ... " << std::endl; bool res = parseImpl(pos, node, expected); +// std::cerr << pos.depth << " 0x" << static_cast(this) << " " << getName() << " " << (res ? 
"OK" : "FAIL") << std::endl; if (!res) node = nullptr; +// else if (node) +// { +// std::cerr << pos.depth << " 0x" << static_cast(this) << "\t" << std::ends; +// { +// WriteBufferFromOStream out(std::cerr, 4096); +// formatAST(*node, out); +// } +// std::cerr << std::endl; +// } return res; }); } diff --git a/src/Parsers/ParserOptimizeQuery.cpp b/src/Parsers/ParserOptimizeQuery.cpp index da63c89c374..a16030cd56b 100644 --- a/src/Parsers/ParserOptimizeQuery.cpp +++ b/src/Parsers/ParserOptimizeQuery.cpp @@ -4,11 +4,24 @@ #include #include +#include namespace DB { +bool ParserOptimizeQueryColumnsSpecification::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + // Do not allow APPLY and REPLACE transformers. + // Since we use Columns Transformers only to get list of columns, + // ad we can't actuall modify content of the columns for deduplication. + const auto allowed_transformers = ParserColumnsTransformers::ColumnTransformers{ParserColumnsTransformers::ColumnTransformer::EXCEPT}; + + return ParserColumnsMatcher(allowed_transformers).parse(pos, node, expected) + || ParserAsterisk(allowed_transformers).parse(pos, node, expected) + || ParserIdentifier(false).parse(pos, node, expected); +} + bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { @@ -16,6 +29,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte ParserKeyword s_partition("PARTITION"); ParserKeyword s_final("FINAL"); ParserKeyword s_deduplicate("DEDUPLICATE"); + ParserKeyword s_by("BY"); ParserToken s_dot(TokenType::Dot); ParserIdentifier name_p; ParserPartition partition_p; @@ -55,6 +69,14 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte if (s_deduplicate.ignore(pos, expected)) deduplicate = true; + ASTPtr deduplicate_by_columns; + if (deduplicate && s_by.ignore(pos, expected)) + { + if (!ParserList(std::make_unique(), std::make_unique(TokenType::Comma)) + .parse(pos, deduplicate_by_columns, expected)) + return false; + } + auto query = std::make_shared(); node = query; @@ -66,6 +88,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte query->children.push_back(partition); query->final = final; query->deduplicate = deduplicate; + query->deduplicate_by_columns = deduplicate_by_columns; return true; } diff --git a/src/Parsers/ParserOptimizeQuery.h b/src/Parsers/ParserOptimizeQuery.h index 860b0b2927a..c8294d9ff6a 100644 --- a/src/Parsers/ParserOptimizeQuery.h +++ b/src/Parsers/ParserOptimizeQuery.h @@ -7,6 +7,13 @@ namespace DB { +class ParserOptimizeQueryColumnsSpecification : public IParserBase +{ +protected: + const char * getName() const override { return "column specification for OPTIMIZE ... 
DEDUPLICATE BY"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + /** Query OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL] [DEDUPLICATE] */ class ParserOptimizeQuery : public IParserBase diff --git a/src/Parsers/tests/gtest_Parser.cpp b/src/Parsers/tests/gtest_Parser.cpp new file mode 100644 index 00000000000..9a5a99f8f5a --- /dev/null +++ b/src/Parsers/tests/gtest_Parser.cpp @@ -0,0 +1,134 @@ +#include + +#include +#include +#include +#include + +#include + +#include + +namespace +{ +using namespace DB; +using namespace std::literals; +} + +struct ParserTestCase +{ + std::shared_ptr parser; + const std::string_view input_text; + const char * expected_ast = nullptr; +}; + +std::ostream & operator<<(std::ostream & ostr, const ParserTestCase & test_case) +{ + return ostr << "parser: " << test_case.parser->getName() << ", input: " << test_case.input_text; +} + +class ParserTest : public ::testing::TestWithParam +{}; + +TEST_P(ParserTest, parseQuery) +{ + const auto & [parser, input_text, expected_ast] = GetParam(); + + ASSERT_NE(nullptr, parser); + + if (expected_ast) + { + ASTPtr ast; + ASSERT_NO_THROW(ast = parseQuery(*parser, input_text.begin(), input_text.end(), 0, 0)); + EXPECT_EQ(expected_ast, serializeAST(*ast->clone(), false)); + } + else + { + ASSERT_THROW(parseQuery(*parser, input_text.begin(), input_text.end(), 0, 0), DB::Exception); + } +} + + +INSTANTIATE_TEST_SUITE_P(ParserOptimizeQuery, ParserTest, ::testing::Values( + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('a, b')", + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('a, b')" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]')", + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]')" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT b", + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT b" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT (a, b)", + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT (a, b)" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY a, b, c", + "OPTIMIZE TABLE table_name DEDUPLICATE BY a, b, c" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY *", + "OPTIMIZE TABLE table_name DEDUPLICATE BY *" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT a", + "OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT a" + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT (a, b)", + "OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT (a, b)" + } +)); + +INSTANTIATE_TEST_SUITE_P(ParserOptimizeQuery_FAIL, ParserTest, ::testing::Values( + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY", + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') APPLY(x)", + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') REPLACE(y)", + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY * APPLY(x)", + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name DEDUPLICATE BY * REPLACE(y)", + }, + ParserTestCase + { + std::make_shared(), + "OPTIMIZE TABLE table_name 
DEDUPLICATE BY db.a, db.b, db.c", + } +)); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 61ce207a09d..89241f461f6 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -380,6 +380,7 @@ public: const ASTPtr & /*partition*/, bool /*final*/, bool /*deduplicate*/, + const Names & /* deduplicate_by_columns */, const Context & /*context*/) { throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 1065b992396..ca0ee9cea27 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -642,7 +642,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor time_t time_of_merge, const Context & context, const ReservationPtr & space_reservation, - bool deduplicate) + bool deduplicate, + const Names & deduplicate_by_columns) { static const String TMP_PREFIX = "tmp_merge_"; @@ -881,7 +882,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor BlockInputStreamPtr merged_stream = std::make_shared(std::move(pipeline)); if (deduplicate) - merged_stream = std::make_shared(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, Names()); + merged_stream = std::make_shared(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns); if (need_remove_expired_values) merged_stream = std::make_shared(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index bb74d8b091f..2f3a898ba84 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -127,7 +127,8 @@ public: time_t time_of_merge, const Context & context, const ReservationPtr & space_reservation, - bool deduplicate); + bool deduplicate, + const Names & deduplicate_by_columns); /// Mutate a single data part with the specified commands. Will create and return a temporary part. MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart( diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 055a2970c82..20811a66f10 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -16,15 +17,29 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +enum FormatVersion : UInt8 +{ + FORMAT_WITH_CREATE_TIME = 2, + FORMAT_WITH_BLOCK_ID = 3, + FORMAT_WITH_DEDUPLICATE = 4, + FORMAT_WITH_UUID = 5, + FORMAT_WITH_DEDUPLICATE_BY_COLUMNS = 6, + + FORMAT_LAST +}; + void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const { - UInt8 format_version = 4; + UInt8 format_version = FORMAT_WITH_DEDUPLICATE; + + if (!deduplicate_by_columns.empty()) + format_version = std::max(format_version, FORMAT_WITH_DEDUPLICATE_BY_COLUMNS); /// Conditionally bump format_version only when uuid has been assigned. /// If some other feature requires bumping format_version to >= 5 then this code becomes no-op. 
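+    /// (For example: a plain merge entry is still written as version 4, DEDUPLICATE BY columns bumps it
+    /// to FORMAT_WITH_DEDUPLICATE_BY_COLUMNS (6), an assigned part UUID to at least FORMAT_WITH_UUID (5);
+    /// the maximum of all required versions wins.)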
if (new_part_uuid != UUIDHelpers::Nil) - format_version = std::max(format_version, static_cast(5)); + format_version = std::max(format_version, FORMAT_WITH_UUID); out << "format version: " << format_version << "\n" << "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n" @@ -50,6 +65,19 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const if (new_part_uuid != UUIDHelpers::Nil) out << "\ninto_uuid: " << new_part_uuid; + if (!deduplicate_by_columns.empty()) + { + const FormatSettings format_settings; + out << "\ndeduplicate_by_columns: ["; + for (size_t i = 0; i < deduplicate_by_columns.size(); ++i) + { + writeJSONString(deduplicate_by_columns[i], out, format_settings); + if (i != deduplicate_by_columns.size() - 1) + out << ","; + } + out << "]"; + } + break; case DROP_RANGE: @@ -129,10 +157,10 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> "format version: " >> format_version >> "\n"; - if (format_version < 1 || format_version > 5) + if (format_version < 1 || format_version >= FORMAT_LAST) throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION); - if (format_version >= 2) + if (format_version >= FORMAT_WITH_CREATE_TIME) { LocalDateTime create_time_dt; in >> "create_time: " >> create_time_dt >> "\n"; @@ -141,7 +169,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> "source replica: " >> source_replica >> "\n"; - if (format_version >= 3) + if (format_version >= FORMAT_WITH_BLOCK_ID) { in >> "block_id: " >> escape >> block_id >> "\n"; } @@ -167,7 +195,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) } in >> new_part_name; - if (format_version >= 4) + if (format_version >= FORMAT_WITH_DEDUPLICATE) { in >> "\ndeduplicate: " >> deduplicate; @@ -184,6 +212,21 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) } else if (checkString("into_uuid: ", in)) in >> new_part_uuid; + else if (checkString("deduplicate_by_columns: [", in)) + { + Strings new_deduplicate_by_columns; + for (;;) + { + String tmp_column_name; + readJSONString(tmp_column_name, in); + new_deduplicate_by_columns.emplace_back(std::move(tmp_column_name)); + if (!checkString(",", in)) + break; + } + assertString("]", in); + + deduplicate_by_columns = std::move(new_deduplicate_by_columns); + } else trailing_newline_found = true; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 4b384171dde..1fb9caeff4c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -81,6 +81,7 @@ struct ReplicatedMergeTreeLogEntryData Strings source_parts; bool deduplicate = false; /// Do deduplicate on merge + Strings deduplicate_by_columns = {}; // Which columns should be checked for duplicates, empty means 'all' (default). 
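+    // (E.g. {"id", "val"}; serialized into the log entry as deduplicate_by_columns: ["id","val"] starting with format version 6.)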
MergeType merge_type = MergeType::REGULAR; String column_name; String index_name; diff --git a/src/Storages/MergeTree/tests/gtest_ReplicatedMergeTreeLogEntry.cpp b/src/Storages/MergeTree/tests/gtest_ReplicatedMergeTreeLogEntry.cpp new file mode 100644 index 00000000000..a1a1f18d99e --- /dev/null +++ b/src/Storages/MergeTree/tests/gtest_ReplicatedMergeTreeLogEntry.cpp @@ -0,0 +1,311 @@ +#include + +#include + +#include + +#include + +#include + +namespace DB +{ +std::ostream & operator<<(std::ostream & ostr, const MergeTreeDataPartType & type) +{ + return ostr << type.toString(); +} + +std::ostream & operator<<(std::ostream & ostr, const UInt128 & v) +{ + return ostr << v.toHexString(); +} + +template +std::ostream & operator<<(std::ostream & ostr, const StrongTypedef & v) +{ + return ostr << v.toUnderType(); +} + +std::ostream & operator<<(std::ostream & ostr, const MergeType & v) +{ + return ostr << toString(v); +} + +} + +namespace std +{ + +std::ostream & operator<<(std::ostream & ostr, const std::exception_ptr & e) +{ + try + { + if (e) + { + std::rethrow_exception(e); + } + return ostr << ""; + } + catch (const std::exception& e) + { + return ostr << e.what(); + } +} + +template +inline std::ostream& operator<<(std::ostream & ostr, const std::vector & v) +{ + ostr << "["; + for (size_t i = 0; i < v.size(); ++i) + { + ostr << i; + if (i != v.size() - 1) + ostr << ", "; + } + return ostr << "] (" << v.size() << ") items"; +} + +} + +namespace +{ +using namespace DB; + +template +void compareAttributes(::testing::AssertionResult & result, const char * name, const T & expected_value, const T & actual_value); + +#define CMP_ATTRIBUTE(attribute) compareAttributes(result, #attribute, expected.attribute, actual.attribute) + +::testing::AssertionResult compare( + const ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry & expected, + const ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry & actual) +{ + auto result = ::testing::AssertionSuccess(); + + CMP_ATTRIBUTE(drop_range_part_name); + CMP_ATTRIBUTE(from_database); + CMP_ATTRIBUTE(from_table); + CMP_ATTRIBUTE(src_part_names); + CMP_ATTRIBUTE(new_part_names); + CMP_ATTRIBUTE(part_names_checksums); + CMP_ATTRIBUTE(columns_version); + + return result; +} + +template +bool compare(const T & expected, const T & actual) +{ + return expected == actual; +} + +template +::testing::AssertionResult compare(const std::shared_ptr & expected, const std::shared_ptr & actual) +{ + if (!!expected != !!actual) + return ::testing::AssertionFailure() + << "expected : " << static_cast(expected.get()) + << "\nactual : " << static_cast(actual.get()); + + if (expected && actual) + return compare(*expected, *actual); + + return ::testing::AssertionSuccess(); +} + +template +void compareAttributes(::testing::AssertionResult & result, const char * name, const T & expected_value, const T & actual_value) +{ + const auto cmp_result = compare(expected_value, actual_value); + if (cmp_result == false) + { + if (result) + result = ::testing::AssertionFailure(); + + result << "\nMismatching attribute: \"" << name << "\""; + if constexpr (std::is_same_v, ::testing::AssertionResult>) + result << "\n" << cmp_result.message(); + else + result << "\n\texpected: " << expected_value + << "\n\tactual : " << actual_value; + } +}; + +::testing::AssertionResult compare(const ReplicatedMergeTreeLogEntryData & expected, const ReplicatedMergeTreeLogEntryData & actual) +{ + ::testing::AssertionResult result = ::testing::AssertionSuccess(); + + CMP_ATTRIBUTE(znode_name); + 
CMP_ATTRIBUTE(type); + CMP_ATTRIBUTE(source_replica); + CMP_ATTRIBUTE(new_part_name); + CMP_ATTRIBUTE(new_part_type); + CMP_ATTRIBUTE(block_id); + CMP_ATTRIBUTE(actual_new_part_name); + CMP_ATTRIBUTE(new_part_uuid); + CMP_ATTRIBUTE(source_parts); + CMP_ATTRIBUTE(deduplicate); + CMP_ATTRIBUTE(deduplicate_by_columns); + CMP_ATTRIBUTE(merge_type); + CMP_ATTRIBUTE(column_name); + CMP_ATTRIBUTE(index_name); + CMP_ATTRIBUTE(detach); + CMP_ATTRIBUTE(replace_range_entry); + CMP_ATTRIBUTE(alter_version); + CMP_ATTRIBUTE(have_mutation); + CMP_ATTRIBUTE(columns_str); + CMP_ATTRIBUTE(metadata_str); + CMP_ATTRIBUTE(currently_executing); + CMP_ATTRIBUTE(removed_by_other_entry); + CMP_ATTRIBUTE(num_tries); + CMP_ATTRIBUTE(exception); + CMP_ATTRIBUTE(last_attempt_time); + CMP_ATTRIBUTE(num_postponed); + CMP_ATTRIBUTE(postpone_reason); + CMP_ATTRIBUTE(last_postpone_time); + CMP_ATTRIBUTE(create_time); + CMP_ATTRIBUTE(quorum); + + return result; +} +} + + +// TEST(ReplicatedMergeTreeLogEntryData, writeToText) +// { +// const ReplicatedMergeTreeLogEntryData expected +// { +// .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, +// .new_part_uuid = UUIDHelpers::generateV4(), +// .deduplicate_by_columns = {"foo", "bar", "quix"}, +// .alter_version = 123456, +// }; + +// ReplicatedMergeTreeLogEntryData actual; +// { +// const auto str = actual.toString(); +// DB::ReadBufferFromString buffer(str); +// EXPECT_NO_THROW(expected.readText(buffer)) << "While reading:\n" << str; +// } + +// ASSERT_TRUE(compare(expected, actual)); +// } + + +class ReplicatedMergeTreeLogEntryDataTest : public ::testing::TestWithParam +{}; + +TEST_P(ReplicatedMergeTreeLogEntryDataTest, transcode) +{ + const auto & expected = GetParam(); + const auto str = expected.toString(); + + ReplicatedMergeTreeLogEntryData actual; + // To simplify comparison, since it is rarely set. + actual.alter_version = expected.alter_version; + { + DB::ReadBufferFromString buffer(str); + EXPECT_NO_THROW(actual.readText(buffer)) << "While reading:\n" << str; + } + + ASSERT_TRUE(compare(expected, actual)) << "Via text:\n" << str; +} + +INSTANTIATE_TEST_SUITE_P(Merge, ReplicatedMergeTreeLogEntryDataTest, + ::testing::ValuesIn(std::initializer_list{ + { + // Basic: minimal set of attributes. 
+ .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, + .new_part_type = MergeTreeDataPartType::WIDE, + .alter_version = 0, + .create_time = 123, + }, + { + .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, + .new_part_type = MergeTreeDataPartType::WIDE, + // Format version 4 + .deduplicate = true, + + .alter_version = 0, + .create_time = 123, + }, + { + .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, + .new_part_type = MergeTreeDataPartType::WIDE, + + // Format version 5 + .new_part_uuid = UUID(UInt128(123456789, 10111213141516)), + + .alter_version = 0, + .create_time = 123, + }, + { + .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, + .new_part_type = MergeTreeDataPartType::WIDE, + + // Format version 6 + .deduplicate = true, + .deduplicate_by_columns = {"foo", "bar", "quix"}, + + .alter_version = 0, + .create_time = 123, + }, + { + .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, + .new_part_type = MergeTreeDataPartType::WIDE, + + // Mixing features + .new_part_uuid = UUID(UInt128(123456789, 10111213141516)), + .deduplicate = true, + .deduplicate_by_columns = {"foo", "bar", "quix"}, + + .alter_version = 0, + .create_time = 123, + }, +})); + + +// INSTANTIATE_TEST_SUITE_P(Full, ReplicatedMergeTreeLogEntryDataTest, +// ::testing::ValuesIn(std::initializer_list{ +// { +// .znode_name = "znode name", +// .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS, +// .source_replica = "source replica", +// .new_part_name = "new part name", +// .new_part_type = MergeTreeDataPartType::WIDE, +// .block_id = "block id", +// .actual_new_part_name = "new part name", +// .new_part_uuid = UUID(UInt128(123456789, 10111213141516)), +// .source_parts = {"part1", "part2"}, +// .deduplicate = true, +// .deduplicate_by_columns = {"col1", "col2"}, +// .merge_type = MergeType::REGULAR, +// .column_name = "column name", +// .index_name = "index name", +// .detach = false, +// .replace_range_entry = std::make_shared( +// ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry +// { +// .drop_range_part_name = "drop range part name", +// .from_database = "from database", +// .src_part_names = {"src part name1", "src part name2"}, +// .new_part_names = {"new part name1", "new part name2"}, +// .columns_version = 123456, +// }), +// .alter_version = 56789, +// .have_mutation = false, +// .columns_str = "columns str", +// .metadata_str = "metadata str", +// // Those attributes are not serialized to string, hence it makes no sense to set. +// // .currently_executing +// // .removed_by_other_entry +// // .num_tries +// // .exception +// // .last_attempt_time +// // .num_postponed +// // .postpone_reason +// // .last_postpone_time, +// .create_time = static_cast(123456789), +// .quorum = 321, +// }, +// })); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 549caf427ea..af595d10e28 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -583,7 +583,7 @@ void StorageBuffer::shutdown() try { - optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context); + optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, global_context); } catch (...) 
{ @@ -608,6 +608,7 @@ bool StorageBuffer::optimize( const ASTPtr & partition, bool final, bool deduplicate, + const Names & /* deduplicate_by_columns */, const Context & /*context*/) { if (partition) @@ -906,7 +907,7 @@ void StorageBuffer::alter(const AlterCommands & params, const Context & context, /// Flush all buffers to storages, so that no non-empty blocks of the old /// structure remain. Structure of empty blocks will be updated during first /// insert. - optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context); + optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, context); StorageInMemoryMetadata new_metadata = *metadata_snapshot; params.apply(new_metadata, context); diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index b8031a42a1d..38596531a1a 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -87,6 +87,7 @@ public: const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & context) override; bool supportsSampling() const override { return true; } diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 91d0aabd589..1a75a585c85 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -233,12 +233,13 @@ bool StorageMaterializedView::optimize( const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & context) { checkStatementCanBeForwarded(); auto storage_ptr = getTargetTable(); auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr(); - return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, context); + return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context); } void StorageMaterializedView::alter( diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h index fd7ecea5526..e1dd73e8580 100644 --- a/src/Storages/StorageMaterializedView.h +++ b/src/Storages/StorageMaterializedView.h @@ -46,6 +46,7 @@ public: const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & context) override; void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index eb28ccfa3d5..440e6d8162d 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -741,6 +741,7 @@ bool StorageMergeTree::merge( const String & partition_id, bool final, bool deduplicate, + const Names & deduplicate_by_columns, String * out_disable_reason, bool optimize_skip_merged_partitions) { @@ -758,10 +759,15 @@ bool StorageMergeTree::merge( if (!merge_mutate_entry) return false; - return mergeSelectedParts(metadata_snapshot, deduplicate, *merge_mutate_entry, table_lock_holder); + return mergeSelectedParts(metadata_snapshot, deduplicate, deduplicate_by_columns, *merge_mutate_entry, table_lock_holder); } -bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder) +bool StorageMergeTree::mergeSelectedParts( + const StorageMetadataPtr & metadata_snapshot, + bool deduplicate, + const Names & 
deduplicate_by_columns, + MergeMutateSelectedEntry & merge_mutate_entry, + TableLockHolder & table_lock_holder) { auto & future_part = merge_mutate_entry.future_part; Stopwatch stopwatch; @@ -786,7 +792,7 @@ bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_sn { new_part = merger_mutator.mergePartsToTemporaryPart( future_part, metadata_snapshot, *(merge_list_entry), table_lock_holder, time(nullptr), - global_context, merge_mutate_entry.tagger->reserved_space, deduplicate); + global_context, merge_mutate_entry.tagger->reserved_space, deduplicate, deduplicate_by_columns); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); write_part_log({}); @@ -953,7 +959,7 @@ std::optional StorageMergeTree::getDataProcessingJob() return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable { if (merge_entry) - mergeSelectedParts(metadata_snapshot, false, *merge_entry, share_lock); + mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock); else if (mutate_entry) mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock); }, PoolType::MERGE_MUTATE}; @@ -1036,6 +1042,7 @@ bool StorageMergeTree::optimize( const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & context) { String disable_reason; @@ -1049,7 +1056,7 @@ bool StorageMergeTree::optimize( for (const String & partition_id : partition_ids) { - if (!merge(true, partition_id, true, deduplicate, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions)) + if (!merge(true, partition_id, true, deduplicate, deduplicate_by_columns, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions)) { constexpr const char * message = "Cannot OPTIMIZE table: {}"; if (disable_reason.empty()) @@ -1068,7 +1075,7 @@ bool StorageMergeTree::optimize( if (partition) partition_id = getPartitionIDFromQuery(partition, context); - if (!merge(true, partition_id, final, deduplicate, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions)) + if (!merge(true, partition_id, final, deduplicate, deduplicate_by_columns, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions)) { constexpr const char * message = "Cannot OPTIMIZE table: {}"; if (disable_reason.empty()) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 5e0e8478ae6..5ee47832b1e 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -70,6 +70,7 @@ public: const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & context) override; void mutate(const MutationCommands & commands, const Context & context) override; @@ -132,7 +133,7 @@ private: * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). * Returns true if merge is finished successfully. 
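      * If deduplicate is true, deduplicate_by_columns narrows the duplicate check to the listed columns; an empty list means all columns.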
*/ - bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr, bool optimize_skip_merged_partitions = false); + bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, const Names & deduplicate_by_columns, String * out_disable_reason = nullptr, bool optimize_skip_merged_partitions = false); ActionLock stopMergesAndWait(); @@ -183,7 +184,8 @@ private: TableLockHolder & table_lock_holder, bool optimize_skip_merged_partitions = false, SelectPartsDecision * select_decision_out = nullptr); - bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder); + + bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, const Names & deduplicate_by_columns, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder); std::shared_ptr selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder); bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder); diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index e2dc241dc6a..b7b948af4ba 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -122,9 +122,10 @@ public: const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & context) override { - return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, context); + return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context); } void mutate(const MutationCommands & commands, const Context & context) override { getNested()->mutate(commands, context); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e2bf1592659..f787eae5ab9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1507,7 +1507,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) { part = merger_mutator.mergePartsToTemporaryPart( future_merged_part, metadata_snapshot, *merge_entry, - table_lock, entry.create_time, global_context, reserved_space, entry.deduplicate); + table_lock, entry.create_time, global_context, reserved_space, entry.deduplicate, entry.deduplicate_by_columns); merger_mutator.renameMergedTemporaryPart(part, parts, &transaction); @@ -2711,6 +2711,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() const auto storage_settings_ptr = getSettings(); const bool deduplicate = false; /// TODO: read deduplicate option from table config + const Names deduplicate_by_columns = {}; CreateMergeEntryResult create_result = CreateMergeEntryResult::Other; try @@ -2761,6 +2762,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() future_merged_part.uuid, future_merged_part.type, deduplicate, + deduplicate_by_columns, nullptr, merge_pred.getVersion(), future_merged_part.merge_type); @@ -2850,6 +2852,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c const UUID & merged_part_uuid, const MergeTreeDataPartType & merged_part_type, bool deduplicate, + const Names & deduplicate_by_columns, ReplicatedMergeTreeLogEntryData * out_log_entry, int32_t log_version, MergeType merge_type) @@ -2887,6 +2890,7 @@ 
StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c entry.new_part_type = merged_part_type; entry.merge_type = merge_type; entry.deduplicate = deduplicate; + entry.deduplicate_by_columns = deduplicate_by_columns; entry.merge_type = merge_type; entry.create_time = time(nullptr); @@ -3855,6 +3859,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const Settings & query_settings = context.getSettingsRef(); bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate; + // TODO: should we also somehow pass list of columns to deduplicate on to the ReplicatedMergeTreeBlockOutputStream ? return std::make_shared( *this, metadata_snapshot, query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), @@ -3870,6 +3875,7 @@ bool StorageReplicatedMergeTree::optimize( const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & query_context) { assertNotReadonly(); @@ -3927,7 +3933,8 @@ bool StorageReplicatedMergeTree::optimize( ReplicatedMergeTreeLogEntryData merge_entry; CreateMergeEntryResult create_result = createLogEntryToMergeParts( zookeeper, future_merged_part.parts, - future_merged_part.name, future_merged_part.uuid, future_merged_part.type, deduplicate, + future_merged_part.name, future_merged_part.uuid, future_merged_part.type, + deduplicate, deduplicate_by_columns, &merge_entry, can_merge.getVersion(), future_merged_part.merge_type); if (create_result == CreateMergeEntryResult::MissingPart) @@ -3988,7 +3995,8 @@ bool StorageReplicatedMergeTree::optimize( ReplicatedMergeTreeLogEntryData merge_entry; CreateMergeEntryResult create_result = createLogEntryToMergeParts( zookeeper, future_merged_part.parts, - future_merged_part.name, future_merged_part.uuid, future_merged_part.type, deduplicate, + future_merged_part.name, future_merged_part.uuid, future_merged_part.type, + deduplicate, deduplicate_by_columns, &merge_entry, can_merge.getVersion(), future_merged_part.merge_type); if (create_result == CreateMergeEntryResult::MissingPart) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index f1094793944..1c595ea51e6 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -119,6 +119,7 @@ public: const ASTPtr & partition, bool final, bool deduplicate, + const Names & deduplicate_by_columns, const Context & query_context) override; void alter(const AlterCommands & commands, const Context & query_context, TableLockHolder & table_lock_holder) override; @@ -469,6 +470,7 @@ private: const UUID & merged_part_uuid, const MergeTreeDataPartType & merged_part_type, bool deduplicate, + const Names & deduplicate_by_columns, ReplicatedMergeTreeLogEntryData * out_log_entry, int32_t log_version, MergeType merge_type); diff --git a/tests/queries/0_stateless/01581_deduplicate_by_columns_local.reference b/tests/queries/0_stateless/01581_deduplicate_by_columns_local.reference new file mode 100644 index 00000000000..7be65240459 --- /dev/null +++ b/tests/queries/0_stateless/01581_deduplicate_by_columns_local.reference @@ -0,0 +1,39 @@ +TOTAL rows 4 +OLD DEDUPLICATE +0 0 0 +1 1 2 +1 1 3 +DEDUPLICATE BY * +0 0 0 +1 1 2 +1 1 3 +DEDUPLICATE BY * EXCEPT mat +0 0 0 +1 1 2 +1 1 3 +DEDUPLICATE BY pk,sk,val,mat +0 0 0 +1 1 2 +1 1 3 +Can not remove full duplicates +OLD DEDUPLICATE +4 +DEDUPLICATE BY pk,sk,val,mat +4 +Remove partial duplicates 
+DEDUPLICATE BY *
+3
+DEDUPLICATE BY * EXCEPT mat
+0 0 0
+1 1 2
+1 1 3
+DEDUPLICATE BY COLUMNS("*") EXCEPT mat
+0 0 0
+1 1 2
+1 1 3
+DEDUPLICATE BY pk,sk
+0 0 0
+1 1 2
+DEDUPLICATE BY COLUMNS(".*k")
+0 0 0
+1 1 2
diff --git a/tests/queries/0_stateless/01581_deduplicate_by_columns_local.sql b/tests/queries/0_stateless/01581_deduplicate_by_columns_local.sql
new file mode 100644
index 00000000000..366aaf97bd4
--- /dev/null
+++ b/tests/queries/0_stateless/01581_deduplicate_by_columns_local.sql
@@ -0,0 +1,114 @@
+--- local case
+
+-- Just in case previous test runs left some stuff behind.
+DROP TABLE IF EXISTS source_data;
+
+CREATE TABLE source_data (
+    pk Int32, sk Int32, val UInt32,
+    PRIMARY KEY (pk)
+) ENGINE=MergeTree
+ORDER BY (pk, sk);
+
+INSERT INTO source_data VALUES (0, 0, 0), (0, 0, 0), (1, 1, 2), (1, 1, 3);
+
+SELECT 'TOTAL rows', count() FROM source_data;
+
+DROP TABLE IF EXISTS full_duplicates;
+-- table with duplicates on MATERIALIZED columns
+CREATE TABLE full_duplicates (
+    pk Int32, sk Int32, val UInt32, mat UInt32 MATERIALIZED 12345, alias UInt32 ALIAS 2,
+    PRIMARY KEY (pk)
+) ENGINE=MergeTree
+ORDER BY (pk, sk);
+
+-- ERROR cases
+OPTIMIZE TABLE full_duplicates DEDUPLICATE BY pk, sk, val, mat, alias; -- { serverError 16 } -- alias column is present
+OPTIMIZE TABLE full_duplicates DEDUPLICATE BY sk, val; -- { serverError 8 } -- primary key column is missing
+OPTIMIZE TABLE full_duplicates DEDUPLICATE BY; -- { serverError 51 } -- list is empty
+OPTIMIZE TABLE full_duplicates DEDUPLICATE BY * EXCEPT(pk, sk, val, mat, alias); -- { serverError 51 } -- list is empty
+OPTIMIZE TABLE full_duplicates DEDUPLICATE BY * EXCEPT(pk); -- { serverError 8 } -- primary key column is missing
+OPTIMIZE TABLE partial_duplicates DEDUPLICATE BY pk,sk,val,mat EXCEPT mat; -- { clientError 62 } -- invalid syntax
+
+-- Valid cases
+-- NOTE: here and below we need FINAL to force deduplication on such a small data set, which fits in a single part.
+
+SELECT 'OLD DEDUPLICATE';
+INSERT INTO full_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE;
+SELECT * FROM full_duplicates;
+TRUNCATE full_duplicates;
+
+SELECT 'DEDUPLICATE BY *';
+INSERT INTO full_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE BY *;
+SELECT * FROM full_duplicates;
+TRUNCATE full_duplicates;
+
+SELECT 'DEDUPLICATE BY * EXCEPT mat';
+INSERT INTO full_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE BY * EXCEPT mat;
+SELECT * FROM full_duplicates;
+TRUNCATE full_duplicates;
+
+SELECT 'DEDUPLICATE BY pk,sk,val,mat';
+INSERT INTO full_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE BY pk,sk,val,mat;
+SELECT * FROM full_duplicates;
+TRUNCATE full_duplicates;
+
+--DROP TABLE full_duplicates;
+
+-- Now on to the partial duplicates, where the MATERIALIZED column always has a unique value.
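+-- (A hedged illustration of the difference, not executed as part of this test: because mat below is
+-- MATERIALIZED rand(), two inserts of the same (pk, sk, val) produce rows that differ in mat, e.g.
+--   INSERT INTO partial_duplicates VALUES (1, 1, 2);
+--   INSERT INTO partial_duplicates VALUES (1, 1, 2);
+-- so plain DEDUPLICATE keeps both rows, while DEDUPLICATE BY pk, sk, val collapses them into one.)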
+DROP TABLE IF EXISTS partial_duplicates;
+CREATE TABLE partial_duplicates (
+    pk Int32, sk Int32, val UInt32, mat UInt32 MATERIALIZED rand(), alias UInt32 ALIAS 2,
+    PRIMARY KEY (pk)
+) ENGINE=MergeTree
+ORDER BY (pk, sk);
+
+SELECT 'Can not remove full duplicates';
+
+-- should not remove anything
+SELECT 'OLD DEDUPLICATE';
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE;
+SELECT count() FROM partial_duplicates;
+TRUNCATE partial_duplicates;
+
+SELECT 'DEDUPLICATE BY pk,sk,val,mat';
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY pk,sk,val,mat;
+SELECT count() FROM partial_duplicates;
+TRUNCATE partial_duplicates;
+
+SELECT 'Remove partial duplicates';
+
+SELECT 'DEDUPLICATE BY *'; -- all except MATERIALIZED columns, hence this will reduce the number of rows.
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY *;
+SELECT count() FROM partial_duplicates;
+TRUNCATE partial_duplicates;
+
+SELECT 'DEDUPLICATE BY * EXCEPT mat';
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY * EXCEPT mat;
+SELECT * FROM partial_duplicates;
+TRUNCATE partial_duplicates;
+
+SELECT 'DEDUPLICATE BY COLUMNS("*") EXCEPT mat';
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY COLUMNS('.*') EXCEPT mat;
+SELECT * FROM partial_duplicates;
+TRUNCATE partial_duplicates;
+
+SELECT 'DEDUPLICATE BY pk,sk';
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY pk,sk;
+SELECT * FROM partial_duplicates;
+TRUNCATE partial_duplicates;
+
+SELECT 'DEDUPLICATE BY COLUMNS(".*k")';
+INSERT INTO partial_duplicates SELECT * FROM source_data;
+OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY COLUMNS('.*k');
+SELECT * FROM partial_duplicates;
+TRUNCATE partial_duplicates;
diff --git a/tests/queries/0_stateless/01581_deduplicate_by_columns_replicated.reference b/tests/queries/0_stateless/01581_deduplicate_by_columns_replicated.reference
new file mode 100644
index 00000000000..5eeab2afea5
--- /dev/null
+++ b/tests/queries/0_stateless/01581_deduplicate_by_columns_replicated.reference
@@ -0,0 +1,47 @@
+check that we have a data
+r1 1 1001 2 1 1
+r1 1 2001 1 1 1
+r1 2 1002 1 1 1
+r1 2 2002 1 1 1
+r1 3 1003 2 2 2
+r1 4 1004 2 2 2
+r1 5 2005 2 1 1
+r1 9 1002 1 1 1
+r2 1 1001 2 1 1
+r2 1 2001 1 1 1
+r2 2 1002 1 1 1
+r2 2 2002 1 1 1
+r2 3 1003 2 2 2
+r2 4 1004 2 2 2
+r2 5 2005 2 1 1
+r2 9 1002 1 1 1
+after old OPTIMIZE DEDUPLICATE
+r1 1 1001 1 1 1
+r1 1 2001 1 1 1
+r1 2 1002 1 1 1
+r1 2 2002 1 1 1
+r1 3 1003 2 2 2
+r1 4 1004 2 2 2
+r1 5 2005 1 1 1
+r1 9 1002 1 1 1
+r2 1 1001 1 1 1
+r2 1 2001 1 1 1
+r2 2 1002 1 1 1
+r2 2 2002 1 1 1
+r2 3 1003 2 2 2
+r2 4 1004 2 2 2
+r2 5 2005 1 1 1
+r2 9 1002 1 1 1
+check data again after multiple deduplications with new syntax
+r1 1 1001 1 1 1
+r1 2 1002 1 1 1
+r1 3 1003 1 1 1
+r1 4 1004 1 1 1
+r1 5 2005 1 1 1
+r1 9 1002 1 1 1
+r2 1 1001 1 1 1
+r2 2 1002 1 1 1
+r2 3 1003 1 1 1
+r2 4 1004 1 1 1
+r2 5 2005 1 1 1
+r2 9 1002 1 1 1
diff --git a/tests/queries/0_stateless/01581_deduplicate_by_columns_replicated.sql b/tests/queries/0_stateless/01581_deduplicate_by_columns_replicated.sql
new file mode 100644
index 00000000000..2a8a1e8fda4
--- /dev/null
+++ b/tests/queries/0_stateless/01581_deduplicate_by_columns_replicated.sql
@@ -0,0 +1,55 @@
+--- replicated case
+
+-- Just in case previous test runs left some stuff behind.
+DROP TABLE IF EXISTS replicated_deduplicate_by_columns_r1;
+DROP TABLE IF EXISTS replicated_deduplicate_by_columns_r2;
+
+SET replication_alter_partitions_sync = 2;
+
+-- In real life insert_replica_id would be filled in from the hostname.
+CREATE TABLE IF NOT EXISTS replicated_deduplicate_by_columns_r1 (
+    id Int32, val UInt32, insert_time_ns DateTime64(9) MATERIALIZED now64(9), insert_replica_id UInt8 MATERIALIZED randConstant()
+) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01581/replicated_deduplicate', 'r1') ORDER BY id;
+
+CREATE TABLE IF NOT EXISTS replicated_deduplicate_by_columns_r2 (
+    id Int32, val UInt32, insert_time_ns DateTime64(9) MATERIALIZED now64(9), insert_replica_id UInt8 MATERIALIZED randConstant()
+) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01581/replicated_deduplicate', 'r2') ORDER BY id;
+
+-- insert some data; two records, (3, 1003) and (4, 1004), are duplicated and differ in insert_time_ns / insert_replica_id,
+-- while (1, 1001) and (5, 2005) have full duplicates
+INSERT INTO replicated_deduplicate_by_columns_r1 VALUES (1, 1001), (1, 1001), (2, 1002), (3, 1003), (4, 1004), (1, 2001), (9, 1002);
+INSERT INTO replicated_deduplicate_by_columns_r2 VALUES (2, 2002), (3, 1003), (4, 1004), (5, 2005), (5, 2005);
+
+-- wait for replicas to sync
+SYSTEM SYNC REPLICA replicated_deduplicate_by_columns_r2;
+SYSTEM SYNC REPLICA replicated_deduplicate_by_columns_r1;
+
+SELECT 'check that we have a data';
+SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r1 GROUP BY id, val ORDER BY id, val;
+SELECT 'r2', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r2 GROUP BY id, val ORDER BY id, val;
+
+-- NOTE: here and below we need FINAL to force deduplication on such a small data set, which fits in a single part.
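+-- (Hedged sketch of the replication mechanics: OPTIMIZE ... DEDUPLICATE BY on r1 writes a merge entry
+-- into the shared replication log; with the new format version 6 it carries lines like
+--   deduplicate: 1
+--   deduplicate_by_columns: ["id","val"]
+-- so r2 replays the very same deduplicating merge, which is why the checks below query both replicas.)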
+-- that should remove full duplicates +OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE; + +SELECT 'after old OPTIMIZE DEDUPLICATE'; +SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r1 GROUP BY id, val ORDER BY id, val; +SELECT 'r2', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r2 GROUP BY id, val ORDER BY id, val; + +OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY id, val; +OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY COLUMNS('[id, val]'); +OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY * EXCEPT(insert_time_ns, insert_replica_id); +OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY COLUMNS('[i]') EXCEPT(insert_time_ns, insert_replica_id); + +SELECT 'check data again after multiple deduplications with new syntax'; +SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r1 GROUP BY id, val ORDER BY id, val; +SELECT 'r2', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r2 GROUP BY id, val ORDER BY id, val; + +-- cleanup the mess +DROP TABLE replicated_deduplicate_by_columns_r1; +DROP TABLE replicated_deduplicate_by_columns_r2; + +SYSTEM DROP REPLICA '/clickhouse/tables/test_01581/replicated_deduplicate/replicas/r1'; +SYSTEM DROP REPLICA '/clickhouse/tables/test_01581/replicated_deduplicate/replicas/r2';
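+
+-- (Hedged note: dropping the replica metadata under '/clickhouse/tables/test_01581/replicated_deduplicate'
+-- keeps ZooKeeper clean, so repeated runs of this test start from scratch.)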