From de0754ef0d98438da5e716fb328326ae2be34a34 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 30 Mar 2020 15:51:05 +0300 Subject: [PATCH] First working version --- dbms/src/Storages/AlterCommands.cpp | 8 +- .../Storages/MergeTree/IMergeTreeReader.cpp | 2 + dbms/src/Storages/MergeTree/MergeTreeData.cpp | 6 ++ .../MergeTree/MergeTreeDataMergerMutator.cpp | 81 ++++++++++++++++--- .../MergeTree/MergeTreeDataMergerMutator.h | 2 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 2 + dbms/src/Storages/StorageMergeTree.cpp | 14 +++- 7 files changed, 99 insertions(+), 16 deletions(-) diff --git a/dbms/src/Storages/AlterCommands.cpp b/dbms/src/Storages/AlterCommands.cpp index f20a3f58382..afd30bdb067 100644 --- a/dbms/src/Storages/AlterCommands.cpp +++ b/dbms/src/Storages/AlterCommands.cpp @@ -533,7 +533,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada if (ignore) return false; - if (type == DROP_COLUMN || type == DROP_INDEX) + if (type == DROP_COLUMN || type == DROP_INDEX || type == RENAME_COLUMN) return true; if (type != MODIFY_COLUMN || data_type == nullptr) @@ -599,6 +599,12 @@ std::optional AlterCommand::tryConvertToMutationCommand(const S result.predicate = nullptr; } + else if (type == RENAME_COLUMN) + { + result.type = MutationCommand::Type::RENAME_COLUMN; + result.column_name = column_name; + result.rename_to = rename_to; + } result.ast = ast->clone(); return result; diff --git a/dbms/src/Storages/MergeTree/IMergeTreeReader.cpp b/dbms/src/Storages/MergeTree/IMergeTreeReader.cpp index 7ccbe71938c..58479f39bcb 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -32,6 +32,8 @@ IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_ , all_mark_ranges(all_mark_ranges_) , alter_conversions(storage.getAlterConversionsForPart(data_part)) { + LOG_DEBUG(&Poco::Logger::get("IMergeTreeReader"), "Columns to read:" << columns_.toString()); + LOG_DEBUG(&Poco::Logger::get("IMergeTreeReader"), "Columns in part:" << data_part_->getColumns().toString()); } IMergeTreeReader::~IMergeTreeReader() = default; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 7d07540741e..b2df5f7ed8f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -469,6 +469,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool if (!only_check) { + LOG_DEBUG(log, "SETTING UP COLUMNS:" << metadata.columns.toString()); setColumns(std::move(metadata.columns)); order_by_ast = metadata.order_by_ast; @@ -3595,8 +3596,13 @@ MergeTreeData::AlterConversions MergeTreeData::getAlterConversionsForPart(const AlterConversions result{}; for (const auto & command : commands) + { if (command.type == MutationCommand::Type::RENAME_COLUMN) + { result.rename_map[command.column_name] = command.rename_to; + LOG_DEBUG(log, "Add to rename map:" << command.column_name); + } + } return result; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 0c8c39b074c..00637dc5017 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -31,6 +31,8 @@ #include +#include + namespace ProfileEvents { extern const Event MergedRows; @@ -988,6 +990,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames); + + LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "COMMANDS FOR INTERPRETER:" << for_interpreter.size()); + LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "COMMANDS FOR RENAMES:" << for_file_renames.size()); UInt64 watch_prev_elapsed = 0; MergeStageProgress stage_progress(1.0); @@ -1010,6 +1015,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor /// It shouldn't be changed by mutation. new_data_part->index_granularity_info = source_part->index_granularity_info; new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns, for_file_renames)); + LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "New data part columns:" << new_data_part->getColumns().toString()); new_data_part->partition.assign(source_part->partition); auto disk = new_data_part->disk; @@ -1056,19 +1062,34 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor auto indices_to_recalc = getIndicesToRecalculate(in, storage_from_source_part, updated_header.getNamesAndTypesList(), context); NameSet files_to_skip = collectFilesToSkip(updated_header, indices_to_recalc, mrk_extension); - NameSet files_to_remove = collectFilesToRemove(source_part, for_file_renames, mrk_extension); + NameToNameMap files_to_rename = collectFilesForRenames(source_part, for_file_renames, mrk_extension); + + LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "FILES RENAME MAP:" << files_to_rename.size()); if (need_remove_expired_values) files_to_skip.insert("ttl.txt"); /// Create hardlinks for unchanged files for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next()) { - if (files_to_skip.count(it->name()) || files_to_remove.count(it->name())) + if (files_to_skip.count(it->name())) continue; - String destination = new_part_tmp_path + "/" + it->name(); + String destination = new_part_tmp_path + "/"; + auto rename_it = files_to_rename.find(it->name()); + if (rename_it != files_to_rename.end()) + { + LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "RENAME IT FOUND:" << rename_it->first << " to " << rename_it->second); + if (rename_it->second.empty()) + continue; + destination += rename_it->second; + } + else + { + destination += it->name(); + } + LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "HARDLINKING FROM:" << it->path() << " TO " << destination); disk->createHardLink(it->path(), destination); } @@ -1090,9 +1111,19 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor need_remove_expired_values); } - for (const String & removed_file : files_to_remove) - if (new_data_part->checksums.files.count(removed_file)) - new_data_part->checksums.files.erase(removed_file); + for (const auto & [rename_from, rename_to] : files_to_rename) + { + if (rename_to.empty() && new_data_part->checksums.files.count(rename_from)) + { + new_data_part->checksums.files.erase(rename_from); + } + else if (new_data_part->checksums.files.count(rename_from)) + { + new_data_part->checksums.files[rename_to] = new_data_part->checksums.files[rename_from]; + + new_data_part->checksums.files.erase(rename_from); + } + } finalizeMutatedPart(source_part, new_data_part, need_remove_expired_values); } @@ -1262,7 +1293,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands( } -NameSet MergeTreeDataMergerMutator::collectFilesToRemove( +NameToNameMap MergeTreeDataMergerMutator::collectFilesForRenames( MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension) { /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes. @@ -1277,14 +1308,14 @@ NameSet MergeTreeDataMergerMutator::collectFilesToRemove( {}); } - NameSet remove_files; + NameToNameMap rename_map; /// Remove old indices for (const auto & command : commands_for_removes) { if (command.type == MutationCommand::Type::DROP_INDEX) { - remove_files.emplace("skp_idx_" + command.column_name + ".idx"); - remove_files.emplace("skp_idx_" + command.column_name + mrk_extension); + rename_map.emplace("skp_idx_" + command.column_name + ".idx", ""); + rename_map.emplace("skp_idx_" + command.column_name + mrk_extension, ""); } else if (command.type == MutationCommand::Type::DROP_COLUMN) { @@ -1294,8 +1325,8 @@ NameSet MergeTreeDataMergerMutator::collectFilesToRemove( /// Delete files if they are no longer shared with another column. if (--stream_counts[stream_name] == 0) { - remove_files.emplace(stream_name + ".bin"); - remove_files.emplace(stream_name + mrk_extension); + rename_map.emplace(stream_name + ".bin", ""); + rename_map.emplace(stream_name + mrk_extension, ""); } }; @@ -1304,9 +1335,25 @@ NameSet MergeTreeDataMergerMutator::collectFilesToRemove( if (column) column->type->enumerateStreams(callback, stream_path); } + else if (command.type == MutationCommand::Type::RENAME_COLUMN) + { + LOG_DEBUG(&Poco::Logger::get("collectFilesForRenames"), "Has mutation command"); + IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) + { + String stream_from = IDataType::getFileNameForStream(command.column_name, substream_path); + + String stream_to = boost::replace_first_copy(stream_from, command.column_name, command.rename_to); + rename_map.emplace(stream_from + ".bin", stream_to + ".bin"); + rename_map.emplace(stream_from + mrk_extension, stream_to + mrk_extension); + }; + IDataType::SubstreamPath stream_path; + auto column = source_part->getColumns().tryGetByName(command.column_name); + if (column) + column->type->enumerateStreams(callback, stream_path); + } } - return remove_files; + return rename_map; } NameSet MergeTreeDataMergerMutator::collectFilesToSkip( @@ -1344,15 +1391,19 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( const MutationCommands & commands_for_removes) { NameSet removed_columns; + NameToNameMap renamed_columns; for (const auto & command : commands_for_removes) { if (command.type == MutationCommand::DROP_COLUMN) removed_columns.insert(command.column_name); + if (command.type == MutationCommand::RENAME_COLUMN) + renamed_columns.emplace(command.rename_to, command.column_name); } Names source_column_names = source_part->getColumns().getNames(); NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); for (auto it = all_columns.begin(); it != all_columns.end();) { + LOG_DEBUG(&Poco::Logger::get("getColumnsForNewDataPart"), "Looking at column:" << it->name); if (updated_header.has(it->name)) { auto updated_type = updated_header.getByName(it->name).type; @@ -1364,6 +1415,10 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( { ++it; } + else if (renamed_columns.count(it->name) && source_columns_name_set.count(renamed_columns[it->name])) + { + ++it; + } else it = all_columns.erase(it); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 3d41ceee990..b24b56a4780 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -147,7 +147,7 @@ private: /// Apply commands to source_part i.e. remove some columns in source_part /// and return set of files, that have to be removed from filesystem and checksums - static NameSet collectFilesToRemove(MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension); + static NameToNameMap collectFilesForRenames(MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension); /// Files, that we don't need to remove and don't need to hardlink, for example columns.txt and checksums.txt. /// Because we will generate new versions of them after we perform mutation. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 629a2b2cc18..9bb7db3ea11 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -183,6 +183,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts( for (const String & name : column_names_to_return) { + LOG_DEBUG(log, "Column name to return:" << name); if (name == "_part") { part_column_queried = true; @@ -209,6 +210,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts( NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical(); + LOG_DEBUG(log, "Available columns:" << available_real_columns.toString()); /// If there are only virtual columns in the query, you must request at least one non-virtual one. if (real_column_names.empty()) real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns)); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 79079aa3095..d6a7fe376f4 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -213,11 +213,14 @@ void StorageMergeTree::alter( StorageInMemoryMetadata metadata = getInMemoryMetadata(); auto maybe_mutation_commands = commands.getMutationCommands(metadata); + LOG_DEBUG(log, "Applying commands"); commands.apply(metadata); + LOG_DEBUG(log, "Commands applied"); /// This alter can be performed at metadata level only if (commands.isSettingsAlter()) { + LOG_DEBUG(log, "Settings alter"); lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); changeSettings(metadata.settings_ast, table_lock_holder); @@ -226,15 +229,18 @@ void StorageMergeTree::alter( } else { + LOG_DEBUG(log, "Not settings alter"); lockStructureExclusively(table_lock_holder, context.getCurrentQueryId()); changeSettings(metadata.settings_ast, table_lock_holder); /// Reinitialize primary key because primary key column types might have changed. setProperties(metadata); + LOG_DEBUG(log, "Metadata setup"); setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast); DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata); + LOG_DEBUG(log, "Data on disk changed"); /// We release all locks except alter_lock which allows /// to execute alter queries sequentially @@ -683,10 +689,16 @@ bool StorageMergeTree::tryMutatePart() MutationCommands commands_for_size_validation; for (const auto & command : it->second.commands) { - if (command.type != MutationCommand::Type::DROP_COLUMN && command.type != MutationCommand::Type::DROP_INDEX) + if (command.type != MutationCommand::Type::DROP_COLUMN + && command.type != MutationCommand::Type::DROP_INDEX + && command.type != MutationCommand::Type::RENAME_COLUMN) + { commands_for_size_validation.push_back(command); + } else + { commands_size += command.ast->size(); + } } if (!commands_for_size_validation.empty())