diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index a83c32117e7..20f700957b9 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -64,6 +64,8 @@ namespace ErrorCodes * - Collapsing - при склейке кусков "схлопывать" * пары записей с разными значениями sign_column для одного значения первичного ключа. * (см. CollapsingSortedBlockInputStream.h) + * - Replacing - при склейке кусков, при совпадении PK, оставлять только одну строчку + * - последнюю, либо, если задан столбец "версия" - последнюю среди строчек с максимальной версией. * - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK. * - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций. * - Unsorted - при склейке кусков, данные не упорядочиваются, а всего лишь конкатенируются; @@ -192,6 +194,20 @@ public: Summing = 2, Aggregating = 3, Unsorted = 4, + Replacing = 5, + }; + + /// Настройки для разных режимов работы. + struct MergingParams + { + /// Для Collapsing режима. + String sign_column; + + /// Для Summing режима. Если пустое - то выбирается автоматически. + Names columns_to_sum; + + /// Для Replacing режима. Может быть пустым. + String version_column; }; static void doNothing(const String & name) {} @@ -215,8 +231,7 @@ public: const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, Mode mode_, - const String & sign_column_, /// Для Collapsing режима. - const Names & columns_to_sum_, /// Для Summing режима. Если пустое - то выбирается автоматически. + const MergingParams & merging_params_, const MergeTreeSettings & settings_, const String & log_name_, bool require_part_metadata_, @@ -433,10 +448,7 @@ public: /// Режим работы - какие дополнительные действия делать при мердже. const Mode mode; - /// Для схлопывания записей об изменениях, если используется Collapsing режим работы. - const String sign_column; - /// Для суммирования, если используется Summing режим работы. - const Names columns_to_sum; + const MergingParams merging_params; const MergeTreeSettings settings; diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index a965757f596..214191f18f1 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -41,8 +41,7 @@ public: const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, /// Для Collapsing режима. - const Names & columns_to_sum_, /// Для Summing режима. + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); void shutdown() override; @@ -185,8 +184,7 @@ private: const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); /** Определяет, какие куски нужно объединять, и объединяет их. diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index c717108b608..7d8c3e13607 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -87,8 +87,7 @@ public: const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, /// Для Collapsing режима. - const Names & columns_to_sum_, /// Для Summing режима. + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); void shutdown() override; @@ -319,8 +318,7 @@ private: const ASTPtr & sampling_expression_, size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); /// Инициализация. diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 458ed8c2d64..c1068acaa4c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -37,8 +37,7 @@ MergeTreeData::MergeTreeData( const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergingParams & merging_params_, const MergeTreeSettings & settings_, const String & log_name_, bool require_part_metadata_, @@ -46,7 +45,7 @@ MergeTreeData::MergeTreeData( : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_), date_column_name(date_column_name_), sampling_expression(sampling_expression_), index_granularity(index_granularity_), - mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_), + mode(mode_), merging_params(merging_params_), settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr), require_part_metadata(require_part_metadata_), full_path(full_path_), columns(columns_), @@ -79,15 +78,16 @@ MergeTreeData::MergeTreeData( /// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8. if (mode == Collapsing) { - if (sign_column.empty()) + if (merging_params.sign_column.empty()) throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR); for (const auto & column : *columns) { - if (column.name == sign_column) + if (column.name == merging_params.sign_column) { if (!typeid_cast(column.type.get())) - throw Exception("Sign column (" + sign_column + ") for storage CollapsingMergeTree must have type Int8." + throw Exception("Sign column (" + merging_params.sign_column + ")" + " for storage CollapsingMergeTree must have type Int8." " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); break; } @@ -95,12 +95,12 @@ MergeTreeData::MergeTreeData( } /// Если заданы columns_to_sum, проверяем, что такие столбцы существуют. - if (!columns_to_sum.empty()) + if (!merging_params.columns_to_sum.empty()) { if (mode != Summing) throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR); - for (const auto & column_to_sum : columns_to_sum) + for (const auto & column_to_sum : merging_params.columns_to_sum) if (columns->end() == std::find_if(columns->begin(), columns->end(), [&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; })) throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration."); @@ -463,7 +463,7 @@ void MergeTreeData::checkAlter(const AlterCommands & params) if (primary_expr) keys = primary_expr->getRequiredColumns(); - keys.push_back(sign_column); + keys.push_back(merging_params.sign_column); std::sort(keys.begin(), keys.end()); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index c833df657cf..513012e9d4b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -425,12 +425,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart case MergeTreeData::Collapsing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Summing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Aggregating: @@ -438,6 +438,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; + case MergeTreeData::Replacing: /// TODO + merged_stream = std::make_unique( + src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + break; + case MergeTreeData::Unsorted: merged_stream = std::make_unique(src_streams); break; @@ -680,12 +685,12 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( case MergeTreeData::Collapsing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Summing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::Aggregating: @@ -693,6 +698,11 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; + case MergeTreeData::Replacing: /// TODO + merged_stream = std::make_unique( + src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + break; + case MergeTreeData::Unsorted: merged_stream = std::make_unique(src_streams); break; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index e63ffbbaf5a..902a65aa23a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -518,8 +518,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( std::vector add_columns = data.getPrimaryExpression()->getRequiredColumns(); column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); - if (!data.sign_column.empty()) - column_names_to_read.push_back(data.sign_column); + if (!data.merging_params.sign_column.empty()) + column_names_to_read.push_back(data.merging_params.sign_column); std::sort(column_names_to_read.begin(), column_names_to_read.end()); column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); @@ -799,7 +799,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal BlockInputStreams res; if (to_merge.size() == 1) { - if (!data.sign_column.empty()) + if (!data.merging_params.sign_column.empty()) { ExpressionActionsPtr sign_filter_expression; String sign_filter_column; @@ -822,17 +822,22 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal break; case MergeTreeData::Collapsing: - merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.sign_column); + merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.merging_params.sign_column); break; case MergeTreeData::Summing: - merged = new SummingSortedBlockInputStream(to_merge, data.getSortDescription(), data.columns_to_sum, max_block_size); + merged = new SummingSortedBlockInputStream(to_merge, + data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size); break; case MergeTreeData::Aggregating: merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); break; + case MergeTreeData::Replacing: /// TODO + merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); + break; + case MergeTreeData::Unsorted: throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); } @@ -866,7 +871,7 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition( arguments->children.push_back(sign_ptr); arguments->children.push_back(one_ptr); - sign->name = data.sign_column; + sign->name = data.merging_params.sign_column; sign->kind = ASTIdentifier::Column; one->value = Field(static_cast(1)); diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 931a5165751..c6cd07da596 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -544,11 +544,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r ASTPtr sampling_expression; UInt64 index_granularity; - /// Для Collapsing. - String sign_column_name; - - /// Для Summing. - Names columns_to_sum; + MergeTreeData::MergingParams merging_params; if (replicated) { @@ -573,7 +569,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r if (mode == MergeTreeData::Collapsing) { if (auto ast = typeid_cast(&*args.back())) - sign_column_name = ast->name; + merging_params.sign_column = ast->name; else throw Exception(String("Sign column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); @@ -584,7 +580,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r /// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов. if (!typeid_cast(&*args.back())) { - columns_to_sum = extractColumnNames(args.back()); + merging_params.columns_to_sum = extractColumnNames(args.back()); args.pop_back(); } } @@ -617,14 +613,14 @@ For further info please read the documentation: https://clickhouse.yandex-team.r zookeeper_path, replica_name, attach, data_path, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, context, primary_expr_list, date_column_name, - sampling_expression, index_granularity, mode, sign_column_name, columns_to_sum, + sampling_expression, index_granularity, mode, merging_params, context.getMergeTreeSettings()); else return StorageMergeTree::create( data_path, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, context, primary_expr_list, date_column_name, - sampling_expression, index_granularity, mode, sign_column_name, columns_to_sum, + sampling_expression, index_granularity, mode, merging_params, context.getMergeTreeSettings()); } else diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 19a3a69c2d8..fc6c3456c87 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -33,8 +33,7 @@ StorageMergeTree::StorageMergeTree( const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), @@ -42,7 +41,7 @@ StorageMergeTree::StorageMergeTree( data(full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_,mode_, sign_column_, columns_to_sum_, + sampling_expression_, index_granularity_,mode_, merging_params_, settings_, database_name_ + "." + table_name, false), reader(data), writer(data), merger(data), increment(0), @@ -86,15 +85,14 @@ StoragePtr StorageMergeTree::create( const ASTPtr & sampling_expression_, size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) { auto res = new StorageMergeTree{ path_, database_name_, table_name_, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_, settings_ + sampling_expression_, index_granularity_, mode_, merging_params_, settings_ }; StoragePtr res_ptr = res->thisPtr(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 92d5bfd17fc..eba0743c87f 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -190,8 +190,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const ASTPtr & sampling_expression_, size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_), current_zookeeper(context.getZooKeeper()), database_name(database_name_), @@ -201,7 +200,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( data(full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_, + sampling_expression_, index_granularity_, mode_, merging_params_, settings_, database_name_ + "." + table_name, true, [this] (const std::string & name) { enqueuePartForCheck(name); }), reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this), @@ -274,7 +273,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, - date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_, settings_, + date_column_name_, sampling_expression_, index_granularity_, mode_, merging_params_, settings_, database_name_ + "." + table_name + "[unreplicated]", false)); unreplicated_data->loadDataParts(skip_sanity_checks); @@ -333,8 +332,7 @@ StoragePtr StorageReplicatedMergeTree::create( const ASTPtr & sampling_expression_, size_t index_granularity_, MergeTreeData::Mode mode_, - const String & sign_column_, - const Names & columns_to_sum_, + const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) { auto res = new StorageReplicatedMergeTree{ @@ -343,7 +341,7 @@ StoragePtr StorageReplicatedMergeTree::create( columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, mode_, - sign_column_, columns_to_sum_, settings_}; + merging_params_, settings_}; StoragePtr res_ptr = res->thisPtr(); @@ -418,7 +416,7 @@ namespace << "sampling expression: " << formattedAST(data.sampling_expression) << "\n" << "index granularity: " << data.index_granularity << "\n" << "mode: " << static_cast(data.mode) << "\n" - << "sign column: " << data.sign_column << "\n" + << "sign column: " << data.merging_params.sign_column << "\n" << "primary key: " << formattedAST(data.primary_expr_ast) << "\n"; } @@ -477,9 +475,9 @@ namespace String read_sign_column; in >> read_sign_column; - if (read_sign_column != data.sign_column) + if (read_sign_column != data.merging_params.sign_column) throw Exception("Existing table metadata in ZooKeeper differs in sign column." - " Stored in ZooKeeper: " + read_sign_column + ", local: " + data.sign_column, + " Stored in ZooKeeper: " + read_sign_column + ", local: " + data.merging_params.sign_column, ErrorCodes::METADATA_MISMATCH); in >> "\nprimary key: ";