diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index 352bf5f49cc..46708dcf6d0 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -155,7 +155,7 @@ protected: QueueWithCollation queue_with_collation; - /// Эти методы используются в Collapsing/Summing/Aggregating SortedBlockInputStream-ах. + /// Эти методы используются в Collapsing/Summing/Aggregating... SortedBlockInputStream-ах. /// Сохранить строчку, на которую указывает cursor, в row. template diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 20f700957b9..d7ed68eb2d9 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -186,20 +186,23 @@ public: using AlterDataPartTransactionPtr = std::unique_ptr; - /// Режим работы. См. выше. - enum Mode - { - Ordinary = 0, /// Числа сохраняются - не меняйте. - Collapsing = 1, - Summing = 2, - Aggregating = 3, - Unsorted = 4, - Replacing = 5, - }; /// Настройки для разных режимов работы. struct MergingParams { + /// Режим работы. См. выше. + enum Mode + { + Ordinary = 0, /// Числа сохраняются - не меняйте. + Collapsing = 1, + Summing = 2, + Aggregating = 3, + Unsorted = 4, + Replacing = 5, + }; + + Mode mode; + /// Для Collapsing режима. String sign_column; @@ -208,8 +211,12 @@ public: /// Для Replacing режима. Может быть пустым. String version_column; + + /// Проверить наличие и корректность типов столбцов. + void check(const NamesAndTypesList & columns) const; }; + static void doNothing(const String & name) {} /** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце), @@ -230,7 +237,6 @@ public: const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, - Mode mode_, const MergingParams & merging_params_, const MergeTreeSettings & settings_, const String & log_name_, @@ -247,9 +253,10 @@ public: bool supportsFinal() const { - return mode == Mode::Collapsing - || mode == Mode::Summing - || mode == Mode::Aggregating; + return merging_params.mode == MergingParams::Collapsing + || merging_params.mode == MergingParams::Summing + || merging_params.mode == MergingParams::Aggregating + || merging_params.mode == MergingParams::Replacing; } Int64 getMaxDataPartIndex(); @@ -447,7 +454,6 @@ public: const size_t index_granularity; /// Режим работы - какие дополнительные действия делать при мердже. - const Mode mode; const MergingParams merging_params; const MergeTreeSettings settings; diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index 4f31c37cf81..3eae9a2a2f8 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -311,7 +311,7 @@ public: /// Заканчиваем запись и достаем чексуммы. MergeTreeData::DataPart::Checksums checksums; - if (storage.mode != MergeTreeData::Unsorted) + if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted) { index_stream->next(); checksums.files["primary.idx"].file_size = index_stream->count(); @@ -366,7 +366,7 @@ private: { Poco::File(part_path).createDirectories(); - if (storage.mode != MergeTreeData::Unsorted) + if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted) { index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY); index_stream = new HashingWriteBuffer(*index_file_stream); @@ -444,7 +444,7 @@ private: /// Пишем индекс. Индекс содержит значение Primary Key для каждой index_granularity строки. for (size_t i = index_offset; i < rows; i += storage.index_granularity) { - if (storage.mode != MergeTreeData::Unsorted) + if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted) { for (size_t j = 0, size = primary_columns.size(); j < size; ++j) { diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 214191f18f1..2cdf61143b5 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -40,7 +40,6 @@ public: const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); @@ -183,7 +182,6 @@ private: const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 7d8c3e13607..82a38208231 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -86,7 +86,6 @@ public: const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); @@ -317,7 +316,6 @@ private: const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index c1068acaa4c..153114748a5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -36,7 +36,6 @@ MergeTreeData::MergeTreeData( ASTPtr & primary_expr_ast_, const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, - Mode mode_, const MergingParams & merging_params_, const MergeTreeSettings & settings_, const String & log_name_, @@ -45,7 +44,7 @@ MergeTreeData::MergeTreeData( : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_), date_column_name(date_column_name_), sampling_expression(sampling_expression_), index_granularity(index_granularity_), - mode(mode_), merging_params(merging_params_), + merging_params(merging_params_), settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr), require_part_metadata(require_part_metadata_), full_path(full_path_), columns(columns_), @@ -75,36 +74,7 @@ MergeTreeData::MergeTreeData( "Date column (" + date_column_name + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE}; - /// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8. - if (mode == Collapsing) - { - if (merging_params.sign_column.empty()) - throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR); - - for (const auto & column : *columns) - { - if (column.name == merging_params.sign_column) - { - if (!typeid_cast(column.type.get())) - throw Exception("Sign column (" + merging_params.sign_column + ")" - " for storage CollapsingMergeTree must have type Int8." - " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); - break; - } - } - } - - /// Если заданы columns_to_sum, проверяем, что такие столбцы существуют. - if (!merging_params.columns_to_sum.empty()) - { - if (mode != Summing) - throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR); - - for (const auto & column_to_sum : merging_params.columns_to_sum) - if (columns->end() == std::find_if(columns->begin(), columns->end(), - [&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; })) - throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration."); - } + merging_params.check(*columns); /// создаём директорию, если её нет Poco::File(full_path).createDirectories(); @@ -130,10 +100,72 @@ MergeTreeData::MergeTreeData( for (size_t i = 0; i < primary_key_size; ++i) primary_key_data_types[i] = primary_key_sample.unsafeGetByPosition(i).type; } - else if (mode != Unsorted) + else if (merging_params.mode != MergingParams::Unsorted) throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS); } + +void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const +{ + /// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8. + if (mode == MergingParams::Collapsing) + { + if (sign_column.empty()) + throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR); + + for (const auto & column : columns) + { + if (column.name == sign_column) + { + if (!typeid_cast(column.type.get())) + throw Exception("Sign column (" + sign_column + ")" + " for storage CollapsingMergeTree must have type Int8." + " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); + break; + } + } + } + else if (!sign_column.empty()) + throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR); + + /// Если заданы columns_to_sum, проверяем, что такие столбцы существуют. + if (!columns_to_sum.empty()) + { + if (mode != MergingParams::Summing) + throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", + ErrorCodes::LOGICAL_ERROR); + + for (const auto & column_to_sum : columns_to_sum) + if (columns.end() == std::find_if(columns.begin(), columns.end(), + [&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; })) + throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration."); + } + + /// Проверяем, что столбец version_column, если допустим, имеет тип целого беззнакового числа. + if (!version_column.empty()) + { + if (mode != MergingParams::Replacing) + throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.", + ErrorCodes::LOGICAL_ERROR); + + for (const auto & column : columns) + { + if (column.name == version_column) + { + if (!typeid_cast(column.type.get()) + && !typeid_cast(column.type.get()) + && !typeid_cast(column.type.get()) + && !typeid_cast(column.type.get())) + throw Exception("Version column (" + version_column + ")" + " for storage ReplacingMergeTree must have type of UInt family." + " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); + break; + } + } + } +} + + Int64 MergeTreeData::getMaxDataPartIndex() { std::lock_guard lock_all(all_data_parts_mutex); @@ -147,16 +179,17 @@ Int64 MergeTreeData::getMaxDataPartIndex() std::string MergeTreeData::getModePrefix() const { - switch (mode) + switch (merging_params.mode) { - case Ordinary: return ""; - case Collapsing: return "Collapsing"; - case Summing: return "Summing"; - case Aggregating: return "Aggregating"; - case Unsorted: return "Unsorted"; + case MergingParams::Ordinary: return ""; + case MergingParams::Collapsing: return "Collapsing"; + case MergingParams::Summing: return "Summing"; + case MergingParams::Aggregating: return "Aggregating"; + case MergingParams::Unsorted: return "Unsorted"; + case MergingParams::Replacing: return "Replacing"; default: - throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unknown mode of operation for MergeTreeData: " + toString(merging_params.mode), ErrorCodes::LOGICAL_ERROR); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 513012e9d4b..115dd7486a3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -402,7 +402,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart __sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes); }); - if (data.mode != MergeTreeData::Unsorted) + if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) src_streams.push_back(new MaterializingBlockInputStream{ new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())}); else @@ -416,39 +416,39 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart /// то есть (примерного) возрастания времени вставки. std::unique_ptr merged_stream; - switch (data.mode) + switch (data.merging_params.mode) { - case MergeTreeData::Ordinary: + case MergeTreeData::MergingParams::Ordinary: merged_stream = std::make_unique( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Collapsing: + case MergeTreeData::MergingParams::Collapsing: merged_stream = std::make_unique( src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Summing: + case MergeTreeData::MergingParams::Summing: merged_stream = std::make_unique( src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Aggregating: + case MergeTreeData::MergingParams::Aggregating: merged_stream = std::make_unique( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Replacing: /// TODO + case MergeTreeData::MergingParams::Replacing: /// TODO merged_stream = std::make_unique( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Unsorted: + case MergeTreeData::MergingParams::Unsorted: merged_stream = std::make_unique(src_streams); break; default: - throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR); } String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/"; @@ -612,7 +612,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( __sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes); }); - if (data.mode != MergeTreeData::Unsorted) + if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) src_streams.push_back(new MaterializingBlockInputStream{ new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())}); else @@ -676,39 +676,39 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( /// то есть (примерного) возрастания времени вставки. std::unique_ptr merged_stream; - switch (data.mode) + switch (data.merging_params.mode) { - case MergeTreeData::Ordinary: + case MergeTreeData::MergingParams::Ordinary: merged_stream = std::make_unique( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Collapsing: + case MergeTreeData::MergingParams::Collapsing: merged_stream = std::make_unique( src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Summing: + case MergeTreeData::MergingParams::Summing: merged_stream = std::make_unique( src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Aggregating: + case MergeTreeData::MergingParams::Aggregating: merged_stream = std::make_unique( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Replacing: /// TODO + case MergeTreeData::MergingParams::Replacing: /// TODO merged_stream = std::make_unique( src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); break; - case MergeTreeData::Unsorted: + case MergeTreeData::MergingParams::Unsorted: merged_stream = std::make_unique(src_streams); break; default: - throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR); + throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR); } merged_stream->readPrefix(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 902a65aa23a..f330efd12ba 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -485,7 +485,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( { RangesInDataPart ranges(part, (*inout_part_index)++); - if (data.mode != MergeTreeData::Unsorted) + if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings); else ranges.ranges = MarkRanges{MarkRange{0, part->size}}; @@ -815,30 +815,30 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal { BlockInputStreamPtr merged; - switch (data.mode) + switch (data.merging_params.mode) { - case MergeTreeData::Ordinary: + case MergeTreeData::MergingParams::Ordinary: merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); break; - case MergeTreeData::Collapsing: + case MergeTreeData::MergingParams::Collapsing: merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.merging_params.sign_column); break; - case MergeTreeData::Summing: + case MergeTreeData::MergingParams::Summing: merged = new SummingSortedBlockInputStream(to_merge, data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size); break; - case MergeTreeData::Aggregating: + case MergeTreeData::MergingParams::Aggregating: merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); break; - case MergeTreeData::Replacing: /// TODO + case MergeTreeData::MergingParams::Replacing: /// TODO merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); break; - case MergeTreeData::Unsorted: + case MergeTreeData::MergingParams::Unsorted: throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index d780626fe72..1b9d105754e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -99,7 +99,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa new_data_part->is_temp = true; /// Если для сортировки надо вычислить некоторые столбцы - делаем это. - if (data.mode != MergeTreeData::Unsorted) + if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) data.getPrimaryExpression()->execute(block); SortDescription sort_descr = data.getSortDescription(); @@ -107,7 +107,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa /// Сортируем. IColumn::Permutation * perm_ptr = nullptr; IColumn::Permutation perm; - if (data.mode != MergeTreeData::Unsorted) + if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) { if (!isAlreadySorted(block, sort_descr)) { diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index c6cd07da596..61d8a216e4e 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -352,12 +352,13 @@ StoragePtr StorageFactory::get( } else if (endsWith(name, "MergeTree")) { - /** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted]MergeTree (2 * 5 комбинаций) + /** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing]MergeTree (2 * 6 комбинаций) * В качестве аргумента для движка должно быть указано: * - (для Replicated) Путь к таблице в ZooKeeper * - (для Replicated) Имя реплики в ZooKeeper * - имя столбца с датой; - * - (не обязательно) выражение для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX); + * - (не обязательно) выражение для семплирования + * (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x * UINT32_MAX); * - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких); * - index_granularity; * - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1). @@ -440,16 +441,17 @@ For further info please read the documentation: https://clickhouse.yandex-team.r if (replicated) name_part = name_part.substr(strlen("Replicated")); - MergeTreeData::Mode mode = MergeTreeData::Ordinary; + MergeTreeData::MergingParams merging_params; + merging_params.mode = MergeTreeData::MergingParams::Ordinary; if (name_part == "Collapsing") - mode = MergeTreeData::Collapsing; + merging_params.mode = MergeTreeData::MergingParams::Collapsing; else if (name_part == "Summing") - mode = MergeTreeData::Summing; + merging_params.mode = MergeTreeData::MergingParams::Summing; else if (name_part == "Aggregating") - mode = MergeTreeData::Aggregating; + merging_params.mode = MergeTreeData::MergingParams::Aggregating; else if (name_part == "Unsorted") - mode = MergeTreeData::Unsorted; + merging_params.mode = MergeTreeData::MergingParams::Unsorted; else if (!name_part.empty()) throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE); @@ -461,9 +463,9 @@ For further info please read the documentation: https://clickhouse.yandex-team.r args = typeid_cast(*args_func.at(0)).children; /// NOTE Слегка запутанно. - size_t num_additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing); + size_t num_additional_params = (replicated ? 2 : 0) + (merging_params.mode == MergeTreeData::MergingParams::Collapsing); - if (mode == MergeTreeData::Unsorted + if (merging_params.mode == MergeTreeData::MergingParams::Unsorted && args.size() != num_additional_params + 2) { String params; @@ -482,7 +484,8 @@ For further info please read the documentation: https://clickhouse.yandex-team.r ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } - if (mode != MergeTreeData::Summing && mode != MergeTreeData::Unsorted + if (merging_params.mode != MergeTreeData::MergingParams::Summing + && merging_params.mode != MergeTreeData::MergingParams::Unsorted && args.size() != num_additional_params + 3 && args.size() != num_additional_params + 4) { @@ -499,7 +502,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r "\nprimary key expression," "\nindex granularity\n"; - if (mode == MergeTreeData::Collapsing) + if (merging_params.mode == MergeTreeData::MergingParams::Collapsing) params += ", sign column"; throw Exception("Storage " + name + " requires " @@ -508,7 +511,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } - if (mode == MergeTreeData::Summing + if (merging_params.mode == MergeTreeData::MergingParams::Summing && args.size() != num_additional_params + 3 && args.size() != num_additional_params + 4 && args.size() != num_additional_params + 5) @@ -544,8 +547,6 @@ For further info please read the documentation: https://clickhouse.yandex-team.r ASTPtr sampling_expression; UInt64 index_granularity; - MergeTreeData::MergingParams merging_params; - if (replicated) { auto ast = typeid_cast(&*args[0]); @@ -566,7 +567,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r args.erase(args.begin(), args.begin() + 2); } - if (mode == MergeTreeData::Collapsing) + if (merging_params.mode == MergeTreeData::MergingParams::Collapsing) { if (auto ast = typeid_cast(&*args.back())) merging_params.sign_column = ast->name; @@ -575,7 +576,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r args.pop_back(); } - else if (mode == MergeTreeData::Summing) + else if (merging_params.mode == MergeTreeData::MergingParams::Summing) { /// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов. if (!typeid_cast(&*args.back())) @@ -599,7 +600,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r else throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); - if (mode != MergeTreeData::Unsorted) + if (merging_params.mode != MergeTreeData::MergingParams::Unsorted) primary_expr_list = extractPrimaryKey(args[1]); auto ast = typeid_cast(&*args.back()); @@ -613,14 +614,14 @@ For further info please read the documentation: https://clickhouse.yandex-team.r zookeeper_path, replica_name, attach, data_path, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, context, primary_expr_list, date_column_name, - sampling_expression, index_granularity, mode, merging_params, + sampling_expression, index_granularity, merging_params, context.getMergeTreeSettings()); else return StorageMergeTree::create( data_path, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, context, primary_expr_list, date_column_name, - sampling_expression, index_granularity, mode, merging_params, + sampling_expression, index_granularity, merging_params, context.getMergeTreeSettings()); } else diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index fc6c3456c87..d21931a9d63 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -32,7 +32,6 @@ StorageMergeTree::StorageMergeTree( const String & date_column_name_, const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, @@ -41,7 +40,7 @@ StorageMergeTree::StorageMergeTree( data(full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_,mode_, merging_params_, + sampling_expression_, index_granularity_, merging_params_, settings_, database_name_ + "." + table_name, false), reader(data), writer(data), merger(data), increment(0), @@ -84,7 +83,6 @@ StoragePtr StorageMergeTree::create( const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) { @@ -92,7 +90,7 @@ StoragePtr StorageMergeTree::create( path_, database_name_, table_name_, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_, mode_, merging_params_, settings_ + sampling_expression_, index_granularity_, merging_params_, settings_ }; StoragePtr res_ptr = res->thisPtr(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index eba0743c87f..99aa0a97120 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -189,7 +189,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_), @@ -200,7 +199,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( data(full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_, mode_, merging_params_, + sampling_expression_, index_granularity_, merging_params_, settings_, database_name_ + "." + table_name, true, [this] (const std::string & name) { enqueuePartForCheck(name); }), reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this), @@ -273,7 +272,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, - date_column_name_, sampling_expression_, index_granularity_, mode_, merging_params_, settings_, + date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_, database_name_ + "." + table_name + "[unreplicated]", false)); unreplicated_data->loadDataParts(skip_sanity_checks); @@ -331,7 +330,6 @@ StoragePtr StorageReplicatedMergeTree::create( const String & date_column_name_, const ASTPtr & sampling_expression_, size_t index_granularity_, - MergeTreeData::Mode mode_, const MergeTreeData::MergingParams & merging_params_, const MergeTreeSettings & settings_) { @@ -340,7 +338,7 @@ StoragePtr StorageReplicatedMergeTree::create( path_, database_name_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, - sampling_expression_, index_granularity_, mode_, + sampling_expression_, index_granularity_, merging_params_, settings_}; StoragePtr res_ptr = res->thisPtr(); @@ -415,7 +413,7 @@ namespace << "date column: " << data.date_column_name << "\n" << "sampling expression: " << formattedAST(data.sampling_expression) << "\n" << "index granularity: " << data.index_granularity << "\n" - << "mode: " << static_cast(data.mode) << "\n" + << "mode: " << static_cast(data.merging_params.mode) << "\n" << "sign column: " << data.merging_params.sign_column << "\n" << "primary key: " << formattedAST(data.primary_expr_ast) << "\n"; } @@ -466,9 +464,10 @@ namespace int read_mode = 0; in >> read_mode; - if (read_mode != static_cast(data.mode)) + if (read_mode != static_cast(data.merging_params.mode)) throw Exception("Existing table metadata in ZooKeeper differs in mode of merge operation." - " Stored in ZooKeeper: " + DB::toString(read_mode) + ", local: " + DB::toString(static_cast(data.mode)), + " Stored in ZooKeeper: " + DB::toString(read_mode) + ", local: " + + DB::toString(static_cast(data.merging_params.mode)), ErrorCodes::METADATA_MISMATCH); in >> "\nsign column: ";