OPTIMIZE DEDUPLICATE BY columns

Extended the OPTIMIZE ... DEDUPLICATE syntax to allow an explicit (or implicit, via an asterisk or column transformers) list of columns to check for duplicates.

The following syntax variants are now supported:

OPTIMIZE TABLE table DEDUPLICATE; -- the old one
OPTIMIZE TABLE table DEDUPLICATE BY *;
OPTIMIZE TABLE table DEDUPLICATE BY * EXCEPT colX;
OPTIMIZE TABLE table DEDUPLICATE BY * EXCEPT (colX, colY);
OPTIMIZE TABLE table DEDUPLICATE BY col1,col2,col3;
OPTIMIZE TABLE table DEDUPLICATE BY COLUMNS('column-matched-by-regex');
OPTIMIZE TABLE table DEDUPLICATE BY COLUMNS('column-matched-by-regex') EXCEPT colX;
OPTIMIZE TABLE table DEDUPLICATE BY COLUMNS('column-matched-by-regex') EXCEPT (colX, colY);

Note that * behaves just like in SELECT: MATERIALIZED and ALIAS columns are not used for expansion. For example, given a table with columns (a Int32, m Int32 MATERIALIZED 1), DEDUPLICATE BY * expands to a only.
Also, it is an error to specify an empty list of columns, to write an expression that results in an empty list of columns, or to deduplicate by an ALIAS column.
Column transformers other than EXCEPT are not supported.
Vasily Nemkov 2020-12-01 10:10:12 +01:00
parent a3bcf60298
commit 70ea507dae
34 changed files with 1012 additions and 65 deletions

View File

@ -13,12 +13,12 @@ struct MultiEnum
MultiEnum() = default;
template <typename ... EnumValues, typename = std::enable_if_t<std::conjunction_v<std::is_same<EnumTypeT, EnumValues>...>>>
explicit MultiEnum(EnumValues ... v)
constexpr explicit MultiEnum(EnumValues ... v)
: MultiEnum((toBitFlag(v) | ... | 0u))
{}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
explicit MultiEnum(ValueType v)
constexpr explicit MultiEnum(ValueType v)
: bitset(v)
{
static_assert(std::is_unsigned_v<ValueType>);
@ -95,5 +95,5 @@ struct MultiEnum
private:
StorageType bitset = 0;
static StorageType toBitFlag(EnumType v) { return StorageType{1} << static_cast<StorageType>(v); }
static constexpr StorageType toBitFlag(EnumType v) { return StorageType{1} << static_cast<StorageType>(v); }
};
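
The move to constexpr above is what lets the parser header further down declare a compile-time set of allowed transformers (ParserColumnsTransformers::AllTransformers). A minimal standalone sketch of the same bit-flag idea, using a simplified stand-in type rather than the real MultiEnum:

#include <cstdint>
#include <initializer_list>

enum class ColumnTransformer : uint8_t { APPLY, EXCEPT, REPLACE };

// Stripped-down stand-in for MultiEnum<ColumnTransformer, UInt8>.
struct TransformerSet
{
    uint8_t bitset = 0;

    constexpr TransformerSet(std::initializer_list<ColumnTransformer> values)
    {
        for (const auto v : values)
            bitset = static_cast<uint8_t>(bitset | (uint8_t{1} << static_cast<uint8_t>(v)));
    }

    constexpr bool isSet(ColumnTransformer v) const { return bitset & (uint8_t{1} << static_cast<uint8_t>(v)); }
};

// Thanks to the constexpr constructor this is a compile-time constant,
// mirroring ParserColumnsTransformers::AllTransformers introduced below.
static constexpr TransformerSet all_transformers{ColumnTransformer::APPLY, ColumnTransformer::EXCEPT, ColumnTransformer::REPLACE};
static_assert(all_transformers.isSet(ColumnTransformer::EXCEPT), "EXCEPT must be in the full set");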

View File

@ -32,6 +32,7 @@
#include <Common/checkStackSize.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/processColumnTransformers.h>
namespace
{
@ -50,7 +51,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
InterpreterInsertQuery::InterpreterInsertQuery(
const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_, bool no_squash_, bool no_destination_)
: query_ptr(query_ptr_)
@ -93,27 +93,7 @@ Block InterpreterInsertQuery::getSampleBlock(
Block table_sample = metadata_snapshot->getSampleBlock();
/// Process column transformers (e.g. * EXCEPT(a)), asterisks and qualified columns.
const auto & columns = metadata_snapshot->getColumns();
auto names_and_types = columns.getOrdinary();
removeDuplicateColumns(names_and_types);
auto table_expr = std::make_shared<ASTTableExpression>();
table_expr->database_and_table_name = createTableIdentifier(table->getStorageID());
table_expr->children.push_back(table_expr->database_and_table_name);
TablesWithColumns tables_with_columns;
tables_with_columns.emplace_back(DatabaseAndTableWithAlias(*table_expr, context.getCurrentDatabase()), names_and_types);
tables_with_columns[0].addHiddenColumns(columns.getMaterialized());
tables_with_columns[0].addHiddenColumns(columns.getAliases());
tables_with_columns[0].addHiddenColumns(table->getVirtuals());
NameSet source_columns_set;
for (const auto & identifier : query.columns->children)
source_columns_set.insert(identifier->getColumnName());
TranslateQualifiedNamesVisitor::Data visitor_data(source_columns_set, tables_with_columns);
TranslateQualifiedNamesVisitor visitor(visitor_data);
auto columns_ast = query.columns->clone();
visitor.visit(columns_ast);
const auto columns_ast = processColumnTransformers(context.getCurrentDatabase(), table, metadata_snapshot, query.columns);
/// Form the block based on the column names from the query
Block res;

View File

@ -5,13 +5,18 @@
#include <Interpreters/InterpreterOptimizeQuery.h>
#include <Access/AccessRightsElement.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTExpressionList.h>
#include <Interpreters/processColumnTransformers.h>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int THERE_IS_NO_COLUMN;
}
@ -27,7 +32,30 @@ BlockIO InterpreterOptimizeQuery::execute()
auto table_id = context.resolveStorageID(ast, Context::ResolveOrdinary);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, context);
// An empty list of names means we deduplicate by all columns, but the user can explicitly state which columns to use.
Names column_names;
if (ast.deduplicate_by_columns)
{
// The user requested a custom set of columns for deduplication, possibly with a Column Transformer expression.
{
const auto cols = processColumnTransformers(context.getCurrentDatabase(), table, metadata_snapshot, ast.deduplicate_by_columns);
for (const auto & col : cols->children)
column_names.emplace_back(col->getColumnName());
}
metadata_snapshot->check(column_names, NamesAndTypesList{}, table_id);
// Validate that deduplicate_by_columns contains all primary key columns.
for (const auto & primary_key : metadata_snapshot->getPrimaryKeyColumns())
{
if (std::find(column_names.begin(), column_names.end(), primary_key) == column_names.end())
throw Exception("Deduplicate expression doesn't contain primary key column: " + primary_key,
ErrorCodes::THERE_IS_NO_COLUMN);
}
}
table->optimize(query_ptr, metadata_snapshot, ast.partition, ast.final, ast.deduplicate, column_names, context);
return {};
}
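
A standalone sketch of the primary-key check above, with assumed simplified types (Names as a plain vector of strings); the real code throws DB::Exception with THERE_IS_NO_COLUMN:

#include <algorithm>
#include <stdexcept>
#include <string>
#include <vector>

using Names = std::vector<std::string>;

// Mirrors the loop above: a DEDUPLICATE BY list that omits any
// primary key column is rejected before the merge is scheduled.
void checkContainsAllPrimaryKeyColumns(const Names & column_names, const Names & primary_key_columns)
{
    for (const auto & primary_key : primary_key_columns)
    {
        if (std::find(column_names.begin(), column_names.end(), primary_key) == column_names.end())
            throw std::invalid_argument("Deduplicate expression doesn't contain primary key column: " + primary_key);
    }
}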

View File

@ -80,6 +80,8 @@
#include <ext/scope_guard.h>
#include <memory>
#include <Core/iostream_debug_helpers.h>
#include <Parsers/formatAST.h>
namespace DB
{

View File

@ -0,0 +1,49 @@
#include <Interpreters/processColumnTransformers.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/getTableExpressions.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
ASTPtr processColumnTransformers(
const String & current_database,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
ASTPtr query_columns)
{
const auto & columns = metadata_snapshot->getColumns();
auto names_and_types = columns.getOrdinary();
removeDuplicateColumns(names_and_types);
TablesWithColumns tables_with_columns;
{
auto table_expr = std::make_shared<ASTTableExpression>();
table_expr->database_and_table_name = createTableIdentifier(table->getStorageID());
table_expr->children.push_back(table_expr->database_and_table_name);
tables_with_columns.emplace_back(DatabaseAndTableWithAlias(*table_expr, current_database), names_and_types);
}
tables_with_columns[0].addHiddenColumns(columns.getMaterialized());
tables_with_columns[0].addHiddenColumns(columns.getAliases());
tables_with_columns[0].addHiddenColumns(table->getVirtuals());
NameSet source_columns_set;
for (const auto & identifier : query_columns->children)
source_columns_set.insert(identifier->getColumnName());
TranslateQualifiedNamesVisitor::Data visitor_data(source_columns_set, tables_with_columns);
TranslateQualifiedNamesVisitor visitor(visitor_data);
auto columns_ast = query_columns->clone();
visitor.visit(columns_ast);
return columns_ast;
}
}
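
A hedged usage sketch of this extracted helper, as the interpreters above call it (context, table, metadata_snapshot and an AST column list are assumed to be in scope):

// Expand asterisks, COLUMNS('...') matchers and EXCEPT transformers
// into a concrete list of column names.
Names column_names;
const auto expanded = processColumnTransformers(context.getCurrentDatabase(), table, metadata_snapshot, query.columns);
for (const auto & column : expanded->children)
    column_names.emplace_back(column->getColumnName());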

View File

@ -0,0 +1,19 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
/// Process column transformers (e.g. * EXCEPT(a)), asterisks and qualified columns.
ASTPtr processColumnTransformers(
const String & current_database,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
ASTPtr query_columns);
}

View File

@ -155,6 +155,7 @@ SRCS(
interpretSubquery.cpp
join_common.cpp
loadMetadata.cpp
processColumnTransformers.cpp
sortBlock.cpp
)

View File

@ -23,6 +23,12 @@ void ASTOptimizeQuery::formatQueryImpl(const FormatSettings & settings, FormatSt
if (deduplicate)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " DEDUPLICATE" << (settings.hilite ? hilite_none : "");
if (deduplicate_by_columns)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " BY " << (settings.hilite ? hilite_none : "");
deduplicate_by_columns->formatImpl(settings, state, frame);
}
}
}

View File

@ -16,9 +16,11 @@ public:
/// The partition to optimize can be specified.
ASTPtr partition;
/// A flag can be specified - perform optimization "to the end" instead of one step.
bool final;
bool final = false;
/// Do deduplicate (default: false)
bool deduplicate;
/// Deduplicate by columns.
ASTPtr deduplicate_by_columns;
/** Get the text that identifies this element. */
String getID(char delim) const override
@ -37,6 +39,12 @@ public:
res->children.push_back(res->partition);
}
if (deduplicate_by_columns)
{
res->deduplicate_by_columns = deduplicate_by_columns->clone();
res->children.push_back(res->deduplicate_by_columns);
}
return res;
}

View File

@ -1273,7 +1273,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
ParserKeyword as("AS");
ParserKeyword strict("STRICT");
if (apply.ignore(pos, expected))
if (allowed_transformers.isSet(ColumnTransformer::APPLY) && apply.ignore(pos, expected))
{
bool with_open_round_bracket = false;
@ -1326,7 +1326,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
node = std::move(res);
return true;
}
else if (except.ignore(pos, expected))
else if (allowed_transformers.isSet(ColumnTransformer::EXCEPT) && except.ignore(pos, expected))
{
if (strict.ignore(pos, expected))
is_strict = true;
@ -1366,7 +1366,7 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
node = std::move(res);
return true;
}
else if (replace.ignore(pos, expected))
else if (allowed_transformers.isSet(ColumnTransformer::REPLACE) && replace.ignore(pos, expected))
{
if (strict.ignore(pos, expected))
is_strict = true;

View File

@ -70,42 +70,81 @@ protected:
bool allow_query_parameter;
};
/** *, t.*, db.table.*, COLUMNS('<regular expression>') APPLY(...) or EXCEPT(...) or REPLACE(...)
*/
class ParserColumnsTransformers : public IParserBase
{
public:
enum class ColumnTransformer : UInt8
{
APPLY,
EXCEPT,
REPLACE,
};
using ColumnTransformers = MultiEnum<ColumnTransformer, UInt8>;
static constexpr auto AllTransformers = ColumnTransformers{ColumnTransformer::APPLY, ColumnTransformer::EXCEPT, ColumnTransformer::REPLACE};
ParserColumnsTransformers(ColumnTransformers allowed_transformers_ = AllTransformers, bool is_strict_ = false)
: allowed_transformers(allowed_transformers_)
, is_strict(is_strict_)
{}
protected:
const char * getName() const override { return "COLUMNS transformers"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
ColumnTransformers allowed_transformers;
bool is_strict;
};
/// Just *
class ParserAsterisk : public IParserBase
{
public:
using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers;
ParserAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
: allowed_transformers(allowed_transformers_)
{}
protected:
const char * getName() const override { return "asterisk"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
ColumnTransformers allowed_transformers;
};
/** Something like t.* or db.table.*
*/
class ParserQualifiedAsterisk : public IParserBase
{
public:
using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers;
ParserQualifiedAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
: allowed_transformers(allowed_transformers_)
{}
protected:
const char * getName() const override { return "qualified asterisk"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
ColumnTransformers allowed_transformers;
};
/** COLUMNS('<regular expression>')
*/
class ParserColumnsMatcher : public IParserBase
{
public:
using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers;
ParserColumnsMatcher(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
: allowed_transformers(allowed_transformers_)
{}
protected:
const char * getName() const override { return "COLUMNS matcher"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** *, t.*, db.table.*, COLUMNS('<regular expression>') APPLY(...) or EXCEPT(...) or REPLACE(...)
*/
class ParserColumnsTransformers : public IParserBase
{
public:
ParserColumnsTransformers(bool is_strict_ = false): is_strict(is_strict_) {}
protected:
const char * getName() const override { return "COLUMNS transformers"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
bool is_strict;
ColumnTransformers allowed_transformers;
};
/** A function, for example, f(x, y + 1, g(z)).

View File

@ -1,5 +1,8 @@
#include <Parsers/IParserBase.h>
#include <Parsers/formatAST.h>
#include <IO/WriteBufferFromOStream.h>
#include <iostream>
namespace DB
{
@ -10,9 +13,20 @@ bool IParserBase::parse(Pos & pos, ASTPtr & node, Expected & expected)
return wrapParseImpl(pos, IncreaseDepthTag{}, [&]
{
// std::cerr << pos.depth << " 0x" << static_cast<const void*>(this) << " " << getName() << " parsing \"" << pos.get().begin << "\" ... " << std::endl;
bool res = parseImpl(pos, node, expected);
// std::cerr << pos.depth << " 0x" << static_cast<const void*>(this) << " " << getName() << " " << (res ? "OK" : "FAIL") << std::endl;
if (!res)
node = nullptr;
// else if (node)
// {
// std::cerr << pos.depth << " 0x" << static_cast<const void*>(this) << "\t" << std::ends;
// {
// WriteBufferFromOStream out(std::cerr, 4096);
// formatAST(*node, out);
// }
// std::cerr << std::endl;
// }
return res;
});
}

View File

@ -4,11 +4,24 @@
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ExpressionListParsers.h>
namespace DB
{
bool ParserOptimizeQueryColumnsSpecification::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
// Do not allow APPLY and REPLACE transformers:
// we use Columns Transformers here only to get the list of columns,
// and we can't actually modify the content of the columns for deduplication.
const auto allowed_transformers = ParserColumnsTransformers::ColumnTransformers{ParserColumnsTransformers::ColumnTransformer::EXCEPT};
return ParserColumnsMatcher(allowed_transformers).parse(pos, node, expected)
|| ParserAsterisk(allowed_transformers).parse(pos, node, expected)
|| ParserIdentifier(false).parse(pos, node, expected);
}
bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
@ -16,6 +29,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
ParserKeyword s_partition("PARTITION");
ParserKeyword s_final("FINAL");
ParserKeyword s_deduplicate("DEDUPLICATE");
ParserKeyword s_by("BY");
ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
ParserPartition partition_p;
@ -55,6 +69,14 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
if (s_deduplicate.ignore(pos, expected))
deduplicate = true;
ASTPtr deduplicate_by_columns;
if (deduplicate && s_by.ignore(pos, expected))
{
if (!ParserList(std::make_unique<ParserOptimizeQueryColumnsSpecification>(), std::make_unique<ParserToken>(TokenType::Comma))
.parse(pos, deduplicate_by_columns, expected))
return false;
}
auto query = std::make_shared<ASTOptimizeQuery>();
node = query;
@ -66,6 +88,7 @@ bool ParserOptimizeQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expecte
query->children.push_back(partition);
query->final = final;
query->deduplicate = deduplicate;
query->deduplicate_by_columns = deduplicate_by_columns;
return true;
}
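
In grammar terms, the clause this parser now accepts can be summarized as follows (a sketch derived from the code above, not normative documentation):

// OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL]
//     [DEDUPLICATE [BY column_spec {, column_spec}]]
//
// column_spec := identifier
//              | *               [EXCEPT identifier | EXCEPT (identifier {, identifier})]
//              | COLUMNS('regex') [EXCEPT identifier | EXCEPT (identifier {, identifier})]
//
// Note that BY is only consumed when DEDUPLICATE was seen first, so a bare
// "OPTIMIZE TABLE t BY a" still fails to parse.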

View File

@ -7,6 +7,13 @@
namespace DB
{
class ParserOptimizeQueryColumnsSpecification : public IParserBase
{
protected:
const char * getName() const override { return "column specification for OPTIMIZE ... DEDUPLICATE BY"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Query OPTIMIZE TABLE [db.]name [PARTITION partition] [FINAL] [DEDUPLICATE [BY expression]]
*/
class ParserOptimizeQuery : public IParserBase

View File

@ -0,0 +1,134 @@
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <IO/WriteBufferFromOStream.h>
#include <string_view>
#include <gtest/gtest.h>
namespace
{
using namespace DB;
using namespace std::literals;
}
struct ParserTestCase
{
std::shared_ptr<IParser> parser;
const std::string_view input_text;
const char * expected_ast = nullptr;
};
std::ostream & operator<<(std::ostream & ostr, const ParserTestCase & test_case)
{
return ostr << "parser: " << test_case.parser->getName() << ", input: " << test_case.input_text;
}
class ParserTest : public ::testing::TestWithParam<ParserTestCase>
{};
TEST_P(ParserTest, parseQuery)
{
const auto & [parser, input_text, expected_ast] = GetParam();
ASSERT_NE(nullptr, parser);
if (expected_ast)
{
ASTPtr ast;
ASSERT_NO_THROW(ast = parseQuery(*parser, input_text.begin(), input_text.end(), 0, 0));
EXPECT_EQ(expected_ast, serializeAST(*ast->clone(), false));
}
else
{
ASSERT_THROW(parseQuery(*parser, input_text.begin(), input_text.end(), 0, 0), DB::Exception);
}
}
INSTANTIATE_TEST_SUITE_P(ParserOptimizeQuery, ParserTest, ::testing::Values(
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('a, b')",
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('a, b')"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]')",
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]')"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT b",
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT b"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT (a, b)",
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') EXCEPT (a, b)"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY a, b, c",
"OPTIMIZE TABLE table_name DEDUPLICATE BY a, b, c"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY *",
"OPTIMIZE TABLE table_name DEDUPLICATE BY *"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT a",
"OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT a"
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT (a, b)",
"OPTIMIZE TABLE table_name DEDUPLICATE BY * EXCEPT (a, b)"
}
));
INSTANTIATE_TEST_SUITE_P(ParserOptimizeQuery_FAIL, ParserTest, ::testing::Values(
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY",
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') APPLY(x)",
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY COLUMNS('[a]') REPLACE(y)",
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY * APPLY(x)",
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY * REPLACE(y)",
},
ParserTestCase
{
std::make_shared<ParserOptimizeQuery>(),
"OPTIMIZE TABLE table_name DEDUPLICATE BY db.a, db.b, db.c",
}
));

View File

@ -380,6 +380,7 @@ public:
const ASTPtr & /*partition*/,
bool /*final*/,
bool /*deduplicate*/,
const Names & /* deduplicate_by_columns */,
const Context & /*context*/)
{
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);

View File

@ -642,7 +642,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
time_t time_of_merge,
const Context & context,
const ReservationPtr & space_reservation,
bool deduplicate)
bool deduplicate,
const Names & deduplicate_by_columns)
{
static const String TMP_PREFIX = "tmp_merge_";
@ -881,7 +882,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
BlockInputStreamPtr merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
if (deduplicate)
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, Names());
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns);
if (need_remove_expired_values)
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);
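
The only functional change in the merge itself is the last argument of the distinct step. A standalone sketch of those semantics under an assumed simplified row model: rows of the already-sorted merge stream are compared only on the selected columns (an empty list means all columns, matching the comment in InterpreterOptimizeQuery), and consecutive duplicates are dropped:

#include <cstddef>
#include <string>
#include <vector>

using Row = std::vector<std::string>;            // one value per column
using ColumnIndices = std::vector<std::size_t>;  // columns to compare; empty means all

static bool equalOn(const Row & a, const Row & b, const ColumnIndices & columns)
{
    if (columns.empty())
        return a == b;
    for (const std::size_t i : columns)
        if (a[i] != b[i])
            return false;
    return true;
}

// Keeps the first row of every run of duplicates in a sorted stream,
// which is the effect DistinctSortedBlockInputStream has per block.
std::vector<Row> distinctSorted(const std::vector<Row> & sorted_rows, const ColumnIndices & columns)
{
    std::vector<Row> result;
    for (const auto & row : sorted_rows)
        if (result.empty() || !equalOn(result.back(), row, columns))
            result.push_back(row);
    return result;
}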

View File

@ -127,7 +127,8 @@ public:
time_t time_of_merge,
const Context & context,
const ReservationPtr & space_reservation,
bool deduplicate);
bool deduplicate,
const Names & deduplicate_by_columns);
/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(

View File

@ -6,6 +6,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -16,15 +17,29 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
enum FormatVersion : UInt8
{
FORMAT_WITH_CREATE_TIME = 2,
FORMAT_WITH_BLOCK_ID = 3,
FORMAT_WITH_DEDUPLICATE = 4,
FORMAT_WITH_UUID = 5,
FORMAT_WITH_DEDUPLICATE_BY_COLUMNS = 6,
FORMAT_LAST
};
void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
{
UInt8 format_version = 4;
UInt8 format_version = FORMAT_WITH_DEDUPLICATE;
if (!deduplicate_by_columns.empty())
format_version = std::max<UInt8>(format_version, FORMAT_WITH_DEDUPLICATE_BY_COLUMNS);
/// Conditionally bump format_version only when uuid has been assigned.
/// If some other feature requires bumping format_version to >= 5, this code becomes a no-op.
if (new_part_uuid != UUIDHelpers::Nil)
format_version = std::max(format_version, static_cast<UInt8>(5));
format_version = std::max<UInt8>(format_version, FORMAT_WITH_UUID);
out << "format version: " << format_version << "\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
@ -50,6 +65,19 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
if (new_part_uuid != UUIDHelpers::Nil)
out << "\ninto_uuid: " << new_part_uuid;
if (!deduplicate_by_columns.empty())
{
const FormatSettings format_settings;
out << "\ndeduplicate_by_columns: [";
for (size_t i = 0; i < deduplicate_by_columns.size(); ++i)
{
writeJSONString(deduplicate_by_columns[i], out, format_settings);
if (i != deduplicate_by_columns.size() - 1)
out << ",";
}
out << "]";
}
break;
case DROP_RANGE:
@ -129,10 +157,10 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> "format version: " >> format_version >> "\n";
if (format_version < 1 || format_version > 5)
if (format_version < 1 || format_version >= FORMAT_LAST)
throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
if (format_version >= 2)
if (format_version >= FORMAT_WITH_CREATE_TIME)
{
LocalDateTime create_time_dt;
in >> "create_time: " >> create_time_dt >> "\n";
@ -141,7 +169,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> "source replica: " >> source_replica >> "\n";
if (format_version >= 3)
if (format_version >= FORMAT_WITH_BLOCK_ID)
{
in >> "block_id: " >> escape >> block_id >> "\n";
}
@ -167,7 +195,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
}
in >> new_part_name;
if (format_version >= 4)
if (format_version >= FORMAT_WITH_DEDUPLICATE)
{
in >> "\ndeduplicate: " >> deduplicate;
@ -184,6 +212,21 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
}
else if (checkString("into_uuid: ", in))
in >> new_part_uuid;
else if (checkString("deduplicate_by_columns: [", in))
{
Strings new_deduplicate_by_columns;
for (;;)
{
String tmp_column_name;
readJSONString(tmp_column_name, in);
new_deduplicate_by_columns.emplace_back(std::move(tmp_column_name));
if (!checkString(",", in))
break;
}
assertString("]", in);
deduplicate_by_columns = std::move(new_deduplicate_by_columns);
}
else
trailing_newline_found = true;
}
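
A standalone sketch of the version-selection rule encoded above: an entry is written with the lowest format version that can represent its fields, so replicas that predate a feature keep reading entries which do not use it:

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

enum FormatVersion : uint8_t
{
    FORMAT_WITH_CREATE_TIME = 2,
    FORMAT_WITH_BLOCK_ID = 3,
    FORMAT_WITH_DEDUPLICATE = 4,
    FORMAT_WITH_UUID = 5,
    FORMAT_WITH_DEDUPLICATE_BY_COLUMNS = 6,
};

uint8_t chooseFormatVersion(bool has_new_part_uuid, const std::vector<std::string> & deduplicate_by_columns)
{
    uint8_t version = FORMAT_WITH_DEDUPLICATE;  // baseline for merge entries
    if (!deduplicate_by_columns.empty())
        version = std::max<uint8_t>(version, FORMAT_WITH_DEDUPLICATE_BY_COLUMNS);
    if (has_new_part_uuid)
        version = std::max<uint8_t>(version, FORMAT_WITH_UUID);
    return version;
}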

View File

@ -81,6 +81,7 @@ struct ReplicatedMergeTreeLogEntryData
Strings source_parts;
bool deduplicate = false; /// Do deduplicate on merge
Strings deduplicate_by_columns = {}; // Which columns should be checked for duplicates, empty means 'all' (default).
MergeType merge_type = MergeType::REGULAR;
String column_name;
String index_name;

View File

@ -0,0 +1,311 @@
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <IO/ReadBufferFromString.h>
#include <Core/iostream_debug_helpers.h>
#include <type_traits>
#include <gtest/gtest.h>
namespace DB
{
std::ostream & operator<<(std::ostream & ostr, const MergeTreeDataPartType & type)
{
return ostr << type.toString();
}
std::ostream & operator<<(std::ostream & ostr, const UInt128 & v)
{
return ostr << v.toHexString();
}
template <typename T, typename Tag>
std::ostream & operator<<(std::ostream & ostr, const StrongTypedef<T, Tag> & v)
{
return ostr << v.toUnderType();
}
std::ostream & operator<<(std::ostream & ostr, const MergeType & v)
{
return ostr << toString(v);
}
}
namespace std
{
std::ostream & operator<<(std::ostream & ostr, const std::exception_ptr & e)
{
try
{
if (e)
{
std::rethrow_exception(e);
}
return ostr << "<NULL EXCEPTION>";
}
catch (const std::exception& e)
{
return ostr << e.what();
}
}
template <typename T>
inline std::ostream& operator<<(std::ostream & ostr, const std::vector<T> & v)
{
ostr << "[";
for (size_t i = 0; i < v.size(); ++i)
{
ostr << i;
if (i != v.size() - 1)
ostr << ", ";
}
return ostr << "] (" << v.size() << ") items";
}
}
namespace
{
using namespace DB;
template <typename T>
void compareAttributes(::testing::AssertionResult & result, const char * name, const T & expected_value, const T & actual_value);
#define CMP_ATTRIBUTE(attribute) compareAttributes(result, #attribute, expected.attribute, actual.attribute)
::testing::AssertionResult compare(
const ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry & expected,
const ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry & actual)
{
auto result = ::testing::AssertionSuccess();
CMP_ATTRIBUTE(drop_range_part_name);
CMP_ATTRIBUTE(from_database);
CMP_ATTRIBUTE(from_table);
CMP_ATTRIBUTE(src_part_names);
CMP_ATTRIBUTE(new_part_names);
CMP_ATTRIBUTE(part_names_checksums);
CMP_ATTRIBUTE(columns_version);
return result;
}
template <typename T>
bool compare(const T & expected, const T & actual)
{
return expected == actual;
}
template <typename T>
::testing::AssertionResult compare(const std::shared_ptr<T> & expected, const std::shared_ptr<T> & actual)
{
if (!!expected != !!actual)
return ::testing::AssertionFailure()
<< "expected : " << static_cast<const void*>(expected.get())
<< "\nactual : " << static_cast<const void*>(actual.get());
if (expected && actual)
return compare(*expected, *actual);
return ::testing::AssertionSuccess();
}
template <typename T>
void compareAttributes(::testing::AssertionResult & result, const char * name, const T & expected_value, const T & actual_value)
{
const auto cmp_result = compare(expected_value, actual_value);
if (cmp_result == false)
{
if (result)
result = ::testing::AssertionFailure();
result << "\nMismatching attribute: \"" << name << "\"";
if constexpr (std::is_same_v<std::decay_t<decltype(cmp_result)>, ::testing::AssertionResult>)
result << "\n" << cmp_result.message();
else
result << "\n\texpected: " << expected_value
<< "\n\tactual : " << actual_value;
}
};
::testing::AssertionResult compare(const ReplicatedMergeTreeLogEntryData & expected, const ReplicatedMergeTreeLogEntryData & actual)
{
::testing::AssertionResult result = ::testing::AssertionSuccess();
CMP_ATTRIBUTE(znode_name);
CMP_ATTRIBUTE(type);
CMP_ATTRIBUTE(source_replica);
CMP_ATTRIBUTE(new_part_name);
CMP_ATTRIBUTE(new_part_type);
CMP_ATTRIBUTE(block_id);
CMP_ATTRIBUTE(actual_new_part_name);
CMP_ATTRIBUTE(new_part_uuid);
CMP_ATTRIBUTE(source_parts);
CMP_ATTRIBUTE(deduplicate);
CMP_ATTRIBUTE(deduplicate_by_columns);
CMP_ATTRIBUTE(merge_type);
CMP_ATTRIBUTE(column_name);
CMP_ATTRIBUTE(index_name);
CMP_ATTRIBUTE(detach);
CMP_ATTRIBUTE(replace_range_entry);
CMP_ATTRIBUTE(alter_version);
CMP_ATTRIBUTE(have_mutation);
CMP_ATTRIBUTE(columns_str);
CMP_ATTRIBUTE(metadata_str);
CMP_ATTRIBUTE(currently_executing);
CMP_ATTRIBUTE(removed_by_other_entry);
CMP_ATTRIBUTE(num_tries);
CMP_ATTRIBUTE(exception);
CMP_ATTRIBUTE(last_attempt_time);
CMP_ATTRIBUTE(num_postponed);
CMP_ATTRIBUTE(postpone_reason);
CMP_ATTRIBUTE(last_postpone_time);
CMP_ATTRIBUTE(create_time);
CMP_ATTRIBUTE(quorum);
return result;
}
}
// TEST(ReplicatedMergeTreeLogEntryData, writeToText)
// {
// const ReplicatedMergeTreeLogEntryData expected
// {
// .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
// .new_part_uuid = UUIDHelpers::generateV4(),
// .deduplicate_by_columns = {"foo", "bar", "quix"},
// .alter_version = 123456,
// };
// ReplicatedMergeTreeLogEntryData actual;
// {
// const auto str = actual.toString();
// DB::ReadBufferFromString buffer(str);
// EXPECT_NO_THROW(expected.readText(buffer)) << "While reading:\n" << str;
// }
// ASSERT_TRUE(compare(expected, actual));
// }
class ReplicatedMergeTreeLogEntryDataTest : public ::testing::TestWithParam<ReplicatedMergeTreeLogEntryData>
{};
TEST_P(ReplicatedMergeTreeLogEntryDataTest, transcode)
{
const auto & expected = GetParam();
const auto str = expected.toString();
ReplicatedMergeTreeLogEntryData actual;
// To simplify comparison, since it is rarely set.
actual.alter_version = expected.alter_version;
{
DB::ReadBufferFromString buffer(str);
EXPECT_NO_THROW(actual.readText(buffer)) << "While reading:\n" << str;
}
ASSERT_TRUE(compare(expected, actual)) << "Via text:\n" << str;
}
INSTANTIATE_TEST_SUITE_P(Merge, ReplicatedMergeTreeLogEntryDataTest,
::testing::ValuesIn(std::initializer_list<ReplicatedMergeTreeLogEntryData>{
{
// Basic: minimal set of attributes.
.type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
.new_part_type = MergeTreeDataPartType::WIDE,
.alter_version = 0,
.create_time = 123,
},
{
.type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
.new_part_type = MergeTreeDataPartType::WIDE,
// Format version 4
.deduplicate = true,
.alter_version = 0,
.create_time = 123,
},
{
.type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
.new_part_type = MergeTreeDataPartType::WIDE,
// Format version 5
.new_part_uuid = UUID(UInt128(123456789, 10111213141516)),
.alter_version = 0,
.create_time = 123,
},
{
.type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
.new_part_type = MergeTreeDataPartType::WIDE,
// Format version 6
.deduplicate = true,
.deduplicate_by_columns = {"foo", "bar", "quix"},
.alter_version = 0,
.create_time = 123,
},
{
.type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
.new_part_type = MergeTreeDataPartType::WIDE,
// Mixing features
.new_part_uuid = UUID(UInt128(123456789, 10111213141516)),
.deduplicate = true,
.deduplicate_by_columns = {"foo", "bar", "quix"},
.alter_version = 0,
.create_time = 123,
},
}));
// INSTANTIATE_TEST_SUITE_P(Full, ReplicatedMergeTreeLogEntryDataTest,
// ::testing::ValuesIn(std::initializer_list<ReplicatedMergeTreeLogEntryData>{
// {
// .znode_name = "znode name",
// .type = ReplicatedMergeTreeLogEntryData::MERGE_PARTS,
// .source_replica = "source replica",
// .new_part_name = "new part name",
// .new_part_type = MergeTreeDataPartType::WIDE,
// .block_id = "block id",
// .actual_new_part_name = "new part name",
// .new_part_uuid = UUID(UInt128(123456789, 10111213141516)),
// .source_parts = {"part1", "part2"},
// .deduplicate = true,
// .deduplicate_by_columns = {"col1", "col2"},
// .merge_type = MergeType::REGULAR,
// .column_name = "column name",
// .index_name = "index name",
// .detach = false,
// .replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>(
// ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry
// {
// .drop_range_part_name = "drop range part name",
// .from_database = "from database",
// .src_part_names = {"src part name1", "src part name2"},
// .new_part_names = {"new part name1", "new part name2"},
// .columns_version = 123456,
// }),
// .alter_version = 56789,
// .have_mutation = false,
// .columns_str = "columns str",
// .metadata_str = "metadata str",
// // Those attributes are not serialized to string, hence it makes no sense to set.
// // .currently_executing
// // .removed_by_other_entry
// // .num_tries
// // .exception
// // .last_attempt_time
// // .num_postponed
// // .postpone_reason
// // .last_postpone_time,
// .create_time = static_cast<time_t>(123456789),
// .quorum = 321,
// },
// }));

View File

@ -583,7 +583,7 @@ void StorageBuffer::shutdown()
try
{
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context);
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, global_context);
}
catch (...)
{
@ -608,6 +608,7 @@ bool StorageBuffer::optimize(
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & /* deduplicate_by_columns */,
const Context & /*context*/)
{
if (partition)
@ -906,7 +907,7 @@ void StorageBuffer::alter(const AlterCommands & params, const Context & context,
/// Flush all buffers to storages, so that no non-empty blocks of the old
/// structure remain. Structure of empty blocks will be updated during first
/// insert.
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, context);
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
params.apply(new_metadata, context);

View File

@ -87,6 +87,7 @@ public:
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & context) override;
bool supportsSampling() const override { return true; }

View File

@ -233,12 +233,13 @@ bool StorageMaterializedView::optimize(
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & context)
{
checkStatementCanBeForwarded();
auto storage_ptr = getTargetTable();
auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, context);
return getTargetTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context);
}
void StorageMaterializedView::alter(

View File

@ -46,6 +46,7 @@ public:
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & context) override;
void alter(const AlterCommands & params, const Context & context, TableLockHolder & table_lock_holder) override;

View File

@ -741,6 +741,7 @@ bool StorageMergeTree::merge(
const String & partition_id,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
String * out_disable_reason,
bool optimize_skip_merged_partitions)
{
@ -758,10 +759,15 @@ bool StorageMergeTree::merge(
if (!merge_mutate_entry)
return false;
return mergeSelectedParts(metadata_snapshot, deduplicate, *merge_mutate_entry, table_lock_holder);
return mergeSelectedParts(metadata_snapshot, deduplicate, deduplicate_by_columns, *merge_mutate_entry, table_lock_holder);
}
bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder)
bool StorageMergeTree::mergeSelectedParts(
const StorageMetadataPtr & metadata_snapshot,
bool deduplicate,
const Names & deduplicate_by_columns,
MergeMutateSelectedEntry & merge_mutate_entry,
TableLockHolder & table_lock_holder)
{
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch;
@ -786,7 +792,7 @@ bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_sn
{
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *(merge_list_entry), table_lock_holder, time(nullptr),
global_context, merge_mutate_entry.tagger->reserved_space, deduplicate);
global_context, merge_mutate_entry.tagger->reserved_space, deduplicate, deduplicate_by_columns);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
write_part_log({});
@ -953,7 +959,7 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
if (merge_entry)
mergeSelectedParts(metadata_snapshot, false, *merge_entry, share_lock);
mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
else if (mutate_entry)
mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
}, PoolType::MERGE_MUTATE};
@ -1036,6 +1042,7 @@ bool StorageMergeTree::optimize(
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & context)
{
String disable_reason;
@ -1049,7 +1056,7 @@ bool StorageMergeTree::optimize(
for (const String & partition_id : partition_ids)
{
if (!merge(true, partition_id, true, deduplicate, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions))
if (!merge(true, partition_id, true, deduplicate, deduplicate_by_columns, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions))
{
constexpr const char * message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())
@ -1068,7 +1075,7 @@ bool StorageMergeTree::optimize(
if (partition)
partition_id = getPartitionIDFromQuery(partition, context);
if (!merge(true, partition_id, final, deduplicate, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions))
if (!merge(true, partition_id, final, deduplicate, deduplicate_by_columns, &disable_reason, context.getSettingsRef().optimize_skip_merged_partitions))
{
constexpr const char * message = "Cannot OPTIMIZE table: {}";
if (disable_reason.empty())

View File

@ -70,6 +70,7 @@ public:
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
@ -132,7 +133,7 @@ private:
* If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
* Returns true if merge is finished successfully.
*/
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr, bool optimize_skip_merged_partitions = false);
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, const Names & deduplicate_by_columns, String * out_disable_reason = nullptr, bool optimize_skip_merged_partitions = false);
ActionLock stopMergesAndWait();
@ -183,7 +184,8 @@ private:
TableLockHolder & table_lock_holder,
bool optimize_skip_merged_partitions = false,
SelectPartsDecision * select_decision_out = nullptr);
bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder);
bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, const Names & deduplicate_by_columns, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder);
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder);
bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder);

View File

@ -122,9 +122,10 @@ public:
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & context) override
{
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, context);
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context);
}
void mutate(const MutationCommands & commands, const Context & context) override { getNested()->mutate(commands, context); }

View File

@ -1507,7 +1507,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, metadata_snapshot, *merge_entry,
table_lock, entry.create_time, global_context, reserved_space, entry.deduplicate);
table_lock, entry.create_time, global_context, reserved_space, entry.deduplicate, entry.deduplicate_by_columns);
merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);
@ -2711,6 +2711,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
const auto storage_settings_ptr = getSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
const Names deduplicate_by_columns = {};
CreateMergeEntryResult create_result = CreateMergeEntryResult::Other;
try
@ -2761,6 +2762,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
future_merged_part.uuid,
future_merged_part.type,
deduplicate,
deduplicate_by_columns,
nullptr,
merge_pred.getVersion(),
future_merged_part.merge_type);
@ -2850,6 +2852,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
const UUID & merged_part_uuid,
const MergeTreeDataPartType & merged_part_type,
bool deduplicate,
const Names & deduplicate_by_columns,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type)
@ -2887,6 +2890,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.new_part_type = merged_part_type;
entry.merge_type = merge_type;
entry.deduplicate = deduplicate;
entry.deduplicate_by_columns = deduplicate_by_columns;
entry.merge_type = merge_type;
entry.create_time = time(nullptr);
@ -3855,6 +3859,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
const Settings & query_settings = context.getSettingsRef();
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
// TODO: should we also somehow pass list of columns to deduplicate on to the ReplicatedMergeTreeBlockOutputStream ?
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(
*this, metadata_snapshot, query_settings.insert_quorum,
query_settings.insert_quorum_timeout.totalMilliseconds(),
@ -3870,6 +3875,7 @@ bool StorageReplicatedMergeTree::optimize(
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & query_context)
{
assertNotReadonly();
@ -3927,7 +3933,8 @@ bool StorageReplicatedMergeTree::optimize(
ReplicatedMergeTreeLogEntryData merge_entry;
CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part.parts,
future_merged_part.name, future_merged_part.uuid, future_merged_part.type, deduplicate,
future_merged_part.name, future_merged_part.uuid, future_merged_part.type,
deduplicate, deduplicate_by_columns,
&merge_entry, can_merge.getVersion(), future_merged_part.merge_type);
if (create_result == CreateMergeEntryResult::MissingPart)
@ -3988,7 +3995,8 @@ bool StorageReplicatedMergeTree::optimize(
ReplicatedMergeTreeLogEntryData merge_entry;
CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part.parts,
future_merged_part.name, future_merged_part.uuid, future_merged_part.type, deduplicate,
future_merged_part.name, future_merged_part.uuid, future_merged_part.type,
deduplicate, deduplicate_by_columns,
&merge_entry, can_merge.getVersion(), future_merged_part.merge_type);
if (create_result == CreateMergeEntryResult::MissingPart)

View File

@ -119,6 +119,7 @@ public:
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
const Context & query_context) override;
void alter(const AlterCommands & commands, const Context & query_context, TableLockHolder & table_lock_holder) override;
@ -469,6 +470,7 @@ private:
const UUID & merged_part_uuid,
const MergeTreeDataPartType & merged_part_type,
bool deduplicate,
const Names & deduplicate_by_columns,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type);

View File

@ -0,0 +1,39 @@
TOTAL rows 4
OLD DEDUPLICATE
0 0 0
1 1 2
1 1 3
DEDUPLICATE BY *
0 0 0
1 1 2
1 1 3
DEDUPLICATE BY * EXCEPT mat
0 0 0
1 1 2
1 1 3
DEDUPLICATE BY pk,sk,val,mat
0 0 0
1 1 2
1 1 3
Can not remove full duplicates
OLD DEDUPLICATE
4
DEDUPLICATE BY pk,sk,val,mat
4
Remove partial duplicates
DEDUPLICATE BY *
3
DEDUPLICATE BY * EXCEPT mat
0 0 0
1 1 2
1 1 3
DEDUPLICATE BY COLUMNS("*") EXCEPT mat
0 0 0
1 1 2
1 1 3
DEDUPLICATE BY pk,sk
0 0 0
1 1 2
DEDUPLICATE BY COLUMNS(".*k")
0 0 0
1 1 2

View File

@ -0,0 +1,114 @@
--- local case
-- Just in case a previous test run left some stuff behind.
DROP TABLE IF EXISTS source_data;
CREATE TABLE source_data (
pk Int32, sk Int32, val UInt32,
PRIMARY KEY (pk)
) ENGINE=MergeTree
ORDER BY (pk, sk);
INSERT INTO source_data VALUES (0, 0, 0), (0, 0, 0), (1, 1, 2), (1, 1, 3);
SELECT 'TOTAL rows', count() FROM source_data;
DROP TABLE IF EXISTS full_duplicates;
-- table with duplicates on MATERIALIZED columns
CREATE TABLE full_duplicates (
pk Int32, sk Int32, val UInt32, mat UInt32 MATERIALIZED 12345, alias UInt32 ALIAS 2,
PRIMARY KEY (pk)
) ENGINE=MergeTree
ORDER BY (pk, sk);
-- ERROR cases
OPTIMIZE TABLE full_duplicates DEDUPLICATE BY pk, sk, val, mat, alias; -- { serverError 16 } -- alias column is present
OPTIMIZE TABLE full_duplicates DEDUPLICATE BY sk, val; -- { serverError 8 } -- primary key column is missing
OPTIMIZE TABLE full_duplicates DEDUPLICATE BY; -- { serverError 51 } -- list is empty
OPTIMIZE TABLE full_duplicates DEDUPLICATE BY * EXCEPT(pk, sk, val, mat, alias); -- { serverError 51 } -- list is empty
OPTIMIZE TABLE full_duplicates DEDUPLICATE BY * EXCEPT(pk); -- { serverError 8 } -- primary key column is missing
OPTIMIZE TABLE partial_duplicates DEDUPLICATE BY pk,sk,val,mat EXCEPT mat; -- { clientError 62 } -- invalid syntax
-- Valid cases
-- NOTE: here and below we need FINAL to force deduplication in such a small set of data in only 1 part.
SELECT 'OLD DEDUPLICATE';
INSERT INTO full_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE;
SELECT * FROM full_duplicates;
TRUNCATE full_duplicates;
SELECT 'DEDUPLICATE BY *';
INSERT INTO full_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE BY *;
SELECT * FROM full_duplicates;
TRUNCATE full_duplicates;
SELECT 'DEDUPLICATE BY * EXCEPT mat';
INSERT INTO full_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE BY * EXCEPT mat;
SELECT * FROM full_duplicates;
TRUNCATE full_duplicates;
SELECT 'DEDUPLICATE BY pk,sk,val,mat';
INSERT INTO full_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE full_duplicates FINAL DEDUPLICATE BY pk,sk,val,mat;
SELECT * FROM full_duplicates;
TRUNCATE full_duplicates;
--DROP TABLE full_duplicates;
-- Now on to the partial duplicates, where the MATERIALIZED column always has a unique value.
DROP TABLE IF EXISTS partial_duplicates;
CREATE TABLE partial_duplicates (
pk Int32, sk Int32, val UInt32, mat UInt32 MATERIALIZED rand(), alias UInt32 ALIAS 2,
PRIMARY KEY (pk)
) ENGINE=MergeTree
ORDER BY (pk, sk);
SELECT 'Can not remove full duplicates';
-- should not remove anything
SELECT 'OLD DEDUPLICATE';
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE;
SELECT count() FROM partial_duplicates;
TRUNCATE partial_duplicates;
SELECT 'DEDUPLICATE BY pk,sk,val,mat';
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY pk,sk,val,mat;
SELECT count() FROM partial_duplicates;
TRUNCATE partial_duplicates;
SELECT 'Remove partial duplicates';
SELECT 'DEDUPLICATE BY *'; -- all except MATERIALIZED columns, hence will reduce number of rows.
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY *;
SELECT count() FROM partial_duplicates;
TRUNCATE partial_duplicates;
SELECT 'DEDUPLICATE BY * EXCEPT mat';
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY * EXCEPT mat;
SELECT * FROM partial_duplicates;
TRUNCATE partial_duplicates;
SELECT 'DEDUPLICATE BY COLUMNS("*") EXCEPT mat';
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY COLUMNS('.*') EXCEPT mat;
SELECT * FROM partial_duplicates;
TRUNCATE partial_duplicates;
SELECT 'DEDUPLICATE BY pk,sk';
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY pk,sk;
SELECT * FROM partial_duplicates;
TRUNCATE partial_duplicates;
SELECT 'DEDUPLICATE BY COLUMNS(".*k")';
INSERT INTO partial_duplicates SELECT * FROM source_data;
OPTIMIZE TABLE partial_duplicates FINAL DEDUPLICATE BY COLUMNS('.*k');
SELECT * FROM partial_duplicates;
TRUNCATE partial_duplicates;

View File

@ -0,0 +1,47 @@
check that we have a data
r1 1 1001 2 1 1
r1 1 2001 1 1 1
r1 2 1002 1 1 1
r1 2 2002 1 1 1
r1 3 1003 2 2 2
r1 4 1004 2 2 2
r1 5 2005 2 1 1
r1 9 1002 1 1 1
r1 1 1001 2 1 1
r1 1 2001 1 1 1
r1 2 1002 1 1 1
r1 2 2002 1 1 1
r1 3 1003 2 2 2
r1 4 1004 2 2 2
r1 5 2005 2 1 1
r1 9 1002 1 1 1
after old OPTIMIZE DEDUPLICATE
r1 1 1001 1 1 1
r1 1 2001 1 1 1
r1 2 1002 1 1 1
r1 2 2002 1 1 1
r1 3 1003 2 2 2
r1 4 1004 2 2 2
r1 5 2005 1 1 1
r1 9 1002 1 1 1
r2 1 1001 1 1 1
r2 1 2001 1 1 1
r2 2 1002 1 1 1
r2 2 2002 1 1 1
r2 3 1003 2 2 2
r2 4 1004 2 2 2
r2 5 2005 1 1 1
r2 9 1002 1 1 1
check data again after multiple deduplications with new syntax
r1 1 1001 1 1 1
r1 2 1002 1 1 1
r1 3 1003 1 1 1
r1 4 1004 1 1 1
r1 5 2005 1 1 1
r1 9 1002 1 1 1
r2 1 1001 1 1 1
r2 2 1002 1 1 1
r2 3 1003 1 1 1
r2 4 1004 1 1 1
r2 5 2005 1 1 1
r2 9 1002 1 1 1

View File

@ -0,0 +1,55 @@
--- Put it into tests/queries/0_stateless/01581_deduplicate_by_columns.sql
--- replicated case
-- Just in case a previous test run left some stuff behind.
DROP TABLE IF EXISTS replicated_deduplicate_by_columns_r1;
DROP TABLE IF EXISTS replicated_deduplicate_by_columns_r2;
SET replication_alter_partitions_sync = 2;
-- In real life insert_replica_id would be filled from the hostname
CREATE TABLE IF NOT EXISTS replicated_deduplicate_by_columns_r1 (
id Int32, val UInt32, insert_time_ns DateTime64(9) MATERIALIZED now64(9), insert_replica_id UInt8 MATERIALIZED randConstant()
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01581/replicated_deduplicate', 'r1') ORDER BY id;
CREATE TABLE IF NOT EXISTS replicated_deduplicate_by_columns_r2 (
id Int32, val UInt32, insert_time_ns DateTime64(9) MATERIALIZED now64(9), insert_replica_id UInt8 MATERIALIZED randConstant()
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01581/replicated_deduplicate', 'r2') ORDER BY id;
-- insert some data; 2 records, (3, 1003) and (4, 1004), are duplicated and differ in insert_time_ns / insert_replica_id,
-- while (1, 1001) and (5, 2005) have full duplicates
INSERT INTO replicated_deduplicate_by_columns_r1 VALUES (1, 1001), (1, 1001), (2, 1002), (3, 1003), (4, 1004), (1, 2001), (9, 1002);
INSERT INTO replicated_deduplicate_by_columns_r2 VALUES (2, 2002), (3, 1003), (4, 1004), (5, 2005), (5, 2005);
-- wait for syncing replicas
SYSTEM SYNC REPLICA replicated_deduplicate_by_columns_r2;
SYSTEM SYNC REPLICA replicated_deduplicate_by_columns_r1;
SELECT 'check that we have a data';
SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r1 GROUP BY id, val ORDER BY id, val;
SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r2 GROUP BY id, val ORDER BY id, val;
-- NOTE: here and below we need FINAL to force deduplication in such a small set of data in only 1 part.
-- that should remove full duplicates
OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE;
SELECT 'after old OPTIMIZE DEDUPLICATE';
SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r1 GROUP BY id, val ORDER BY id, val;
SELECT 'r2', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r2 GROUP BY id, val ORDER BY id, val;
OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY id, val;
OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY COLUMNS('[id, val]');
OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY * EXCEPT(insert_time_ns, insert_replica_id);
OPTIMIZE TABLE replicated_deduplicate_by_columns_r1 FINAL DEDUPLICATE BY COLUMNS('[i]') EXCEPT(insert_time_ns, insert_replica_id);
SELECT 'check data again after multiple deduplications with new syntax';
SELECT 'r1', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r1 GROUP BY id, val ORDER BY id, val;
SELECT 'r2', id, val, count(), uniqExact(insert_time_ns), uniqExact(insert_replica_id) FROM replicated_deduplicate_by_columns_r2 GROUP BY id, val ORDER BY id, val;
-- cleanup the mess
DROP TABLE replicated_deduplicate_by_columns_r1;
DROP TABLE replicated_deduplicate_by_columns_r2;
SYSTEM DROP REPLICA '/clickhouse/tables/test_01581/replicated_deduplicate/replicas/r1';
SYSTEM DROP REPLICA '/clickhouse/tables/test_01581/replicated_deduplicate/replicas/r2';