From 6a02b99fafc5097bf9ffafce76f8eeaa77b4df86 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 17 Feb 2020 18:44:13 +0300 Subject: [PATCH] Slightly refactor merger mutator --- dbms/src/Core/NamesAndTypes.h | 2 + .../AddingDefaultsBlockInputStream.cpp | 2 +- dbms/src/DataStreams/TTLBlockInputStream.cpp | 2 +- dbms/src/Interpreters/addMissingDefaults.cpp | 2 +- ...faults.cpp => inplaceBlockConversions.cpp} | 2 +- ...ngDefaults.h => inplaceBlockConversions.h} | 2 + .../MergeTree/MergeTreeDataMergerMutator.cpp | 213 ++++++++++-------- .../MergeTree/MergeTreeDataMergerMutator.h | 30 ++- .../Storages/MergeTree/MergeTreeReader.cpp | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 1 - 10 files changed, 149 insertions(+), 109 deletions(-) rename dbms/src/Interpreters/{evaluateMissingDefaults.cpp => inplaceBlockConversions.cpp} (99%) rename dbms/src/Interpreters/{evaluateMissingDefaults.h => inplaceBlockConversions.h} (85%) diff --git a/dbms/src/Core/NamesAndTypes.h b/dbms/src/Core/NamesAndTypes.h index 55f3d989dbc..28567fed3e3 100644 --- a/dbms/src/Core/NamesAndTypes.h +++ b/dbms/src/Core/NamesAndTypes.h @@ -73,8 +73,10 @@ public: /// Unlike `filter`, returns columns in the order in which they go in `names`. NamesAndTypesList addTypes(const Names & names) const; + /// Check that column contains in list bool contains(const String & name) const; + /// Try to get column by name, return empty optional if column not found std::optional tryGetByName(const std::string & name) const; }; diff --git a/dbms/src/DataStreams/AddingDefaultsBlockInputStream.cpp b/dbms/src/DataStreams/AddingDefaultsBlockInputStream.cpp index 112afe61183..3cb0f436355 100644 --- a/dbms/src/DataStreams/AddingDefaultsBlockInputStream.cpp +++ b/dbms/src/DataStreams/AddingDefaultsBlockInputStream.cpp @@ -1,7 +1,7 @@ #include #include #include -#include +#include #include #include diff --git a/dbms/src/DataStreams/TTLBlockInputStream.cpp b/dbms/src/DataStreams/TTLBlockInputStream.cpp index c08abba3bdf..b34b361910d 100644 --- a/dbms/src/DataStreams/TTLBlockInputStream.cpp +++ b/dbms/src/DataStreams/TTLBlockInputStream.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include diff --git a/dbms/src/Interpreters/addMissingDefaults.cpp b/dbms/src/Interpreters/addMissingDefaults.cpp index 10318ee89cf..cbe6811ea9e 100644 --- a/dbms/src/Interpreters/addMissingDefaults.cpp +++ b/dbms/src/Interpreters/addMissingDefaults.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.cpp b/dbms/src/Interpreters/inplaceBlockConversions.cpp similarity index 99% rename from dbms/src/Interpreters/evaluateMissingDefaults.cpp rename to dbms/src/Interpreters/inplaceBlockConversions.cpp index 100687833ad..2b3f3d84ebc 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.cpp +++ b/dbms/src/Interpreters/inplaceBlockConversions.cpp @@ -1,4 +1,4 @@ -#include "evaluateMissingDefaults.h" +#include "inplaceBlockConversions.h" #include #include diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.h b/dbms/src/Interpreters/inplaceBlockConversions.h similarity index 85% rename from dbms/src/Interpreters/evaluateMissingDefaults.h rename to dbms/src/Interpreters/inplaceBlockConversions.h index 18156388b97..b86a23bde13 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.h +++ b/dbms/src/Interpreters/inplaceBlockConversions.h @@ -12,6 +12,8 @@ class Context; class NamesAndTypesList; struct ColumnDefault; +/// Adds missing defaults to block according to required_columns +/// using column_defaults map void evaluateMissingDefaults(Block & block, const NamesAndTypesList & required_columns, const std::unordered_map & column_defaults, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 66ff533ded6..573d5b84d30 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -54,6 +54,7 @@ namespace DB namespace ErrorCodes { extern const int ABORTED; + extern const int UNKNOWN_MUTATION_COMMAND; } @@ -984,25 +985,25 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor Block updated_header; std::optional interpreter; - std::vector for_interpreter; - std::vector for_file_renames; + std::vector for_interpreter, for_file_renames; splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames); + UInt64 watch_prev_elapsed = 0; + MergeStageProgress stage_progress(1.0); if (!for_interpreter.empty()) { interpreter.emplace(storage_from_source_part, for_interpreter, context_for_reading, true); in = interpreter->execute(table_lock_holder); updated_header = interpreter->getUpdatedHeader(); + in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); } NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); const auto data_settings = data.getSettings(); + /// Don't change granularity type while mutating subset of columns + String mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension(); - UInt64 watch_prev_elapsed = 0; - MergeStageProgress stage_progress(1.0); - if (in) - in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); if (updated_header.columns() == all_columns.size()) { @@ -1076,76 +1077,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor std::make_shared(in, indices_recalc_expr)); } - NameSet files_to_skip = {"checksums.txt", "columns.txt"}; - - /// Don't change granularity type while mutating subset of columns - auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension(); - - /// Skip updated files - for (const auto & entry : updated_header) - { - IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) - { - String stream_name = IDataType::getFileNameForStream(entry.name, substream_path); - files_to_skip.insert(stream_name + ".bin"); - files_to_skip.insert(stream_name + mrk_extension); - }; - - IDataType::SubstreamPath stream_path; - entry.type->enumerateStreams(callback, stream_path); - } - for (const auto & index : indices_to_recalc) - { - files_to_skip.insert(index->getFileName() + ".idx"); - files_to_skip.insert(index->getFileName() + mrk_extension); - } - - /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes. - std::map stream_counts; - for (const NameAndTypePair & column : source_part->columns) - { - column.type->enumerateStreams( - [&](const IDataType::SubstreamPath & substream_path) - { - ++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)]; - }, - {}); - } - - - std::unordered_set remove_files; - /// Remove old indices - for (const auto & command : for_file_renames) - { - if (command.type == MutationCommand::Type::DROP_INDEX) - { - remove_files.emplace("skp_idx_" + command.column_name + ".idx"); - remove_files.emplace("skp_idx_" + command.column_name + mrk_extension); - } - else if (command.type == MutationCommand::Type::DROP_COLUMN) - { - IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) - { - String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path); - /// Delete files if they are no longer shared with another column. - if (--stream_counts[stream_name] == 0) - { - remove_files.emplace(stream_name + ".bin"); - remove_files.emplace(stream_name + mrk_extension); - } - }; - - IDataType::SubstreamPath stream_path; - auto column = source_part->columns.tryGetByName(command.column_name); - if (column) - column->type->enumerateStreams(callback, stream_path); - } - } + NameSet files_to_skip = collectFilesToSkip(updated_header, indices_to_recalc, mrk_extension); + NameSet files_to_remove = collectFilesToRemove(source_part, for_file_renames, mrk_extension); Poco::DirectoryIterator dir_end; + /// Create hardlinks for unchanged files for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it) { - if (files_to_skip.count(dir_it.name()) || remove_files.count(dir_it.name())) + if (files_to_skip.count(dir_it.name()) || files_to_remove.count(dir_it.name())) continue; Poco::Path destination(new_part_tmp_path); @@ -1192,41 +1131,22 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor new_data_part->checksums.add(std::move(changed_checksums)); } - for (const String & removed_file : remove_files) + for (const String & removed_file : files_to_remove) if (new_data_part->checksums.files.count(removed_file)) new_data_part->checksums.files.erase(removed_file); - { /// Write file with checksums. WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096); new_data_part->checksums.write(out_checksums); - } + } /// close fd - /// Write the columns list of the resulting part in the same order as all_columns. - new_data_part->columns = all_columns; - Names source_column_names = source_part->columns.getNames(); - NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); - for (auto it = new_data_part->columns.begin(); it != new_data_part->columns.end();) - { - if (updated_header.has(it->name)) - { - auto updated_type = updated_header.getByName(it->name).type; - if (updated_type != it->type) - it->type = updated_type; - ++it; - } - else if (source_columns_name_set.count(it->name)) - { - ++it; - } - else - it = new_data_part->columns.erase(it); - } + + new_data_part->columns = getColumnsForNewDataPart(source_part, updated_header, all_columns); { /// Write a file with a description of columns. WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096); new_data_part->columns.writeText(out_columns); - } + } /// close new_data_part->rows_count = source_part->rows_count; new_data_part->index_granularity = source_part->index_granularity; @@ -1338,7 +1258,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands( MergeTreeData::DataPartPtr part, const std::vector & commands, std::vector & for_interpreter, - std::vector & for_file_renames) + std::vector & for_file_renames) const { for (const auto & command : commands) { @@ -1364,4 +1284,103 @@ void MergeTreeDataMergerMutator::splitMutationCommands( } } + +NameSet MergeTreeDataMergerMutator::collectFilesToRemove( + MergeTreeData::DataPartPtr source_part, const std::vector & commands_for_removes, const String & mrk_extension) const +{ + /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes. + std::map stream_counts; + for (const NameAndTypePair & column : source_part->columns) + { + column.type->enumerateStreams( + [&](const IDataType::SubstreamPath & substream_path) + { + ++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)]; + }, + {}); + } + + + NameSet remove_files; + /// Remove old indices + for (const auto & command : commands_for_removes) + { + if (command.type == MutationCommand::Type::DROP_INDEX) + { + remove_files.emplace("skp_idx_" + command.column_name + ".idx"); + remove_files.emplace("skp_idx_" + command.column_name + mrk_extension); + } + else if (command.type == MutationCommand::Type::DROP_COLUMN) + { + IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) + { + String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path); + /// Delete files if they are no longer shared with another column. + if (--stream_counts[stream_name] == 0) + { + remove_files.emplace(stream_name + ".bin"); + remove_files.emplace(stream_name + mrk_extension); + } + }; + + IDataType::SubstreamPath stream_path; + auto column = source_part->columns.tryGetByName(command.column_name); + if (column) + column->type->enumerateStreams(callback, stream_path); + } + } + return remove_files; +} + +NameSet MergeTreeDataMergerMutator::collectFilesToSkip( + const Block & updated_header, const std::set & indices_to_recalc, const String & mrk_extension) const +{ + NameSet files_to_skip = {"checksums.txt", "columns.txt"}; + + /// Skip updated files + for (const auto & entry : updated_header) + { + IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) { + String stream_name = IDataType::getFileNameForStream(entry.name, substream_path); + files_to_skip.insert(stream_name + ".bin"); + files_to_skip.insert(stream_name + mrk_extension); + }; + + IDataType::SubstreamPath stream_path; + entry.type->enumerateStreams(callback, stream_path); + } + for (const auto & index : indices_to_recalc) + { + files_to_skip.insert(index->getFileName() + ".idx"); + files_to_skip.insert(index->getFileName() + mrk_extension); + } + + return files_to_skip; +} + + +NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( + MergeTreeData::DataPartPtr source_part, const Block & updated_header, NamesAndTypesList all_columns) const +{ + Names source_column_names = source_part->columns.getNames(); + NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); + for (auto it = all_columns.begin(); it != all_columns.end();) + { + if (updated_header.has(it->name)) + { + auto updated_type = updated_header.getByName(it->name).type; + if (updated_type != it->type) + it->type = updated_type; + ++it; + } + else if (source_columns_name_set.count(it->name)) + { + ++it; + } + else + it = all_columns.erase(it); + } + return all_columns; +} + } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 08276e051a1..952b12155d9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -114,11 +114,6 @@ public: const MergeTreeData::DataPartsVector & parts, MergeTreeData::Transaction * out_transaction = nullptr); - void splitMutationCommands( - MergeTreeData::DataPartPtr part, - const std::vector & commands, - std::vector & for_interpreter, - std::vector & for_file_renames); /// The approximate amount of disk space needed for merge or mutation. With a surplus. static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts); @@ -128,7 +123,30 @@ private: */ MergeTreeData::DataPartsVector selectAllPartsFromPartition(const String & partition_id); -public: + /** Split mutation commands into two parts: + * First part should be executed by mutations interpreter. + * Other is just simple drop/renames, so they can be executed without interpreter. + */ + void splitMutationCommands( + MergeTreeData::DataPartPtr part, + const std::vector & commands, + std::vector & for_interpreter, + std::vector & for_file_renames) const; + + + /// Apply commands to source_part i.e. remove some columns in source_part + /// and return set of files, that have to be removed from filesystem and checksums + NameSet collectFilesToRemove(MergeTreeData::DataPartPtr source_part, const std::vector & commands_for_removes, const String & mrk_extension) const; + + /// Files, that we don't need to remove and don't need to hardlink, for example columns.txt and checksums.txt. + /// Because we will generate new versions of them after we perform mutation. + NameSet collectFilesToSkip(const Block & updated_header, const std::set & indices_to_recalc, const String & mrk_extension) const; + + /// Get the columns list of the resulting part in the same order as all_columns. + NamesAndTypesList getColumnsForNewDataPart(MergeTreeData::DataPartPtr source_part, const Block & updated_header, NamesAndTypesList all_columns) const; + + +public : /** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon. * All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed. */ diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index 7dc474dd948..22a33051d31 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index dacf299443d..8861e62867d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3369,7 +3369,6 @@ void StorageReplicatedMergeTree::alter( if (rc == Coordination::ZOK) { - queue.pullLogsToQueue(zookeeper); if (alter_entry->have_mutation) { /// Record in replication /log